栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

python的多路复用实现聊天群

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

python的多路复用实现聊天群

在我的《python高级编程和异步io编程》中我讲解了socket编程,这里贴一段用socket实现聊天室的功能的源码,因为最近工作比较忙,后期我会将这里的代码细节分析出来,目前先把代码贴上。大家自行研究运行一下。
server端:

"""
server select
"""

import sys
import time
import socket
import select
import logging
from queue import Queue
import queue

g_select_timeout = 10

class Server(object):
    def __init__(self, host='0.0.0.0', port=3333, timeout=2, client_nums=10):
 self.__host = host
 self.__port = port
 self.__timeout = timeout
 self.__client_nums = client_nums
 self.__buffer_size = 1024

 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 self.server.setblocking(False)
 self.server.settimeout(self.__timeout)
 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # keepalive
 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 端口复用
 server_host = (self.__host, self.__port)
 try:
     self.server.bind(server_host)
     self.server.listen(self.__client_nums)
 except:
     raise

 self.inputs = [self.server]  # select 接收文件描述符列表
 self.outputs = []  # 输出文件描述符列表
 self.message_queues = {}  # 消息队列
 self.client_info = {}

    def run(self):
 while True:
     readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, g_select_timeout)
     if not (readable or writable or exceptional):
  continue

     for s in readable:
  if s is self.server:  # 是客户端连接
      connection, client_address = s.accept()
      # print "connection", connection
      print("%s connect." % str(client_address))
      connection.setblocking(0)  # 非阻塞
      self.inputs.append(connection)  # 客户端添加到inputs
      self.client_info[connection] = str(client_address)
      self.message_queues[connection] = Queue()  # 每个客户端一个消息队列

  else:  # 是client, 数据发送过来
      try:
   data = s.recv(self.__buffer_size)
      except:
   err_msg = "Client Error!"
   logging.error(err_msg)
      if data:
   # print data
   data = "%s %s say: %s" % (time.strftime("%Y-%m-%d %H:%M:%S"), self.client_info[s], data)
   self.message_queues[s].put(data)  # 队列添加消息

   if s not in self.outputs:  # 要回复消息
self.outputs.append(s)
      else:  # 客户端断开
   # Interpret empty result as closed connection
   print
   "Client:%s Close." % str(self.client_info[s])
   if s in self.outputs:
self.outputs.remove(s)
   self.inputs.remove(s)
   s.close()
   del self.message_queues[s]
   del self.client_info[s]

     for s in writable:  # outputs 有消息就要发出去了
  try:
      next_msg = self.message_queues[s].get_nowait()  # 非阻塞获取
  except queue.Empty:
      err_msg = "Output Queue is Empty!"
      # g_logFd.writeFormatMsg(g_logFd.LEVEL_INFO, err_msg)
      self.outputs.remove(s)
  except Exception as e:  # 发送的时候客户端关闭了则会出现writable和readable同时有数据,会出现message_queues的keyerror
      err_msg = "Send Data Error! ErrMsg:%s" % str(e)
      logging.error(err_msg)
      if s in self.outputs:
   self.outputs.remove(s)
  else:
      for cli in self.client_info:  # 发送给其他客户端
   if cli is not s:
try:
    cli.sendall(next_msg.encode("utf8"))
except Exception as e:  # 发送失败就关掉
    err_msg = "Send Data to %s  Error! ErrMsg:%s" % (str(self.client_info[cli]), str(e))
    logging.error(err_msg)
    print
    "Client: %s Close Error." % str(self.client_info[cli])
    if cli in self.inputs:
 self.inputs.remove(cli)
 cli.close()
    if cli in self.outputs:
 self.outputs.remove(s)
    if cli in self.message_queues:
 del self.message_queues[s]
    del self.client_info[cli]

     for s in exceptional:
  logging.error("Client:%s Close Error." % str(self.client_info[cli]))
  if s in self.inputs:
      self.inputs.remove(s)
      s.close()
  if s in self.outputs:
      self.outputs.remove(s)
  if s in self.message_queues:
      del self.message_queues[s]
  del self.client_info[s]

if "__main__" == __name__:
    Server().run()

client端

#!/usr/local/bin/python
# *-* coding:utf-8 -*-

"""
client.py
"""

import sys
import time
import socket
import threading

class Client(object):
    def __init__(self, host, port=3333, timeout=1, reconnect=2):
 self.__host = host
 self.__port = port
 self.__timeout = timeout
 self.__buffer_size = 1024
 self.__flag = 1
 self.client = None
 self.__lock = threading.Lock()

    @property
    def flag(self):
 return self.__flag

    @flag.setter
    def flag(self, new_num):
 self.__flag = new_num

    def __connect(self):
 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 # client.bind(('0.0.0.0', 12345,))
 client.setblocking(True)
 client.settimeout(self.__timeout)
 client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 端口复用
 server_host = (self.__host, self.__port)
 try:
     client.connect(server_host)
 except:
     raise
 return client

    def send_msg(self):
 if not self.client:
     return
 while True:
     time.sleep(0.1)
     # data = raw_input()
     data = sys.stdin.readline().strip()
     if "exit" == data.lower():
  with self.__lock:
      self.flag = 0
  break
     self.client.sendall(data.encode("utf8"))
 return

    def recv_msg(self):
 if not self.client:
     return
 while True:
     data = None
     with self.__lock:
  if not self.flag:
      print('ByeBye~~')
      break
     try:
  data = self.client.recv(self.__buffer_size)
     except socket.timeout:
  continue
     except:
  raise
     if data:
  print("%sn" % data)
     time.sleep(0.1)
 return

    def run(self):
 self.client = self.__connect()
 send_proc = threading.Thread(target=self.send_msg)
 recv_proc = threading.Thread(target=self.recv_msg)
 recv_proc.start()
 send_proc.start()
 recv_proc.join()
 send_proc.join()
 self.client.close()

if "__main__" == __name__:
    Client('localhost').run()

运行方式:

  1. 启动server
    python server.py

  2. 启动client1
    python client.py

  3. 启动client2
    python client.py

在client1的console中输入任何字符串,client2中立马就可以收到

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/220768.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号