06 ELK+Filebeat+Kafka+ZooKeeper 构建大数据日志分析平台


ELK 应用案例

典型 ELK 应用架构

此架构稍微有些复杂,因此,这里做一下架构解读。 这个架构图从左到右,总共分为5层,每层实现的功能和含义分别介绍如下:

  1. 第一层. 数据采集层

    • 数据采集层位于最左边的业务服务器集群上,在每个业务服务器上面安装了filebeat做日志收集,然后把采集到的原始日志发送到Kafka+zookeeper集群上。

    • filebeat 只能做简单的数据过滤,数据此时还是原始数据。

  2. 第二层. 消息队列层

    • 原始日志发送到Kafka+zookeeper集群上后,会进行集中存储,此时,filbeat是消息的生产者,存储的消息可以随时被消费。

    • 通过zookeeper调度和协调kafka工作,比如主节点挂掉了,选取主节点等

  3. 第三层. 数据分析层

    • Logstash作为消费者,会去Kafka+zookeeper集群节点实时拉取原始日志,然后将获取到的原始日志根据规则进行分析、清洗、过滤,最后将清洗好的日志转发至Elasticsearch集群。

    • 如数据量过大,或者考虑性能,Logstash可以为多台。

  4. 第四层. 数据持久化存储

    • Elasticsearch集群在接收到logstash发送过来的数据后,执行写磁盘,建索引库等操作,最后将结构化的数据存储到Elasticsearch集群上。
  1. 第五层. 数据查询、展示层

    • Kibana是一个可视化的数据展示平台,当有数据检索请求时,它从Elasticsearch集群上读取数据,然后进行可视化出图和多维度分析。\

环境与角色说明

服务器环境与角色

  1. 操作系统统一采用Centos7.5版本,各个服务器角色如下表所示:(我使用阿里云服务器,操作系统可能会是7.6)

软件环境与版本

  1. 下表详细说明了本节安装软件对应的名称和版本号,其中,ELK三款软件推荐选择一样的版本,这里选择的是6.3.2版本。

安装JDK以及设置环境变量

  1. 选择合适版本并下载JDK

    • Zookeeper 、elasticsearch和Logstash都依赖于Java环境,并且elasticsearch和Logstash要求JDK版本至少在JDK1.7或者以上。
  1. 安装过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[root@server2 ~]# ls
jdk-8u231-linux-x64.tar.gz

# 解压
[root@server2 ~]# tar -zxvf jdk-8u231-linux-x64.tar.gz -C /usr/local/
[root@server2 ~]# ls -l /usr/local/jdk1.8.0_231/

# 配置环境变量
[root@server2 local]# vim /etc/profile

export JAVA_HOME=/usr/local/jdk1.8.0_231
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$CLASSPATH

[root@server2 local]# source /etc/profile

# 让设置生效
[root@server2 local]# java -version
java version "1.8.0_231"
Java(TM) SE Runtime Environment (build 1.8.0_231-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.231-b11, mixed mode)

# 所有的ES环境都需要安装

安装并配置 elasticsearch 集群

elasticsearch 集群的架构与角色

  1. 在ElasticSearch的架构中,有三类角色,分别是Client Node、Data Node和Master Node,

  2. 搜索查询的请求一般是经过Client Node来向 Data Node 获取数据,

  3. 而索引查询首先请求 Master Node 节点,然后 Master Node 将请求分配到多个Data Node节点完成一次索引查询。

集群中每个角色的含义介绍如下:

  1. master node:
    • 可以理解为主节点,主要用于元数据(metadata)的处理,比如索引的新增、删除、分片分配等,以及管理集群各个节点的状态。
    • elasticsearch集群中可以定义多个主节点,但是,在同一时刻,只有一个主节点起作用,其它定义的主节点,是作为主节点的候选节点存在。
    • 当一个主节点故障后,集群会从候选主节点中选举出新的主节点。
  1. data node:
    • 数据节点,这些节点上保存了数据分片。它负责数据相关操作,比如分片的CRUD、搜索和整合等操作。
    • 数据节点上面执行的操作都比较消耗 CPU、内存和I/O资源,因此数据节点服务器要选择较好的硬件配置,才能获取高效的存储和分析性能。
  1. client node:
    • 客户端节点,属于可选节点,是作为任务分发用的,它里面也会存元数据,但是它不会对元数据做任何修改。
    • client node存在的好处是可以分担data node的一部分压力,因为elasticsearch的查询是两层汇聚的结果,
    • 第一层是在data node上做查询结果汇聚,然后把结果发给client node,client node接收到data node发来的结果后再做第二次的汇聚,
    • 然后把最终的查询结果返回给用户。这样,client node就替data node分担了部分压力。

安装 elasticsearch集群

1
2
3
4
# 环境介绍
server1 172.17.70.229 ES Master、ES NataNode
server2 172.17.70.230 ES Master、Kibana
server3 172.17.70.231 ES Master、ES NataNode
  1. 下载ES
1
2
3
4
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.2.tar.gz
https://artifacts.elastic.co/downloads/kibana/kibana-6.3.2-linux-x86_64.tar.gz
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.3.2-linux-x86_64.tar.gz
https://artifacts.elastic.co/downloads/logstash/logstash-6.3.2.tar.gz
1
2
3
4
5
6
7
8
9
[root@server2 ~]# mkdir -p /app/elk

[root@server2 elk]# ls -l
total 632464
-rw-r--r-- 1 root root 91452574 Jul 24 2018 elasticsearch-6.3.2.tar.gz
-rw-r--r-- 1 root root 12483335 Oct 25 11:02 filebeat-6.3.2-linux-x86_64.tar.gz
-rw-r--r-- 1 root root 194151339 Oct 25 10:24 jdk-8u231-linux-x64.tar.gz
-rw-r--r-- 1 root root 205331616 Oct 25 10:51 kibana-6.3.2-linux-x86_64.tar.gz
-rw-r--r-- 1 root root 144211416 Oct 25 11:02 logstash-6.3.2.tar.gz
1
2
3
4
# 解压安装es
[root@server2 elk]# tar -zxvf elasticsearch-6.3.2.tar.gz -C /usr/local
# 修改目录名
[root@server2 local]# mv /usr/local/elasticsearch-6.3.2 /usr/local/elasticsearch
1
2
3
4
5
6
7
8
9
10
11
12
13
# 目录说明
[root@server2 local]# cd elasticsearch/
[root@server2 elasticsearch]# ls -l
total 460
drwxr-xr-x 3 root root 4096 Oct 25 11:03 bin # 启动服务脚本 和 服务
drwxr-xr-x 2 root root 4096 Jul 20 2018 config # 配置文件
drwxr-xr-x 2 root root 4096 Jul 20 2018 lib
-rw-r--r-- 1 root root 13675 Jul 20 2018 LICENSE.txt
drwxr-xr-x 2 root root 4096 Jul 20 2018 logs
drwxr-xr-x 17 root root 4096 Jul 20 2018 modules
-rw-r--r-- 1 root root 416018 Jul 20 2018 NOTICE.txt
drwxr-xr-x 2 root root 4096 Jul 20 2018 plugins # 插件
-rw-r--r-- 1 root root 8511 Jul 20 2018 README.textile
1
2
3
4
5
6
7
8
[root@server2 elasticsearch]# cd config/ 
-rw-rw---- 1 root root 2853 Jul 20 2018 elasticsearch.yml # 主配置文件
-rw-rw---- 1 root root 2937 Jul 20 2018 jvm.options # JVM内存配置
-rw-rw---- 1 root root 6380 Jul 20 2018 log4j2.properties
-rw-rw---- 1 root root 473 Jul 20 2018 role_mapping.yml
-rw-rw---- 1 root root 197 Jul 20 2018 roles.yml
-rw-rw---- 1 root root 0 Jul 20 2018 users
-rw-rw---- 1 root root 0 Jul 20 2018 users_roles

增加es用户授权

1
2
3
4
5
6
7
8
#增加用户组
groupadd elasticsearch

#增加用户,并规定所属用户组和密码
useradd elasticsearch -g elasticsearch

# 递归更改文件的拥有者
chown -R elasticsearch:elasticsearch /usr/local/elasticsearch

操作系统调优

  1. 操作系统以及JVM调优主要是针对安装elasticsearch的机器。对于操作系统,需要调整几个内核参数,将下面内容添加到/etc/sysctl.conf文件中:

  2. fs.file-max主要是配置系统最大打开文件描述符数,建议修改为655360或者更高,

  3. vm.max_map_count影响Java线程数量,用于限制一个进程可以拥有的VMA(虚拟内存区域)的大小,系统默认是65530,建议修改成262144或者更高。

1
2
3
4
5
6
[root@server2 config]# vim /etc/sysctl.conf 
fs.file-max=655360
vm.max_map_count = 262144

# 启用生效
[root@server2 config]# sysctl -p
  1. 调整进程最大打开文件描述符(nofile)、最大用户进程数(nproc)和最大锁定内存地址空间(memlock),添加如下内容到/etc/security/limits.conf文件中:
1
2
3
4
5
6
7
8
[root@server2 config]# vim /etc/security/limits.conf 

* soft nofile 204800
* hard nofile 204800
* soft nproc 655350
* hard nproc 655350
* soft memlock unlimited
* hard memlock unlimited
  1. 最后,还需要修改/etc/security/limits.d/20-nproc.conf文件(centos7.x系统)
1
2
3
4
5
6
7
[root@server2 config]# vim /etc/security/limits.d/20-nproc.conf
* soft nproc 655350

# 启用生效
重新打开终端即可

[root@server2 ~]# ulimit -a

JVM调优

  1. JVM调优主要是针对elasticsearch的JVM内存资源进行优化,elasticsearch的内存资源配置文件为jvm.options,
  2. 此文件位于/usr/local/elasticsearch/config目录下,打开此文件,修改如下内容:
1
2
3
4
5
6
7
[root@server2 ~]# cd /usr/local/elasticsearch/config/
[root@server2 config]# vim jvm.options
-Xms1g
-Xmx1g

# 我的阿里云主机是2G内存
# 可根据服务器内存大小,修改为合适的值。一般设置为服务器物理内存的一半最佳。

配置 elasticsearch

  1. elasticsearch的配置文件均在elasticsearch根目录下的config文件夹,这里是/usr/local/elasticsearch/config目录,
  2. 主要有jvm.options、elasticsearch.yml和log4j2.properties三个主要配置文件。这里重点介绍elasticsearch.yml一些重要的配置项及其含义。
  3. 这里配置的elasticsearch.yml文件内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@server2 config]# cd /usr/local/elasticsearch/config
[root@server2 config]# vim elasticsearch.yml


cluster.name: elkbigdata # 集群名称
node.name: server1 # 节点名称 配置文件这里都不一致
node.master: true # 是否有权利成为master 默认是true 默认第一台启动的es就是master
node.data: true # 是否是一个数据节点
path.data: /data1/elasticsearch,/data2/elasticsearch # 索引数据的存储路径
path.logs: /usr/local/elasticsearch/logs # 日志
bootstrap.memory_lock: true # 锁住物理内存 不使用swap
network.host: 0.0.0.0
http.port: 9200 # 对外提供http服务端口 通过该端口查看返回的数据信息
discovery.zen.minimum_master_nodes: 1 # 最少master节点数 集群中不能低于这个数
discovery.zen.ping.timeout:3s # 默认 新增节点集群会去ping 3秒ping不到就认为故障 网络丢包就加大
discovery.zen.ping.unicast.hosts: ["172.17.70.229:9300", "172.17.70.230:9300", "172.17.70.231:9300"]
# master初始化列表

# 9300 是es集群互相通信端口
  1. 注意node.name肯定不能相同,其他可以相同

  2. cluster.name: elkbigdata

    • 配置elasticsearch集群名称,默认是elasticsearch。这里修改为elkbigdata,elasticsearch会自动发现在同一网段下的集群名为elkbigdata的主机。
  3. node.name: server1
    • 节点名,任意指定一个即可,这里是server1,我们这个集群环境中有三个节点,分别是server1、server2和server3,记得根据主机的不同,要修改相应的节点名称。
  4. node.master: true
    • 指定该节点是否有资格被选举成为master,默认是true,elasticsearch集群中默认第一台启动的机器为master角色,如果这台服务器宕机就会重新选举新的master。
  5. node.data: true
    • 指据,默认为true,表示数据存储节点,如果节点配置node.master:false并且node.data: false,则该节点就是client node。
    • 这个client node类似于一个“路由器”,负责将集群层面的请求转发到主节点,将数据相关的请求转发到数据节点。
  6. path.data:/data1/elasticsearch,/data2/elasticsearch
    • 设置索引数据的存储路径,默认是elasticsearch根目录下的data文件夹,这里自定义了两个路径,可以设置多个存储路径,用逗号隔开。
  7. path.logs: /usr/local/elasticsearch/logs
    • 设置日志文件的存储路径,默认是elasticsearch根目录下的logs文件夹
  8. bootstrap.memory_lock: true
    • 此配置项一般设置为true用来锁住物理内存。linux下可以通过“ulimit -l” 命令查看最大锁定内存地址空间(memlock)是不是unlimited
  9. network.host: 0.0.0.0
    • 此配置项用来设置elasticsearch提供服务的IP地址,默认值为0.0.0.0,此参数是在elasticsearch新版本中增加的,此值设置为服务器的内网IP地址即可。
  10. http.port: 9200
    • 设置elasticsearch对外提供服务的http端口,默认为9200。其实,还有一个端口配置选项transport.tcp.port,此配置项用来设置节点间交互通信的TCP端口,默认是9300。
  11. discovery.zen.minimum_master_nodes: 1
    • 配置当前集群中最少的master节点数,默认为1,也就是说,elasticsearch集群中master节点数不能低于此值,如果低于此值,elasticsearch集群将停止运行。在三个以上节点的集群环境中,建议配置大一点的值,推荐2至4个为好。
  12. discovery.zen.ping.unicast.hosts: [“172.17.70.229:9300”, “172.17.70.230:9300”,”172.17.70.231:9300”]
    • 设置集群中master节点的初始列表,可以通过这些节点来自动发现新加入集群的节点。这里需要注意,master节点初始列表中对应的端口是9300。即为集群交互通信端口。

创建data目录

1
2
3
4
5
6
# /data1/elasticsearch,/data2/elasticsearch

[root@server2 config]# mkdir -p /data1/elasticsearch
[root@server2 config]# mkdir -p /data2/elasticsearch
[root@server2 config]# chown -R elasticsearch:elasticsearch /data1/elasticsearch/
[root@server2 config]# chown -R elasticsearch:elasticsearch /data2/elasticsearch/

使用普通用户启动ES服务

1
2
3
4
5
6
7
8
9
10
11
# es5版本后 禁止root用户启动es
# 启动elasticsearch服务需要在一个普通用户下完成,如果通过root用户启动elasticsearch的话,可能会收到如下错误:
# java.lang.RuntimeException: can not run elasticsearch as root
# “-d”参数的意思是将elasticsearch放到后台运行。

[root@server2 config]# su - elasticsearch
[elasticsearch@server2 elasticsearch]$ cd /usr/local/elasticsearch/
[elasticsearch@server2 elasticsearch]$ bin/elasticsearch -d

[elasticsearch@server2 logs]$ netstat -ntlp
[elasticsearch@server2 logs]$ ps -aux |grep java

验证elasticsearch集群的正确性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 将所有elasticsearch节点的服务启动后,在任意一个节点执行如下命令:
[elasticsearch@server2 logs]$ curl http://172.17.70.230:9200
{
"name" : "server2",
"cluster_name" : "elkbigdata",
"cluster_uuid" : "Qxj3RgRARVCylLtalVOZFA",
"version" : {
"number" : "6.3.2",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "053779d",
"build_date" : "2018-07-20T05:20:23.451332Z",
"build_snapshot" : false,
"lucene_version" : "7.3.1",
"minimum_wire_compatibility_version" : "5.6.0",
"minimum_index_compatibility_version" : "5.0.0"
},
"tagline" : "You Know, for Search"
}

# 同时查看另外两台节点
curl http://172.17.70.229:9200
curl http://172.17.70.231:9200

安装与配置 zookeeper 集群

  1. 对于集群模式下的ZooKeeper部署,官方建议至少要三台服务器,关于服务器的数量,推荐是奇数个(3、5、7、9等等),以实现ZooKeeper集群的高可用,这里使用三台服务器进行部署

下载与安装zookeeper

  1. ZooKeeper是用Java编写的,需要安装Java运行环境,可以从zookeeper官网https://zookeeper.apache.org/获取zookeeper安装包,这里安装的版本是zookeeper-3.4.13.tar.gz。
  2. 将下载下来的安装包直接解压到一个路径下即可完成zookeeper的安装,
1
2
3
[root@server2 elk]# tar -zxvf zookeeper-3.4.13.tar.gz -C /usr/local
[root@server2 elk]# mv /usr/local/zookeeper-3.4.13 /usr/local/zookeeper
[root@server2 elk]# cd /usr/local/zookeeper/

配置zookeeper

  1. zookeeper 安装到了/usr/local目录下,因此,zookeeper的配置模板文件/usr/local/zookeeper/conf/zoo_sample.cfg,
  2. 拷贝zoo_sample.cfg并重命名为zoo.cfg,重点配置如下内容:
1
2
[root@server2 conf]# cd /usr/local/zookeeper/conf/
[root@server2 conf]# cp zoo_sample.cfg zoo.cfg
1
2
3
4
5
6
7
8
9
# 配置属性参数
tickTime=2000 # 控制心跳超时 2000毫秒 度量单位
initLimit=10 # Follower服务器初始化连接到Leader时 最长能忍受多少个心跳 10*2000=20秒
syncLimit=5 # Leader与Follower之间发送消息,请求和应答时间长度 5*2000
dataDir=/data/zookeeper # 必须配 存储快照 不配置log 也会放在这里
clientPort=2181 # 监听端口
server.1=172.16.213.51:2888:3888 # 集群服务器信息 .1代表第几台服务器
server.2=172.16.213.109:2888:3888 # 2888 是与 Leader 通信的端口
server.3=172.16.213.75:2888:3888 # 3888 选举时服务器之间通信端口
1
2
3
4
5
6
7
8
9
[root@kafkazk1 conf]# grep ^'[a-Z]' zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181
server.1=172.17.70.232:2888:3888
server.2=172.17.70.233:2888:3888
server.3=172.17.70.234:2888:3888

每个配置项含义如下

  1. tickTime:zookeeper使用的基本时间度量单位,以毫秒为单位,它用来控制心跳和超时。2000表示2 tickTime。更低的tickTime值可以更快地发现超时问题。
  2. initLimit:这个配置项是用来配置Zookeeper集群中Follower服务器初始化连接到Leader时,最长能忍受多少个心跳时间间隔数(也就是tickTime)
  3. syncLimit:这个配置项标识Leader与Follower之间发送消息,请求和应答时间长度最长不能超过多少个tickTime的时间长度
  4. dataDir:必须配置项,用于配置存储快照文件的目录。需要事先创建好这个目录,如果没有配置dataLogDir,那么事务日志也会存储在此目录。
  5. clientPort:zookeeper服务进程监听的TCP端口,默认情况下,服务端会监听2181端口。
  6. server.A=B:C:D:其中A是一个数字,表示这是第几个服务器;B是这个服务器的IP地址;
    C表示的是这个服务器与集群中的Leader服务器通信的端口;D 表示如果集群中的Leader服务器宕机了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

myid 文件

  1. 除了修改zoo.cfg配置文件外,集群模式下还要配置一个文件myid,
  2. 这个文件需要放在dataDir配置项指定的目录下,这个文件里面只有一个数字,如果要写入1,表示第一个服务器,与zoo.cfg文本中的server.1中的1对应,以此类推,
  3. 在集群的第二个服务器zoo.cfg配置文件中dataDir配置项指定的目录下创建myid文件,写入2,这个2与zoo.cfg文本中的server.2中的2对应。
  4. Zookeeper在启动时会读取这个文件,得到里面的数据与zoo.cfg里面的配置信息比较,从而判断每个zookeeper server的对应关系。
  5. 为了保证zookeeper集群配置的规范性,建议将zookeeper集群中每台服务器的安装和配置文件路径都保存一致。
1
2
3
server1 myid 1
server2 myid 2
server3 myid 3
1
2
3
4
5
6
7
[root@kafkazk1 conf]# mkdir -p /data/zookeeper
[root@kafkazk1 conf]# cd /data/zookeeper
[root@kafkazk1 zookeeper]# vim myid
[root@kafkazk1 zookeeper]# cat /data/zookeeper/myid
1

# 3台集群同样操作

启动 zookeeper

1
2
3
4
5
6
7
# 三台机器一起启动

[root@kafkazk1 conf]# cd /usr/local/zookeeper/bin
[root@kafkazk1 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 查看启动是否成功
1. Zookeeper启动后,通过jps命令(jdk内置命令)可以看到有一个QuorumPeerMain标识,
2. 这个就是Zookeeper启动的进程,前面的数字是Zookeeper进程的PID。

[root@kafkazk1 bin]# jps
1334 QuorumPeerMain
1359 Jps

# 日志文件 会在启动的路径下
[root@kafkazk3 bin]# tail -200 zookeeper.out

# 端口
[root@kafkazk3 bin]# netstat -tnlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:2181 0.0.0.0:* LISTEN 1334/java
tcp 0 0 172.17.70.234:3888 0.0.0.0:* LISTEN 1334/java
1
2
3
4
5
6
7
# 有时候为了启动Zookeeper方面,也可以添加zookeeper环境变量到系统的/etc/profile中,
# 这样,在任意路径都可以执行“zkServer.sh start”命令了,添加环境变量的内容为:
vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

source /etc/profile

安装并配置 Kafka Broker 集群

  1. 这里将kafka和zookeeper部署在一起了。另外,由于是部署集群模式的kafka,因此下面的操作需要在每个集群节点都执行一遍。

下载与安装Kafka

1
2
# 选择kafka版本 需要与 filebeat 需求的对应
https://www.elastic.co/guide/en/beats/filebeat/6.3/kafka-output.html

  1. 可以从kafka官网 https://kafka.apache.org/downloads 获取kafka安装包,这里推荐的版本是kafka_2.10-0.10.0.1.tgz,

  2. 将下载下来的安装包直接解压到一个路径下即可完成kafka的安装,这里统一将kafka安装到/usr/local目录下,基本操作过程如下:

1
2
[root@kafkazk1 elk]# tar -zxvf kafka_2.10-0.10.0.1.tgz -C /usr/local
[root@kafkazk1 elk]# mv /usr/local/kafka_2.10-0.10.0.1 /usr/local/kafka
1
2
3
4
5
6
7
8
[root@kafkazk1 kafka]# ls -l

drwxr-xr-x 3 root root 4096 Aug 4 2016 bin
drwxr-xr-x 2 root root 4096 Aug 4 2016 config
drwxr-xr-x 2 root root 4096 Oct 26 11:50 libs
-rw-r--r-- 1 root root 28824 Aug 4 2016 LICENSE
-rw-r--r-- 1 root root 336 Aug 4 2016 NOTICE
drwxr-xr-x 2 root root 4096 Aug 4 2016 site-docs
1
2
3
4
# 主配置文件
[root@kafkazk1 kafka]# ls -l config/server.properties

# kafka他也自带了zookeeper,但是我们用的是自己的所有不用关心他的配置文件

配置 kafka集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 常用配置

broker.id=1 # 节点在集群中的唯一标识 每个节点需要修改
listeners=PLAINTEXT://172.16.213.51:9092 # kafka监听地址和端口 9092默认 每个节点需要修改
log.dirs=/usr/local/kafka/logs # 日志文件 关键是数据
num.partitions=6 # topic有多少个分区 >= 消费者数 保证每个消费者都能得到数据
num.recovery.threads.per.data.dir=1 # 可减少启动时 日志的加载时间
log.retention.hours=60 # 日志保持时间 60小时
log.segment.bytes=1073741824 # partition中每个segment数据文件的大小,默认是1G
zookeeper.connect=172.16.213.51:2181,172.16.213.75:2181,172.16.213.109:2181
# zookeeper 所在的地址 三个zookeeper节点
log.retention.check.interval.ms=300000 # 日志检查时间
zookeeper.connection.timeout.ms=6000 # zookeeper 连接超时时间

auto.create.topics.enable=true # 是否自动创建topic
delete.topic.enable=true # 设置可以物理删除topic

每个配置项含义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
1. broker.id:
- 每一个broker在集群中的唯一表示,要求是正数。
- 当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况。


2. listeners:
- 设置kafka的监听地址与端口,可以将监听地址设置为主机名或IP地址,这里将监听地址设置为IP地址。


3. log.dirs:
- 这个参数用于配置kafka保存数据的位置,kafka中所有的消息都会存在这个目录下。
- 可以通过逗号来指定多个路径, kafka会根据最少被使用的原则选择目录分配新的parition。
- 需要注意的是,kafka在分配parition的时候选择的规则不是按照磁盘的空间大小来定的,而是根据分配的 parition的个数多小而定。


4. num.partitions:
- 这个参数用于设置新创建的topic有多少个分区,可以根据消费者实际情况配置,配置过小会影响消费性能。
- 这里配置6个。


5. log.retention.hours:
- 这个参数用于配置kafka中消息保存的时间,还支持log.retention.minutes和 log.retention.ms配置项。
- 这三个参数都会控制删除过期数据的时间,推荐使用log.retention.ms。如果多个同时设置,那么会选择最小的那个。


6. log.segment.bytes:
- 配置partition中每个segment数据文件的大小,默认是1GB,超过这个大小会自动创建一个新的segment file。


7. zookeeper.connect:
- 这个参数用于指定zookeeper所在的地址,它存储了broker的元信息。
- 这个值可以通过逗号设置多个值,每个值的格式均为:hostname:port/path,
- 每个部分的含义如下:
- hostname:表示zookeeper服务器的主机名或者IP地址,这里设置为IP地址。
- port: 表示是zookeeper服务器监听连接的端口号。
- /path:表示kafka在zookeeper上的根目录。如果不设置,会使用根目录。


8. auto.create.topics.enable:
- 这个参数用于设置是否自动创建topic,如果请求一个topic时发现还没有创建, kafka会在broker上自动创建一个topic,
- 如果需要严格的控制topic的创建,那么可以设置auto.create.topics.enable为false,禁止自动创建topic。


9. delete.topic.enable:
- 在0.8.2版本之后,Kafka提供了删除topic的功能,但是默认并不会直接将topic数据物理删除。
- 如果要从物理上删除(即删除topic后,数据文件也会一同删除),就需要设置此配置项为true。

本次配置

  1. kafka集群都要修改配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[root@kafkazk1 config]# grep ^'[a-Z]' server.properties 

broker.id=1
listeners=PLAINTEXT://172.17.70.232:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
log.retention.hours=60
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181
zookeeper.connection.timeout.ms=6000
auto.create.topics.enable=true
delete.topic.enable=true

启动kafka集群

  1. 在启动kafka集群前,需要确保ZooKeeper集群已经正常启动。
  2. 依次在kafka各个节点上执行如下命令即可
1
2
3
4
5
6
7
8
9
10
# nohup 后台启动
# & 输出nohup.out 日志在本地

[root@kafkazk1 kafka]# cd /usr/local/kafka
[root@kafkazk1 kafka]# nohup bin/kafka-server-start.sh config/server.properties &
[1] 1766

[root@kafkazk1 kafka]# nohup: ignoring input and appending output to ‘nohup.out’
[root@kafkazk1 kafka]# ls
bin config libs LICENSE logs nohup.out NOTICE site-docs
1
2
3
4
5
6
7
8
这里将kafka放到后台运行,启动后,会在启动kafka的当前目录下生成一个nohup.out文件,
1. 可通过此文件查看kafka的启动和运行状态。
2. 通过jps指令,可以看到有个Kafka标识,这是kafka进程成功启动的标志。

[root@kafkazk1 kafka]# jps
2032 Jps
1334 QuorumPeerMain
1766 Kafka
1
2
3
[root@kafkazk2 kafka]# tail -200 nohup.out 
[root@kafkazk1 kafka]# ps -ef|grep java
[root@kafkazk1 kafka]# netstat -tnlp | grep 9092

kafka 集群基本命令操作

  1. kefka提供了多个命令用于查看、创建、修改、删除topic信息,

  2. 也可以通过命令测试如何生产消息,消费消息等,这些命令位于kafka安装目录的bin目录下,这里是/usr/local/kafka/bin。

  3. 登录任意一台kafka集群节点,切换到此目录下,即可进行命令操作。

  4. 下面列举kafka的一些常用命令的使用方法:

1
2
3
4
5
6
7
# 务必掌握
1. 显示topic列表
2. 创建一个topic,并指定topic属性(副本数、分区数等)
3. 查看某个topic的状态
4. 生产消息
5. 消费消息
6. 删除topic
1
2
3
# 显示当前kafka集群中的topic列表
[root@kafkazk1 kafka]# cd /usr/local/kafka
[root@kafkazk1 kafka]# bin/kafka-topics.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --list
1
2
3
4
5
6
7
8
9
10
# 创建一个topic
[root@kafkazk1 kafka]# bin/kafka-topics.sh --create --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --replication-factor 1 --partitions 3 --topic mytopic
Created topic "mytopic".

--replication-factor 1 # 副本信息 保存1份
--partitiions3 # partitiions数量 和消费者数量有关
--topic mytopic # topic 名称

[root@kafkazk1 kafka]# bin/kafka-topics.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --list
mytopic
1
2
3
4
5
6
7
8
9
# 查看某个topic的属性信息
[root@kafkazk1 kafka]# bin/kafka-topics.sh --describe --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --topic mytopic
Topic:mytopic PartitionCount:3 ReplicationFactor:1 Configs:
Topic: mytopic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: mytopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: mytopic Partition: 2 Leader: 3 Replicas: 3 Isr: 3

# Replicas 对应副本
# Isr: 1 活动状态
1
2
3
4
5
6
7
8
# 生产消息
# --broker-list 指定broker信息
# 生产消息指定broker地址+端口 kafka集群的节点信息
# --topic 指定消息生产在哪个topic
# 交互命令 敲完命令 回车 输入消息
[root@kafkazk1 kafka]# bin/kafka-console-producer.sh --broker-list 172.17.70.232:9092,172.17.70.233:9092,172.17.70.234:9092 --topic mytopic
test data kafka
123456
1
2
3
4
5
6
7
8
9
10
# 消费消息 
# 查看消息 就是消费消息
# 在开个终端查看消息
# 消息是实时消费
[root@kafkazk2 kafka]# bin/kafka-console-consumer.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --topic mytopic
heelo
abcdefg

# 通过键盘输入消息内容,消费者马上可以看到
# Crtl + c 退出

1
2
3
4
5
6
7
8
# 从头开始接收 查看所有消息
# 从头开始接收消息并没有顺序 ,只有实时消费查看 是按照生产的顺序
[root@kafkazk2 kafka]# bin/kafka-console-consumer.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --topic mytopic --from beginning
heelo
test data kafka
abcdefg
123456
leo
  1. 实际生产机制,并不是键盘生产消息,而是通过第三方软件,比如logstash或者filebeat,向kafka生产数据

  2. 消费者 可以使ES 也可以使logstash,最终实现数据传递

  3. kafka就是生产和消费数据的中介,他实现数据的传递,消息队列,持久化缓存数据,作用于消息传输和保存数据

1
2
3
4
5
6
7
8
9
10
11
# 删除一个topic
[root@kafkazk1 kafka]# bin/kafka-topics.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --delete --topic mytopic
Topic mytopic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 如果没有设置 delete.topic.enable=true 那么就是标记删除(逻辑删除),设置了就是物理删除

# 再次查看就没有了
[root@kafkazk1 kafka]# bin/kafka-topics.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --list

# 调试消费数据会经常用到,确认数据是否到kafa

安装并配置 Filebeat

为什么要使用 filebeat

  1. Logstash功能虽然强大,但是它依赖java、在数据量大的时候,Logstash进程会消耗过多的系统资源,这将严重影响业务系统的性能,
  2. 而filebeat就是一个完美的替代者,filebeat是Beat成员之一,基于Go语言,没有任何依赖,配置文件简单,格式明了,
  3. 同时,filebeat比logstash更加轻量级,所以占用系统资源极少,非常适合安装在生产机器上。

下载与安装 filebeat

  1. 由于filebeat基于go语言开发,无其他任何依赖,因而安装非常简单,
  2. 可以从elastic官网https://www.elastic.co/downloads/beats/filebeat 获取filebeat安装包,
  3. 这里下载的版本是filebeat-6.3.2-linux-x86_64.tar.gz。
  4. 将下载下来的安装包直接解压到一个路径下即可完成filebeat的安装。
  5. 根据前面的规划,将filebeat安装到filebeat server主机上,这里设定将filebeat安装到/usr/local目录下,
  6. 基本操作过程如下:
1
2
3
4
5
6
7
[root@filebeat1 ~]# mkdir -p /app/elk
[root@filebeat1 ~]# cd /app/elk/
[root@filebeat1 elk]# ls
filebeat-6.3.2-linux-x86_64.tar.gz

[root@filebeat1 elk]# tar -zxvf filebeat-6.3.2-linux-x86_64.tar.gz -C /usr/local
[root@filebeat1 elk]# mv /usr/local/filebeat-6.3.2-linux-x86_64 /usr/local/filebeat
1
2
3
4
5
6
7
8
9
10
11
12
13
[root@filebeat1 filebeat]# cd /usr/local/filebeat/
[root@filebeat1 filebeat]# ls -l

-rw-r--r-- 1 root root 55717 Jul 20 2018 fields.yml
-rwxr-xr-x 1 root root 47593843 Jul 20 2018 filebeat # 启动文件
-rw-r----- 1 root root 58886 Jul 20 2018 filebeat.reference.yml
-rw------- 1 root root 7230 Jul 20 2018 filebeat.yml # 配置文件
drwxrwxr-x 4 1000 1000 4096 Jul 20 2018 kibana
-rw-r--r-- 1 root root 13675 Jul 20 2018 LICENSE.txt
drwxr-xr-x 16 1000 1000 4096 Jul 20 2018 module
drwxr-xr-x 2 root root 4096 Jul 20 2018 modules.d # 模块文件 快速配置filebeat
-rw-r--r-- 1 root root 143351 Jul 20 2018 NOTICE.txt
-rw-r--r-- 1 root root 802 Jul 20 2018 README.md
1
2
3
# 配置方法
1. 手动配置 filebeat.yml 能更清楚运行机制
2. 模块化配置 modules.d 下的已经写好的配置 , 重命名就可以使用

配置 filebeat

  1. filebeat的配置文件目录为/usr/local/filebeat/filebeat.yml,这里仅列出常用的配置项,内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
filebeat.inputs:
- type: log # 输入log日志类型
enabled: true # 默认是走模块方式,手工需要配置为true 模块就不启作用了
paths: # 指定要收集的日志 可以是完整路径,也可以是模糊匹配
- /var/log/messages
- /var/log/secure
fields: # log_topic: osmessages 就是我们定义的topic 的名称 所以搜集的日志会放入
log_topic: osmessages

#exclude_files: ['.gz$'] # 过滤 排除.gz结尾的文件

name: "172.16.213.157" # 服务器的标识 为空就是主机名
output.kafka: # kafka 配置
enabled: true
hosts: ["172.16.213.51:9092", "172.16.213.75:9092", "172.16.213.109:9092"] # kafka集群
version: "0.10" # 版本
topic: '%{[fields][log_topic]}' # 指定输出到哪个topic
partition.round_robin: # rr轮询
reachable_only: true
worker: 2
required_acks: 1 # 最大限度保证数据写入 leader确定后再进行发送下一条
compression: gzip # 数据压缩
max_message_bytes: 10000000 # 单条消息的最大长度 10M
logging.level: debug # 消息级别 生产的时候可以改成警告或者info
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 实验版本
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
- /var/log/secure

output.kafka:
enabled: true
hosts: ["172.17.70.232:9092", "172.17.70.232:9092", "172.17.70.232:9092"]
version: "0.10"
topic: '%{[fields][log_topic]}'
partition.round_robin:
reachable_only: true
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 10000000
processors:
- drop_fields:
fields: ["beat", "input", "source", "offset","prospector","host"]
logging.level: debug
name: "172.17.70.235"
1
2
3
# 官方文档支持
https://www.elastic.co/guide/en/beats/filebeat/6.3/configuration-filebeat-options.html
https://www.elastic.co/guide/en/beats/filebeat/6.3/configuring-output.html
1
# 所有输出的选项 都需要注释 默认的是es

1
收费插件 Xpack

配置项的含义介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
1. filebeat.inputs:
- 用于定义数据原型。

2. type:
- 指定数据的输入类型,这里是log,即日志,是默认值,还可以指定为stdin,即标准输入。参考官方文档

3. enabled: true:
- 启用手工配置filebeat,而不是采用模块方式配置filebeat。

4. paths:
- 用于指定要监控的日志文件,可以指定一个完整路径的文件,也可以是一个模糊匹配格式,例如:
- - /data/nginx/logs/nginx_*.log,该配置表示将获取/data/nginx/logs目录下的所有以.log结尾的文件,注意这里有个破折号“-”,
- 要在paths配置项基础上进行缩进,不然启动filebeat会报错,另外破折号前面不能有tab缩进,建议通过空格方式缩进。
- - /var/log/*.log,该配置表示将获取/var/log目录的所有子目录中以”.log”结尾的文件,而不会去查找/var/log目录下以”.log”结尾的文件。

5. name:
- 设置filebeat收集的日志中对应主机的名字,如果配置为空,则使用该服务器的主机名。这里设置为IP,便于区分多台主机的日志信息。\

6. output.kafka:
- filebeat支持多种输出,支持向kafka,logstash,elasticsearch输出数据,这里的设置是将数据输出到kafka。

7. enabled:
- 表明这个模块是启动的。

8. host:
- 指定输出数据到kafka集群上,地址为kafka集群IP加端口号。

9. topic:
- 指定要发送数据给kafka集群的哪个topic,若指定的topic不存在,则会自动创建此topic。
- 注意topic的写法,在filebeat6.x之前版本是通过“%{[type]}”来自动获取document_type配置项的值。
- 而在filebeat6.x之后版本是通过'%{[fields][log_topic]}'来获取日志分类的。

10. logging.level:
- 定义filebeat的日志输出级别,有critical、error、warning、info、debug五种级别可选,在调试的时候可选择debug模式。
1
# 过滤 清除字段

启动 filebeat 收集日志

  1. 所有配置完成之后,就可以启动filebeat,开启收集日志进程了,启动方式如下:
1
2
3
4
5
6
[root@filebeat1 filebeat]# cd /usr/local/filebeat/

[root@filebeat1 filebeat]# nohup ./filebeat -e -c filebeat.yml &
[root@filebeat1 filebeat]# tail -200 nohup.out

# 开始监控文件

1
2
3
4
5
6
7
8
9
10
# 看看要收集的日志 格式

[root@filebeat1 filebeat]# tail /var/log/messages
Oct 26 17:12:01 filebeat1 rsyslogd: [origin software="rsyslogd" swVersion="8.24.0" x-pid="813" x-info="http://www.rsyslog.com"] rsyslogd was HUPed
Oct 26 17:20:01 filebeat1 systemd: Started Session 11 of user root.
Oct 26 17:20:01 filebeat1 systemd: Starting Session 11 of user root.
Oct 26 17:30:01 filebeat1 systemd: Started Session 12 of user root.
Oct 26 17:30:01 filebeat1 systemd: Starting Session 12 of user root.

[root@filebeat1 filebeat]# tail /var/log/secure

模拟测试 filebeat 输出信息格式解读

  1. 开启filebeat的日志查看记录,有变化就会更新到日志中
1
[root@filebeat1 filebeat]# tailf nohup.out
  1. 模拟一个失败的登录,日志产生到 /var/log/secure 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 找一台服务器访问filebeat主机,故意失败登录,故意输错密码
[root@server2 elk]# ssh 172.17.70.235
root@172.17.70.235's password:
Permission denied, please try again.


[root@filebeat1 filebeat]# tailf nohup.out
# json格式的输出
...
"@timestamp": "2019-10-26T09:39:23.831Z", # 收集到日志的具体时间
"source": "/var/log/secure", # 收集的哪个日志
"offset": 151, # 偏移量,消费信息的时候可以使用

"prospector": { # 收集类型 和 输入日志类型
"type": "log"
},
"input": {
"type": "log"
},


"fields": { # 自定义的列
"log_topic": "osmessages"
},


"beat": { # 收集信息
"name": "172.17.70.235",
"hostname": "filebeat1",
"version": "6.3.2"
},

"host": { # 主机信息
"name": "172.17.70.235"
},

# 收集的数据信息
"message": "Oct 26 17:39:14 filebeat1 sshd[1527]: pam_unix(sshd:auth): authentication failure; logname= uid=0 euid=0 tty=ssh ruser= rhost=172.17.70.230 user=root",
...

filebeat输出信息格式解读

  1. 从这个输出可以看到,输出日志被修改成了JSON格式,日志总共分为10个字段,
  2. 分别是 “@timestamp”、”@metadata”、”beat”、”host”、”source”、
    “offset”、”message”、”prospector”、”input”和”fields”字段,
  3. 每个字段含义如下:
1
2
3
4
5
6
7
8
9
10
1. @timestamp:时间字段,表示读取到该行内容的时间。
2. @metadata:元数据字段,此字段只有是跟Logstash进行交互使用。
3. beat:beat属性信息,包含beat所在的主机名、beat版本等信息。
4. host: 主机名字段,输出主机名,如果没主机名,输出主机对应的IP。
5. source: 表示监控的日志文件的全路径。
6. offset: 表示该行日志的偏移量。
7. message: 表示真正的日志内容。
8. prospector:filebeat对应的消息类型。
9. input:日志输入的类型,可以有多种输入类型,例如Log、Stdin、redis、Docker、TCP/UDP等
10.fields:topic对应的消息字段或自定义增加的字段。

过滤字段

  1. 日志输出格式介绍和字段删减方法
  2. filebeat收集了这么多字段的数据,所有我们要做一个简单的过滤 再交给后面的程序
  3. 通过filebeat接收到的内容,默认增加了不少字段,但是有些字段对数据分析来说没有太大用处,
  4. 所以有时候需要删除这些没用的字段,在filebeat配置文件中添加如下配置,即可删除不需要的字段:
  5. 这个设置表示删除”beat”、”input”、”source”、”offset” 四个字段,其中,
  6. @timestamp 和@metadata字段是不能删除的,就算加上也过滤不了。
  7. 做完这个设置后,再次查看kafka中的输出日志,已经不再输出这四个字段信息了。
  8. 后面可以通过logstash进行更好的过滤,不用担心
1
2
3
processors:
- drop_fields:
fields: ["beat", "input", "source", "offset"]

1
2
3
4
# 重启filebeat 
[root@filebeat1 filebeat]# pgrep -f filebeat
1514
[root@filebeat1 filebeat]# kill -9 `pgrep -f filebeat`
1
# 减少输出后

查看kafka集群有没有收到日志

  1. 登录到任意的kafka集群节点

  2. 消费topic osmessages就行

1
2
3
4
5
6
7
8
9
10
# 先看看有没有创建 我们定义的topic
[root@kafkazk2 kafka]# bin/kafka-topics.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --list
osmessages

# 开启一个消费
[root@kafkazk2 kafka]# bin/kafka-console-consumer.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --topic osmessages

# 再模拟一次数据查看 数据有消费
# filebeat 实时查询并生产数据 可以正常发送给 kafka
# logstash 可以进行更好的过滤将想要的数据 匹配给不同的字段

1
2
3
4
# 时间概念:

"@timestamp":"2019-10-26T10:32:52.317Z" # 收集日志时间
"message":"Oct 26 18:32:51 # 输出信息:系统日志打印的真实时间

安装并配置 Logstash 服务

下载与安装 Logstash

  1. 可以从elastic官网 https://www.elastic.co/downloads/logstash 获取logstash安装包,这里下载的版本是logstash-6.3.2.tar.gz。
  2. 将下载下来的安装包直接解压到一个路径下即可完成logstash的安装。根据前面的规划,
  3. 将logstash安装到logstash server主机上,这里统一将logstash安装到/usr/local目录下,基本操作过程如下:
1
2
3
4
5
6
7
[root@filebeat1 ~]# mkdir -p /app/elk
[root@filebeat1 ~]# cd /app/elk

[root@logstash elk]# tar -zxvf logstash-6.3.2.tar.gz -C /usr/local
[root@logstash elk]# mv /usr/local/logstash-6.3.2 /usr/local/logstash

[root@logstash elk]# cd /usr/local/logstash/

Logstash 是怎么工作的

  1. Logstash是一个开源的、服务端的数据处理pipeline(管道),它可以接收多个源的数据、然后对它们进行转换、最终将它们发送到指定类型的目的地。

  2. Logstash是通过 插件机制 实现各种功能的,可以在https://github.com/logstash-plugins 下载各种功能的插件,也可以自行编写插件。

  3. Logstash实现的功能主要分为接收数据、解析过滤并转换数据、输出数据三个部分,对应的插件依次是input插件、filter插件、output插件,

  4. 其中,filter插件是可选的,其它两个是必须插件。也就是说在一个完整的Logstash配置文件中,必须有input插件和output插件。

1
2
# 官方文档学习
https://www.elastic.co/guide/en/logstash/6.3/input-plugins.html

常用的 input 插件

  1. input插件主要用于接收数据,Logstash支持接收多种数据源,常用的有如下几种:
  2. file:
    • 读取一个文件,这个读取功能有点类似于linux下面的tail命令,一行一行的实时读取。
  3. syslog:
    • 监听系统514端口的syslog messages,并使用RFC3164格式进行解析。
  4. redis:
    • Logstash 可以从redis服务器读取数据,此时redis类似于一个消息缓存组件。
  5. kafka:
    • Logstash 也可以从kafka集群中读取数据,kafka加Logstash的架构一般用在数据量较大的业务场景,kafka可用作数据的缓冲和存储。
  6. filebeat:
    • filebeat是一个文本日志收集器,性能稳定,并且占用系统资源很少,Logstash可以接收filebeat发送过来的数据。

常用的 filter

  1. filter插件主要用于数据的过滤、解析和格式化,也就是将非结构化的数据解析成结构化的、可查询的标准化数据。常见的filter插件有如下几个:
  2. grok: 正则捕获
    • grok是Logstash最重要的插件,可解析并结构化任意数据,支持正则表达式,并提供了很多内置的规则和模板可供使用。
    • 使用最多,但也最复杂。
  3. mutate: 数据修改
    • 提供了丰富的基础类型数据处理能力。包括类型转换,字符串处理和字段处理等。
  4. date:时间处理
    • 可以用来转换你的日志记录中的时间字符串。
  5. GeoIP:地址查询
    • 可以根据IP地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

常用的 output

  1. output插件用于数据的输出,一个Logstash事件可以穿过多个output,直到所有的output处理完毕,这个事件才算结束。输出插件常见的有如下几种:
  2. elasticsearch:
    • 发送数据到elasticsearch。
  3. file:
    • 发送数据到文件中。
  4. redis:
    • 发送数据到redis中,从这里可以看出,redis插件既可以用在input插件中,也可以用在output插件中。
  5. kafka:
    • 发送数据到kafka中,与redis插件类似,此插件也可以用在Logstash的输入和输出插件中。

Logstash 配置文件入门

  1. /usr/local/logstash/config/, jvm.options是设置JVM内存资源的配置文件,logstash.yml是logstash全局属性配置文件,
  2. 另外还需要自己创建一个logstash事件配置文件,这里介绍下logstash事件配置文件的编写方法和使用方式。
  3. 在介绍Logstash配置之前,先来认识一下logstash是如何实现输入和输出的。
  4. Logstash提供了一个shell脚本/usr/local/logstash/bin/logstash,
  5. 可以方便快速的启动一个logstash进程,在Linux命令行下,运行如下命令启动Logstash进程:
1
2
[root@logstash elk]# cd /usr/local/logstash/
[root@logstash logstash]# bin/logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

1
2
3
4
5
1. -e代表执行的意思。
2. input即输入的意思,input里面即是输入的方式,这里选择了stdin,就是标准输入(从终端输入)。
3. output即输出的意思,output里面是输出的方式,这里选择了stdout,就是标准输出(输出到终端)。
4. 这里的codec是个插件,表明格式。这里放在stdout中,表示输出的格式,
5. rubydebug是专门用来做测试的格式,一般用来在终端输出JSON格式。
1
2
3
4
5
1. 这就是logstash的输出格式。Logstash在输出内容中会给事件添加一些额外信息。
2. 比如"@version"、"host"、"@timestamp" 都是新增的字段,
3. 而最重要的是@timestamp ,用来标记事件的发生时间。
4. 由于这个字段涉及到Logstash内部流转,如果给一个字符串字段重命名为@timestamp的话,Logstash就会直接报错。
5. 另外,也不能删除这个字段。

编写事件文件

  1. 在logstash的输出中,常见的字段还有type,表示事件的唯一类型,
  2. tags,表示事件的某方面属性,我们可以随意给事件添加字段或者从事件里删除字段。
  3. 使用-e参数在命令行中指定配置是很常用的方式,但是如果logstash需要配置更多规则的话,就必须把配置固化到文件里,这就是logstash事件配置文件
  4. 如果把上面在命令行执行的logstash命令,写到一个配置文件logstash-1.conf中,就变成如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[root@logstash logstash]# vim logstash-1.conf 

input {
stdin{}
}
output {
stdout{codec => rubydebug}
}

[root@logstash logstash]# bin/logstash -f logstash-1.conf

# 通过这种方式也可以启动logstash进程,不过这种方式启动的进程是在前台运行的,要放到后台运行,
# 可通过nohup命令实现,操作如下:
[root@logstash logstash]# nohup bin/logstash -f logstash-simple.conf &
# 这样,logstash进程就放到了后台运行了,在当前目录会生成一个nohup.out文件,
# 可通过此文件查看 logstash 进程的启动状态。

input 输入插件 file

  1. logstash启动后会去监控messages文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@logstash logstash]# vim logstash-1.conf 

input {
file{
path => "/var/log/messages"
}
}
output {
stdout{codec => rubydebug}
}

[root@logstash logstash]# bin/logstash -f logstash-1.conf

# 我们发现日志产生的时间是世界时间,不是东八区,后续我们会用方法修改
# file用来获取文件懂得输入,多了path文件路径字段,实时监控信息
# 对于output插件,这里仍然采用rubydebug的JSON输出格式,这对于调试logstash输出信息是否正常非常有用。
# 如果需要监控多个文件,可以通过逗号分隔即可,例如:
# path => ["/var/log/*.log","/var/log/message","/var/log/secure"]

output 输出插件 输出到kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@logstash logstash]# vim logstash_in_kafka.conf

input{
file {
path => ["/var/log/messages","/var/log/secure"]
}
}
output {
kafka {
bootstrap_servers => "172.17.70.232:9092,172.17.70.233:9092,172.17.70.234:9092"
topic_id => "osmessages"
}
}
1
2
3
4
5
1. 这个配置文件中,输入input仍然是file,重点看输出插件,这里定义了output的输出源为kafka,
2. 通过bootstrap_servers选项指定了kafka集群的IP地址和端口。
3. 特别注意这里IP地址的写法,每个IP地址之间通过逗号分隔。
4. output输出中的topic_id选项,是指定输出到kafka中的哪个topic下,
5. 这里是osmessages,如果无此topic,会自动重建topic。
1
2
3
4
5
6
# kafka 消费数据
[root@kafkazk1 kafka]# bin/kafka-console-consumer.sh --zookeeper 172.17.70.232:2181,172.17.70.233:2181,172.17.70.234:2181 --topic osmessages

# 放到后台 持续收集
[root@logstash logstash]# nohup bin/logstash -f logstash_in_kafka.conf &
[root@logstash logstash]# ps -ef|grep java

收集 filebeat端口发来的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 5044是接收端口,filebeat把数据发送到他的5044端口上,logstash收集

[root@logstash logstash]# vim logstash_in_filebeat.conf

input {
beats {
port => 5044
}
}

output {
stdout{
codec => rubydebug
}
}

# Starting server on port: 5044 # 本地会被启动5044端口用来接收数据
# 本次不用后台运行,在输出到终端
[root@logstash logstash]# bin/logstash -f logstash_in_filebeat.conf

[root@logstash logstash]# netstat -tnlp | grep 5044
tcp 0 0 0.0.0.0:5044 0.0.0.0:* LISTEN 1267/java

1
2
3
4
5
6
7
8
9
# 配置filebeat
# 配置kafka先注释掉,输出改用 logstash

# 重启filebeat
[root@filebeat1 filebeat]# kill -9 `pgrep -f filebeat`
[root@filebeat1 filebeat]# nohup ./filebeat -e -c filebeat.yml &
[root@filebeat1 filebeat]# tail -200 nohup.out

# ssh连接故意输错密码,在查看logstash终端收集的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 把输出改成kafka
# 必须指定json格式

[root@logstash logstash]# vim logstash_in_filebeat.conf

input {
beats {
port => 5044
}
}

output {
kafka {
codec => json
bootstrap_servers => "172.17.70.232:9092,172.17.70.233:9092,172.17.70.234:9092"
topic_id => "osmessages"
}

}

[root@logstash logstash]# bin/logstash -f logstash_in_filebeat.conf

配置logstash作为转发节点

  1. 上面对logstash的使用做了一个基础的介绍,现在回到本节介绍的这个案例中,在这个部署架构中,
  2. logstash是作为一个二级转发节点使用的,也就是它将kafka作为数据接收源,然后将数据发送到elasticsearch集群中,
  3. 按照这个需求,新建logstash事件配置文件 kafka_os_into_es.conf,内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@logstash logstash]# vim kafka_os_into_es.conf

input {
kafka {
bootstrap_servers => "172.17.70.232:9092,172.17.70.233:9092,172.17.70.234:9092"
topics => ["osmessages"]
codec => json
}
}
output {
elasticsearch {
hosts => ["172.17.70.229:9200","172.17.70.230:9200","172.17.70.231:9200"]
index => "osmessageslog-%{+YYYY-MM-dd}"
}
}

[root@logstash logstash]# nohup bin/logstash -f kafka_os_into_es.conf &
1
2
3
# 注意
topics
index 指定存到ES里数据 索引 的名称 必须指定 页面可以通过索引查询

安装并配置Kibana展示日志数据

下载与安装Kibana

  1. kibana使用Node.js(JavaScript)语言编写,安装部署十分简单,即下即用,可以从elastic官网https://www.elastic.co/cn/downloads/kibana 下载所需的版本,

  2. 这里需要注意的是Kibana与Elasticsearch的版本必须一致,另外,在安装Kibana时,要确保Elasticsearch、Logstash和kafka已经安装完毕。

  3. 这里安装的版本是kibana-6.3.2-linux-x86_64.tar.gz。将下载下来的安装包直接解压到一个路径下即可完成kibana的安装,根据前面的规划,

  4. 将kibana安装到server2主机上,然后统一将kibana安装到/usr/local目录下,基本操作过程如下:

1
2
3
[root@server2 elk]# tar -zxvf kibana-6.3.2-linux-x86_64.tar.gz -C /usr/local

[root@server2 elk]# mv /usr/local/kibana-6.3.2-linux-x86_64 /usr/local/kibana

配置 Kibana

  1. 由于将Kibana安装到了/usr/local目录下,因此,Kibana的配置文件为/usr/local/kibana/config/kibana.yml,

  2. Kibana 配置非常简单,这里仅列出常用的配置项,内容如下:

1
2
3
4
5
6
[root@server2 kibana]# vim /usr/local/kibana/config/kibana.yml
[root@server2 kibana]# grep ^'[a-Z]' /usr/local/kibana/config/kibana.yml
server.port: 5601
server.host: "172.17.70.230"
elasticsearch.url: "http://172.17.70.231:9200"
kibana.index: ".kibana6"
  1. 每个配置项的含义介绍如下
    • 阿里云服务器 用公网地址 开访问端口 5601
  2. server.port:
    • kibana绑定的监听端口,默认是5601。
  3. server.host:
    • kibana绑定的IP地址,如果内网访问,设置为内网地址即可。
  4. elasticsearch.url:
    • kibana访问ElasticSearch的地址,如果是ElasticSearch集群,添加任一集群节点IP即可,
    • 官方推荐是设置为ElasticSearch集群中client node角色的节点IP。
  5. kibana.index:
    • 用于存储kibana数据信息的索引,这个可以在kibanaweb界面中看到。

启动Kibana服务与web配置

  1. 启动kibana服务的命令在/usr/local/kibana/bin目录下,执行如下命令启动kibana服务:
1
2
3
4
5
6
7
8
9
[root@server2 kibana]# cd /usr/local/kibana/

[root@server2 kibana]# nohup bin/kibana &

[root@server2 kibana]# ps -ef|grep node
root 1625 1237 6 10:44 pts/0 00:00:01 bin/../node/bin/node --no-warnings bin/../src/cli
root 1637 1237 0 10:45 pts/0 00:00:00 grep --color=auto node

http://60.205.217.112:5601/app/kibana

创建索引

1
2
3
4
之前我们在 logstash定义了 这个索引 
osmessageslog-*

# 根据时间排序

查看数据

1
以后不用再去每台服务器查看日志

1
2
自定义 按照字段搜索
message:Failed

1
2
3
4
5
# 收集的字段与我们filebeat 过滤的字段 息息相关 如果加上host 就可以知道来自哪台服务器,添加上就有了

processors:
- drop_fields:
fields: ["beat", "input", "source", "offset","prospector","host"]

修改filebeat的过滤

1
2
3
4
[root@filebeat1 filebeat]# kill -9 `pgrep -f filebeat`
[root@filebeat1 filebeat]# nohup ./filebeat -e -c filebeat.yml &
# 再去访问 添加上host字段
# 在filebeat配置文件里 配置name = 服务器IP 就可现实IP地址,否则是默认主机名

调试并验证日志数据流向

  1. 经过上面的配置过程,大数据日志分析平台已经基本构建完成,由于整个配置架构比较复杂,这里来梳理下各个功能模块的数据和业务流向。