栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume(二)

Flume(二)

Flume flume常见架构 1、单数据源单接口到HDFS集群

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sRICBtiN-1638758418070)(F:大数据图片7.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RiqicdfM-1638758418071)(F:大数据图片8.png)]

# taildir读取同目录中的多个日志文件 将数据发送到hdfs

# 声明组件名称
taildir2hdfs.sources = r1
taildir2hdfs.sinks = k1
taildir2hdfs.channels = c1

# Describe/configure the source
taildir2hdfs.sources.r1.type = TAILDIR
taildir2hdfs.sources.r1.filegroups = g1 g2 g3
taildir2hdfs.sources.r1.filegroups.g1 = /root/log.txt
taildir2hdfs.sources.r1.filegroups.g2 = /root/log/.*.txt
taildir2hdfs.sources.r1.filegroups.g3 = /opt/2.txt


# Describe the sink
taildir2hdfs.sinks.k1.type = hdfs
taildir2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# 将hdfs输出文件类型改为数据流
taildir2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
taildir2hdfs.sinks.k1.hdfs.writeFormat = Text
#设置使用本地时间戳
taildir2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件的滚动条件
# 设置滚动周期  单位s
taildir2hdfs.sinks.k1.hdfs.rollInterval = 300
# 设置文件达到指定大小
taildir2hdfs.sinks.k1.hdfs.rollSize = 130000000
# 设置文件中写入多少条记录
taildir2hdfs.sinks.k1.hdfs.rollCount = 0


# Use a channel which buffers events in memory
taildir2hdfs.channels.c1.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
taildir2hdfs.channels.c1.capacity = 30000
taildir2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2hdfs.sources.r1.channels = c1
taildir2hdfs.sinks.k1.channel = c1
2、多数据源汇总

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XLZc4FGk-1638758418072)(F:大数据图片9.png)]

zhaohui01最为服务端,向hdfs集群,配置文件

#avro读取同目录中的多个日志文件 将数据发送到hdfs

# 声明组件名称
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui01
avro2hdfs.sources.r1.port = 12345


# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# 将hdfs输出文件类型改为数据流
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件的滚动条件
# 设置滚动周期  单位s
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# 设置文件达到指定大小
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# 设置文件中写入多少条记录
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1

zhaohui02 zhaohui03作为客户端连接到zhaohui01

# taildir读取同目录中的多个日志文件 将数据发送到avro

# 声明组件名称
taildir2avro.sources = r1
taildir2avro.sinks = k1
taildir2avro.channels = c1

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = g1 g2 g3
taildir2avro.sources.r1.filegroups.g1 = /root/log.txt
taildir2avro.sources.r1.filegroups.g2 = /root/log/.*.txt
taildir2avro.sources.r1.filegroups.g3 = /opt/2.txt


# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = zhaohui01
taildir2avro.sinks.k1.port = 12345


# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1
taildir2avro.sinks.k1.channel = c1

启动命令

服务端首先启动

 flume-ng agent -n avro2hdfs -c /opt/flume-1.9/conf/ -f /opt/flume-1.9/agents/avro2hdfs.conf -Dflume.root.logger=INFO,console

客户端启动命令

 flume-ng agent -n taildir2avro -c /opt/flume-1.9/conf/ -f /opt/flume-1.9/agents/taildir2avro.conf -Dflume.root.logger=INFO,console
3、单数据源多出口案例(复制/选择) 复制

需求分析: zhaohui01 监控文件信息,将文件变化一份发送到 zhaohui02 上传到HDFS,将一份文件发送到 zhaohui03 保存到本地

1、在zhaohui02中配置 /opt/flume-1.9/agents/avro2hdfs.conf 文件

#avro读取同目录中的多个日志文件 将数据发送到hdfs

# 声明组件名称
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui01
avro2hdfs.sources.r1.port = 121212

# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# 将hdfs输出文件类型改为数据流
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件的滚动条件
# 设置滚动周期  单位s
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# 设置文件达到指定大小
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# 设置文件中写入多少条记录
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1

2、在zhaohui03中配置 /opt/flume-1.9/agents/avro2hdfs.conf 文件,保存到本地

#avro读取同目录中的多个日志文件 将数据发送到localhost

# 声明组件名称
avro2localhost.sources = r1
avro2localhost.sinks = k1
avro2localhost.channels = c2

# Describe/configure the source
avro2localhost.sources.r1.type = avro
avro2localhost.sources.r1.bind = zhaohui01
avro2localhost.sources.r1.port = 121213


# Describe the sink
avro2localhost.sinks.k1.type = file_roll
avro2localhost.sinks.k1.sink.directory = /opt/flume-1.9/agents/log


# Use a channel which buffers events in memory
avro2localhost.channels.c2.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
avro2localhost.channels.c2.capacity = 30000
avro2localhost.channels.c2.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2localhost.sources.r1.channels = c2
avro2localhost.sinks.k1.channel = c2

3、zhaohui01作为客户端向服务端发送

在zhaohui01中配置

# taildir读取同目录中的多个日志文件 将数据发送到avro

# 声明组件名称
taildir2avro.sources = r1
taildir2avro.channels = c1 c2
taildir2avro.sinks = k1 k2

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = f1
taildir2avro.sources.r1.filegroups.f1 = /root
public class TypeInterceptor implements Interceptor {

    //声明一个存放事件的集合
    private List addHeaderEvents;


    public void initialize() {

        //初始化
        addHeaderEvents = new ArrayList();
    }

    //单个事件拦截
    public Event intercept(Event event) {

        //1、获取事件中的头信息
        Map headers = event.getHeaders();

        //2、获取事件中的body信息
        String body = new String(event.getBody());

        //3、根据body中是否有hello来决定添加怎样的头信息
        if (body.contains("hello")){

            //4、添加头信息
            headers.put("type","zch");
        }else {
            //5、添加头信息
            headers.put("type","others");
        }
        return event;
    }

    //批量事件拦截
    public List intercept(List events) {

        //1、清空集合
        addHeaderEvents.clear();
        //2、遍历events
        for (Event event : events) {
            //3、给每一个事件添加头信息
            addHeaderEvents.add(intercept(event));
        }
        //返回结果
        return addHeaderEvents;
    }

    public void close() {


    }

    public static  class Builder implements Interceptor.Builder{

        public Interceptor build() {
            return new TypeInterceptor();
        }

        public void configure(Context context) {
            
        }
    }
}

2、编写 zhaohui01 的 /opt/flume-1.9/agents/taildir2dvro_interceptor.conf
# taildir读取同目录中的多个日志文件 将数据发送到avro

# 声明组件名称
taildir2avro.sources = r1
taildir2avro.channels = c1 c2
taildir2avro.sinks = k1 k2

# Describe/configure the source
taildir2avro.sources.r1.type = TAILDIR
taildir2avro.sources.r1.filegroups = f1
taildir2avro.sources.r1.filegroups.f1 = /root/*.txt

# Interceptor
# com.zch.interceptor.TypeInterceptor 为java类名的全路径 
# $Builder 为拦截器内部类的名字
taildir2avro.sources.r1.interceptors = i1
taildir2avro.sources.r1.interceptors.i1.type = com.zch.interceptor.TypeInterceptor$Builder

# Channel Selector
# taildir2avro.sources.r1.selector.header 为java拦截器的header的名字
taildir2avro.sources.r1.selector.type = multiplexing
taildir2avro.sources.r1.selector.header = type
taildir2avro.sources.r1.selector.mapping.zch = c1
taildir2avro.sources.r1.selector.mapping.others = c2

# Describe the sink
taildir2avro.sinks.k1.type = avro
taildir2avro.sinks.k1.hostname = zhaohui01
taildir2avro.sinks.k1.port = 121212

taildir2avro.sinks.k2.type = avro
taildir2avro.sinks.k2.hostname = zhaohui01
taildir2avro.sinks.k2.port = 121213

# Use a channel which buffers events in memory
taildir2avro.channels.c1.type = memory
taildir2avro.channels.c1.capacity = 30000
taildir2avro.channels.c1.transactionCapacity = 3000

taildir2avro.channels.c2.type = memory
taildir2avro.channels.c2.capacity = 30000
taildir2avro.channels.c2.transactionCapacity = 3000

# Bind the source and sink to the channel
taildir2avro.sources.r1.channels = c1 c2
taildir2avro.sinks.k1.channel = c1
taildir2avro.sinks.k2.channel = c2

3、编写zhaohui02 zhaohui03 的配置文件

/opt/flume-1.9/agents/avro2hdfs_interceptor.conf

zhaohui02

#avro读取同目录中的多个日志文件 将数据发送到hdfs

# 声明组件名称
avro2hdfs.sources = r1
avro2hdfs.sinks = k1
avro2hdfs.channels = c1

# Describe/configure the source
avro2hdfs.sources.r1.type = avro
avro2hdfs.sources.r1.bind = zhaohui01
avro2hdfs.sources.r1.port = 121212

# Describe the sink
avro2hdfs.sinks.k1.type = hdfs
avro2hdfs.sinks.k1.hdfs.path = hdfs://zhaohui01:8020/flume/%y-%m-%d/%H:%M
# 将hdfs输出文件类型改为数据流
avro2hdfs.sinks.k1.hdfs.fileType = DataStream
# 输出格式化改为文本格式
avro2hdfs.sinks.k1.hdfs.writeFormat = Text
avro2hdfs.sinks.k1.hdfs.useLocalTimeStamp = true
# 设置文件的滚动条件
# 设置滚动周期  单位s
avro2hdfs.sinks.k1.hdfs.rollInterval = 300
# 设置文件达到指定大小
avro2hdfs.sinks.k1.hdfs.rollSize = 130000000
# 设置文件中写入多少条记录
avro2hdfs.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
avro2hdfs.channels.c1.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
avro2hdfs.channels.c1.capacity = 30000
avro2hdfs.channels.c1.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2hdfs.sources.r1.channels = c1
avro2hdfs.sinks.k1.channel = c1

/opt/flume-1.9/agents/avro2localhost_interceptor.conf

zhaohui03

#avro读取同目录中的多个日志文件 将数据发送到localhost

# 声明组件名称
avro2localhost.sources = r1
avro2localhost.sinks = k1
avro2localhost.channels = c2

# Describe/configure the source
avro2localhost.sources.r1.type = avro
avro2localhost.sources.r1.bind = zhaohui01
avro2localhost.sources.r1.port = 121213

# Describe the sink
avro2localhost.sinks.k1.type = file_roll
avro2localhost.sinks.k1.sink.directory = /opt/flume-1.9/agents/log


# Use a channel which buffers events in memory
avro2localhost.channels.c2.type = memory
# 设置 memory channel可以保存的最大event数
# 设置依据, 需要估算channel可能缓存的数据量
#  与agent可用Jvm内存
#  可以在flume-ng脚本中配置agent的JVM参数增大内存
# 20000~200000
avro2localhost.channels.c2.capacity = 30000
avro2localhost.channels.c2.transactionCapacity = 3000

# Bind the source and sink to the channel
avro2localhost.sources.r1.channels = c2
avro2localhost.sinks.k1.channel = c2
3、启动服务

先启动服务端 zhaohui02 zhaohui03

在启动客户端 zhaohui01

4、后台运行程序命令

后台启动命令,静默不输出内容

nohup flume-ng agent --name a1 --conf /opt/flume-1.9/conf/ --conf-file /opt/flume-1.9/agents/agent_test_1.conf -Dflume.root.logger=INFO,console &
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652233.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号