3D 打板部署
All checks were successful
git commit AiDA python develop 分支构建部署 / scheduled_deploy (push) Has been skipped

This commit is contained in:
zcr
2026-04-28 17:17:29 +08:00
parent c73bfa7e2a
commit 893f5e87b4

View File

@@ -1,9 +1,8 @@
# 1. 这里的顺序至关重要!必须在最顶端
import sys import sys
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Dict, Any
import yaml # from app.core.nacos_config import load_nacos_config, register_server, deregister_server
from v2.nacos import ClientConfigBuilder, GRPCConfig, NacosNamingService, RegisterInstanceParam, DeregisterInstanceParam, NacosConfigService, ConfigParam
try: try:
import asyncore import asyncore
@@ -14,14 +13,16 @@ except ImportError:
import logging.config import logging.config
import uvicorn import uvicorn
from fastapi import FastAPI, Depends from fastapi import FastAPI
from fastapi import HTTPException from fastapi import HTTPException
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from app.api.api_route import router from app.api.api_route import router
from app.core.config import settings, get_settings, Settings from app.core.config import settings
# from app.core.record_api_count import count_api_calls
from app.schemas.response_template import ResponseModel from app.schemas.response_template import ResponseModel
from logging_env import LOGGER_CONFIG_DICT from logging_env import LOGGER_CONFIG_DICT
from dotenv import load_dotenv
from starlette.middleware.cors import CORSMiddleware from starlette.middleware.cors import CORSMiddleware
logging.config.dictConfig(LOGGER_CONFIG_DICT) logging.config.dictConfig(LOGGER_CONFIG_DICT)
@@ -29,130 +30,46 @@ logging.getLogger("pika").setLevel(logging.WARNING)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
naming_client: NacosNamingService | None = None load_dotenv()
config_client: NacosConfigService | None = None
current_config: dict = {}
# ==================== 配置区 ====================
NACOS_SERVER = "18.167.251.121:28848" # ← 改成你 Nacos 的公网IP:8848推荐标准端口
NACOS_USERNAME = "nacos"
NACOS_PASSWORD = "Aidlab123123!"
NAMESPACE_ID = "zcr" # 你的 namespace_id
SERVICE_NAME = "fastapi-demo-service"
SERVICE_GROUP = "DEFAULT_GROUP"
SERVICE_PORT = 8040 # FastAPI 监听端口
REGISTER_IP = "10.1.1.43"
# 配置相关
CONFIG_DATA_ID = "aida.python"
CONFIG_GROUP = "LOCAL"
nacos_config_data: Dict[str, Any] = {}
@asynccontextmanager # @asynccontextmanager
async def lifespan(app: FastAPI): # async def lifespan(app: FastAPI):
"""FastAPI 生命周期管理:启动时注册,关闭时注销""" # try:
global naming_client, config_client # load_nacos_config()
global nacos_config_data, settings # register_server()
#
# yield
# finally:
# deregister_server()
# logger.info("lifespan down")
client_config = (
ClientConfigBuilder() def get_application() -> FastAPI:
.username(NACOS_USERNAME) application = FastAPI(
.password(NACOS_PASSWORD) # lifespan=lifespan,
.namespace_id(NAMESPACE_ID) docs_url="/docs",
.server_address(NACOS_SERVER) redoc_url='/re-docs',
.log_level("DEBUG") # 先开 DEBUG 方便排查 openapi_url=f"/openapi.json",
.grpc_config(GRPCConfig(grpc_timeout=5000)) description='''
.build() Base frame with FastAPI
- Super Resolution API
'''
) )
# Naming Service服务注册发现 application.add_middleware(
naming_client = await NacosNamingService.create_naming_service(client_config) CORSMiddleware,
logger.info("Nacos NamingService 初始化成功") allow_origins=["*"],
config_client = await NacosConfigService.create_config_service(client_config) allow_credentials=True,
logger.info("Nacos ConfigService 初始化成功") allow_methods=["*"],
allow_headers=["*"],
# ==================== 1. 获取配置 ==================== )
try: # application.middleware("http")(count_api_calls)
config_param = ConfigParam(data_id=CONFIG_DATA_ID, group=CONFIG_GROUP, ) application.include_router(router=router)
config_content = await config_client.get_config(config_param) return application
logger.info(f"获取配置: {config_content}")
if config_content:
loaded = yaml.safe_load(config_content) or {}
nacos_config_data = loaded
# 用 Nacos 配置覆盖 settings
for key, value in loaded.items():
if hasattr(settings, key):
setattr(settings, key, value)
logger.info(f"✅ Nacos 配置加载成功: {CONFIG_DATA_ID} | 覆盖字段数量: {len(loaded)}")
else:
logger.warning("Nacos 返回配置为空,使用 .env + 默认值")
# 2. 注册动态监听器(配置变更自动刷新)
async def listener(content: str):
try:
new_config = yaml.safe_load(content) if content else {}
nacos_config_data = new_config
# 实时覆盖 settings
for key, value in new_config.items():
if hasattr(settings, key):
old_val = getattr(settings, key)
setattr(settings, key, value)
if old_val != value:
logger.info(f"🔄 配置更新 → {key}: {old_val}{value}")
logger.info(f"【Nacos 动态更新】{CONFIG_DATA_ID}")
except Exception as e:
logger.error(f"Nacos 配置解析失败: {e}")
await config_client.add_listener(CONFIG_DATA_ID, CONFIG_GROUP, listener)
logger.info("✅ Nacos 配置监听器已注册(支持热更新)")
except Exception as e:
logger.error(f"获取配置失败: {e}")
current_config["content"] = "获取配置失败"
# ====================2. 注册到 Nacos ====================
try:
register_param = RegisterInstanceParam(
service_name=SERVICE_NAME,
group_name=SERVICE_GROUP,
ip=REGISTER_IP,
port=SERVICE_PORT,
weight=1.0,
cluster_name="DEFAULT",
metadata={"version": "1.0", "framework": "fastapi"},
enabled=True,
healthy=True,
ephemeral=False, # ← 先用 False 测试,成功后再改成 True
)
response = await naming_client.register_instance(request=register_param)
logger.info(f"Nacos 注册成功Response: {response}")
print(f"✅ 服务已注册到 Nacos: {SERVICE_NAME} @ {REGISTER_IP}:{SERVICE_PORT}")
except Exception as e:
logger.error(f"Nacos 注册失败: {e}")
yield # FastAPI 正常运行
# ====================3. 应用关闭时注销 ====================
if naming_client:
try:
deregister_param = DeregisterInstanceParam(
service_name=SERVICE_NAME,
group_name=SERVICE_GROUP,
ip=REGISTER_IP,
port=SERVICE_PORT,
cluster_name="DEFAULT",
)
await naming_client.deregister_instance(request=deregister_param)
logger.info("Nacos 注销成功")
except Exception as e:
logger.warning(f"Nacos 注销失败: {e}")
app = FastAPI(lifespan=lifespan, docs_url="/docs", redoc_url='/re-docs', openapi_url=f"/openapi.json", description=''' Base frame with FastAPI - Super Resolution API ''') app = get_application()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], )
app.include_router(router=router)
@app.exception_handler(HTTPException) @app.exception_handler(HTTPException)
@@ -163,15 +80,10 @@ async def http_exception_handler(exc: HTTPException):
) )
@app.get("/health") @app.get("/health", operation_id="health")
async def health(): async def health():
return {"status": "ok", "env": settings.SERVE_ENV} logger.info("health check")
return {"ok": True, "env": settings.APP_ENV}
@app.get("/config")
async def show_config(s: Settings = Depends(get_settings)):
"""查看当前完整配置(生产环境建议加权限)"""
return s.model_dump()
if __name__ == '__main__': if __name__ == '__main__':