飞道的博客

Elastic:如何摄入 Websocket 数据到 Elasticsearch

359人阅读  评论(0)

Elasticsearch 是一个分布式的 RESTful 搜索和分析引擎,能够解决越来越多的用例。 作为Elastic Stack 的核心,它集中存储你的数据,以实现闪电般的快速搜索,微调的相关性以及易于扩展的强大分析。Elasticsearch 在很多的情况下可以帮我们解决实时的商业数据的分析及统计。 在很多的实时事件处理中,Websocket 经常会用到。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。那么我们如何把 Websocket 数据导入到 Elasticsearch 中呢?

在今天的展示中,我们将使用一个 Python 应用作为 Websocket 的 router。我们的架构如下:

在上面,我们使用 Websocket 来采集数据并把它转为可以被导入的数据结果。这些实时的商业数据可以是股票等信息。这个原理和我们之前摄入 MQTT 的方法是一样的。

 

准备工作

如果你还没有安装好自己的 Elasticsearch 及  Kibana 的话,那么请你参照我之前的文章 “Elastic:菜鸟上手指南” 来安装 Elasticsearch 及 Kibana。

在今天的练习中,我将使用 https://finnhub.io/ 网站提供的 REST API 接口来进行展示。我们必须申请 API key 以得到数据:

当你签名过后,你可以看到如下所示的 API key:

我们点击上面的 API Documentation:

在左边我们点击 Trades,然后拷贝自己的代码并保存到本地文件 finnhub-websockets.py 中:

finnhub-websockets.py


  
  1. #https: //pypi.org/project/websocket_client/
  2. import websocket
  3. def on_message(ws, message):
  4. print(message)
  5. def on_error(ws, error):
  6. print(error)
  7. def on_close(ws):
  8. print( "### closed ###")
  9. def on_open(ws):
  10. ws.send('{ "type": "subscribe", "symbol": "AAPL"}')
  11. ws.send('{ "type": "subscribe", "symbol": "AMZN"}')
  12. ws.send('{ "type": "subscribe", "symbol": "BINANCE:BTCUSDT"}')
  13. ws.send('{ "type": "subscribe", "symbol": "IC MARKETS:1"}')
  14. if __name__ == "__main__":
  15. websocket.enableTrace( True)
  16. ws = websocket. WebSocketApp( "wss://ws.finnhub.io?token=c091aan48v6tm13rku80",
  17. on_message = on_message,
  18. on_error = on_error,
  19. on_close = on_close)
  20. ws.on_open = on_open
  21. ws.run_forever()

我们接下来安装 websocket-client:

pip3 install websocket-client

然后,我们直接运行上面的 Python 应用:

python3 finnhub-websockets.py

我们将看到如下的类似的输出:

好了,看起来我们的 API 是成功的。

 

导入数据到 Elasticsearch

我们接下来把我们的数据导入到 Elasticsearch 中去。我们访问网址 https://elasticsearch-py.readthedocs.io/en/v7.10.1/。首先,我们必须安装 elasticsearch 安装包:

pip3 install elasticsearch

我们可以参考我之前的文章 “Elasticsearch 开发入门 - Python”。我们可以安装那篇文章中介绍的方式来进行 ES 的连接,并进一步修改我们的 finnhub-websocket.py 文件:

finnhub-websocket.py


  
  1. #https: //pypi.org/project/websocket_client/
  2. import json
  3. import datetime
  4. import websocket
  5. from elasticsearch import Elasticsearch
  6. es = Elasticsearch([{ 'host': 'localhost', 'port': 9200}])
  7. def on_message(ws, message):
  8. message_json = json.loads(message)
  9. message_json[ "@timestamp"] = datetime.datetime.utcnow().isoformat()
  10. res = es.index(index= "websockets-data", body=message_json)
  11. print(message_json)
  12. def on_error(ws, error):
  13. print(error)
  14. def on_close(ws):
  15. print( "### closed ###")
  16. def on_open(ws):
  17. ws.send('{ "type": "subscribe", "symbol": "AAPL"}')
  18. ws.send('{ "type": "subscribe", "symbol": "AMZN"}')
  19. ws.send('{ "type": "subscribe", "symbol": "TSLA"}')
  20. ws.send('{ "type": "subscribe", "symbol": "ESTC"}')
  21. if __name__ == "__main__":
  22. websocket.enableTrace( True)
  23. ws = websocket. WebSocketApp( "wss://ws.finnhub.io?token=<my-finnhub-token>",
  24. on_message = on_message,
  25. on_error = on_error,
  26. on_close = on_close)
  27. ws.on_open = on_open
  28. ws.run_forever()

在上面你需要把自己的 token 填入到上面的代码中。这里我说明一下。

  1.  es 变量是建立和 Elasticsearch 的连接。你需要根据自己的部署而修改上面的地址及端口。如果必要,你还需要提供相应的账号信息来进行连接
  2. on_message 中,我们添加了当前的时间戳,这样可以使得我们我们的数据具有时效性,从而可以更精准地分析数据
  3. 我们的数据在 Elasticsearch 中被保存在 websockets-data 索引中。

我们重新运行我们的应用:

python3 finnhub-websockets.py 

同样地,我们可以看到数据源源不断地导入到 Elasticsearch 中。

我们可以在 Kibana 中通过如下的命令来查看新生产的 websockets-data 索引:

GET  _cat/indices

  
  1. yellow open websockets-data n 3RU 2Ze 8Rj-hVi 3a 8H 3-zw 1 1 1 0 4kb 4kb
  2. green open .apm-custom-link Fqq-lxCiQHKib 8kxdO 0uoQ 1 0 0 0 208b 208b
  3. green open .kibana_task_manager_ 1 29ilRYTkSOSx 1aFtR 0DUWQ 1 0 5 213 89. 3kb 89. 3kb
  4. green open .apm-agent-configuration yY 8-Sbn 8TbWac 4R_l 1- 8qQ 1 0 0 0 208b 208b
  5. green open .kibana-event-log- 7. 10. 0- 000001 g 7vkPKUHQiqxfpJDGnvKmw 1 0 1 0 5. 6kb 5. 6kb
  6. green open .kibana_ 1 oF 471rX 0R 8Cu 4H 1tvE 813Q 1 0 18 2 10. 4mb 10. 4mb

我们可以为这个索引创建一个索引模式:

在目前写这篇文章的时候,不是美国的交易时间,所以 websocket 里暂时没有数据。在交易的时间,Websocket 会自己向 Elasticsearch 推送数据。我们会发现如下的这些字段:

上面的字段的定义,我们可以在地址找到:

 

我们可以使用  Kibana 中的 Lens 为我们的 Stock 数据进行实时的数据分析:

 

总结

在本篇文章中,我们介绍了如何使用 Python 语言作为一个 router 把 websocket 所生成的实时数据导入到 Elasticsearch 中,并在 Elasticsearch 中进行分析。


转载:https://blog.csdn.net/UbuntuTouch/article/details/113307195
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场