栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ELK + logback + Kafka实现

ELK + logback + Kafka实现

ELK + logback + Kafka实现
  • ELK介绍
  • 搭建步骤
  • 1、日志消息发送至kafka
    • 1)引入依赖
    • 2)Nacos配置中心
    • 3)自定义日志处理发送至kafka
    • 4)测试用例
    • 5)查看该主题最新的消息
  • 2、logstash订阅kafka消息并转发至elaticsearch
    • 1)资料下载
    • 2)安装logstash
    • 3)配置logstash输入输出规则
  • 3、验证es-head是否有创建索引
  • 4、在Kibana上查看效果

ELK介绍

ELK是三个开源软件的缩写,分别表示:Elasticsearch , Logstash, Kibana , 它们都是开源软件。
Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能。它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。
Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。
Kibana 也是一个开源和免费的工具,Kibana可以为 Logstash 和 ElasticSearch 提供的日志分析友好的 Web 界面,可以帮助汇总、分析和搜索重要数据日志。
以上介绍内容来自:https://www.cnblogs.com/aresxin/p/8035137.html

本文示例说明
本文使用logback将日志内容发送到kafka,由logstash消费kafka的日志消息后发送至elasticsearch中,最后通过使用kibana查看/统计/分析elasticsearch中的日志数据

搭建步骤

Kafka

  • Windows / Linux搭建Zookeeper单机/集群
  • Windows / Linux搭建Kafka单机/集群
  • Windows / Linux安装kafka eagle(非必须)

Elasticsearch

  • Windows / Linux 安装ElasticSearch7.x单机/集群 + IK分词器

Kibana

  • Windows / Linux安装es Head / Kibana

除Logstash外,开始下文内容前请确保上述中间件均安装且启动成功。
本文代码基于SpringCloudAlibaba微服务架构

1、日志消息发送至kafka 1)引入依赖

pom.xml

	
		Hoxton.SR9
		2.3.2.RELEASE
		2.2.6.RELEASE
	
	
	
		org.projectlombok
		lombok
		true
	
        
	
		org.springframework.boot
		spring-boot-starter-web
	
	
	
	
		com.alibaba.cloud
		spring-cloud-starter-alibaba-nacos-config
	
	
	
	
		com.alibaba.cloud
		spring-cloud-starter-alibaba-nacos-discovery
	
	
	
	
		org.springframework.kafka
		spring-kafka
	
2)Nacos配置中心

application.yml

spring:
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
      config:
        server-addr: 127.0.0.1:8848
        namespace: public
        group: manage
  kafka:
    client-id: ${spring.application.name}
    # kafka服务器地址
    bootstrap-servers: 127.0.0.1:9092
    # kafka消费者
    producer:
      # 0:发送则立即返回;1:master收到后返回;all:所有节点同步完成后返回
      acks: 1
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 单个分区批次发送多少数据;16KB
      batch-size: 16384
      # 所有分区可用缓冲区大小,超出则立即发送;32MB
      buffer-memory: 33554432

logging:
  config: http://${spring.cloud.nacos.config.server-addr}/nacos/v1/cs/configs?group=${spring.cloud.nacos.config.group}&tenant=${spring.cloud.nacos.config.namespace}&dataId=logback-spring.xml

logback-spring.xml




    
    
    
    
    
    
    

    
    
        
            %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
        
    

    
        debug
        ${APPLICATION_NAME}
        ${KAFKA_SERVERS}
        ${KAFKA_ACKS}
        ${KAFKA_KEYSERIALIZER}
        ${KAFKA_VALUESERIALIZER}
        ${KAFKA_BATCHSIZE}
        ${KAFKA_BUFFERMEMORY}

		
        
            DEBUG
            ACCEPT
            DENY
        
    

    
        
        
    

3)自定义日志处理发送至kafka

LogbackKafkaAdapter.java

package com.esp.common.adapter;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.Appenderbase;
import lombok.Data;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


@Data
public class LogbackKafkaAdapter extends Appenderbase {

    private String logLevel;

    private String applicationName;

    private String bootstrapServers;

    private String producerAcks;

    private String producerKeySerializer;

    private String producerValueSerializer;

    private Integer producerBatchSize;

    private Long producerBufferMemory;

    
    private Producer producer;

    
    private String topic;

    private static final String ESP_PROJECT_PACKAGE_PREFIX = "com.esp";

    @Override
    public void start() {
        super.start();
        // 构建kafka生产者对象
        Properties properties = new Properties();
        properties.put("client.id", this.applicationName);
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("acks", this.producerAcks);
        properties.put("batch.size", this.producerBatchSize);
        properties.put("buffer.memory", this.producerBufferMemory);
        properties.put("key.serializer", this.producerKeySerializer);
        properties.put("value.serializer", this.producerValueSerializer);
        this.producer = new KafkaProducer<>(properties);
        this.topic = String.format("%s_%slog", this.applicationName, this.logLevel);
    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        if (!iLoggingEvent.getLoggerName().startsWith(ESP_PROJECT_PACKAGE_PREFIX)) return;

        String msg = iLoggingEvent.getFormattedMessage();
        producer.send(new ProducerRecord(this.topic, msg));
    }

}
4)测试用例
package com.esp.manage.api;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;


@Slf4j
@SpringBootTest
public class Logback2KafkaTest {

    @Test
    public void test() {
        log.debug("发送debug日志到kafka");
    }

}
5)查看该主题最新的消息
[root@jszwjs56ji kafka_2.13-2.8.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic manage_debuglog --from-beginning
发送debug日志到kafka
2、logstash订阅kafka消息并转发至elaticsearch 1)资料下载

官网版本选择地址:https://www.elastic.co/cn/downloads/past-releases#logstash

CSDN下载

  • logstash-7.14.2-windows-x86_64.zip(Windows)
  • logstash-7.14.2-linux-x86_64.tar.gz(Linux)
  • logstash-7.14.2-darwin-x86_64.tar.gz(Macos)

官网下载

  • logstash-7.14.2-windows-x86_64.zip(Windows)
  • logstash-7.14.2-linux-x86_64.tar.gz(Linux)
  • logstash-7.14.2-darwin-x86_64.tar.gz(Macos)
2)安装logstash
[root@jszwjs56ji ~]# mkdir /data/logstash && cd /data/logstash
# 下载
[root@jszwjs56ji logstash]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.14.2-linux-x86_64.tar.gz
# 解压
[root@jszwjs56ji logstash]# tar -zxvf logstash-7.14.2-linux-x86_64.tar.gz
[root@jszwjs56ji logstash]# cd logstash-7.14.2/
# 验证插件,需要含有:logstash-input-kafka。默认自带就有
[root@jszwjs56ji logstash-7.14.2]# bin/logstash-plugin list | grep kafka
logstash-integration-kafka
 ├── logstash-input-kafka
 └── logstash-output-kafka


# 测试输入输出;启动完毕后输入任意字符+回车;
[root@jszwjs56ji logstash-7.14.2]# bin/logstash -e 'input { stdin {} } output { stdout {} }'
hello 954l
{
      "@version" => "1",
          "host" => "jszwjs56ji",
    "@timestamp" => 2021-11-24T06:17:04.024Z,
       "message" => "hello 954l"
}
3)配置logstash输入输出规则

参考blog:https://www.freesion.com/article/3448629599/
官网规则说明:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

[root@jszwjs56ji logstash-7.14.2]# cd config/
# 创建自定义conf文件
[root@jszwjs56ji config]# touch kafka-logstash-elaticsearch.conf
# conf文件内容如下:↓

kafka-logstash-elaticsearch.conf

input {
    kafka{
        bootstrap_servers => ["127.0.0.1:9092"]
        client_id => "logstash"
        group_id => "logstash-es"
        auto_offset_reset => "latest"
        consumer_threads => 3    #消费线程数,小于等于分区数
        decorate_events => "true"    #携带额外信息至message,区分主题
        topics => ["manage_debuglog"]    #消费主题
     }
}


filter{
    # 为每个主题构建对应的[@metadata][index]
    mutate {
        add_field => {"[@metadata][index]" => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"}
    }
}

output {
    elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "%{[@metadata][index]}"
        timeout => 300
    }
}

启动logstash

[root@jszwjs56ji logstash-7.14.2]# nohup ./bin/logstash -f config/kafka-logstash-elaticsearch.conf &
# 查看启动日志
[root@jszwjs56ji logstash-7.14.2]# tail -f nohup.out
3、验证es-head是否有创建索引

需在都启动完成后再触发一条日志后进行验证,因为上文logstash配置的偏移量是latest

4、在Kibana上查看效果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600923.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号