我的解决办法是:把应用程序对象(app)作为参数传给该类,并按照错误信息的建议在它的应用上下文(app context)中执行发送操作;此外,调用 send 时还必须显式指定命名空间(namespace):
class Listener(threading.Thread):
    """Background thread that relays Redis pub/sub messages to Socket.IO.

    The thread pattern-subscribes to ``channels`` on the given Redis client
    and, for every ``UPDATE_TASK`` message it receives, sends a Socket.IO
    message to the ``home`` room of the ``/`` namespace from inside the
    application context of ``app``.
    """

    def __init__(self, r, channels, app):
        """Subscribe to ``channels`` and remember the application object.

        Args:
            r: Redis client; only its ``pubsub()`` method is used here.
            channels: Channel pattern(s) handed to ``psubscribe``.
            app: Application object providing ``app_context()``.
        """
        # Daemon thread, so it does not keep the process alive on exit.
        super().__init__(daemon=True)
        self.redis = r
        self.pubsub = self.redis.pubsub()
        self.pubsub.psubscribe(channels)
        self.app = app

    def work(self, item):
        """Handle one pub/sub item, forwarding ``UPDATE_TASK`` notifications.

        Items whose ``data`` is not ``bytes`` are ignored (presumably the
        subscription confirmations -- confirm against the redis client docs).
        Payloads that are not valid UTF-8 JSON are logged and dropped.

        Args:
            item: Mapping with a ``data`` entry, as yielded by
                ``self.pubsub.listen()``.
        """
        data = item['data']
        if not isinstance(data, bytes):
            return

        # Use the app handed to the constructor rather than a module-level
        # global, so the class also works when imported from another module.
        with self.app.app_context():
            try:
                decoded_msg = json.loads(data.decode('utf-8'))
                # Tolerate JSON payloads that are not objects or that carry
                # no "type" key instead of letting the thread die on them.
                is_update = (
                    isinstance(decoded_msg, dict)
                    and decoded_msg.get('type') == 'UPDATE_TASK'
                )
                if is_update:
                    # Room and namespace are passed explicitly here.
                    send(json.dumps({"type": "UPDATE_TASK"}),
                         room='home', namespace='/')
            except ValueError as e:
                # UnicodeDecodeError and json.JSONDecodeError are both
                # subclasses of ValueError.
                log.error("Error decoding msg to microservice: %s", str(e))

    def run(self):
        """Process every item produced by the subscription, one at a time."""
        for item in self.pubsub.listen():
            self.work(item)


if __name__ == '__main__':
    r = redis.Redis()
    client = Listener(r, ['/bobguarana/socketio'], app)
    client.start()
    socketio.run(debug=True, app=app, port=8080)


