filestream.py code
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="test streaming")
sc.setLogLevel("ERROR")
# Batch interval of 2 seconds
ssc = StreamingContext(sc, 2)
# Watch the directory for newly created text files
lines = ssc.textFileStream("file:///root/recruit/data")
# Identity map: pass every line through unchanged and print each batch
echoed = lines.map(lambda x: x)
echoed.pprint()
ssc.start()
ssc.awaitTermination()
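filestream.py only prints something when a new file appears in the watched directory after the stream has started; files that already existed are ignored. A minimal sketch for producing one batch, using /tmp/recruit-data as a stand-in for the /root/recruit/data path above:

```shell
# Stand-in for the watched directory /root/recruit/data used above.
WATCH_DIR=/tmp/recruit-data
mkdir -p "$WATCH_DIR"
# textFileStream only picks up files created after ssc.start(),
# so always write a brand-new file instead of appending to an old one.
echo "hello spark streaming" > "$WATCH_DIR/batch1.txt"
cat "$WATCH_DIR/batch1.txt"
```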
socket_wordcount.py code
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc.setLogLevel("ERROR")
    # Batch interval of 1 second
    ssc = StreamingContext(sc, 1)
    # Host/port must match where `nc -lk` is listening (see the steps below)
    lines = ssc.socketTextStream("spark-master", 9001)
    # Split each line on spaces, emit (word, 1) pairs, then sum per word
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
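Split across lines, the flatMap → map → reduceByKey chain can be hard to follow. The same word-count logic for a single batch, sketched in plain Python (`word_count_batch` is a hypothetical helper for illustration, not part of the Spark API):

```python
from collections import defaultdict

def word_count_batch(lines):
    """Reproduce the DStream pipeline on one batch of lines:
    flatMap(split) -> map to (word, 1) -> reduceByKey(add)."""
    # flatMap: split every line on spaces and flatten into one word list
    words = [w for line in lines for w in line.split(" ")]
    # map + reduceByKey: sum a 1 for each occurrence of a word
    counts = defaultdict(int)
    for w in words:
        counts[w] += 1
    return dict(counts)

print(word_count_batch(["spark spark streaming", "hello spark"]))
# → {'spark': 3, 'streaming': 1, 'hello': 1}
```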
docker-compose.yml file
version: "2"
services:
  master:
    image: zylctgu/spark2.4
    command: /start-master
    hostname: spark-master
    container_name: spark-master
    volumes:
      - /d/documents/docker-files/spark/share_files:/root/spark
    ports:
      - "4040:4040"
      - "8080:8080"
  worker1:
    image: zylctgu/spark2.4
    command: /start-worker
    hostname: worker1
    container_name: spark-worker1
    volumes:
      - /d/documents/docker-files/spark/share_files:/root/spark
    ports:
      - "4041:4040"
      - "8081:8081"
    links:
      - master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
  worker2:
    image: zylctgu/spark2.4
    command: /start-worker
    hostname: worker2
    container_name: spark-worker2
    volumes:
      - /d/documents/docker-files/spark/share_files:/root/spark
    ports:
      - "4042:4040"
      - "8082:8081"
    links:
      - master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
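Before the steps below, the cluster has to be brought up from the directory that contains the compose file. A minimal sketch of the host-side commands (assumes docker-compose is installed on the host):

```shell
# Run from the directory holding docker-compose.yml
docker-compose up -d   # start master, worker1 and worker2 in the background
docker-compose ps      # check that all three containers are Up
```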
Execution steps
1. Open a master window:
docker exec -it spark-master bash   # container_name from docker-compose.yml
cd /root/spark
nc -lk -p 9001   # the host/port must match socketTextStream() in socket_wordcount.py
2. Open a new worker1 window:
docker exec -it spark-worker1 bash   # the new worker1 window
cd /root/spark
spark-submit socket_wordcount.py   # the file to run
3. Type words into the master window, separated by spaces (" ").
The word counts then appear in the new window.
The result is shown in the figure:



