上一章获取到了排行的视频信息,并且每小时更新一次,那么这里使用flume监控文件并配置到kafka中。
自用启动代码:: cd soft/kafka_2.12-2.7.0/bin/ 启动zookeeper: ./zookeeper-server-start.sh ../config/zookeeper.properties 启动kafka: ./kafka-server-start.sh ../config/server.properties flume链接kafka: /root/soft/flume-1.8.0/bin/flume-ng agent -c conf/ -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf 测试数据: ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic data --from-beginning kafkaf服务关闭: /root/soft/kafka_2.12-2.7.0/bin/kafka-server-stop.sh zookeeper服务关闭: /root/soft/kafka_2.12-2.7.0/bin/zookeeper-server-stop.shflume安装
将下载的flume放到指定位置并解压
进入安装目录:输入命令查看安装版本信息:
[root@VM-16-6-centos soft]# ls apache-flume-1.8.0-bin.tar.gz flume-1.8.0 jdk-8u144-linux-x64.tar.gz kafka_2.12-2.7.0.tgz data jdk1.8.0_144 kafka_2.12-2.7.0 tmp [root@VM-16-6-centos soft]# cd flume-1.8.0/ [root@VM-16-6-centos flume-1.8.0]# ls bin CHANGELOG conf DEVNOTES doap_Flume.rdf docs lib LICENSE NOTICE README.md RELEASE-NOTES tools [root@VM-16-6-centos flume-1.8.0]# ./bin/flume-ng version Flume 1.8.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: 99f591994468633fc6f8701c5fc53e0214b6da4f Compiled by denes on Fri Sep 15 14:58:00 CEST 2017 From source with checksum fbb44c8c8fb63a49be0a59e27316833dkafka单机版安装
将下载的Kafka放到指定位置并解压
Kafka自带了zookeeper集群,我这里只有一台机器,直接使用单机版的
cd /usr/local/kafka/config #进入配置目录 vi zookeeper.properties #编辑修改相应的参数 dataDir=/usr/local/kafka/zookeeper #zookeeper #修改数据目录,其他默认 vi server.properties #编辑修改相应的参数 log.dirs=/usr/local/kafka/log/kafka #修改日志存放路径 其他默认
测试一下:
再开一个窗口,进入bin目录启动zookeeper 我这里没有后台启动,因为这样如果出现问题可以看见报错 [root@VM-16-6-centos ~]# cd soft/kafka_2.12-2.7.0/ [root@VM-16-6-centos kafka_2.12-2.7.0]# cd bin/ [root@VM-16-6-centos bin]# ./zookeeper-server-start.sh ../config/zookeeper.properties
再开一个窗口,进入bin目录启动kafka ./kafka-server-start.sh ../config/server.properties
再开一个窗口,进入bin目录查看topic [root@VM-16-6-centos ~]# cd soft/kafka_2.12-2.7.0/bin/ [root@VM-16-6-centos bin]# ./kafka-topics.sh --list --zookeeper localhost:2181 创建topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
再开一个窗口,进入bin目录控制台测试 控制台生产者 ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
再开一个窗口,进入bin目录,打开消费者 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning配置Flume和kafka
首先再flume的conf目录下新建一个conf文件,里面定义如下东西: [root@VM-16-6-centos ~]# cd soft/flume-1.8.0/conf/ [root@VM-16-6-centos conf]# vi flume-2-kafka.conf a1.sources = r1 a1.channels = c1 a1.sinks = k1 a1.sources.r1.type = exec # 使用exec模式 a1.sources.r1.command = tail -F -c +0 /root/soft/data/data # 要监控的文件路径 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.channels.c1.byteCapacityBufferPercentage = 20 a1.channels.c1.byteCapacity = 800000 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic = data a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 # 这里是zookeeper的服务器地址 a1.sinks.k1.kafka.flumeBatchSize = 20 a1.sinks.k1.kafka.producer.acks = 1 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
启动链接kafka
/root/soft/flume-1.8.0/bin/flume-ng agent -c conf/ -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf 这里出现了一个Info,排查一下发现这个不影响功能 Info: Including Hive libraries found via () for Hive access + exec /root/soft/jdk1.8.0_144/bin/java -Xmx20m -cp '/root/soft/flume-1.8.0/conf:/root/soft/flume-1.8.0/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -n a1 -f /root/soft/flume-1.8.0/conf/flume-2-kafka.conf
打印控制台测试一下数据
进入Kafka的bin目录 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic data --from-beginning
消费到了最新数据



