import _thread
import json
import ssl
import time
import websocket
def on_message(ws, message):
print(ws)
print(message)
def on_error(ws, error):
print(ws)
print(error)
def on_close(ws):
print(ws)
print("closed")
def on_open(ws):
def run(*args):
# 建立连接后自动订阅
ws.send(json.dumps({}))
while True:
time.sleep(10)
ws.send(json.dumps({"ping": int(time.time() * 1000)}))
_thread.start_new_thread(run, ())
if __name__ == "__main__":
url = ""
websocket.enableTrace(True)
ws = websocket.WebSocketApp(url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close)
# wss跳过SSL验证
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})