In today's article, we will walk through how to use Logstash to parse logs in JSON format and import them into Elasticsearch. This was touched on in the earlier article "Logstash:Data转换,分析,提取,丰富及核心操作", but without concrete examples. Broadly speaking, there are two ways to parse JSON logs:
- Use the JSON codec in the file input
- Use no codec in the file input, and instead use the JSON filter in the filter section
The flow for parsing JSON data and importing it into Elasticsearch is: the file input reads the log file, a json codec or json filter parses each line into fields, and the elasticsearch output writes the resulting documents to Elasticsearch.
Prepare the data
We have prepared the following data (each JSON document sits on a single line, since the file input reads the file line by line):
sample.json
{"id": 4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
{"id": 5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}
Create the Logstash configuration file
Using the json codec
logstash_json.conf

```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    # Do not persist the read position, so the file is re-read on every run
    sincedb_path => "/dev/null"
    # Parse each line as a JSON document
    codec => "json"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
We run Logstash:
```
sudo ./bin/logstash -f logstash_json.conf
```
The command above produces output like the following:
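The original post shows the output as a screenshot. As a rough sketch, a rubydebug event for the first document looks approximately like this (@timestamp here is the ingest time and host is a placeholder; both will differ on your machine):

```
{
     "paymentType" => "Visa",
            "name" => "Cary Boyes",
              "id" => 4,
       "timestamp" => "2019-06-10T18:01:32Z",
          "gender" => "Male",
      "ip_address" => "223.113.73.232",
         "purpose" => "Grocery",
         "country" => "Pakistan",
      "pastEvents" => [
        [0] {
                  "eventId" => 7,
            "transactionId" => "63941-950"
        },
        [1] {
                  "eventId" => 8,
            "transactionId" => "55926-0011"
        }
    ],
             "age" => 46,
        "@version" => "1",
      "@timestamp" => 2021-03-01T00:00:00.000Z,
            "host" => "liuxg",
            "path" => "/Users/liuxg/data/logstash_json/sample.json"
}
```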
From the output above, we can see that the documents are parsed correctly: with the json codec, each line's fields are placed directly at the top level of the event.
Using the JSON filter
We can use no codec at all in the file input and instead let the JSON filter do the parsing work:
logstash_json_fileter.conf
```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  # Parse the raw JSON line stored in the message field
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
Above, we added the filter section and used the json filter to parse the JSON. Re-run Logstash, and we see output like the following:
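Again the original shows a screenshot. The key difference from the codec approach is that the raw line is kept in a message field alongside the parsed fields; a sketch of the relevant part of one event:

```
{
        "message" => "{\"id\": 4,\"timestamp\":\"2019-06-10T18:01:32Z\",\"paymentType\":\"Visa\", ... }",
    "paymentType" => "Visa",
           "name" => "Cary Boyes",
             "id" => 4,
    ...
}
```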
In the output there is a field called message. It holds the raw JSON line and duplicates the parsed fields, so it needlessly takes up storage. We can delete it, and we can also drop the metadata fields we don't need to save space.
logstash_json_fileter.conf
```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  # Discard any event whose paymentType is Mastercard
  if [paymentType] == "Mastercard" {
    drop {}
  }

  # Remove the raw line and the metadata fields we do not need
  mutate {
    remove_field => [ "message", "path", "host", "@version" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
Above, we check whether paymentType is Mastercard and, if it is, drop the entire event. We also remove fields we don't need, such as message and path. Re-run Logstash, and we see output like the following:
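A sketch of the cleaned-up event (the original shows a screenshot): message, path, host, and @version are gone, though @timestamp still reflects the ingest time rather than the event time:

```
{
     "paymentType" => "Visa",
            "name" => "Cary Boyes",
              "id" => 4,
       "timestamp" => "2019-06-10T18:01:32Z",
          "gender" => "Male",
      "ip_address" => "223.113.73.232",
         "purpose" => "Grocery",
         "country" => "Pakistan",
      "pastEvents" => [ ... ],
             "age" => 46,
      "@timestamp" => 2021-03-01T00:00:00.000Z
}
```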
This output is clearly much cleaner than before. You may have noticed that the value of @timestamp differs from the value of timestamp: @timestamp records when Logstash processed the event, not when the event occurred. Since Kibana normally uses @timestamp as the event's time label, we handle it as follows:
logstash_json_fileter.conf
```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  # Parse the ISO8601 time in the timestamp field into @timestamp
  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  mutate {
    remove_field => [ "message", "path", "host", "@version", "timestamp" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
Above, we added the date filter to parse the event time, and we also removed the now-redundant timestamp field. The result is:
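A sketch of the result: @timestamp now matches the original event time, and the timestamp field is gone:

```
{
     "paymentType" => "Visa",
            "name" => "Cary Boyes",
              "id" => 4,
          "gender" => "Male",
      "ip_address" => "223.113.73.232",
         "purpose" => "Grocery",
         "country" => "Pakistan",
      "pastEvents" => [ ... ],
             "age" => 46,
      "@timestamp" => 2019-06-10T18:01:32.000Z
}
```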
We can see that @timestamp now carries the event time that was in the timestamp field.
Notice that pastEvents is an array. If we want to split this array so that each of its elements becomes a separate event, we can use the split filter.
logstash_json_fileter.conf
```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  mutate {
    remove_field => [ "message", "path", "host", "@version", "timestamp" ]
  }

  # Turn each element of the pastEvents array into its own event
  split {
    field => "[pastEvents]"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
```
We can see that the pastEvents array is split, producing multiple documents. The final documents still have one blemish: eventId and transactionId sit under the pastEvents object. We would like to move them up to the same level as id. To do so, we make the following changes:
logstash_json_fileter.conf
```
input {
  file {
    path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  json {
    source => "message"
  }

  if [paymentType] == "Mastercard" {
    drop {}
  }

  date {
    match => [ "timestamp", "ISO8601" ]
    locale => en
  }

  split {
    field => "[pastEvents]"
  }

  # Promote the nested fields to the root, then drop the leftovers
  mutate {
    add_field => {
      "eventId"       => "%{[pastEvents][eventId]}"
      "transactionId" => "%{[pastEvents][transactionId]}"
    }
    remove_field => [ "message", "path", "host", "@version", "timestamp", "pastEvents" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    index => "logstash_json"
  }
}
```
Re-run Logstash, and we see output like the following:
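A sketch of one of the final events (screenshot in the original): eventId and transactionId now sit at the root, and pastEvents is gone. Note that add_field with %{...} sprintf references always produces strings, so eventId comes out as "7" rather than the number 7:

```
{
               "id" => 4,
      "paymentType" => "Visa",
             "name" => "Cary Boyes",
           "gender" => "Male",
       "ip_address" => "223.113.73.232",
          "purpose" => "Grocery",
          "country" => "Pakistan",
              "age" => 46,
          "eventId" => "7",
    "transactionId" => "63941-950",
       "@timestamp" => 2019-06-10T18:01:32.000Z
}
```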
Above, we moved eventId and transactionId to the root of the document and removed the pastEvents field. We also wrote the documents to Elasticsearch via the elasticsearch output.
We can now search these documents in Elasticsearch:
```
GET logstash_json/_search
```

The response (truncated):

```
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "logstash_json",
        "_type" : "_doc",
        "_id" : "JXZRAHgBoLC90rTy6jNl",
        "_score" : 1.0,
        "_source" : {
          "gender" : "Female",
          "@timestamp" : "2020-02-18T12:27:35.000Z",
          "id" : 5,
          "country" : "Brazil",
          "name" : "Betteanne Diament",
          "paymentType" : "Visa",
          "transactionId" : "76436-101",
          "eventId" : "9",
          "ip_address" : "159.148.102.98",
          "age" : 41,
          "purpose" : "Computers"
        }
      },
      {
        "_index" : "logstash_json",
        "_type" : "_doc",
        "_id" : "KHZRAHgBoLC90rTy6jNl",
        "_score" : 1.0,
        "_source" : {
          "gender" : "Male",
          "@timestamp" : "2019-06-10T18:01:32.000Z",
          "id" : 4,
          "country" : "Pakistan",
          "name" : "Cary Boyes",
          "paymentType" : "Visa",
          "transactionId" : "55926-0011",
          "eventId" : "8",
          "ip_address" : "223.113.73.232",
          "age" : 46,
          "purpose" : "Grocery"
        }
      },
      ...
```
Reposted from: https://blog.csdn.net/UbuntuTouch/article/details/114383426