In today's article, I will show how to use the file input together with the multiline codec to import a CSV file. I covered multiline in an earlier article, "Using the Elastic Stack to analyze Spring Boot microservice logs (Part 1)", and I also have two earlier examples of importing CSV files with Logstash.
For CSV imports, Filebeat can also be used to parse CSV files. If you are interested, please refer to those earlier articles.
Preparing the data
For today's exercise, we will use the following test data:
multiline.csv
INV-12402400071,05/31/2018,2595,Hy-Vee Wine and Spirits / Denison,"1620 4th Ave, South",Denison,51442,"1620 4th Ave, South Denison 51442 (42.012395, -95.348601 )",24,CRAWFORD,1011100,Blended Whiskies,260,DIAGEO AMERICAS,25608,Seagrams 7 Crown Bl Whiskey,6,1750,11.96,17.94,1,107.64,1.75,0.46
S29195400002,11/21/2015,2205,Ding's Honk And Holler,900 E WASHINGTON,CLARINDA,51632,"900 E WASHINGTON
CLARINDA 51632
(40.739238, -95.02756 )",73,Page,,,255,Wilson Daniels Ltd.,297,Templeton Rye w/Flask,6,750,18.09,27.14,12,325.68,9.00,2.38
S29198800001,11/20/2015,2191,Keokuk Spirits,1013 MAIN,KEOKUK,52632,"1013 MAIN
KEOKUK 52632
(40.39978, -91.387531 )",56,Lee,,,255,Wilson Daniels Ltd.,297,Templeton Rye w/Flask,6,750,18.09,27.14,6,162.84,4.50,1.19
S29198800001,11/20/2015,2191,Keokuk Spirits,1013 MAIN,KEOKUK,52632,"1013 MAIN
KEOKUK 52632
(40.39978, -91.387531 )",56,Lee,,,255,Wilson Daniels Ltd.,297,Templeton Rye w/Flask,6,750,18.09,27.14,6,162.84,4.50,1.19
This data comes from https://data.iowa.gov/Sales-Distribution/Iowa-Liquor-Sales/m3tr-qhgy/data. Some of the values contain extra newline characters ("\n"), so a few records are spread across multiple lines, although this is relatively rare. In the data above we can see the following three documents:
- INV-12402400071
- S29195400002
- S29198800001
The two documents S29195400002 and S29198800001 each span three lines, which is clearly different from the first document. So how should we handle this situation? First, note that every document starts with a line beginning with INV- or S. In general, a Logstash pipeline is structured as follows: it has an input, events then pass through zero or more filters, and the result is finally sent to an output.
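As a minimal sketch of that general shape (stdin and stdout are used purely for illustration here and are not part of this article's configuration):

input {
  stdin { }                # events enter the pipeline here
}

filter {
  # zero or more filters transform each event in turn
}

output {
  stdout { }               # processed events leave the pipeline here
}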
For our case, we can process the data with the following pipeline: a file input combined with the multiline codec, with the events then passed through filters such as csv, mutate, and grok.
First, we create a file called logstash_csv.conf:
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
In the configuration above, the file input reads the CSV file from the specified path, using the following codec:
codec => multiline {
  pattern => "^(S|INV-)[0-9][0-9]"
  negate => "true"
  what => "previous"
}
The pattern matches lines that start with S or INV-, immediately followed by two digits (0-9). Setting negate to true means that any line which does not match the pattern is appended to the previous matching line, so that together they form one document. If this is still unclear, please refer to the description in my earlier article "Beats: Shipping multiline logs with Filebeat".
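One practical note on tailing a file with this codec: multiline only emits a buffered document once it sees the start of the next one, so the very last record in the file can appear to be stuck. The multiline codec has an auto_flush_interval option for this; the sketch below is my own addition and not part of the original configuration:

codec => multiline {
  pattern => "^(S|INV-)[0-9][0-9]"
  negate => "true"
  what => "previous"
  # Flush the buffered document if no new line arrives within 5 seconds,
  # so the final record of the file is emitted as well.
  auto_flush_interval => 5
}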
We now run the configuration file with Logstash:
sudo ./bin/logstash -f logstash_csv.conf
In the output we can see that, even though a document is split across three lines in the file, it is still correctly recognized as a single document. The joined document contains \n characters, which we will remove in a later processing step.
Next, we use the csv filter to parse the data:
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  # Parse the csv values, define fields as integers and floats
  csv {
    columns => [ "InvoiceItemNumber", "Date", "StoreNumber", "StoreName", "Address",
      "City", "ZipCode", "StoreLocation", "CountyNumber", "County", "Category",
      "CategoryName", "VendorNumber", "VendorName", "ItemNumber", "ItemDescription",
      "Pack", "BottleVolumeml", "StateBottleCost", "StateBottleRetail", "BottlesSold",
      "SaleDollars", "VolumeSoldLiters", "VolumeSoldGallons" ]

    convert => {
      "StoreNumber" => "integer"
      "ItemNumber" => "integer"
      "Category" => "integer"
      "CountyNumber" => "integer"
      "VendorNumber" => "integer"
      "Pack" => "integer"
      "SaleDollars" => "float"
      "StateBottleCost" => "float"
      "StateBottleRetail" => "float"
      "BottleVolumeml" => "float"
      "BottlesSold" => "float"
      "VolumeSoldLiters" => "float"
      "VolumeSoldGallons" => "float"
    }

    remove_field => [ "message" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
In the configuration above, the csv filter parses each CSV record into its individual fields. We also use convert to turn the numeric fields into numeric types so they can be analyzed, and we remove the message field.
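If a row cannot be parsed (for example because the multiline join produced a malformed line), the csv filter does not drop the event; it tags it, by default with _csvparsefailure. As a small optional sketch of my own, such events could be printed separately in the output section for review:

output {
  # Show any events the csv filter could not parse
  if "_csvparsefailure" in [tags] {
    stdout { codec => rubydebug }
  }
}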
Rerun Logstash and check the result. In the output we can see that the County and City values are inconsistently cased (often all uppercase), and we want to normalize them to lowercase. The StoreLocation field also still contains \n characters. We add a mutate filter to the filter section to handle both:
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  # Parse the csv values, define fields as integers and floats
  csv {
    columns => [ "InvoiceItemNumber", "Date", "StoreNumber", "StoreName", "Address",
      "City", "ZipCode", "StoreLocation", "CountyNumber", "County", "Category",
      "CategoryName", "VendorNumber", "VendorName", "ItemNumber", "ItemDescription",
      "Pack", "BottleVolumeml", "StateBottleCost", "StateBottleRetail", "BottlesSold",
      "SaleDollars", "VolumeSoldLiters", "VolumeSoldGallons" ]

    convert => {
      "StoreNumber" => "integer"
      "ItemNumber" => "integer"
      "Category" => "integer"
      "CountyNumber" => "integer"
      "VendorNumber" => "integer"
      "Pack" => "integer"
      "SaleDollars" => "float"
      "StateBottleCost" => "float"
      "StateBottleRetail" => "float"
      "BottleVolumeml" => "float"
      "BottlesSold" => "float"
      "VolumeSoldLiters" => "float"
      "VolumeSoldGallons" => "float"
    }

    remove_field => [ "message" ]
  }

  # Take the linebreaks out of the location and convert to spaces and lowercase
  # the city and county as they change in the source file
  mutate {
    gsub => [ "StoreLocation", "\n", " " ]
    lowercase => [ "County", "City" ]
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Rerun Logstash and check the output. The County and City values are now all lowercase, and StoreLocation no longer contains any \n characters.
Next, we want to extract the location information contained in StoreLocation. We can see that it includes a coordinate (latitude and longitude). We can use a grok filter to match it:
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  # Parse the csv values, define fields as integers and floats
  csv {
    columns => [ "InvoiceItemNumber", "Date", "StoreNumber", "StoreName", "Address",
      "City", "ZipCode", "StoreLocation", "CountyNumber", "County", "Category",
      "CategoryName", "VendorNumber", "VendorName", "ItemNumber", "ItemDescription",
      "Pack", "BottleVolumeml", "StateBottleCost", "StateBottleRetail", "BottlesSold",
      "SaleDollars", "VolumeSoldLiters", "VolumeSoldGallons" ]

    convert => {
      "StoreNumber" => "integer"
      "ItemNumber" => "integer"
      "Category" => "integer"
      "CountyNumber" => "integer"
      "VendorNumber" => "integer"
      "Pack" => "integer"
      "SaleDollars" => "float"
      "StateBottleCost" => "float"
      "StateBottleRetail" => "float"
      "BottleVolumeml" => "float"
      "BottlesSold" => "float"
      "VolumeSoldLiters" => "float"
      "VolumeSoldGallons" => "float"
    }

    remove_field => [ "message" ]
  }

  # Take the linebreaks out of the location and convert to spaces and lowercase
  # the city and county as they change in the source file
  mutate {
    gsub => [ "StoreLocation", "\n", " " ]
    lowercase => [ "County", "City" ]
  }

  # Get the lat/lon if there is a (numbers,numbers) data in the location
  grok {
    match => { "StoreLocation" => "\((?<location>[-,.0-9 ]*)\)" }
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
The grok pattern matches the content between the parentheses () in StoreLocation, restricted to the characters - , . 0-9 and spaces, and assigns it to a new field called location. Rerun Logstash, and in the output we can see that location has been extracted from StoreLocation.
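Note that location is captured as a plain string such as "42.012395, -95.348601". To plot it on a map, the field would typically need a geo_point mapping, for example via an index template managed outside Logstash (which is consistent with manage_template => false in the final configuration below), and the conventional "lat,lon" string form has no space after the comma. A small optional mutate step, my own addition rather than part of the original pipeline, could normalize the value:

  # Optional: turn "42.012395, -95.348601" into "42.012395,-95.348601",
  # the conventional "lat,lon" string form for a geo_point field.
  mutate {
    gsub => [ "location", ", ", "," ]
  }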
Next, we set the event timestamp from the date contained in the document itself. At the moment, @timestamp does not reflect the value of the document's Date field.
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  # Parse the csv values, define fields as integers and floats
  csv {
    columns => [ "InvoiceItemNumber", "Date", "StoreNumber", "StoreName", "Address",
      "City", "ZipCode", "StoreLocation", "CountyNumber", "County", "Category",
      "CategoryName", "VendorNumber", "VendorName", "ItemNumber", "ItemDescription",
      "Pack", "BottleVolumeml", "StateBottleCost", "StateBottleRetail", "BottlesSold",
      "SaleDollars", "VolumeSoldLiters", "VolumeSoldGallons" ]

    convert => {
      "StoreNumber" => "integer"
      "ItemNumber" => "integer"
      "Category" => "integer"
      "CountyNumber" => "integer"
      "VendorNumber" => "integer"
      "Pack" => "integer"
      "SaleDollars" => "float"
      "StateBottleCost" => "float"
      "StateBottleRetail" => "float"
      "BottleVolumeml" => "float"
      "BottlesSold" => "float"
      "VolumeSoldLiters" => "float"
      "VolumeSoldGallons" => "float"
    }

    remove_field => [ "message" ]
  }

  # Take the linebreaks out of the location and convert to spaces and lowercase
  # the city and county as they change in the source file
  mutate {
    gsub => [ "StoreLocation", "\n", " " ]
    lowercase => [ "County", "City" ]
  }

  # Get the lat/lon if there is a (numbers,numbers) data in the location
  grok {
    match => { "StoreLocation" => "\((?<location>[-,.0-9 ]*)\)" }
  }

  # Match the date to just daily and the correct timezone
  date {
    "match" => [ "Date", "MM/dd/YYYY" ]
    "timezone" => "America/Chicago"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
Run Logstash again. Now @timestamp clearly comes from the date contained in the document.
Finally, we can add an output to Elasticsearch:
logstash_csv.conf
input {
  # Read the csv file. also use the multiline codec, everything that does not start
  # with S or INV- is part of the prior line due to addresses having line breaks
  file {
    start_position => "beginning"
    path => "/Users/liuxg/data/logstash_multiline/multline.csv"
    sincedb_path => "/dev/null"
    codec => multiline {
      pattern => "^(S|INV-)[0-9][0-9]"
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  # Parse the csv values, define fields as integers and floats
  csv {
    columns => [ "InvoiceItemNumber", "Date", "StoreNumber", "StoreName", "Address",
      "City", "ZipCode", "StoreLocation", "CountyNumber", "County", "Category",
      "CategoryName", "VendorNumber", "VendorName", "ItemNumber", "ItemDescription",
      "Pack", "BottleVolumeml", "StateBottleCost", "StateBottleRetail", "BottlesSold",
      "SaleDollars", "VolumeSoldLiters", "VolumeSoldGallons" ]

    convert => {
      "StoreNumber" => "integer"
      "ItemNumber" => "integer"
      "Category" => "integer"
      "CountyNumber" => "integer"
      "VendorNumber" => "integer"
      "Pack" => "integer"
      "SaleDollars" => "float"
      "StateBottleCost" => "float"
      "StateBottleRetail" => "float"
      "BottleVolumeml" => "float"
      "BottlesSold" => "float"
      "VolumeSoldLiters" => "float"
      "VolumeSoldGallons" => "float"
    }

    remove_field => [ "message" ]
  }

  # Take the linebreaks out of the location and convert to spaces and lowercase
  # the city and county as they change in the source file
  mutate {
    gsub => [ "StoreLocation", "\n", " " ]
    lowercase => [ "County", "City" ]
  }

  # Get the lat/lon if there is a (numbers,numbers) data in the location
  grok {
    match => { "StoreLocation" => "\((?<location>[-,.0-9 ]*)\)" }
  }

  # Match the date to just daily and the correct timezone
  date {
    "match" => [ "Date", "MM/dd/YYYY" ]
    "timezone" => "America/Chicago"
  }
}

output {
  elasticsearch {
    hosts => [ "https://your.cluster.here:9243" ]
    index => [ "iowa-liquor" ]
    user => "elastic"
    password => "redacted"
    manage_template => false
  }

  # output dots while we process
  stdout { codec => "dots" }

  # if we saw a date parse failure, dump it to screen to review
  if "_dateparsefailure" in [tags] {
    stdout { codec => "rubydebug" }
  }
}
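As a side note, the hosts value above (port 9243) looks like an Elastic Cloud endpoint. If the cluster is indeed hosted on Elastic Cloud, the elasticsearch output can alternatively be configured with cloud_id and cloud_auth instead of hosts, user, and password. The values below are placeholders of my own, not taken from the original article:

output {
  elasticsearch {
    # cloud_id comes from the Elastic Cloud console; cloud_auth is "user:password"
    cloud_id => "my_deployment:placeholder_cloud_id"
    cloud_auth => "elastic:redacted"
    index => "iowa-liquor"
    manage_template => false
  }
}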
Reposted from: https://blog.csdn.net/UbuntuTouch/article/details/114374804