flume安装部署
官网地址安装部署 flume监控端数据
安装netcat工具判断44444端口是否被占用创建Flume Agent配置文件flume-netcat-logger.conf开启flume监听端口使用netcat工具向本机的44444端口发送内容观察flume监听页面接受数据的情况 实时监控单个追加文件
引入相关jar包创建Flume Agent配置文件flume-file-hdfs.conf运行flume 实时监控目录下多个新文件
创建Flume Agent配置文件flume-dir-hdfs.conf启动监控文件夹命令向 upload 文件夹中添加文件测试spooldir说明 实时监控目录下的多个追加文件
创建Flume Agent配置文件flume-taildir-hdfs.conf启动监控文件夹命令向files文件夹中追加内容测试taildir说明 Flume自定义Source
自定义Source概述引入maven依赖自定义Source,继承AbstractSource类并实现Configurable和PollableSource接口使用maven打包,上传jar包到flume的lib目录下创建flume配置文件启动flume进程 Flume自定义Sink
自定义Sink概述引入maven依赖自定义Sink,继承AbstractSink类并实现Configurable接口使用maven打包,上传jar包到flume的lib目录下创建flume配置文件启动flume进程 Flume自定义拦截器Interceptor
自定义Interceptor概述引入maven依赖自定义CustomInterceptor,实现Interceptor接口使用maven打包,上传代码到flume的lib目录下创建 flume 配置文件启动Flume进程 Flume聚合
实现成果创建flume1-logger-flume.conf创建flume2-netcat-flume.conf创建flume3-logger-flume.conf执行配置文件 Flume负载均衡和故障转移
实现成果创建flume-necat-flume.conf创建flume-flume-console1.conf创建 flume-flume-console2.conf执行配置文件 Flume-实现复制和多路复用
实现成果创建flume-file-flume.conf创建flume-flume-hdfs.conf创建flume-flume-dir.conf执行配置文件 Flume调优
内存优化
flume安装部署 官网地址Flume官网地址:http://flume.apache.org/
文档查看地址:http://flume.apache.org/FlumeUserGuide.html
下载地址:http://archive.apache.org/dist/flume/
安装部署第一步:解压flume到安装目录下
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
第二步:修改flume启动模板,配置JAVA环境
mv flume-env.sh.template flume-env.sh vi flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144
报错解决
将flume的lib文件夹下的guava-11.0.2.jar删除已解决兼容HADOOP3.1.3
rm /opt/module/flume/lib/guava-11.0.2.jar
删除guava的服务器节点,一定要配置hadoop环境变量,否则会报错ClassNotFoundException
将flume的conf文件夹下的flume-env.sh.template文件修改为flume-env.sh,并添加如下数据
export JAVA_HOME=/opt/module/jdk1.8.0_212flume监控端数据 安装netcat工具
yum install -y nc判断44444端口是否被占用
netstat -tunlp | grep 44444创建Flume Agent配置文件flume-netcat-logger.conf
# a1表示agent的名称 a1.sources = r1 # 表示a1的source的名称 a1.sinks = k1 # 表示a1的sink名称 a1.channels = c1 # 表示a1的channel的名称 # 配置source a1.sources.r1.type = netcat # 表示a1的输入源类型为netcat端口类型 a1.sources.r1.bind = localhost # 表示a1的监听的主机 a1.sources.r1.port = 44444 # 表示a1监听的端口号 # 配置sink a1.sinks.k1.type = logger # 表示a1的输出目的地是控制台logger类型 # 配置channel a1.channels.c1.type = memory # 表示a1的channel类型是memory内存型 a1.channels.c1.capacity = 1000 # 表示a1的channel总容量是1000个event a1.channels.c1.transactionCapacity = 100 # 表示a1的channel传输时收集到了100条event以后再去提交事务 # 绑定 source sink channel a1.sources.r1.channels = c1 # 表示将r1和c1连接 source可以连接多个channel a1.sinks.k1.channel = c1 # 表示将k1和c1连接 sink只能连接一个channel开启flume监听端口
bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.logger=INFO,console
–conf/-c:表示配置文件存储在 conf/目录–name/-n:表示给 agent 起名为 a1–conf-file/-f:flume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。-Dflume.root.logger=INFO,console :-D 表示 flume 运行时动态修改 flume.root.logger参数属性值,并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、error。 使用netcat工具向本机的44444端口发送内容
nc localhost 44444观察flume监听页面接受数据的情况 实时监控单个追加文件 引入相关jar包
flume想要将数据输出到HDFS,需要持有Hadoop相关jar包,拷贝到flume的lib目录下
commons-configuration-1.6.jar、 hadoop-auth-2.7.2.jar、 hadoop-common-2.7.2.jar、 hadoop-hdfs-2.7.2.jar、 commons-io-2.4.jar、 htrace-core-3.1.0-incubating.jar创建Flume Agent配置文件flume-file-hdfs.conf
要想读取Linux系统中的文件,就得按照Linux命令的规则执行命令。由于Hive日志在Linux系统中所读取文件的类型选择:exec即execute 执行的意思。表示执行Linux命令来读取文件
# 定义agent a2.sources = r2 a2.sinks = k2 a2.channels = c2 # 配置source a2.sources.r2.type = exec # 定义source类型为exec可执行命令 a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log a2.sources.r2.shell = /bin/bash -c # 执行shell脚本的绝对路径 # 配置sink a2.sinks.k2.type = hdfs # sink类型为hdfs a2.sinks.k2.hdfs.path = hdfs://hadoop102:9000/flume/%Y%m%d/%H # 文件上传到hdfs路径 # 上传文件的前缀 a2.sinks.k2.hdfs.filePrefix = logs- # 是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round = true # 多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue = 1 # 重新定义时间单位 a2.sinks.k2.hdfs.roundUnit = hour # 是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp = true # 积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize = 1000 # 设置文件类型,可支持压缩 a2.sinks.k2.hdfs.fileType = DataStream # 多久生成一个新的文件 a2.sinks.k2.hdfs.rollInterval = 30 # 设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize = 134217700 # 文件的滚动与 Event 数量无关 a2.sinks.k2.hdfs.rollCount = 0 # 配置channel a2.channels.c2.type = memory a2.channels.c2.capacity = 1000 a2.channels.c2.transactionCapacity = 100 # 绑定 source sink channel a2.sources.r2.channels = c2 a2.sinks.k2.channel = c2运行flume
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf实时监控目录下多个新文件 创建Flume Agent配置文件flume-dir-hdfs.conf
# 定义agent a3.sources = r3 a3.sinks = k3 a3.channels = c3 # 配置source a3.sources.r3.type = spooldir # 定义source类型为目录 a3.sources.r3.spoolDir = /opt/module/flume/upload # 定义监控目录 a3.sources.r3.fileSuffix = .COMPLETED # 定义文件上传完,后缀 a3.sources.r3.fileHeader = true # 是否有文件头 # 忽略所有以.tmp 结尾的文件,不上传 a3.sources.r3.ignorePattern = ([^ ]*.tmp) # 配置sink a3.sinks.k3.type = hdfs # sink类型为hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H # 文件上传到hdfs路径 # 上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- # 是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true # 多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 # 重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour # 是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true # 积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 # 设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream # 多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 # 设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 # 文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount = 0 # 配置channel a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 绑定 source sink channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf向 upload 文件夹中添加文件测试
mkdir upload touch spooling.txt touch spooling.tmp touch spooling.logspooldir说明
在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件,上传完成的文件会以.COMPLETED 结尾被监控文件夹每500毫秒扫描一次文件变动
实时监控目录下的多个追加文件Exec source适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooldir Source能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而 Taildir Source既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控
创建Flume Agent配置文件flume-taildir-hdfs.conf# 定义agent a3.sources = r3 a3.sinks = k3 a3.channels = c3 # 配置source a3.sources.r3.type = TAILDIR # 定义source类型 a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json # 指定position_file位置 a3.sources.r3.filegroups = f1 a3.sources.r3.filegroups.f1 = /opt/module/flume/files/file.* # 定义监控目录文件 # 配置sink a3.sinks.k3.type = hdfs a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix = upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round = true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue = 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit = hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a3.sinks.k3.hdfs.fileType = DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval = 60 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount = 0 # 配置channel a3.channels.c3.type = memory a3.channels.c3.capacity = 1000 a3.channels.c3.transactionCapacity = 100 # 绑定 source sink channel a3.sources.r3.channels = c3 a3.sinks.k3.channel = c3启动监控文件夹命令
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf向files文件夹中追加内容测试
mkdir files echo hello >> file1.txttaildir说明
Taildir Source维护了一个 json格式的 positionFile,其会定期的往positioFile中更新每个文件读取到的最新的位置,因此能够实现断点续传。PositionFile的格式如下
{"inode":2496272,"pos":12,"file":"/opt/module/flume/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/files/file2.txt"}
Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件
Flume自定义Source 自定义Source概述Source是负责接收数据到Flume Agent的组件。Source组件可以处理各种类型、各种格式的日志数据官方提供的source类型已经很多,但是有时候并不能满足实际开发当中的需求,此时需要根据实际需求自定义某些 source
https://flumeapache.org/FlumeDeveloperGuide.html#source根据官方说明自定义MySource需要继承AbstractSource类并实现 Configurable和PollableSource接口。
configure(Context context):初始化 context,读取配置文件内容process():接受数据,将获取数据封装成event并写入 channel,这个方法将被循环调用。getBackOffSleepIncrement()getMaxBackOffSleepInterval()
引入maven依赖自定义Source,继承AbstractSource类并实现Configurable和PollableSource接口org.apache.flume flume-ng-core 1.7.0
package czs.study.flume.source;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import java.util.HashMap;
public class MySource extends AbstractSource implements Configurable, PollableSource {
//定义配置文件将来要读取的字段
private Long delay;
private String field;
// 初始化配置信息
@Override
public void configure(Context context) {
delay = context.getLong("delay");
field = context.getString("fleld", "hello");
}
@Override
public Status process() throws EventDeliveryException {
try {
//创建事件头信息
HashMap hearderMap = new HashMap<>();
//创建事件
SimpleEvent event = new SimpleEvent();
//循环封装事件
for (int i = 0; i < 5; i++) {
//给事件设置头信息
event.setHeaders(hearderMap);
//给事件设置内容
event.setBody((field + i).getBytes());
//将事件写入 channel
getChannelProcessor().processEvent(event);
Thread.sleep(delay);
}
} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}
return Status.READY;
}
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
}
使用maven打包,上传jar包到flume的lib目录下
创建flume配置文件
# 配置agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = czs.study.flume.source.MySource a1.sources.r1.delay = 1000 a1.sources.r1.field = czs # 配置sink a1.sinks.k1.type = logger # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1启动flume进程
bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,consoleFlume自定义Sink 自定义Sink概述
Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统或者被发送到另一个FlumeAgent,Sink是完全事务性的。在从Channel批量删除数据之前,每个Sink用Channel启动一个事务。批量事件一旦成功写出到存储系统或下一个Flume Agent,Sink就利用Channel提交事务。事务一旦被提交,该Channel从自己的内部缓冲区删除事件,官方提供的Sink类型已经很多,但是有时候并不能满足实际开发当中的需求,因此需要根据实际需求自定义某些Sink
官网地址:https://flume.apache.org/FlumeDeveloperGuide.html#sink自定义MySink需要继承AbstractSink类并实现 Configurable接口
configure(Context context)//初始化 context,读取配置文件内容process()//从 Channel 读取获取数据event,这个方法将被循环调用 引入maven依赖
自定义Sink,继承AbstractSink类并实现Configurable接口org.apache.flume flume-ng-core 1.7.0
package czs.study.flume.sink;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySink extends AbstractSink implements Configurable {
//创建 Logger 对象
private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
private String prefix;
private String suffix;
@Override
public Status process() throws EventDeliveryException {
// 声明返回值状态信息
Status status;
// 获取当前 Sink 绑定的 Channel
Channel ch = getChannel();
// 获取事务
Transaction txn = ch.getTransaction();
// 声明事件
Event event;
// 开启事务
txn.begin();
// 读取 Channel 中的事件,直到读取到事件结束循环
while (true) {
event = ch.take();
if (event != null) {
break;
}
}
try {
// 处理事件(打印)
LOG.info(prefix + new String(event.getBody()) + suffix);
// 事务提交
txn.commit();
status = Status.READY;
} catch (Exception e) {
// 遇到异常,事务回滚
txn.rollback();
status = Status.BACKOFF;
} finally {
// 关闭事务
txn.close();
}
return status;
}
@Override
public void configure(Context context) {
// 读取配置文件内容,有默认值
prefix = context.getString("prefix", "hello:");
// 读取配置文件内容,无默认值
suffix = context.getString("suffix");
}
}
使用maven打包,上传jar包到flume的lib目录下
创建flume配置文件
# 配置agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # 配置sink a1.sinks.k1.type = czs.study.flume.sink.MySink #a1.sinks.k1.prefix = hello: a1.sinks.k1.suffix = :world # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置绑定 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1启动flume进程
bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,consoleFlume自定义拦截器Interceptor 自定义Interceptor概述
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume拓扑结构中的Multiplexing结构,Multiplexing的原理是根据event中Header的某个key的值将不同的event发送到不同的Channel中,所以需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值
引入maven依赖自定义CustomInterceptor,实现Interceptor接口org.apache.flume flume-ng-core 1.7.0
package czs.study.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
public class CustomInterceptor implements Interceptor {
@Override
public void initialize() {
}
@Override
public Event intercept(Event event) {
byte[] body = event.getBody();
if (body[0] < 'z' && body[0] > 'a') {
event.getHeaders().put("type", "letter");
} else if (body[0] > '0' && body[0] < '9') {
event.getHeaders().put("type", "number");
}
return event;
}
@Override
public List intercept(List events) {
for (Event event : events) {
intercept(event);
}
return events;
}
@Override
public void close() {
}
public static class Builder implements Interceptor.Builder {
@Override
public Interceptor build() {
return new CustomInterceptor();
}
@Override
public void configure(Context context) {
}
}
}
使用maven打包,上传代码到flume的lib目录下
创建 flume 配置文件
为hadoop102上的Flume1配置1个netcat source,1个sinkgroup(2 个 avro sink),并配置相应的ChannelSelector和interceptor
# 配置agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = czs.study.flume.interceptor.CustomInterceptor$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = type a1.sources.r1.selector.mapping.letter = c1 a1.sources.r1.selector.mapping.number = c2 # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop103 a1.sinks.k1.port = 4141 a1.sinks.k2.type=avro a1.sinks.k2.hostname = hadoop104 a1.sinks.k2.port = 4242 # 配置channel1 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 配置channel2 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2
为hadoop103上的Flume2配置一个avro source和一个logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop103 a1.sources.r1.port = 4141 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1
为hadoop104上的Flume3配置一个avro source和一个logger sink
a1.sources = r1 a1.sinks = k1 a1.channels = c1 a1.sources.r1.type = avro a1.sources.r1.bind = hadoop104 a1.sources.r1.port = 4242 a1.sinks.k1.type = logger a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sinks.k1.channel = c1 a1.sources.r1.channels = c1启动Flume进程
先启动hadoop103和104,再启动hadoop102,注意先后顺序
使用nc连接上hadoop102,测试发送字母和数字
Flume聚合 实现成果hadoop102上的Flume-1监控文件/opt/module/group.log,hadoop103上的Flume-2监控某一个端口的数据流,Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。
创建flume1-logger-flume.conf配置Source用于监控hive.log文件,配置Sink输出数据到下一级Flume
# 配置agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/group.log a1.sources.r1.shell = /bin/bash -c # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop104 a1.sinks.k1.port = 4141 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1创建flume2-netcat-flume.conf
配置Source监控端口44444数据流,配置Sink输出数据到下一级Flume
# 配置agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 配置source a2.sources.r1.type = netcat a2.sources.r1.bind = hadoop103 a2.sources.r1.port = 44444 # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop104 a1.sinks.k1.port = 4141 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1创建flume3-logger-flume.conf
配置source用于接收flume1与flume2发送过来的数据流,最终合并后sink到控制台
# 配置agent a3.sources = r1 a3.sinks = k1 a3.channels = c1 # 配置source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop104 a3.sources.r1.port = 4141 # 配置sink a3.sinks.k1.type = logger # 配置channel a3.channels.c1.type = memory a3.channels.c1.capacity = 1000 a3.channels.c1.transactionCapacity = 100 # 绑定配置 a3.sources.r1.channels = c1 a3.sinks.k1.channel = c1执行配置文件
分别开启对应配置文件:flume3-flume-logger.conf,flume2-netcat-flume.conf,flume1-logger-flume.conf
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.confFlume负载均衡和故障转移 实现成果
使用Flume1监控一个端口,其sink组中的sink分别对接Flume2和Flume3,采用FailoverSinkProcessor,实现故障转移的功能
创建flume-necat-flume.conf配置1个netcat source 和1个channel、1个sinkgroup(2 个 sink)
分别输送给flume-flume-console1和flume-flume-console2
# 配置agent a1.sources = r1 a1.channels = c1 a1.sinkgroups = g1 a1.sinks = k1 k2 # 配置source a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 a1.sinkgroups.g1.processor.type = failover a1.sinkgroups.g1.processor.priority.k1 = 5 a1.sinkgroups.g1.processor.priority.k2 = 10 a1.sinkgroups.g1.processor.maxpenalty = 10000 # 配置sink a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 a1.sinkgroups.g1.sinks = k1 k2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c1创建flume-flume-console1.conf
配置上级Flume输出的Source,输出是到本地控制台
# 配置agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # 配置source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # 配置sink a2.sinks.k1.type = logger # 配置channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # 绑定配置 a2.sources.r1.channels = c1 a2.sinks.k1.channel = c1创建 flume-flume-console2.conf
# 配置agent a2.sources = r1 a2.sinks = k1 a2.channels = c2 # 配置source a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4142 # 配置sink a2.sinks.k1.type = logger # 配置channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # 绑定配置 a2.sources.r1.channels = c2 a2.sinks.k1.channel = c2执行配置文件
分别开启对应配置文件:flume-flume-console2,flume-flume-console1,flume-netcat-flume
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.logger=INFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.confFlume-实现复制和多路复用 实现成果
使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储 到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem
创建flume-file-flume.conf配置1个接收日志文件的source和两个channel、两个sink
分别发送到flume-flume-hdfs 和flume-flume-dir
# 配置agent a1.sources = r1 a1.sinks = k1 k2 a1.channels = c1 c2 # 将数据流复制给所有 channel a1.sources.r1.selector.type = replicating # 配置source a1.sources.r1.type = exec a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell = /bin/bash -c # 配置sink # sink 端的 avro 是一个数据发送者 a1.sinks.k1.type = avro a1.sinks.k1.hostname = hadoop102 a1.sinks.k1.port = 4141 a1.sinks.k2.type = avro a1.sinks.k2.hostname = hadoop102 a1.sinks.k2.port = 4142 # 配置channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c2.type = memory a1.channels.c2.capacity = 1000 a1.channels.c2.transactionCapacity = 100 # 绑定配置 a1.sources.r1.channels = c1 c2 a1.sinks.k1.channel = c1 a1.sinks.k2.channel = c2创建flume-flume-hdfs.conf
# 配置agent a2.sources = r1 a2.sinks = k1 a2.channels = c1 # 配置source # source 端的 avro 是一个数据接收服务 a2.sources.r1.type = avro a2.sources.r1.bind = hadoop102 a2.sources.r1.port = 4141 # 配置sink a2.sinks.k1.type = hdfs a2.sinks.k1.hdfs.path = hdfs://hadoop102:9000/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix = flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round = true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue = 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit = hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp = true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize = 100 #设置文件类型,可支持压缩 a2.sinks.k1.hdfs.fileType = DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval = 600 #设置每个文件的滚动大小大概是 128M a2.sinks.k1.hdfs.rollSize = 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k1.hdfs.rollCount = 0 # 配置channel a2.channels.c1.type = memory a2.channels.c1.capacity = 1000 a2.channels.c1.transactionCapacity = 100 # 绑定配置 a2.sources.r1.channels = c1 a2.sinks.k1.channel = c创建flume-flume-dir.conf
配置上级Flume输出的Source,输出是到本地目录的Sink,输出的本地目录必须是已经存在的目录,如果该目录不存在,并不会创建新的目录
# 配置agent a3.sources = r1 a3.sinks = k1 a3.channels = c2 # 配置source a3.sources.r1.type = avro a3.sources.r1.bind = hadoop102 a3.sources.r1.port = 4142 # 配置sink a3.sinks.k1.type = file_roll a3.sinks.k1.sink.directory = /opt/module/data/flume3 # 配置channel a3.channels.c2.type = memory a3.channels.c2.capacity = 1000 a3.channels.c2.transactionCapacity = 100 # 绑定配置 a3.sources.r1.channels = c2 a3.sinks.k1.channel = c2执行配置文件
分别启动对应的 flume 进程:flume-flume-dir,flume-flume-hdfs,flume-file-flume
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.confFlume调优 内存优化
问题报错
ERROR hdfs.HDFSEventSink: process failed java.lang.OutOfMemoryError: GC overhead limit exceeded
配置文件flume-env.sh增加如下配置
分配到集群中
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
优化原理
JVM heap一般设置为4G或更高
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc
-Xms表示JVM Heap(堆内存)最小尺寸,初始分配;-Xmx 表示JVM Heap(堆内存)最大允许的尺寸,按需分配。如果不设置一致,容易在初始化时,由于内存不够,频繁触发fullgc



