栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flume 实时读取数据输案例

flume 实时读取数据输案例

案例一:监控端口将数据发送到屏幕显示
1、使用的组件类型

①netcat source:  作用就是监听某个tcp端口手动的数据,将每行数据封装为一个event。
        工作原理类似于nc -l 端口
        
配置:
    必须属性:
    type    –    The component type name, needs to be netcat
    bind    –    Host name or IP address to bind to
    port    –    Port # to bind to
    
②logger sink: 作用使用logger(日志输出器)将event输出到文件或控制台,使用info级别记录event!
    必须属性:
    type    –    The component type name, needs to be logger
    可选属性:
maxBytesToLog    16    Maximum number of bytes of the Event body to log③memery channel
    必须属性:
    type    –    The component type name, needs to be memory
    可选属性:
    capacity    100    The maximum number of events stored in the channel
    transactionCapacity    100    The maximum number of events the channel will take from a source or give to a sink per transaction
    

2、编写配置文件

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#组名名.属性名=属性值
#定义source
a1.sources.r1.type=netcat
a1.sources.r1.bind=hadoop102
a1.sources.r1.port=44444

#定义sink
a1.sinks.k1.type=logger
a1.sinks.k1.maxBytesToLog=100

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

案例二:监控文件将数据发送到HDFS上
1. 使用的组件类型

①EXECSource
        介绍: execsource会在agent启动时,运行一个linux命令,运行linux命令的进程要求是一个可以持续产生数据的进程!
                    将标准输出的数据封装为event!
                通常情况下,如果指定的命令退出了,那么source也会退出并且不会再封装任何的数据!
                所以使用这个source一般推荐类似cat ,tail -f 这种命令,而不是date这种只会返回一个数据,并且执行完就退出的命令!
        配置:
            必须配置:
            type    –    The component type name, needs to be exec
            command    –    The command to execute
            
②HDFSSink
        介绍:  hdfssink将event写入到HDFS!目前只支持生成两种类型的文件: text | sequenceFile,这两种文件都可以使用压缩!
                写入到HDFS的文件可以自动滚动(关闭当前正在写的文件,创建一个新文件)。基于时间、events的数量、数据大小进行周期性的滚动!
                支持基于时间和采集数据的机器进行分桶和分区操作!
                HDFS数据所上传的目录或文件名可以包含一个格式化的转义序列,这个路径或文件名会在上传event时,被自动替换,替换为完整的路径名!
                使用此Sink要求本机已经安装了hadoop,或持有hadoop的jar包!
        配置:    
            必须配置:
            type    –    The component type name, needs to be hdfs
            hdfs.path    –    HDFS directory path (eg hdfs://namenode/flume/webdata/)
            
            可选参考word
            

2、配置:

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#组名名.属性名=属性值
#定义source
a1.sources.r1.type=exec
a1.sources.r1.command=tail -f /opt/module/hive/logs/hive.log

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#定义sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-

#以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
#是否将时间戳向下舍
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute

#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100

#以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
#30秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#每写多少个event滚动一次
a1.sinks.k1.hdfs.rollCount = 0
#以不压缩的文本形式进行存储
a1.sinks.k1.hdfs.fileType=DataStream

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

案例三:监控目录将数据发送到HDFS上
1、使用的组件类型

1.SpoolingDirSource
    简介:
        SpoolingDirSource指定本地磁盘的一个目录为"Spooling(自动收集)"的目录!这个source可以读取目录中
        新增的文件,将文件的内容封装为event!
        
        SpoolingDirSource在读取一整个文件到channel之后,它会采取策略,要么删除文件(是否可以删除取决于配置),要么对文件
        进程一个完成状态的重命名,这样可以保证source持续监控新的文件!
        
        SpoolingDirSource和execsource不同,SpoolingDirSource是可靠的!即使flume被杀死或重启,依然不丢数据!但是为了保证
        这个特性,付出的代价是,一旦flume发现以下情况,flume就会报错,停止!
                ①一个文件已经被放入目录,在采集文件时,不能被修改
                ②文件的名在放入目录后又被重新使用(出现了重名的文件)
                
        要求: 必须已经封闭的文件才能放入到SpoolingDirSource,在同一个SpoolingDirSource中都不能出现重名的文件!
    使用:
        必需配置:
        type    –    The component type name, needs to be spooldir.
        spoolDir    –    The directory from which to read files from.

2、配置:

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#组名名.属性名=属性值
#定义source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/root/flume

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#定义sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop101:9000/flume/%Y%m%d/%H/%M
#上传文件的前缀
a1.sinks.k1.hdfs.filePrefix = logs-

#以下三个和目录的滚动相关,目录一旦设置了时间转义序列,基于时间戳滚动
#是否将时间戳向下舍
a1.sinks.k1.hdfs.round = true
#多少时间单位创建一个新的文件夹
a1.sinks.k1.hdfs.roundValue = 1
#重新定义时间单位
a1.sinks.k1.hdfs.roundUnit = minute

#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#积攒多少个Event才flush到HDFS一次
a1.sinks.k1.hdfs.batchSize = 100

#以下三个和文件的滚动相关,以下三个参数是或的关系!以下三个参数如果值为0都代表禁用!
#30秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#每写多少个event滚动一次
a1.sinks.k1.hdfs.rollCount = 0
#以不压缩的文本形式保存数据
a1.sinks.k1.hdfs.fileType=DataStream 


#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336141.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号