1.1.2 分析检测一个端口的数据,将数据包含codecat发送到一个channel,不包含的发送到另一个channel
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要
发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构,Multiplexing的原理是,根据 event 中 Header 的某个 key 的值,将不同的 event 发送到不同的 Channel中,所以我们需要自定义一个 Interceptor,为不同类型的 event 的 Header 中的 key 赋予不同的值。
- 创建一个 maven 项目,并引入以下依赖
org.apache.flume flume-ng-core 1.9.0 - 定义 TypeInterceptor 类并实现 Interceptor 接口
package com.codecat.interceptor; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.util.ArrayList; import java.util.List; import java.util.Map; public class TypeInterceptor implements Interceptor { // 声明一个存放事件的集合 private ListaddHeaderEvents; @Override public void initialize() { // 初始化存放事件的集合 addHeaderEvents = new ArrayList<>(); } // 单个事件拦截 @Override public Event intercept(Event event) { // 1. 获取事件中的头信息 Map headers = event.getHeaders(); // 2. 获取事件中的body信息 String body = new String(event.getBody()); // 3. 根据body中是否有"codecat"来决定添加怎样的头信息 if (body.contains("codecat")) { // 4. 添加头信息 headers.put("type", "first"); } else { // 4. 添加头信息 headers.put("type", "second"); } return event; } // 批量事件拦截 @Override public List intercept(List list) { // 1. 清空集合 addHeaderEvents.clear(); // 2. 遍历events for (Event event : list) { // 3. 给每个事件添加头信息 addHeaderEvents.add(intercept(event)); } // 4. 返回结果 return addHeaderEvents; } @Override public void close() { } public static class Builder implements Interceptor.Builder { @Override public Interceptor build() { return new TypeInterceptor(); } @Override public void configure(Context context) { } } } - 将写好的代码打包,并放到 flume 的 lib 目录下
- 配置文件
- 在 hadoop102 上的 Flume 配置 1 个 netcat source,1 个 sink group(2 个 avro sink),并配置相应的 ChannelSelector 和 interceptor。
# Name the components on this agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = com.codecat.interceptor.TypeInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = type a1.sources.r1.selector.mapping.first = c1 a1.sources.r1.selector.mapping.second = c2 # Describe the sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop103 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = hadoop104 a1.sinks.k2.port = 4242 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Use a channel which buffers events in memory a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
- 在 hadoop103 上的 Flume 配置一个 avro source 和一个 logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
- 在 hadoop104 上的 Flume 配置一个 avro source 和一个 logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop104 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
-
启动 flume 进程
[codecat@hadoop103 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-flume-logger.conf -Dflume.root.logger=INFO,console [codecat@hadoop104 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-flume-logger.conf -Dflume.root.logger=INFO,console [codecat@hadoop102 flume-1.9.0]$ bin/flume-ng agent -c conf/ -n a1 -f jobs/interceptor/flume-netcat-flume.conf
-
在 hadoop102 使用 netcat 向 localhost:44444 发送数据
-
观察 hadoop103 和 hadoop104 打印的日志
1.2.2 分析 1.2.3 实现步骤使用 flume 接收数据,并给每条数据添加前缀,输出到控制台
- 导入 pom 依赖
org.apache.flume flume-ng-core 1.9.0 - 编写代码
package com.codecat.source; import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource; import java.nio.charset.StandardCharsets; import java.util.HashMap; public class MySource extends AbstractSource implements Configurable, PollableSource { // 定义配置文件将来要读取的字段 private Long delay; private String field; @Override public void configure(Context context) { delay = context.getLong("delay"); field = context.getString("filed", "Hello!"); } @Override public Status process() throws EventDeliveryException { try { // 创建事件头信息 HashMapheaderMap = new HashMap<>(); // 创建事件 SimpleEvent event = new SimpleEvent(); // 循环封装事件 for (int i = 0; i < 5; i++) { // 给事件设置头信息 event.setHeaders(headerMap); // 给事件设置内容 event.setBody((field + i).getBytes(StandardCharsets.UTF_8)); // 将事件写入channel getChannelProcessor().processEvent(event); Thread.sleep(delay); } } catch (Exception e) { e.printStackTrace(); return Status.BACKOFF; } return Status.READY; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } } - 将写好的代码打包,并放到 flume 的 lib 目录下
- 配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = com.codecat.source.MySource a1.sources.r1.delay = 1000 #a1.sources.r1.field = codecat # Describe the sink a1.sinks.k1.type = logger # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
- 开启任务
[codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/mysource.conf -Dflume.root.logger=INFO,console
- 结果展示
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
1.3.1 需求1.3.2 分析 1.3.3 实现步骤使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台
-
编码
package com.codecat.sink; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { // 创建Logger对象 private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class); private String prefix; private String suffix; @Override public void configure(Context context) { // 读取配置文件内容,有默认值 prefix = context.getString("prefix", "hello"); // 读取配置文件内容,无默认值 suffix = context.getString("suffix"); } @Override public Status process() throws EventDeliveryException { // 声明返回值状态信息 Status status; // 获取当前sink绑定的channel Channel channel = getChannel(); // 获取事务 Transaction transaction = channel.getTransaction(); // 声明事件 Event event; // 开启事务 transaction.begin(); // 读取channel中的事件,直到读取到事件结束循环 while (true) { event = channel.take(); if (event != null) { break; } } try { // 处理事件(打印) LOG.info(prefix + new String(event.getBody()) + suffix); // 事务提交 transaction.commit(); status = Status.READY; } catch (Exception e) { // 遇到异常,事务回滚 transaction.rollback(); status = Status.BACKOFF; } finally { // 关闭事务 transaction.close(); } return status; } } -
将写好的代码打包,并放到flume的 lib 目录下
-
配置文件
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the sink a1.sinks.k1.type = com.codecat.sink.MySink #a1.sinks.k1.prefix = codecat: a1.sinks.k1.suffix = :codecat # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
-
开启任务
[codecat@hadoop102 flume-1.9.0]$ flume-ng agent -c conf/ -n a1 -f jobs/mysink.conf -Dflume.root.logger=INFO,console
-
结果展示
Ganglia 由 gmond、gmetad 和 gweb 三部分组成
- gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。
- gmetad(Ganglia meta Daemon)整合所有信息,并将其以 RRD格式存储至磁盘的服务
- gweb(Ganglia Web)Ganglia 可视化工具,gweb 是一种利用浏览器显示 gmetad所存储数据的 PHP 前端
- 规划
hadoop102: web gmetad gmod hadoop103: gmod hadoop104: gmod
- 在 102 103 104 分别安装 epel-release
[codecat@hadoop102 flume-1.9.0]$ sudo yum -y install epel-release [codecat@hadoop103 flume-1.9.0]$ sudo yum -y install epel-release [codecat@hadoop104 flume-1.9.0]$ sudo yum -y install epel-release
- 在 102 安装
[codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmetad [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-web [codecat@hadoop102 flume-1.9.0]$ sudo yum -y install ganglia-gmond
- 在 103 和 104 安装
[codecat@hadoop103 flume-1.9.0]$ sudo yum -y install ganglia-gmond [codecat@hadoop104 flume-1.9.0]$ sudo yum -y install ganglia-gmond
提示:selinux 生效需要重启,如果此时不想重启,可以临时生效之:
sudo setenforce 02.1.6 启动 ganglia
- 在 102 103 104 启动
sudo systemctl start gmond
- 在 102 启动
sudo systemctl start httpd sudo systemctl start gmetad
flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console -Dflume.monitoring.type=ganglia -Dflume.monitoring.hosts=hadoop102:86492.2.2 发送数据观察 ganglia 监测图
图例说明
| 字段(图表名称) | 字段含义 |
|---|---|
| EventPutAttemptCount | source 尝试写入 channel 的事件总数量 |
| EventPutSuccessCount | 成功写入 channel 且提交的事件总数量 |
| EventTakeAttemptCount sink | 尝试从 channel 拉取事件的总数量。 |
| EventTakeSuccessCount sink | 成功读取的事件的总数量 |
| StartTime | channel 启动的时间(毫秒) |
| StopTime | channel 停止的时间(毫秒) |
| ChannelSize | 目前 channel 中事件的总数量 |
| ChannelFillPercentage | channel 占用百分比 |
| ChannelCapacity | channel 的容量 |



