小言_互联网的博客

Elasticsearch:从 Kafka 到 Elasticsearch 的实时用户配置文件数据管道

423人阅读  评论(0)

如今,网络服务、数字媒体、传感器日志数据等众多来源产生了大量数据,只有一小部分数据得到妥善管理或利用来创造价值。读取大量数据、处理数据并根据这些数据采取行动比以往任何时候都更具挑战性。

在这篇文章中,我试图展示:

  • 在 Python 中生成模拟用户配置文件数据
  • 通过 Kafka Producer 将模za拟数据发送到 Kafka 主题
  • 使用 Logstash 读取数据并上传到 Elasticsearch
  • 使用 Kibana 可视化流数据

在我之前的文章 “Elastic:使用 Kafka 部署 Elastic Stack”,我实现了如下的一个数据 pipeline:

 在今天的文章中,我将实现如下的一个数据 pipeline:

在今天的展示中,我将使用最新的 Elastic Stack 8.6.1 来进行展示。我将使用如下的配置:

如上所示,我使用两台机器:macOS 用于安装 Elastic Stack,而另外一台 Ubuntu 机器将被用于安装 Kafka 及 Logstash。我将在 Ubuntu OS 机器上使用 Python 向 Kafka 写入数据。

安装

Elasticsearch 及 Kibana

我将使用 docker compose 的方法来安装 Elasticsearch 及 Kibana。我们可以参考文章 “Elasticsearch:使用 Docker compose 来一键部署 Elastic Stack 8.x” 来进行部署。当然,我们也可以参阅如下的文章来进行部署:

在默认的情况下,Elasticsearch 的访问是带有 HTTPS 的安全访问。

我们可以在电脑的 terminal 中打入如下的命令来检查:

curl -k -u elastic:password https://192.168.0.3:9200

 上述命令是在 Ubuntu OS 的机器上运行。它表明,我们可以在 Ubuntu OS 的机器上成功地访问 Elasticsearch。

安装 Kafka

我们安装涉及设置 Apache Kafka(我们的消息代理)。Kafka 使用 ZooKeeper 来维护配置信息和同步,因此在设置 Kafka 之前,我们需要先安装 ZooKeeper:

sudo apt-get install zookeeperd

接下来,让我们下载并解压缩 Kafka:


  
  1. wget https: / /apache.mivzakim.net /kafka / 2.4.0 /kafka_ 2.13- 2.4.0.tgz
  2. tar -xzvf kafka_ 2.13- 2.4.0.tgz
  3. sudo cp -r kafka_ 2.13- 2.4.0 /opt /kafka

现在,我们准备运行 Kafka,我们将使用以下脚本进行操作:

sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties

你应该开始在控制台中看到一些 INFO 消息:

Kafka 的配置如下

  • Kafka 正在监听 9092 端口
  • Zookeeper 正在监听 2181 端口
  • Kafka Manager 正在监听 9000 端口

我们接下来打开另外一个控制台中,并为 registered_user 创建一个主题:

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic registered_user

我们创建了一个叫做 registered_user 的 topic。上面的命令将返回如下的结果:


  
  1. $ /opt /kafka /bin /kafka-topics.sh --create --zookeeper localhost: 2181 --replication-factor 1 --partitions 1 --topic registered_user
  2. WARNING: Due to limitations in metric names, topics with a period ( '.') or underscore ( '_') could collide. To avoid issues it is best to use either, but not both.
  3. Created topic registered_user.

我们现在已经完全为开始管道做好了准备。

有关 kafka 的安装,我们也可以使用 docker-compose 来进行安装。具体安装步骤请参考 “Elastic:Data pipeline:使用 Kafka => Logstash => Elasticsearch”。

Logstash

我们接下来安装 Logstash。我们到 Elastic 的官方网站来下载时候我们平台的安装包:

 wget https://artifacts.elastic.co/downloads/logstash/logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w

  
  1. liuxg@liuxgu:~ /logstash$ wget https: / /artifacts.elastic.co /downloads /logstash /logstash- 8.6.1-amd 64.deb?_gl = 1 * 1b 5e 8ui *_ga *MTEyNjEzOTY 5Ni 4xNjQ 3MDY 1ODMx *_ga_Q 7TEQDPTH 5 *MTY 3NDk 3MjkzNS 4zODAuMS 4xNjc 0OTcyOTQ 4LjAuMC 4w
  2. -- 2023- 01- 29 14: 20: 31-- https: / /artifacts.elastic.co /downloads /logstash /logstash- 8.6.1-amd 64.deb?_gl = 1 * 1b 5e 8ui *_ga *MTEyNjEzOTY 5Ni 4xNjQ 3MDY 1ODMx *_ga_Q 7TEQDPTH 5 *MTY 3NDk 3MjkzNS 4zODAuMS 4xNjc 0OTcyOTQ 4LjAuMC 4w
  3. Resolving artifacts.elastic.co (artifacts.elastic.co)... 34.120.127.130, 2600: 1901: 0: 1d 7 ::
  4. Connecting to artifacts.elastic.co (artifacts.elastic.co)| 34.120.127.130|: 443... connected.
  5. HTTP request sent, awaiting response... 200 OK
  6. Length: 341638094 ( 326M) [ binary /octet-stream]
  7. Saving to: ‘logstash- 8.6.1-amd 64.deb?_gl = 1 * 1b 5e 8ui *_ga *MTEyNjEzOTY 5Ni 4xNjQ 3MDY 1ODMx *_ga_Q 7TEQDPTH 5 *MTY 3NDk 3MjkzNS 4zODAuMS 4xNjc 0OTcyOTQ 4LjAuMC 4w’
  8. logstash- 8.6.1-amd 64. de 100%[ = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = >] 325.81M 10.7MB /s in 31s
  9. 2023- 01- 29 14: 21: 03 ( 10.6 MB /s) - ‘logstash- 8.6.1-amd 64.deb?_gl = 1 * 1b 5e 8ui *_ga *MTEyNjEzOTY 5Ni 4xNjQ 3MDY 1ODMx *_ga_Q 7TEQDPTH 5 *MTY 3NDk 3MjkzNS 4zODAuMS 4xNjc 0OTcyOTQ 4LjAuMC 4w’ saved [ 341638094 / 341638094]
  10. liuxg@liuxgu:~ /logstash$ mv 'logstash-8.6.1-amd64.deb?_gl=1*1b5e8ui*_ga*MTEyNjEzOTY5Ni4xNjQ3MDY1ODMx*_ga_Q7TEQDPTH5*MTY3NDk3MjkzNS4zODAuMS4xNjc0OTcyOTQ4LjAuMC4w' logstash- 8.6.1-amd 64.deb

我们使用如下的命令来安装 Logstash:

sudo dpkg -i logstash-8.6.1-amd64.deb 

  
  1. liuxg@liuxgu:~ /logstash$ sudo dpkg -i logstash- 8.6.1-amd 64.deb
  2. [sudo] password for liuxg:
  3. (Reading database ... 386953 files and directories currently installed.)
  4. Preparing to unpack logstash- 8.6.1-amd 64.deb ...
  5. Unpacking logstash ( 1: 8.6.1- 1) over ( 1: 8.4.2- 1) ...
  6. Setting up logstash ( 1: 8.6.1- 1) ...
  7. Installing new version of config file /etc /logstash /jvm. options ...

为了能够配置 Logstash 能够正确地访问 Elasticsearch,我们可以参考我之前的文章 “Logstash:如何连接到带有 HTTPS 访问的集群”。我们需要按照文章里的要求创建一个叫做 truststore.p12 的文件。由于我们是以 docker 的形式启动 Elasticsearch 及 Kibana 的,我们在 macOS 的机器上使用如下的命令来拷贝证书。我们先查看容器:


  
  1. $ docker ps
  2. CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
  3. a2374f 620b 78 docker.elastic.co /kibana /kibana: 8.6.1 "/bin/tini -- /usr/l…" 7 hours ago Up 7 hours (healthy) 0.0.0.0: 5601- > 5601 /tcp elastic 8-kibana- 1
  4. e2d644 3b 8edb docker.elastic.co /elasticsearch /elasticsearch: 8.6.1 "/bin/tini -- /usr/l…" 7 hours ago Up 7 hours (healthy) 9200 /tcp, 9300 /tcp elastic 8-es 03- 1
  5. a29bbeb 4bdf 2 docker.elastic.co /elasticsearch /elasticsearch: 8.6.1 "/bin/tini -- /usr/l…" 7 hours ago Up 7 hours (healthy) 9200 /tcp, 9300 /tcp elastic 8-es 02- 1
  6. 81de3d 45943c docker.elastic.co /elasticsearch /elasticsearch: 8.6.1 "/bin/tini -- /usr/l…" 7 hours ago Up 7 hours (healthy) 0.0.0.0: 9200- > 9200 /tcp, 9300 /tcp elastic 8-es 01- 1

我们可以看到有一个叫做 elastic8-es01-1 的容器。


  
  1. $ pwd
  2. /Users /liuxg / data /elastic 8
  3. $ ls
  4. docker-compose.yml http_ca. crt kibana.yml write_ to_kafka.py
  5. $ docker cp 81 de 3d 45943c: /usr /share /elasticsearch /config /certs /ca /ca. crt .
  6. $ ls
  7. ca. crt docker-compose.yml http_ca. crt kibana.yml write_ to_kafka.py
  8. $ ls
  9. ca. crt docker-compose.yml kibana.yml

运用 ca.crt 文件,我们使用如下的命令来创建一个叫做 truststore.p12 的文件。它的 storepass 是 password:

keytool -import -file ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12

  
  1. $ keytool -import - file ca. crt -keystore truststore.p 12 -storepass password -noprompt -storetype pkcs 12
  2. Certificate was added to keystore
  3. $ ls
  4. ca. crt docker-compose.yml kibana.yml truststore.p 12

从上面,我们可以看出来它创建了 truststore.p12 这个文件。我们接下来把这个文件拷贝到 Ubuntu OS 机器下的 /etc/logstash/conf.d 目录中。


  
  1. liuxg @liuxgu :/etc/logstash/conf.d $ ls
  2. truststore.p12

我们接下来在地址 /etc/logstash/conf.d 创建一个叫做叫做 kafka_to_logstash.conf 的配置文件:

kafka_to_logstash.conf


  
  1. input {
  2. kafka {
  3. bootstrap_servers = > "192.168.0.4:9092"
  4. topics = > [ "registered_user"]
  5. }
  6. }
  7. filter {
  8. json {
  9. source = > "message"
  10. }
  11. }
  12. output {
  13. elasticsearch {
  14. hosts = > [ "https://192.168.0.3:9200"]
  15. index = > "registered_user"
  16. workers = > 1
  17. user = > "elastic"
  18. password = > "password"
  19. ssl_certificate_verification = > true
  20. truststore = > "/etc/logstash/conf.d/truststore.p12"
  21. truststore_password = > "password"
  22. }
  23. }

在上面,请注意的是:

  • 我们使用 Elasticsearch 的超级用户 elastic 来连接 Elasticsearch。它的密码是 password。在实际的使用中,我们可以创建一个合适权限的用户来进行连接。

这样我们的 Logstash 的配置就完成了。

sudo service logstash start

我们可以通过如下的命令来检查 Logstash 是否已经成功地运行起来了。

service logstash status

  
  1. liuxg@liuxgu:~$ service logstash status
  2. ● logstash.service - logstash
  3. Loaded: loaded ( /lib /systemd /system /logstash.service; disabled; vendor preset: enabled)
  4. Active: active (running) since Sun 2023- 01- 29 15: 25: 57 CST; 7s ago
  5. Main PID: 60841 (java)
  6. Tasks: 33 ( limit: 18977)
  7. Memory: 508.6M
  8. C Group: /system.slice /logstash.service
  9. └─ 60841 /usr /share /logstash /jdk /bin /java -Xms 1g -Xmx 1g -Djava.awt.headless = true - >
  10. 129 15: 25: 57 liuxgu systemd[ 1]: Started logstash.
  11. 129 15: 25: 57 liuxgu logstash[ 60841]: Using bundled JDK: /usr /share /logstash /jdk

上面表明我们的 logstash 服务已经被成功地运行起来了。我们还可以通过如下的命令来查看 logstash 服务的日志:

journalctl -u logstash

向 Kafka topic 写入数据

我们使用如下的 Python 应用向我们的 Kafka topic “registered_user” 来写入数据:

write_to_kafka.py


  
  1. from faker import Faker
  2. from kafka import KafkaProducer
  3. import json
  4. fake = Faker()
  5. import time
  6. def get_registered_dat a():
  7. return {
  8. 'first name': fake. first_name(),
  9. 'last name': fake. last_name(),
  10. 'age': fake. random_int( 0, 60),
  11. 'address': fake. address(),
  12. 'register_year': fake.year(),
  13. 'register_month': fake.month(),
  14. 'register_day': fake. day_ of_month(),
  15. 'monthly_income': fake. random_int( 28000, 100000)
  16. }
  17. def js on_serializer( data):
  18. return json.dumps( data).encode( 'utf-8')
  19. producer = KafkaProducer(bootstrap_servers =[ '192.168.0.4:9092'],
  20. value_serializer =json_serializer)
  21. if __name__ = = '__main__':
  22. while True:
  23. registered_ data = get_registered_dat a()
  24. print(registered_ data)
  25. producer. send( 'registered_user', registered_ data)
  26. time.sleep( 3)

为了运行上面的应用,我们必须安装如下的两个包:


  
  1. pip3 install Faker
  2. pip3 install kafka-python

我们在 Ubuntu OS 机器上运行上面的代码:

python write_to_kafka.py 

 我们回到 Kibana 的界面来进行查看:

GET _cat/indices

上面的命令显示:

我们可以对这个文件进行搜索:

GET registered_user/_search

 我们可以看到如下的结果:

从上面,我们可以看出来我们的数据已经被结构化。

我们可以针对这个索引进行可视化。你可以阅读我博客里的相应文章以了解更多。


转载:https://blog.csdn.net/UbuntuTouch/article/details/128786709
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场