栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

(file-flume-kafka-flume-hdfs过程中)flume 配置文件的编写

(file-flume-kafka-flume-hdfs过程中)flume 配置文件的编写

flume 配置文件的编写 flume 配置文件 file-flume-kafka.conf 使用 TAILDIR source
  1 # Name the components on this agent
  2 a1.sources = r1
  3 a1.channels = c1
  4 
  5 # Describe/configure the source
  6 a1.sources.r1.type = TAILDIR
  # taildir source 监控几个组,先给这个组起一个名字,测试只用监控一个组就行了,先叫它f1 
  7 a1.sources.r1.filegroups = f1
  8 a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
  # positionFile 和断点续传有关,记录的是断点续传的那个断点
  9 a1.sources.r1.positionFile = /opt/module/flume/pasitionFileFlume1.json
 10 
 11 # 配置拦截器
 12 a1.sources.r1.interceptors = i1
 13 a1.sources.r1.interceptors.i1.type = com.atguigu.flume.ETLInterceptor$MyBuilder
 14 
 # Kafka channel是基于磁盘的(因为Kafka的数据是会落盘的,所以他是基于磁盘的),他的速度比memory channel慢
 #比file channel快(因为他有很多的优化,比如顺序读写,页缓存什么的)。它的优势在于他节省了一个channel的时间,
 #因为Kafka channel和Kafka sink的时间是一样的,都是往hdfs上面发。
 15 # Use a channel which buffers events in memory
 # 默认有序列化,就不用配置了
 16 a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
 17 a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092
 # 发到哪个topic里面,这个topic待会需要自己创建
 18 a1.channels.c1.kafka.topic = topic_log
 # 是否上传上去event 要改成false,否则上传的是event,会乱码
 19 a1.channels.c1.parseAsFlumeEvent = false
 20 
 21 # Bind the source and sink to the channel
 22 a1.sources.r1.channels = c1

kafka-flume-hdfs.conf使用Kafka source
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 使用Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# 连接Kafka
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
# 组id改的话,变成了一个新的组,会触发offset重置,这个可以调,在官网下面有
a1.sources.r1.kafka.consumer.group.id = flume1
a1.sources.r1.kafka.topics = topic_log

# Describe the sink
a1.sinks.k1.type = hdfs
# 保存到hdfs的路劲
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
# 文件前缀
a1.sinks.k1.hdfs.filePrefix = log-
# 文件是否滚动 默认是false 没什么用 可以不管
a1.sinks.k1.hdfs.round = false

# 控制小文件问题的三个参数
# 实际开发中应该写成一小时,自己测试的时候为了方便,设置成10秒。
a1.sinks.k1.hdfs.rollInterval = 10
# 128M
a1.sinks.k1.hdfs.rollSize = 134217728
# event个数
a1.sinks.k1.hdfs.rollCount = 0

## 控制输出文件是原生文件。
# 设置能够进行压缩的格式
a1.sinks.k1.hdfs.fileType = CompressedStream
# 使用lzop压缩,前提是Hadoop要配置好lzop压缩
a1.sinks.k1.hdfs.codeC = lzop

# 使用file channel
# Use a channel which buffers events in file
a1.channels.c1.type = file
# 使用本地磁盘作为缓冲的路径
a1.channels.c1.dataDirs = /opt/module/flume/file-channel/date
# 检查点,自带检查,同时会进行断点续传。
a1.channels.c1.checkpointDir = /opt/module/flume/file-channel/checkpoint

# 配置source拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.flume.TimeStampInterceptor$MyBuilder

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336276.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号