Introduction

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash将来自不同数据源的数据统一搜集起来,并根据需求将数据标准化输出到你所选择的目的地。如下图所示。

img

Input/Filter/Output

Logstash可以从多个数据源获取数据,并对其进行处理、转换,最后将其发送到不同类型的“存储”

输入

采集各种样式、大小和来源的数据

分布式系统中,数据往往是以各种各样的形式(结构化、非结构话)存在于不同的节点中。Logstash 支持不同数据源的选择 ,日志、报表、数据库的内容等等。可以在同一时间从众多常用来源捕捉事件。

  • 文件类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
input{
file{
# path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
path => [‘pathA’,‘pathB’]

# 表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
discover_interval => 15

# 排除那些文件,也就是不去读取那些文件
exclude => [‘fileName1’,‘fileNmae2’]

# 被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
close_older => 3600

# 在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
ignore_older => 86400

# logstash 每隔多久检查一次被监听文件状态(是否有更新), 默认是 1 秒。
stat_interval => 1

#sincedb记录数据上一次的读取位置的一个index
sincedb_path => ’$HOME/. sincedb‘

#logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
start_position => ‘beginning’

# 注意:这里需要提醒大家的是,如果你需要每次都从开始读取文件的话,只设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
}

}
  • 数据库类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
input{
jdbc{
# jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
#jdbc class 不同数据库有不同的 class 配置
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
#配置数据库连接 ip 和端口,以及数据库
jdbc_connection_string => "jdbc:sqlserver://xxxxxx:1433;databaseName=test_db"
#配置数据库用户名
jdbc_user =>
#配置数据库密码
jdbc_password =>

# 上面这些主要配置数据库java驱动,账号配置
# 定时器 多久执行一次SQL,默认是一分钟
# schedule => 分 时 天 月 年
# schedule => * 22 * * * 表示每天22点执行一次
schedule => "* * * * *"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
# 是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
#此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
#如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
tracking_column => create_time
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 我们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
# 是否将字段名称转小写。
# 这里有个小的提示,如果你之前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
# 因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
lowercase_column_names => false
# 你的SQL的位置,当然,你的SQL也可以直接写在这里。
# statement => SELECT * FROM tabeName t WHERE t.creat_time > :last_sql_value
statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
# 数据类型,标明数据来源,es索引的时候可以建立不同的额索引
type => "my_info"
}
# 注意:外在的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
# 如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
}
  • beats

主要是接受filebeats的数据导入

1
2
3
4
5
6
7
8
input {
beats {
# 接受数据端口
port => 5044
# 数据类型
type => "logs"
}
}

过滤器

实时解析和转换数据

数据从源传输到存储库的过程中,需要对不同的数据进行不同的存储,Logstash 过滤器能够解析每条记录,识别每条数据的字段内容,并将它们转换成自定义数据,以便进行处理分析计算。

Logstash 动态地转换和解析数据,支持各种格式或复杂度数据的解析:

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 整体处理不受数据源、格式或架构的影响

输出

尽管 ES是logstash的常用输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

img

Install && config

  • 安装

安装比较简单,官网直接有现成的二进制包,下载地址: https://artifacts.elastic.co/downloads/logstash/logstash-7.10.1-linux-x86_64.tar.gz

安装也比较简单,解压设置path即可使用。

本人经常使用,就写了个安装elk的脚本,需要的可以拿去使用:https://github.com/shiguofu2012/scripts/blob/master/install_elk.sh。

  • 配置intput/output

Logstash配置有两个必需的元素,输入和输出,以及一个可选过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。

img

  1. 接下来,允许Logstash最基本的管道,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@VM-145-82-centos ~]# logstash -e 'input { stdin {} } output { stdout {} }'
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2021-01-07T22:15:40,409][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.9.3", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc Java HotSpot(TM) 64-Bit Server VM 25.261-b12 on 1.8.0_261-b12 +indy +jit [linux-x86_64]"}
[2021-01-07T22:15:40,803][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-01-07T22:15:42,097][INFO ][org.reflections.Reflections] Reflections took 39 ms to scan 1 urls, producing 22 keys and 45 values
[2021-01-07T22:15:43,158][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x41654bea run>"}
[2021-01-07T22:15:43,809][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.64}
[2021-01-07T22:15:43,866][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2021-01-07T22:15:43,914][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-01-07T22:15:44,235][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
{
"host" => "VM-145-82-centos",
"message" => "",
"@timestamp" => 2021-01-07T14:15:43.902Z,
"@version" => "1"
}
hello
{
"host" => "VM-145-82-centos",
"message" => "hello",
"@timestamp" => 2021-01-07T14:15:47.996Z,
"@version" => "1"
}

{
"host" => "VM-145-82-centos",
"message" => "",
"@timestamp" => 2021-01-07T14:15:50.766Z,
"@version" => "1"
}

从标准输入获取数据,输出到标准输出。

  1. input 从filebeat获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
input {
beats {
host => "0.0.0.0" # 默认是127.0.0.1 只能本级访问
port => 5044
}
}
# output 索引至es
output {
elasticsearch {
hosts => ["localhost:9200"] # es地址
user => "xxxx" # 用户名
password => "xxxx" # 密码
index => "test-ap-%{+YYYY.MM.dd}" # 建立的索引,这里默认每天建一个索引
}
}

总体来讲,input/output是比较容易配置的,关键是对数据进行格式化。

  • filter
正则匹配

grok 匹配非格式化字段,提取字段格式化数据,强大的文本解析工具,支持正则表达式

1
2
3
4
5
6
7
8
9
10
11
grok {
match => { "[message]" => "%{TIMESTAMP_ISO8601:_timestamp} %{LOGLEVEL:level} %{DATA:stack}" }
}
# 解析失败的处理
if "_grokparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
}
ip解析
1
2
3
4
5
6
filter {
geoip {
source => "ip"
fields => ["city_name", "timezone"] # 选择解析的字段
}
}

解析出来的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"ip" => "183.60.92.253",
"@version" => "1",
"@timestamp" => "2014-08-07T10:32:55.610Z",
"host" => "raochenlindeMacBook-Air.local",
"geoip" => {
"ip" => "183.60.92.253",
"country_code2" => "CN",
"country_code3" => "CHN",
"country_name" => "China",
"continent_code" => "AS",
"region_name" => "30",
"city_name" => "Guangzhou",
"latitude" => 23.11670000000001,
"longitude" => 113.25,
"timezone" => "Asia/Chongqing",
"real_region_name" => "Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
}
字段增删改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
filter {
mutate {
remove_field => ["ecs", "input", "agent", "tags", "@version", "@metadata"] # 移除字段
rename => ["host", "my_host"] # 重命名
rename => ["kubernetes", "my_k8s"] # 重命名
remove_field => ["[host][mac]", "[my_host][containerized]", "[my_host][os]", "[my_host][id]", "[my_host][name]", "[my_host][architecture]"] # 移除字段,使用已经重命名的字段
add_field => { "mytype" => "type" } # 增加字段

update => { "sample" => "My new message" } # 更新字段内容,如果字段不存在,不会新建
replace => { "message" => "%{source_host}: My new message" } # 与 update 功能相同,区别在于如果字段不存在则会新建字段
convert => ["request_time", "float"] # 数据类型转换
uppercase => [ "fieldname" ] # 大写转换
lowercase => [ "fieldname" ]

# 提供正则表达式替换
gsub => [
# replace all forward slashes with underscore
"fieldname", "/", "_",
# replace backslashes, question marks, hashes, and minuses
# with a dot "."
"fieldname2", "[\\?#-]", "."
]
}
}
条件判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
filter {
# 条件判断,字段my_k8s是否存在; 并且日志路径匹配
if ![my_k8s] and [log][file][path] =~ "/data/project_log/[\w-]+/[\w-\\.]+.log" {
mutate {
split => ["[log][file][path]", "/"]
# split操作 /data/project_log/app1/app.log => ["", data, project_log, app1, app.log]
add_field => { "[kubernets][labels][app]" => "%{[log][file][path][3]}" }
}
}
# 字段操作放在mutate中
mutate {
remove_field => [ "log" ]
}
}
json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
filter {
# 检查是否是json格式
if [message] =~ "^\{.*\}[\s\S]*$" {
json {
source => "[message]"
target => "jsoncontent"
}
# json 数据失败
if "_jsonparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
}
}

Example

这里介绍一个曾经搭建的ELK日志系统。

结构比较简单,kubetnets中filebeat damonSet方式运行,搜集所有container 标准输出的日志,并传入logstash中,logstash将数据导入elasticsearch。结构图如下所示:

image-20210111133718737

下面开始logstash的配置:

input比较简单,使用filebeat搜集日志,传入logstash

1
2
3
4
5
6
input {
beats {
host => "0.0.0.0"
port => 5044
}
}

output增加了几个条件判断,根据不同的字段日志类型,索引到不同的es索引中;如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
output {
# k8s app 索引到对应的app中
if ([kubernetes][labels][app]) {
if ([type] == "app") {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-%{+YYYY.MM.dd}"
}
# 根据type区分索引
} else if ([type] == "user") {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-[type]%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-%{+YYYY.MM.dd}"
}
}

# 不存在k8s app字段
} else if ([type] == "user") {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-%{+YYYY.MM.dd}"
}
} else if ([type] == "app") {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-v1.0.0"
}
}
}

filter 配置,不同的日志格式,输出格式化的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
filter {
mutate {
# 移除filebeat发送的多余字段
remove_field => ["ecs", "input", "agent", "tags", "@version", "@metadata"]
remove_field => ["[host][mac]", "[host][containerized]", "[host][os]", "[host][id]", "[host][name]", "[host][architecture]"]
}
if ![kubernetes] and [log][file][path] =~ "/data/app_log/[\w-]+/[\w\.-]+.log" {
mutate {
split => ["[log][file][path]", "/"]
add_field => { "[kubernetes][labels][app]" => "%{[log][file][path][3]}-host" }
}
}
mutate {
remove_field => [ "log" ]
}

if [message] =~ "^\{.*\}[\s\S]*$" {
# json格式处理
json {
source => "[message]"
}
if "_jsonparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}

if ([time]) {
date {
match => ["time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}

mutate {
remove_field => ["time"]
}
}

# docker 日志格式
if [log] =~ "^\{.*\}[\s\S]*$" {
json {
source => "[log]"
}

mutate {
remove_field => ["log"]
}

if ([start_time]) {
date {
match => ["start_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "start_time"
}
}

if ([app_start_time]) {
date {
match => ["app_start_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "app_start_time"
}
}

if ([end_time]) {
date {
match => ["end_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "end_time"
}
}

if ([timestamp]) {
date {
match => ["timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}
}
} else {
if ([timestamp]) {
date {
match => ["timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}
}
}
} else {
# 匹配日志格式
grok {
match => { "[message]" => "%{TIMESTAMP_ISO8601:_timestamp} %{LOGLEVEL:level} %{DATA:stack} - (?<message>(.|\r|\n|\t)*)" }
}
# 匹配格式失败处理
if "_grokparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
# 解析时间格式
date {
match => ["_timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}

mutate {
remove_field => ["_timestamp"]
}
}

mutate {
remove_field => ["message"]
}
}

总结

总之 ,logstash具备强大的功能,将不同数据源的数据经过清洗格式化,转化为结构化的数据,存储到不同的存储单元。