将项目中的日志使用log4j打印,然后使用avro方式,收集到flume,最后输出到kafka。flume官方提供了两种方式接受log4j输入源的方式:Log4J Appender 和 Load Balancing Log4J Appender,flume详情可查看官网:Welcome to Apache Flume — Apache Flume。
一、appender
1)Log4J Appender参数解释
| Property Name | Default | Description |
|---|---|---|
| Hostname | – | 使用avro源的flume agent主机名(必填) |
| Port | – | flume agent的avro源的监听端口(必填) |
| UnsafeMode | false | 如果为true,则添加程序不会在发送事件失败时引发异常 |
| AvroReflectionEnabled | false | 使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用) |
| AvroSchemaUrl | – | avro schema的url地址 |
2) Load Balancing Log4J Appender参数解释
| Property Name | Default | Description |
|---|---|---|
| Hosts | – | 使用avro源的flume agent主机名加端口,多个用空格分隔,如:hadoop01:6666 hadoop02:6666 |
| Selector | ROUND_ROBIN | 选择机制。ROUND_ROBIN(轮询)、RANDOM(随机)或自定义FQDN,但必须是从LoadBalancingSelector继承的类。 |
| MaxBackoff | – | 一个long型数值,表示负载平衡客户端将从未能使用事件的节点回退的最长时间(毫秒)。默认为无回退 |
| UnsafeMode | false | 如果为true,则添加程序不会在发送事件失败时引发异常 |
| AvroReflectionEnabled | false | 使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用) |
| AvroSchemaUrl | – | avro schema的url地址 |
生产环境建议使用此种appender,类似这种架构:
二、日志打印类
1)引入pom
org.apache.flume.flume-ng-clients
flume-ng-log4jappender
1.9.0
org.apache.flume
flume-ng-sdk
1.9.0
log4j
log4j
1.2.17
org.slf4j
slf4j-log4j12
1.7.25
test
2)工具类
package com.zstax;
import org.apache.log4j.Logger;
public class Log4jPrinter {
private static final Logger logger= Logger.getLogger(Log4jPrinter.class);
public static void printBuriedLog(String buriedLog) {
logger.info(buriedLog);
}
}
三、log4j.properties配置如下:
1)单个agent
# Log4j Appender log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender log4j.appender.flume.Hostname=hadoop05 log4j.appender.flume.Port=6666 log4j.appender.flume.UnsafeMode=true log4j.appender.flume.layout = org.apache.log4j.PatternLayout # 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径 log4j.logger.com.zstax.Log4jPrinter = INFO,flume
2)多个agent轮询
log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender log4j.appender.flume2.Hosts = hadoop01:6666 hadoop02:6666 log4j.appender.flume2.Selector = ROUND_ROBIN log4j.appender.flume2.MaxBackoff = 30000 log4j.appender.flume2.UnsafeMode = true log4j.appender.flume2.layout=org.apache.log4j.PatternLayout # 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径 log4j.logger.com.zstax.Log4jPrinter = INFO,flume2
四、flume配置
logger.sources = r1 logger.sinks = k1 logger.channels = c1 # Describe/configure the source logger.sources.r1.type = Avro logger.sources.r1.bind = 0.0.0.0 logger.sources.r1.port = 6666 # Describe the sink logger.sinks.k1.channel=c1 logger.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink logger.sinks.k1.brokerList=hadoop03:6667,hadoop04:6667,hadoop05:6667 logger.sinks.k1.topic=buriedLogger logger.sinks.k1.serializer.class=kafka.serializer.StringEncoder logger.sinks.k1.serializer.appendnewline=false #Spillable Memory Channel logger.channels.c1.type=SPILLABLEMEMORY logger.channels.c1.checkpointDir = /data/flume/checkpoint logger.channels.c1.dataDirs = /data/flume # Bind the source and sink to the channel logger.sources.r1.channels = c1
五、kafka配置
1)创建主题
bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic buriedLogger
2)查看所有主题列表
bin/kafka-topics.sh --list --zookeeper hadoop01:2181
[kafka@hadoop05 kafka-broker]$ bin/kafka-topics.sh --list --zookeeper hadoop01:2181
ATLAS_ENTITIES
ATLAS_HOOK
__consumer_offsets
ambari_kafka_service_check
buriedLogger
3)模拟消费者
bin/kafka-console-consumer.sh --from-beginning --topic buriedLogger --bootstrap-server hadoop03:6667,hadoop04:6667,hadoop05:6667
4)运行打印日志
package com.zstax;
public class TestApp {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20; i++) {
Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
}
Thread.sleep(10000);
for (int i = 20; i < 40; i++) {
Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
}
Thread.sleep(10000);
}
}
可以看到消费者接收到了消息:
日志消息产生了:0 日志消息产生了:3 日志消息产生了:6 日志消息产生了:9 日志消息产生了:12 日志消息产生了:15 日志消息产生了:18 日志消息产生了:2 日志消息产生了:5 日志消息产生了:8 日志消息产生了:11 日志消息产生了:14 日志消息产生了:17 日志消息产生了:1 日志消息产生了:4 日志消息产生了:7 日志消息产生了:10 日志消息产生了:13 日志消息产生了:16 日志消息产生了:19 日志消息产生了:20 日志消息产生了:23 日志消息产生了:26 日志消息产生了:29 日志消息产生了:32 日志消息产生了:35 日志消息产生了:38 日志消息产生了:22 日志消息产生了:25 日志消息产生了:28 日志消息产生了:31 日志消息产生了:34 日志消息产生了:37 日志消息产生了:21 日志消息产生了:24 日志消息产生了:27 日志消息产生了:30 日志消息产生了:33 日志消息产生了:36 日志消息产生了:39



