执行以下命令:
docker run --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx2048m" -d elasticsearch:7.14.0将容器内的配置文件拷贝到本地
拷贝的命令:
docker cp 容器名:容器中要拷贝的文件名及其路径 要拷贝到宿主机里面对应的路径
docker cp elasticsearch:/usr/share/elasticsearch/config/elasticsearch.yml /Users/runfa/documents/program/Docker/elasticsearch/config/elasticsearch.yml docker cp elasticsearch:/usr/share/elasticsearch/data /Users/runfa/documents/program/Docker/elasticsearch/data docker cp elasticsearch:/usr/share/elasticsearch/plugins /Users/runfa/documents/program/Docker/elasticsearch/plugins
执行成功后在/Users/runfa/documents/program/Docker/elasticsearch目录下能够看到以下三个文件夹
删除原来安装的容器:
docker rm -f elasticsearch
重新执行启动命令:
docker run --name elasticsearch --privileged=true -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx2048m" -v /Users/runfa/documents/program/Docker/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml -v /Users/runfa/documents/program/Docker/elasticsearch/data:/usr/share/elasticsearch/data -v /Users/runfa/documents/program/Docker/elasticsearch/plugins:/usr/share/elasticsearch/plugins -d elasticsearch:7.14.0
执行成功后,在浏览器中输入输入http://localhost:9200/,如果有返回json格式的内容(如下图)则说明启动成功
执行以下命令:
docker run --name kibana -p 5601:5601 -v /Users/runfa/documents/program/Docker/kibana/config:/usr/share/kibana/config -d kibana:7.14.0将容器内的配置文件拷贝到本地
拷贝的命令:
docker cp 容器名:容器中要拷贝的文件名及其路径 要拷贝到宿主机里面对应的路径
docker cp kibana:/usr/share/kibana/config /Users/runfa/documents/program/Docker/kibana/config
执行成功后在/Users/runfa/documents/program/Docker/kibana目录下能够看到config文件夹
进入config目录,修改kibana.yml配置文件
将elasticsearch.hosts的配置修改为elasticsearch地址
注意:如果是部署在本地,不能使用localhost,一定要配置ip,否则连接失败
# Default Kibana configuration for docker target server.host: "0" server.shutdownTimeout: "5s" # 配置elasticsearch地址,如果是部署在本地,不能使用localhost,一定要配置ip,否则连接失败 elasticsearch.hosts: [ "http://192.168.43.176:9200" ] monitoring.ui.container.elasticsearch.enabled: true增加容器目录映射
删除原来安装的容器:
docker rm -f kibana
重新执行启动命令:
docker run --name kibana -p 5601:5601 --privileged=true -v /Users/runfa/documents/program/Docker/kibana/config:/usr/share/kibana/config -d kibana:7.14.0
kibana的启动时间较长,需要耐心等待
待启动成功后,在浏览器中输入http://localhost:5601/,若能成功访问则说明启动成功
执行以下命令:
docker run --name logstash --privileged=true -p 5044:5044 -p 9400:9400 -d logstash:7.14.0将容器内的配置文件拷贝到本地
拷贝的命令:
docker cp 容器名:容器中要拷贝的文件名及其路径 要拷贝到宿主机里面对应的路径
docker cp logstash:/usr/share/logstash/config/ /Users/runfa/documents/program/Docker/logstash/config docker cp logstash:/usr/share/logstash/pipeline /Users/runfa/documents/program/Docker/logstash/pipeline docker cp logstash:/usr/share/logstash/data /Users/runfa/documents/program/Docker/logstash/data docker cp logstash:/usr/share/logstash/logs /Users/runfa/documents/program/Docker/logstash/logs
执行成功后在/Users/runfa/documents/program/Docker/logstash目录下能够看到以下几个文件夹
进入config目录,修改logstash.yml配置文件
注意:elasticsearch如果部署在本地,需要使用ip,不能使用localhost
node.name: logstash-203 http.host: "0.0.0.0" xpack.monitoring.elasticsearch.hosts: [ "http://192.168.43.176:9200" ] # 日志格式 json/plain log.format: json # 日志文件目录配置 path.logs: /usr/share/logstash/logs
修改pipelines.yml配置文件
这个配置文件主要是配置输入、过滤和输出,这些部分均会由 Logstash 管道予以执行
- pipeline.id: main # 该配置会读取pipeline目录下所有的conf文件 path.config: "/usr/share/logstash/pipeline/*.conf"增加容器目录映射
删除原来安装的容器:
docker rm -f logstash
重新执行启动命令:
docker run --name logstash --privileged=true -p 5044:5044 -p 9400:9400 -v /Users/runfa/documents/program/Docker/logstash/config/:/usr/share/logstash/config -v /Users/runfa/documents/program/Docker/logstash/pipeline:/usr/share/logstash/pipeline -v /Users/runfa/documents/program/Docker/logstash/data:/usr/share/logstash/data -v /Users/runfa/documents/program/Docker/logstash/logs:/usr/share/logstash/logs -d logstash:7.14.0
执行以下命令,如果没有出现明显错误,则说明启动成功
docker logs -f logstash使用Logstash将日志输出到Elasticsearch 创建conf文件
在前面配置映射到本地的pipeline目录(我的是/Users/runfa/documents/program/Docker/logstash/pipeline)创建以下几个文件:
in-file.conf该文件主要是配置Logstash输入来源,文件内容如下:
input {
file{
# 文件输入来源
path => "/usr/share/logstash/data/test/test.json"
# 使用json解析方式
codec => json {
charset => "UTF-8"
}
start_position => "beginning"
# 文档类型
type => "from_test"
}
}
out-file.conf
该文件主要是配置Logstash输出方式,文件内容如下:
output {
if [type] == "from_test"{
#日志输出到控制台
stdout {
codec => rubydebug
}
#输出到es
#无法解析的json不记录到elasticsearch中
if "_jsonparsefailure" not in [tags] {
elasticsearch {
#es地址ip端口
hosts => "192.168.43.176:9200"
#索引
index => "logstash-test"
}
}
}
}
创建输入来源文件
test.log
在前面配置映射到本地的data目录(我的是/Users/runfa/documents/program/Docker/logstash/data)下创建test文件夹,然后再test文件夹下创建test.log,输入以下内容:
注意:该文件的内容不能进行格式化或分行,否则Logstash解析失败
{"name": "zhangsan", "age": 16, "email": "123@qq.com"}
验证是否成功输出到Elasticsearch
重新启动Logstash
执行以下命令:
docker restart logstash监听Logstash日志输出
执行以下命令:
docker logs -f logstash
修改test.log,加多一行信息
{"name": "zhangsan", "age": 16, "email": "123@qq.com"}
{"name": "zhangsan", "age": 16, "email": "123@qq.com"}
如果看到控制台中输出以下信息,则证明test.log成功输入到Logstash
{
"@version" => "1",
"path" => "/usr/share/logstash/data/test/test.json",
"host" => "b3f51af74fbe",
"email" => "123@qq.com",
"@timestamp" => 2021-10-28T09:18:50.002Z,
"age" => 16,
"type" => "from_test",
"name" => "zhangsan111"
}
{
"@version" => "1",
"path" => "/usr/share/logstash/data/test/test.json",
"host" => "b3f51af74fbe",
"email" => "123@qq.com",
"@timestamp" => 2021-10-28T09:18:50.002Z,
"age" => 16,
"type" => "from_test",
"name" => "zhangsan111"
}
使用Kibana查看Logstash是否成功输出到Elasticsearch
输入http://localhost:5601打开Kibana管理页面,点击Management菜单下的Dev Tools
在左侧输入命令查询数据,Elastic的查询语法可以参考官方文档.
GET logstash-test/_search
如果查询到的hits的内容不为空(如下图),则证明logstash成功将数据输入到elasticsearch
在本地pipeline目录下,创建以下输出配置文件:
in-logback.conf配置来自Springboot logback的输入源
input {
# TCP输入,Json格式编码UTF-8
tcp {
port => 9400
codec => json {
charset => "UTF-8"
}
# 文档类型
type => "logback-logs"
}
}
out-logerror.conf
error日志输出配置
output {
if [level] == "ERROR"{
stdout {
codec => rubydebug
}
if [type] == "logback-logs"{
elasticsearch {
action => "index"
# 这里是es的地址,多个es要写成数组的形式
hosts => "192.168.43.176:9200"
# 用于kibana过滤
index => "springboot-logstash-logerror"
# 超时时间
timeout => 300
}
}
}
}
out-logwarn.conf
warn日志输出配置
output {
if [level] == "WARN"{
stdout {
codec => rubydebug
}
if [type] == "logback-logs"{
elasticsearch {
action => "index"
# 这里是es的地址,多个es要写成数组的形式
hosts => "192.168.43.176:9200"
# 用于kibana过滤
index => "springboot-logstash-logwarn"
# 超时时间
timeout => 300
}
}
}
}
out-loginfo.conf
info日志输出配置
output {
if [level] == "INFO"{
stdout {
codec => rubydebug
}
if [type] == "logback-logs"{
elasticsearch {
action => "index"
# 这里是es的地址,多个es要写成数组的形式
hosts => "192.168.43.176:9200"
# 用于kibana过滤
index => "springboot-logstash-loginfo"
# 超时时间
timeout => 300
}
}
}
}
Springboot logback配置
本项目中使用Logback日志框架,Logback提供了将日志直接输出到Logstash的功能,只需要在logback-spring.xml中添加与logstash相关的appender配置即可。
logback-spring.xml在resource目录下创建logback-spring.xml文件
application.ymllogback ${logstashAddress} { "app": "${springAppName:-}", "profile": "${springProfile:-}", "level": "%level", "traceId": "%X{traceId}", "class": "%c", "method": "%M", "msg": "%msg", "dateTime": "%date{yyyy-MM-dd'T'HH:mm:ss.SSSZ}" } ERROR ACCEPT DENY ${logstashAddress} { "app": "${springAppName:-}", "profile": "${springProfile:-}", "level": "%level", "traceId": "%X{traceId}", "class": "%c", "method": "%M", "msg": "%msg", "dateTime": "%date{yyyy-MM-dd'T'HH:mm:ss.SSSZ}" } WARN ACCEPT DENY ${logstashAddress} { "app": "${springAppName:-}", "profile": "${springProfile:-}", "level": "%level", "traceId": "%X{traceId}", "class": "%c", "method": "%M", "msg": "%msg", "dateTime": "%date{yyyy-MM-dd'T'HH:mm:ss.SSSZ}" } INFO ACCEPT DENY
除了配置spring-logback.xml,还需要配置application.xml
spring:
application:
# 项目名称启动时传入,写入ELK通过app名称进行隔离
name: elk-demo
profiles:
# 启动环境,ELK隔离通过profile属性隔离dev以及prod
active: dev
server:
port: 8080
logging:
# 日志文件地址
config: classpath:logback-spring.xml
# 配置Logstash地址
logstash:
address: localhost:9400
level:
root: info
启动验证
使用以下命令监听logstash输出日志
docker logs -f logstash
启动SpringBoot项目,如果logstash日志中有输出以下信息,证明SpringBoot日志输出到logstash成功
{
"class" => "com.example.zrf.ElkDemoApplication",
"dateTime" => "2021-10-29T11:43:22.474+0800",
"@version" => "1",
"msg" => "Started ElkDemoApplication in 5.208 seconds (JVM running for 6.385)",
"profile" => "dev",
"@timestamp" => 2021-10-29T03:43:22.494Z,
"traceId" => "",
"type" => "logback-logs",
"app" => "elk-demo",
"port" => 62230,
"level" => "INFO",
"method" => "?",
"host" => "gateway"
}
使用Kibana可视化日志数据
创建索引模式
作用:主要是用来匹配满足索引模式的数据,供后续过滤日志数据。
操作步骤:打开菜单->打开Management->打开Stack Management->找到Kibana->打开Index Patterns->点击Create Index Pattern
注意:如果Elasticsearch中没有数据,则该页面不会显示Create Index Pattern。此时需要检查Logstash数据是否成功输出到Elasticsearch
如下输入匹配日志索引的匹配格式,点击下一步。
选择时间字段,TCP方式Logstash推送日志后会自动生成@timestamp,选择@timestamp即可,点击创建索引模式。
在创建完索引模式后,可以在日志视图中使用创建的索引过滤数据。
操作步骤:打开菜单->点击Discover
点击下拉框选择创建的索引匹配器,时间范围选择最近15小时,即可打开日志视图。
点击Add filter->选择level字段->选择运算符is->Value值填写为ERROR
点击Save保存后即可查询出所有level为INFO的日志。
点击msg->点击添加
点击点击后查看的结果如下:
在前面日志视图中时间字段的显示格式为YYYY-MM-DD HH:mm:ss.SSS,如果不是这种格式的需要进行配置。
操作步骤:打开菜单->点击Stack Management->找到Kibana->点击Advanced Settings
找到dateFormat,输入YYYY-MM-DD HH:mm:ss.SSS
点击Save changes保存后回到日志视图,即可看到时间字段已经按照YYYY-MM-DD HH:mm:ss.SSS的格式进行格式化。
Kibana提供了Dev Tools控制台,能够让我们很方便的使用Elasticsearch命令进行查询。
操作步骤:打开菜单->找到Management->点击Dev Tools
进入到控制台后可以在左边输入命令进行查询,例如查询springboot-logstash-loginfo索引下的所有数据。Elasticseaerch其他查询命令可以参考官网教程.
执行以下命令:
docker run --name filebeat --privileged=true -d elastic/filebeat:7.14.0将容器内的配置文件拷贝到本地
拷贝的命令:
docker cp 容器名:容器中要拷贝的文件名及其路径 要拷贝到宿主机里面对应的路径
docker cp filebeat:/usr/share/filebeat/filebeat.yml /Users/runfa/documents/program/Docker/filebeat/filebeat.yml docker cp logstash:/var/logs /Users/runfa/documents/program/Docker/filebeat/logs
执行成功后在/Users/runfa/documents/program/Docker/logstash目录下能够看到以下几个文件
修改filebeat.yml配置文件
filebeat.config:
modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/logs/*.log
processors:
- drop_fields:
# 指定需要忽略输出的字段
fields: ["ecs", "agent", "input", "tags", "host", "log"]
ignore_missing: false
output.logstash:
hosts: ["192.168.43.176:5044"]
indices:
- index: "springboot-filebeat-logs"
filebeat会读取logs目录下相关的日志文件然后输出到logstash,这里以log-error.log为例进行说明。
在logs目录下创建log-error.log,里面的内容如下:
[elk-demo] [dev] 2021-10-14T11:09:27.237+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - error... [elk-demo] [dev] 2021-10-14T11:09:27.238+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - LJEN500:交易失败,失败原因:网络不稳定增加容器目录映射
删除原来安装的容器:
docker rm -f logstash
重新执行启动命令:
docker run --name filebeat --privileged=true -v /Users/runfa/documents/program/Docker/filebeat/filebeat.yml:/usr/share/filebeat/filebeat.yml -v /Users/runfa/documents/program/Docker/filebeat/logs:/var/logs -d elastic/filebeat:7.14.0
执行以下命令,如果没有出现明显错误,则说明启动成功
docker logs -f logstashfilebeat输出到logstash
在确保filebeat、logstash、elasticsearch和kibana启动成功后,在log-error.log文件中增加一行日志。
[elk-demo] [dev] 2021-10-14T11:09:27.237+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - error... [elk-demo] [dev] 2021-10-14T11:09:27.238+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - LJEN500:交易失败,失败原因:网络不稳定 [elk-demo] [dev] 2021-10-14T11:09:27.238+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - LJEN500:交易失败,失败原因:网络不稳定
此时可以看到logstash控制台的输出结果如下:
{
"@timestamp" => 2021-11-01T06:34:47.494Z,
"type" => "filebeat-logs",
"message" => "[elk-demo] [dev] 2021-10-14T11:09:27.237+0800 [0f7cf006e04642c5bd8a25a0587a2ca8] [http-nio-8080-exec-1] ERROR com.example.zrf.controller.TestController - error...",
"@version" => "1",
"tags" => [
[0] "beats_input_codec_plain_applied"
]
}
可以看到log-error.log中的信息全部包装在了message字段里,我们可以写一个filter配置对message字段进行解析,同时在解析玩后对该字段进行移除。
在logstash映射到本地的pipeline文件夹(如果不知道是哪个文件夹请参考Logstash启动章节)中创建filter.conf文件,在该文件中添加以下配置信息:
filter {
# 过滤来自filebeat的日志
if[type] == "filebeat-logs"{
grok {
# 筛选过滤从filebeat输入的message
match => {
"message" => "%{GREEDYdata:app} %{GREEDYdata:profile} (?d{4}-d{2}-d{2}wd{2}:d{2}:d{2}.d{3}+w{4}) %{GREEDYdata:traceId} %{GREEDYdata:thread} %{LOGLEVEL:level} %{GREEDYdata:class} - %{GREEDYdata:msg}"
}
# 移除输出的字段
remove_field => ["message"]
}
}
# 不匹配正则则删除,匹配正则用=~
if [level] !~ "(ERROR|WARN|INFO)" {
# 删除日志
drop {}
}
}
注:
- filter 主要用来过滤日志文件处理成我们需要的数据
- grok 解析文本并构造 把非结构化日志数据通过正则解析成结构化和可查询化,具体的配置可以参考链接.
备注:
官方提供了很多正则的grok pattern可以直接使用: :https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns
grok debug工具: http://grokdebug.herokuapp.com
正则表达式调试工具: https://www.debuggex.com/
grok 里边有定义好的现场的模板你可以用,但是更多的是自定义模板。 规则是这样的,小括号里边包含所有一个key和value,例子:(?value),比如以下的信息,第一个我定义的key是data,表示方法为:? 前边一个问号,然后用<>把key包含在里边去。value就是纯正则了,这个我就不举例子了。这个有个在线的调试库,可以供大家参考, http://grokdebug.herokuapp.com/



