栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flume之项目实战

Flume之项目实战

目录

一、多路复用及拦截器的使用

二、复制

三、聚合


一、多路复用及拦截器的使用

        需求:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。(区分数字和字母,将其发往不同的系统)

       一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。

1.自定义拦截器


	
		org.apache.flume
		flume-ng-core
		1.9.0
	


package com.hpu.flume;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.List;


public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        byte b = event.getBody()[0];
        if (b >= '0' && b <= '9'){
            event.getHeaders().put("type","number");
        }
        if ((b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z')){
            event.getHeaders().put("type","letter");
        }
        return event;
    }

    @Override
    public List intercept(List events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {

    }
    public static class MyBuilder implements Builder{
        //创建拦截器
        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

 2.根据图配置文件

        创建新文件conf/group2/flume1,创建新文件conf/group2/flume2,创建新文件conf/group2/flume3。

        为Flume1配置1个netcat source,1个sink group(2个avro sink),并配置相应的ChannelSelector和interceptor。

    为Flume1配置1个netcat source,1个sink group(2个avro sink),并配置相应的ChannelSelector和interceptor。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444


a1.sources.r1.selector.type = multiplexing
# 使用headers中的哪些参数
a1.sources.r1.selector.header = type
a1.sources.r1.selector.mapping.number = c1
a1.sources.r1.selector.mapping.letter = c2

# a1.sources.r1.selector.default = c4

# 拦截器配置
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.hpu.flume.MyInterceptor$MyBuilder

# Describe the sink

a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
    为Flume2配置一个avro source和一个logger sink。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1
    为Flume3配置一个avro source和一个logger sink。
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142

a1.sinks.k1.type = logger


a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sinks.k1.channel = c1
a1.sources.r1.channels = c1

3.启动加运行

bin/flume-ng agent --conf conf/ --name al --conf-file conf/group2/flume3.conf
bin/flume-ng agent --conf conf/ --name al --conf-file conf/group2/flume2.conf
bin/flume-ng agent --conf conf/ --name al --conf-file conf/group2/flume1.conf

4.在hadoop102使用netcat向localhost:44444发送字母和数字,观察flume2和flume3打印的日志。

二、复制

        需求:使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local FileSystem。

实现图:

1.创建文件夹

在/opt/module/flume/conf目录下创建group1文件夹
[example@hadoop102 conf]$ mkdir group1/
在/opt/module/flume/目录下创建flume3datas文件夹
[example@hadoop102 flume]$ mkdir flume3datas

 2.编辑配置文件

创建flume1.conf
配置1个接收日志文件的source和两个channel、两个sink,分别输送给flume2和flume3。
vim flume1.conf
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json


# 将数据流复制给所有channel 默认参数可以不写
a1.sources.r1.selector.type = replicating


# Describe the sink
# sink端的avro是一个数据发送者
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102 
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
创建flume2.conf
配置上级Flume输出的Source,输出是到HDFS的Sink。
vim flume2.conf
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4141


# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume1/%Y%m%d/%H
# 文件的前缀
a1.sinks.k1.hdfs.filePrefix = log-

#多久生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件的滚动大小大概是128M
a1.sinks.k1.hdfs.rollSize = 134217700
#文件的滚动与Event数量无关
a1.sinks.k1.hdfs.rollCount = 0
# 使用本地的时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true

#设置文件类型,可支持压缩
a1.sinks.k1.hdfs.fileType = DataStream



# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
创建flume3.conf
配置上级Flume输出的Source,输出是到本地目录的Sink。
vim flume3.conf
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop102
a1.sources.r1.port = 4142


# Describe the sink
a1.sinks.k1.type = file_roll
a1.sinks.k1.sink.directory = /opt/module/flume/flume3datas



# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动与运行

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume3.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume2.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group1/flume1.conf
在检查文件输入
[example@hadoop102 files1]$ echo hello >> file1.txt
分别去hdfs和本地查看结果

三、聚合

需求:

        hadoop102上的Flume-1监控文件/opt/module/flume/files1/.*file.*,

        hadoop103上的Flume-2监控某一个端口的数据流,

        Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。

1.准备工作:

分发Flume
xsync flume
在hadoop102、hadoop103以及hadoop104的/opt/module/flume/conf目录下创建一个group3文件夹。
mkdir group3
mkdir group3
mkdir group3

2.编辑配置文件

创建flume1.conf
配置Source用于监控本地文件,配置Sink输出数据到下一级Flume。
在hadoop102上编辑配置文件
vim flume1.conf 
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /opt/module/flume/files1/.*file.*
a1.sources.r1.filegroups.f2 = /opt/module/flume/files2/.*log.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json


# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
创建flume2.conf
配置Source监控端口44444数据流,配置Sink数据到下一级Flume:
在hadoop103上编辑配置文件
vim flume2.conf
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop104
a1.sinks.k1.port = 4141

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
创建flume3.conf
配置source用于接收flume1与flume2发送过来的数据流,最终合并后sink到控制台。
在hadoop104上编辑配置文件
vim flume3.conf
添加如下内容
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source

a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop104
a1.sources.r1.port = 4141


# Describe the sink
a1.sinks.k1.type = logger


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.启动与运行

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume1.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume2.conf

bin/flume-ng agent --conf conf/ --name a1 --conf-file conf/group3/flume3.conf

在hadoop102上向/opt/module/flume目录下的group.log追加内容
[example@hadoop102 files1]$ echo 'hello' > file1
在hadoop103上向44444端口发送数据
[example@hadoop103 flume]$ nc hadoop103 44444
检查hadoop104上数据

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/680757.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号