栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

fastapi nacos

fastapi nacos

import nacos_py
from fastapi import FastAPI
import yaml
from munch import munchify
from starlette.requests import Request
from starlette.responses import Response
import traceback
import sys
import warnings

sys.tracebacklimit = 3  # limit how many traceback levels are printed for unhandled exceptions
# Prerequisite: set nacos.core.auth.enabled=true in the Nacos application.properties,
# then configure the Nacos user / role / permission used by the client below.

app = FastAPI()


# async def catch_exceptions_middleware(request: Request, call_next):
#     try:
#         return await call_next(request)
#     except Exception as e:
#         # print(e)
#         # print(traceback.format_list())
#         return Response("Internal server error", status_code=500)
#
#
# app.middleware('http')(catch_exceptions_middleware)

@app.exception_handler(Exception)
async def custom_http_exception_handler(request, exc):
    """Answer with a fixed plain 500 response for the ``Exception`` handler slot.

    Neither ``request`` nor ``exc`` is inspected; the body text is constant.
    NOTE(review): a later handler in this file reuses this function name; the
    decorator has already registered this one, so only the module-level name
    is rebound.
    """
    fallback = Response(content="Internal server exception", status_code=500)
    return fallback


@app.exception_handler(ZeroDivisionError)  # handle this error type in one place
async def custom_http_exception_handler(request, exc):
    """Answer with a fixed plain 500 response when a ``ZeroDivisionError`` is handled.

    Neither ``request`` nor ``exc`` is inspected; the body text is constant.
    """
    error_reply = Response(content="Internal server error", status_code=500)
    return error_reply



class nacos:
    """Small wrapper around one shared Nacos client and a default config entry.

    Every attribute is defined on the class, so all instances share the same
    client, data id and group.
    """

    SERVER_ADDRESSES = "127.0.0.1:8845"
    NAMESPACE = "public"
    # NOTE(review): the client is constructed while the class body executes
    # (i.e. at import time) and the credentials are hard-coded here; consider
    # moving them to external configuration.
    client = nacos_py.NacosClient(SERVER_ADDRESSES, namespace=NAMESPACE, username="pang", password="pang")
    data_id = "testDataId"
    group = "testGroup"

    def add_watcher(self, methods):
        """Register ``methods`` as config watchers for the default data id and group.

        Args:
            methods: Passed through unchanged to the client's
                ``add_config_watchers``; presumably an iterable of callbacks —
                confirm against the client documentation.
        """
        self.client.add_config_watchers(self.data_id, self.group, methods)

    def get_config(self, data_id=None, group=None):
        """Fetch a config entry from the client and parse it as YAML.

        Args:
            data_id: Entry to read; falls back to the class-level ``data_id``.
            group: Group of the entry; falls back to the class-level ``group``.

        Returns:
            The object produced by ``yaml.safe_load``, or ``None`` when the
            client returned ``None`` instead of content.
        """
        if data_id is None:
            data_id = self.data_id
        if group is None:
            group = self.group
        raw = self.client.get_config(data_id, group)
        # yaml.safe_load(None) treats None as a stream and fails with
        # AttributeError on ``.read``; skip parsing when there is no content.
        parsed = None if raw is None else yaml.safe_load(raw)
        # Debug output of the attribute-style view; the plain parsed object is
        # what gets returned.
        print(munchify(parsed))
        return parsed


def call(args):
    """Reload the module-level ``config`` from the shared ``a`` wrapper.

    ``args`` is accepted so the function can be used as a watcher callback,
    but its value is ignored.
    """
    global config
    refreshed = a.get_config()
    config = refreshed


# Startup wiring (runs at import time): create the wrapper, load the
# configuration once into the module-level ``config``, then register ``call``
# as a watcher so ``config`` can be reloaded (presumably on config change —
# confirm against the client documentation).
a = nacos()
config = a.get_config()
a.add_watcher({call})


# Demo route: the division below always raises ZeroDivisionError, so the
# return statement is never reached and the registered handler answers.
@app.get("/print")
async def root():
    _unused = 1 / 0
    payload = {"message": config}
    return payload


# Returns the currently cached module-level ``config`` under the "message" key.
@app.get("/prin")
async def root():
    payload = {"message": config}
    return payload

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699497.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号