07 ELK Logstash 配置语法详解


Logstash基本语法组成

  1. logstash之所以功能强大和流行,还与其丰富的过滤器插件是分不开的,过滤器提供的并不单单是过滤的功能,
  2. 还可以对进入过滤器的原始数据进行复杂的逻辑处理,甚至添加独特的事件到后续流程中。
  3. Logstash配置文件有如下三部分组成,其中input、output部分是必须配置,
  4. filter部分是可选配置,而filter就是过滤器插件,可以在这部分实现各种日志过滤功能。
1
2
3
4
5
6
7
8
9
input {
#输入插件
}
filter {
#过滤匹配插件
}
output {
#输出插件
}

Logstash 输入插件(Input)

读取文件(File)

  1. logstash使用一个名为filewatch的ruby gem库来监听文件变化,并通过一个叫.sincedb的数据库文件来记录被监听的日志文件的读取进度(时间戳),
  2. 这个sincedb数据文件的默认路径在 <path.data>/plugins/inputs/file下面,
  3. 文件名类似于.sincedb_452905a167cf4509fd08acb964fdb20c,
  4. 而<path.data>表示logstash插件存储目录,默认是LOGSTASH_HOME/data。
1
2
3
4
5
6
7
8
9
10
11
12
input {
file {
path => ["/var/log/messages"]
type => "system"
start_position => "beginning"
}
}
output {
stdout{
codec=>rubydebug
}
}
  1. 这个配置是监听并接收本机的/var/log/messages文件内容,
  2. start_position表示按时间戳记录的地方开始读取,如果没有时间戳则从头开始读取,有点类似cat命令,
  3. 默认情况下,logstash会从文件的结束位置开始读取数据,也就是说logstash进程会以类似tail -f命令的形式逐行获取数据。
  4. type用来标记事件类型,通常会在输入区域通过type标记事件类型。
1
看业务需求,是否需要重新读取日志中所有内容,需要就加上 start_position => "beginning"

标准输入(Stdin)

  1. stdin是从标准输入获取信息
1
2
3
4
5
6
7
8
9
10
11
12
input{
stdin{
add_field=>{"key"=>"iivey"}
tags=>["add1"]
type=>"test1"
}
}
output {
stdout{
codec=>rubydebug
}
}

读取 Syslog日志

  1. 6.0版本叫做rsyslog

  2. 如何将rsyslog收集到的日志信息发送到logstash中,这里以centos7.5为例,需要做如下两个步骤的操作:

  3. 首先,在 需要收集日志 的 服务器 上找到rsyslog的配置文件/etc/rsyslog.conf,添加如下内容:

1
*.*     @@172.16.213.120:5514
  1. 其中,172.16.213.120是logstash服务器的地址。5514是logstash启动的监听端口。

  2. 接着,重启rsyslog服务:

  3. 然后,在logstash服务器上创建一个事件配置文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
input {
syslog {
port => "5514"
}
}

output {
stdout{
codec=>rubydebug
}
}

[root@logstash test]# ../bin/logstash -f logstash2.conf

1
2
3
4
5
6
# 去要收集的服务器上配置

[root@filebeat1 ~]# vim /etc/rsyslog.conf
*.* @@172.17.70.236:5514

[root@filebeat1 ~]# systemctl restart rsyslog

读取TCP网络数据

  1. 下面的事件配置文件就是通过”LogStash::Inputs::TCP”和”LogStash::Filters::Grok”配合实现syslog功能的例子,
  2. 这里使用了logstash的TCP/UDP插件读取网络数据:
  3. grok 正则抓取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
input {
tcp {
port => "5514"
}
}

filter {
grok {
match => { "message" => "%{SYSLOGLINE}" }
}
}

output {
stdout{
codec=>rubydebug
}
}

[root@logstash test]# ../bin/logstash -f logstash3.conf

# 造个登录失败日志

# 还可以导入日志
[root@filebeat1 ~]# nc 172.17.70.236 5514 < /var/log/secure

Logstash 编码插件(Codec)

  1. 其实我们就已经用过编码插件codec了,也就是这个rubydebug,它就是一种codec,虽然它一般只会用在stdout插件中,作为配置测试或者调试的工具。
  2. 编码插件(Codec)可以在logstash输入或输出时处理不同类型的数据,
  3. 因此,Logstash不只是一个input–>filter–>output的数据流,而是一个input–>decode–>filter–>encode–>output的数据流。
  4. Codec支持的编码格式常见的有plain、json、json_lines等。下面依次介绍。

codec插件之 plain

  1. plain是一个空的解析器,它可以让用户自己指定格式,也就是说输入是什么格式,输出就是什么格式。
  2. 下面是一个包含plain编码的事件配置文件:
1
2
3
4
5
6
7
8
9
input{
stdin{
}
}
output{
stdout{
codec => "plain"
}
}

codec插件之 json、json_lines

  1. 如果发送给logstash的数据内容为json格式,可以在input字段加入codec=>json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。
  2. 如果想让logstash输出为json格式,可以在output字段加入codec=>json,下面是一个包含json编码的事件配置文件:
  3. json每个字段是key:values格式,多个字段之间通过逗号分隔。有时候,如果json文件比较长,需要换行的话,那么就要用json_lines编码格式了。
  4. 数据分析 大多数使用json格式
  5. 数据非常大需要换行,就改成json_lines
1
2
3
4
5
6
7
8
9
input {
stdin {
}
}
output {
stdout {
codec => json
}
}

Logstash 过滤器插件(Filter)

  1. filter插件对字段的分割、转换和赋值

Grok 正则捕获

  1. grok是一个十分强大的logstash filter插件,他可以通过正则解析任意文本,将非结构化日志数据弄成结构化和方便查询的结构。
  2. 他是目前logstash 中解析非结构化日志数据最好的方式。
  3. Grok 的语法规则是:

    • %{语法: 语义}
  4. “语法”指的就是匹配的模式,例如使用NUMBER模式可以匹配出数字,IP模式则会匹配出127.0.0.1这样的IP地址:

  5. 例如输入的内容为:
    • 172.16.213.132 [07/Feb/2018:16:24:19 +0800] “GET / HTTP/1.1” 403 5039
1
2
3
%{IP:clientip}匹配模式将获得的结果为:              clientip: 172.16.213.132
%{HTTPDATE:timestamp}匹配模式将获得的结果为: timestamp: 07/Feb/2018:16:24:19 +0800
%{QS:referrer}匹配模式将获得的结果为: referrer: "GET / HTTP/1.1"
  1. 匹配规则方法路径查看

1
2
3
4
5
# 匹配模式
[root@logstash test]# cd /usr/local/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-patterns-core-4.1.2/patterns/

# 基础匹配模式
[root@logstash patterns]# more grok-patterns

调试grok正则表达式工具:

  1. 实际使用过滤先通过调试工具获取方法
1
2
3
4
5
6
7
8
http://grokdebug.herokuapp.com
http://120.203.18.89:6969/130/grok-debug-elk-%E5%9C%A8%E7%BA%BF%E8%B0%83%E8%AF%95grok%E5%B7%A5%E5%85%B7/

# 转义
\空格\[ ... \]

# 语法:语义
语义用来标志截取出来的数据是做什么用的
1
2
3
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}
  1. 通过上面这个组合匹配模式,我们将输入的内容分成了五个部分,即五个字段,
  2. 将输入内容分割为不同的数据字段,这对于日后解析和查询日志数据非常有用,这正是使用grok的目的。
  3. Logstash默认提供了近200个匹配模式(其实就是定义好的正则表达式)让我们来使用,可以在logstash安装目录下,
  4. 例如这里是/usr/local/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.2/patterns目录里面查看,
  5. 基本定义在grok-patterns文件中。
1
2
3
4
5
6
7
# 从这些定义好的匹配模式中,可以查到上面使用的四个匹配模式对应的定义规则

匹配模式 正则定义规则
NUMBER (?:%{BASE10NUM})
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
IP (?:%{IPV6}|%{IPV4})
QS %{QUOTEDSTRING}

使用 Grok 正则捕获

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@logstash test]# vim logstash6.conf

input{
stdin{}
}
filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
}
}
output{
stdout{
codec => "rubydebug"
}
}
1
2
3
[root@logstash test]# ../bin/logstash -f logstash6.conf 
# 输入内容:
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

进一步过滤 messages

  1. 既然已经把messages字段过滤分割5段,那么已经不需要messages整体这个字段了,需要把它删除
  2. 两个时间字段 日志收集时间和日志产生时间 不一致,主要是我们在后面展示的时候,只想展示 日志的产生时间,需要替换赋值
  3. 我们自己定义的时间字段也可删除,需要放在赋值之后,要不然赋值没了
1
remove_field => ["message"] 删除字段中的某一列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}
filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
remove_field => [ "message" ]
}
date{
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}
output{
stdout{
codec => "rubydebug"
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}

filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
remove_field => ["message"]
}
date{
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
mutate{
remove_field => ["timestamp"]
}
}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash6.conf
# 输入内容:
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

时间处理 (Date)

  1. date插件是对于排序事件和回填旧数据尤其重要,它可以用来转换日志记录中的时间字段,变成LogStash::Timestamp对象,
  2. 然后转存到@timestamp字段里,这在之前已经做过简单的介绍。
1
2
3
4
5
6
7
8
filter {
grok {
match => ["message", "%{HTTPDATE:timestamp}"]
}
date {
match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
}
}

数据修改 (Mutate)

gsup 正则表达式替换匹配字段

  1. gsub可以通过正则表达式替换字段中匹配到的值,只对字符串字段有效
1
2
3
4
5
6
# filed_name_1 是要替换的字段名
filter {
mutate {
gsub => ["filed_name_1", "/" , "_"]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}

filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}
date{
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}
mutate{
# remove_field => ["timestamp"]
gsub => ["message","/","_"]
}
}

output{
stdout{
codec => "rubydebug"
}
}

split 分隔符分割字符串为数组

  1. split可以通过指定的分隔符分割字段中的字符串为数组
1
2
3
4
5
6
7
# 将filed_name_2字段以"|"为区间分隔为数组。

filter {
mutate {
split => ["filed_name_2", "|"]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}

filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}
date{
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}
mutate{
# remove_field => ["timestamp"]
# gsub => ["message","/","_"]
split => ["message","|"]
}
}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash6.conf
# 输入内容
172.16.213.132|[07/Feb/2018:16:24:19 +0800]|"GET / HTTP/1.1"|403|5039

rename 重命名字段

  1. rename可以实现重命名某个字段的功能
1
2
3
4
5
6
7
# 这个示例表示将字段old_field重命名为new_field。

filter {
mutate {
rename => { "old_field" => "new_field" }
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}

filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}
date{
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}
mutate{
# remove_field => ["timestamp"]
# gsub => ["message","/","_"]
# split => ["message","|"]
rename => { "message" => "new_message" }
}
}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash6.conf
# 输入内容
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

remove_field 删除字段

  1. remove_field可以实现删除某个字段的功能
1
2
3
4
5
6
7
# 将字段timestamp删除

filter {
mutate {
remove_field => ["timestamp"]
}
}

数据修改(Mutate) 完整示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# gsub => ["referrer","\"",""]
# ["referrer","\"",""] -> \" 转换成空

[root@logstash test]# vim logstash6.conf

input{
stdin{}
}

filter{
grok{
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}
date{
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}
mutate{
rename => { "response" => "new_response" }
gsub => ["referrer","\"",""]
split => ["clientip","."]
remove_field => ["timestamp"]
}
}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash6.conf
# 输入内容
172.16.213.132 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

GeoIP 地址查询归类

  1. GeoIP是最常见的免费IP地址归类查询库,当然也有收费版可以使用。
  2. GeoIP库可以根据IP 地址提供对应的地域信息,包括国别,省市,经纬度等,此插件对于可视化地图和区域统计非常有用。
  3. 下面是一个关于GeoIP插件的简单示例(仅列出filter部分)::
1
2
3
4
5
6
7
# 其中,ip_field字段是输出IP地址的一个字段。

filter {
geoip {
source => "ip_field"
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
[root@logstash test]# vim logstash6.conf 

input{
stdin{}
}

filter{
grok {
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}

date {
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}

mutate {
#rename => { "response" => "new_response" }
#gsub => ["referrer","\"",""]
#split => ["clientip","."]
remove_field => ["timestamp"]
}

geoip {
source => "clientip"
}
}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash6.conf
114.114.114.114 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 精简 经度 纬度 时区
[root@logstash test]# vim logstash6.conf

input{
stdin{}
}

filter{
grok {
match => ["message","%{IP:clientip}\ \[%{HTTPDATE:timestamp}\]\ %{QS:referrer}\ %{NUMBER:response}\ %{NUMBER:bytes}"]
# remove_field => ["message"]
}

date {
match => ["timestamp", "dd/MM/yyyy:HH:mm:ss Z"]
}

mutate {
#rename => { "response" => "new_response" }
#gsub => ["referrer","\"",""]
#split => ["clientip","."]
remove_field => ["timestamp"]
}

geoip {
source => "clientip"
fields => [ "city_name", "region_name", "country_name", "ip" , "latitude", "longitude" , "timezone" ]
}
}

output{
stdout{
codec => "rubydebug"
}
}

filter插件综合应用实例

  1. 下面给出一个业务系统输出的日志格式,由于业务系统输出的日志格式无法更改,
  2. 因此就需要我们通过logstash的filter过滤功能以及grok插件来获取需要的数据格式,
  3. 此业务系统输出的日志内容以及原始格式如下:
1
2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202 Safari/604.1|~|http://m.sina.cn/cm/ads_ck_wap.html|~|1460709836200|~|DF0184266887D0E
1
2
3
# 分析 先看什么分割的
这段日志都是以“|~|”为区间进行分隔的,那么刚好我们就以“|~|”为区间分隔符
将这段日志内容分割为6个字段。这里通过grok插件进行正则匹配组合就能完成这个功能。

内容分割

1
2
3
4
5
6
7
# 把内容分成了6个字段
# GREEDYDATA 用来匹配一段所有内容,\|\~\|用来分割一直匹配得到最后,所有每个分割都要加好
# (%{GREEDYDATA:http_user_agent})\|\~\|

# 调试grok正则表达式工具:
http://grokdebug.herokuapp.com
http://120.203.18.89:6969/130/grok-debug-elk-%E5%9C%A8%E7%BA%BF%E8%B0%83%E8%AF%95grok%E5%B7%A5%E5%85%B7/

配置过滤检测

1
2
3
4
5
6
7
# 流程
# 1. 日志分割 按照特有的间隔:\|\~\|
filter - grok - match => {"message" => "..."}

# 2. message 分割好后就没用了 可以remove_field
# 3. data 日志产生时间格式化,再赋值给@timestamp,原本的locatime也没用了 删除
# 4. 2018-02-09T10:57:42+08:00 时间格式要正确
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[root@logstash test]# vim logstash7.conf 

input{
stdin{}
}

filter{
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:localtime}\|\~\|%{IPORHOST:clientip}\|\~\|(%{GREEDYDATA:http_user_agent})\|\~\|(%{DATA:http_referer})\|\~\|%{GREEDYDATA:mediaid}\|\~\|%{GREEDYDATA:osid}" }
remove_field => ["message"]
}

date {
match => ["localtime", "yyyy-MM-dd'T'HH:mm:ssZZ"]
target => "@timestamp"
}

mutate {
remove_field => ["localtime"]
}

}

output{
stdout{
codec => "rubydebug"
}
}

[root@logstash test]# ../bin/logstash -f logstash7.conf
# 输入内容:
2018-02-09T10:57:42+08:00|~|123.87.240.97|~|Mozilla/5.0 (iPhone; CPU iPhone OS 11_2_2 like Mac OS X) AppleWebKit/604.4.7 Version/11.0 Mobile/15C202 Safari/604.1|~|http://m.sina.cn/cm/ads_ck_wap.html|~|1460709836200|~|DF0184266887D0E

1
ELK的难点就在于 日志的过滤,清晰

Logstash 输出插件(output)

  1. output是Logstash的最后阶段,一个事件可以经过多个输出,而一旦所有输出处理完成,整个事件就执行完成。
  2. 一些常用的输出包括:
1
2
3
1. file:            表示将日志数据写入磁盘上的文件。
2. elasticsearch: 表示将日志数据发送给Elasticsearch。Elasticsearch可以高效方便和易于查询的保存数据。
3. graphite: 表示将日志数据发送给graphite,graphite是一种流行的开源工具,用于存储和绘制数据指标。
  1. Logstash还支持输出到nagios、hdfs、email(发送邮件)和Exec(调用命令执行)
1
2
# 官方文档
https://www.elastic.co/guide/en/logstash/6.3/output-plugins.html

输出到标准输出(stdout)

  1. stdout与之前介绍过的stdin插件一样,它是最基础和简单的输出插件
1
2
3
4
5
output {
stdout {
codec => rubydebug
}
}

保存为文件(file)

  1. file插件可以将输出保存到一个文件中
  2. 很棒,相当于文件的实时备份
1
2
3
4
5
6
7
# 使用了变量匹配,用于自动匹配时间和主机名,这在实际使用中很有帮助。
# /data/log3 这段目录必须存在,我测试用了tmp

output {
file {
path => "/data/log3/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@logstash test]# vim logstash8.conf 

input {
stdin {}
}

output {
file {
path => "/tmp/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
}
}


[root@logstash test]# ../bin/logstash -f logstash8.conf
hello

[root@logstash test]# cat /tmp/2019-10-27/logstash_14.log
{"@timestamp":"2019-10-27T14:48:25.284Z","@version":"1","host":"logstash","message":"hello"}

完整同步日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@logstash test]# vim logstash8.conf 

input {
stdin {}
}

output {
file {
path => "/tmp/%{+yyyy-MM-dd}/%{host}_%{+HH}.log"
codec => line { format => "%{message}"}
}
}

114.114.114.114 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039
[root@logstash test]# cat /tmp/2019-10-27/logstash_15.log
114.114.114.114 [07/Feb/2018:16:24:19 +0800] "GET / HTTP/1.1" 403 5039

输出到 elasticsearch

  1. Logstash将过滤、分析好的数据输出到elasticsearch中进行存储和查询,是最经常使用的方法。
1
2
3
4
5
6
7
8
    output {
elasticsearch {
host => ["172.16.213.37:9200","172.16.213.77:9200","172.16.213.78:9200"]
index => "logstash-%{+YYYY.MM.dd}"
manage_template => false
template_name => "template-web_access_log"
}
}
  1. 上面配置中每个配置项含义如下:
1
2
3
4
5
6
7
8
9
10
11
12
host:
- 是一个数组类型的值,后面跟的值是elasticsearch节点的地址与端口,默认端口是9200。可添加多个地址。
index:
- 写入elasticsearch的索引的名称,这里可以使用变量。
- Logstash提供了%{+YYYY.MM.dd}这种写法。在语法解析的时候,看到以+ 号开头的,就会自动认为后面是时间格式,
- 尝试用时间格式来解析后续字符串。这种以天为单位分割的写法,可以很容易的删除老的数据或者搜索指定时间范围内的数据。
- 此外,注意索引名中不能有大写字母。
manage_template:
- 用来设置是否开启logstash自动管理模板功能,如果设置为false将关闭自动管理模板功能。
- 如果我们自定义了模板,那么应该设置为false。
template_name:
- 这个配置项用来设置在Elasticsearch中模板的名称。