- 1. Logstash Architecture Overview
- 1.1 Why Logstash
- 1.2 What is Logstash
- 1.3 Logstash architecture
- 1.4 Installing Logstash
- 2. Logstash input plugins
- 2.1 stdin plugin
- 2.2 file plugin
- 2.3 beats plugin
- 2.4 kafka plugin
- 3. Logstash filter plugins
- 3.1 grok plugin
- 3.1.1 How did grok come about?
- 3.1.2 What problem does grok solve?
- 3.1.3 grok syntax diagram
- 3.1.4 grok syntax example
- 3.2 geoip plugin
- 3.3 fields
- 3.4 date plugin
- 3.5 useragent plugin
- 3.6 mutate plugin
- 3.6.1 remove_field
- 3.6.2 split
- 3.6.3 add_field
- 3.6.4 convert
- 4. Logstash output plugins
- 4.1 stdout plugin
- 4.2 file plugin
- 4.3 elasticsearch plugin
- 5. Analyzing App Logs with Logstash
- 5.1 What are App logs
- 5.2 App log collection architecture
- 5.3 Steps
- 5.4 App log collection in practice
- 6. Analyzing Nginx Logs with Logstash
- 6.1 Architecture
- 6.2 Implementation
- 7. Analyzing MySQL Logs with Logstash
- 7.1 What is the slow query log
- 7.2 Why collect slow logs
- 7.3 Architecture
- 7.4 Approach
- 7.5 Configure MySQL
- 7.6 Configure Filebeat
- 7.7 Configure Logstash
- 7.8 Configure Kibana
1. Logstash Architecture Overview
1.1 Why Logstash
Some logs in production cannot be emitted directly as JSON the way Nginx logs can, but Logstash can step in and turn such "unstructured data" into "structured data".
1.2 What is Logstash
filebeat --> logstash (input, filter, output) --> es
Logstash is a free and open server-side data processing pipeline that ingests data from a multitude of sources, transforms it, and then sends it to your favorite "stash". (official site)
1.3 Logstash architecture
The basic architecture of Logstash resembles a pipeline, divided into the following stages:
- Input: data collection (common plugins: stdin, file, kafka, beats, http)
- Filter: data parsing/transformation (common plugins: grok, date, geoip, mutate, useragent)
- Output: data output (common plugin: elasticsearch)
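To see how the three stages fit together, here is a minimal pipeline sketch (the file path and field name are illustrative, not from a real setup):

input {
  file {
    path => "/var/log/messages" # illustrative source file
  }
}
filter {
  mutate {
    add_field => { "pipeline" => "demo" } # hypothetical field, just to show a filter at work
  }
}
output {
  stdout { codec => rubydebug } # print processed events to the terminal
}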
1.4 Installing Logstash
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.rpm
[root@localhost ~]# yum install java -y
[root@localhost ~]# rpm -ivh logstash-7.8.1.rpm

# Edit the configuration file
[root@logstash-node1 ~]# vim /etc/logstash/logstash.yml
node.name: logstash-node1
path.data: /var/lib/logstash # data directory
pipeline.workers: 2
pipeline.batch.size: 1000 # events processed per batch
path.logs: /var/log/logstash # log directory

# Adjust the heap size
[root@logstash-node1 ~]# vim /etc/logstash/jvm.options
-Xms1g
-Xmx1g

2. Logstash input plugins
The input plugins specify the data source; a pipeline can have more than one input plugin. We will cover the following:
stdin, file, beats, kafka, http
2.1 stdin plugin
Reads events from standard input and writes them to standard output:
[root@logstash-node1 ~]# cat /etc/logstash/conf.d/stdin_logstash.conf
# read from the terminal, write back to the terminal
input {
stdin {
type => "stdin" # custom event type
tags => "stdin_type" # custom tag, used for event matching later
}
}
output {
stdout {
codec => "rubydebug"
}
}
# the -r flag reloads the config file without stopping Logstash
[root@logstash-node1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/stdin_logstash.conf -r
Type test logstash in the terminal and the following is returned:
{
"type" => "stdin",
"message" => "test logstash",
"host" => "logstash-node1",
"tags" => [
[0] "stdin_type"
],
"@version" => "1",
"@timestamp" => 2021-10-30T07:34:57.440Z
}
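The type and tags set above are what later stages usually branch on; a minimal sketch of such a conditional (illustrative, not part of the example above):

output {
  if [type] == "stdin" and "stdin_type" in [tags] {
    stdout { codec => rubydebug } # only events carrying this type and tag reach here
  }
}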
2.2 file plugin
Reads data from a file and writes it to standard output:
[root@logstash-node1 conf.d]# cat file_logstash.conf
input {
file {
path => "/var/log/test.log"
type => "syslog"
exclude => "*.gz" # files to skip
start_position => "beginning" # where to start reading a file seen for the first time: beginning or end
stat_interval => "3" # how often to check files for updates, default 1s
}
}
output {
stdout {
codec => rubydebug
}
}
Append a line to the log and watch the output:
[root@logstash-node1 ~]# echo "file logstash" >> /var/log/test.log
{
"@version" => "1",
"path" => "/var/log/test.log",
"type" => "syslog",
"@timestamp" => 2021-10-30T07:58:42.699Z,
"host" => "logstash-node1",
"message" => "file logstash"
}
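Note that the file input records its read position in a sincedb file, so restarting Logstash does not re-read lines it has already processed. When testing repeatedly, a common trick (not for production) is to point sincedb at /dev/null:

input {
  file {
    path => "/var/log/test.log"
    start_position => "beginning"
    sincedb_path => "/dev/null" # forget read positions between runs (testing only)
  }
}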
2.3 beats plugin
Receives data sent by Filebeat and writes it to standard output:
input {
beats {
port => 5044 # Filebeat sends data to port 5044 on the Logstash host
}
}
output {
stdout {
codec => rubydebug
}
}
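On the Filebeat side, the matching output section simply points at this port (the address below reuses the Logstash host from the later examples):

output.logstash:
  hosts: ["172.16.1.151:5044"]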
2.4 kafka plugin
Reads data from Kafka and writes it to standard output:
input {
kafka {
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092" # broker list (Logstash 7.x kafka input options)
group_id => "logstash"
topics => ["apache_logs"]
consumer_threads => 16
}
}
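If the messages in the topic are already JSON, adding a codec lets Logstash decode them on ingest; a sketch using the same broker and topic names:

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    group_id => "logstash"
    topics => ["apache_logs"]
    codec => "json" # decode each Kafka message as JSON
  }
}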
3. Logstash filter plugins
As data travels from source to store, Logstash filters parse each event, identify named fields to build structure, and transform them into a common format for easier, faster analysis and business value:
use grok to derive structure from unstructured data
use geoip to resolve geographic coordinates from IP addresses
use useragent to extract the operating system, device type, etc. from requests
3.1 grok plugin
3.1.1 How did grok come about?
Suppose we want to parse unstructured data like the following into structured JSON:
120.27.74.166 - - [30/Dec/2019:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_1) Chrome/79.0.3945.88 Safari/537.36"
With plain regular expressions alone this requires something extremely complex, along the lines of:
\[([^\]]+)\]\s\[(\w+)\]\s([^:]+:\s\w+\s\w+\s[^:]+:\S+\s[^:]+:\S+\s\S+).*
3.1.2 What problem does grok solve?
grok is essentially a collection of named regular expressions, and it ships with many built-in patterns that can be used directly.
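For example, the built-in pattern %{IP:client} expands to the full regular expression for an IP address and stores the match in a field named client. A small sketch against the hypothetical line "55.3.244.1 GET /index.html":

filter {
  grok {
    # parses "55.3.244.1 GET /index.html" into the fields client, method and request
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request}" }
  }
}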
3.1.3 grok syntax diagram
grok introduction (access may require a proxy)
grok syntax generator
3.1.4 grok syntax example
grok example: use a grok pattern to format Nginx logs as JSON:
input {
http {
port => 5656
}
}
filter {
grok {
match => {
"message" => "%{COMBINEDAPACHELOG}"
}
}
}
output {
stdout {
codec => rubydebug
}
}
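To try the pipeline, a sample combined-format line can be POSTed to the http input; a hypothetical invocation:

curl -s -XPOST 'http://127.0.0.1:5656' -d '120.27.74.166 - - [30/Dec/2019:11:59:18 +0800] "GET / HTTP/1.1" 302 154 "-" "Mozilla/5.0"'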
Sample result:
{
"auth" => "-",
"request" => "/fonts/icons/icon.woff",
"timestamp" => "30/Oct/2021:10:53:18 +0800",
"bytes" => "43852",
"referrer" => ""http://elk.bertwu.net/css/style.css"",
"host" => "10.0.0.1",
"@version" => "1",
"headers" => {
"request_method" => "POST",
"http_host" => "10.0.0.151:5656",
"http_accept" => "**",
"http_user_agent" => "insomnia/2021.6.0",
"request_path" => "/",
"content_length" => "52",
"http_version" => "HTTP/1.1"
}
}
3.6.3 add_field
add_field in mutate creates new named fields from the split values, which makes later statistics and analysis easier.
...
filter {
mutate {
split => { "message" => "|" }
#add the split values under the given field names
add_field => {
"UserID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
}
}
...
Result:
{
"Date" => "2019-12-28 03:18:31",
"Action" => "提交订单",
"@timestamp" => 2021-10-30T12:46:37.558Z,
"tags" => [
[0] "_grokparsefailure",
[1] "_geoip_lookup_failure"
],
"message" => [
[0] "5607",
[1] "提交订单",
[2] "2019-12-28 03:18:31"
],
"UserID" => "5607",
"host" => "10.0.0.1",
"@version" => "1",
"headers" => {
"request_method" => "POST",
"http_host" => "10.0.0.151:5656",
"http_accept" => "*/*",
"http_user_agent" => "insomnia/2021.6.0",
"request_path" => "/",
"content_length" => "37",
"http_version" => "HTTP/1.1"
}
}
3.6.4 convert
convert in mutate performs type conversion, supporting integer, float, string and other types:
...
filter {
mutate {
split => { "message" => "|" }
#add the split values under the given field names
add_field => {
"UserID" => "%{[message][0]}"
"Action" => "%{[message][1]}"
"Date" => "%{[message][2]}"
}
#convert the types of the newly added fields
convert => {
"UserID" => "integer"
"Action" => "string"
"Date" => "string"
}
#remove useless fields
remove_field => ["headers","message"]
}
}
...
Result:
{
"Date" => "2019-12-28 03:18:31",
"Action" => "提交订单",
"@timestamp" => 2021-10-30T12:52:38.695Z,
"tags" => [
[0] "_grokparsefailure",
[1] "_geoip_lookup_failure"
],
"UserID" => "5607",
"host" => "10.0.0.1",
"@version" => "1"
}
4. Logstash output plugins
- stdout
- file
- elasticsearch
4.1 stdout plugin
The stdout plugin writes events to the terminal, which is convenient for debugging:
output {
stdout {
codec => rubydebug
}
}
4.2 file plugin
Writes events to a file, which makes it possible to gather logs scattered across many machines into one place, e.g. collecting the web logs of every web server into a single file for easy review:
output {
file {
path => "/var/log/web.log"
}
}
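The path also supports sprintf references, so output can be partitioned, e.g. one file per day (a sketch):

output {
  file {
    path => "/var/log/web-%{+YYYY-MM-dd}.log" # one file per day
  }
}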
4.3 elasticsearch plugin
In most cases we output to Elasticsearch:
output {
elasticsearch {
#usually the data nodes' addresses
hosts => ["172.16.1.162:9200","172.16.1.163:9200"]
#index name
index => "nginx-%{+YYYY.MM.dd}"
#overwrite the index template
template_overwrite => true
}
}
5. Analyzing App Logs with Logstash
5.1 What are App logs
App logs mainly record user actions; the content looks roughly like this:
[INFO] 2019-12-28 04:53:36 [cn.oldxu.dashboard.Main] - DAU|8329|领取优惠券|2019-12-28 03:18:31
[INFO] 2019-12-28 04:53:40 [cn.oldxu.dashboard.Main] - DAU|131|评论商品|2019-12-28 03:06:27
5.2 App log collection architecture
5.3 Steps
1. Filebeat reads the log file and sends its content to Logstash;
2. Logstash receives the content, converts it into structured data, and outputs it to Elasticsearch;
3. Kibana adds the Elasticsearch index, reads the data, analyzes it, and finally displays it;
5.4 App log collection in practice
1. Start the app to generate logs
Download the log file:
[root@web01 ~]# wget http://cdn.xuliangwei.com/app-2020-08.log
2. Configure Filebeat
[root@web01 ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/app.log
output.logstash:
  hosts: ["172.16.1.151:5044"]
3. Configure Logstash
[root@logstash-node1 conf.d]# cat app_logstash.conf
input {
beats {
port => 5044
}
}
filter {
mutate {
split => { "message" => "|" }
add_field => {
"UserID" => "%{[message][1]}"
"Action" => "%{[message][2]}"
"Date" => "%{[message][3]}"
}
convert => {
"UserID" => "integer"
"Action" => "string"
"Date" => "string"
}
remove_field => ["message"]
}
date {
#2020-08-28 01:05:02
match => ["Date", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["172.16.1.162:9200","172.16.1.163:9200","172.16.1.161:9200"]
index => "app-%{+YYYY.MM.dd}"
template_overwrite => true
}
}
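Start Logstash with this pipeline the same way as before:

[root@logstash-node1 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/app_logstash.conf -r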
4. Create the app index in Kibana to view the result
6. Analyzing Nginx Logs with Logstash
6.1 Architecture
6.2 Implementation
Implementation approach:
1. Convert plain Nginx logs to JSON
2. Normalize the time format of the Nginx logs
3. Resolve the client IP in the Nginx logs to a geographic region
4. Parse the user-agent field of the Nginx logs
5. Convert the Nginx bytes field to an integer
6. Remove useless fields such as message and headers
Log format 1:
14.145.74.175 - - [10/Nov/2020:00:01:53 +0800] "POST /course/ajaxmediauser/ HTTP/1.1" 200 54 "www.oldxu.com" "http://www.oldxu.com/video/678" mid=678&time=60&learn_time=551.5 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" 10.100.136.64:80 200 0.014 0.014
The grok pattern used:
grok {
match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:hostname} (?:%{QS:referrer}|-) (?:%{NOTSPACE:post_args}|-) %{QS:useragent} (?:%{QS:x_forward_for}|-) (?:%{URIHOST:upstream_host}|-) (?:%{NUMBER:upstream_response_code}|-) (?:%{NUMBER:upstream_response_time}|-) (?:%{NUMBER:response_time}|-)" }
}
Log format 2:
123.150.183.45 - - [22/Nov/2015:12:01:01 +0800] "GET /online/ppjonline/images/forms/validatePass.png HTTP/1.1" 200 370 "http://www.papaonline.com.cn/online/ppjonline/order/orderNow.jsp" "Mozilla/5.0 (Linux; U; Android 4.3; zh-CN; SCH-N719 Build/JSS15J) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 UCBrowser/9.9.5.489 U3/0.8.0 Mobile Safari/533.1"
The grok pattern used:
grok {
match => {
"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent}"
}
}
1. Configure Filebeat
[root@web01 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
  tags: ["nginx-access"]
- type: log
  enabled: true
  paths:
    - /var/log/nginx/error.log
  tags: ["nginx-error"]
output.logstash:
  hosts: ["172.16.1.151:5044"]
2. Configure Logstash
[root@logstash-node1 conf.d]# cat nginx_beat_logstash_es.conf
input {
beats {
port => 5044
}
}
filter {
if "nginx-access" in [tags][0]{
grok {
match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:hostname} (?:%{QS:referrer}|-) (?:%{NOTSPACE:post_args}|-) %{QS:useragent} (?:%{QS:x_forward_for}|-) (?:%{URIHOST:upstream_host}|-) (?:%{NUMBER:upstream_response_code}|-) (?:%{NUMBER:upstream_response_time}|-) (?:%{NUMBER:response_time}|-)"}
}
useragent {
source => "useragent"
target => "useragent"
}
geoip {
source => "clientip"
}
date {
match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
convert => ["bytes","integer"]
convert => ["response_time", "float"]
convert => ["upstream_response_time", "float"]
remove_field => ["message","agent"]
add_field => { "target_index" => "logstash-nginx-access-%{+YYYY.MM.dd}"}
}
} else if "nginx-error" in [tags] {
mutate {
add_field => { "target_index" => "logstash-nginx-error-%{+YYYY.MM.dd}" }
remove_field => ["agent"]
}
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["172.16.1.161:9200","172.16.1.162:9200","172.16.1.163:9200"]
index => "%{[target_index]}"
}
}
3. Build Kibana charts
1. Total number of requests
2. Total unique IPs
3. Total traffic generated
4. Bar chart: requests per time period
5. Bar chart: traffic trend per time period
6. Pie charts: browsers, status codes, mobile devices
7. Analyzing MySQL Logs with Logstash
7.1 What is the slow query log
When a SQL statement takes longer than a configured threshold to execute, it is recorded in a designated log file; these records are called the slow query log.
7.2 Why collect slow logs
While a database is running, some SQL queries may execute slowly. How do we quickly locate and analyze which SQL statements need optimization, and which are hurting the business? Once the logs are collected and analyzed centrally, the execution time and text of each SQL statement are visible at a glance.
7.3 Architecture
7.4 Approach
- 1. Install MySQL and enable the MySQL slow query log;
- 2. Configure Filebeat to collect the local slow query log file;
2.1 use exclude_lines to drop useless lines;
2.2 use multiline to merge entries spanning multiple lines;
- 3. Configure Logstash
3.1 use the grok plugin to parse the MySQL slow log into JSON;
3.2 use the date plugin to convert the timestamp field to local time, then overwrite @timestamp;
3.3 check that the JSON parsing succeeded; once it has, remove the useless fields;
3.4 finally, send what is printed to the screen to the Elasticsearch cluster;
7.5 Configure MySQL
[root@db01 ~]# vim /etc/my.cnf
[mysqld]
...
slow_query_log=ON
slow_query_log_file=/var/log/mariadb/slow.log # the mysql user needs write permission here
long_query_time=3
...
Restart MySQL and generate some slow queries (sleep() runs once per row returned, so these easily exceed the 3s threshold):
MariaDB [(none)]> select sleep(1) user,host from mysql.user;
MariaDB [(none)]> select sleep(0.5) user,host from mysql.user;
MariaDB [(none)]> select sleep(0.6) user,host from mysql.user;
Slow log format:
[root@sonarqube mysqldb]# vim slow.log
# Time: 211031 17:22:29
# User@Host: root[root] @ localhost [] Id: 5
# Query_time: 7.013641 Lock_time: 0.000081 Rows_sent: 7 Rows_examined: 7
SET timestamp=1635672149;
select sleep (1) user,host from mysql.user;
# Time: 211031 17:22:51
# User@Host: root[root] @ localhost [] Id: 5
# Query_time: 3.518726 Lock_time: 0.000080 Rows_sent: 7 Rows_examined: 7
SET timestamp=1635672171;
select sleep (0.5) user,host from mysql.user;
# Time: 211031 17:23:00
# User@Host: root[root] @ localhost [] Id: 5
# Query_time: 4.913531 Lock_time: 0.000074 Rows_sent: 7 Rows_examined: 7
SET timestamp=1635672180;
select sleep (0.7) user,host from mysql.user;
7.6 Configure Filebeat
[root@web01 ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/mariadb/slow.log
  exclude_lines: ['^# Time'] # drop lines matching this pattern
  multiline.pattern: '^# User'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 1000
output.logstash:
  hosts: ["172.16.1.151:5044"]

With negate: true and match: after, every line that does not match '^# User' is appended to the preceding matching line, so each slow-log entry becomes a single event.

7.7 Configure Logstash
[root@logstash-node1 conf.d]# cat mysql_beat_logstash_es.conf
input {
beats {
port => 5044
}
}
filter {
mutate {
gsub => [ "message", "\n", " "] # replace newlines with spaces
}
grok {
match => { "message" => "(?m)^# User@Host: %{USER:User}\[%{USER:User2}\] @ (?:(?<Client_Host>\S*) )?\[(?:%{IP:Client_IP})?\] # Thread_id: %{NUMBER:Thread_id:integer}\s+ Schema: (?:(?<Schema>\S*) )\s+QC_hit: (?:(?<QC_hit>\S*) )# Query_time: %{NUMBER:Query_Time}\s+ Lock_time: %{NUMBER:Lock_Time}\s+ Rows_sent: %{NUMBER:Rows_Sent:integer}\s+Rows_examined: %{NUMBER:Rows_Examined:integer} SET timestamp=%{NUMBER:timestamp}; \s*(?<Query>(?<Action>\w+)\s+.*)" }
}
date {
match => ["timestamp","UNIX","YYYY-MM-dd HH:mm:ss"] # unix时间转换
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
#remove message and other useless fields
remove_field => ["message","input","timestamp"]
#convert Query_Time and Lock_Time to floats
convert => ["Lock_Time","float"]
convert => ["Query_Time","float"]
#add the index name under @metadata; the docs recommend this style, though it can also be hard-coded
add_field => { "[@metadata][target_index]" => "logstash-mysql-%{+YYYY.MM.dd}" }
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["172.16.1.162:9200","172.16.1.163:9200"]
index => "%{[@metadata][target_index]}" # 直接调用索引
template_overwrite => true
}
}
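Once events flow, the daily index should appear in the cluster; one quick check (using one of the data nodes above):

curl -s '172.16.1.162:9200/_cat/indices/logstash-mysql-*?v'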
7.8 Configure Kibana
Charts are then built in Kibana from the collected slow-log data.