(1)登录Linux系统;
(2)创建/home/hadoop/spark/streaming/flume目录,使用以下命令:
mkdir -p /home/hadoop/spark/streaming/flume
(3)进入/home/hadoop/spark/streaming/flume目录,使用以下命令:
cd /home/hadoop/spark/streaming/flume
(4)创建word.txt文件,使用以下命令:
touch word.txt
(5)创建exec-memory-avro.properties文件,使用以下命令:
touch exec-memory-avro.properties
(6)编辑exec-memory-avro.properties文件,填入以下内容:
# 给 agent 起个名字叫做 a1 # 给 a1 添加一个 source 叫做 r1 # 给 a1 添加一个 channel 叫做 c1 # 给 a1 添加一个 sink 叫做 k1 a1.sources = r1 a1.channels = c1 a1.sinks = k1 # 配置 r1 类型为内置的 exec 类型,采集 linux command 产生的数据 a1.sources.r1.type = exec # 配置 r1 采集这个命令产生的数据 a1.sources.r1.command = tail -F /home/hadoop/spark/streaming/flume/word.txt # 配置 c1 类型为内置的 memory 类型 a1.channels.c1.type = memory # 配置 k1 类型为内置的 avro 类型 a1.sinks.k1.type = avro # 配置 k1 发送数据到 localhost 服务器 a1.sinks.k1.hostname = localhost # 配置 k1 发送数据到 44444 端口 a1.sinks.k1.port =44444 # 把 r1、c1 和 k1 组装到一起 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
(7)下载整合flume和spark streaming的jar文件,地址如下:
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11/2.4.6/spark-streaming-flume_2.11-2.4.6.jar
(8)复制spark-streaming-flume_2.11-2.4.6.jar和spark-streaming-flume-assembly_2.11-2.4.6.jar文件到D:spark-2.4.6-bin-hadoop2.7jars目录;
(9)创建SparkStreamingFlume.py文件,并填写以下代码:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
sc = SparkContext("local[2]", "Spark Streaming Flume Word Count")
ssc = StreamingContext(sc, 2)
hostname= '192.168.1.11'(虚拟机ip)
port = int(44444)
lines = FlumeUtils.createStream(ssc, hostname, port)
words = lines.flatMap(lambda x: x.split(' '))
wordones = words.map(lambda x: (x, 1))
wordCounts = wordOnes.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
(10)启动SparkStreamingFlume.py程序:
spark-submit SparkStreamingFlume.py
(11)启动Flume,使用以下命令:
flume-ng agent -n a1 -c conf -f /home/hadoop/spark/streaming/flume/exec-memory-avro.properties
(12)向/home/hadoop/spark/streaming/flume/word.txt文件追加内容,使用以下命令:
echo 'hello spark' >> /home/hadoop/spark/streaming/flume/word.txt
(13)查看SparkStreamingFlume.py程序的输出,如下所示:
------------------------------------------- Time: 1488029432000 ms ------------------------------------------- (hello, 1) (word, 1)



