栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021.12.27Flume-Kafka导数据

2021.12.27Flume-Kafka导数据

event_attendess.conf  
[root@linux01 kb15conf]# vi ./event_attendess.conf 

eventattendess.sources=eventAttendessSource
eventattendess.channels=eventAttendessChannel
eventattendess.sinks=eventAttendessSink

eventattendess.sources.eventAttendessSource.type=spooldir
eventattendess.sources.eventAttendessSource.spoolDir=/opt/kb15tmp/flumelogfile/eventattend
eventattendess.sources.eventAttendessSource.deserializer=LINE
eventattendess.sources.eventAttendessSource.deserializer.maxLineLength=320000
eventattendess.sources.eventAttendessSource.includePattern=eventsattend_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
eventattendess.sources.eventAttendessSource.interceptors=head_filter
eventattendess.sources.eventAttendessSource.interceptors.head_filter.type=regex_filter
eventattendess.sources.eventAttendessSource.interceptors.head_filter.regex=^event.*
eventattendess.sources.eventAttendessSource.interceptors.head_filter.excludeEvents=true

eventattendess.channels.eventAttendessChannel.type=file
eventattendess.channels.eventAttendessChannel.checkpointDir=/opt/kb15tmp/checkpoint/eventattend
eventattendess.channels.eventAttendessChannel.dataDirs=/opt/kb15tmp/checkpoint/data/eventattend

eventattendess.sinks.eventAttendessSink.type=org.apache.flume.sink.kafka.KafkaSink
eventattendess.sinks.eventAttendessSink.batchSize=640
eventattendess.sinks.eventAttendessSink.brokerList=192.168.111.131:9092
eventattendess.sinks.eventAttendessSink.topic=event_attendess_row

eventattendess.sources.eventAttendessSource.channels=eventAttendessChannel
eventattendess.sinks.eventAttendessSink.channel=eventAttendessChannel

kafka-topics.sh --create --zookeeper 192.168.111.131:2181 --topic event_attendess_row  --partitions 1 --replication-factor 1

[root@linux01 flume160]#./bin/flume-ng agent --name eventattendess --conf ./conf/ --conf-file ./conf/kb15conf/event_attendess.conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server 192.168.111.131:9092 --topic event_attendess_row --from-beginning

[root@linux01 kb15tmp]# cp ./event_attendees.csv /opt/kb15tmp/flumelogfile/eventattend/eventsattend_2021-12-27.csv
 user_friends.conf
[root@linux01 kb15conf]# vi ./user_friends.conf 

userfriends.sources=userfriendsSource
userfriends.channels=userfriendsChannel
userfriends.sinks=userfriendsSink

userfriends.sources.userfriendsSource.type=spooldir
userfriends.sources.userfriendsSource.spoolDir=/opt/kb15tmp/flumelogfile/userfriend
userfriends.sources.userfriendsSource.deserializer=LINE
userfriends.sources.userfriendsSource.deserializer.maxLineLength=320000
userfriends.sources.userfriendsSource.includePattern=userfriends_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
userfriends.sources.userfriendsSource.interceptors=head_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.type=regex_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.regex=^user.*
userfriends.sources.userfriendsSource.interceptors.head_filter.excludeEvents=true

userfriends.channels.userfriendsChannel.type=file
userfriends.channels.userfriendsChannel.checkpointDir=/opt/kb15tmp/checkpoint/userfriend
userfriends.channels.userfriendsChannel.dataDirs=/opt/kb15tmp/checkpoint/data/userfriend

userfriends.sinks.userfriendsSink.type=org.apache.flume.sink.kafka.KafkaSink
userfriends.sinks.userfriendsSink.batchSize=640
userfriends.sinks.userfriendsSink.brokerList=192.168.111.131:9092
userfriends.sinks.userfriendsSink.topic=user_friends_rows

userfriends.sources.userfriendsSource.channels=userfriendsChannel
userfriends.sinks.userfriendsSink.channel=userfriendsChannel


kafka-topics.sh --create --zookeeper 192.168.111.131:2181 --topic user_friends_rows  --partitions 1 --replication-factor 1

[root@linux01 flume160]#./bin/flume-ng agent --name userfriends --conf ./conf/ --conf-file ./conf/kb15conf/user_friends.conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server 192.168.111.131:9092 --topic user_friends_rows --from-beginning

cp ./user_friends.csv /opt/kb15tmp/flumelogfile/userfriend/userfriends_2021-12-27.csv
events.conf  
events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink

events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/kb15tmp/flumelogfile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id.*
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true

events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/kb15tmp/checkpoint/events
events.channels.eventsChannel.dataDirs=/opt/kb15tmp/checkpoint/data/events

events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=192.168.111.131:9092
events.sinks.eventsSink.topic=events

events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel

kafka-topics.sh --create --zookeeper 192.168.111.131:2181 --topic events  --partitions 1 --replication-factor 1

[root@linux01 flume160]#./bin/flume-ng agent --name events  --conf ./conf/ --conf-file ./conf/kb15conf/events.conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server 192.168.111.131:9092 --topic events --from-beginning

[root@linux01 kb15tmp]#cp ./events.csv /opt/kb15tmp/flumelogfile/events/events_2021-12-27.csv
 train.conf 
[root@linux01 kb15conf]# vi ./train.conf 

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink

train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/kb15tmp/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user.*
train.sources.trainSource.interceptors.head_filter.excludeEvents=true

train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/kb15tmp/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/kb15tmp/checkpoint/data/train

train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.111.131:9092
train.sinks.trainSink.topic=train

train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel

kafka-topics.sh --create --zookeeper 192.168.111.131:2181 --topic train  --partitions 1 --replication-factor 1

[root@linux01 flume160]#./bin/flume-ng agent --name train --conf ./conf/ --conf-file ./conf/kb15conf/train.conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server 192.168.111.131:9092 --topic train --from-beginning

[root@linux01 kb15tmp]#cp ./train.csv /opt/kb15tmp/flumelogfile/train/train_2021-12-27.csv
 users.conf
[root@linux01 kb15conf]# vi ./users.conf 

users.sources=usersSource
users.channels=usersChannel
users.sinks=usersSink

users.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/kb15tmp/flumelogfile/users
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=320000
users.sources.usersSource.includePattern=users_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.interceptors=head_filter
users.sources.usersSource.interceptors.head_filter.type=regex_filter
users.sources.usersSource.interceptors.head_filter.regex=^user_id.*
users.sources.usersSource.interceptors.head_filter.excludeEvents=true

users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/kb15tmp/checkpoint/users
users.channels.usersChannel.dataDirs=/opt/kb15tmp/checkpoint/data/users

users.sinks.usersSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.usersSink.batchSize=640
users.sinks.usersSink.brokerList=192.168.111.131:9092
users.sinks.usersSink.topic=users

users.sources.usersSource.channels=usersChannel
users.sinks.usersSink.channel=usersChannel


kafka-topics.sh --create --zookeeper 192.168.111.131:2181 --topic users  --partitions 1 --replication-factor 1

[root@linux01 flume160]#./bin/flume-ng agent --name users --conf ./conf/ --conf-file ./conf/kb15conf/users.conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server 192.168.111.131:9092 --topic users --from-beginning

[root@linux01 kb15tmp]#cp ./users.csv /opt/kb15tmp/flumelogfile/users/users_2021-12-27.csv

 

 

 

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/696389.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号