- ES
- logstash
- yum安装
- 重要插件配置
- grok
- date插件
- mutate
- input => filter => output
- input常用支持插件
- file读取
- beats监听
- output输出插件
- elasticsearch
- file
- 示例
- kibana
- tar安装
- 如何管理索引
- **Advanced Settings**
- filebeat
- ELK配置示例
- 日志文件中分隔出普通日志和错误日志
- 日志源文件
- logstash
- filebeat
- kibana
- kibana搜索示例
es的安装忽略, 这一块随便搜索资料安装即可,或者参考之前写的elasticsearch安装
logstash yum安装Download and install the public signing key:
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
进入到/etc/yum.repos.d/目录下,新建文件logstash.repo,填入以下内容,注意不需要更改
[logstash-7.x] name=Elastic repository for 7.x packages baseurl=https://artifacts.elastic.co/packages/7.x/yum gpgcheck=1 gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch enabled=1 autorefresh=1 type=rpm-md
# 查看可以安装的版本 yum list logstash --showduplicates | sort -r # 安装7.15.1 yum install logstash-7.15.1 # 启动 sudo systemctl start logstash.service
- /etc/logstash/ 配置文件目录
- jvm.options jvm配置,默认启动-Xms -Xmx都是1g,可以按需调整
- logstash.yml logstash主配置文件
- path.logs: /var/log/logstash 日志存放目录
- path.data: /var/lib/logstash 数据存放目录
- path.config 配置文件存放目录
- pipelines.yml
- path.config: “/etc/logstash/conf.d/*.conf” 可以把收集每个内容的配置问价写到这个目录下,以.conf结尾
- /var/log/logstash/ 日志目录
- /usr/share/logstash/ 脚本执行目录
标准输入输出测试
先停掉服务
systemctl stop logstash.service
cd /usr/share/logstash/bin
# 需要等待一会
./logstash -e 'input { stdin {} } output { stdout {} }'
在控制台输入的字符会被输出出来
主要是用来提炼消息内容,如果有些消息内容我们需要取出来后面要用到,可以用到表达式将内容解析出来
一个在线debug工具http://grokdebug.herokuapp.com/
日志原文
[2022-08-03 14:05:51.528] [2#2-1554710116417277952] INFO [http-nio-8086-exec-9] c.d.b.c.c.l.AccessLogAspect - [com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]
现在我想将这段文字结构化,内容如下,格式和正则很像,匹配到的内容后续都可以通过[]直接引用, 如后面会用到的output模块
- 转义字符
- s匹配空格
- ()用来包裹一个表达式的解析还有对应解析内容后的变量命名
?<>被包裹的内容即是变量名,后面跟解析表达式
- .匹配任意字符
[%{TIMESTAMP_ISO8601:logtime}]s[(?.*)#(?.*)]s(?[A-Z]{4,5})s[(?.*)]s(?.*)s-s(?.*)
date插件
https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
date插件用于对日期进行匹配和替换,拿前面grok到的数据来说,日志中我们打印的日期格式是认为很容易认识的yyyy-MM-dd HH:mm:ss.SSS,这种格式丢失了时区,由于es存储时间都统一存储为UTC格式,如果我们不对时间做处理,时间存储进去之后就已经乱掉了。
接上面grok到的内容,现在我需要匹配到时间,并重新将这个时间重写到@timestamp字段中
date {
match=> [ "logtime", "yyyy-MM-dd HH:mm:ss.SSS"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
其实match的真正含义就是去匹配字符而已, 它是一个数组, 如果你有多种时间格式,这里可以写多个,每个参数之间没有前后关系,像上面这个并不是代表logtime必须是后面那个 yyyy-MM-dd HH:mm:ss.SSS格式才算match.
target就是要将匹配的这个时间重写到es字段的字段@timestamp中去,这个字段会是后面非常重要的各种时间过滤用到的
timezone,可用的时区列表可以在这个链接都看到,http://joda-time.sourceforge.net/timezones.html, 这个timezone字段在别的文章里我都没看到,不明白怎么解决时区的问题的,这个还是在官方文档中看到有这个属性,然后由于我们日志打印的字符根本没有时区概念,所以这个时间打印出来的时候就已经确定是什么时区的,这里要指定,然后es才能正确转换为UTC存储。
经过上面两个步骤后,时间已经被写入到@timestamp了,那么我们自己的字段是不是就没有意义了,可以通过这个插件删掉,当然这不是必须的
mutate {
remove_field => ["logtime"]
}
input => filter => output
https://www.cnblogs.com/JetpropelledSnake/p/9889275.html
logstash的处理分类三个阶段, 即输入(数据源) 、 过滤、 输出
通过这三个标签, 数据从哪个地方采集,经过什么样的处理,最终输出到哪里。对应的配置标签
input {
}
filter {
}
output {
}
其中每个标签又都支持不同的插件
input常用支持插件 file读取用来抓取文件变化,然后发送数据
配置示例
input
file {
path => ["/data/logs/*.log", "/var/log/message"]
type => "app"
start_position => "beginning"
}
}
- path 配置格式array, 指定从哪个目录读取文件,支持通配符
- type 配置格式string, 类型, 如果输出插件为elasticsearch的话, 默认情况作为es的type
- start_position 配置格式string, 参考值"beginning"和"end", 从文件开头还是结尾开始导入数据,默认结尾。只有第一次导入有效, 如果已经导入,则该配置无效
- tags 配置格式array, 用来增加标签,以便于在后续的处理流程中使用
- delimiter 配置格式string,默认"n", 文件内容的行分隔符
Beats插件用于建立监听服务,接收Filebeat或者其他beat发送的Events
由于做日志收集时,logstash比filebeat占用更多的资源,因此logstash一般都使用beats作为输入源,然后其它主机安装filebeat收集自己的日志然后输出到logstash所在的主机
配置示例
input {
beats {
port => 5044
}
}
- host 配置格式string, 监听的ip地址,默认"0.0.0.1"
- port 配置格式number, 监听服务监听的端口
用于将事件信息写入到Elasticsearch中
配置示例
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
user => "elastic"
password => "sssssss"
index => "filebeat-%{type}-%{+yyyy.MM.dd}"
template_overwrite => true
}
}
- hosts 配置格式array, 如果有多个配置为[“127.0.0.1:9200”,“127.0.0.1:9400”]
- user 配置格式string, 访问es集群的用户名
- password 配置格式string, 访问es集群的密码
- index 将数据解析到哪个索引,比如可以按照天创建索引, 方便删除历史数据或者查询指定范围内数据
- template_overwrite 是否使用覆盖先有模板
这里也可以写一些判断语法,引用前面或者filebeat传输过来的字段值,然后用来一些动态处理。比如可以使用grok解析出日志的级别, 如果是ERROR级别,可以将日志再单独写入到一个错误日志索引里去。
再比如一个logstash要监听很多个filebeat传输过来的文件,但是索引不同,那就需要filebeat传输的时候添加自定义字段进行标识,然后再这里进行判断写入到哪个索引里。
如下提供一个示例
- fields.idx 是filebeat添加的自定义字段,如果是层级关系,在logstash里的语法要写成[fields][idx]
- [level]这个则是通过grok插件解析出来的日志级别的变量,这里可以直接引用
output {
# stdout {
# codec => rubydebug
# }
if [fields][idx] == "boot-quick-error" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "boot-quick-error-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "boot-quick" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "boot-quick-log-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "group-purchase-manage" {
elasticsearch {
hosts => "192.168.0.7:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "group-purchase-manage-log-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "group-purchase-manage" and [level] == "ERROR" {
elasticsearch {
hosts => "192.168.0.7:9200"
user => "elastic"
password => "111111"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "group-purchase-manage-error-%{+YYYY.MM.dd}"
}
}
}
file
用户将事件数据输出到文件内
output {
file {
path => ...
codec => line { format => "custom format: %{message}"}
}
}
示例
在logstash/conf.d下新建一个boot-quick.conf,用来手机日志直接投递到es
input{
file {
path => "/opt/services/boot-quick/logs/boot-quick.log"
start_position => "beginning"
type => "boot-quick.log"
add_field => {
"log-type" => "log"
}
}
file {
path => "/opt/services/boot-quick/logs/boot-quick-error.log"
start_position => "beginning"
type => "boot-quick-error.log"
add_field => {
"log-type" => "error"
}
}
}
output{
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 索引格式即为boot-quick-上面的log或者error-当天时间格式,如boot-quick-log-2022.03.30
index => "boot-quick-%{log-type}-%{+YYYY.MM.dd}"
}
}
kibana
tar安装
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.0-linux-x86_64.tar.gz tar -zxvf kibana-7.15.0-linux-x86_64.tar.gz # 启动,默认前台启动,默认不能用root账号启动,按照之前创建的es用户使用就行了 ./bin/kibana
Kibana 默认情况下从 $KIBANA_HOME/config/kibana.yml 加载配置文件。该配置文件的格式在 配置 Kibana 中做了说明。
.tar.gz安装目录
| 类型 | 描述 | 默认位置 |
|---|---|---|
| home | Kibana home 目录或 $KIBANA_HOME | 解压包时创建的目录 |
| bin | 二进制脚本,包括 kibana 启动 Kibana 服务和 kibana-plugin 安装插件 | $KIBANA_HOMEbin |
| config | 配置文件,包括 kibana.yml 。 | $KIBANA_HOMEconfig |
| data | Kibana 和其插件写入磁盘的数据文件位置 | |
| $KIBANA_HOMEdata | ||
| plugins | 插件文件位置。每一个插件都有一个单独的二级目录 | $KIBANA_HOMEplugins |
| optimize | 编译过的源码。某些管理操作(如,插件安装)导致运行时重新编译源码。 | $KIBANA_HOMEoptimize |
**kibana.yml**示例配置
# kibana http服务端口 server.port: 5601 # 默认localhost以及其它配置的回环地址, 都不能被远程访问。要配置成内网或外网ip server.host: xxxxx # es集群地址 elasticsearch.hosts: ["http://localhost:9200","http://localhost:9400"] # 如果es是收到基本权限保护的话,这里提供用户名和密码 elasticsearch.username: "elastic" elasticsearch.password: "xxxxxxx" # 配置国际化 # Specifies locale to be used for all localizable strings, dates and number formats. # Supported languages are the following: English - en , by default , Chinese - zh-CN . i18n.locale: "zh_CN"如何管理索引
左上角菜单展开->翻到最下面Management->Stack Management
进入Stack Management菜单后,选择Index Management, 这里即索引管理, 添加到es中的索引都会在这个列表展示。包括我们用filebeat或者logstash使用index指定的索引
进入Stack Management菜单后,选择Index Patterns,即索引匹配, 我们可以定义一个表达式来匹配已有的索引,这样可以将一个索引或多个索引的数据通过索引匹配给映射起来,这样再通过kibana就可以可视化对应索引规则里的索引里的数据了。
比如对应着上面索引管理里的索引,我们创建索引匹配规则
使用boot-quick-log* 去匹配所有boot-quick-log开头索引数据
使用boot-quick-error*去匹配所有boot-quick-error开头的索引数据
同时选择一个用来做数据过滤的字段,如下@timestamp
查看索引匹配规则到的数据
从左侧菜单展开,选择Discover即可
查看文档内容, 如下图,详细解释了每个布局的作用
如上图, 发现@timestamp的格式不太对,我们想要修改一下格式。除了logstash grok插件解析时间外, 也还是在这个地方要修改一下字段的格式
进入Stack Management菜单后,选择Index Patterns,在列表里找到我们的索引,然后点击进去,找到@timestamp字段的编辑按钮
打开Set format, 指定自己需要的格式, 如YYYY-MM-DD HH:mm:ss.SSS
如果每个索引匹配都改一下岂不是很麻烦,后面会在Advanced Setting统一修改
Advanced Settings在Kibana的Stack Management菜单下,有一个选项Advanced Setting, 这里可以修改一些全局的配置,如上面索引的时间格式
版本列表: https://www.elastic.co/cn/downloads/past-releases#filebeat
一般用来收集日志然后输出到logstash中,而不是直接输出到es,统一由logstash做格式转换和输出到es
tar安装
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.15.0-linux-x86_64.tar.gz tar -zxvf filebeat-7.15.0-linux-x86_64.tar.gz
解压后文件目录
| filebeat | 启动文件 |
|---|---|
| filebeat.yml | 配置文件 |
| modules.d | 简化一些日志配置的模块 |
| fields.yml | 好像是一些预定义字段,用于收集到es |
启动
nohup /opt/filebeat/filebeat/filebeat -e -c /opt/filebeat/filebeat/filebeat.yml -path.logs /opt/filebeat/logs &
不过这种启动方式, 如果日志变动不频繁,隔断时间之后filebeat就会自动终止, 建议使用supervistor 来管理进程
收集日志发送到logstash
filebeat.yml
filebeat.inputs:
- type: log
# false禁用采集工作
enable: true
# 指定采集目录
paths:
- /opt/services/boot-quick/logs/boot-quick.log
# 日志多行合并采集表达式, 一般用来匹配异常堆栈, at.... at... at...
multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*'
# 是否否定多行日志参数,false代表不否定,即如果匹配到上面的表达式就要做什么
multiline.negate: false
# 用来确定匹配的多行日志是属于上一行的后面还是下一行的前面, 结合前两个参数则代表匹配的文本数据还是属于上一行的
multiline.match: after
# 添加自定义字段
fields:
env: dev
author: ddf
# 是否将上面添加的字段定义到根属性上,默认false, 即显示时字段都挂载到根属性fields下,如果为true,每个自定义字段都是在根属性下独立的属性
fields_under_root: false
output:
# 输出到logstash中, logstash更换为自己的ip
logstash:
hosts: ["logstash:5044"]
收集的日志附带了非常多的无用属性,都是一些主机信息的,比如host. agent``ecs等,这些属性太多严重影响查看关键字段,可以把这些字段去掉。如下图
进入filebeat.yml
找到如下内容,修改如下后重启filebeat即可
processors:
# 注释掉默认元数据处理器,默认启用就注释,没有就算
# - add_host_metadata:
# when.not.contains.tags: forwarded
# - add_cloud_metadata: ~
# - add_docker_metadata: ~
# - add_kubernetes_metadata: ~
# 按需删除不要字段
- drop_fields:
fields: ["ecs","host","agent"]
modules
默认在安装目录会有一个modules目录,里面有许多预制的模块日志采集模板,可以通过一下命令操作开启和关闭
# 查看模块列表 ./filebeat modules list # 关闭 ./filebeat modules disable nginx # 开启 ./filebeat modules enable nginxELK配置示例 日志文件中分隔出普通日志和错误日志 日志源文件
- /opt/services/boot-quick/logs/ 这个目录下采集两个日志,一个普通日志,一个错误日志,然后添加自定义字段,logstash输出的时候根据自定义字段判断,然后输出到不同的索引
- /data/logs/group-purchase-manage/group-purchase.log, 直接收集日志到logstash, 然后logstash根据grok解析出日志级别,根据日志级别决定将内容输出到哪个索引
启动beats监听, 配置output
进入logstash配置目录, 如/etc/logstash,在conf.d新建一个beats.conf,名字无所谓,主要是后缀和所在目录
日志原文内容
[2022-08-03 14:05:51.528] [2#2-1554710116417277952] INFO [http-nio-8086-exec-9] c.d.b.c.c.l.AccessLogAspect - [com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]
通过grok表达式去结构化数据,在线工具http://grokdebug.herokuapp.com/
表达式如下
[%{TIMESTAMP_ISO8601:logtime}]s[(?.*)#(?.*)]s(?[A-Z]{4,5})s[(?.*)]s(?.*)s-s(?.*)
结构化出来的数据如下
{
"logtime": [
[
"2022-08-03 14:05:51.528"
]
],
"YEAR": [
[
"2022"
]
],
"MONTHNUM": [
[
"08"
]
],
"MONTHDAY": [
[
"03"
]
],
"HOUR": [
[
"14",
null
]
],
"MINUTE": [
[
"05",
null
]
],
"SECOND": [
[
"51.528"
]
],
"ISO8601_TIMEZONE": [
[
null
]
],
"user": [
[
"2"
]
],
"traceId": [
[
"2-1554710116417277952"
]
],
"level": [
[
"INFO"
]
],
"thread": [
[
"http-nio-8086-exec-9"
]
],
"class": [
[
"c.d.b.c.c.l.AccessLogAspect"
]
],
"message": [
[
"[com.ddf.group.purchase.core.controller.AuthController]-[currentUser]请求参数: {}, 执行返回结果: {"userId":"2","username":"18356784598","credit":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36","lastModifyPasswordTime":null,"remarks":null,"disabled":false,"detail":null}, 共耗时: [0ms]"
]
]
}
beats.conf配置内容如下
input {
beats {
port => 5044
client_inactivity_timeout => 36000
}
}
filter {
grok {
match => {
# [user#traceId] [2022-03-30 23:35:58.988] INFO [http-nio-8082-exec-5] c.d.b.c.c.l.AccessLogAspect - 消息内容blabla
# "message" => "[%{TIMESTAMP_ISO8601:logtime}]s(?[A-Z]{4,5})s[(?.*)]s(?.*)s-s(?.*)"
"message" => "[%{TIMESTAMP_ISO8601:logtime}]s[(?.*)#(?.*)]s(?[A-Z]{4,5})s[(?.*)]s(?.*)s-s(?.*)"
}
add_field => ["user","traceId"]
# remove_field => ["@timestamp"]
}
date {
match=> [ "logtime", "yyyy-MM-dd HH:mm:ss.SSS"]
# match=> [ "logtime", "ISO8601"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
# 删除我们的字段。不删除的话es:如果正则匹配成功,一条记录就会有2个时间字段
# remove_field => ["logtime"]
}
}
output {
# stdout {
# codec => rubydebug
# }
if [fields][idx] == "boot-quick-error" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "boot-quick-error-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "boot-quick" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "boot-quick-log-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "group-purchase-manage" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "group-purchase-manage-log-%{+YYYY.MM.dd}"
}
}
if [fields][idx] == "group-purchase-manage" and [level] == "ERROR" {
elasticsearch {
hosts => "www.snowball.fans:9200"
user => "elastic"
password => "Aa&123456"
# 因为索引无法有大写字母,这里没有直接使用%{level}引用
index => "group-purchase-manage-error-%{+YYYY.MM.dd}"
}
}
}
filebeat
修改filebeat.yml内容如下
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/services/boot-quick/logs/boot-quick.log
multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*'
multiline.max_lines: 500
# 不否定多行模式,即使用多行匹配模式
multiline.negate: false
# 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行
multiline.match: after
# 添加自定义字段
fields:
idx: boot-quick
author: ddf
env: dev
- type: log
enabled: true
paths:
- /opt/services/boot-quick/logs/boot-quick-error.log
multiline.pattern: '^[s]+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*'
multiline.max_lines: 500
# 不否定多行模式,即使用多行匹配模式
multiline.negate: false
# 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行
multiline.match: after
# 添加自定义字
fields:
idx: boot-quick-error
author: ddf
env: dev
- type: log
enabled: true
paths:
- /data/logs/group-purchase-manage/group-purchase.log
multiline.pattern: '^s+at|^org.*|^java.*|^c[a-zA-Z]+.[a-zA-Z]+.+Exception:.*'
multiline.max_lines: 500
# 不否定多行模式,即使用多行匹配模式
multiline.negate: false
# 正则表达式匹配的数据是上一行记录的后半部分数据,即匹配到的数据和上面的记录要合并>到同一行
multiline.match: after
# 添加自定义字
fields:
idx: group-purchase-manage
author: ddf
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
kibana
按照kibana相关的配置增加索引,参考Kibana章节
kibana搜索示例特殊符号转移用
搜索字段如果要连贯要用""包裹
一段话,中间部分无法确定,可以使用and连接一个搜索满足多个条件



