栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

Logstash

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Logstash

文章目录
  • 一、Logstash架构介绍
    • 1.1 为什么需要Logstash
    • 1.2 什么是Logstash
    • 1.3 Logstash架构
    • 1.4 安装Logstash
  • 二、Logstash input插件
    • 2.1 stdin插件
    • 2.2 file插件
    • 2.3 beats 插件
    • 2.4 kafka插件
  • 三、 Logstash Filter插件
    • 3.1 Grok插件
      • 3.1.1 grok如何出现的?
      • 3.1.2 grok解决什么问题
      • 3.1.3 grok语法示意图
      • 3.1.4 grok语法示例
    • 3.2 geoip插件
    • 3.3 fields字段
    • 3.4 Date插件
    • 3.5 useragent插件
    • 3.6 mutate 插件
      • 3.6.1 remove_field
      • 3.6.2 split
      • 3.6.3 add_field
      • 3.6.4 convert
  • 四、Logstash Output插件
    • 4.1 stdout插件
    • 4.2 file插件
    • 4.3 elastic插件
  • 五、Logstash分析App日志
    • 5.1
    • 5.2 APP日志收集架构
    • 5.3 步骤
    • 5.4 APP日志收集实践
  • 六、Logstash分析Nginx日志
    • 6.1 架构
    • 6.2 实现
  • 七、Logstash分析MySQL日志
    • 7.1 什么是慢日志
    • 7.2 为什么要收集慢日志
    • 7.2 架构
    • 7.3 思路
    • 7.4 配置mysql
    • 7.5 配置filebeat
    • 7.6 配置 logstash
    • 7.7 配置kibana

一、Logstash架构介绍 1.1 为什么需要Logstash

对于部分生产上的日志无法像 Nginx 那样,可以直接将输出的日志转为 Json 格式,但是可以借助 Logstash来将我们的 ”非结构化数据“,转为 “结构化数据”;

filbeat --> logstash(input fileter output) --es

1.2 什么是Logstash

Logstash 是免费且开放的服务器端数据处理管道,能够从多个来源采集数据,转换数据,然后将数据发送到您最喜欢的“存储库”中。。官网

1.3 Logstash架构

Logstash 的基础架构类似于 pipeline 流水线,主要分为如下步骤:

  1. Input:数据采集(常用插件:stdin、file、kafka、beat、http)
  2. Filter:数据解析/转换(常用插件:grok、date、geoip、mutate、useragent)
  3. Output:数据输出 (常用插件:Elasticsearch)
1.4 安装Logstash
[root@localhost ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.8.1.rpm
[root@localhost ~]# yum install java -y
[root@localhost ~]# rpm -ivh logstash-7.8.1.rpm 

# 修改配置文件
[root@logstash-node1 ~]# vim/etc/logstash/logstash.yml
node.name: logstash-node1
path.data: /var/lib/logstash # 数据存放路径
pipeline.workers: 2 
pipeline.batch.size: 1000 # 同时处理条目
path.logs: /var/log/logstash # 日志目录

# 堆内存大小调整
[root@logstash-node1 ~]# vim/etc/logstash/jvm.options
-Xms1g
-Xmx1g
二、Logstash input插件

input 插件用于指定输入源,一个 pipeline 可以有多个 input 插件,我们主要围绕下面几个 input插件进行介绍

stdin
file
beat
kafka
http
2.1 stdin插件

从标准输入读取数据,从标准输出中输出内容;

[root@logstash-node1 ~]# cat /etc/logstash/conf.d/stdin_logstash.conf 
# 从终端中输入,输出到中端
input {
	stdin {
	    type => "stdin" # 自定义事件类型
	    tags => "stdin_type" # 自定义tag,用于后续事件判断
	}

}


output {
    stdout {
	codec => "rubydebug"
	}

}


# 执行 -r 表示不停止logstash下修改配置文件
[root@logstash-node1 ~]# /usr/share/logstash/bin/logstash  -f /etc/logstash/conf.d/stdin_logstash.conf -r

中端中输入 test logstash,返回结果如下

          "type" => "stdin",
       "message" => "test logstash",
          "host" => "logstash-node1",
          "tags" => [
        [0] "stdin_type"
    ],
      "@version" => "1",
    "@timestamp" => 2021-10-30T07:34:57.440Z
}

2.2 file插件

从 file 文件中读取数据,然后输入至标准输入;

[root@logstash-node1 conf.d]# cat file_logstash.conf 
input {
	file {
		path => "/var/log/test.log"
		type => "syslog"
		exclude => "*.gz" # 不监听的文件 
		start_position => "beginning" # 第一次从头开始读取文件 beginning or end
		stat_interval => "3" #定时检查文件是否更新,默认1s
   }
}

output {
	stdout {
		codec => rubydebug
	}
}

往日志中追加,查看反馈

[root@logstash-node1 ~]# echo "file logstash" > /var/log/test.log
{
      "@version" => "1",
          "path" => "/var/log/test.log",
          "type" => "syslog",
    "@timestamp" => 2021-10-30T07:58:42.699Z,
          "host" => "logstash-node1",
       "message" => "file logstash"
}
2.3 beats 插件

从filebeat文件中读取数据,然后输入至标准输入;

input {
	beats {
		port => 5044 # filebeat 发送数据到logstash主机的5044端口
	}
}
output {
	stdout {
		codec => rubydebug
	}
}

2.4 kafka插件

从kafka文件中读取数据,然后输出至标准输出;

input {
	kafka {
		zk_connect =>
			"kafka1:2181,kafka2:2181,kafka3:2181"
			 group_id => "logstash"
			 topic_id => "apache_logs"
			 consumer_threads => 16
	}
}
三、 Logstash Filter插件

数据从源传输到存储的过程中,Logstash 的 filter过滤器能够解析各个事件,识别已命名的字段结构,并将它们转换成通用格式,以便更轻松、更快速地分析和实现商业价值;
利用 Grok 从非结构化数据中派生出结构
利用 geoip 从 IP 地址分析出地理坐标
利用 useragent 从 请求中分析操作系统、设备类型等

3.1 Grok插件 3.1.1 grok如何出现的?

我们希望将如下非结构化的数据解析成 json 结构化数据格式

120.27.74.166 - - [30/Dec/2019:11:59:18+0800] "GET / HTTP/1.1" 302 154 "-""Mozilla/5.0 (Macintosh; Intel Mac OS X10_14_1) Chrome/79.0.3945.88Safari/537.36"

需要使用非常复杂的正则表达式;

[([^]]+)]s[(w+)]s([^:]+:sw+sw+s[^:]+:S+s[^:]+:S+sS+).*[([^]]+)]s
[(w+)]s([^:]+:sw+sw+s[^:]+:
S+s[^:]+:S+sS+).*[([^]]+)]s
[(w+)]s([^:]+:sw+
sw+s[^:]+:S+s[^:]+:S+sS+).*
3.1.2 grok解决什么问题

grok其实是带有名字的正则表达式集合。grok 内置了很多 pattern 可以直接使用;
需要翻 qiang
grok介绍
grok语法生成器

3.1.3 grok语法示意图

3.1.4 grok语法示例

grok示例:使用 grok pattern 将 Nginx 日志格式化为 json 格式;

input {
        http {
                port => 5656
        }

}

filter {
        grok {
                match => {
                        "message" => "%{COMBINEDAPACHELOG}"
                }
        }
}

output {

        stdout {
                codec => rubydebug
        }
}
~    

结果示例

{
           "auth" => "-",
        "request" => "/fonts/icons/icon.woff",
      "timestamp" => "30/Oct/2021:10:53:18 +0800",
          "bytes" => "43852",
       "referrer" => ""http://elk.bertwu.net/css/style.css"",
           "host" => "10.0.0.1",
       "@version" => "1",
        "headers" => {
         "request_method" => "POST",
              "http_host" => "10.0.0.151:5656",
            "http_accept" => "**",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/",
         "content_length" => "52",
           "http_version" => "HTTP/1.1"
    }
}

3.6.3 add_field

mutate 中 add_field,可以将分割后的数据创建出新的字段名称。便于以后的统计和分析

...
filter {
        mutate {
                split => { "message" => "|" }
                #将分割后的字段添加到指定的字段名称
                add_field => {
                        "UserID" => "%{[message][0]}"
                        "Action" => "%{[message][1]}"
                        "Date" => "%{[message][2]}"
                }
        }
}
...

结果展示:

{
          "Date" => "2019-12-28 03:18:31",
        "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:46:37.558Z,
          "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
       "message" => [
        [0] "5607",
        [1] "提交订单",
        [2] "2019-12-28 03:18:31"
    ],
        "UserID" => "5607",
          "host" => "10.0.0.1",
      "@version" => "1",
       "headers" => {
         "request_method" => "POST",
              "http_host" => "10.0.0.151:5656",
            "http_accept" => "*/*",
        "http_user_agent" => "insomnia/2021.6.0",
           "request_path" => "/",
         "content_length" => "37",
           "http_version" => "HTTP/1.1"
    }
}
3.6.4 convert

mutate 中的 convert类型转换。 支持转换integer、float、string等类型;

...
filter {
        mutate {
                split => { "message" => "|" }
                #将分割后的字段添加到指定的字段名称
                add_field => {
                        "UserID" => "%{[message][0]}"
                        "Action" => "%{[message][1]}"
                        "Date" => "%{[message][2]}"
                }
                #对新添加字段进行格式转换
                convert => {
                        "UserID" => "integer"
                        "Action" => "string"
                        "Date" => "string"
                }
                #移除无用的字段
                remove_field => ["headers","message"]
        }
}
...

结果展示:

{
          "Date" => "2019-12-28 03:18:31",
        "Action" => "提交订单",
    "@timestamp" => 2021-10-30T12:52:38.695Z,
          "tags" => [
        [0] "_grokparsefailure",
        [1] "_geoip_lookup_failure"
    ],
        "UserID" => "5607",
          "host" => "10.0.0.1",
      "@version" => "1"
}

四、Logstash Output插件
  • stdout
  • file
  • elasticsearch
4.1 stdout插件

stdout 插件将数据输出到屏幕终端,便于调试;

output {
	stdout {
		codec => rubydebug
	}
}
4.2 file插件

将结果输出到文件,实现将分散在多地的文件统一到一处:比如将所有 web 机器的 web 日志收集到一个文件中,从而方便查阅信息;

output {
	file {
		path => "/var/log/web.log"
	}
}
4.3 elastic插件

一般情况下,我们都会输出到 elasticsearch

output {
	elasticsearch {
		#一般写data集群的主机地址
		hosts =>	["172.16.1.162:9200","172.16.1.163:9200"]
		#索引名称
		index => "nginx-%{+YYYY.MM.dd}"
		#覆盖索引模板
		template_overwrite => true

	}
}
五、Logstash分析App日志 5.1

APP日志,主要是用来记录用户的操作,大体内容如下:

[INFO] 2019-12-28 04:53:36 [cn.oldxu.dashboard.Main] - DAU|8329|领取优惠券|2019-12-28 03:18:31
[INFO] 2019-12-28 04:53:40 [cn.oldxu.dashboard.Main] - DAU|131|评论商品|2019-12-28 03:06:27
5.2 APP日志收集架构

5.3 步骤

1.首先通过 Filebeat 读取日志文件中的内容,并且将内容发送给 Logstash;
2.Logstash接收到内容后,将数据转换为结构化数据。然后输出给Elasticsearch;
3.Kibana添加Elasticsearch索引,读取数据,然后在Kibana中进行分析,最后进行展示;

5.4 APP日志收集实践

1.启动app产生日志
下载日志文件

[root@web01 ~]# wget http://cdn.xuliangwei.com/app-2020-08.log 

2.配置filebeat

[root@web01 ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths: /var/log/app.log


output.logstash:
  hosts: ["172.16.1.151:5044"]

3.配置logstash

[root@logstash-node1 conf.d]# cat app_logstash.conf 
input {
	beats {
	 port => 5044
	}
}


filter {

	mutate {
		split => { "message" => "|" }
		add_field => {

			"UserID" => "%{[message][1]}"
			"Action" => "%{[message][2]}"
			"Date" => "%{[message][3]}"
              

	}
	convert => {
			"UserID" => "integer"
			"Action" => "string"
			"Date" => "string"
			
		}
	remove_field => ["message"]
	
	}
	date {
		#2020-08-28 01:05:02
		match => ["Date", "yyyy-MM-dd HH:mm:ss"]
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}


}

output {
	stdout {
		codec => rubydebug
	}

	elasticsearch {
		hosts => ["172.16.1.162:9200","172.16.1.163:9200","172.16.1.161:9200"]
		index => "app-%{+YYYY.MM.dd}"
		template_overwrite => true
	}

}

4.kibana上创建app索引展示效果

六、Logstash分析Nginx日志

实现思路:
1.将 Nginx 普通日志转换为 json
2.将 Nginx 日志的时间格式进行格式化输出
3.将 Nginx 日志的来源IP进行地域分析
4.将 Nginx 日志的 user-agent 字段进行分析
5.将 Nginx 日志的 bytes 修改为整数
6.移除没有用的字段,message、headers

6.1 架构

6.2 实现

日志格式1:

14.145.74.175 - - [10/Nov/2020:00:01:53 +0800] "POST /course/ajaxmediauser/ HTTP/1.1" 200 54 "www.oldxu.com" "http://www.oldxu.com/video/678" mid=678&time=60&learn_time=551.5 "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.101 Safari/537.36" "-" 10.100.136.64:80 200 0.014 0.014

所用的grok语法:

 grok {
            match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:hostname} (?:%{QS:referrer}|-) (?:%{NOTSPACE:post_args}|-) %{QS:useragent} (?:%{QS:x_forward_for}|-) (?:%{URIHOST:upstream_host}|-) (?:%{NUMBER:upstream_response_code}|-) (?:%{NUMBER:upstream_response_time}|-) (?:%{NUMBER:response_time}|-)" }
        }

日志格式2:

123.150.183.45 - - [22/Nov/2015:12:01:01 +0800] "GET /online/ppjonline/images/forms/validatePass.png HTTP/1.1" 200 370 "http://www.papaonline.com.cn/online/ppjonline/order/orderNow.jsp" "Mozilla/5.0 (Linux; U; Android 4.3; zh-CN; SCH-N719 Build/JSS15J) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 UCBrowser/9.9.5.489 U3/0.8.0 Mobile Safari/533.1"

所用的grok语法:

	grok {
		match => {
			"message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{data:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:referrer} %{QS:useragent}"
		}
	}

1.配置filebeat

[root@web01 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths: /var/log/nginx/access.log
  tags: ["inginx-access"]
- type: log
  enabled: true
  paths: /var/log/nginx/error.log
  tags: ["nginx-error"]


output.logstash:
  hosts: ["172.16.1.151:5044"]

2.配置logstash

[root@logstash-node1 conf.d]# cat nginx_beat_logstash_es.conf 
input {

	beats {
		port => 5044
	}
}

filter {
  if "nginx-access" in [tags][0]{	
	grok {

		match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} [%{HTTPDATE:timestamp}] "%{WORD:method} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} (?:%{NUMBER:bytes}|-) %{QS:hostname} (?:%{QS:referrer}|-) (?:%{NOTSPACE:post_args}|-) %{QS:useragent} (?:%{QS:x_forward_for}|-) (?:%{URIHOST:upstream_host}|-) (?:%{NUMBER:upstream_response_code}|-) (?:%{NUMBER:upstream_response_time}|-) (?:%{NUMBER:response_time}|-)"}
	}
	useragent {
		source => "useragent"
		target => "useragent"
	}
	geoip {
		source => "clientip"
	}
        date {
            match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
            target => "@timestamp"
            timezone => "Asia/Shanghai"
        }
        
        mutate {
         convert => ["bytes","integer"]
         convert => ["response_time", "float"]
         convert => ["upstream_response_time", "float"]
         remove_field => ["message","agent"]
         add_field => { "target_index" => "logstash-nginx-access-%{+YYYY.MM.dd}"}
	}
   }else if "nginx-error" in [tags][0]{
	mutate {
		add_field => { "target_index" => "logstash-nginx-error-%{+YYYY.MM.dd}" }
		remove_field => ["agent"]

	}     
  }
}



output {
	stdout {
		codec => rubydebug
	}
	elasticsearch {
		hosts => ["172.16.1.161:9200","172.16.1.162:9200","172.16.1.163:9200"]
		index => "%{[target_index]}"
	}
}

3.kibana生成图
1.总访问次数
2.独立ip总量
3.总产生流量
4.柱状图,时间段访问次数
5.柱状图,时间段流量趋势
6.饼图,浏览器设备、状态码、手机设备

七、Logstash分析MySQL日志 7.1 什么是慢日志

当 SQL 语句执行时间超过所设定的阈值时,便会记录到指定的日志文件中,所记录内容称之为慢查询日志

7.2 为什么要收集慢日志

数据库在运行期间,可能会存在 SQL 语句查询过慢,那我们如何快速定位、分析哪些 SQL语句需要优化处理,又是哪些 SQL 语句给业务系统造成影响呢?当我们进行统一的收集分析,SQL语句执行的时间,以及执行的SQL语句,一目了然。

7.2 架构

7.3 思路
  • 1.安装 MySQL;开启 MySQL 慢查询日志;
  • 2 配置 filebeat 收集本地慢查询日志路径;
    2.1 使用 exclude_lines 排除无用行;
    2.2 使用 multiline 对内容进行合并;
  • 3 配置 logstash
    3.1.使用 grok 插件将 mysql 慢日志格式化为 json格式;
    3.2.使用 date 插件将 timestamp 时间转换为本地时间,然后覆盖 @timestamp;
    3.3.检查 json 格式是否成功,成功后可以将没用的字段删除;
    3.4.最后将输出到屏幕的内容,输出至Elasticsearch集群;
7.4 配置mysql
[root@db01 ~]# vim /etc/my.cnf
[mysqld]
...
slow_query_log=ON
slow_query_log_file=/var/log/mariadb/slow.log # 需要mysql用户有权限写
long_query_time=3
...

重启mysql ,并模拟产生慢日志

MariaDB [(none)]> select sleep(1) user,host from mysql.user;
MariaDB [(none)]> select sleep(0.5) user,host from mysql.user;
MariaDB [(none)]> select sleep(0.6) user,host from mysql.user;

慢日志格式

[root@sonarqube mysqldb]# vim slow.log

# Time: 211031 17:22:29
# User@Host: root[root] @ localhost []  Id:     5
# Query_time: 7.013641  Lock_time: 0.000081 Rows_sent: 7  Rows_examined: 7
SET timestamp=1635672149;
select sleep (1) user,host from mysql.user;
# Time: 211031 17:22:51
# User@Host: root[root] @ localhost []  Id:     5
# Query_time: 3.518726  Lock_time: 0.000080 Rows_sent: 7  Rows_examined: 7
SET timestamp=1635672171;
select sleep (0.5) user,host from mysql.user;
# Time: 211031 17:23:00
# User@Host: root[root] @ localhost []  Id:     5
# Query_time: 4.913531  Lock_time: 0.000074 Rows_sent: 7  Rows_examined: 7
SET timestamp=1635672180;
select sleep (0.7) user,host from mysql.user;
~                                                  
7.5 配置filebeat
[root@web01 ~]# cat /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths: /var/log/mariadb/slow.log
  exclude_lines: ['^# Time'] # 排除匹配的行
  multiline.pattern: '^# User'
  multiline.negate: true
  multiline.match: after
  multiline.max_lines: 1000

output.logstash:
  hosts: ["172.16.1.151:5044"]

7.6 配置 logstash
[root@logstash-node1 conf.d]# cat mysql_beat_logstash_es.conf 
input {

	beats {
		port => 5044
	}
}

filter {
	mutate {
		gsub => [ "message", "n", " "] # 去除n

	}


   	grok {
        	match => { "message" => "(?m)^# User@Host: %{USER:User}[%{USER-2:User}] @ (?:(?S*) )?[(?:%{IP:Client_IP})?] # Thread_id: %{NUMBER:Thread_id:integer}s+ Schema: (?:(?S*) )s+QC_hit: (?:(?S*) )# Query_time: %{NUMBER:Query_Time}s+ Lock_time: %{NUMBER:Lock_Time}s+ Rows_sent: %{NUMBER:Rows_Sent:integer}s+Rows_examined: %{NUMBER:Rows_Examined:integer} SET timestamp=%{NUMBER:timestamp}; s*(?(?w+)s+.*)" }
    }
	date {

		match => ["timestamp","UNIX","YYYY-MM-dd HH:mm:ss"] # unix时间转换
		target => "@timestamp"
		timezone => "Asia/Shanghai"
	}
	mutate {
        #移除message等字段
        remove_field => ["message","input","timestamp"]

        #对Query_time Lock_time 格式转换为浮点数
        convert => ["Lock_Time","float"]
        convert => ["Query_Time","float"]
        
        #添加索引名称,官方建议这种写法,也可以直接写死
      add_field => { "[@metadata][target_index]" => "logstash-mysql-%{+YYYY.MM.dd}" } 
    }

}


output {

	stdout {
		codec => rubydebug
	}
	elasticsearch {
        hosts => ["172.16.1.162:9200","172.16.1.163:9200"]
        index => "%{[@metadata][target_index]}" # 直接调用索引
        template_overwrite => true
    }
}


7.7 配置kibana

图形展示:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/360909.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号