栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

JavaEE:使用Kafka收集日志

JavaEE:使用Kafka收集日志

一、log4j2日志输出:

1.添加依赖+打包信息:

(1)添加log4j2依赖+打包信息:


    
        org.springframework.boot
        spring-boot-starter-web
        
        
            
                org.springframework.boot
                spring-boot-starter-logging
            
        
    
    
    
        org.springframework.boot
        spring-boot-starter-log4j2
    
    
        com.alibaba
        fastjson
        1.2.79
    
    
        com.lmax
        disruptor
        3.4.4
    



    logserver
    
        
            src/main/java
            
                ***.properties
            
            true  
        
        
            src/main/resources
            
                **/*.*
            
        
    
    
        
            org.springframework.boot
            spring-boot-maven-plugin
            2.2.8.RELEASE
            
                com.yyh.log.LogApplication
            
        
    

(2)application.yml:

server:
  port: 10003
spring:
  application:
    name: logserver
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: NON_NULL

2.在application.yml中创建log4j2.xml(基于网上源代码修改):



    
        /usr/local/filebeat/logs 
        mylog 
        [%level{length=5}] [%d{yyyy-MM-dd'T'HH:mm:ss.SSS}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    
    
        
        
            
        
       
        
              
            
                 
                 
            
            
        
        
        
              
            
                  
            
            
                
                 
            
            
        
    
    
          
            
        
        
            
        
         
            
            
            
        
    

3.使用MDC替换log中的字段值:

@Component
public class MDCUtil implements EnvironmentAware {
    private static Environment environment;
    @Override
    public void setEnvironment(Environment environment) {
        MDCUtil.environment = environment;
    }
    public static void setMDC() {//替换log中的提定key的值
        MDC.put("hostName", NetUtils.getLocalHostname());
        MDC.put("ip", NetUtils.getMacAddressString());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));  //从application.yml中获取服务名称
    }
}

4.打印log测试效果:

@RestController
@Slf4j
public class LogController {
    @RequestMapping("/testLog")
    public String testLog() {
        MDCUtil.setMDC();
        log.info("这是info日志");
        log.warn("这是warn日志");
        log.error("这是error日志");
        return "测试log";
    }
}

5.将工程打包为xxx.jar,用XFtp上传到centos上,启动:

[root@localhost ~]# java -jar logserver.jar &

二、Kafka上创建两个用于接收日志的topic:

安装/配置Kafka(主机IP:192.168.233.147):

https://blog.csdn.net/a526001650a/article/details/123062877?spm=1001.2014.3001.5502

1.创建all-log名称的topic(cd /usr/local/kafka/bin目录,--partitions指定分区数量,--replication-factor指定副本集数量):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --create --topic all-log --partitions 1 --replication-factor 1

2.创建warn-log名称的topic(cd /usr/local/kafka/bin目录,--partitions指定分区数量,--replication-factor指定副本集数量):

[root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.233.147:9092 --create --topic warn-log --partitions 1 --replication-factor 1

三、使用FileBeat收集日志,并输出到Kafka:

1.FileBeat安装/配置:

(1)下载filebeat-8.0.0-linux-x86_64.tar.gz,使用Xftp上传到centos:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-installation-configuration.html

(2)解压到/usr/local目录:

[root@localhost ~]# tar -zxvf filebeat-8.0.0-linux-x86_64.tar.gz -C /usr/local/

(3)重命名(cd /usr/local):

[root@localhost local]# mv filebeat-8.0.0-linux-x86_64 filebeat

(4)修改filebeat.yml配置(cd /usr/local/filebeat):

[root@localhost filebeat]# vi filebeat.yml

内容如下:

...  #省略其他原有配置
filebeat.inputs:   #log输入配置
- type: log   #全量log配置
  enabled: true #启动此输入配置
  paths:
    - /usr/local/filebeat/logs/all-mylog.log    #log文件路径
  document_type: "all-log"  #document_type为自定义类型,写入ES时的_type值
  multiline:  #多行log抓取规则
    pattern: '^['                               #匹配[开头的log
    negate: true                                #true为必须匹配
    match: after                                #以[开头的多行log,从第2行-max_lines行合并到第1行的末尾
    max_lines: 2000                             #参与合并的最大行数
    timeout: 2s                                 #多行log合并等待时间,超过2秒后不再合并到当前第1行
  fields:
    web_name: logserver           #应用名称(自定义key:value)
    log_topic: all-log            #kafka topic名称(自定义key:value)
    evn: test                     #测试环境(自定义key:value)
- type: log                                    #warn级别的log配置
  enabled: true #启动此输入配置
  paths:
    - /usr/local/filebeat/logs/warn-mylog.log  #log文件路径
  document_type: "warn-log"
  multiline:  #多行log抓取规则
    pattern: '^['                               #匹配[开头的log
    negate: true                                #true为必须匹配
    match: after                                #以[开头的多行log,从第2行-max_lines行合并到第1行的末尾
    max_lines: 2000                             #参与合并的最大行数
    timeout: 2s                                 #多行log合并等待时间,超过2秒后不再合并到当前第1行
  fields:
    web_name: logserver           #应用名称(自定义key:value)
    log_topic: warn-log           #kafka topic名称(自定义key:value)
    evn: test                     #测试环境(自定义key:value)
    
output.kafka:  #配置输出log到kafka,注释掉默认的Elasticsearch Output配置
  enabled: true  #启用输出
  hosts: ["192.168.233.147:9092"]  #kafka主机地址,多个时:["ip1:port1", "ip2:port2"]
  topic: '%{[fields.log_topic]}'   #输出到kafka指定topic(获取上面自定义的log_topic属性值)
  partition.hash:                  #分区规则,值:hash(默认)、random、 round_robin
    reachable_only: true
  compression: gzip                #使用gzip压缩(默认), 其他值:none、snappy、lz4
  compression_level: 4             #压缩级别(默认为4),0为不压缩,1(最佳速度)-9(最佳压缩)
  max_message_bytes: 1000000       #消息最大值,单位为字节
  required_acks: 1                 #ACK级别,0无响应,1等待本地提交(默认),-1等待所有副本提交。
logging.to_files: true   #log输出到文件

2.FileBeat启动:

(1)检查配置(cd /usr/local/filebeat):

[root@localhost filebeat]# ./filebeat test config -e

(2)启动:

[root@localhost filebeat]# ./filebeat &

(3)查看是否启动成功:

[root@localhost filebeat]# ps -ef | grep filebeat

3.查看已抓取的log文件内容(cd /usr/local/filebeat/logs/,全量log为all-mylog.log,warn级别log为warn-mylog.log):

[root@localhost logs]# cat all-mylog.log

4.查看已传输到kafka的log文件目录(本例中如:all-log-0目录、warn-log-0目录):

[root@localhost ~]# cd /usr/local/kafka/kafka-logs/

四、使用Logstash过滤/消费日志:

1.Logstasht安装:

(1)下载logstash-8.0.0-linux-x86_64.tar.gz,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)解压到/usr/local目录:

[root@localhost ~]# tar -zxvf logstash-8.0.0-linux-x86_64.tar.gz -C /usr/local/

(3)重命名(cd /usr/local):

[root@localhost local]# mv logstash-8.0.0 logstash

(4)创建scrpit目录(cd /usr/local/logstash):

[root@localhost logstash]# mkdir scrpit

2.Logstasht配置:

(1)配置文件说明(cd /usr/local/logstash/config):

jvm.options        #JVM参数配置文件
log4j2.properties  #log格式配置文件
startup.options    #制作centos服务参数
logstash.yml       #logstash配置

(2)修改logstash.yml配置(cd /usr/local/logstash/config):

[root@localhost config]# vi logstash.yml

内容如下:

pipeline.workers: 8   #修改工作线程数

(3)创建logstash-start,用XFtp将上传到/usr/local/logstash/scrpit目录,内容如下(基于网上源代码修改):

input {  #接收log配置
  kafka {  #配置接收kafka的all-log topic的日志
    topics_pattern => "all-log-.*"  #配置topics
    bootstrap_servers => "192.168.233.147:9092"  #配置kafka主机地址
    codec => json
    consumer_threads => 4       #消费线程为4个
    decorate_events => true
    #auto_offset_rest => "latest"   #默认已配置
    group_id => "all-log-group"   #配置分组名
  }
  kafka {  #配置接收kafka的warn-log topic的日志
    topics_pattern => "warn-log-.*"  #配置topics
    bootstrap_servers => "192.168.233.147:9092"  #配置kafka主机地址
    codec => json
    consumer_threads => 4       #消费线程为4个
    decorate_events => true
    #auto_offset_rest => "latest"   #默认已配置
    group_id => "warn-log-group"   #配置分组名
  }
}

filter { #过滤log配置
  ruby {  #修改时区
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "all-log" in [fields][log_topic] {  #fields.log_topic.all-log
    grok { #与log4j2.xml中LOG_FORMAT保持一致
      match => ["message", "[%{NOTSPACE:level}] [%{NOTSPACE:currentDateTime}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:class}] [%{data:hostName}] [%{data:ip}] [%{data:applicationName}] [%{data:location}] [%{data:messageInfo}] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "warn-log" in [fields][log_topic] {  #fields.log_topic.warn-log
    grok { #与log4j2.xml中LOG_FORMAT保持一致
      match => ["message", "[%{NOTSPACE:level}] [%{NOTSPACE:currentDateTime}] [%{NOTSPACE:thread-id}] [%{NOTSPACE:class}] [%{data:hostName}] [%{data:ip}] [%{data:applicationName}] [%{data:location}] [%{data:messageInfo}] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }
}

output { #1.log输出到elasticsearch
  if "all-log" in [fields][log_topic] { #全量log输出到elasticsearch配置
    elasticsearch { #使用elasticsearch插件
      hosts => ["192.168.233.147:9200"]  #elasticsearch主机地址
      user => "root"                     #用户名
      password => "root123456"           #密码
      index => "all-log-%{[fields][web_name]}-%{index_time}"
      sniffing => true   #true启用嗅探机制(使用此机制进行elasticsearch集群负载均衡发送log)
      template_overwrite => true   #覆盖logstash默认模板
    }
  }
  if "warn-log" in [fields][logtopic] { #warn log输出到elasticsearch配置
    elasticsearch { #使用elasticsearch插件
      hosts => ["192.168.233.147:9200"]  #elasticsearch主机地址
      user => "root"                     #用户名
      password => "root123456"           #密码
      index => "warn-log-%{[fields][web_name]}-%{index_time}"
      sniffing => true   #true启用嗅探机制(使用此机制进行elasticsearch集群负载均衡发送log)
      template_overwrite => true   #覆盖logstash默认模板
    }
  }
}
output { #2.log输出到控制台
  stdout {
    codec => rubydebug
  }
}

3.Logstash启动(cd /usr/local/logstash/bin):

[root@localhost bin]# ./logstash -f /usr/local/logstash/scrpit/logstash-start.conf &

五、安装/配置Elasticsearch+Kibana(主机:192.168.233.147):

1.安装elasticsearch:

(1)下载elasticsearch-8.0.0-x86_64.rpm,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)使用rpm命令安装:

[root@localhost ~]# rpm -i elasticsearch-8.0.0-x86_64.rpm

(3)修改elasticsearch.yml(cd /etc/elasticsearch/):

network.host: 192.168.233.147
http.port: 9200

(4)配置开机启动:

[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl enable elasticsearch.service

(5)启动/停止/重启/查看状态:

[root@localhost ~]# systemctl start elasticsearch.service
[root@localhost ~]# systemctl stop elasticsearch.service
[root@localhost ~]# systemctl restart elasticsearch.service
[root@localhost ~]# systemctl status elasticsearch.service

2.安装Kibana:

(1)下载kibana-8.0.0-x86_64.rpm,使用Xftp上传到centos:

https://elasticsearch.cn/download/

(2)使用rpm命令安装:

[root@localhost ~]# rpm -i kibana-8.0.0-x86_64.rpm

(3)修改kibana.yml(cd /etc/kibana/):

server.port: 5601
server.host: "192.168.233.147"
elasticsearch.hosts: ["http://192.168.233.147:9200"]    #连接elasticsearch
elasticsearch.username: "kibana_system"
elasticsearch.password: "pass"
elasticsearch.pingTimeout: 30000
elasticsearch.requestTimeout: 30000

(4)配置开机启动:

[root@localhost ~]# systemctl daemon-reload
[root@localhost ~]# systemctl enable kibana.service

(5)启动/停止/重启/查看状态:

[root@localhost ~]# systemctl start kibana.service
[root@localhost ~]# systemctl stop kibana.service
[root@localhost ~]# systemctl restart kibana.service
[root@localhost ~]# systemctl status kibana.service

(6)浏览器打开kibana首页:

http://192.168.233.147:5601/app/kibana

(7)在Kibana页面中使用Watcher监控告警日志:

Getting started with Watcher | Elasticsearch Guide [8.0] | Elastichttps://www.elastic.co/guide/en/elasticsearch/reference/current/watcher-getting-started.html

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746068.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号