中国期货市场监控中心爬虫 | 臭大佬

臭大佬 2020-10-10 23:53:22 4116
Python 
简介 中国期货市场监控中心爬虫,模拟登陆、自动识别验证码,爬取指定日期的数据。

实现功能

  1. 验证码自动识别
  2. 模拟登陆
  3. 多用户数据下载
  4. excel处理
  5. 数据库操作

梗概

炒期货的朋友是不是也有这样的体验,打开中国期货市场监控中心网站,手动登陆到每个帐户,然后在帐户上进行下载数据(逐日盯市),再通过EXCEL宏等统计制作成树状图的形式。

这样操作有许多缺点和不足的地方,比如:

  • 工作量比较大;
  • 不能统计一定时间内的数据;
  • 不能统一管理和数据分析;
  • 多个账号无法对比分析。

因此,写了个爬虫和后台,爬虫每天自动登录中国期货市场监控中心,自动下载逐日盯市的表格数据,然后队列处理数据,写入到自己的后台,就可以分析每个账号一段时间操作的净值、盈亏比等了。

分析

在写爬虫前,我们先手动操作一遍,分析一下流程。

打开登录页,登录页面有验证码,在实现的时候需要自动识别

手动登录后,进入的第一个页面是今天的逐笔对冲,

我们要的是逐日盯市,而且不一定是今天的数据,打开F12,切换到逐日盯市页面,

从 Network 中可以看到如下信息;

Request URL: https://investorservice.cfmmc.com/customer/setParameter.do
Request Method: POST
org.apache.struts.taglib.html.TOKEN: 08424ad1b28fabecccd7bf5161108b13
tradeDate: 2020-10-09
byType: date

从上面可以看到接口和参数,
org.apache.struts.taglib.html.TOKEN:是登录页面中的一个 input,经测试,不传似乎没什么影响,
tradeDate:查询的日期,
byType:trade为默认值,trade时是逐笔对冲,date是逐日盯市。

另外,返回的并不是接口形式的数据,而是一个页面,

有了接口和参数,我们就可以抓取指定日期的逐日盯市了。

思路其实很简单,获取用户账号密码,模拟登陆,切换到逐日盯市,下载数据,然后处理excel文件,写到数据库,后台对数据进行分析。

技术点

  1. 自动识别验证码
  2. 模拟登陆
  3. excel处理
  4. 数据库写入

整个流程下来,技术点基本就上面几个。

这个验证码看似很普通,但Google的开源工具Tesseract-OCR识别率太低了,基本识别不了,于是找了BAT大厂的识别接口,依次封装成接口,方便调用:

  • Tesseract-OCR
  • 腾讯文字识别API
  • 百度文字识别API
  • 百度文字识别SDK

由于阿里的文档只看到python2的,所以就忽略了。识别率自上而下越来越高,腾讯的有点坑,识别率不算高,每天免费次数少,超过还扣费,然后一直给你发欠费邮件,百度SDK识别率是最好的,一般都是5次以内,而且每月免费5000次好像。

代码

后台我是用laravel-wjfcms写的,这里只展示python爬虫部分的代码,代码微调一下就可以跑起来了。

目录结构如下:

config.yaml是数据库配置文件,格式如下:

mysql_config:
  host: 'xxx'
  port: 3306
  user: 'root'
  password: 'sss'
  db: 'qihuo'
  charset: 'utf8'

登录用户表大概有以下几个字段,根据自己的情况去修改:

CREATE TABLE `wjf_transaction_users` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '交易用户表',
  `admin_id` tinyint(4) NOT NULL DEFAULT '0' COMMENT '交易账号对应的管理账号',
  `username` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '用户名',
  `password` varchar(100) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '' COMMENT '密码',
  `status` tinyint(4) NOT NULL DEFAULT '2' COMMENT '状态,1:已验证,2:待验证,3:验证失败',
  `created_at` timestamp NULL DEFAULT NULL,
  `updated_at` timestamp NULL DEFAULT NULL,
  `deleted_at` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE,
  KEY `wjf_transaction_users_admin_id_index` (`admin_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci ROW_FORMAT=COMPACT;

识别接口代码

百度API地址:https://ai.baidu.com/ai-doc/OCR/3k3h7yeqa
baiduBce.py

# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
from aip import AipOcr
""" 你的 APPID AK SK """
APP_ID = 'xxx'
API_KEY = 'xxxx'
SECRET_KEY = 'xxxx'

# 需要安装扩展
# pip install baidu-aip


class BaiduBce():
    def __init__(self):
        self.options = {}
        self.setOptions()
        self.client = AipOcr(APP_ID, API_KEY, SECRET_KEY)

    def dealImg(self, image, type=1, is_local=False):
        '''
        识别
        :param image:图片地址或者url
        :param type:识别接口类型
        :param is_local:是否为本地,False:远程,True,本地
        :return:
        '''
        if type == 1:  # 通用文字识别
            if is_local is False:
                res = self.client.basicGeneralUrl(image)
            else:
                res = self.client.basicGeneral(self.get_file_content(image),
                                               self.options)
        elif type == 2:  # 通用文字识别(高精度版)
            if is_local is True:
                res = self.client.basicAccurate(self.get_file_content(image))
        elif type == 3:  # 网络图片文字识别
            if is_local is False:
                res = self.client.webImageUrl(image)
            else:
                res = self.client.webImage(self.get_file_content(image))
        else:
            if is_local is False:
                res = self.client.basicGeneralUrl(image)
            else:
                res = self.client.basicGeneral(self.get_file_content(image),
                                               self.options)
        return res['words_result'][0]['words']

    def get_file_content(self, filePath):
        '''
        读取图片
        '''
        with open(filePath, 'rb') as fp:
            return fp.read()

    def setOptions(self,
                   language_type='CHN_ENG',
                   detect_direction='false',
                   detect_language='false',
                   probability='false'):
        '''
        如果有可选参数
        :param language_type:
        :param detect_direction:
        :param detect_language:
        :param probability:
        :return:
        '''
        options = {}
        options["language_type"] = language_type
        options["detect_direction"] = detect_direction
        options["detect_language"] = detect_language
        options["probability"] = probability
        self.options = options

baiduImg.py

# encoding:utf-8
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
import requests
import base64

CLIENT_ID = 'xxxx'
CLIENT_SECRET = 'xxxx'


class BaiduImg():
    def __init__(self):
        self.base_url = 'https://aip.baidubce.com'
        self.token = ''

    def dealImg(self, path, type=1):
        self.getToken()
        if type == 1:
            # 通用文字识别
            request_url = self.base_url + "/rest/2.0/ocr/v1/general_basic"
        elif type == 2:
            # 位置信息版
            request_url = self.base_url + "/rest/2.0/ocr/v1/general"
        elif type == 3:
            request_url = self.base_url + "/rest/2.0/ocr/v1/accurate_basic"
        elif type == 4:
            request_url = self.base_url + "/rest/2.0/ocr/v1/accurate"
        else:
            request_url = self.base_url + "/rest/2.0/ocr/v1/general_basic"

        if isinstance(path, bytes):
            s = path
        else:
            # 二进制方式打开图片文件
            f = open(path, 'rb')
            s = f.read()
        img = base64.b64encode(s)
        params = {"image": img}
        access_token = self.token
        request_url = request_url + "?access_token=" + access_token
        headers = {'content-type': 'application/x-www-form-urlencoded'}
        response = requests.post(request_url, data=params, headers=headers)
        if response:
            res = response.json()
            if 'error_msg' in res:
                return res['error_msg']
            else:
                return res['words_result'][0]['words']

    def getToken(self):
        # client_id 为官网获取的AK, client_secret 为官网获取的SK
        host = self.base_url + '/oauth/2.0/token?grant_type=client_credentials&client_id=' + str(
            CLIENT_ID) + '&client_secret=' + str(CLIENT_SECRET)
        response = requests.get(host)
        if response:
            data = response.json()
            self.token = data['access_token']
            print(data['access_token'])
            # return data['access_token']

    def test(self):
        print(11)


# if __name__ == '__main__':
#     path = '../code_img/veriCode (1).do'
#     baidu = BaiduImg()
#     res = baidu.dealImg(path, 3)
#     print(res)

tx.py

"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.ocr.v20181119 import ocr_client, models
import json


def get_code(path):
    try:
        cred = credential.Credential(
            "xxxx", "xxxx")
        httpProfile = HttpProfile()
        httpProfile.endpoint = "ocr.tencentcloudapi.com"

        clientProfile = ClientProfile()
        clientProfile.httpProfile = httpProfile
        client = ocr_client.OcrClient(cred, "ap-guangzhou", clientProfile)

        req = models.GeneralBasicOCRRequest()
        params = '{\"ImageUrl\":\"' + path + '\"}'
        req.from_json_string(params)

        resp = client.GeneralBasicOCR(req)
        res = resp.to_json_string()
        res_json = json.loads(res)
        if res_json['TextDetections'][0]['DetectedText'] is not None:
            code = res_json['TextDetections'][0]['DetectedText']
            # print(res_json['TextDetections'][0]['DetectedText'])
            return code

    except TencentCloudSDKException as err:
        print(err)

cfmmc.py

# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""
# python3
# from shibie import baiduImg
# from shibie import tx
from datetime import datetime
import time
import pytesseract
from shibie import baiduBce
import os
import pymysql
# from io import BytesIO
from requests import session
from PIL import Image
import PIL.ImageOps
import yaml

# 跟域名
BASE_URL = 'https://investorservice.cfmmc.com'


class Cfmmc(object):
    def __init__(self, username=None, max_gap=2, code_num=10):
        '''
        初始化
        :param username:用户名,为None时,查询所有用户
        :param max_gap:从什么时候开始抓取数据,2表示从两天前开始
        :param code_num:验证码识别次数
        :return:
        '''
        # 配置
        self.configs = self.get_config()
        # 数据库
        self.conn = pymysql.connect(
            host=self.configs['mysql_config']['host'],
            port=self.configs['mysql_config']['port'],
            user=self.configs['mysql_config']['user'],
            password=self.configs['mysql_config']['password'],
            db=self.configs['mysql_config']['db'],
            charset=self.configs['mysql_config']['charset']
        )
        # 创建一个游标
        self.cursor = self.conn.cursor()
        # 账号
        self.users = ()
        # 指定的用户名
        self.username = username
        # 查询区间,从今天往前数的天数
        self.max_gap = max_gap
        # 验证码执行次数
        self.code_num = code_num
        # 消息通知
        self.msg = ''
        # 客户账号状态
        self.user_status = 0
        # 运行时间
        self.run_time = time.strftime(
            '%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

    def pymysql_action(self, sql):
        # 执行 SQL 语句
        self.cursor.execute(sql)
        # 提交
        self.conn.commit()

    def get_config(self):
        '''
        获取config.yaml配置
        :return:configs
        '''
        # 获取当前文件的Realpath
        fileNamePath = os.path.split(os.path.realpath(__file__))[0]
        # 读取文件
        yamlPath = os.path.join(fileNamePath, 'config.yaml')
        # 加上 ,encoding='utf-8',处理配置文件中含中文出现乱码的情况。
        file = open(yamlPath, 'r', encoding='utf-8')
        # 读取文件
        cont = file.read()
        # 返回配置
        return yaml.safe_load(cont)

    def file_path(self, path, file):
        '''
        返回文件路径
        :param path:路径
        :param file:文件名
        :return:
        '''
        # 是否存在目录
        if not os.path.exists(path):
            os.makedirs(path)
        return path+'/' + file

    def reg_img(self, file_obj):
        '''
        OCR识别图片
        :param file_obj:文件
        :return:
        '''
        pytesseract.pytesseract.tesseract_cmd = r"D:\Program Files\Tesseract-OCR\tesseract.exe"
        im = Image.open(file_obj)
        im = im.convert('L')
        binary_image = im.point([0 if i < 210 else 1 for i in range(256)], '1')
        im1 = binary_image.convert('L')
        im2 = PIL.ImageOps.invert(im1)
        im3 = im2.convert('1')
        im4 = im3.convert('L')
        res = pytesseract.image_to_string(im4)
        # print(res)
        # im4.show()
        return res

    def get_download_url(self, content, key='下载'):
        '''
        获取下载链接
        :param content:内容
        :param key:key
        :return:
        '''
        a = content.split(key)[0]
        b = a.split('<div id="waitBody">')[1]
        c = b.split('<a href="')[1]
        d = c.split('" target="_blank">')[0]
        return BASE_URL + d

    def flag_filter(self, content, flag):
        '''
        匹配内容
        :param content:内容
        :param key:key
        :return:
        '''
        if len(content.split(flag)) < 2:
            return ''
        result = content.split(flag)[1].split('"')[0]
        return result

    def get_day(self, gap=1):
        '''
        返回年月日时间,并过滤周末周六
        :param gap:间隔数
        :return:
        '''
        if gap >= self.max_gap:
            return False
        # 时间戳
        day = time.time() - 86400 * (gap - 1)
        # 输出年月日
        ymd = time.strftime('%Y-%m-%d', time.localtime(day))
        # 获取星期几 数字1-7代表周一到周日
        week = datetime.strptime(ymd, '%Y-%m-%d').isoweekday()
        if week > 5:
            print('...' + ymd + '是周' + str(week))
            return True
        return ymd

    def get_dingshi_data(self, ss, user_id, header, token=''):
        '''
        处理逐日盯市页面
        :param ss:request.session
        :param user_id:用户id
        :param header:header
        :param token:token
        :return:
        '''
        for i in range(1, self.max_gap):
            # 获取时间
            ymd = self.get_day(i)
            if ymd is False:  # 说明循环完了
                return True
            elif ymd is True:  # 说明是周末跳过
                continue
            post_data = {
                "org.apache.struts.taglib.html.TOKEN": token,
                "tradeDate": ymd,
                "byType": 'data',
            }
            dingshi_url = BASE_URL+'/customer/setParameter.do'
            dingshi_html = ss.post(dingshi_url,
                                   data=post_data,
                                   headers=header,
                                   timeout=5)
            dingshi_html_code = dingshi_html.content.decode()
            # #看看html内容对不对
            # with open('./other/yemian.html', 'w+',
            #           encoding='utf-8') as files:
            #     files.write(dingshi_html_code)
            if '(逐日盯市)' in dingshi_html_code:
                # 找到下载按钮
                if '下载' in dingshi_html_code:
                    download_url = self.get_download_url(
                        dingshi_html_code, '下载')
                    xls_res = ss.get(download_url, headers=header)
                    # 下载文件
                    file_name = self.file_path(
                        './exel', user_id + '_' + ymd + '.xls')
                    # 判断文件是否存在
                    if os.path.exists(file_name) is True:
                        text = '...' + file_name + '已存在,'
                        print(text)
                        self.msg += text
                        continue
                    print('...准备下载' + file_name + '的数据')
                    with open(file_name, "wb") as xls_file:
                        xls_file.write(xls_res.content)
                else:
                    text = ymd+'...不包含下载按钮,'
                    print(text)
                    self.msg += text
                    continue
            else:
                print('...不是追日盯市页面')
        return True

    def main(self, user_id, passwd):
        '''
        处理逐日盯市页面
        :param user_id:用户id
        :param passwd:passwd
        :return:
        '''
        header = {
            'Connection':
            'keep-alive',
            'User-Agent':
            "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36",
        }
        url = BASE_URL+"/login.do"
        token_flag = 'name="org.apache.struts.taglib.html.TOKEN" value="'
        veri_code_flag = 'src="/veriCode.do?t='
        ss = session()
        res = ss.get(url, headers=header)
        content = res.content.decode()
        token = self.flag_filter(content, token_flag)
        veri_code_url = BASE_URL+'/veriCode.do?t=' + self.flag_filter(
            content, veri_code_flag)
        for i in range(self.code_num):
            print(f'...第{i + 1}次尝试')
            try:
                # OCR识别
                # tmp_file = BytesIO()
                # tmp_file.write(ss.get(veri_code_url).content)
                # veri_code = reg_img(tmp_file)
                # 百度API
                # veri_code = baiduImg.BaiduImg().dealImg(path, type)
                # 腾讯API
                # veri_code = tx.get_code(veri_code_url)
                # 百度接口
                tmp_file = self.file_path('./code_img', 'logo_code.jpg')
                req_res = ss.get(veri_code_url).content
                open(tmp_file, 'wb').write(req_res)
                veri_code = baiduBce.BaiduBce().dealImg(tmp_file, 2, True)
                if veri_code and len(veri_code) >= 5:
                    veri_code = ''.join(filter(str.isalnum, veri_code))
                    print('...识别得到:' + veri_code)
                    post_data = {
                        "org.apache.struts.taglib.html.TOKEN": token,
                        "showSaveCookies": '',
                        "userID": user_id,
                        "password": passwd,
                        "vericode": veri_code,
                    }
                    content2 = ss.post(url,
                                       data=post_data,
                                       headers=header,
                                       timeout=5)
                    res = content2.content.decode()
                    if "验证码错误" in res:  # 验证码验证失败
                        print('...验证码不匹配')
                    elif ("用户名或密码错误" in res) or ("您错误尝试超过3次" in res):  # 账号密码不通过
                        text = '...账号密码错误,数据库修改为验证不通过,'
                        print(text)
                        self.msg += text
                        print(text)
                        # 账号密码错误,数据库修改为验证不通过
                        update_sql = "UPDATE `wjf_transaction_users` SET status=3,updated_at='{0}' WHERE username='{1}'".format(
                            self.run_time, user_id)
                        self.pymysql_action(update_sql)
                        break
                    elif "登录超时,请重新登录" in res:
                        text = '...登录超时,重新登录,'
                        print(text)
                        self.msg += text
                        print(text)
                        break
                    # elif "资金安全特别提示":  # 如有这这几个字表示登录失败
                    #     text = '...登录失败,'
                    #     print(text)
                    #     self.msg += text
                    #     print(text)
                    #     break
                    else:
                        # 如果客户账号非正常状态,更新为正常
                        if self.user_status != 1:
                            update_sql = "UPDATE `wjf_transaction_users` SET status=1,updated_at='{0}' WHERE username='{1}'".format(
                                self.run_time, user_id)
                            self.pymysql_action(update_sql)

                        # 获取页面cookie
                        cookie_dict = dict(ss.cookies)
                        cookie = 'WMONID=' + cookie_dict[
                            'WMONID'] + ';JSESSIONID=' + cookie_dict['JSESSIONID']
                        header['Cookie'] = cookie
                        # 进入逐日盯市页面
                        res = self.get_dingshi_data(
                            ss, user_id, header, token)
                        if res is True:
                            break
                time.sleep(1)
                veri_code_url = BASE_URL+"/veriCode.do?t=" + str(
                    int(time.time() * 1000))
            except Exception as e:
                print(e)
        text = '...' + user_id + '处理结束。'
        print(text)
        self.msg += text

    def get_users(self):
        '''
        获取用户
        :param username:当指定用户是,查询指定的用户
        '''
        if self.username is None:
            users_sql = "SELECT username,password,status FROM `wjf_transaction_users` WHERE `deleted_at` is NULL"
        else:
            users_sql = "SELECT username,password,status FROM `wjf_transaction_users` WHERE `deleted_at` is NULL AND `username`="+self.username
        # password 其实有加密的,这里忽略
        self.pymysql_action(users_sql)
        self.users = self.cursor.fetchall()
        return self.users

    def run(self):
        '''
        入口文件,支持多用户
        '''
        self.users = self.get_users()
        default_max_gap = self.max_gap
        for user in self.users:
            self.msg = ''
            # 账号密码必须
            if user[0] is None or user[0] == '':
                text = '...账号为空,'
                print(text)
                continue
            if user[1] is None or user[1] == '':
                text = f'...{user[0]}的密码为空,'
                print(text)
                self.msg += text
                continue
            # 获取客户账号状态
            self.user_status = user[2]
            if default_max_gap == 2:
                # 如果是默认天数,那就按照数据库状态来,如果status为1:表示已验证,只抓取2天的数据,否则就抓取180天的,max_gap
                if self.user_status != 1:
                    self.max_gap = 180
            text = f'账号{user[0]}开始执行,'
            print(text)
            self.msg = text
            self.main(user[0], user[1])

    def __del__(self):
        # 关闭游标
        self.cursor.close()
        # 关闭数据库连接
        self.conn.close()


if __name__ == '__main__':
    '''
    模拟登陆 中国期货市场监控中心,抓取逐日盯市数据
    '''
    obj = Cfmmc()
    res = obj.run()

excel.py(数据库操作部分,根据实际情况修改)

# -*- coding:utf8 -*-
"""
Author: gallopingvijay
Email: 1937832819@qq.com
Website: https://www.choudalao.com
"""

import xlrd
import pymysql
import time
from itertools import groupby
import os
import yaml


class Excel(object):
    def __init__(self):
        # 配置
        self.configs = self.get_config()
        self.conn = pymysql.connect(
            host=self.configs['mysql_config']['host'],
            port=self.configs['mysql_config']['port'],
            user=self.configs['mysql_config']['user'],
            password=self.configs['mysql_config']['password'],
            db=self.configs['mysql_config']['db'],
            charset=self.configs['mysql_config']['charset']
        )
        # 创建一个游标
        self.cursor = self.conn.cursor()
        # 存储数据的字典
        self.dict = {}

    def get_config(self):
        '''
        获取config.yaml配置
        :return:
        '''
        # 获取当前文件的Realpath
        fileNamePath = os.path.split(os.path.realpath(__file__))[0]
        # 读取文件
        yamlPath = os.path.join(fileNamePath, 'config.yaml')
        # 加上 ,encoding='utf-8',处理配置文件中含中文出现乱码的情况。
        file = open(yamlPath, 'r', encoding='utf-8')
        # 读取文件
        cont = file.read()
        # 返回配置
        return yaml.safe_load(cont)

    def pymysql_action(self, sql):
        # 创建一个游标
        # 执行 SQL 语句
        self.cursor.execute(sql)
        # 提交
        self.conn.commit()

    def del_null(self, list):
        '''
        清理空数据
        :param list:
        :return:
        '''
        list = [i for i in list if i != '']
        return list

    def filter_field(self, field, typeof='string'):
        '''
        处理每一个写入数据库的值
        :param field: 字段值
        :param typeof: 类型:string,decimal(四舍五入保留两位),num
        :return:
        '''
        if field == '--' or field == '':
            field = '0'
        elif typeof == 'decimal':
            field = round(float(field), 2)
        elif typeof == 'num':
            field = int(field)
        else:
            field = str(field)
            field = field.lstrip().rstrip()
        return field

    def deal_excel(self, path):
        workbook = xlrd.open_workbook(path)
        # 获取所有sheet
        sheet_name = workbook.sheet_names()[0]
        # 根据sheet索引或者名称获取sheet内容
        sheet = workbook.sheet_by_index(0)  # sheet索引从0开始
        # sheet的名称,行数,列数
        nrows = sheet.nrows
        # ncols = sheet.ncols
        # 获取整行和整列的值(数组)
        # rows = sheet.row_values(2)  # 获取第2行内容
        # cols = sheet.col_values(3) # 获取第3列内容

        if sheet_name != '客户交易结算日报':
            print('...不是需要的数据')
            exit()

        ding_shi = False  # 如果是逐日盯市,才写入数据库
        cny_i = 0  # cny 期货期权账户出入金明细(单位:人民币)开始
        usd_i = 0  # 期货期权账户出入金明细(单位:美元)
        deal_row_i = 0  # 期货成交汇总 开始的行号
        position_row_i = 0  # 期货持仓汇总 开始的行号
        for i in range(nrows):
            try:
                rows = sheet.row_values(i)  # 获取第i行内容
                deal_rows = self.del_null(rows)
                if len(deal_rows) < 1:
                    # print(f'第{i + 1}行没有数据')
                    continue
                if '客户交易结算日报(逐日盯市)' in deal_rows:
                    ding_shi = True
                if i > 4 and ding_shi is False:  # 表头不是逐日盯市,不处理
                    print('...不是逐日盯市表格,不处理')
                    break
                # 客户期货期权内部资金账户
                if '客户期货期权内部资金账户' in deal_rows:
                    self.dict['fund_account'] = deal_rows[1]
                    self.dict['deal_time'] = deal_rows[3]
                    self.dict['created_at'] = time.strftime(
                        '%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
                    timeArray = time.strptime(
                        self.dict['deal_time'], "%Y-%m-%d")
                    self.dict['deal_time_stamp'] = int(time.mktime(timeArray))
                    self.dict['out_money_cny'] = self.dict['enter_money_cny'] = self.dict['out_money_usd'] = self.dict[
                        'enter_money_usd'] = 0
                    self.dict['exchange_name'] = ''
                    self.dict['transaction_fees'] = '0.00'
                    self.dict['reporting_fee'] = '0.00'
                    self.dict['position_buy_position'] = '0.00'
                    self.dict['position_sell_position'] = '0.00'
                    self.dict['position_profit_loss'] = '0.00'
                    self.dict['position_trading_margin'] = '0.00'
                    self.dict['deal_num'] = 0
                    self.dict['deal_turnover'] = '0.00'
                    self.dict['deal_fee'] = '0.00'
                    self.dict['deal_profit_loss'] = '0.00'
                if '客户名称' in deal_rows:
                    self.dict['admin_name'] = deal_rows[1].strip()
                    self.dict['query_time'] = deal_rows[3]
                    # 获取用户的后台id
                    admin_sql = "SELECT admin_id FROM `wjf_transaction_users` WHERE `admin_name` = '{0}' LIMIT 1".format(
                        self.dict['admin_name'])
                    self.cursor.execute(admin_sql)
                    first_data = self.cursor.fetchone()
                    if first_data is None:
                        print('没有客户名称为' + str(self.dict['admin_name']) + '的数据')
                        break
                    self.dict['admin_id'] = first_data[0]
                    # 如果数据库存在该用户的该天数据,就跳过
                    summary_sql = "SELECT id FROM `wjf_summary_datas` WHERE `admin_id` = '{0}' and `deal_time_stamp` = '{1}'LIMIT 1".format(
                        self.dict['admin_id'], self.dict['deal_time_stamp'])
                    self.cursor.execute(summary_sql)
                    summary_data = self.cursor.fetchone()
                    if summary_data is not None:
                        print(
                            '客户:' + str(self.dict['admin_name']) + str(self.dict['deal_time']) + '的数据已存在')
                        break
                if '期货公司名称' in deal_rows:
                    self.dict['futures_company'] = deal_rows[1]
                if '上日结存' in deal_rows:
                    self.dict['previous_day_balance'] = deal_rows[1]
                    self.dict['customer_rights'] = deal_rows[3]
                if '当日存取合计' in deal_rows:
                    self.dict['total_access_for_the_day'] = deal_rows[1]
                    self.dict['actual_monetary_funds'] = deal_rows[3]
                if '当日盈亏' in deal_rows:
                    self.dict['profit_loss_day'] = deal_rows[1]
                    self.dict['not_currency_credit_amount'] = deal_rows[3]
                if '当日总权利金' in deal_rows:
                    self.dict['total_royalties_of_the_day'] = deal_rows[1]
                    self.dict['currency_credit_amount'] = deal_rows[3]
                if '当日手续费' in deal_rows:
                    self.dict['day_handling_fee'] = deal_rows[1]
                    self.dict['frozen_funds'] = deal_rows[3]
                if '当日结存' in deal_rows:
                    self.dict['day_balance'] = deal_rows[1]
                    self.dict['margin_occupation'] = deal_rows[3]
                if '可用资金' in deal_rows:
                    self.dict['available_funds'] = deal_rows[1]
                if '风险度' in deal_rows:
                    self.dict['risk'] = deal_rows[1]
                if '追加保证金' in deal_rows:
                    self.dict['margin_call'] = deal_rows[1]
                # 期货期权账户出入金明细(单位:人民币 开始
                if '期货期权账户出入金明细(单位:人民币)' in deal_rows:
                    cny_i = i
                if '合计' in deal_rows and (cny_i > 0):
                    self.dict['out_money_cny'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['enter_money_cny'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    cny_i = 0
                # 期货期权账户出入金明细(单位:美元)
                if '期货期权账户出入金明细(单位:美元)' in deal_rows:
                    usd_i = i
                if '合计' in deal_rows and (usd_i > 0):
                    usd_i = 0
                    self.dict['out_money_usd'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['enter_money_usd'] = self.filter_field(
                        deal_rows[2], 'decimal')
                # 期货成交汇总 开始
                if '期货成交汇总' in deal_rows:  # 期货成交汇总 开始
                    deal_row_i = i
                if '合约' in deal_rows:
                    continue
                if ('合计' in deal_rows) and (deal_row_i > 0) and (position_row_i <= 0):  # 期货成交汇总 结束
                    deal_row_i = 0
                    self.dict['deal_num'] = self.filter_field(
                        deal_rows[1], 'num')
                    self.dict['deal_turnover'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    self.dict['deal_fee'] = self.filter_field(
                        deal_rows[3], 'decimal')
                    self.dict['deal_profit_loss'] = self.filter_field(
                        deal_rows[4], 'decimal')
                if i > deal_row_i and deal_row_i > 0:  # 成交汇总
                    deal_rows.append(self.dict['admin_name'])
                    deal_rows.append(self.dict['query_time'])
                    deal_rows.append(self.dict['deal_time'])
                    deal_rows.append(self.dict['created_at'])
                    # 合约中提取交易所code
                    contract = deal_rows[0]
                    contract_list = [''.join(list(g)) for k, g in groupby(
                        contract, key=lambda x: x.isdigit())]
                    exchange_code = contract_list[0]
                    exchange_num = contract_list[1]
                    deal_rows.append(exchange_code)
                    deal_rows.append(exchange_num)
                    # 字符串转数字,以免数据库保存报错
                    deal_rows[0] = self.filter_field(deal_rows[0])
                    deal_rows[1] = self.filter_field(deal_rows[1])
                    deal_rows[2] = self.filter_field(deal_rows[2])
                    deal_rows[3] = self.filter_field(deal_rows[3], 'decimal')
                    deal_rows[4] = self.filter_field(deal_rows[4], 'num')
                    deal_rows[5] = self.filter_field(deal_rows[5], 'decimal')
                    deal_rows[8] = self.filter_field(deal_rows[8], 'decimal')
                    deal_rows[6] = self.filter_field(deal_rows[6])
                    deal_rows[7] = self.filter_field(deal_rows[7])
                    deal_rows[9] = self.filter_field(deal_rows[9])
                    deal_rows[10] = self.filter_field(deal_rows[10])
                    deal_rows[11] = self.filter_field(deal_rows[11])
                    deal_rows[12] = self.filter_field(deal_rows[12])
                    deal_rows[13] = self.filter_field(deal_rows[13])
                    deal_rows[14] = self.filter_field(deal_rows[14])

                    sql = "INSERT INTO wjf_deal_datas(contract,buy_sell,speculation,final_price,num,turnover,open_flat,fee,profit_loss,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}');".format(
                        deal_rows[0], deal_rows[1], deal_rows[2], deal_rows[3], deal_rows[4], deal_rows[5],
                        deal_rows[6], deal_rows[7], deal_rows[8], deal_rows[9], deal_rows[10], deal_rows[11],
                        deal_rows[12], deal_rows[13], deal_rows[14], self.dict['admin_id'],
                        self.dict['deal_time_stamp'])

                    self.pymysql_action(sql)
                    # print("..." + str(i + 1) + "行期货成交写入完成")
                if '期货持仓汇总' in deal_rows:  # 期货持仓汇总 开始
                    position_row_i = i
                    deal_row_i = 0
                if ('合计' in deal_rows) and (deal_row_i == 0) and (position_row_i > 0):  # 期货持仓汇总 结束
                    position_row_i = 0
                    self.dict['position_buy_position'] = self.filter_field(
                        deal_rows[1], 'decimal')
                    self.dict['position_sell_position'] = self.filter_field(
                        deal_rows[2], 'decimal')
                    self.dict['position_profit_loss'] = self.filter_field(
                        deal_rows[3], 'decimal')
                    self.dict['position_trading_margin'] = self.filter_field(
                        deal_rows[4], 'decimal')
                    # 写入汇总数据
                    summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}',{12},'{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format(
                        self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'],
                        self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'],
                        self.dict['previous_day_balance'], self.dict['customer_rights'],
                        self.dict['total_access_for_the_day'],
                        self.dict['actual_monetary_funds'], self.dict['profit_loss_day'],
                        self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'],
                        self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'],
                        self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'],
                        self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'],
                        self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'],
                        self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'],
                        self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'],
                        self.dict['deal_profit_loss'], self.dict['position_buy_position'],
                        self.dict['position_sell_position'], self.dict['position_profit_loss'],
                        self.dict['position_trading_margin'], self.dict['created_at'])
                    self.pymysql_action(summary_sql)
                    print("..." + "汇总合计写入完成")
                    break
                if (i > position_row_i) and (position_row_i > 0):  # 期货持仓汇总
                    rows.append(self.dict['admin_name'])
                    rows.append(self.dict['query_time'])
                    rows.append(self.dict['deal_time'])
                    rows.append(self.dict['created_at'])
                    # 合约中提取交易所code
                    contract = rows[0]
                    contract_list = [''.join(list(g)) for k, g in groupby(
                        contract, key=lambda x: x.isdigit())]
                    exchange_code = contract_list[0]
                    exchange_num = contract_list[1]
                    rows.append(exchange_code)
                    rows.append(exchange_num)
                    # 字符串转数字,以免数据库保存报错
                    rows[1] = self.filter_field(rows[1], 'int')
                    rows[2] = self.filter_field(rows[2], 'decimal')
                    rows[3] = self.filter_field(rows[3], 'int')
                    rows[4] = self.filter_field(rows[4], 'decimal')
                    rows[5] = self.filter_field(rows[5], 'decimal')
                    rows[6] = self.filter_field(rows[6], 'decimal')
                    rows[7] = self.filter_field(rows[7], 'decimal')
                    rows[8] = self.filter_field(rows[8], 'decimal')
                    rows[0] = self.filter_field(rows[0])
                    rows[9] = self.filter_field(rows[9])
                    rows[10] = self.filter_field(rows[10])
                    rows[11] = self.filter_field(rows[11])
                    rows[12] = self.filter_field(rows[12])
                    rows[13] = self.filter_field(rows[13])
                    rows[14] = self.filter_field(rows[14])
                    rows[15] = self.filter_field(rows[15])
                    sql = "INSERT INTO wjf_position_datas(contract,buy_position,buy_average_price,sell_position,sell_average_price,settlement_price_yesterday,settlement_price_today,profit_loss,trading_margin,speculation,admin_name,query_time,deal_time,created_at,exchange_code,exchange_num,admin_id,deal_time_stamp)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}');".format(
                        rows[0], rows[1], rows[2], rows[3], rows[4], rows[5], rows[6], rows[7], rows[8], rows[9],
                        rows[10], rows[11], rows[12], rows[13], rows[14], rows[15], self.dict['admin_id'],
                        self.dict['deal_time_stamp'])
                    self.pymysql_action(sql)
                    # print("..." + str(i + 1) + "行期货持仓写入完成")
                # 当没有期货持仓汇总数据的时候,最后额数据在这里写入,如果有期货持仓汇总,不会执行到这里
                if nrows-1 == i:
                    print('...没有期货持仓汇总,在最后一行写入汇总数据')
                    # 写入汇总数据
                    summary_sql = "INSERT INTO wjf_summary_datas(admin_id,admin_name,query_time,deal_time,deal_time_stamp,fund_account,futures_company,previous_day_balance,customer_rights,total_access_for_the_day,actual_monetary_funds,profit_loss_day,not_currency_credit_amount,total_royalties_of_the_day,currency_credit_amount,day_handling_fee,frozen_funds,day_balance,margin_occupation,available_funds,risk,margin_call,out_money_cny,enter_money_cny,out_money_usd,enter_money_usd,exchange_name,transaction_fees,reporting_fee,deal_num,deal_turnover,deal_fee,deal_profit_loss,position_buy_position,position_sell_position,position_profit_loss,position_trading_margin,created_at)VALUES('{0}','{1}','{2}','{3}','{4}','{5}','{6}','{7}','{8}','{9}','{10}','{11}',{12},'{13}','{14}','{15}','{16}','{17}','{18}','{19}','{20}','{21}','{22}','{23}','{24}','{25}','{26}','{27}','{28}','{29}','{30}','{31}','{32}','{33}','{34}','{35}','{36}','{37}');".format(
                        self.dict['admin_id'], self.dict['admin_name'], self.dict['query_time'], self.dict['deal_time'],
                        self.dict['deal_time_stamp'], self.dict['fund_account'], self.dict['futures_company'],
                        self.dict['previous_day_balance'], self.dict['customer_rights'],
                        self.dict['total_access_for_the_day'],
                        self.dict['actual_monetary_funds'], self.dict['profit_loss_day'],
                        self.dict['not_currency_credit_amount'], self.dict['total_royalties_of_the_day'],
                        self.dict['currency_credit_amount'], self.dict['day_handling_fee'], self.dict['frozen_funds'],
                        self.dict['day_balance'], self.dict['margin_occupation'], self.dict['available_funds'],
                        self.dict['risk'], self.dict['margin_call'], self.dict['out_money_cny'],
                        self.dict['enter_money_cny'], self.dict['out_money_usd'], self.dict['enter_money_usd'],
                        self.dict['exchange_name'], self.dict['transaction_fees'], self.dict['reporting_fee'],
                        self.dict['deal_num'], self.dict['deal_turnover'], self.dict['deal_fee'],
                        self.dict['deal_profit_loss'], self.dict['position_buy_position'],
                        self.dict['position_sell_position'], self.dict['position_profit_loss'],
                        self.dict['position_trading_margin'], self.dict['created_at'])
                    self.pymysql_action(summary_sql)
                    print("..." + "汇总合计写入完成")
            except Exception as e:
                print('......' + str(i + 1) + "行有异常抛出:")
                print(e)

    def get_files(self, path):
        '''
        获取所有excel文件
        :param path:路径
        :return:
        '''
        files = os.listdir(path)
        if len(files) < 1:
            print('没有文件')
        else:
            for file in files:
                file_path = path + file
                # 判断后缀
                if '.xls' in file:
                    print('开始读取文件:' + file_path)
                    self.deal_excel(file_path)
                # 删除文件
                try:
                    print('删除文件:' + file_path)
                    os.remove(file_path)
                except Exception as e:
                    print(e)

    def test(self):
        for i in range(5):
            try:
                if i == 3:
                    continue
                print(i)
            except Exception as e:
                print(e)

    def __del__(self):
        # 关闭游标
        self.cursor.close()
        # 关闭数据库连接
        self.conn.close()


if __name__ == '__main__':
    path = './exel/'
    obj = Excel()
    obj.get_files(path)

总结

cfmmc.py模拟登录获取数据,excel.py文件处理表格数据,结合定时任务及队列,可以实现自动化处理数据。

如果您需要一个可视化的界面,管理和分析爬取到的数据,让数据更有价值。可以查看期货市场监控后台管理系统