小言_互联网的博客

Logstash:在 Logstash 管道中的定制的 Elasticsearch update by query

422人阅读  评论(0)

我们知道 Elasticsearch output plugin 为我们在 Logstash 的 pipeline 中向 Elasticsearch 的写入提供了可能。我们可以使用如下的格式向 Elasticsearch 写入数据:


  
  1. elasticsearch {
  2. hosts = > [ "https://localhost:9200"]
  3. index = > "data-%{+YYYY.MM.dd}"
  4. user = > "elastic"
  5. password = > "NtC7cM-GKQWOxqamHd1R"
  6. ssl = > true
  7. ca_trusted_fingerprint = > "d464eed5d00a20908318b6a1de38f88daf3a867177123def4c34aa2272571aaf"
  8. }

在向 Elasticsearch 写入数据的时候,目前它有四种操作:

  • index:索引文档(来自 Logstash 的事件)。
  • delete:通过 id 删除文档(此操作需要一个id)
  • create:索引文档,如果索引中已存在该 id 的文档,则失败。
  • update:通过 id 更新文档。 Update 有一个特殊情况,你可以 upsert — 更新文档(如果不存在)。 请参阅 doc_as_upsert 选项。 注意:这在 Elasticsearch 1.x 中不起作用且不受支持。 请升级到 ES 2.x 或更高版本以将此功能与 Logstash 一起使用!

一个 sprintf 样式的字符串,用于根据事件的内容更改操作。 值 %{[foo]} 将使用 foo 字段进行操作。 如果 resolved action 不在 [index, delete, create, update] 中,事件将不会发送到 Elasticsearch。 相反,事件将被发送到管道的死信队列 (DLQ)(如果启用),或者将被记录并删除。

在实际的使用中,假如我们的操作不是 index,delete create 或 update 其中的一种,那么我们该怎么办呢?比如我们想根据一定的条件来更新文档,就像 update by query 那样?我们该怎么办呢?

幸运的是,Logstash 提供了一个叫做 HTTP output plugin。它可以帮我解决这个问题。

准备数据

首先,我们来创建如下的一个索引:


  
  1. PUT customer /_doc / 2
  2. {
  3. "id": 2,
  4. "timestamp": "2019-08-11T17:55:56Z",
  5. "paymentType": "Visa",
  6. "name": "Darby Dacks",
  7. "gender": "Female",
  8. "ip_address": "77.72.239.47",
  9. "purpose": "Shoes",
  10. "country": "Poland",
  11. "age": 55,
  12. "offer": false
  13. }

我们在 Kibana 中输入上面的命令来创建一个叫做 customer 的索引。它的 id 为 2。

更新数据

接下来,我们需要按照一定的条件来更新我们的数据。比如,我们想把 paymentType 为 Visa,并且年龄大于或等于 55 岁的人的 offer 设置为 true。在 Kibana 中正常的命令是这样的:


  
  1. POST customer /_update_ by_query
  2. {
  3. "query": {
  4. "bool": {
  5. "must": [
  6. {
  7. "match": {
  8. "paymentType.keyword": "Visa"
  9. }
  10. },
  11. {
  12. "range": {
  13. "age": {
  14. "gte": 50
  15. }
  16. }
  17. }
  18. ]
  19. }
  20. },
  21. "script": {
  22. "source": "ctx._source.offer = params.offer",
  23. "lang": "painless",
  24. "params": {
  25. "offer": true
  26. }
  27. }
  28. }

我们可以对 Logsthash 做如下的 pipeline 设计:

logstash.conf


  
  1. input {
  2. generator {
  3. message = > '{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
  4. count = > 1
  5. }
  6. }
  7. filter {
  8. js on {
  9. source = > "message"
  10. }
  11. if [paymentType] = = "Mastercard" {
  12. drop {}
  13. }
  14. mutate {
  15. remove_field = > [ "message", "@timestamp", "path", "host", "@version", "log", "event"]
  16. }
  17. }
  18. output {
  19. stdout {
  20. codec = > rubydebug
  21. }
  22. http {
  23. url = > "https://localhost:9200/customer/_update_by_query"
  24. user = > "elastic"
  25. password = > "Y+6tv9jejPl=W4IGrTD="
  26. http_ method = > "post"
  27. format = > "message"
  28. content_ type = > "application/json"
  29. message = > '{"query":{"bool":{"must":[{"match":{"paymentType.keyword":"%{paymentType}"}},{"range":{"age":{"gte":"%{age}"}}}]}},"script":{"source":"ctx._source.offer = params.offer","lang":"painless","params":{"offer":true}}}'
  30. cacert = > "/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt"
  31. }
  32. }

在上面,我们在 message 中通过一个查询,匹配到 paymentType.keyword 为 Visa,并且 age 为大于等于 55 的文档,我们设置该用户为促销对象。把他的 offer 值设置为 true。这个在实际的使用中,依据自己的条件来进行配置。在上面,cacert 为我们的 Elasticsearch 的证书文件位置。具体使用,请参考文档

我们接下来运行 Logstash 的 pipeline:

./bin/logstash -f logstash.conf

 

在上面我们可以看出来信息的输出。我们在 Kibana 中使用如下的命令来检查更新后的文档:

GET customer/_search

  
  1. {
  2. "took": 0,
  3. "timed_out": false,
  4. "_shards": {
  5. "total": 1,
  6. "successful": 1,
  7. "skipped": 0,
  8. "failed": 0
  9. },
  10. "hits": {
  11. "total": {
  12. "value": 1,
  13. "relation": "eq"
  14. },
  15. "max_score": 1,
  16. "hits": [
  17. {
  18. "_index": "customer",
  19. "_id": "2",
  20. "_score": 1,
  21. "_source": {
  22. "offer": true,
  23. "country": "Poland",
  24. "gender": "Female",
  25. "purpose": "Shoes",
  26. "name": "Darby Dacks",
  27. "id": 2,
  28. "ip_address": "77.72.239.47",
  29. "age": 55,
  30. "timestamp": "2019-08-11T17:55:56Z",
  31. "paymentType": "Visa"
  32. }
  33. }
  34. ]
  35. }
  36. }

很显然,我们上面的 offer 值现在变为 true,而不是之前的 false。


转载:https://blog.csdn.net/UbuntuTouch/article/details/129155064
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场