栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Flask

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flask

我解决了将应用程序作为参数传递给类并按照错误描述的建议使用它的上下文,但是名称空间也是必需的:

class Listener(threading.Thread):    def __init__(self, r, channels, app):    threading.Thread.__init__(self)    self.daemon = True    self.redis = r    self.pubsub = self.redis.pubsub()    self.pubsub.psubscribe(channels)    self.app = app    def work(self, item):        with app.app_context(): if isinstance(item['data'], bytes):     try:         msg = item['data'].depre('utf-8')         depre_msg = json.loads(msg)   if depre_msg['type'] == 'UPDATE_TASK':  send(json.dumps({"type":"UPDATE_TASK"}), room='home', namespace='/')         #_send_task_message()     except ValueError as e:         log.error("Error decoding msg to microservice: %s", str(e))    def run(self):        for item in self.pubsub.listen(): self.work(item)if __name__ == '__main__':    r = redis.Redis()    client = Listener(r, ['/bobguarana/socketio'], app)    client.start()    socketio.run(debug=True, app=app, port=8080)


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/373056.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号