Files
AiDA_Python/app/main.py
zcr ad4db736de
All checks were successful
git commit AiDA python develop 分支构建部署 / scheduled_deploy (push) Has been skipped
新增nacos 配置 测试
2026-04-24 10:17:42 +08:00

179 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging.config
import os
import sys
from contextlib import asynccontextmanager
from typing import Dict, Any

import yaml
from v2.nacos import (
    ClientConfigBuilder,
    GRPCConfig,
    NacosNamingService,
    RegisterInstanceParam,
    DeregisterInstanceParam,
    NacosConfigService,
    ConfigParam,
)

# asyncore is gone from the standard library on newer Python versions
# (removed in 3.12); when it is missing, alias the pyasyncore backport under
# the old name so that any import below that still expects it keeps working.
try:
    import asyncore
except ImportError:
    import pyasyncore
    sys.modules['asyncore'] = pyasyncore

import uvicorn
from fastapi import FastAPI, Depends
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware

from app.api.api_route import router
from app.core.config import settings, get_settings, Settings
from app.schemas.response_template import ResponseModel
from logging_env import LOGGER_CONFIG_DICT
# Install the project-wide logging configuration before any logger is used.
logging.config.dictConfig(LOGGER_CONFIG_DICT)
logger = logging.getLogger(__name__)
# Only let WARNING and above through from the "pika" logger.
logging.getLogger("pika").setLevel(logging.WARNING)

# Nacos service handles; both stay None until lifespan() creates them.
config_client: NacosConfigService | None = None
naming_client: NacosNamingService | None = None
# lifespan() writes a "content" marker here when the initial config fetch fails.
current_config: dict = {}
# ==================== Nacos settings ====================
# Connection details are read from the environment so that they can differ per
# deployment without editing this file. The previous hard-coded values remain
# as defaults, so behaviour is unchanged when the variables are unset.
NACOS_SERVER = os.getenv("NACOS_SERVER", "18.167.251.121:28848")  # "host:port" of the Nacos server
NACOS_USERNAME = os.getenv("NACOS_USERNAME", "nacos")
# SECURITY(review): a real credential is committed here as the fallback value.
# Rotate it and supply NACOS_PASSWORD through the environment / secret store,
# then drop the default.
NACOS_PASSWORD = os.getenv("NACOS_PASSWORD", "Aidlab123123!")
NAMESPACE_ID = os.getenv("NACOS_NAMESPACE_ID", "zcr")  # Nacos namespace id
SERVICE_NAME = "fastapi-demo-service"
SERVICE_GROUP = "DEFAULT_GROUP"
# Port advertised to Nacos for this instance.
# NOTE(review): the __main__ entry point binds uvicorn to settings.PORT, not to
# this constant — confirm the two agree in every environment.
SERVICE_PORT = 8040
REGISTER_IP = os.getenv("NACOS_REGISTER_IP", "10.1.1.43")  # IP advertised to Nacos for this instance
# Remote configuration entry to load and watch.
CONFIG_DATA_ID = "aida.python"
CONFIG_GROUP = "LOCAL"
# Most recently parsed Nacos configuration document.
nacos_config_data: Dict[str, Any] = {}
def _parse_nacos_yaml(content: str | None) -> Dict[str, Any]:
    """Parse a Nacos YAML payload into a dict; an empty payload yields ``{}``.

    Raises:
        ValueError: if the document parses to something other than a mapping.
    """
    parsed = yaml.safe_load(content) if content else None
    if not parsed:
        # safe_load returns None for whitespace- or comment-only documents.
        return {}
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a YAML mapping, got {type(parsed).__name__}")
    return parsed


def _apply_nacos_overrides(config: Dict[str, Any], log_changes: bool = False) -> None:
    """Copy each key of *config* that already exists on ``settings`` onto it.

    Keys that ``settings`` does not define are ignored. With *log_changes* set,
    every value that actually changed is logged.
    """
    for key, value in config.items():
        if not hasattr(settings, key):
            continue
        old_val = getattr(settings, key)
        setattr(settings, key, value)
        if log_changes and old_val != value:
            logger.info(f"🔄 配置更新 → {key}: {old_val} -> {value}")


async def _on_nacos_config_change(*args, **kwargs):
    """Config-change listener: re-parse the payload and refresh ``settings``.

    NOTE(review): the callback shape appears to differ between SDK versions —
    either ``(content)`` or ``(tenant, data_id, group, content)``; confirm
    against the installed nacos SDK. The payload is taken from the ``content``
    keyword or, failing that, the last positional argument, so both work.
    """
    # Without this declaration the assignment below would only bind a local
    # name and the module-level cache would never be refreshed.
    global nacos_config_data
    content = kwargs.get("content", args[-1] if args else "")
    try:
        new_config = _parse_nacos_yaml(content)
        nacos_config_data = new_config
        _apply_nacos_overrides(new_config, log_changes=True)
        logger.info(f"【Nacos 动态更新】{CONFIG_DATA_ID}")
    except Exception as e:
        # A malformed update must not take the listener down.
        logger.exception(f"Nacos 配置解析失败: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: set up Nacos on startup, deregister on shutdown.

    Startup:
      1. create the Nacos naming and config services (a failure here
         propagates and aborts startup);
      2. fetch ``CONFIG_DATA_ID``/``CONFIG_GROUP``, overlay it on ``settings``
         and subscribe to later changes (failures are logged, not raised);
      3. register this instance with Nacos (failures are logged, not raised).

    Shutdown: deregister the instance, even when the application exits through
    an exception.
    """
    global naming_client, config_client, nacos_config_data

    client_config = (
        ClientConfigBuilder()
        .username(NACOS_USERNAME)
        .password(NACOS_PASSWORD)
        .namespace_id(NAMESPACE_ID)
        .server_address(NACOS_SERVER)
        .log_level("DEBUG")  # verbose SDK logging while the integration is being debugged
        .grpc_config(GRPCConfig(grpc_timeout=5000))
        .build()
    )

    # Naming service: service registration / discovery.
    naming_client = await NacosNamingService.create_naming_service(client_config)
    logger.info("Nacos NamingService 初始化成功")
    config_client = await NacosConfigService.create_config_service(client_config)
    logger.info("Nacos ConfigService 初始化成功")

    # ==================== 1. Load remote configuration ====================
    try:
        config_param = ConfigParam(data_id=CONFIG_DATA_ID, group=CONFIG_GROUP)
        config_content = await config_client.get_config(config_param)
        # NOTE(review): this logs the whole configuration document at INFO
        # level; reconsider if the document can contain secrets.
        logger.info(f"获取配置: {config_content}")
        if config_content:
            loaded = _parse_nacos_yaml(config_content)
            nacos_config_data = loaded
            # Overlay the Nacos values on top of the locally loaded settings.
            _apply_nacos_overrides(loaded)
            logger.info(f"✅ Nacos 配置加载成功: {CONFIG_DATA_ID} | 覆盖字段数量: {len(loaded)}")
        else:
            logger.warning("Nacos 返回配置为空,使用 .env + 默认值")

        # Subscribe so that later edits in Nacos refresh settings at runtime.
        await config_client.add_listener(CONFIG_DATA_ID, CONFIG_GROUP, _on_nacos_config_change)
        logger.info("✅ Nacos 配置监听器已注册(支持热更新)")
    except Exception as e:
        logger.exception(f"获取配置失败: {e}")
        current_config["content"] = "获取配置失败"

    # ==================== 2. Register with Nacos ====================
    try:
        register_param = RegisterInstanceParam(
            service_name=SERVICE_NAME,
            group_name=SERVICE_GROUP,
            ip=REGISTER_IP,
            port=SERVICE_PORT,
            weight=1.0,
            cluster_name="DEFAULT",
            metadata={"version": "1.0", "framework": "fastapi"},
            enabled=True,
            healthy=True,
            ephemeral=False,  # False for initial testing; switch to True once that works
        )
        response = await naming_client.register_instance(request=register_param)
        logger.info(f"Nacos 注册成功Response: {response}")
        print(f"✅ 服务已注册到 Nacos: {SERVICE_NAME} @ {REGISTER_IP}:{SERVICE_PORT}")
    except Exception as e:
        logger.exception(f"Nacos 注册失败: {e}")

    try:
        yield  # the application serves requests while suspended here
    finally:
        # ==================== 3. Deregister on shutdown ====================
        # In a finally block so the instance (registered with ephemeral=False)
        # is removed even when shutdown is triggered by an exception.
        if naming_client:
            try:
                deregister_param = DeregisterInstanceParam(
                    service_name=SERVICE_NAME,
                    group_name=SERVICE_GROUP,
                    ip=REGISTER_IP,
                    port=SERVICE_PORT,
                    cluster_name="DEFAULT",
                )
                await naming_client.deregister_instance(request=deregister_param)
                logger.info("Nacos 注销成功")
            except Exception as e:
                logger.warning(f"Nacos 注销失败: {e}")
# Application instance; startup and shutdown work is delegated to lifespan().
app = FastAPI(
    lifespan=lifespan,
    docs_url="/docs",
    redoc_url="/re-docs",
    openapi_url="/openapi.json",
    description=''' Base frame with FastAPI - Super Resolution API ''',
)

# NOTE(review): every origin is allowed together with credentials; confirm this
# is intended outside local development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the project's API routes.
app.include_router(router=router)
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc: HTTPException):
    """Render an ``HTTPException`` in the project's ``ResponseModel`` envelope.

    FastAPI/Starlette invoke exception handlers as ``handler(request, exc)``,
    so the handler must accept the request even though it is not used here.

    Args:
        request: the incoming request that triggered the exception (unused).
        exc: the raised exception; its status code becomes both the HTTP status
            and the envelope ``code``, and its detail fills ``msg`` and ``data``.

    Returns:
        A ``JSONResponse`` carrying the serialised envelope and any headers
        attached to the exception.
    """
    envelope = ResponseModel(code=exc.status_code, msg=exc.detail, data=exc.detail)
    return JSONResponse(
        status_code=exc.status_code,
        # NOTE(review): if ResponseModel is a pydantic v2 model, .dict() is
        # deprecated in favour of .model_dump() — confirm before switching.
        content=envelope.dict(),
        # Keep headers set on the exception (e.g. WWW-Authenticate).
        headers=getattr(exc, "headers", None),
    )
@app.get("/health")
async def health():
    """Health-check endpoint: report a fixed ``ok`` status plus ``settings.SERVE_ENV``."""
    payload = {"status": "ok"}
    payload["env"] = settings.SERVE_ENV
    return payload
@app.get("/config")
async def show_config(s: Settings = Depends(get_settings)):
    """Return the complete current settings as a dict.

    The endpoint has no access control; add some before exposing it in
    production.
    """
    settings_snapshot = s.model_dump()
    return settings_snapshot
if __name__ == "__main__":
    # Direct execution: serve the app on every interface at the configured port.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=settings.PORT,
    )