栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

附加源flume日志数据采集

附加源flume日志数据采集

Flume是非常流行的日志采集系统,可以作为Spark Streaming的附加数据源。具体使用步骤如下所示:

(1)登录Linux系统;

(2)创建/home/hadoop/spark/streaming/flume目录,使用以下命令:

mkdir -p /home/hadoop/spark/streaming/flume

(3)进入/home/hadoop/spark/streaming/flume目录,使用以下命令:

cd /home/hadoop/spark/streaming/flume

(4)创建word.txt文件,使用以下命令:

touch word.txt

(5)创建exec-memory-avro.properties文件,使用以下命令:

touch exec-memory-avro.properties

(6)编辑exec-memory-avro.properties文件,填入以下内容:

# 给 agent 起个名字叫做 a1

# 给 a1 添加一个 source 叫做 r1
# 给 a1 添加一个 channel 叫做 c1
# 给 a1 添加一个 sink 叫做 k1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# 配置 r1 类型为内置的 exec 类型,采集 linux command 产生的数据
a1.sources.r1.type = exec

# 配置 r1 采集这个命令产生的数据
a1.sources.r1.command = tail -F /home/hadoop/spark/streaming/flume/word.txt

# 配置 c1 类型为内置的 memory 类型
a1.channels.c1.type = memory

# 配置 k1 类型为内置的 avro 类型
a1.sinks.k1.type = avro
# 配置 k1 发送数据到 localhost 服务器
a1.sinks.k1.hostname = localhost
# 配置 k1 发送数据到 44444 端口
a1.sinks.k1.port =44444

# 把 r1、c1 和 k1 组装到一起
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(7)下载整合flume和spark streaming的jar文件,地址如下:

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11/2.4.6/spark-streaming-flume_2.11-2.4.6.jar 

(8)复制spark-streaming-flume_2.11-2.4.6.jar和spark-streaming-flume-assembly_2.11-2.4.6.jar文件到D:spark-2.4.6-bin-hadoop2.7jars目录;

(9)创建SparkStreamingFlume.py文件,并填写以下代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc = SparkContext("local[2]", "Spark Streaming Flume Word Count")
ssc = StreamingContext(sc, 2)

hostname= '192.168.1.11'(虚拟机ip)
port = int(44444)

lines = FlumeUtils.createStream(ssc, hostname, port)

words = lines.flatMap(lambda x: x.split(' '))
wordones = words.map(lambda x: (x, 1))
wordCounts = wordOnes.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()

ssc.start()
ssc.awaitTermination()

(10)启动SparkStreamingFlume.py程序:

spark-submit SparkStreamingFlume.py

(11)启动Flume,使用以下命令:

flume-ng agent -n a1 -c conf -f /home/hadoop/spark/streaming/flume/exec-memory-avro.properties

(12)向/home/hadoop/spark/streaming/flume/word.txt文件追加内容,使用以下命令:

echo 'hello spark' >> /home/hadoop/spark/streaming/flume/word.txt

(13)查看SparkStreamingFlume.py程序的输出,如下所示:

-------------------------------------------
Time: 1488029432000 ms
-------------------------------------------
(hello, 1)
(word, 1)
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774874.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号