CFMMC (China Futures Market Monitoring Center) Crawler | 臭大佬
Features
- Automatic captcha recognition
- Simulated login
- Data download for multiple accounts
- Excel processing
- Database operations
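Overview
If you trade futures, you may know this routine: open the China Futures Market Monitoring Center (CFMMC) website, log in to each account by hand, download each account's daily mark-to-market statement (逐日盯市), and then turn the data into charts with Excel macros or similar tools.
This approach has several drawbacks:
- It takes a lot of work;
- It cannot aggregate data over a period of time;
- The data cannot be managed and analyzed in one place;
- Multiple accounts cannot be compared.
So I wrote a crawler and a backend. Every day the crawler logs in to the CFMMC site, downloads the daily mark-to-market statements, and feeds them through a queue into my own backend, where each account's net value, profit/loss ratio and similar metrics can be analyzed over any period.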
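Analysis
Before writing the crawler, let's go through the process by hand and analyze it.
Open the login page. It has a captcha, which the crawler will need to recognize automatically.
After logging in manually, the first page shown is today's trade-by-trade statement (逐笔对冲).
What we want is the daily mark-to-market statement, and not necessarily for today. Open the browser developer tools (F12) and switch to the daily mark-to-market page.
The Network panel shows the following: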
Request URL: https://investorservice.cfmmc.com/customer/setParameter.do
Request Method: POST
org.apache.struts.taglib.html.TOKEN: 08424ad1b28fabecccd7bf5161108b13
tradeDate: 2020-10-09
byType: date
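This gives us the endpoint and its parameters:
org.apache.struts.taglib.html.TOKEN: an input field on the login page; in my tests, omitting it did not seem to make any difference.
tradeDate: the date to query.
byType: defaults to trade; trade returns the trade-by-trade statement, date returns the daily mark-to-market statement.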
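The three form fields above can be assembled into a request body. This is a minimal sketch, assuming the field names captured in the Network panel; `build_query` is a hypothetical helper, and it only builds the payload without contacting the site.

```python
from datetime import date

# Values observed for byType: "trade" (default) = trade-by-trade, "date" = daily mark-to-market
BY_TYPES = {"trade", "date"}


def build_query(trade_date: date, by_type: str = "date", token: str = "") -> dict:
    """Build the form body for POST /customer/setParameter.do (hypothetical helper)."""
    if by_type not in BY_TYPES:
        raise ValueError(f"unknown byType: {by_type!r}")
    return {
        # Struts token from the page; omitting it did not seem to matter in testing
        "org.apache.struts.taglib.html.TOKEN": token,
        "tradeDate": trade_date.strftime("%Y-%m-%d"),
        "byType": by_type,
    }


if __name__ == "__main__":
    print(build_query(date(2020, 10, 9)))
```

Rejecting unknown `byType` values up front catches typos such as `"data"` before any request is sent; the resulting dictionary can be passed as `data=` to `requests.Session.post`, which form-encodes it.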
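Note that the response is not API-style data but an HTML page.
With the endpoint and parameters in hand, we can fetch the daily mark-to-market statement for any given date.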
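Because the response is an HTML page, the download link has to be pulled out of the markup. The crawler does this with chained `str.split` calls; the sketch below performs a similar extraction with a regular expression. It assumes, as that code does, that the link is an `<a href="..." target="_blank">` inside `<div id="waitBody">` and precedes the text 下载 ("download"); `find_download_url` is a hypothetical name and the sample HTML in the test is made up.

```python
import re
from typing import Optional

BASE_URL = "https://investorservice.cfmmc.com"

# Assumed markup: <div id="waitBody"> ... <a href="/path" target="_blank">下载</a>
LINK_RE = re.compile(
    r'<div id="waitBody">.*?<a href="([^"]+)" target="_blank">\s*下载',
    re.S,  # let .*? cross line breaks in the page
)


def find_download_url(html: str) -> Optional[str]:
    """Return the absolute download URL, or None if no download link is found."""
    match = LINK_RE.search(html)
    if match is None:
        return None
    return BASE_URL + match.group(1)
```

Returning `None` instead of raising makes the "no download button" case (for example a non-trading day) an ordinary branch for the caller.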
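The idea is simple: read each account's username and password, simulate the login, switch to the daily mark-to-market statement, download the data, process the Excel file, write it to the database, and let the backend analyze it.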
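Key techniques
- Automatic captcha recognition
- Simulated login
- Excel processing
- Writing to the database
These cover essentially everything the workflow needs.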
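The captcha looks ordinary, but Google's open-source Tesseract-OCR has a very low recognition rate on it and can barely read it. So I turned to the OCR APIs of the big Chinese tech companies (BAT: Baidu, Alibaba, Tencent) and wrapped each one for easy calling:
- Tesseract-OCR
- Tencent OCR API
- Baidu OCR API
- Baidu OCR SDK
I skipped Alibaba because the documentation I found only covered Python 2. The recognition rate improves from the top of the list to the bottom. Tencent's was a letdown: the accuracy is mediocre, the free daily quota is small, usage beyond it is billed, and then it keeps sending overdue-payment emails. The Baidu SDK has the best recognition rate: it usually succeeds within 5 attempts, and if I remember correctly it includes about 5,000 free calls per month.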
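Since no single service is reliable, the practical pattern is to clean up each OCR result and only submit candidates that look like a plausible code, falling back to the next service otherwise. This is a sketch with pluggable recognizers; the `Recognizer` signature, `first_plausible` and the five-character minimum (taken from the crawler's `len(veri_code) >= 5` check) are assumptions, and the recognizers in the usage are fakes, not calls to any real OCR service.

```python
from typing import Callable, Optional, Sequence

# A recognizer takes raw image bytes and returns OCR text (or None)
Recognizer = Callable[[bytes], Optional[str]]


def clean_code(text: Optional[str]) -> str:
    """Keep only letters and digits from the OCR output."""
    return "".join(ch for ch in (text or "") if ch.isalnum())


def first_plausible(image: bytes, recognizers: Sequence[Recognizer],
                    min_len: int = 5) -> Optional[str]:
    """Return the first cleaned result with at least `min_len` characters.

    Recognizers are tried in order; one that raises is treated as a miss.
    """
    for recognize in recognizers:
        try:
            code = clean_code(recognize(image))
        except Exception:
            continue
        if len(code) >= min_len:
            return code
    return None


if __name__ == "__main__":
    def flaky(_: bytes) -> str:
        raise RuntimeError("quota exceeded")

    print(first_plausible(b"", [flaky, lambda _: "ab 3", lambda _: " x7K2p "]))  # x7K2p
```

Cleaning before the length check matters: OCR output often contains spaces or punctuation that would otherwise inflate the length of a too-short result.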
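Code
My backend is built with laravel-wjfcms; only the Python crawler is shown here. With minor adjustments the code should run.
The directory structure is as follows: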
config.yaml holds the database configuration, in the following format:
mysql_config:
  host: 'xxx'
  port: 3306
  user: 'root'
  password: 'sss'
  db: 'qihuo'
  charset: 'utf8'
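A missing key in this file otherwise only surfaces as a `KeyError` inside the crawler's constructor. A small sketch that turns the parsed `mysql_config` mapping into keyword arguments for `pymysql.connect` and fails early; `mysql_kwargs` is a hypothetical helper, and the dictionary in the usage stands in for what `yaml.safe_load` returns for the file above.

```python
REQUIRED_KEYS = ("host", "port", "user", "password", "db", "charset")


def mysql_kwargs(configs: dict) -> dict:
    """Turn the parsed config.yaml into keyword arguments for pymysql.connect."""
    section = configs.get("mysql_config")
    if not isinstance(section, dict):
        raise ValueError("config.yaml has no mysql_config section")
    missing = [key for key in REQUIRED_KEYS if key not in section]
    if missing:
        raise ValueError("mysql_config is missing: " + ", ".join(missing))
    kwargs = {key: section[key] for key in REQUIRED_KEYS}
    # A quoted port ('3306') loads as a string; the driver expects an int
    kwargs["port"] = int(kwargs["port"])
    return kwargs


if __name__ == "__main__":
    # What yaml.safe_load returns for the sample config above
    parsed = {"mysql_config": {"host": "xxx", "port": 3306, "user": "root",
                               "password": "sss", "db": "qihuo", "charset": "utf8"}}
    print(mysql_kwargs(parsed))
    # With a reachable server: pymysql.connect(**mysql_kwargs(parsed))
```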
The login user table has roughly the following fields; adjust them to your situation:
CREATE TABLE `wjf_transaction_users` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'Trading user table',
  `admin_id` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'Admin account that owns this trading account',
  `username` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'Username',
  `password` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT 'Password',
  `status` tinyint(4) NOT NULL DEFAULT '2' COMMENT 'Status, 1: verified, 2: pending verification, 3: verification failed',
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  `deleted_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  KEY `wjf_transaction_users_admin_id_index` (`admin_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=COMPACT;
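The crawler keeps `status` in this table up to date (1 verified, 3 failed). A sketch of that update with bound parameters instead of string formatting, so a username containing a quote cannot break the statement. It uses the standard-library `sqlite3` module and a trimmed copy of the table purely so it runs anywhere; with pymysql the placeholders are `%s` instead of `?`, and `mark_status` is a hypothetical helper.

```python
import sqlite3
from datetime import datetime

# Status codes from the table comment
VERIFIED, PENDING, FAILED = 1, 2, 3


def mark_status(conn: sqlite3.Connection, username: str, status: int) -> int:
    """Set status/updated_at for one account; return the number of rows changed."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    cur = conn.execute(
        "UPDATE wjf_transaction_users SET status = ?, updated_at = ? WHERE username = ?",
        (status, now, username),
    )
    conn.commit()
    return cur.rowcount


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    # Trimmed SQLite stand-in for the MySQL table above
    conn.execute(
        "CREATE TABLE wjf_transaction_users (id INTEGER PRIMARY KEY, username TEXT,"
        " password TEXT, status INTEGER DEFAULT 2, updated_at TEXT)"
    )
    conn.execute("INSERT INTO wjf_transaction_users (username, password) VALUES (?, ?)",
                 ("o'brien", "secret"))
    print(mark_status(conn, "o'brien", VERIFIED))  # 1
```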
OCR interface code
Baidu OCR API documentation: https://ai.baidu.com/ai-doc/OCR/3k3h7yeqa
baiduBce.py
# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
# Requires the Baidu AI SDK: pip install baidu-aip
from aip import AipOcr

# Your APPID, API Key (AK) and Secret Key (SK)
APP_ID = 'xxx'
API_KEY = 'xxxx'
SECRET_KEY = 'xxxx'


class BaiduBce:
    def __init__(self):
        self.options = {}
        self.setOptions()
        self.client = AipOcr(APP_ID, API_KEY, SECRET_KEY)

    def dealImg(self, image, type=1, is_local=False):
        '''
        Recognize the text in an image
        :param image: local file path or image URL
        :param type: OCR endpoint: 1 general, 2 general (high accuracy), 3 web image
        :param is_local: False: remote URL, True: local file
        :return: the first recognized line of text, or '' if nothing was recognized
        '''
        if type == 2:  # General text recognition (high-accuracy version)
            if not is_local:
                # The high-accuracy call is only wired up for local files here
                raise ValueError('type=2 requires a local image (is_local=True)')
            res = self.client.basicAccurate(self.get_file_content(image))
        elif type == 3:  # Web image text recognition
            if not is_local:
                res = self.client.webImageUrl(image)
            else:
                res = self.client.webImage(self.get_file_content(image))
        else:  # 1 and any other value: general text recognition
            if not is_local:
                res = self.client.basicGeneralUrl(image)
            else:
                res = self.client.basicGeneral(self.get_file_content(image),
                                               self.options)
        # Error responses carry error_code/error_msg instead of words_result
        words = res.get('words_result') or []
        return words[0]['words'] if words else ''

    def get_file_content(self, filePath):
        '''
        Read an image file as bytes
        '''
        with open(filePath, 'rb') as fp:
            return fp.read()

    def setOptions(self,
                   language_type='CHN_ENG',
                   detect_direction='false',
                   detect_language='false',
                   probability='false'):
        '''
        Optional request parameters
        :param language_type: language of the text
        :param detect_direction: whether to detect the image orientation
        :param detect_language: whether to detect the language
        :param probability: whether to return confidence values
        :return:
        '''
        options = {}
        options["language_type"] = language_type
        options["detect_direction"] = detect_direction
        options["detect_language"] = detect_language
        options["probability"] = probability
        self.options = options
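`dealImg` relies on the response shape: on success Baidu OCR returns a `words_result` list of `{"words": ...}` items, while failures carry `error_code` and `error_msg` instead. A defensive sketch of that extraction; `first_words` and `OcrError` are illustrative names, and the sample dictionaries in the test are hand-written in that shape, not captured responses.

```python
from typing import Any, Mapping


class OcrError(RuntimeError):
    """Raised when the OCR response reports an error."""


def first_words(res: Mapping[str, Any]) -> str:
    """Return the first recognized line, '' if none, or raise OcrError."""
    if "error_code" in res:
        raise OcrError(f"{res.get('error_code')}: {res.get('error_msg', '')}")
    words = res.get("words_result") or []
    return words[0].get("words", "") if words else ""
```

Raising on errors, rather than returning the error message as `BaiduImg.dealImg` does, keeps an error string from ever being submitted as a captcha.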
baiduImg.py
# encoding:utf-8
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
import base64

import requests

# client_id is the API Key (AK) and client_secret the Secret Key (SK) from the Baidu console
CLIENT_ID = 'xxxx'
CLIENT_SECRET = 'xxxx'


class BaiduImg:
    def __init__(self):
        self.base_url = 'https://aip.baidubce.com'
        self.token = ''

    def dealImg(self, path, type=1):
        self.getToken()
        if type == 2:
            # General text recognition with location information
            request_url = self.base_url + "/rest/2.0/ocr/v1/general"
        elif type == 3:
            # High-accuracy version
            request_url = self.base_url + "/rest/2.0/ocr/v1/accurate_basic"
        elif type == 4:
            # High-accuracy version with location information
            request_url = self.base_url + "/rest/2.0/ocr/v1/accurate"
        else:
            # 1 and any other value: general text recognition
            request_url = self.base_url + "/rest/2.0/ocr/v1/general_basic"
        if isinstance(path, bytes):
            s = path
        else:
            # Read the image file in binary mode
            with open(path, 'rb') as f:
                s = f.read()
        params = {"image": base64.b64encode(s)}
        request_url = request_url + "?access_token=" + self.token
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        response = requests.post(request_url, data=params, headers=headers)
        if response:
            res = response.json()
            if 'error_msg' in res:
                return res['error_msg']
            words = res.get('words_result') or []
            return words[0]['words'] if words else ''

    def getToken(self):
        # Exchange AK/SK for an access_token (OAuth 2.0 client_credentials grant)
        host = self.base_url + '/oauth/2.0/token'
        params = {
            'grant_type': 'client_credentials',
            'client_id': CLIENT_ID,
            'client_secret': CLIENT_SECRET,
        }
        response = requests.get(host, params=params)
        if response:
            data = response.json()
            self.token = data['access_token']
            print(data['access_token'])
            # return data['access_token']

    def test(self):
        print(11)


# if __name__ == '__main__':
#     path = '../code_img/veriCode (1).do'
#     baidu = BaiduImg()
#     res = baidu.dealImg(path, 3)
#     print(res)
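`BaiduImg.dealImg` fetches a new access token on every call. The token endpoint's response also includes an `expires_in` field (in seconds), so the token can be reused until shortly before it expires. A sketch of such a cache; the `fetch` callable stands in for the `/oauth/2.0/token` request, `TokenCache` is a hypothetical name, and the 60-second safety margin is an arbitrary choice.

```python
import time
from typing import Callable, Dict, Optional


class TokenCache:
    """Cache an OAuth access token until shortly before it expires."""

    def __init__(self, fetch: Callable[[], Dict], margin: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch      # returns {"access_token": ..., "expires_in": seconds}
        self._margin = margin    # refresh this many seconds before expiry
        self._clock = clock      # injectable for testing
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at:
            data = self._fetch()
            self._token = data["access_token"]
            self._expires_at = now + float(data["expires_in"]) - self._margin
        return self._token
```

Injecting the clock keeps the expiry logic testable without sleeping; in `BaiduImg` one cache per process could replace the unconditional `self.getToken()` call.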
tx.py
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
import json

from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.ocr.v20181119 import ocr_client, models


def get_code(path):
    '''
    Recognize a captcha with Tencent Cloud general OCR
    :param path: image URL; Tencent's servers download it themselves, so they may
                 receive a different captcha than the one tied to the crawler's session
    :return: the first detected text, or None
    '''
    try:
        # SecretId and SecretKey from the Tencent Cloud console
        cred = credential.Credential("xxxx", "xxxx")
        httpProfile = HttpProfile()
        httpProfile.endpoint = "ocr.tencentcloudapi.com"
        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        client = ocr_client.OcrClient(cred, "ap-guangzhou", clientProfile)
        req = models.GeneralBasicOCRRequest()
        req.from_json_string(json.dumps({"ImageUrl": path}))
        resp = client.GeneralBasicOCR(req)
        res_json = json.loads(resp.to_json_string())
        detections = res_json.get('TextDetections') or []
        if detections and detections[0]['DetectedText'] is not None:
            return detections[0]['DetectedText']
    except TencentCloudSDKException as err:
        print(err)
cfmmc.py
# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
# python3
# from shibie import baiduImg
# from shibie import tx
import os
import time
from datetime import datetime
# from io import BytesIO

import PIL.ImageOps
import pymysql
import pytesseract
import yaml
from PIL import Image
from requests import session

from shibie import baiduBce

# Root URL of the site
BASE_URL = 'https://investorservice.cfmmc.com'


class Cfmmc(object):
    def __init__(self, username=None, max_gap=2, code_num=10):
        '''
        Initialize
        :param username: username; when None, all users are queried
        :param max_gap: look-back window; days with gap 1..max_gap-1 are queried,
                        where gap 1 is today (so the default 2 queries today only)
        :param code_num: maximum number of captcha recognition attempts
        :return:
        '''
        # Configuration
        self.configs = self.get_config()
        # Database connection
        self.conn = pymysql.connect(
            host=self.configs['mysql_config']['host'],
            port=self.configs['mysql_config']['port'],
            user=self.configs['mysql_config']['user'],
            password=self.configs['mysql_config']['password'],
            db=self.configs['mysql_config']['db'],
            charset=self.configs['mysql_config']['charset']
        )
        # Create a cursor
        self.cursor = self.conn.cursor()
        # Accounts
        self.users = ()
        # Username given on the command line, if any
        self.username = username
        # Query window, counted in days back from today
        self.max_gap = max_gap
        # Maximum number of captcha attempts
        self.code_num = code_num
        # Notification message
        self.msg = ''
        # Status of the current account
        self.user_status = 0
        # Run time
        self.run_time = time.strftime(
            '%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    def pymysql_action(self, sql):
        # Execute the SQL statement
        self.cursor.execute(sql)
        # Commit
        self.conn.commit()

    def get_config(self):
        '''
        Load config.yaml
        :return: configs
        '''
        # Directory containing this file
        fileNamePath = os.path.split(os.path.realpath(__file__))[0]
        yamlPath = os.path.join(fileNamePath, 'config.yaml')
        # encoding='utf-8' avoids garbled text when the config contains Chinese
        with open(yamlPath, 'r', encoding='utf-8') as file:
            # Return the parsed configuration
            return yaml.safe_load(file)

    def file_path(self, path, file):
        '''
        Return the file path, creating the directory if needed
        :param path: directory
        :param file: file name
        :return:
        '''
        os.makedirs(path, exist_ok=True)
        return os.path.join(path, file)

    def reg_img(self, file_obj):
        '''
        Recognize the captcha locally with Tesseract OCR
        :param file_obj: file path or file-like object
        :return: recognized text
        '''
        # Path to the local Tesseract executable; adjust for your machine
        pytesseract.pytesseract.tesseract_cmd = r"D:\Program Files\Tesseract-OCR\tesseract.exe"
        im = Image.open(file_obj)
        # Grayscale, then binarize with a threshold of 210
        im = im.convert('L')
        binary_image = im.point([0 if i < 210 else 1 for i in range(256)], '1')
        # Invert the binary image and convert back to grayscale for Tesseract
        im1 = binary_image.convert('L')
        im2 = PIL.ImageOps.invert(im1)
        im3 = im2.convert('1')
        im4 = im3.convert('L')
        res = pytesseract.image_to_string(im4)
        # print(res)
        # im4.show()
        return res
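    def get_download_url(self, content, key='下载'):
        '''
        Extract the download link from the statement page
        :param content: page HTML
        :param key: text of the download button ('下载', "download")
        :return: absolute download URL
        '''
        # Text before the button, after <div id="waitBody">, inside the first <a href="...">
        a = content.split(key)[0]
        b = a.split('<div id="waitBody">')[1]
        c = b.split('<a href="')[1]
        d = c.split('" target="_blank">')[0]
        return BASE_URL + d

    def flag_filter(self, content, flag):
        '''
        Extract the value that follows a marker
        :param content: page HTML
        :param flag: marker string; the value runs up to the next double quote
        :return: the value, or '' if the marker is absent
        '''
        if len(content.split(flag)) < 2:
            return ''
        result = content.split(flag)[1].split('"')[0]
        return result

    def get_day(self, gap=1):
        '''
        Return the date (Y-m-d) gap - 1 days before today, skipping weekends
        :param gap: offset; 1 means today
        :return: False when gap is out of range, True for a Saturday or Sunday, otherwise the date string
        '''
        if gap >= self.max_gap:
            return False
        # Timestamp of the target day
        day = time.time() - 86400 * (gap - 1)
        # Format as year-month-day
        ymd = time.strftime('%Y-%m-%d', time.localtime(day))
        # Day of the week: 1-7 means Monday-Sunday
        week = datetime.strptime(ymd, '%Y-%m-%d').isoweekday()
        if week > 5:
            print('...' + ymd + ' is a weekend (ISO weekday ' + str(week) + '), skipped')
            return True
        return ymd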
    def get_dingshi_data(self, ss, user_id, header, token=''):
        '''
        Query and download the daily mark-to-market statements
        :param ss: requests session
        :param user_id: user ID
        :param header: request headers
        :param token: Struts token
        :return:
        '''
        for i in range(1, self.max_gap):
            # Get the date
            ymd = self.get_day(i)
            if ymd is False:  # Past the end of the window
                return True
            elif ymd is True:  # Weekend, skip
                continue
            # Local file for this user and date; skip dates already downloaded
            file_name = self.file_path(
                './exel', user_id + '_' + ymd + '.xls')
            if os.path.exists(file_name):
                text = '...' + file_name + ' already exists, '
                print(text)
                self.msg += text
                continue
            post_data = {
                "org.apache.struts.taglib.html.TOKEN": token,
                "tradeDate": ymd,
                # 'date': daily mark-to-market; 'trade' (default): trade-by-trade
                "byType": 'date',
            }
            dingshi_url = BASE_URL + '/customer/setParameter.do'
            dingshi_html = ss.post(dingshi_url,
                                   data=post_data,
                                   headers=header,
                                   timeout=5)
            dingshi_html_code = dingshi_html.content.decode()
            # # Dump the HTML to check the page content
            # with open('./other/yemian.html', 'w+',
            #           encoding='utf-8') as files:
            #     files.write(dingshi_html_code)
            if '(逐日盯市)' in dingshi_html_code:
                # Look for the download button
                if '下载' in dingshi_html_code:
                    download_url = self.get_download_url(
                        dingshi_html_code, '下载')
                    xls_res = ss.get(download_url, headers=header)
                    # Save the file
                    print('...saving data to ' + file_name)
                    with open(file_name, "wb") as xls_file:
                        xls_file.write(xls_res.content)
                else:
                    text = ymd + '...no download button on the page, '
                    print(text)
                    self.msg += text
                    continue
            else:
                print('...not the daily mark-to-market page')
        return True
    def main(self, user_id, passwd):
        '''
        Log in with one account and fetch its daily mark-to-market data
        :param user_id: user ID
        :param passwd: password
        :return:
        '''
        header = {
            'Connection': 'keep-alive',
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
        }
        url = BASE_URL + "/login.do"
        token_flag = 'name="org.apache.struts.taglib.html.TOKEN" value="'
        veri_code_flag = 'src="/veriCode.do?t='
        ss = session()
        res = ss.get(url, headers=header)
        content = res.content.decode()
        token = self.flag_filter(content, token_flag)
        veri_code_url = BASE_URL + '/veriCode.do?t=' + self.flag_filter(
            content, veri_code_flag)
        for i in range(self.code_num):
            print(f'...attempt {i + 1}')
            try:
                # Download the captcha image for this attempt
                req_res = ss.get(veri_code_url).content
                tmp_file = self.file_path('./code_img', 'logo_code.jpg')
                with open(tmp_file, 'wb') as f:
                    f.write(req_res)
                # Option 1: local OCR (Tesseract)
                # veri_code = self.reg_img(BytesIO(req_res))
                # Option 2: Baidu REST API (accepts raw bytes)
                # veri_code = baiduImg.BaiduImg().dealImg(req_res, 3)
                # Option 3: Tencent API
                # veri_code = tx.get_code(veri_code_url)
                # Option 4 (used): Baidu SDK, high-accuracy mode, local file
                veri_code = baiduBce.BaiduBce().dealImg(tmp_file, 2, True)
                # Keep only letters and digits before checking the length
                veri_code = ''.join(filter(str.isalnum, veri_code or ''))
                if len(veri_code) >= 5:
                    print('...recognized: ' + veri_code)
                    post_data = {
                        "org.apache.struts.taglib.html.TOKEN": token,
                        "showSaveCookies": '',
                        "userID": user_id,
                        "password": passwd,
                        "vericode": veri_code,
                    }
                    content2 = ss.post(url,
                                       data=post_data,
                                       headers=header,
                                       timeout=5)
                    res = content2.content.decode()
                    if "验证码错误" in res:  # Wrong captcha
                        print('...captcha mismatch')
                    elif ("用户名或密码错误" in res) or ("您错误尝试超过3次" in res):  # Wrong credentials, or more than 3 failed attempts
                        text = '...wrong username or password, marking the account as failed verification, '
                        print(text)
                        self.msg += text
                        # Mark the account as failed verification (status=3)
                        update_sql = "UPDATE `wjf_transaction_users` SET status=3,updated_at=%s WHERE username=%s"
                        self.cursor.execute(update_sql, (self.run_time, user_id))
                        self.conn.commit()
                        break
                    elif "登录超时,请重新登录" in res:  # Login timed out
                        text = '...login timed out, logging in again, '
                        print(text)
                        self.msg += text
                        break
                    # elif "资金安全特别提示" in res:  # This text means the login failed
                    #     text = '...login failed, '
                    #     print(text)
                    #     self.msg += text
                    #     break
                    else:
                        # Login succeeded: mark the account as verified if it is not already
                        if self.user_status != 1:
                            update_sql = "UPDATE `wjf_transaction_users` SET status=1,updated_at=%s WHERE username=%s"
                            self.cursor.execute(update_sql, (self.run_time, user_id))
                            self.conn.commit()
                        # Copy the session cookies into the request headers
                        cookie_dict = dict(ss.cookies)
                        cookie = 'WMONID=' + cookie_dict[
                            'WMONID'] + ';JSESSIONID=' + cookie_dict['JSESSIONID']
                        header['Cookie'] = cookie
                        # Go to the daily mark-to-market page
                        res = self.get_dingshi_data(
                            ss, user_id, header, token)
                        if res is True:
                            break
                time.sleep(1)
                # Request a fresh captcha for the next attempt
                veri_code_url = BASE_URL + "/veriCode.do?t=" + str(
                    int(time.time() * 1000))
            except Exception as e:
                print(e)
        text = '...' + user_id + ' done.'
        print(text)
        self.msg += text
    def get_users(self):
        '''
        Fetch the accounts to process
        When self.username is set, only that account is queried
        '''
        users_sql = "SELECT username,password,status FROM `wjf_transaction_users` WHERE `deleted_at` is NULL"
        params = None
        if self.username is not None:
            users_sql += " AND `username`=%s"
            params = (self.username,)
        # The password is actually stored encrypted; decryption is omitted here
        self.cursor.execute(users_sql, params)
        self.users = self.cursor.fetchall()
        return self.users

    def run(self):
        '''
        Entry point; supports multiple users
        '''
        self.users = self.get_users()
        default_max_gap = self.max_gap
        for user in self.users:
            self.msg = ''
            # Reset the window so one unverified account does not widen it for the next
            self.max_gap = default_max_gap
            # Username and password are required
            if not user[0]:
                print('...empty username, skipped')
                continue
            if not user[1]:
                text = f'...password for {user[0]} is empty, '
                print(text)
                self.msg += text
                continue
            # Status of this account
            self.user_status = user[2]
            if default_max_gap == 2:
                # With the default window, follow the status in the database:
                # status 1 (verified) keeps the short window; otherwise look back 180 days
                if self.user_status != 1:
                    self.max_gap = 180
            text = f'Processing account {user[0]}, '
            print(text)
            self.msg = text
            self.main(user[0], user[1])

    def __del__(self):
        # Close the cursor
        self.cursor.close()
        # Close the database connection
        self.conn.close()


if __name__ == '__main__':
    # Simulate login to the CFMMC site and fetch the daily mark-to-market data
    obj = Cfmmc()
    res = obj.run()
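`get_day` walks back one calendar day at a time from the current clock and signals weekends by returning `True`. The same date list can be computed up front with `datetime.date`, which also makes it easy to test with a fixed "today". A sketch; like the original it only skips Saturdays and Sundays, not exchange holidays, and `recent_weekdays` is a hypothetical name.

```python
from datetime import date, timedelta
from typing import List, Optional


def recent_weekdays(max_gap: int, today: Optional[date] = None) -> List[str]:
    """Dates for gaps 1..max_gap-1 (gap 1 = today), weekends removed, as 'YYYY-MM-DD'."""
    today = today or date.today()
    days = []
    for gap in range(1, max_gap):
        day = today - timedelta(days=gap - 1)
        if day.isoweekday() <= 5:  # 1-5 = Monday-Friday
            days.append(day.isoformat())
    return days


if __name__ == "__main__":
    # 2020-10-12 is a Monday; the 10th and 11th fall on a weekend
    print(recent_weekdays(5, date(2020, 10, 12)))  # ['2020-10-12', '2020-10-09']
```

With the default `max_gap=2` this yields only today's date, and nothing when today is a weekend.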
excel.py (the database part; adapt it to your own schema)
# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
import os
import time
from itertools import groupby

import pymysql
import xlrd
import yaml


class Excel(object):
    def __init__(self):
        # Configuration
        self.configs = self.get_config()
        self.conn = pymysql.connect(
            host=self.configs['mysql_config']['host'],
            port=self.configs['mysql_config']['port'],
            user=self.configs['mysql_config']['user'],
            password=self.configs['mysql_config']['password'],
            db=self.configs['mysql_config']['db'],
            charset=self.configs['mysql_config']['charset']
        )
        # Create a cursor
        self.cursor = self.conn.cursor()
        # Dict holding the values of the current statement
        self.dict = {}

    def get_config(self):
        '''
        Load config.yaml
        :return:
        '''
        # Directory containing this file
        fileNamePath = os.path.split(os.path.realpath(__file__))[0]
        yamlPath = os.path.join(fileNamePath, 'config.yaml')
        # encoding='utf-8' avoids garbled text when the config contains Chinese
        with open(yamlPath, 'r', encoding='utf-8') as file:
            # Return the parsed configuration
            return yaml.safe_load(file)

    def pymysql_action(self, sql):
        # Execute the SQL statement
        self.cursor.execute(sql)
        # Commit
        self.conn.commit()
    def del_null(self, values):
        '''
        Drop empty cells
        :param values: row values
        :return: the row without empty strings
        '''
        return [i for i in values if i != '']

    def filter_field(self, field, typeof='string'):
        '''
        Normalize a value before writing it to the database
        :param field: the field value
        :param typeof: 'string', 'decimal' (rounded to 2 places) or 'num'/'int' (integer)
        :return:
        '''
        if field == '--' or field == '':
            field = '0'
        elif typeof == 'decimal':
            field = round(float(field), 2)
        elif typeof in ('num', 'int'):
            # xlrd returns numeric cells as floats, so go through float first
            field = int(float(field))
        else:
            field = str(field).strip()
        return field
    def deal_excel(self, path):
        '''
        Parse one daily mark-to-market statement (.xls) and write it to the database
        :param path: path of the .xls file
        '''
        # Start from an empty record so values from a previously processed file cannot leak in
        self.dict = {}
        workbook = xlrd.open_workbook(path)
        # Name of the first sheet
        sheet_name = workbook.sheet_names()[0]
        # Get a sheet by index or by name (indices start at 0)
        sheet = workbook.sheet_by_index(0)
        # Number of rows (and columns) in the sheet
        nrows = sheet.nrows
        # ncols = sheet.ncols
        # Whole rows and columns can be read as lists
        # rows = sheet.row_values(2)  # row at index 2 (the third row)
        # cols = sheet.col_values(3)  # column at index 3 (the fourth column)
        if sheet_name != '客户交易结算日报':  # "Client trade settlement statement"
            print('...not the expected statement, skipping this file')
            return
        ding_shi = False  # Only daily mark-to-market statements are written to the database
        cny_i = 0  # Start row of the futures/options deposits and withdrawals section (CNY)
        usd_i = 0  # Start row of the futures/options deposits and withdrawals section (USD)
        deal_row_i = 0  # Start row of the futures trades summary
        position_row_i = 0  # Start row of the futures positions summary
        for i in range(nrows):
            try:
                rows = sheet.row_values(i)  # Values of row i
                deal_rows = self.del_null(rows)
                if len(deal_rows) < 1:
                    # print(f'row {i + 1} is empty')
                    continue
                if '客户交易结算日报(逐日盯市)' in deal_rows:
                    ding_shi = True
                if i > 4 and ding_shi is False:  # The header is not mark-to-market, do not process
                    print('...not a daily mark-to-market statement, skipped')
                    break
                # Client's internal futures/options fund account
                if '客户期货期权内部资金账户' in deal_rows:
                    self.dict['fund_account'] = deal_rows[1]
                    self.dict['deal_time'] = deal_rows[3]
                    self.dict['created_at'] = time.strftime(
                        '%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                    timeArray = time.strptime(
                        self.dict['deal_time'], "%Y-%m-%d")
                    self.dict['deal_time_stamp'] = int(time.mktime(timeArray))
                    # Defaults for values that may be missing from the statement
                    self.dict['out_money_cny'] = self.dict['enter_money_cny'] = self.dict['out_money_usd'] = self.dict[
                        'enter_money_usd'] = 0
                    self.dict['exchange_name'] = ''
                    self.dict['transaction_fees'] = '0.00'
                    self.dict['reporting_fee'] = '0.00'
                    self.dict['position_buy_position'] = '0.00'
                    self.dict['position_sell_position'] = '0.00'
                    self.dict['position_profit_loss'] = '0.00'
                    self.dict['position_trading_margin'] = '0.00'
                    self.dict['deal_num'] = 0
                    self.dict['deal_turnover'] = '0.00'
                    self.dict['deal_fee'] = '0.00'
                    self.dict['deal_profit_loss'] = '0.00'
                if '客户名称' in deal_rows:  # Client name
                    self.dict['admin_name'] = deal_rows[1].strip()
                    self.dict['query_time'] = deal_rows[3]
                    # Look up the backend admin ID of this client
                    admin_sql = "SELECT admin_id FROM `wjf_transaction_users` WHERE `admin_name` = %s LIMIT 1"
                    self.cursor.execute(admin_sql, (self.dict['admin_name'],))
                    first_data = self.cursor.fetchone()
                    if first_data is None:
                        print('No record for client name ' + str(self.dict['admin_name']))
                        break
                    self.dict['admin_id'] = first_data[0]
                    # Skip if this client's data for the day is already in the database
                    summary_sql = "SELECT id FROM `wjf_summary_datas` WHERE `admin_id` = %s AND `deal_time_stamp` = %s LIMIT 1"
                    self.cursor.execute(summary_sql, (self.dict['admin_id'], self.dict['deal_time_stamp']))
                    summary_data = self.cursor.fetchone()
                    if summary_data is not None:
                        print(
                            'Data for client ' + str(self.dict['admin_name']) + ' on ' + str(self.dict['deal_time']) + ' already exists')
                        break
                if '期货公司名称' in deal_rows:  # Futures company name
                    self.dict['futures_company'] = deal_rows[1]
                if '上日结存' in deal_rows:  # Previous day's balance / client equity
                    self.dict['previous_day_balance'] = deal_rows[1]
                    self.dict['customer_rights'] = deal_rows[3]
                if '当日存取合计' in deal_rows:  # Net deposits/withdrawals of the day / actual monetary funds
                    self.dict['total_access_for_the_day'] = deal_rows[1]
                    self.dict['actual_monetary_funds'] = deal_rows[3]
                if '当日盈亏' in deal_rows:  # Profit/loss of the day / non-currency collateral
                    self.dict['profit_loss_day'] = deal_rows[1]
                    self.dict['not_currency_credit_amount'] = deal_rows[3]
                if '当日总权利金' in deal_rows:  # Total option premium of the day / currency collateral
                    self.dict['total_royalties_of_the_day'] = deal_rows[1]
                    self.dict['currency_credit_amount'] = deal_rows[3]
                if '当日手续费' in deal_rows:  # Fees of the day / frozen funds
                    self.dict['day_handling_fee'] = deal_rows[1]
                    self.dict['frozen_funds'] = deal_rows[3]
                if '当日结存' in deal_rows:  # Balance of the day / margin in use
                    self.dict['day_balance'] = deal_rows[1]
                    self.dict['margin_occupation'] = deal_rows[3]
                if '可用资金' in deal_rows:  # Available funds
                    self.dict['available_funds'] = deal_rows[1]
                if '风险度' in deal_rows:  # Risk ratio
                    self.dict['risk'] = deal_rows[1]
                if '追加保证金' in deal_rows:  # Margin call
                    self.dict['margin_call'] = deal_rows[1]
                # Start of the deposits and withdrawals section (CNY)
                if '期货期权账户出入金明细(单位:人民币)' in deal_rows:
                    cny_i = i
                if '合计' in deal_rows and (cny_i > 0):  # "Total" row of the CNY section
                    self.dict['out_money_cny'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['enter_money_cny'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    cny_i = 0
                # Start of the deposits and withdrawals section (USD)
                if '期货期权账户出入金明细(单位:美元)' in deal_rows:
                    usd_i = i
                if '合计' in deal_rows and (usd_i > 0):  # "Total" row of the USD section
                    usd_i = 0
                    self.dict['out_money_usd'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['enter_money_usd'] = self.filter_field(
                        deal_rows[2], 'decimal')
                # Start of the futures trades summary
                if '期货成交汇总' in deal_rows:
                    deal_row_i = i
                # Skip the column header row (it contains '合约', "contract")
                if '合约' in deal_rows:
                    continue
                if ('合计' in deal_rows) and (deal_row_i > 0) and (position_row_i <= 0):  # "Total" row: end of the trades summary
                    deal_row_i = 0
                    self.dict['deal_num'] = self.filter_field(
                        deal_rows[1], 'num')
                    self.dict['deal_turnover'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    self.dict['deal_fee'] = self.filter_field(
                        deal_rows[3], 'decimal')
                    self.dict['deal_profit_loss'] = self.filter_field(
                        deal_rows[4], 'decimal')
                if i > deal_row_i and deal_row_i > 0:  # A trade row inside the trades summary
                    deal_rows.append(self.dict['admin_name'])
                    deal_rows.append(self.dict['query_time'])
                    deal_rows.append(self.dict['deal_time'])
                    deal_rows.append(self.dict['created_at'])
                    # Split the contract code into its letter and digit parts
                    contract = deal_rows[0]
                    contract_list = [''.join(list(g)) for k, g in groupby(
                        contract, key=lambda x: x.isdigit())]
                    exchange_code = contract_list[0]
                    exchange_num = contract_list[1]
                    deal_rows.append(exchange_code)
                    deal_rows.append(exchange_num)
                    # Convert strings to numbers so the database insert does not fail
                    deal_rows[0] = self.filter_field(deal_rows[0])
                    deal_rows[1] = self.filter_field(deal_rows[1])
                    deal_rows[2] = self.filter_field(deal_rows[2])
                    deal_rows[3] = self.filter_field(deal_rows[3], 'decimal')
                    deal_rows[4] = self.filter_field(deal_rows[4], 'num')
                    deal_rows[5] = self.filter_field(deal_rows[5], 'decimal')
                    deal_rows[8] = self.filter_field(deal_rows[8], 'decimal')
                    deal_rows[6] = self.filter_field(deal_rows[6])
                    deal_rows[7] = self.filter_field(deal_rows[7])
                    deal_rows[9] = self.filter_field(deal_rows[9])
                    deal_rows[10] = self.filter_field(deal_rows[10])
                    deal_rows[11] = self.filter_field(deal_rows[11])
                    deal_rows[12] = self.filter_field(deal_rows[12])
                    deal_rows[13] = self.filter_field(deal_rows[13])
                    deal_rows[14] = self.filter_field(deal_rows[14])
                    sql = "INSERT INTO wjf_deal_datas(contract,buy_sell,speculation,final_price,num,turnover,open_flat,fee,profit_loss,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}');".format(
                        deal_rows[0], deal_rows[1], deal_rows[2], deal_rows[3], deal_rows[4], deal_rows[5],
                        deal_rows[6], deal_rows[7], deal_rows[8], deal_rows[9], deal_rows[10], deal_rows[11],
                        deal_rows[12], deal_rows[13], deal_rows[14], self.dict['admin_id'],
                        self.dict['deal_time_stamp'])
                    self.pymysql_action(sql)
                    # print("..." + str(i + 1) + " trade row written")
                if '期货持仓汇总' in deal_rows:  # Start of the futures positions summary
                    position_row_i = i
                    deal_row_i = 0
                if ('合计' in deal_rows) and (deal_row_i == 0) and (position_row_i > 0):  # "Total" row: end of the positions summary
                    position_row_i = 0
                    self.dict['position_buy_position'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['position_sell_position'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    self.dict['position_profit_loss'] = self.filter_field(
                        deal_rows[3], 'decimal')
                    self.dict['position_trading_margin'] = self.filter_field(
                        deal_rows[4], 'decimal')
                    # Write the summary record
                    summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format(
                        self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'],
                        self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'],
                        self.dict['previous_day_balance'], self.dict['customer_rights'],
                        self.dict['total_access_for_the_day'],
                        self.dict['actual_monetary_funds'], self.dict['profit_loss_day'],
                        self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'],
                        self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'],
                        self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'],
                        self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'],
                        self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'],
                        self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'],
                        self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'],
                        self.dict['deal_profit_loss'], self.dict['position_buy_position'],
                        self.dict['position_sell_position'], self.dict['position_profit_loss'],
                        self.dict['position_trading_margin'], self.dict['created_at'])
                    self.pymysql_action(summary_sql)
                    print("...summary written")
                    break
                if (i > position_row_i) and (position_row_i > 0):  # A position row inside the positions summary
                    rows.append(self.dict['admin_name'])
                    rows.append(self.dict['query_time'])
                    rows.append(self.dict['deal_time'])
                    rows.append(self.dict['created_at'])
                    # Split the contract code into its letter and digit parts
                    contract = rows[0]
                    contract_list = [''.join(list(g)) for k, g in groupby(
                        contract, key=lambda x: x.isdigit())]
                    exchange_code = contract_list[0]
                    exchange_num = contract_list[1]
                    rows.append(exchange_code)
                    rows.append(exchange_num)
                    # Convert strings to numbers so the database insert does not fail
                    rows[1] = self.filter_field(rows[1], 'int')
                    rows[2] = self.filter_field(rows[2], 'decimal')
                    rows[3] = self.filter_field(rows[3], 'int')
                    rows[4] = self.filter_field(rows[4], 'decimal')
                    rows[5] = self.filter_field(rows[5], 'decimal')
                    rows[6] = self.filter_field(rows[6], 'decimal')
                    rows[7] = self.filter_field(rows[7], 'decimal')
                    rows[8] = self.filter_field(rows[8], 'decimal')
                    rows[0] = self.filter_field(rows[0])
                    rows[9] = self.filter_field(rows[9])
                    rows[10] = self.filter_field(rows[10])
                    rows[11] = self.filter_field(rows[11])
                    rows[12] = self.filter_field(rows[12])
                    rows[13] = self.filter_field(rows[13])
                    rows[14] = self.filter_field(rows[14])
                    rows[15] = self.filter_field(rows[15])
                    sql = "INSERT INTO wjf_position_datas(contract,buy_position,buy_average_price,sell_position,sell_average_price,settlement_price_yesterday,settlement_price_today,profit_loss,trading_margin,speculation,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}');".format(
                        rows[0], rows[1], rows[2], rows[3], rows[4], rows[5], rows[6], rows[7], rows[8], rows[9],
                        rows[10], rows[11], rows[12], rows[13], rows[14], rows[15], self.dict['admin_id'],
                        self.dict['deal_time_stamp'])
                    self.pymysql_action(sql)
                    # print("..." + str(i + 1) + " position row written")
                # Without a positions summary the summary record is written at the last row;
                # with one, the loop has already stopped at its "total" row
                if nrows - 1 == i:
                    print('...no positions summary, writing the summary record at the last row')
                    # Write the summary record
                    summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format(
                        self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'],
                        self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'],
                        self.dict['previous_day_balance'], self.dict['customer_rights'],
                        self.dict['total_access_for_the_day'],
                        self.dict['actual_monetary_funds'], self.dict['profit_loss_day'],
                        self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'],
                        self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'],
                        self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'],
                        self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'],
                        self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'],
                        self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'],
                        self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'],
                        self.dict['deal_profit_loss'], self.dict['position_buy_position'],
                        self.dict['position_sell_position'], self.dict['position_profit_loss'],
                        self.dict['position_trading_margin'], self.dict['created_at'])
                    self.pymysql_action(summary_sql)
                    print("...summary written")
            except Exception as e:
                print('......exception at row ' + str(i + 1) + ':')
                print(e)
    def get_files(self, path):
        '''
        Process every Excel file in a directory
        :param path: directory path
        :return:
        '''
        files = os.listdir(path)
        if len(files) < 1:
            print('No files')
        else:
            for file in files:
                file_path = os.path.join(path, file)
                # Only handle .xls files
                if file.endswith('.xls'):
                    print('Reading file: ' + file_path)
                    self.deal_excel(file_path)
                    # Delete the file once processed
                    try:
                        print('Deleting file: ' + file_path)
                        os.remove(file_path)
                    except Exception as e:
                        print(e)

    def test(self):
        for i in range(5):
            try:
                if i == 3:
                    continue
                print(i)
            except Exception as e:
                print(e)

    def __del__(self):
        # Close the cursor
        self.cursor.close()
        # Close the database connection
        self.conn.close()


if __name__ == '__main__':
    path = './exel/'
    obj = Excel()
    obj.get_files(path)
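Both the trades and positions branches split the contract code into its letter prefix (stored as `exchange_code`) and its digit suffix (`exchange_num`) with `itertools.groupby`. Below is a standalone sketch of that split with a guard for codes that have no digits; in the code above such a code raises `IndexError`, which is only caught by the per-row `except`. `split_contract` is a hypothetical helper and the contract codes in the test are illustrative.

```python
from itertools import groupby
from typing import Tuple


def split_contract(contract: str) -> Tuple[str, str]:
    """Split a contract code such as 'rb2101' into ('rb', '2101').

    Runs of letters and digits are grouped with itertools.groupby, as in
    deal_excel; only the first run of each kind is kept, a missing part is ''.
    """
    letters, digits = "", ""
    for is_digit, group in groupby(contract.strip(), key=str.isdigit):
        run = "".join(group)
        if is_digit and not digits:
            digits = run
        elif not is_digit and not letters:
            letters = run
    return letters, digits
```

Keeping only the first run of each kind matches what `contract_list[0]` and `contract_list[1]` pick for ordinary codes, while returning `''` instead of raising for malformed ones.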
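Summary
cfmmc.py simulates the login and downloads the data, and excel.py parses the spreadsheets and writes them to the database. Combined with a scheduled task and a queue, the whole process can run automatically.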
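The scheduled-task-plus-queue setup can be sketched with the standard library: a producer puts downloaded file paths on a `queue.Queue`, and a worker thread processes them one by one. In a real deployment a scheduler such as cron would start the run, the producer would be the crawler, and `handle` would call something like `Excel().deal_excel`; here both are replaced by plain functions, `run_pipeline` is a hypothetical name, and the sentinel-based shutdown is one common design rather than the author's.

```python
import queue
import threading
from typing import Callable, Iterable, List

_STOP = object()  # sentinel telling the worker to exit


def run_pipeline(paths: Iterable[str], handle: Callable[[str], None]) -> List[str]:
    """Feed paths through a queue to one worker thread; return the paths that failed."""
    jobs: "queue.Queue[object]" = queue.Queue()
    failed: List[str] = []

    def worker() -> None:
        while True:
            item = jobs.get()
            try:
                if item is _STOP:
                    return
                handle(item)  # e.g. parse one downloaded statement
            except Exception:
                failed.append(item)
            finally:
                jobs.task_done()

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    for path in paths:
        jobs.put(path)
    jobs.put(_STOP)
    thread.join()
    return failed


if __name__ == "__main__":
    def handle(path: str) -> None:
        if path.endswith(".bad"):
            raise ValueError(path)

    print(run_pipeline(["a.xls", "b.bad", "c.xls"], handle))  # ['b.bad']
```

A single worker keeps database writes sequential, which matches how `excel.py` processes files one at a time; collecting failures instead of stopping lets one bad statement not block the rest.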
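If you need a visual interface for managing and analyzing the crawled data to get more value out of it, see the futures market monitoring backend management system.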