小言_互联网的博客

今年,我只赚了一点点

243人阅读  评论(0)

大家好,我是 Jack。

之前一直有小伙伴问我,有没有免费的股票信息查询的 API 接口?

我看了一圈,很多免费的 API 接口都年久失修,失效了。

那好吧,咱自己写一个

想要玩量化交易,第一步,那得有稳定的股票数据来源。

然后再谈什么量化策略,怎么选股、何时买股。

怎么稳定的获取数据呢?

只能是抄起我的老板行,写个网络爬虫自动抓取数据

玩股票、玩基金的,应该多多少听过一款股票交流 APP 雪球。

这里面的数据很全,就它了!

前方提醒:使用网络爬虫,请控制好访问频率。

在雪球上,想要获得各种股票信息,那需要携带身份信息,也就是要有 Cookie。

没有 Cookie,很多信息是获取不到的。

2017 年的时候,我就写过关于 Cookie 的文章。

一些基础知识忘记的小伙伴,可以重温下我这个系列的文章。

网络爬虫教程(2020年)

想要获取 Cookie,那就需要进行模拟登录。

模拟登录 - 准备篇

模拟登录,顾名思义,就是模拟人类的行为,登录这个网站。

登录之后,我们就可以用保存身份信息的 Cookie,获取我们想要的各种数据:股票信息、基金信息等。

我们先手动登录,体验一下整个登录流程。

手动登录

第一步:点击登录按钮。

第二步:输入帐号和密码,并点击登录。

第三步:解锁滑块。

第四步:登录成功。

模拟登录

接下来,就是需要写个代码,让代码替我们完成上述操作

这里我使用 Selenium,它是一款自动化测试工具。

不过说实话,Selenium 这东西挺老了。

现在有不少更好的工具,不过对于模拟登录的知识储备,我还停留在 2017 年,也只会用它了。

有更好更好的方法的话,欢迎小伙伴们提交 PR。

不过,好在 Selenium 虽然老了点,但还能勉强胜任获取 Cookie 这项工作的。

Selenium 不会的小伙伴,可以看我从前的教程:

https://jackcui.blog.csdn.net/article/details/72331737

想要使用 Selenium,首先需要下载浏览器驱动,这里以 Chrome 浏览器为例。

打开 Chrome 浏览器,查看 Chrome 版本号。

然后根据这个版本号,下载相同大版本的驱动。

http://chromedriver.storage.googleapis.com/index.html

根据自己的操作系统,选择对应的版本。

我的是 Windows 电脑,选择 Win32 的版本。

下载好后,解压备用。

最后安装 Selenium 第三方依赖库。

python -m pip install selenium==3.4 --user

注意,需要安装 3.4 的版本,Selenium 的新版本改动较多,用我的代码会存在接口不兼容的情况。

模拟登录 - 实战篇

我们先睹为快,看下让代码自动登录雪球的效果:

https://cuijiahua.com/wp-content/uploads/2022/12/1.mp4

(PS:录屏时间 12.2,由于大家都知道的原因,页面为黑白)

其实模拟登录的思路很简单,就是根据审查元素,找到各个元素的位置。

比如登录按钮,右键审查元素,然后选择 Copy Xpath。

就能拷贝路径地址。

使用这种方法,找到帐号输入框、密码输入框的位置,然后点击登录即可。

这里的难点在于验证码。

不过好在,GEETEST 验证码的破解,我还是有些经验的,17 年的时候,就写过相关内容。

很多代码,直接复用即可

整体思路就是:

  • 使用Selenium打开页面。

  • 匹配到输入框,输入账号密码,点击登录。

  • 读取验证码图片,并做缺口识别。

  • 根据缺口位置,计算滑动距离。

  • 根据滑动距离,拖拽滑块到需要匹配的位置。

直接放代码:


  
  1. from selenium  import webdriver
  2. from selenium.webdriver  import ActionChains
  3. from io  import BytesIO
  4. import json
  5. import base64
  6. import time
  7. from selenium.webdriver.common.by  import By
  8. from selenium.webdriver.support.wait  import WebDriverWait
  9. from selenium.webdriver.support  import expected_conditions  as EC
  10. from time  import sleep
  11. from PIL  import Image
  12. from selenium  import webdriver
  13. # 账号
  14. USERNAME =  '***'
  15. # 密码
  16. PASSWORD =  '***'
  17. BORDER =  6
  18. class  Login( object):
  19.      def  __init__( self):
  20.         self.url =  'https://xueqiu.com/'
  21.         opt = webdriver.ChromeOptions()
  22.         opt.add_experimental_option( 'w3c'False)
  23.         self.browser = webdriver.Chrome( "chromedriver.exe", chrome_options=opt)
  24.         self.browser.maximize_window() #第一处修复,设置浏览器全屏
  25.         self.username = USERNAME
  26.         self.password = PASSWORD
  27.         self.wait = WebDriverWait(self.browser,  20)
  28.      def  __del__( self):
  29.          print( "close")
  30.      def  open( self):
  31.         self.browser.get(self.url)
  32.         ele = self.browser.find_element_by_xpath( '//*[@id="app"]/nav/div[1]/div[2]/div/div') #第二处修复,改xpath
  33.         ele.click()
  34.         username = self.wait.until(EC.presence_of_element_located((By.XPATH,  '//input[@name="username"]')))
  35.         pwd = self.wait.until(EC.presence_of_element_located((By.XPATH,  '//input[@name="password"]')))
  36.         username.send_keys(self.username)
  37.         time.sleep( 2)
  38.         pwd.send_keys(self.password)
  39.      # 获取验证码按钮
  40.      def  get_yzm_button( self):
  41.         button = self.wait.until(EC.presence_of_element_located((By.XPATH,  '/html/body/div[2]/div[1]/div/div/div/div[2]/div[2]/div[2]'))) #第三处修复,改xpath
  42.          return button
  43.      # 获取验证码图片对象
  44.      def  get_img_element( self):
  45.         element = self.wait.until(EC.presence_of_element_located((By.XPATH,  '//cavas[@name="geetest_canvas_bg geetest_absolute"]')))
  46.          return element
  47.      def  get_position( self):
  48.          # 获取验证码位置
  49.         element = self.get_img_element()
  50.         sleep( 2)
  51.         location = element.location
  52.         size = element.size
  53.         top, bottom, left, right = location[ 'y'], location[ 'y'] + size[ 'height'], location[ 'x'], location[ 'x'] + size[
  54.              'width']
  55.          return left, top, right, bottom
  56.      def  get_geetest_image( self):
  57.          """
  58.         获取验证码图片
  59.         :return: 图片对象
  60.         """
  61.          '''
  62.         <canvas class="geetest_canvas_bg geetest_absolute" height="160" width="260"></canvas>
  63.         '''
  64.          # 带阴影的图片
  65.          # im = self.wait.until(EC.presence_of_element_located((By.XPATH, '/html/body/div[4]/div[2]/div[6]/div/div[1]/div[1]/div/a/div[1]/div/canvas[1]')))
  66.         im = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR,  '.geetest_canvas_bg')))
  67.         time.sleep( 2)
  68.         im.screenshot( 'captcha.png')
  69.          # 执行 JS 代码并拿到图片 base64 数据
  70.         JS =  'return document.getElementsByClassName("geetest_canvas_fullbg")[0].toDataURL("image/png");'   # 不带阴影的完整图片
  71.         im_info = self.browser.execute_script(JS)   # 执行js文件得到带图片信息的图片数据
  72.          # 拿到base64编码的图片信息
  73.         im_base64 = im_info.split( ',')[ 1]
  74.          # 转为bytes类型
  75.         captcha1 = base64.b64decode(im_base64)
  76.          # 将图片保存在本地
  77.          with  open( 'captcha1.png''wb'as f:
  78.             f.write(captcha1)
  79.         JS =  'return document.getElementsByClassName("geetest_canvas_bg")[0].toDataURL("image/png");'
  80.          # 执行 JS 代码并拿到图片 base64 数据ng  # 带阴影的图片
  81.         im_info = self.browser.execute_script(JS)   # 执行js文件得到带图片信息的图片数据
  82.          # 拿到base64编码的图片信息
  83.         im_base64 = im_info.split( ',')[ 1]
  84.          # 转为bytes类型
  85.         captcha2 = base64.b64decode(im_base64)
  86.          # 将图片保存在本地
  87.          with  open( 'captcha2.png''wb'as f:
  88.             f.write(captcha2)
  89.         captcha1 = Image. open( 'captcha1.png')
  90.         captcha2 = Image. open( 'captcha2.png')
  91.          return captcha1, captcha2
  92.      # 获取网页截图
  93.      def  get_screen_shot( self):
  94.         screen_shot = self.browser.get_screenshot_as_png()
  95.         screen_shot = Image. open(BytesIO(screen_shot))
  96.          return screen_shot
  97.      def  get_yzm_img( self, name='captcha.png'):
  98.          #  获取验证码图片
  99.         left, top, right, bottom = self.get_position()
  100.          print( '验证码位置', top, bottom, left, right)
  101.         screen_shot = self.get_screen_shot()
  102.         captcha = screen_shot.crop((left, top, right, bottom))
  103.         captcha.save(name)
  104.          return captcha
  105.      def  get_slider( self):
  106.          # 获取滑块
  107.          # :return: 滑块对象
  108.         slider = self.wait.until(EC.element_to_be_clickable((By.CLASS_NAME,  'geetest_slider_track')))
  109.          return slider
  110.      def  get_gap( self, image1, image2):
  111.          """
  112.         获取缺口偏移量
  113.         :param image1: 不带缺口图片
  114.         :param image2: 带缺口图片
  115.         :return:
  116.         """
  117.         left =  62
  118.          for i  in  range(left, image1.size[ 0]):
  119.              for j  in  range(image1.size[ 1]):
  120.                  if  not self.is_pixel_equal(image1, image2, i, j):
  121.                     left = i
  122.                      return left
  123.          # return left
  124.      def  is_pixel_equal( self, image1, image2, x, y):
  125.          """
  126.         判断两个像素是否相同
  127.         :param image1: 图片1
  128.         :param image2: 图片2
  129.         :param x: 位置x
  130.         :param y: 位置y
  131.         :return: 像素是否相同
  132.         """
  133.          # 取两个图片的像素点
  134.         pixel1 = image1.load()[x, y]
  135.         pixel2 = image2.load()[x, y]
  136.         threshold =  60
  137.          if  abs(pixel1[ 0] - pixel2[ 0]) < threshold  and  abs(pixel1[ 1] - pixel2[ 1]) < threshold  and  abs(
  138.                 pixel1[ 2] - pixel2[ 2]) < threshold:
  139.              return  True
  140.          else:
  141.              return  False
  142.      def  get_track( self, distance):
  143.          """
  144.         根据偏移量获取移动轨迹
  145.         :param distance: 偏移量
  146.         :return: 移动轨迹
  147.         """
  148.          # 初速度
  149.         v =  0
  150.          # 单位时间为0.2s来统计轨迹,轨迹即0.2内的位移
  151.         t =  0.3
  152.          # 位移/轨迹列表,列表内的一个元素代表0.2s的位移
  153.         tracks = []
  154.          # 当前的位移
  155.         current =  5
  156.          # 到达mid值开始减速
  157.         mid = distance *  3 /  5
  158.          while current < distance:
  159.              if current < mid:
  160.                  # 加速度越小,单位时间的位移越小,模拟的轨迹就越多越详细
  161.                 a =  2
  162.              else:
  163.                 a = - 3
  164.              # 初速度
  165.             v0 = v
  166.              # 0.2秒时间内的位移
  167.             s = v0 * t +  0.4 * a * (t **  2)
  168.              # 当前的位置
  169.             current += s
  170.              # 添加到轨迹列表
  171.             tracks.append( round(s))
  172.              # 速度已经达到v,该速度作为下次的初速度
  173.             v = v0 + a * t
  174.          return tracks
  175.      def  move_to_gap( self, slider, track):
  176.          """
  177.         拖动滑块到缺口处
  178.         :param slider: 滑块
  179.         :param track: 轨迹
  180.         :return:
  181.         """
  182.         ActionChains(self.browser).click_and_hold(slider).perform()
  183.          for x  in track:
  184.             ActionChains(self.browser).move_by_offset(xoffset=x, yoffset= 0).perform()
  185.         time.sleep( 0.5)
  186.         ActionChains(self.browser).release().perform()
  187.      def  shake_mouse( self):
  188.          """
  189.         模拟人手释放鼠标抖动
  190.         :return: None
  191.         """
  192.         ActionChains(self.browser).move_by_offset(xoffset=- 2, yoffset= 0).perform()
  193.         ActionChains(self.browser).move_by_offset(xoffset= 2, yoffset= 0).perform()
  194.      def  operate_slider( self, track):
  195.          '''
  196.         拖动滑块
  197.         '''
  198.          # 获取拖动按钮
  199.         back_tracks = [- 1,- 1, - 1, - 1]
  200.         slider_bt = self.browser.find_element_by_xpath( '//div[@class="geetest_slider_button"]')
  201.          # 点击拖动验证码的按钮不放
  202.         ActionChains(self.browser).click_and_hold(slider_bt).perform()
  203.          # 按正向轨迹移动
  204.          for i  in track:
  205.             ActionChains(self.browser).move_by_offset(xoffset=i, yoffset= 0).perform()
  206.         time.sleep( 1)
  207.         ActionChains(self.browser).release().perform()
  208.      def  get_cookies( self):
  209.          try:
  210.             cookie_list = self.browser.get_cookies()
  211.             cookie_dict = {i[ 'name']: i[ 'value'for i  in cookie_list}
  212.              with  open( 'xueqiu_cookies''w', encoding= 'utf8') as f:
  213.                 cookie_dict = json.dumps(cookie_dict)
  214.                 f.write(cookie_dict)
  215.              return cookie_dict
  216.          except:
  217.              print( "cookie 获取失败")
  218.              return  None
  219.      # 读取cookie
  220.      def  return_cookie( self):
  221.         cookies =  ''
  222.          with  open( 'xueqiu_cookies''r') as f:
  223.             cookie = f.read()[ 1:- 1]
  224.             cookie = cookie.split( ', ')
  225.              for i  in cookie:
  226.                 cook = i.split( ': ')
  227.                 cookies += cook[ 0][ 1:- 1] +  '=' + cook[ 1][ 1:- 1] +  ';'
  228.              return cookies
  229.      def  run( self):
  230.          # 破解入口
  231.         self. open(), sleep( 3)
  232.         self.get_yzm_button().click(), sleep( 2) # 点击验证按钮
  233.          # 点按呼出缺口
  234.         slider = self.get_slider()
  235.          # slider.click()
  236.          # 获取带缺口的验证码图片
  237.         image1, image2 = self.get_geetest_image()
  238.         gap = self.get_gap(image1, image2)
  239.          print( '缺口位置', gap)
  240.         track = self.get_track(gap)
  241.          print( '滑动轨迹', track)
  242.         self.operate_slider(track)
  243.          # 判定是否成功
  244.         time.sleep( 8)
  245.          try:
  246.             elem = self.wait.until(
  247.                 EC.text_to_be_present_in_element((By.CLASS_NAME,  'nav__btn--longtext'),  '发帖'))
  248.              if elem:
  249.                 cookie = self.get_cookies()
  250.              else:
  251.                  print( "get cookies errors")
  252.          except Exception  as e:
  253.              print(e,  'fail! ')
  254.             time.sleep( 3)
  255.             self.run()
  256.          finally:
  257.             self.browser.quit()
  258. if __name__ ==  '__main__':
  259.     crack = Login()
  260.     crack.run()

代码我也上传到 Github 上了,代码的后续更新维护会放在这里建议 Star 收藏下

https://github.com/Jack-Cherish/quantitative

数据获取

等待模拟登录完成后,会保存一个名为 xueqiu_cookies 的文件。

这里保存的是帐号的 Cookie,使用这个 Cookie 就能获取雪球的数据了。

比如,获取一下股票实时行情和现金流量表,就可以这样写:


  
  1. #-*- coding:utf-8 -*-
  2. import requests
  3. import json
  4. def  fetch( url, xq_a_token):
  5.     headers = { 'Host''stock.xueqiu.com',
  6.          'Accept''application/json',
  7.          'Cookie''xq_a_token={};'. format(xq_a_token),
  8.          'User-Agent''Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36',
  9.          'Accept-Language''zh-Hans-CN;q=1, ja-JP;q=0.9',
  10.          'Accept-Encoding''br, gzip, deflate',
  11.          'Connection''keep-alive'}
  12.     response = requests.get(url, headers = headers)
  13.      if response.status_code !=  200:
  14.          raise Exception(response.content)
  15.      return json.loads(response.content)
  16. if __name__ ==  '__main__':
  17.      # 获取股票 SH600000 实时行情
  18.     url =  "http://stock.xueqiu.com/v5/stock/quote.json?extend=detail&symbol=SH600000&count=10"
  19.      with  open( "xueqiu_cookies""r"as f:
  20.         cookies_info = json.load(f)
  21.     res = fetch(url, cookies_info[ 'xq_a_token'])
  22.      print(res)
  23.      # 获取股票 SH600000 现金流量表
  24.     url =  "http://stock.xueqiu.com/v5/stock/finance/cn/cash_flow.json?symbol=SH600000&count=10"
  25.      with  open( "xueqiu_cookies""r"as f:
  26.         cookies_info = json.load(f)
  27.     res = fetch(url, cookies_info[ 'xq_a_token'])
  28.      print(res)

运行结果:

有了 Cookie,很多接口数据都能获取,实时行情、实时分笔、业绩预告、机构评级、资金流向趋势、资金流向历史、资金成交分布、大宗交易、融资融券、业绩指标、利润表、资产负债表、现金流量表、主营业务构成、F10 十大股东、F10 主要指标等等。

这些数据,都能获取。

絮叨

篇幅有限,今天就是带大家小小实战下。

后续我会完善各个常用查询接口,方便大家获取各类数据,用于量化分析。

万事开头难,先弄好数据,再看量化策略~

如果喜欢这类的内容,记得点赞,喜欢的人多的话,我会快速加更的~

最后必须提醒一下各位:

获取数据,请温柔,请勿高并发获取,且用且珍惜。

对了,还有不少小伙伴问我,我的量化策略收益如何。

去年的五万元实验,最后浮盈不到 10%,清仓之后就换新的策略实验了。

6月份的时候,又用上了新策略,新的策略一直跑到今年 10 月份,也就这样:

实验没放多少钱,随便玩玩,你觉得,这点收益如何?

好了,今天就聊这么多吧,我是 Jack,我们下期见~


转载:https://blog.csdn.net/c406495762/article/details/128267619
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场