搭建集群:
docker load --input spark.tar
docker-compose up -d
docker ps
docker exec -it spark-master bash   # 进入主机终端
jps   # 查看进程
在主机窗口:
重新开一个窗口worker1:
结果:
socker-server.py代码
# from concurrent.futures import thread
# from distutils.log import info
# from http import client
# from re import T
import socket,os,time
from threading import Thread
# (host, port) this server binds to; the Spark job connects to spark-master:9003
ADDRESS=('spark-master',9003)
# listening socket, created by init()
g_socket_server=None
# registry of connected clients keyed by client type (consumed by remove_client)
g_conn_pool={}
def init():
    """Create the listening TCP socket and publish it via the module global."""
    global g_socket_server
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(ADDRESS)
    server.listen(5)  # backlog of 5 pending connections
    g_socket_server = server
    print("server start,wait for client connecting...")
def accept_client():
    """Accept connections forever, spawning one daemon thread per client.

    Each accepted (client, info) pair is handed to message_handle on its
    own thread so the accept loop never blocks on a slow client.
    """
    while True:
        client, info = g_socket_server.accept()
        # daemon=True in the constructor replaces the deprecated setDaemon()
        handler = Thread(target=message_handle, args=(client, info), daemon=True)
        handler.start()
def message_handle(client, info):
    """Greet the client, then stream newline-terminated text to it.

    Loops until sendall raises (client disconnected), then logs the
    exception and returns.
    """
    client.sendall("connect server successfully!".encode(encoding='utf8'))
    while True:
        try:
            # The trailing '\n' is the record delimiter Spark's
            # socketTextStream needs to split the byte stream into lines
            # (the pasted original had lost the escape: "...text n").
            msg = "this is a text\n"
            client.sendall(msg.encode(encoding='utf8'))
            time.sleep(0.5)  # throttle: the original tight loop flooded the socket
        except Exception as e:
            print(e)
            break
def remove_client(client_type):
    """Close and unregister the pooled connection stored under client_type."""
    # Dict lookup, not a call: the original g_conn_pool(client_type)
    # raised TypeError. .get also tolerates unknown keys.
    client = g_conn_pool.get(client_type)
    if client is not None:
        client.close()
        g_conn_pool.pop(client_type)
        # Original had the concatenation trapped inside the string literal:
        # print("client offline:+client_type")
        print("client offline:" + client_type)
if __name__ == '__main__':
    init()
    # daemon=True replaces the deprecated Thread.setDaemon()
    accept_thread = Thread(target=accept_client, daemon=True)
    accept_thread.start()
    # Keep the main thread alive so the daemon acceptor keeps running.
    while True:
        time.sleep(0.1)
socket_wordcount.py代码:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    # Word count over 1-second micro-batches read from the socket server above.
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream("spark-master", 9003)
    # Parenthesize the chain: the original bare line breaks before .map/
    # .reduceByKey were a SyntaxError.
    counts = (lines.flatMap(lambda line: line.split(" "))
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
进阶统计:
(运行步骤同上!!!)
socker-server2.py代码
from concurrent.futures import thread
from errno import EHOSTDOWN
from pydoc import cli
import socket,os,time
from threading import Thread
# (host, port) this server binds to; the Spark job connects to spark-master:9005
ADDRESS=('spark-master',9005)
# listening socket, created by init()
g_socket_server=None
# registry of connected clients keyed by client type (consumed by remove_client)
g_conn_pool={}
def init():
    """Bind and start listening on ADDRESS; store the socket in the module global."""
    global g_socket_server
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.bind(ADDRESS)
    sock.listen(5)  # up to 5 queued connection attempts
    g_socket_server = sock
    print("server start, wait for client connecting...")
def accept_client():
    """Accept connections forever, handling each client on its own daemon thread."""
    while True:
        client, info = g_socket_server.accept()
        # daemon=True in the constructor replaces the deprecated setDaemon()
        handler = Thread(target=message_handle, args=(client, info), daemon=True)
        handler.start()
def message_handle(client, info):
    """Greet the client, then replay the result CSV over the socket line by line.

    Blank lines are skipped; sends are paced at one line per 0.2 s so the
    Spark streaming job receives a steady feed. Stops on the first send
    failure (client disconnected).
    """
    client.sendall("connect server successfully!".encode(encoding='utf8'))
    # with-block guarantees the file is closed even if the client
    # disconnects mid-replay (the original leaked the handle).
    with open("/root/spark/result0412.csv", 'r') as f:
        for line in f:
            try:
                if len(line.strip()) > 0:
                    client.sendall(line.encode(encoding='utf8'))
                    time.sleep(0.2)  # pace the replay for the streaming demo
            except Exception as e:
                print(e)
                break
def remove_client(client_type):
    """Close and unregister the pooled connection stored under client_type."""
    # .get tolerates unknown keys; the original [] lookup raised KeyError.
    client = g_conn_pool.get(client_type)
    if client is not None:
        client.close()
        g_conn_pool.pop(client_type)
        print("client offline:" + client_type)  # fixed typo "offine"
if __name__ == '__main__':
    init()
    # daemon=True replaces the deprecated Thread.setDaemon()
    accept_thread = Thread(target=accept_client, daemon=True)
    accept_thread.start()
    # Keep the main thread alive so the daemon acceptor keeps running.
    while True:
        time.sleep(0.2)
socket_wordcount2.py代码:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    # Structured parse over 3-second micro-batches from the CSV-replay server.
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 3)
    lines = ssc.socketTextStream("spark-master", 9005)
    # Fields are '^'-separated; the 5th field is further '|'-separated.
    # Parenthesized: the original bare line break before .map was a SyntaxError.
    counts = (lines.map(lambda line: line.split("^"))
                   .map(lambda x: (x[1], x[2], x[4].split('|'))))
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
运行结果:



