小言_互联网的博客

Beats:如何避免重复的导入数据

297人阅读  评论(0)

Beats 框架保证至少一次交付,以确保将事件发送到支持确认的输出(例如 Elasticsearch,Logstash,Kafka 和 Redis)时不会丢失任何数据。 如果一切按计划进行,那就太好了。 但是,如果 Filebeat 在处理过程中关闭,或者在确认事件之前断开了连接,则最终可能会有重复的数据。那么我们该如何避免重复被导入的数据呢?

 

什么原因导致 Elasticsearch 中出现重复项?

当输出被阻止时,Filebeat 中的重试机制将尝试重新发送事件,直到输出确认它们为止。 如果输出接收到事件,但是无法确认事件,则数据可能会多次发送到输出。 由于文档 ID 通常是在 Elasticsearch 从 Beats 接收数据后由其设置的,因此重复事件被索引为新文档。

示例

我们仿照之前文章 “Beats:使用 Filebeat 导入 JSON 格式的日志文件” 来导入一个如下的 sample.json 文件:

sample.json


  
  1. { "id": "1", "user_name": "arthur", "verified": false, "event": "logged_in"}
  2. { "id": "2", "user_name": "arthur", "verified": true, "event": "changed_state"}

在上面的文档中,我们假定 "id" 字段为唯一值,也就是说没有任何文档的 "id" 值是和其它文档是一样的。虽然,这里只有两条文档,但足以说明问题。我们按照 “Beats:使用 Filebeat 导入 JSON 格式的日志文件” 文章中所示的方法把文档导入 Elasticsearch:

./filebeat -e -c  filebeat.yml

等文档导入完毕后,我们可以通过如下的命令来查看已经被导入的文档:

GET logs_json/_search

或者通过如下的命令来查看多少个文档:

GET logs_json/_count

上面的命令显示:


  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

它有两个文档在 logs_json 索引中。

在通常的情况下,Filebeat 框架可以保证至少一次的交付,但是在某些特殊的情况下,比如 filebeat 坏掉,而没有收到从 Elasticsearch 发回来的确认信息,那么下次启动 filebeat 时,它会重复发送之前已经发送过的文档。或者确实信息在传输过程中,由于某种原因没有被正确收到,那么 filebeat 也会在规定的时间内重新发送已经发送过的文档。在 Elasticsearch 为多次重复发送的文档分别自动分配唯一的 _id 值。我们可以通过如下的实验来进行验证。首先我们找到 filebeat 安装的 registry 目录。

Filebeat 的 registry 文件存储 Filebeat 用于跟踪上次读取位置的状态和位置信息。 

  • data/registry 针对 .tar.gz and .tgz 归档文件安装
  • /var/lib/filebeat/registry 针对 DEB 及 RPM 安装包
  • c:\ProgramData\filebeat\registry 针对 Windows zip 文件

针对我的本地安装:


  
  1. $ pwd
  2. /Users/liuxg/elastic1/filebeat- 7.11. 0-darwin-x86_64
  3. $ ls
  4. LICENSE.txt filebeat module
  5. NOTICE.txt filebeat.reference.yml modules.d
  6. README.md filebeat.yml sample.json
  7. data filebeat1.yml
  8. fields.yml kibana
  9. $ rm -rf data/registry/

在 registry 中它保存了曾经已经被导入过文件的所有的信息。在上面我们把这个文件夹删除了,表明之前的记录都不存在,对于曾经导入过的文件,我们再次执行导入的动作,那么之前的文档也会被重新导入。这个模拟了确认信息丢失的情况。重新启动 filebeat 来导入刚才导入的文件:

./filebeat -e -c  filebeat.yml

然后,我们在 Kibana 中再次获得被导入的文档的个数:

GET logs_json/_count

  
  1. {
  2. "count" : 4,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

从上面的数值中,我们可以看出来,文档的格式从之前的 2 变为 4,也就是说之前已经被导入的文档也被重新导入了。那么我们该如何解决这个问题呢?我们期望的结果是无论确认信息是否丢失,那么重新导入文档,我们不希望出现同样的文档。

在接下的实验中,我们在 Kibana 中删除刚才已经被导入的索引  logs_json:

DELETE logs_json

 

如何避免重复?

与其允许 Elasticsearch 设置文档 ID,不如在 Beats 中设置 ID。 该 ID 存储在 Beats@metadata._id 字段中,并用于在建立索引期间设置文档 ID。 这样,如果 Beats 将同一事件多次发送到 Elasticsearch,则 Elasticsearch 会覆盖现有文档,而不是创建一个新文档。 @metadata._id 字段随事件一起传递,因此你可以在 Filebeat 发布事件之后,然后在Elasticsearch 接收事件之前,使用它来设置文档 ID。 例如,请参阅 Logstash 管道示例。 有几种方法可以在 Beats 中设置文档 ID:

add_id 处理器:

如果你的数据没有自然键字段,并且你无法从现有字段派生唯一键,请使用 add_id 处理器。 本示例为每个事件生成一个唯一的 ID,并将其添加到 @metadata._id 字段中:

filebeat.yml


  
  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /Users/liuxg/data/processors/sample.json
  6. processors:
  7. - decode_json_fields:
  8. fields: [ 'message']
  9. target: ''
  10. overwrite_keys: true
  11. - drop_fields:
  12. fields: [ "message", "ecs", "agent", "log"]
  13. - add_id: ~
  14. setup.template.enabled: false
  15. setup.ilm.enabled: false
  16. output.elasticsearch:
  17. hosts: [ "localhost:9200"]
  18. index: "logs_json"
  19. bulk_max_size: 1000

我们删除 registry 目录,然后执行如下的命令:

./filebeat -e -c  filebeat.yml

在上面,我们通过 add_id 这个 processor 为我们的文档设定唯一的 id。最终在 Elasticsearch 中的文档 _id 是由 Filebeat 所生成的。如果我们再次删除 registry 并重新导入文档的话,那么在 Elasticsearch 中将会有重复的文档尽管我们可以通过这样的方式来设定最终文档的 _id。

为了下面的练习,我们在 Kibana 中删除 logs_json 索引,并同时删除 registry 目录。

 

fingerprint 处理器

这个方法和我之前在 Logstash 中介绍的方法是一样的。请详细阅读文章 “Logstash:运用 fingerprint 过滤器处理重复的文档”。使用 fingerprint 处理器从一个或多个现有字段中派生唯一密钥。本示例使用 id 和 user_name 的值来派生唯一键,并将其添加到 @metadata._id 字段中:

filebeat.yml


  
  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - /Users/liuxg/data/processors/sample.json
  6. processors:
  7. - decode_json_fields:
  8. fields: [ 'message']
  9. target: ''
  10. overwrite_keys: true
  11. - drop_fields:
  12. fields: [ "message", "ecs", "agent", "log"]
  13. - fingerprint:
  14. fields: [ "id", "user_name"]
  15. target_field: "@metadata._id"
  16. setup.template.enabled: false
  17. setup.ilm.enabled: false
  18. output.elasticsearch:
  19. hosts: [ "localhost:9200"]
  20. index: "logs_json"
  21. bulk_max_size: 1000

我们重新运行 filebeat 并导入数据到 Elasticsearch 中:

./filebeat -e -c  filebeat.yml

我们通过如下的命令查询文档数:

GET logs_json/_count

  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

接下来,我们再次删除 registry 目录,但是保留 Elasticsearch 中的 logs_json 索引。我们再次运行 filebeat 重新导入 sample.json 文件。

./filebeat -e -c  filebeat.yml

运行完过后,我们重新在 Kibana 中检查文档的个数:

GET logs_json/_count

  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

我们会发现这次文档的格式还是2,它表明虽然文档被重新导入,但是由于使用了 fingerprint,它生成了唯一 @metadata_id,这样当同一的文档被导入到 Elasticsearch 中后,它不会生成新的文档,而是更新之前的文档,所以没有新的文档生成。

 

encode_json_fields 处理器

解码包含自然键字段的 JSON 字符串时,请使用 encode_json_fields 处理器中的 document_id 设置。

针对我们的 sample.json 文件,我们假定 "id" 字段为唯一的值。 本示例从 JSON 字符串中获取 id 的值,并将其存储在 @metadata._id 字段中:

filebeat.yml


  
  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. paths:
  5. - sample.json
  6. processors:
  7. - decode_json_fields:
  8. fields: [ 'message']
  9. target: ''
  10. overwrite_keys: true
  11. - drop_fields:
  12. fields: [ "message", "ecs", "agent", "log"]
  13. - decode_json_fields:
  14. document_id: "id"
  15. fields: [ "message"]
  16. max_depth: 1
  17. target: ""
  18. setup.template.enabled: false
  19. setup.ilm.enabled: false
  20. output.elasticsearch:
  21. hosts: [ "localhost:9200"]
  22. index: "logs_json"
  23. bulk_max_size: 1000

同样,我们在实验之前,删除 registry 目录,并删除 logs_json 索引,并再次启动 filebeat 导入数据。我们通过如下的命令获得索引文档的个数:

GET logs_json/_count

  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

我们再次删除 registry 目录,并同时导入数据。这次我们不删除 logs_json 索引,我们再次查看文档的个数是否增加:


  
  1. {
  2. "count" : 4,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

由于一些原因这种方法可能是由于有 bug,还不能指定文档的 _id。

 

JSON 输入设定

如果你要提取 JSON 格式的数据,并且该数据具有自然键字段,请使用 json.document_id 输入设置。

本示例从 JSON 文档中获取 id 的值,并将其存储在 @metadata._id字段中:

sample.json


  
  1. { "id": "1", "user_name": "arthur", "verified": false, "evt": "logged_in"}
  2. { "id": "2", "user_name": "arthur", "verified": true, "evt": "changed_state"}

filebeat.yml


  
  1. filebeat.inputs:
  2. - type: log
  3. enabled: true
  4. tags: [ "i", "love", "json"]
  5. json.message_key: evt
  6. json.keys_under_root: true
  7. json.add_error_key: true
  8. json.document_id: "id"
  9. fields:
  10. planet: liuxg
  11. paths:
  12. - sample.json
  13. output.elasticsearch:
  14. hosts: [ "localhost:9200"]
  15. index: "json_logs1"
  16. setup.ilm.enabled: false
  17. setup.template.name: json_logs1
  18. setup.template.pattern: json_logs1

启动 filebeat 来导入数据,并查询索引 json_logs1 的文档个数:


  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }

删除 registry 目录,并重新启动 filebeat 来导入数据:


  
  1. {
  2. "count" : 2,
  3. "_shards" : {
  4. "total" : 1,
  5. "successful" : 1,
  6. "skipped" : 0,
  7. "failed" : 0
  8. }
  9. }
GET json_logs1/_search

  
  1. {
  2. "took" : 1,
  3. "timed_out" : false,
  4. "_shards" : {
  5. "total" : 1,
  6. "successful" : 1,
  7. "skipped" : 0,
  8. "failed" : 0
  9. },
  10. "hits" : {
  11. "total" : {
  12. "value" : 2,
  13. "relation" : "eq"
  14. },
  15. "max_score" : 1.0,
  16. "hits" : [
  17. {
  18. "_index" : "json_logs1",
  19. "_type" : "_doc",
  20. "_id" : "1",
  21. "_score" : 1.0,
  22. "_source" : {
  23. "@timestamp" : "2021-02-24T02:24:56.084Z",
  24. "evt" : "logged_in",
  25. "input" : {
  26. "type" : "log"
  27. },
  28. "fields" : {
  29. "planet" : "liuxg"
  30. },
  31. "agent" : {
  32. "id" : "e2b7365d-8953-453c-87b5-7e8a65a5bc07",
  33. "name" : "liuxg",
  34. "type" : "filebeat",
  35. "version" : "7.11.0",
  36. "hostname" : "liuxg",
  37. "ephemeral_id" : "7b309ac8-48d1-46d4-839f-70948fddd428"
  38. },
  39. "ecs" : {
  40. "version" : "1.6.0"
  41. },
  42. "host" : {
  43. "name" : "liuxg"
  44. },
  45. "log" : {
  46. "offset" : 0,
  47. "file" : {
  48. "path" : "/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64/sample.json"
  49. }
  50. },
  51. "user_name" : "arthur",
  52. "verified" : false,
  53. "tags" : [
  54. "i",
  55. "love",
  56. "json"
  57. ]
  58. }
  59. },
  60. {
  61. "_index" : "json_logs1",
  62. "_type" : "_doc",
  63. "_id" : "2",
  64. "_score" : 1.0,
  65. "_source" : {
  66. "@timestamp" : "2021-02-24T02:24:56.084Z",
  67. "fields" : {
  68. "planet" : "liuxg"
  69. },
  70. "input" : {
  71. "type" : "log"
  72. },
  73. "host" : {
  74. "name" : "liuxg"
  75. },
  76. "agent" : {
  77. "ephemeral_id" : "7b309ac8-48d1-46d4-839f-70948fddd428",
  78. "id" : "e2b7365d-8953-453c-87b5-7e8a65a5bc07",
  79. "name" : "liuxg",
  80. "type" : "filebeat",
  81. "version" : "7.11.0",
  82. "hostname" : "liuxg"
  83. },
  84. "evt" : "changed_state",
  85. "user_name" : "arthur",
  86. "verified" : true,
  87. "log" : {
  88. "file" : {
  89. "path" : "/Users/liuxg/elastic1/filebeat-7.11.0-darwin-x86_64/sample.json"
  90. },
  91. "offset" : 74
  92. },
  93. "tags" : [
  94. "i",
  95. "love",
  96. "json"
  97. ],
  98. "ecs" : {
  99. "version" : "1.6.0"
  100. }
  101. }
  102. }
  103. ]
  104. }
  105. }

上面显示这种方法是可以行的。还是两个文档。也就是说明了我们可以通过这种方法来设置 Elasticsearch 中的文档 id。


转载:https://blog.csdn.net/UbuntuTouch/article/details/114007425
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场