"""FastAPI entry point that registers itself with Nacos and hot-reloads config.

On startup the application creates Nacos naming/config clients, pulls a YAML
document from the config service, copies matching keys onto ``settings``,
subscribes to later changes and registers this instance with the naming
service.  On shutdown the instance is deregistered again.
"""
import logging.config
import os
import sys
from contextlib import asynccontextmanager
from typing import Any, Dict

# ``asyncore`` was removed from the standard library in Python 3.12.  Alias the
# ``pyasyncore`` backport under the old name before any third-party package is
# imported (presumably a transitive dependency still imports it - confirm).
try:
    import asyncore  # noqa: F401
except ImportError:
    import pyasyncore

    sys.modules['asyncore'] = pyasyncore

import uvicorn
import yaml
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware
from v2.nacos import (
    ClientConfigBuilder,
    ConfigParam,
    DeregisterInstanceParam,
    GRPCConfig,
    NacosConfigService,
    NacosNamingService,
    RegisterInstanceParam,
)

from app.api.api_route import router
from app.core.config import Settings, get_settings, settings
from app.schemas.response_template import ResponseModel
from logging_env import LOGGER_CONFIG_DICT

logging.config.dictConfig(LOGGER_CONFIG_DICT)
logging.getLogger("pika").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)

# Nacos clients; created inside ``lifespan`` and left as None until then.
naming_client: NacosNamingService | None = None
config_client: NacosConfigService | None = None
# Receives a "content" marker when fetching the remote config fails.
current_config: dict = {}

# ==================== Configuration ====================
# Connection settings may be overridden through environment variables; the
# fallbacks are the values that used to be hard-coded here.
# NOTE(review): a real password is committed in this file - rotate it and
# drop the fallback once deployments provide NACOS_PASSWORD.
NACOS_SERVER = os.getenv("NACOS_SERVER", "18.167.251.121:28848")  # "<public IP>:<port>" of your Nacos server (8848 is the standard port)
NACOS_USERNAME = os.getenv("NACOS_USERNAME", "nacos")
NACOS_PASSWORD = os.getenv("NACOS_PASSWORD", "Aidlab123123!")
NAMESPACE_ID = "zcr"  # your namespace_id
SERVICE_NAME = "fastapi-demo-service"
SERVICE_GROUP = "DEFAULT_GROUP"
# Port advertised to Nacos.
# NOTE(review): uvicorn below listens on settings.PORT - confirm they match.
SERVICE_PORT = 8040
REGISTER_IP = "10.1.1.43"

# Remote configuration coordinates
CONFIG_DATA_ID = "aida.python"
CONFIG_GROUP = "LOCAL"

# Last configuration mapping successfully parsed from Nacos.
nacos_config_data: Dict[str, Any] = {}


def _parse_config(content: str | None) -> Dict[str, Any]:
    """Parse a YAML document from Nacos into a mapping.

    Args:
        content: Raw YAML text; may be empty or None.

    Returns:
        The parsed mapping, or an empty dict when the document is empty.

    Raises:
        ValueError: If the document parses to something other than a mapping.
        yaml.YAMLError: If the text is not valid YAML.
    """
    loaded = yaml.safe_load(content) if content else None
    if not loaded:
        return {}
    if not isinstance(loaded, dict):
        raise ValueError(
            f"expected a YAML mapping, got {type(loaded).__name__}"
        )
    return loaded


def _apply_overrides(config: Dict[str, Any]) -> list[tuple[str, Any, Any]]:
    """Copy keys that already exist on ``settings`` from ``config``.

    Keys without a matching attribute on ``settings`` are ignored.

    Returns:
        One ``(key, old_value, new_value)`` tuple per attribute that was set.
    """
    applied = []
    for key, value in config.items():
        if not hasattr(settings, key):
            continue
        previous = getattr(settings, key)
        setattr(settings, key, value)
        applied.append((key, previous, value))
    return applied


async def _on_config_change(tenant: str, data_id: str, group: str, content: str) -> None:
    """Nacos listener: re-apply the configuration whenever it changes.

    The Nacos v2 SDK documents the callback as receiving
    ``(tenant, data_id, group, content)``; only ``content`` is used here.
    Errors are logged and swallowed so a bad document cannot break the
    SDK's listener machinery.
    """
    # Without this declaration the assignment below would only bind a local.
    global nacos_config_data
    try:
        new_config = _parse_config(content)
        nacos_config_data = new_config
        for key, old_val, value in _apply_overrides(new_config):
            if old_val != value:
                logger.info(f"🔄 配置更新 → {key}: {old_val} → {value}")
        logger.info(f"【Nacos 动态更新】{CONFIG_DATA_ID}")
    except Exception as e:
        logger.error(f"Nacos 配置解析失败: {e}")


async def _load_remote_config() -> None:
    """Fetch the config from Nacos, apply it, and subscribe to changes.

    Best effort: any failure is logged and recorded in ``current_config``
    so the application still starts with its local settings.
    """
    global nacos_config_data
    try:
        config_param = ConfigParam(data_id=CONFIG_DATA_ID, group=CONFIG_GROUP)
        config_content = await config_client.get_config(config_param)
        logger.info(f"获取配置: {config_content}")

        if config_content:
            loaded = _parse_config(config_content)
            nacos_config_data = loaded
            applied = _apply_overrides(loaded)
            # Report the fields actually written, not every key in the file.
            logger.info(f"✅ Nacos 配置加载成功: {CONFIG_DATA_ID} | 覆盖字段数量: {len(applied)}")
        else:
            logger.warning("Nacos 返回配置为空,使用 .env + 默认值")

        # Subscribe so later edits in Nacos are applied without a restart.
        await config_client.add_listener(CONFIG_DATA_ID, CONFIG_GROUP, _on_config_change)
        logger.info("✅ Nacos 配置监听器已注册(支持热更新)")
    except Exception as e:
        logger.error(f"获取配置失败: {e}")
        current_config["content"] = "获取配置失败"


async def _register_instance() -> None:
    """Register this service instance with Nacos; failures are only logged."""
    try:
        register_param = RegisterInstanceParam(
            service_name=SERVICE_NAME,
            group_name=SERVICE_GROUP,
            ip=REGISTER_IP,
            port=SERVICE_PORT,
            weight=1.0,
            cluster_name="DEFAULT",
            metadata={"version": "1.0", "framework": "fastapi"},
            enabled=True,
            healthy=True,
            ephemeral=False,  # test with False first; switch to True once it works
        )
        response = await naming_client.register_instance(request=register_param)
        logger.info(f"Nacos 注册成功!Response: {response}")
        print(f"✅ 服务已注册到 Nacos: {SERVICE_NAME} @ {REGISTER_IP}:{SERVICE_PORT}")
    except Exception as e:
        logger.error(f"Nacos 注册失败: {e}")


async def _deregister_instance() -> None:
    """Remove this instance from Nacos; failures are only logged."""
    if naming_client is None:
        return
    try:
        deregister_param = DeregisterInstanceParam(
            service_name=SERVICE_NAME,
            group_name=SERVICE_GROUP,
            ip=REGISTER_IP,
            port=SERVICE_PORT,
            cluster_name="DEFAULT",
        )
        await naming_client.deregister_instance(request=deregister_param)
        logger.info("Nacos 注销成功")
    except Exception as e:
        logger.warning(f"Nacos 注销失败: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: register with Nacos on startup, deregister on shutdown.

    Client creation is not guarded: if either Nacos client cannot be
    created the exception propagates and the application fails to start.
    Config loading and instance registration are best effort.
    """
    global naming_client, config_client

    client_config = (
        ClientConfigBuilder()
        .username(NACOS_USERNAME)
        .password(NACOS_PASSWORD)
        .namespace_id(NAMESPACE_ID)
        .server_address(NACOS_SERVER)
        .log_level("DEBUG")  # keep DEBUG for now to ease troubleshooting
        .grpc_config(GRPCConfig(grpc_timeout=5000))
        .build()
    )

    # Naming service (service registration / discovery)
    naming_client = await NacosNamingService.create_naming_service(client_config)
    logger.info("Nacos NamingService 初始化成功")

    config_client = await NacosConfigService.create_config_service(client_config)
    logger.info("Nacos ConfigService 初始化成功")

    # 1. Pull the remote configuration and subscribe to updates.
    await _load_remote_config()

    # 2. Register this instance with Nacos.
    await _register_instance()

    try:
        yield  # the application serves requests here
    finally:
        # 3. Deregister on shutdown, even if an error is thrown into the
        # generator while the application is running.
        await _deregister_instance()


app = FastAPI(
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url='/re-docs',
    openapi_url="/openapi.json",
    description=''' Base frame with FastAPI - Super Resolution API ''',
)

# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive - restrict allow_origins before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(router=router)


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Render an ``HTTPException`` as a ``ResponseModel`` JSON body.

    Exception handlers are invoked with ``(request, exc)``; a handler that
    accepts only ``exc`` fails with ``TypeError`` when it is called.
    """
    body = ResponseModel(code=exc.status_code, msg=exc.detail, data=exc.detail)
    return JSONResponse(
        status_code=exc.status_code,
        content=body.dict(),
        # Preserve headers attached to the exception (e.g. WWW-Authenticate).
        headers=exc.headers,
    )


@app.get("/health")
async def health():
    """Liveness probe reporting the configured serving environment."""
    return {"status": "ok", "env": settings.SERVE_ENV}


@app.get("/config")
async def show_config(s: Settings = Depends(get_settings)):
    """Return the full current configuration (add access control in production)."""
    return s.model_dump()


if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=settings.PORT)