Elasticsearch 是一个分布式的 RESTful 搜索和分析引擎,能够解决越来越多的用例。 作为Elastic Stack 的核心,它集中存储你的数据,以实现闪电般的快速搜索,微调的相关性以及易于扩展的强大分析。Elasticsearch 在很多的情况下可以帮我们解决实时的商业数据的分析及统计。 在很多的实时事件处理中,Websocket 经常会用到。WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。那么我们如何把 Websocket 数据导入到 Elasticsearch 中呢?
在今天的展示中,我们将使用一个 Python 应用作为 Websocket 的 router。我们的架构如下:
在上面,我们使用 Websocket 来采集数据并把它转为可以被导入的数据结果。这些实时的商业数据可以是股票等信息。这个原理和我们之前摄入 MQTT 的方法是一样的。
准备工作
如果你还没有安装好自己的 Elasticsearch 及 Kibana 的话,那么请你参照我之前的文章 “Elastic:菜鸟上手指南” 来安装 Elasticsearch 及 Kibana。
在今天的练习中,我将使用 https://finnhub.io/ 网站提供的 REST API 接口来进行展示。我们必须申请 API key 以得到数据:
当你签名过后,你可以看到如下所示的 API key:
我们点击上面的 API Documentation:
在左边我们点击 Trades,然后拷贝自己的代码并保存到本地文件 finnhub-websockets.py 中:
finnhub-websockets.py
-
#https:
//pypi.org/project/websocket_client/
-
import websocket
-
-
def on_message(ws, message):
-
print(message)
-
-
def on_error(ws, error):
-
print(error)
-
-
def on_close(ws):
-
print(
"### closed ###")
-
-
def on_open(ws):
-
ws.send('{
"type":
"subscribe",
"symbol":
"AAPL"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"AMZN"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"BINANCE:BTCUSDT"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"IC MARKETS:1"}')
-
-
if __name__ ==
"__main__":
-
websocket.enableTrace(
True)
-
ws = websocket.
WebSocketApp(
"wss://ws.finnhub.io?token=c091aan48v6tm13rku80",
-
on_message = on_message,
-
on_error = on_error,
-
on_close = on_close)
-
ws.on_open = on_open
-
ws.run_forever()
我们接下来安装 websocket-client:
pip3 install websocket-client
然后,我们直接运行上面的 Python 应用:
python3 finnhub-websockets.py
我们将看到如下的类似的输出:
好了,看起来我们的 API 是成功的。
导入数据到 Elasticsearch
我们接下来把我们的数据导入到 Elasticsearch 中去。我们访问网址 https://elasticsearch-py.readthedocs.io/en/v7.10.1/。首先,我们必须安装 elasticsearch 安装包:
pip3 install elasticsearch
我们可以参考我之前的文章 “Elasticsearch 开发入门 - Python”。我们可以安装那篇文章中介绍的方式来进行 ES 的连接,并进一步修改我们的 finnhub-websocket.py 文件:
finnhub-websocket.py
-
-
#https:
//pypi.org/project/websocket_client/
-
import json
-
import datetime
-
import websocket
-
from elasticsearch
import
Elasticsearch
-
-
es =
Elasticsearch([{
'host':
'localhost',
'port':
9200}])
-
-
def on_message(ws, message):
-
message_json = json.loads(message)
-
message_json[
"@timestamp"] = datetime.datetime.utcnow().isoformat()
-
res = es.index(index=
"websockets-data", body=message_json)
-
print(message_json)
-
-
def on_error(ws, error):
-
print(error)
-
-
def on_close(ws):
-
print(
"### closed ###")
-
-
def on_open(ws):
-
ws.send('{
"type":
"subscribe",
"symbol":
"AAPL"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"AMZN"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"TSLA"}')
-
ws.send('{
"type":
"subscribe",
"symbol":
"ESTC"}')
-
-
if __name__ ==
"__main__":
-
websocket.enableTrace(
True)
-
ws = websocket.
WebSocketApp(
"wss://ws.finnhub.io?token=<my-finnhub-token>",
-
on_message = on_message,
-
on_error = on_error,
-
on_close = on_close)
-
ws.on_open = on_open
-
ws.run_forever()
在上面你需要把自己的 token 填入到上面的代码中。这里我说明一下。
- es 变量是建立和 Elasticsearch 的连接。你需要根据自己的部署而修改上面的地址及端口。如果必要,你还需要提供相应的账号信息来进行连接
- 在 on_message 中,我们添加了当前的时间戳,这样可以使得我们我们的数据具有时效性,从而可以更精准地分析数据
- 我们的数据在 Elasticsearch 中被保存在 websockets-data 索引中。
我们重新运行我们的应用:
python3 finnhub-websockets.py
同样地,我们可以看到数据源源不断地导入到 Elasticsearch 中。
我们可以在 Kibana 中通过如下的命令来查看新生产的 websockets-data 索引:
GET _cat/indices
-
yellow open websockets-data n
3RU
2Ze
8Rj-hVi
3a
8H
3-zw
1
1
1
0
4kb
4kb
-
green open .apm-custom-link Fqq-lxCiQHKib
8kxdO
0uoQ
1
0
0
0
208b
208b
-
green open .kibana_task_manager_
1
29ilRYTkSOSx
1aFtR
0DUWQ
1
0
5
213
89.
3kb
89.
3kb
-
green open .apm-agent-configuration yY
8-Sbn
8TbWac
4R_l
1-
8qQ
1
0
0
0
208b
208b
-
green open .kibana-event-log-
7.
10.
0-
000001 g
7vkPKUHQiqxfpJDGnvKmw
1
0
1
0
5.
6kb
5.
6kb
-
green open .kibana_
1 oF
471rX
0R
8Cu
4H
1tvE
813Q
1
0
18
2
10.
4mb
10.
4mb
我们可以为这个索引创建一个索引模式:
在目前写这篇文章的时候,不是美国的交易时间,所以 websocket 里暂时没有数据。在交易的时间,Websocket 会自己向 Elasticsearch 推送数据。我们会发现如下的这些字段:
上面的字段的定义,我们可以在地址找到:
我们可以使用 Kibana 中的 Lens 为我们的 Stock 数据进行实时的数据分析:
总结
在本篇文章中,我们介绍了如何使用 Python 语言作为一个 router 把 websocket 所生成的实时数据导入到 Elasticsearch 中,并在 Elasticsearch 中进行分析。
转载:https://blog.csdn.net/UbuntuTouch/article/details/113307195