栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

spark部分03-socket流~自定义socket服务器

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

spark部分03-socket流~自定义socket服务器

搭建集群:

docker load  --input spark.tar

docker-compose up -d

docker ps

docker exec -it spark-master bash  # 在主机窗口中执行,进入 spark-master 容器

jps        # 查看容器中运行的 Java 进程,确认 Spark 进程已启动

 在主机窗口:

 重新开一个窗口worker1:

 结果:

socker-server.py代码

# from concurrent.futures import thread
# from distutils.log import info
# from http import client
# from re import T
import socket,os,time
from threading import Thread

# (host, port) the server binds to. "spark-master" must resolve on the machine
# running this script (presumably the docker-compose service name -- confirm).
ADDRESS: tuple = ("spark-master", 9003)
# Listening socket; created and assigned by init().
g_socket_server = None
# Registry of client connections, keyed by whatever remove_client() receives.
g_conn_pool: dict = dict()

def init(address=None):
    """Create, bind and start the listening TCP socket.

    The socket is stored in the module-level ``g_socket_server`` (read by
    ``accept_client``) and also returned for convenience.

    Args:
        address: ``(host, port)`` to bind to; defaults to ``ADDRESS``.

    Returns:
        The listening socket.

    Raises:
        OSError: if the address cannot be bound; the new socket is closed
            before the error propagates.
    """
    global g_socket_server
    if address is None:
        address = ADDRESS
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow an immediate restart instead of failing with
        # "Address already in use" while the old socket is in TIME_WAIT.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(address)
        # Queue up to 5 not-yet-accepted connections.
        server.listen(5)
    except OSError:
        server.close()
        raise
    g_socket_server = server
    print("server start,wait for client connecting...")
    return server

def accept_client():
    """Accept connections on ``g_socket_server`` forever.

    Each accepted socket is served by ``message_handle`` on its own daemon
    thread, so client threads never keep the process alive on exit.
    """
    while True:
        client, info = g_socket_server.accept()
        # Thread.setDaemon() is deprecated (Python 3.10); pass daemon=True.
        Thread(target=message_handle, args=(client, info), daemon=True).start()

def message_handle(client, info, interval=0.5):
    """Stream a fixed line of text to one client until it disconnects.

    Spark's ``socketTextStream`` interprets the byte stream as
    newline-delimited lines, so every message must end with ``\\n``;
    without it no record is produced until the connection closes.

    Args:
        client: Connected socket returned by ``accept()``.
        info: Peer address returned by ``accept()``; unused.
        interval: Seconds to pause between messages (0 disables throttling).
    """
    try:
        # Newline-terminated so the greeting is its own record instead of
        # being glued onto the first data line.
        client.sendall("connect server successfully!\n".encode(encoding='utf8'))
        msg = "this is a text \n"
        while True:
            client.sendall(msg.encode(encoding='utf8'))
            if interval:
                time.sleep(interval)
    except OSError as e:
        # Broken pipe / connection reset: the client went away.
        print(e)
    finally:
        client.close()

def remove_client(client_type):
    """Close and unregister the connection stored under ``client_type``.

    Does nothing when no connection is registered under that key.

    NOTE(review): nothing visible in this script adds entries to
    ``g_conn_pool``, so this helper appears unused as written.
    """
    client = g_conn_pool.pop(client_type, None)
    if client is not None:
        client.close()
        print(f"client offline:{client_type}")

if __name__ == '__main__':
    init()
    # Serve connections on a background daemon thread; setDaemon() is
    # deprecated, so daemon=True is passed to the constructor instead.
    thread = Thread(target=accept_client, daemon=True)
    thread.start()
    try:
        # Keep the main thread alive while the accept thread works.
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl+C: exit quietly instead of dumping a traceback.
        pass
    finally:
        g_socket_server.close()

socket_wordcount.py代码:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc.setLogLevel("ERROR")
    # 1-second micro-batches.
    ssc = StreamingContext(sc, 1)
    # Newline-delimited text lines from the custom socket server.
    lines = ssc.socketTextStream("spark-master", 9003)
    # Parenthesized so the method chain can span several lines.
    # split() with no argument drops empty tokens produced by trailing or
    # repeated spaces, which split(" ") would count as words.
    counts = (
        lines.flatMap(lambda line: line.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
进阶统计:

(运行步骤同上。)

socker-server2.py代码

from concurrent.futures import thread
from errno import EHOSTDOWN
from pydoc import cli
import socket,os,time
from threading import Thread
# (host, port) the server binds to. "spark-master" must resolve on the machine
# running this script (presumably the docker-compose service name -- confirm).
ADDRESS: tuple = ("spark-master", 9005)
# Listening socket; created and assigned by init().
g_socket_server = None
# Registry of client connections, keyed by whatever remove_client() receives.
g_conn_pool: dict = dict()
def init(address=None):
    """Create, bind and start the listening TCP socket.

    The socket is stored in the module-level ``g_socket_server`` (read by
    ``accept_client``) and also returned for convenience.

    Args:
        address: ``(host, port)`` to bind to; defaults to ``ADDRESS``.

    Returns:
        The listening socket.

    Raises:
        OSError: if the address cannot be bound; the new socket is closed
            before the error propagates.
    """
    global g_socket_server
    if address is None:
        address = ADDRESS
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Allow an immediate restart instead of failing with
        # "Address already in use" while the old socket is in TIME_WAIT.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind(address)
        # Queue up to 5 not-yet-accepted connections.
        server.listen(5)
    except OSError:
        server.close()
        raise
    g_socket_server = server
    print("server start, wait for client connecting...")
    return server

def accept_client():
    """Accept connections on ``g_socket_server`` forever.

    Each accepted socket is served by ``message_handle`` on its own daemon
    thread, so client threads never keep the process alive on exit.
    """
    while True:
        client, info = g_socket_server.accept()
        # Thread.setDaemon() is deprecated (Python 3.10); pass daemon=True.
        Thread(target=message_handle, args=(client, info), daemon=True).start()

def message_handle(client, info, path="/root/spark/result0412.csv", interval=0.2):
    """Replay a text file to one client, one non-blank line at a time.

    Sends a greeting, then every non-blank line of ``path`` exactly as read
    (including its newline, so Spark's ``socketTextStream`` sees one record
    per line), sleeping ``interval`` seconds after each. Returns when the
    file is exhausted or the client disconnects; the client socket and the
    file are always closed.

    Args:
        client: Connected socket returned by ``accept()``.
        info: Peer address returned by ``accept()``; unused.
        path: File to replay; assumed to be UTF-8 encoded (TODO confirm).
        interval: Seconds to sleep after each line is sent.
    """
    try:
        # NOTE: the greeting has no trailing newline, so the receiver sees it
        # prepended to the first data line.
        client.sendall("connect server successfully!".encode(encoding='utf8'))
        with open(path, 'r', encoding='utf8') as f:
            for line in f:
                if line.strip():
                    client.sendall(line.encode(encoding='utf8'))
                    time.sleep(interval)
    except OSError as e:
        # Missing file, broken pipe or connection reset: stop serving this client.
        print(e)
    finally:
        client.close()

def remove_client(client_type):
    """Close and unregister the connection stored under ``client_type``.

    Does nothing when no connection is registered under that key (the
    direct ``g_conn_pool[client_type]`` lookup would raise KeyError).

    NOTE(review): nothing visible in this script adds entries to
    ``g_conn_pool``, so this helper appears unused as written.
    """
    client = g_conn_pool.pop(client_type, None)
    if client is not None:
        client.close()
        print(f"client offline:{client_type}")

if __name__ == '__main__':
    init()
    # Serve connections on a background daemon thread; setDaemon() is
    # deprecated, so daemon=True is passed to the constructor instead.
    thread = Thread(target=accept_client, daemon=True)
    thread.start()
    try:
        # Keep the main thread alive while the accept thread works.
        while True:
            time.sleep(0.2)
    except KeyboardInterrupt:
        # Ctrl+C: exit quietly instead of dumping a traceback.
        pass
    finally:
        g_socket_server.close()

 socket_wordcount2.py代码:

import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
 
if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc.setLogLevel("ERROR")
    # 3-second micro-batches.
    ssc = StreamingContext(sc, 3)
    lines = ssc.socketTextStream("spark-master", 9005)
    # Each line is split on '^'; fields 1 and 2 are kept as-is and field 4
    # is split on '|'. Lines with fewer than five fields are dropped so a
    # short or malformed line cannot fail the batch with IndexError.
    # Parenthesized so the method chain can span several lines.
    records = (
        lines.map(lambda line: line.split("^"))
        .filter(lambda fields: len(fields) > 4)
        .map(lambda x: (x[1], x[2], x[4].split('|')))
    )
    records.pprint()
    ssc.start()
    ssc.awaitTermination()

运行结果: 

 

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/854969.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号