栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

log4j+flume+kafka实时日志处理

log4j+flume+kafka实时日志处理

        将项目中的日志使用log4j打印,然后使用avro方式,收集到flume,最后输出到kafka。flume官方提供了两种方式接受log4j输入源的方式:Log4J Appender 和 Load Balancing Log4J Appender,flume详情可查看官网:Welcome to Apache Flume — Apache Flume。

一、appender

1)Log4J Appender参数解释

Property NameDefaultDescription
Hostname使用avro源的flume agent主机名(必填)
Portflume agent的avro源的监听端口(必填)
UnsafeModefalse如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabledfalse使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrlavro schema的url地址

2) Load Balancing Log4J Appender参数解释

Property NameDefaultDescription
Hosts使用avro源的flume agent主机名加端口,多个用空格分隔,如:hadoop01:6666 hadoop02:6666
SelectorROUND_ROBIN选择机制。ROUND_ROBIN(轮询)、RANDOM(随机)或自定义FQDN,但必须是从LoadBalancingSelector继承的类。
MaxBackoff一个long型数值,表示负载平衡客户端将从未能使用事件的节点回退的最长时间(毫秒)。默认为无回退
UnsafeModefalse如果为true,则添加程序不会在发送事件失败时引发异常
AvroReflectionEnabledfalse使用Avro反射来序列化Log4j事件。(当用户记录字符串时不要使用)
AvroSchemaUrlavro schema的url地址

生产环境建议使用此种appender,类似这种架构:

二、日志打印类

1)引入pom

    
      org.apache.flume.flume-ng-clients
      flume-ng-log4jappender
      1.9.0
    
    
    
      org.apache.flume
      flume-ng-sdk
      1.9.0
    

    
    
      log4j
      log4j
      1.2.17
    

    
    
      org.slf4j
      slf4j-log4j12
      1.7.25
      test
    

2)工具类

package com.zstax;

import org.apache.log4j.Logger;


public class Log4jPrinter {
    private static final Logger logger= Logger.getLogger(Log4jPrinter.class);

    
    public static void printBuriedLog(String buriedLog) {
        logger.info(buriedLog);
    }

}

三、log4j.properties配置如下:

1)单个agent

# Log4j Appender
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=hadoop05
log4j.appender.flume.Port=6666
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout = org.apache.log4j.PatternLayout 
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume

 2)多个agent轮询

log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.flume2.Hosts = hadoop01:6666 hadoop02:6666
log4j.appender.flume2.Selector = ROUND_ROBIN
log4j.appender.flume2.MaxBackoff = 30000
log4j.appender.flume2.UnsafeMode = true
log4j.appender.flume2.layout=org.apache.log4j.PatternLayout
# 这里的com.zstax.Log4jPrinter为项目中打印日志类的路径
log4j.logger.com.zstax.Log4jPrinter = INFO,flume2

四、flume配置

logger.sources = r1
logger.sinks = k1
logger.channels = c1

# Describe/configure the source
logger.sources.r1.type = Avro
logger.sources.r1.bind = 0.0.0.0
logger.sources.r1.port = 6666

# Describe the sink
logger.sinks.k1.channel=c1
logger.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
logger.sinks.k1.brokerList=hadoop03:6667,hadoop04:6667,hadoop05:6667
logger.sinks.k1.topic=buriedLogger
logger.sinks.k1.serializer.class=kafka.serializer.StringEncoder
logger.sinks.k1.serializer.appendnewline=false

#Spillable Memory Channel
logger.channels.c1.type=SPILLABLEMEMORY
logger.channels.c1.checkpointDir = /data/flume/checkpoint
logger.channels.c1.dataDirs = /data/flume

# Bind the source and sink to the channel
logger.sources.r1.channels = c1

五、kafka配置

1)创建主题

bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 2 --partitions 3 --topic buriedLogger

 2)查看所有主题列表

bin/kafka-topics.sh --list --zookeeper hadoop01:2181

[kafka@hadoop05 kafka-broker]$ bin/kafka-topics.sh --list --zookeeper hadoop01:2181
ATLAS_ENTITIES
ATLAS_HOOK
__consumer_offsets
ambari_kafka_service_check
buriedLogger

3)模拟消费者

bin/kafka-console-consumer.sh --from-beginning --topic buriedLogger --bootstrap-server hadoop03:6667,hadoop04:6667,hadoop05:6667

4)运行打印日志

package com.zstax;


public class TestApp {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
        for (int i = 20; i < 40; i++) {
            Log4jPrinter.printBuriedLog("日志消息产生了:"+i);
        }
        Thread.sleep(10000);
    }
}

 可以看到消费者接收到了消息:

日志消息产生了:0
日志消息产生了:3
日志消息产生了:6
日志消息产生了:9
日志消息产生了:12
日志消息产生了:15
日志消息产生了:18
日志消息产生了:2
日志消息产生了:5
日志消息产生了:8
日志消息产生了:11
日志消息产生了:14
日志消息产生了:17
日志消息产生了:1
日志消息产生了:4
日志消息产生了:7
日志消息产生了:10
日志消息产生了:13
日志消息产生了:16
日志消息产生了:19
日志消息产生了:20
日志消息产生了:23
日志消息产生了:26
日志消息产生了:29
日志消息产生了:32
日志消息产生了:35
日志消息产生了:38
日志消息产生了:22
日志消息产生了:25
日志消息产生了:28
日志消息产生了:31
日志消息产生了:34
日志消息产生了:37
日志消息产生了:21
日志消息产生了:24
日志消息产生了:27
日志消息产生了:30
日志消息产生了:33
日志消息产生了:36
日志消息产生了:39

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774602.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号