针对python中websockets出现 sent 1000 (OK); then received 1000 (OK)错误的解决方案;
Traceback (most recent call last):
File "d:testwebsocketTclients.py", line 42, in startup
recv_text = await websocket.recv()
File "C:Program FilesPython39libsite-packageswebsocketslegacyprotocol.py", line 552, in recv
await self.ensure_open()
File "C:Program FilesPython39libsite-packageswebsocketslegacyprotocol.py", line 920, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: sent 1000 (OK); then received 1000 (OK)
错误产生的原因
在python挖掘websocket数据时,比较完善的包文件是websockets文件,只需要
pip install websockets 然后在正文中引用即可完成 import websockets
完整使用代码一般如下:
import asyncio
import websockets
import logging
from datetime import datetime
import json
import traceback
import pymysql # For find ipadress for ping
#当前存在的主要问题是各个线程、协程存在冲突,导致最后死锁。
def functionToDB(mes):
dbconnect = pymysql.connect(host="localhost", user="root", password="root", database="****",charset="utf8")
updateSQL= ("INSERT INTO keyvalue (`keys`,`starts`,`types`,`lens`) "
"VALUES(%(keys)s,%(starts)s,%(types)s,%(lens)s)")
start=mes.split(':')[0]
start=start[2:].replace('"','')
dic_json = json.loads(mes)
cursor = dbconnect.cursor()
if isinstance(dic_json,dict): #判断是否是字典类型isinstance 返回True false
for key in dic_json:
add_mes = {
'keys': key,
'starts': start,
'types': str(type(dic_json[key])),
'lens': len(str(dic_json[key])),
}
try:
cursor.execute(updateSQL,add_mes)
dbconnect.commit()
except Exception as e:
print("更新失败!")
traceback.print_exc()
print(e.with_traceback)
sendstr = '{"SESSION_ID":"-1101623267","SESSION_NAME":"*****","HOST_IP":"*.*.*.*","USER_ID":"********","USER_NAME":"****","FILTER_ID":[*********],"msg_type":1}'
#存在问题在第1000个数据包时会报错:received 1000 (OK); then sent 1000 (OK)
async def startup(uri):
async with websockets.connect(uri) as websocket:
await websocket.send(sendstr)
logging.info("< Hello world!")
while True:
try:
recv_text = await websocket.recv()
print(">:",recv_text)
#ping pong 流程 根据服务器不同,流程不同
if recv_text[0:13] =='{"msg":"ping"':
print("<:",recv_text)
await websocket.send(recv_text)
functionToDB(recv_text)
except Exception as e:
traceback.print_exc()
if __name__ == '__main__':
#websocket的地址
remote = 'ws://*.*.*.*:80/webSocket'
loop=asyncio.get_event_loop()
loop.run_until_complete(startup(remote))
except KeyboardInterrupt as exc:
logging.info('Quit.')
以上程序中存在的问题就是websocket.recv()中数据流量及速度以服务器为主,如果服务器发送信息流量速度快于insertDB()处理数据的速度,就会出现以上错误。因错误内容显示过于模糊,很多人无法一次性定位问题原因,导致无法使用。
解决办法运用多线程及消息队列技术,将websocket.recv()接收的数据存入消息队列,将处理数据的比较慢的function从消息队列中获得数据,避免了快等慢的问题,可以非常完美的解决该问题。
ps: 广大程序员朋友们,得深入学习一下多线程技术了。。。
估计能问道这个问题得已经有相当深厚得基础了,多线程和消息队列问题方案就不献丑了,等问的人多了就po出解决方案。
可以参考以下文章



