小言_互联网的博客

Logstash:解析 JSON 文件并导入到 Elasticsearch 中

563人阅读  评论(0)

在今天的文章中,我们将详述如何使用 Logstash 来解析 JSON 文件的日志,并把它导入到 Elasticsearch 中。在之前的文章 “Logstash:Data转换,分析,提取,丰富及核心操作” 也有提到过,但是没有具体的例子。总体说来解析 JSON 文件的日志有两种方法:

  • 在 file input 里使用 JSON codec
  • 在 file input 里不使用 JSON codec,但是在 filter 的部分使用 JSON filter

我们把 JSON 格式的数据解析并导入到 Elasticsearch 的流程如下:

 

准备数据

我们准备了如下的数据:

sample.json

{"id": 4,"timestamp":"2019-06-10T18:01:32Z","paymentType":"Visa","name":"Cary Boyes","gender":"Male","ip_address":"223.113.73.232","purpose":"Grocery","country":"Pakistan","pastEvents":[{"eventId":7,"transactionId":"63941-950"},{"eventId":8,"transactionId":"55926-0011"}],"age":46}
{"id": 5,"timestamp":"2020-02-18T12:27:35Z","paymentType":"Visa","name":"Betteanne Diament","gender":"Female","ip_address":"159.148.102.98","purpose":"Computers","country":"Brazil","pastEvents":[{"eventId":9,"transactionId":"76436-101"},{"eventId":10,"transactionId":"55154-3330"}],"age":41}

 

构建 Logstash 配置文件

使用 json codec


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. codec => "json"
  7. }
  8. }
  9. output {
  10. stdout {
  11. codec => rubydebug
  12. }
  13. }

我们运行 Logstash:

sudo ./bin/logstash -f logstash_json.conf 

上面的命令输出的结果为:

从上面的结果中,我们可以看出来文档被正确地解析。

 

使用 JSON filter

我们可以在 file input 中不使用任何的 code,但是我们可以可以使用 JSON filter 来完成解析的工作:

logstash_json_fileter.conf


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. }
  7. }
  8. filter {
  9. json {
  10. source => "message"
  11. }
  12. }
  13. output {
  14. stdout {
  15. codec => rubydebug
  16. }
  17. }

在上面,我们添加了 filter 这个部分。我们使用了 json 这个过滤器来完成对 JSON 格式的解析。重新运行我们的 Logstash。我们可以看到如下的输出:

在上面,我们可以看到一个叫做 message 的字段。这个字段显然它会占存储空间。我们可以把它删除掉。同时,我们也可以去掉那些不需要的元字段以节省空间。

logstash_json_fileter.conf


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. }
  7. }
  8. filter {
  9. json {
  10. source => "message"
  11. }
  12. if [paymentType] == "Mastercard" {
  13. drop{}
  14. }
  15. mutate {
  16. remove_field => [ "message", "path", "host", "@version"]
  17. }
  18. }
  19. output {
  20. stdout {
  21. codec => rubydebug
  22. }
  23. }

在上面,我们检查 paymentType 是否为 Mastercard,如果是的话,我们把整个事件丢弃。同时我们删除不需要的字段,比如 message, path 等。重新运行 Logstash。我们可以看到如下的输出:

显然这次的输出比刚才的要干净很多。你可能已经注意到 @timestamp 的值和 timestamp 的值不太一样。在 Kibana 中,我们经常会使用 @timestamp 作为事件的时间标签。我们可以做如下的处理:

logstash_json_fileter.conf


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. }
  7. }
  8. filter {
  9. json {
  10. source => "message"
  11. }
  12. if [paymentType] == "Mastercard" {
  13. drop{}
  14. }
  15. date {
  16. match => [ "timestamp", "ISO8601" ]
  17. locale => en
  18. }
  19. mutate {
  20. remove_field => [ "message", "path", "host", "@version", "timestamp"]
  21. }
  22. }
  23. output {
  24. stdout {
  25. codec => rubydebug
  26. }
  27. }

在上面,我们添加了 date 过滤器来解析时间。同时我们也删除 timestamp 这个字段。我们得到的结果是:

从上面我们可以看出来 @timestamp 的时间现在是时间的 timestamp 字段的时间。

在上面,我们看到 postEvent 是一个数组。如果我们想把这个数组拆分,并把其中的每一个事件作为一个分别的事件。我们可以使用 split 过滤器来完成。

logstash_json_fileter.conf


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. }
  7. }
  8. filter {
  9. json {
  10. source => "message"
  11. }
  12. if [paymentType] == "Mastercard" {
  13. drop{}
  14. }
  15. date {
  16. match => [ "timestamp", "ISO8601" ]
  17. locale => en
  18. }
  19. mutate {
  20. remove_field => [ "message", "path", "host", "@version", "timestamp"]
  21. }
  22. split {
  23. field => "[pastEvents]"
  24. }
  25. }
  26. output {
  27. stdout {
  28. codec => rubydebug
  29. }
  30. }

从上面我们可以看出来 postEvents 数组被拆分,并形成多个文档。上面的最终文档还是有些美中不足:eventId 及 transactionId 还是处于 pastEvents 对象之下。我们想把它移到和 id 同一级的位置。为此,我们做如下的修改:

logstash_json_fileter.conf


  
  1. input {
  2. file {
  3. path => [ "/Users/liuxg/data/logstash_json/sample.json" ]
  4. start_position => "beginning"
  5. sincedb_path => "/dev/null"
  6. }
  7. }
  8. filter {
  9. json {
  10. source => "message"
  11. }
  12. if [paymentType] == "Mastercard" {
  13. drop{}
  14. }
  15. date {
  16. match => [ "timestamp", "ISO8601" ]
  17. locale => en
  18. }
  19. split {
  20. field => "[pastEvents]"
  21. }
  22. mutate {
  23. add_field => {
  24. "eventId" => "%{[pastEvents][eventId]}"
  25. "transactionId" => "%{[pastEvents][transactionId]}"
  26. }
  27. remove_field => [ "message", "path", "host", "@version", "timestamp", "pastEvents"]
  28. }
  29. }
  30. output {
  31. stdout {
  32. codec => rubydebug
  33. }
  34. elasticsearch {
  35. index => "logstash_json"
  36. }
  37. }

重新运行 Logstash。我们可以看到如下的输出:

在上面,我们把 eventId 及 transactionId 移到文档的根下面,并删除 pastEvents 这个字段。我们同时也把文档导入到 Elasticsearch 中。

我们可以在 Elasticsearch 中对文档进行搜索:

GET logstash_json/_search

  
  1. {
  2. "took" : 1,
  3. "timed_out" : false,
  4. "_shards" : {
  5. "total" : 1,
  6. "successful" : 1,
  7. "skipped" : 0,
  8. "failed" : 0
  9. },
  10. "hits" : {
  11. "total" : {
  12. "value" : 4,
  13. "relation" : "eq"
  14. },
  15. "max_score" : 1.0,
  16. "hits" : [
  17. {
  18. "_index" : "logstash_json",
  19. "_type" : "_doc",
  20. "_id" : "JXZRAHgBoLC90rTy6jNl",
  21. "_score" : 1.0,
  22. "_source" : {
  23. "gender" : "Female",
  24. "@timestamp" : "2020-02-18T12:27:35.000Z",
  25. "id" : 5,
  26. "country" : "Brazil",
  27. "name" : "Betteanne Diament",
  28. "paymentType" : "Visa",
  29. "transactionId" : "76436-101",
  30. "eventId" : "9",
  31. "ip_address" : "159.148.102.98",
  32. "age" : 41,
  33. "purpose" : "Computers"
  34. }
  35. },
  36. {
  37. "_index" : "logstash_json",
  38. "_type" : "_doc",
  39. "_id" : "KHZRAHgBoLC90rTy6jNl",
  40. "_score" : 1.0,
  41. "_source" : {
  42. "gender" : "Male",
  43. "@timestamp" : "2019-06-10T18:01:32.000Z",
  44. "id" : 4,
  45. "country" : "Pakistan",
  46. "name" : "Cary Boyes",
  47. "paymentType" : "Visa",
  48. "transactionId" : "55926-0011",
  49. "eventId" : "8",
  50. "ip_address" : "223.113.73.232",
  51. "age" : 46,
  52. "purpose" : "Grocery"
  53. }
  54. },
  55. ...

 


转载:https://blog.csdn.net/UbuntuTouch/article/details/114383426
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场