我们知道 Elasticsearch output plugin 为我们在 Logstash 的 pipeline 中向 Elasticsearch 的写入提供了可能。我们可以使用如下的格式向 Elasticsearch 写入数据:
-
elasticsearch {
-
hosts
=
> [
"https://localhost:9200"]
-
index
=
>
"data-%{+YYYY.MM.dd}"
-
user
=
>
"elastic"
-
password
=
>
"NtC7cM-GKQWOxqamHd1R"
-
ssl
=
>
true
-
ca_trusted_fingerprint
=
>
"d464eed5d00a20908318b6a1de38f88daf3a867177123def4c34aa2272571aaf"
-
}
在向 Elasticsearch 写入数据的时候,目前它有四种操作:
- index:索引文档(来自 Logstash 的事件)。
- delete:通过 id 删除文档(此操作需要一个id)
- create:索引文档,如果索引中已存在该 id 的文档,则失败。
- update:通过 id 更新文档。 Update 有一个特殊情况,你可以 upsert — 更新文档(如果不存在)。 请参阅 doc_as_upsert 选项。 注意:这在 Elasticsearch 1.x 中不起作用且不受支持。 请升级到 ES 2.x 或更高版本以将此功能与 Logstash 一起使用!
一个 sprintf 样式的字符串,用于根据事件的内容更改操作。 值 %{[foo]} 将使用 foo 字段进行操作。 如果 resolved action 不在 [index, delete, create, update] 中,事件将不会发送到 Elasticsearch。 相反,事件将被发送到管道的死信队列 (DLQ)(如果启用),或者将被记录并删除。
在实际的使用中,假如我们的操作不是 index,delete create 或 update 其中的一种,那么我们该怎么办呢?比如我们想根据一定的条件来更新文档,就像 update by query 那样?我们该怎么办呢?
幸运的是,Logstash 提供了一个叫做 HTTP output plugin。它可以帮我解决这个问题。
准备数据
首先,我们来创建如下的一个索引:
-
PUT customer
/_doc
/
2
-
{
-
"id":
2,
-
"timestamp":
"2019-08-11T17:55:56Z",
-
"paymentType":
"Visa",
-
"name":
"Darby Dacks",
-
"gender":
"Female",
-
"ip_address":
"77.72.239.47",
-
"purpose":
"Shoes",
-
"country":
"Poland",
-
"age":
55,
-
"offer":
false
-
}
我们在 Kibana 中输入上面的命令来创建一个叫做 customer 的索引。它的 id 为 2。
更新数据
接下来,我们需要按照一定的条件来更新我们的数据。比如,我们想把 paymentType 为 Visa,并且年龄大于或等于 55 岁的人的 offer 设置为 true。在 Kibana 中正常的命令是这样的:
-
POST customer
/_update_
by_query
-
{
-
"query": {
-
"bool": {
-
"must": [
-
{
-
"match": {
-
"paymentType.keyword":
"Visa"
-
}
-
},
-
{
-
"range": {
-
"age": {
-
"gte":
50
-
}
-
}
-
}
-
]
-
}
-
},
-
"script": {
-
"source":
"ctx._source.offer = params.offer",
-
"lang":
"painless",
-
"params": {
-
"offer":
true
-
}
-
}
-
}
我们可以对 Logsthash 做如下的 pipeline 设计:
logstash.conf
-
input {
-
generator {
-
message
=
>
'{"id":2,"timestamp":"2019-08-11T17:55:56Z","paymentType":"Visa","name":"Darby Dacks","gender":"Female","ip_address":"77.72.239.47","purpose":"Shoes","country":"Poland","age":55}'
-
count
=
>
1
-
}
-
}
-
-
filter {
-
js
on {
-
source
=
>
"message"
-
}
-
-
if [paymentType]
=
=
"Mastercard" {
-
drop {}
-
}
-
-
mutate {
-
remove_field
=
> [
"message",
"@timestamp",
"path",
"host",
"@version",
"log",
"event"]
-
}
-
-
}
-
-
output {
-
stdout {
-
codec
=
> rubydebug
-
}
-
http {
-
url
=
>
"https://localhost:9200/customer/_update_by_query"
-
user
=
>
"elastic"
-
password
=
>
"Y+6tv9jejPl=W4IGrTD="
-
http_
method
=
>
"post"
-
format
=
>
"message"
-
content_
type
=
>
"application/json"
-
message
=
>
'{"query":{"bool":{"must":[{"match":{"paymentType.keyword":"%{paymentType}"}},{"range":{"age":{"gte":"%{age}"}}}]}},"script":{"source":"ctx._source.offer = params.offer","lang":"painless","params":{"offer":true}}}'
-
cacert
=
>
"/Users/liuxg/elastic/elasticsearch-8.6.1/config/certs/http_ca.crt"
-
}
-
}
在上面,我们在 message 中通过一个查询,匹配到 paymentType.keyword 为 Visa,并且 age 为大于等于 55 的文档,我们设置该用户为促销对象。把他的 offer 值设置为 true。这个在实际的使用中,依据自己的条件来进行配置。在上面,cacert 为我们的 Elasticsearch 的证书文件位置。具体使用,请参考文档。
我们接下来运行 Logstash 的 pipeline:
./bin/logstash -f logstash.conf
在上面我们可以看出来信息的输出。我们在 Kibana 中使用如下的命令来检查更新后的文档:
GET customer/_search
-
{
-
"took":
0,
-
"timed_out":
false,
-
"_shards": {
-
"total":
1,
-
"successful":
1,
-
"skipped":
0,
-
"failed":
0
-
},
-
"hits": {
-
"total": {
-
"value":
1,
-
"relation":
"eq"
-
},
-
"max_score":
1,
-
"hits": [
-
{
-
"_index":
"customer",
-
"_id":
"2",
-
"_score":
1,
-
"_source": {
-
"offer":
true,
-
"country":
"Poland",
-
"gender":
"Female",
-
"purpose":
"Shoes",
-
"name":
"Darby Dacks",
-
"id":
2,
-
"ip_address":
"77.72.239.47",
-
"age":
55,
-
"timestamp":
"2019-08-11T17:55:56Z",
-
"paymentType":
"Visa"
-
}
-
}
-
]
-
}
-
}
很显然,我们上面的 offer 值现在变为 true,而不是之前的 false。
转载:https://blog.csdn.net/UbuntuTouch/article/details/129155064