Put events (Put)
- doPut: write the batch of data into the temporary buffer putList
- doCommit: check whether the channel's in-memory queue has enough space to merge the data in
- doRollback: if the channel's in-memory queue does not have enough space, roll the data back
Take events (Take)
- doTake: pull data into the temporary buffer takeList
- doCommit: if all of the data is sent successfully, clear the temporary buffer takeList
- doRollback: if an exception occurs while the data is being sent, return the data in the temporary buffer takeList to the channel's in-memory queue
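The put/take transaction semantics above can be sketched as a toy model. This is a minimal illustration in Python, not Flume's actual (Java) implementation; the class and method names simply mirror the steps listed:

```python
from collections import deque

class ToyChannel:
    """A toy model of the memory channel's put/take transaction semantics."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()   # the channel's in-memory event queue
        self.put_list = []     # temporary buffer for a put transaction
        self.take_list = []    # temporary buffer for a take transaction

    # --- Put transaction ---
    def do_put(self, event):
        self.put_list.append(event)       # doPut: buffer into putList

    def commit_put(self):
        # doCommit: merge only if the queue has room for the whole batch
        if len(self.queue) + len(self.put_list) > self.capacity:
            self.put_list.clear()         # doRollback: not enough space
            return False
        self.queue.extend(self.put_list)
        self.put_list.clear()
        return True

    # --- Take transaction ---
    def do_take(self):
        event = self.queue.popleft()      # doTake: move into takeList
        self.take_list.append(event)
        return event

    def commit_take(self, success):
        if success:
            self.take_list.clear()        # doCommit: all sent, clear takeList
        else:
            # doRollback: return buffered events to the front of the queue
            for event in reversed(self.take_list):
                self.queue.appendleft(event)
            self.take_list.clear()
```

The key point the model shows: a failed send does not lose data, because the takeList is handed back to the queue on rollback.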
1.2 Key Agent Components
Besides Source, Channel, and Sink, an Agent also has Channel Selectors, Sink Processors, and Interceptors.
1.2.1 Channel Selectors
A Channel Selector decides which Channel(s) an Event will be sent to.
There are three types: Replicating, Multiplexing, and Custom Channel Selector.
- Replicating: sends the same Event to every Channel; this is the default
- Multiplexing: routes different Events to different Channels according to the configured rules
- Custom: requires implementing the ChannelSelector interface yourself
Examples:
# Replicating Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.optional = c3
# Multiplexing Channel Selector
a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4
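The multiplexing rules above boil down to a lookup on the event's `state` header. A minimal Python sketch of that routing decision (the mapping mirrors the config; the function name is illustrative, not part of Flume):

```python
# Mirrors the multiplexing config above: route on the "state" header.
MAPPING = {"CZ": ["c1"], "US": ["c2", "c3"]}
DEFAULT = ["c4"]

def select_channels(event_headers):
    """Return the list of channels an event is routed to."""
    return MAPPING.get(event_headers.get("state"), DEFAULT)
```

An event with header `state=CZ` goes to c1, `state=US` to both c2 and c3, and anything else falls through to the default channel c4.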
# Custom Channel Selector
a1.sources = r1
a1.channels = c1
a1.sources.r1.selector.type = org.example.MyChannelSelector

1.2.2 Sink Processors
There are four types of SinkProcessor: Default Sink Processor, Failover Sink Processor, Load Balancing Sink Processor, and Custom Sink Processor.
- Default Sink Processor: accepts only a single Sink
- Failover Sink Processor: maintains a prioritized list of sinks, guaranteeing that as long as one sink is available, events will be processed (delivered)
- Load Balancing Sink Processor: provides the ability to load-balance the flow over multiple sinks
Examples:
# Failover Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Load balancing Sink Processor
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
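The two processors differ only in how they pick the next sink: failover always tries the highest-priority healthy sink, while load balancing rotates (round_robin) or randomizes (random) over the group. A rough Python sketch of those two policies (names and structure are illustrative, not Flume's internals):

```python
import itertools
import random

def failover_pick(priorities, healthy):
    """Failover: pick the healthy sink with the highest priority.

    priorities: dict like {"k1": 5, "k2": 10}; healthy: set of live sinks.
    """
    candidates = [s for s in priorities if s in healthy]
    return max(candidates, key=priorities.get) if candidates else None

def load_balance_picker(sinks, selector="round_robin"):
    """Load balancing: return a function that yields the next sink to try."""
    if selector == "random":
        return lambda: random.choice(sinks)
    cycle = itertools.cycle(sinks)   # round_robin
    return lambda: next(cycle)
```

With the failover config above (k1=5, k2=10), events go to k2 until it fails, then fall back to k1.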
Note that the Load Balancing and Failover Sink Processors both operate on a Sink Group.
1.2.3 Interceptors
Interceptors: Flume can modify or drop events in flight, and this is done with the help of interceptors. The list of events returned by one interceptor is passed on to the next interceptor in the chain.
Commonly used interceptors:
- Timestamp Interceptor: inserts into the event headers the time, in milliseconds, at which it processed the event
- Host Interceptor: inserts the hostname or IP address of the host this agent runs on
Example:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = host
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname
a1.sources.r1.interceptors.i2.type = timestamp
a1.sinks.k1.filePrefix = FlumeData.%{hostname}.%Y-%m-%d
a1.sinks.k1.channel = c1
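The effect of the i1/i2 chain above can be sketched as plain functions over an event's header map. This is a toy model in Python: the header keys and `preserveExisting` behavior mirror the config, but the function names and event dict shape are illustrative, not Flume's Java Interceptor API:

```python
import socket
import time

def host_interceptor(event, host_header="hostname", preserve_existing=False):
    """i1: insert this agent's hostname under the configured header key."""
    headers = event["headers"]
    if preserve_existing and host_header in headers:
        return event   # keep an existing value, as preserveExisting=true would
    headers[host_header] = socket.gethostname()
    return event

def timestamp_interceptor(event):
    """i2: insert the processing time, in milliseconds, under 'timestamp'."""
    event["headers"]["timestamp"] = str(int(time.time() * 1000))
    return event

def intercept(event):
    """Apply the chain in config order: i1 (host), then i2 (timestamp)."""
    return timestamp_interceptor(host_interceptor(event))
```

After the chain runs, the sink can substitute those headers into its file prefix, which is how the `%{...}` escapes in the config resolve.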
2. Topologies
2.1 Serial Mode
2.1.1 Model Structure
This mode chains multiple Flume agents in series. To transfer data across multiple agents or hops, the previous agent's Sink and the current hop's Source must both be of the avro type, with the Sink pointing at the Source's hostname (or IP address) and port.
2.1.2 Serial Implementation
The number of chained Flume agents affects the transfer rate, and if any one node goes down, the whole transfer pipeline is affected.
1️⃣ Step 1: on host node1, add the configuration file taildir2avro.conf
# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1
# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node2
taildir2avro.sinks.k1.port = 12345
# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000
# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
2️⃣ Step 2: on host node2, add the configuration file avro2hdfs.conf
# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1
# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node2
avro2hdfs.sources.r1.port = 12345
# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000
# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
3️⃣ Step 3: start Flume on node2
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console
4️⃣ Step 4: check whether port 12345 on node2 is being listened on, and confirm it is in the LISTEN state
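One quick way to perform that check from any host that can reach node2 is a small TCP probe. This is a hypothetical helper, not part of Flume; running `ss -lnt` or `netstat -lnt` on node2 itself works just as well:

```python
import socket

def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:   # refused, unreachable, DNS failure, or timeout
        return False
```

Calling `port_open("node2", 12345)` should return True once the avro source on node2 is up.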
5️⃣ Step 5: let the printer.sh script on node1 run for a while, then start Flume on node1
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2avro.conf -n taildir2avro -Dflume.root.logger=INFO,console
6️⃣ Step 6: check node2's console output; node1 has successfully sent data to port 12345 on node2
7️⃣ Step 7: check on the web UI whether data has been committed
2.2 Replicating and Multiplexing
2.2.1 Model Structure
Flume supports fanning an event flow out to one or more destinations. In this mode the same data source can be replicated to multiple channels, or different data can be dispatched to different channels, and each sink can deliver to a different destination.
2.2.2 Model Implementation
Goal: node1 runs two channels and two sinks; one sink uploads the data to HDFS, the other sends it to node2, which uploads another copy.
1️⃣ Step 1: on node1, add the configuration file taildir2hdfs_avro.conf
# Name the components on this agent
taildir2hdfs_avro.sources = r1
taildir2hdfs_avro.sinks = k1 k2
taildir2hdfs_avro.channels = c1 c2
# Describe/configure the source
taildir2hdfs_avro.sources.r1.type = TAILDIR
taildir2hdfs_avro.sources.r1.filegroups = g1 g2
taildir2hdfs_avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2hdfs_avro.sources.r1.filegroups.g2 = /root/log/.*.txt
# Describe the sinks
taildir2hdfs_avro.sinks.k1.type = hdfs
taildir2hdfs_avro.sinks.k1.hdfs.path = hdfs://node1:8020/flume/node1/%y-%m-%d-%H-%M
taildir2hdfs_avro.sinks.k1.hdfs.fileType = DataStream
taildir2hdfs_avro.sinks.k1.hdfs.writeFormat = Text
taildir2hdfs_avro.sinks.k1.hdfs.useLocalTimeStamp = true
taildir2hdfs_avro.sinks.k1.hdfs.rollInterval = 300
taildir2hdfs_avro.sinks.k1.hdfs.rollSize = 30000000
taildir2hdfs_avro.sinks.k1.hdfs.rollCount = 0
taildir2hdfs_avro.sinks.k2.type = avro
taildir2hdfs_avro.sinks.k2.hostname = node2
taildir2hdfs_avro.sinks.k2.port = 12345
# Use channels which buffer events in memory
taildir2hdfs_avro.channels.c1.type = memory
taildir2hdfs_avro.channels.c1.capacity = 10000
taildir2hdfs_avro.channels.c1.transactionCapacity = 1000
taildir2hdfs_avro.channels.c2.type = memory
taildir2hdfs_avro.channels.c2.capacity = 10000
taildir2hdfs_avro.channels.c2.transactionCapacity = 1000
# Bind the source and sinks to the channels
taildir2hdfs_avro.sources.r1.channels = c1 c2
taildir2hdfs_avro.sinks.k1.channel = c1
taildir2hdfs_avro.sinks.k2.channel = c2
2️⃣ Step 2: on node2, add the configuration file avro2hdfs.conf
# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1
# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node2
avro2hdfs.sources.r1.port = 12345
# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/node2/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 30000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 10000
avro2hdfs.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
3️⃣ Step 3: start Flume on node2
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console
4️⃣ Step 4: start Flume on node1
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2hdfs_avro.conf -n taildir2hdfs_avro -Dflume.root.logger=INFO,console
5️⃣ Step 5: check on the web UI whether files have been uploaded
2.3 Load Balancing and Failover
2.3.1 Model Structure
2.3.2 Model Implementation
Goal: node1 sends data to node2 and node3, which are responsible for uploading it
1️⃣ Step 1: on node1, add the configuration file taildir2avro.conf
# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1 k2
taildir2avro.sinkgroups = g1
taildir2avro.channels = c1
# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
# Describe the sinks
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node2
taildir2avro.sinks.k1.port = 12345
taildir2avro.sinks.k2.type = avro
taildir2avro.sinks.k2.hostname = node3
taildir2avro.sinks.k2.port = 12345
# Group the sinks and load-balance across them
# (for failover, use processor.type = failover with priorities instead)
taildir2avro.sinkgroups.g1.sinks = k1 k2
taildir2avro.sinkgroups.g1.processor.type = load_balance
taildir2avro.sinkgroups.g1.processor.backoff = true
taildir2avro.sinkgroups.g1.processor.selector = round_robin
# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 10000
taildir2avro.channels.c1.transactionCapacity = 1000
# Bind the source and sinks to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
taildir2avro.sinks.k2.channel = c1
2️⃣ Step 2: on node2, add the configuration file avro2hdfs.conf
# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1
# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node2
avro2hdfs.sources.r1.port = 12345
# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/node2/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 30000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 10000
avro2hdfs.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
3️⃣ Step 3: on node3, add the configuration file avro2hdfs.conf
# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1
# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node3
avro2hdfs.sources.r1.port = 12345
# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/node3/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 30000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 10000
avro2hdfs.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
4️⃣ Step 4: start Flume on node2 and node3
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console
5️⃣ Step 5: start Flume on node1
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2avro.conf -n taildir2avro -Dflume.root.logger=INFO,console
6️⃣ Step 6: watch the log output on the consoles
7️⃣ Step 7: check on the web UI whether files have been uploaded
2.4 Aggregation Mode
2.4.1 Model Structure
This is a very practical topology. In Flume it is achieved by configuring a number of first-tier agents with avro sinks, all pointing at the avro source of a single agent (you can likewise use thrift sources/sinks/clients in this scenario). The source on this second-tier agent consolidates the received events into a single channel, which a sink then consumes and delivers to the final destination.
Web applications are usually spread across hundreds of servers, and the logs they produce are troublesome to process. This Flume composition solves the problem well: deploy a Flume agent on each server to collect its logs and send them to one central log-collecting Flume agent, which then uploads them to HDFS, Hive, HBase, etc. for log analysis.
2.4.2 Model Implementation
Goal: node1 and node2 both send data to node3, which is responsible for uploading the data to HDFS
1️⃣ Step 1: on node1, add the configuration file taildir2avro.conf
# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1
# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node3
taildir2avro.sinks.k1.port = 12345
# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 10000
taildir2avro.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
2️⃣ Step 2: on node2, add the configuration file taildir2avro.conf
# Name the components on this agent
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1
# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = node3
taildir2avro.sinks.k1.port = 12345
# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 10000
taildir2avro.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1
3️⃣ Step 3: on node3, add the configuration file avro2hdfs.conf
# Name the components on this agent
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1
# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = node3
avro2hdfs.sources.r1.port = 12345
# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://node1:8020/flume/%y-%m-%d-%H-%M
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
avro2hdfs.sinks.k1.hdfs.rollSize = 30000000
avro2hdfs.sinks.k1.hdfs.rollCount = 0
# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
avro2hdfs.channels.c1.capacity = 10000
avro2hdfs.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1
4️⃣ Step 4: start Flume on node3
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/avro2hdfs.conf -n avro2hdfs -Dflume.root.logger=INFO,console
5️⃣ Step 5: start Flume on node1 and node2
flume-ng agent -c /opt/flume-1.9.0/conf/ -f /opt/flume-1.9.0/agents/taildir2avro.conf -n taildir2avro -Dflume.root.logger=INFO,console
6️⃣ Step 6: watch the log output on the consoles
7️⃣ Step 7: check on the web UI whether files have been uploaded
3. Closing Notes
The configuration files are all broadly similar. Before each test, write something to the monitored directory or file first; after each test, clear the uploaded files from HDFS.
❤️ END ❤️