```properties
# taildir reads multiple log files from the same directory and sends the data to HDFS
# Declare the component names
taildir2hdfs.sources = r1
taildir2hdfs.sinks = k1
taildir2hdfs.channels = c1

# Describe/configure the source
taildir2hdfs.sources.r1.type = TAILDIR
taildir2hdfs.sources.r1.filegroups = g1 g2 g3
taildir2hdfs.sources.r1.filegroups.g1 = /root/log.txt
taildir2hdfs.sources.r1.filegroups.g2 = /root/log/.*.txt
taildir2hdfs.sources.r1.filegroups.g3 = /opt/2.txt

# Describe the sink
taildir2hdfs.sinks.k1.type = hdfs
taildir2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# Write the HDFS output as a data stream
taildir2hdfs.sinks.k1.hdfs.fileType = DataStream
# Use plain text as the output format
taildir2hdfs.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp for the path escapes
taildir2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

# File roll conditions
# Roll interval, in seconds
taildir2hdfs.sinks.k1.hdfs.rollInterval = 300
# Roll when the file reaches this size (bytes)
taildir2hdfs.sinks.k1.hdfs.rollSize = 130000000
# Roll after this many records (0 = disabled)
taildir2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
taildir2hdfs.channels.c1.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (the agent's JVM options can be raised
# in the flume-ng script); typical range: 20000~200000
taildir2hdfs.channels.c1.capacity = 30000
taildir2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2hdfs.sources.r1.channels = c1
taildir2hdfs.sinks.k1.channel = c1
```

2. Multi-source aggregation
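Each TAILDIR filegroup above is a parent directory plus a file-name regex, so `g2` matches every `.txt` file directly under `/root/log`. A minimal sketch of that matching logic, using a hypothetical `FileGroupMatch` helper (not part of Flume):

```java
import java.util.regex.Pattern;

// Hypothetical helper illustrating how a TAILDIR filegroup such as
// /root/log/.*.txt splits into a parent directory and a file-name regex.
public class FileGroupMatch {
    public static boolean matches(String filegroup, String path) {
        int slash = filegroup.lastIndexOf('/');
        String dir = filegroup.substring(0, slash);      // parent directory
        String regex = filegroup.substring(slash + 1);   // file-name pattern
        int pathSlash = path.lastIndexOf('/');
        String pathDir = path.substring(0, pathSlash);
        String name = path.substring(pathSlash + 1);
        // the file must sit in the same directory and its name must match the regex
        return dir.equals(pathDir) && Pattern.matches(regex, name);
    }

    public static void main(String[] args) {
        System.out.println(matches("/root/log/.*.txt", "/root/log/app1.txt")); // true
        System.out.println(matches("/root/log/.*.txt", "/root/log/app.log"));  // false
    }
}
```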
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XLZc4FGk-1638758418072)(F:大数据图片9.png)]
zhaohui01 acts as the server and forwards to the HDFS cluster. Its configuration file:
```properties
# Receive events over avro and send the data to HDFS
# Declare the component names
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui01
avro2hdfs.sources.r1.port = 12345

# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# Write the HDFS output as a data stream
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# Use plain text as the output format
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp for the path escapes
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

# File roll conditions
# Roll interval, in seconds
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# Roll when the file reaches this size (bytes)
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# Roll after this many records (0 = disabled)
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
```
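The `%y-%m-%d/%H:%M` escapes in `hdfs.path` are filled in from the event's timestamp (here supplied by `useLocalTimeStamp = true`). A sketch of that substitution using `java.time`, with a hypothetical `HdfsPathEscapes` helper; Flume's real escape handling supports many more sequences:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical sketch: expand the %y-%m-%d/%H:%M escapes used in
// hdfs.path from an event timestamp (milliseconds since the epoch).
public class HdfsPathEscapes {
    public static String expand(String prefix, long timestampMillis) {
        // %y-%m-%d/%H:%M corresponds to this java.time pattern
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yy-MM-dd/HH:mm")
                .withZone(ZoneOffset.UTC); // fixed zone so the example is deterministic
        return prefix + "/" + fmt.format(Instant.ofEpochMilli(timestampMillis));
    }

    public static void main(String[] args) {
        // 1638758418070 ms = 2021-12-06 02:40:18 UTC
        System.out.println(expand("hdfs://zhaohui01:8020/flume", 1638758418070L));
    }
}
```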
zhaohui02 and zhaohui03 act as clients and connect to zhaohui01:
```properties
# taildir reads multiple log files from the same directory and sends the data to avro
# Declare the component names
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2 g3
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
taildir2avro.sources.r1.filegroups.g3 = /opt/2.txt

# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = zhaohui01
taildir2avro.sinks.k1.port = 12345

# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
```
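In the memory-channel settings above, `capacity` bounds the total number of events buffered, while `transactionCapacity` bounds how many events a single put/take transaction may move; `transactionCapacity` must not exceed `capacity`. A toy sketch of those two limits (not Flume's implementation; all names here are hypothetical):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Toy sketch of a memory channel's two limits: total capacity and the
// per-transaction batch limit (transactionCapacity).
public class ToyMemoryChannel {
    private final int capacity;
    private final int transactionCapacity;
    private final Deque<String> buffer = new ArrayDeque<>();

    public ToyMemoryChannel(int capacity, int transactionCapacity) {
        if (transactionCapacity > capacity)
            throw new IllegalArgumentException("transactionCapacity > capacity");
        this.capacity = capacity;
        this.transactionCapacity = transactionCapacity;
    }

    // Commit one batch of events; refused if the batch exceeds the
    // transaction limit or the channel would overflow its capacity.
    public boolean putBatch(List<String> events) {
        if (events.size() > transactionCapacity) return false;
        if (buffer.size() + events.size() > capacity) return false;
        buffer.addAll(events);
        return true;
    }

    public int size() { return buffer.size(); }
}
```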
Startup commands

Start the server side first:
```shell
flume-ng agent -n avro2hdfs -c /opt/flume-1.9/conf/ -f /opt/flume-1.9/agents/avro2hdfs.conf -Dflume.root.logger=INFO,console
```
Client startup command:
```shell
flume-ng agent -n taildir2avro -c /opt/flume-1.9/conf/ -f /opt/flume-1.9/agents/taildir2avro.conf -Dflume.root.logger=INFO,console
```

3. Single data source, multiple outputs (replicating/multiplexing)

Replicating
Requirement analysis: zhaohui01 monitors a file for changes; one copy of each change is sent to zhaohui02 and uploaded to HDFS, and another copy is sent to zhaohui03 and saved locally.
1. Configure the /opt/flume-1.9/agents/avro2hdfs.conf file on zhaohui02:
```properties
# Receive events over avro and send the data to HDFS
# Declare the component names
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source (bind to the local host, zhaohui02)
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui02
avro2hdfs.sources.r1.port = 12121

# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# Write the HDFS output as a data stream
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# Use plain text as the output format
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp for the path escapes
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

# File roll conditions
# Roll interval, in seconds
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# Roll when the file reaches this size (bytes)
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# Roll after this many records (0 = disabled)
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
```
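With `rollInterval = 300`, `rollSize = 130000000` and `rollCount = 0`, the HDFS sink rolls a file as soon as any enabled condition is met, and a value of 0 disables that condition. A sketch of the decision rule, as a hypothetical `RollPolicy` helper (not Flume's actual code):

```java
// Hypothetical sketch of the HDFS sink's roll decision: a file is rolled
// when any enabled condition triggers; a setting of 0 disables a condition.
public class RollPolicy {
    public static boolean shouldRoll(long rollIntervalSec, long rollSizeBytes, long rollCount,
                                     long elapsedSec, long writtenBytes, long events) {
        boolean byTime  = rollIntervalSec > 0 && elapsedSec   >= rollIntervalSec;
        boolean bySize  = rollSizeBytes   > 0 && writtenBytes >= rollSizeBytes;
        boolean byCount = rollCount       > 0 && events       >= rollCount;
        return byTime || bySize || byCount;
    }

    public static void main(String[] args) {
        // 301 s elapsed -> time-based roll; rollCount = 0 never triggers
        System.out.println(shouldRoll(300, 130000000, 0, 301, 1024, 10)); // true
        System.out.println(shouldRoll(300, 130000000, 0, 10, 1024, 10));  // false
    }
}
```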
2. Configure the /opt/flume-1.9/agents/avro2localhost.conf file on zhaohui03 to save the data locally:
```properties
# Receive events over avro and save the data locally
# Declare the component names
avro2localhost.sources = r1
avro2localhost.sinks = k1
avro2localhost.channels = c2

# Describe/configure the source (bind to the local host, zhaohui03)
avro2localhost.sources.r1.type = avro
avro2localhost.sources.r1.bind = zhaohui03
avro2localhost.sources.r1.port = 12122

# Describe the sink
avro2localhost.sinks.k1.type = file_roll
avro2localhost.sinks.k1.sink.directory = /opt/flume-1.9/agents/log

# Use a channel which buffers events in memory
avro2localhost.channels.c2.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
avro2localhost.channels.c2.capacity = 30000
avro2localhost.channels.c2.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2localhost.sources.r1.channels = c2
avro2localhost.sinks.k1.channel = c2
```
3. zhaohui01 acts as the client and sends to the servers.

Configure on zhaohui01:
```properties
# taildir reads multiple log files from the same directory and sends the data to avro
# Declare the component names
taildir2avro.sources = r1
taildir2avro.channels = c1 c2
taildir2avro.sinks = k1 k2

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = f1
taildir2avro.sources.r1.filegroups.f1 = /root
```
1. Write the custom interceptor class:

```java
package com.zch.interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class TypeInterceptor implements Interceptor {

    // Collection holding the processed events
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // Initialization
        addHeaderEvents = new ArrayList<>();
    }

    // Intercept a single event
    @Override
    public Event intercept(Event event) {
        // 1. Get the event headers
        Map<String, String> headers = event.getHeaders();
        // 2. Get the event body
        String body = new String(event.getBody());
        // 3. Choose the header value depending on whether the body contains "hello"
        if (body.contains("hello")) {
            // 4. Add the header
            headers.put("type", "zch");
        } else {
            // 5. Add the header
            headers.put("type", "others");
        }
        return event;
    }

    // Intercept a batch of events
    @Override
    public List<Event> intercept(List<Event> events) {
        // 1. Clear the collection
        addHeaderEvents.clear();
        // 2. Iterate over the events
        for (Event event : events) {
            // 3. Add the header to each event
            addHeaderEvents.add(intercept(event));
        }
        // Return the result
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
2. Write /opt/flume-1.9/agents/taildir2dvro_interceptor.conf on zhaohui01:
```properties
# taildir reads multiple log files from the same directory and sends the data to avro
# Declare the component names
taildir2avro.sources = r1
taildir2avro.channels = c1 c2
taildir2avro.sinks = k1 k2

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = f1
taildir2avro.sources.r1.filegroups.f1 = /root/.*.txt

# Interceptor
# com.zch.interceptor.TypeInterceptor is the fully qualified class name
# $Builder is the name of the interceptor's inner builder class
taildir2avro.sources.r1.interceptors = i1
taildir2avro.sources.r1.interceptors.i1.type = com.zch.interceptor.TypeInterceptor$Builder

# Channel selector
# selector.header names the header set by the Java interceptor
taildir2avro.sources.r1.selector.type = multiplexing
taildir2avro.sources.r1.selector.header = type
taildir2avro.sources.r1.selector.mapping.zch = c1
taildir2avro.sources.r1.selector.mapping.others = c2

# Describe the sinks
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = zhaohui02
taildir2avro.sinks.k1.port = 12121

taildir2avro.sinks.k2.type = avro
taildir2avro.sinks.k2.hostname = zhaohui03
taildir2avro.sinks.k2.port = 12122

# Use channels which buffer events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000

taildir2avro.channels.c2.type = memory
taildir2avro.channels.c2.capacity = 30000
taildir2avro.channels.c2.transactionCapacity = 3000

# Bind the source and sinks to the channels
taildir2avro.sources.r1.channels = c1 c2
taildir2avro.sinks.k1.channel = c1
taildir2avro.sinks.k2.channel = c2
```

3. Write the configuration files for zhaohui02 and zhaohui03
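The multiplexing selector in the taildir2avro configuration reads the type header set by the interceptor and routes each event to the mapped channel (zch to c1, others to c2). A sketch of that routing decision, with hypothetical names (not Flume's selector implementation):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a multiplexing channel selector: the value of the
// "type" header picks the channel, mirroring the selector.mapping.* entries.
public class MultiplexingRoute {
    private static final Map<String, String> MAPPING = new HashMap<>();
    static {
        MAPPING.put("zch", "c1");     // selector.mapping.zch = c1
        MAPPING.put("others", "c2");  // selector.mapping.others = c2
    }

    public static String route(Map<String, String> headers) {
        return MAPPING.get(headers.get("type"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("type", "zch");
        System.out.println(route(headers)); // c1
    }
}
```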
On zhaohui02, /opt/flume-1.9/agents/avro2hdfs_interceptor.conf:
```properties
# Receive events over avro and send the data to HDFS
# Declare the component names
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source (bind to the local host, zhaohui02)
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui02
avro2hdfs.sources.r1.port = 12121

# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# Write the HDFS output as a data stream
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# Use plain text as the output format
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
# Use the local timestamp for the path escapes
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true

# File roll conditions
# Roll interval, in seconds
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# Roll when the file reaches this size (bytes)
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# Roll after this many records (0 = disabled)
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
```
On zhaohui03, /opt/flume-1.9/agents/avro2localhost_interceptor.conf:
```properties
# Receive events over avro and save the data locally
# Declare the component names
avro2localhost.sources = r1
avro2localhost.sinks = k1
avro2localhost.channels = c2

# Describe/configure the source (bind to the local host, zhaohui03)
avro2localhost.sources.r1.type = avro
avro2localhost.sources.r1.bind = zhaohui03
avro2localhost.sources.r1.port = 12122

# Describe the sink
avro2localhost.sinks.k1.type = file_roll
avro2localhost.sinks.k1.sink.directory = /opt/flume-1.9/agents/log

# Use a channel which buffers events in memory
avro2localhost.channels.c2.type = memory
# Maximum number of events the memory channel can hold.
# Size it from the amount of data the channel may buffer and the
# agent's available JVM heap (raise it via the flume-ng script);
# typical range: 20000~200000
avro2localhost.channels.c2.capacity = 30000
avro2localhost.channels.c2.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2localhost.sources.r1.channels = c2
avro2localhost.sinks.k1.channel = c2
```

4. Start the services
Start the servers on zhaohui02 and zhaohui03 first, then start the client on zhaohui01.

5. Running in the background: start command that detaches from the terminal (output goes to nohup.out):
```shell
nohup flume-ng agent --name a1 --conf /opt/flume-1.9/conf/ --conf-file /opt/flume-1.9/agents/agent_test_1.conf -Dflume.root.logger=INFO,console &
```



