- ELK introduction
- Setup steps
- 1、Send log messages to Kafka
- 1) Add dependencies
- 2) Nacos configuration center
- 3) Custom log appender that sends to Kafka
- 4) Test case
- 5) View the latest messages in the topic
- 2、Logstash subscribes to Kafka messages and forwards them to Elasticsearch
- 1) Downloads
- 2) Install Logstash
- 3) Configure Logstash input/output rules
- 3、Verify that es-head shows the created index
- 4、View the results in Kibana
ELK is an acronym for three open-source tools: Elasticsearch, Logstash, and Kibana.
Elasticsearch is an open-source distributed search engine that collects, analyzes, and stores data. Its features include distributed operation, zero configuration, automatic discovery, automatic index sharding, index replication, a RESTful API, multiple data sources, and automatic search load balancing.
Logstash is a tool for collecting, parsing, and filtering logs, and it supports a large number of input types. It typically runs in a client/server architecture: the client is installed on each host whose logs need to be collected, and the server filters and transforms the logs received from the nodes before forwarding them to Elasticsearch.
Kibana is likewise open source and free. It provides a friendly web UI for log analysis on top of Logstash and Elasticsearch, helping you aggregate, analyze, and search important log data.
The introduction above is taken from: https://www.cnblogs.com/aresxin/p/8035137.html
What this article covers
This article uses logback to ship log messages to Kafka; Logstash consumes the Kafka messages and forwards them to Elasticsearch; finally, Kibana is used to view, aggregate, and analyze the log data stored in Elasticsearch.
Kafka
- Windows / Linux: set up Zookeeper (standalone/cluster)
- Windows / Linux: set up Kafka (standalone/cluster)
- Windows / Linux: install kafka-eagle (optional)
Elasticsearch
- Windows / Linux: install Elasticsearch 7.x (standalone/cluster) + IK analyzer
Kibana
- Windows / Linux: install es-head / Kibana
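Before moving on, it can help to confirm the middleware is actually reachable. A minimal sketch, assuming the default local ports used later in this article (Kafka on 9092, Elasticsearch on 9200) plus Zookeeper's default 2181; adjust hosts and ports to your own setup:

```java
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {
    // Returns true if a TCP connection to host:port succeeds within 500 ms
    static boolean isOpen(String host, int port) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), 500);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the configs later in this article
        System.out.println("zookeeper 2181: " + isOpen("127.0.0.1", 2181));
        System.out.println("kafka     9092: " + isOpen("127.0.0.1", 9092));
        System.out.println("es        9200: " + isOpen("127.0.0.1", 9200));
    }
}
```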
Apart from Logstash, make sure all of the middleware above is installed and running before you continue.
1、Send log messages to Kafka
1) Add dependencies
The code in this article is based on a SpringCloudAlibaba microservice architecture.
pom.xml
<properties>
    <!-- versions as given in the original post; the property names are reconstructed -->
    <spring-cloud.version>Hoxton.SR9</spring-cloud.version>
    <spring-boot.version>2.3.2.RELEASE</spring-boot.version>
    <spring-cloud-alibaba.version>2.2.6.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
2) Nacos configuration center
application.yml
spring:
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
config:
server-addr: 127.0.0.1:8848
namespace: public
group: manage
kafka:
client-id: ${spring.application.name}
      # Kafka server address
bootstrap-servers: 127.0.0.1:9092
    # Kafka producer
producer:
      # 0: return as soon as the message is sent; 1: return once the leader has received it; all: return after all in-sync replicas have it
acks: 1
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # how much data to batch per partition before sending; 16 KB
batch-size: 16384
      # total buffer size across all partitions; sends immediately once exceeded; 32 MB
buffer-memory: 33554432
logging:
config: http://${spring.cloud.nacos.config.server-addr}/nacos/v1/cs/configs?group=${spring.cloud.nacos.config.group}&tenant=${spring.cloud.nacos.config.namespace}&dataId=logback-spring.xml
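The logging.config entry is worth a second look: at startup, Spring resolves the ${…} placeholders and downloads logback-spring.xml from Nacos's config open API. A small sketch of what the resolved URL looks like with the values above (illustrative only; the real resolution is done by Spring):

```java
public class LoggingConfigUrl {
    public static void main(String[] args) {
        // values taken from the yml above
        String serverAddr = "127.0.0.1:8848";
        String group = "manage";
        String namespace = "public";
        String url = "http://" + serverAddr
                + "/nacos/v1/cs/configs?group=" + group
                + "&tenant=" + namespace
                + "&dataId=logback-spring.xml";
        System.out.println(url);
        // → http://127.0.0.1:8848/nacos/v1/cs/configs?group=manage&tenant=public&dataId=logback-spring.xml
    }
}
```

Fetching that URL in a browser (with Nacos running) should return the raw logback-spring.xml content shown below.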
logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- structure reconstructed from the garbled original; property names preserved verbatim -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="KAFKA" class="com.esp.common.adapter.LogbackKafkaAdapter">
        <logLevel>debug</logLevel>
        <applicationName>${APPLICATION_NAME}</applicationName>
        <bootstrapServers>${KAFKA_SERVERS}</bootstrapServers>
        <producerAcks>${KAFKA_ACKS}</producerAcks>
        <producerKeySerializer>${KAFKA_KEYSERIALIZER}</producerKeySerializer>
        <producerValueSerializer>${KAFKA_VALUESERIALIZER}</producerValueSerializer>
        <producerBatchSize>${KAFKA_BATCHSIZE}</producerBatchSize>
        <producerBufferMemory>${KAFKA_BUFFERMEMORY}</producerBufferMemory>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>DEBUG</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>
    <root level="debug">
        <appender-ref ref="CONSOLE"/>
        <appender-ref ref="KAFKA"/>
    </root>
</configuration>
3) Custom log appender that sends to Kafka
LogbackKafkaAdapter.java
package com.esp.common.adapter;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import lombok.Data;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

@Data
public class LogbackKafkaAdapter extends AppenderBase<ILoggingEvent> {

    private String logLevel;
    private String applicationName;
    private String bootstrapServers;
    private String producerAcks;
    private String producerKeySerializer;
    private String producerValueSerializer;
    private Integer producerBatchSize;
    private Long producerBufferMemory;
    private Producer<Integer, String> producer;
    private String topic;

    /** Only logs from our own packages are forwarded to Kafka. */
    private static final String ESP_PROJECT_PACKAGE_PREFIX = "com.esp";

    @Override
    public void start() {
        super.start();
        // Build the Kafka producer from the properties injected via logback-spring.xml
        Properties properties = new Properties();
        properties.put("client.id", this.applicationName);
        properties.put("bootstrap.servers", this.bootstrapServers);
        properties.put("acks", this.producerAcks);
        properties.put("batch.size", this.producerBatchSize);
        properties.put("buffer.memory", this.producerBufferMemory);
        properties.put("key.serializer", this.producerKeySerializer);
        properties.put("value.serializer", this.producerValueSerializer);
        this.producer = new KafkaProducer<>(properties);
        // One topic per application and level, e.g. manage_debuglog
        this.topic = String.format("%s_%slog", this.applicationName, this.logLevel);
    }

    @Override
    protected void append(ILoggingEvent iLoggingEvent) {
        // Skip events from third-party loggers
        if (!iLoggingEvent.getLoggerName().startsWith(ESP_PROJECT_PACKAGE_PREFIX)) {
            return;
        }
        String msg = iLoggingEvent.getFormattedMessage();
        producer.send(new ProducerRecord<>(this.topic, msg));
    }
}

4) Test case
package com.esp.manage.api;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
@SpringBootTest
public class Logback2KafkaTest {
@Test
public void test() {
        log.debug("send debug log to kafka");
}
}
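When this test runs, the appender publishes to a topic derived from the application name and the configured level. The naming scheme from LogbackKafkaAdapter.start(), sketched standalone:

```java
public class TopicNameDemo {
    public static void main(String[] args) {
        // same format string as LogbackKafkaAdapter.start()
        String applicationName = "manage";
        String logLevel = "debug";
        String topic = String.format("%s_%slog", applicationName, logLevel);
        System.out.println(topic); // → manage_debuglog
    }
}
```

This is where the topic name manage_debuglog consumed below comes from.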
5) View the latest messages in the topic
[root@jszwjs56ji kafka_2.13-2.8.1]# ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic manage_debuglog --from-beginning
send debug log to kafka
2、Logstash subscribes to Kafka messages and forwards them to Elasticsearch
1) Downloads
Version archive on the official site: https://www.elastic.co/cn/downloads/past-releases#logstash
CSDN downloads
- logstash-7.14.2-windows-x86_64.zip (Windows)
- logstash-7.14.2-linux-x86_64.tar.gz (Linux)
- logstash-7.14.2-darwin-x86_64.tar.gz (macOS)
Official downloads
- logstash-7.14.2-windows-x86_64.zip (Windows)
- logstash-7.14.2-linux-x86_64.tar.gz (Linux)
- logstash-7.14.2-darwin-x86_64.tar.gz (macOS)
2) Install Logstash
[root@jszwjs56ji ~]# mkdir /data/logstash && cd /data/logstash
# Download
[root@jszwjs56ji logstash]# wget https://artifacts.elastic.co/downloads/logstash/logstash-7.14.2-linux-x86_64.tar.gz
# Extract
[root@jszwjs56ji logstash]# tar -zxvf logstash-7.14.2-linux-x86_64.tar.gz
[root@jszwjs56ji logstash]# cd logstash-7.14.2/
# Verify the plugins; logstash-input-kafka must be present (it is bundled by default)
[root@jszwjs56ji logstash-7.14.2]# bin/logstash-plugin list | grep kafka
logstash-integration-kafka
├── logstash-input-kafka
└── logstash-output-kafka
# Test input/output; once started, type any text and press Enter
[root@jszwjs56ji logstash-7.14.2]# bin/logstash -e 'input { stdin {} } output { stdout {} }'
hello 954l
{
"@version" => "1",
"host" => "jszwjs56ji",
"@timestamp" => 2021-11-24T06:17:04.024Z,
"message" => "hello 954l"
}
3)配置logstash输入输出规则
Reference blog: https://www.freesion.com/article/3448629599/
官网规则说明:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
[root@jszwjs56ji logstash-7.14.2]# cd config/
# Create a custom conf file
[root@jszwjs56ji config]# touch kafka-logstash-elaticsearch.conf
# The conf file content is shown below
kafka-logstash-elaticsearch.conf
input {
kafka{
bootstrap_servers => ["127.0.0.1:9092"]
client_id => "logstash"
group_id => "logstash-es"
auto_offset_reset => "latest"
    consumer_threads => 3 #number of consumer threads; keep it <= the partition count
    decorate_events => "true" #attach Kafka metadata to the event, to tell topics apart
    topics => ["manage_debuglog"] #topics to consume
}
}
filter{
  # Build a per-topic [@metadata][index] field
mutate {
add_field => {"[@metadata][index]" => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "%{[@metadata][index]}"
timeout => 300
}
}
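With this filter, each Elasticsearch index name is the Kafka topic plus the event date, so logs roll into one index per topic per day. A sketch of the resulting name; note that logstash's %{+YYYY.MM.dd} formats the event's @timestamp, while Java's pattern letters differ slightly, so yyyy is used here:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class IndexNameDemo {
    public static void main(String[] args) {
        String topic = "manage_debuglog";
        // date portion equivalent to logstash's %{+YYYY.MM.dd}
        String date = LocalDate.of(2021, 11, 24)
                .format(DateTimeFormatter.ofPattern("yyyy.MM.dd"));
        System.out.println(topic + "-" + date); // → manage_debuglog-2021.11.24
    }
}
```

Daily indexes like this make it cheap to expire old logs by deleting whole indexes.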
Start Logstash
[root@jszwjs56ji logstash-7.14.2]# nohup ./bin/logstash -f config/kafka-logstash-elaticsearch.conf &
# Tail the startup log
[root@jszwjs56ji logstash-7.14.2]# tail -f nohup.out
3、Verify that es-head shows the created index
4、View the results in Kibana
Because the Logstash offset configured above is latest, wait until everything is up and then trigger one more log entry before verifying.



