first commit

This commit is contained in:
zcr
2026-02-04 17:57:49 +08:00
parent b7a8569097
commit d034937ddd
18 changed files with 1986 additions and 0 deletions

View File

View File

@@ -0,0 +1,86 @@
import os
from google.oauth2 import service_account
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from src.server.agent.state import AgentState
from src.server.agent.tools import generate_2025_report_tool, generate_furniture_sketch
from src.server.agent.config_loader import get_agent_prompt
from src.core.config import settings
creds = service_account.Credentials.from_service_account_file(
settings.GOOGLE_GENAI_USE_VERTEXAI,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
# 初始化 Gemini 模型 (使用 Flash 以保证速度)
llm = ChatGoogleGenerativeAI(
model="gemini-2.0-flash", temperature=0.5, credentials=creds,
project="aida-461108", location='us-central1', vertexai=True, api_key=settings.GOOGLE_API_KEY
)
# --- 1. Designer Agent (设计顾问) ---
def designer_node(state: AgentState):
"""负责细化设计需求,提供专业参数"""
messages = state["messages"]
system_text = get_agent_prompt("designer") or """
你是一位资深的家具设计师。你的职责是:
1. 从用户的模糊描述中提取或补充具体的设计参数(尺寸、材质、人体工学数据)。
2. 如果用户想画图,不要直接画,而是先描述清楚细节,然后让 Visualizer 去画。
请以专业的口吻回复。
"""
system_prompt = SystemMessage(content=system_text)
response = llm.invoke([system_prompt] + messages)
return {"messages": [response]}
# --- 2. Researcher Agent (情报专家) ---
def researcher_node(state: AgentState):
"""负责调用报告生成工具"""
# 绑定工具给 LLM
tools = [generate_2025_report_tool]
llm_with_tools = llm.bind_tools(tools)
messages = state["messages"]
system_text = get_agent_prompt("researcher") or "你是情报专家,负责检索与整理参考资料并生成报告。"
system_prompt = SystemMessage(content=system_text)
response = llm_with_tools.invoke([system_prompt] + messages)
# 如果模型决定调用工具
if response.tool_calls:
# 这里为了简化直接在节点内执行工具LangGraph也可以用 ToolNode
tool_call = response.tool_calls[0]
if tool_call["name"] == "generate_2025_report_tool":
result = generate_2025_report_tool.invoke(tool_call["args"])
return {"messages": [response, HumanMessage(content=str(result))]}
return {"messages": [response]}
# --- 3. Visualizer Agent (视觉专家) ---
def visualizer_node(state: AgentState):
"""负责将自然语言转化为绘图 Prompt 并调用绘图工具"""
tools = [generate_furniture_sketch]
llm_with_tools = llm.bind_tools(tools)
messages = state["messages"]
system_text = get_agent_prompt("visualizer") or """
你是视觉专家。你的目标是生成高质量的家具草图。
步骤:
1. 根据上下文,编写一个详细的 Stable Diffusion 风格的英文 Prompt。
2. 必须调用 generate_furniture_sketch 工具来生成图片。
"""
# 强制它尝试调用工具
system_prompt = SystemMessage(content=system_text)
response = llm_with_tools.invoke([system_prompt] + messages)
if response.tool_calls:
tool_call = response.tool_calls[0]
if tool_call["name"] == "generate_furniture_sketch":
result = generate_furniture_sketch.invoke(tool_call["args"])
# 返回工具结果给 LLM让它生成最终回复
final_msg = f"已为您生成草图,链接如下:{result}"
return {"messages": [response, HumanMessage(content=final_msg)]}
return {"messages": [response]}

View File

@@ -0,0 +1,32 @@
"""加载项目根目录下的 config.yaml 并提供 agent prompt 访问接口。"""
import os
from functools import lru_cache
from typing import Any, Dict, Optional
import yaml
def _project_root() -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", ".."))
@lru_cache(maxsize=1)
def load_config() -> Dict[str, Any]:
path = os.path.join(_project_root(), "config.yaml")
if not os.path.exists(path):
return {}
with open(path, "r", encoding="utf-8") as f:
return yaml.safe_load(f) or {}
def get_agent_prompt(agent_name: str) -> Optional[str]:
cfg = load_config()
agents = cfg.get("agents", {})
entry = agents.get(agent_name, {})
prompt = entry.get("prompt_template") or entry.get("prompt")
return prompt
def get_model_config() -> Dict[str, Any]:
cfg = load_config()
return cfg.get("model", {})

98
src/server/agent/graph.py Normal file
View File

@@ -0,0 +1,98 @@
import os
from typing import Literal
from google.oauth2 import service_account
from langchain_core.messages import AIMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.graph import StateGraph, END, START
from pydantic import BaseModel
from pymongo import MongoClient
from src.core.config import settings, MONGO_URI
from src.server.agent.state import AgentState
from src.server.agent.agents import designer_node, researcher_node, visualizer_node
from langgraph.checkpoint.mongodb import MongoDBSaver
# --- Supervisor (路由逻辑) ---
# 定义路由的输出结构,强制 LLM 选择一个
class RouteResponse(BaseModel):
next: Literal["Designer", "Researcher", "Visualizer", "FINISH"]
creds = service_account.Credentials.from_service_account_file(
settings.GOOGLE_GENAI_USE_VERTEXAI,
scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
llm_supervisor = ChatGoogleGenerativeAI(
model="gemini-2.0-flash", temperature=0, credentials=creds,
project="aida-461108", location='us-central1', vertexai=True, api_key=settings.GOOGLE_API_KEY
)
def supervisor_node(state: AgentState):
messages = state["messages"]
if not messages:
return {"next": "FINISH"}
last_message = messages[-1]
# --- 改进的拦截逻辑 ---
# 如果最后一条消息是 AI 产生的(且没有调用工具),说明专家已经回复完了用户
# 此时我们才拦截并结束,否则会导致专家没机会说话
if isinstance(last_message, AIMessage) and not last_message.tool_calls:
return {"next": "FINISH"}
# 如果最后一条是 HumanMessage说明用户刚说完Supervisor 必须派发任务
system_prompt = """
你是家具设计团队的主管(Supervisor)。
请根据用户的意图,选择最合适的专家:
- Designer: 设计建议、参数细化、闲聊、问候。
- Visualizer: 绘图、看草图。
- Researcher: 市场报告、趋势。
只需输出专家名称。
"""
chain = llm_supervisor.with_structured_output(RouteResponse)
decision = chain.invoke([{"role": "system", "content": system_prompt}] + messages)
return {"next": decision.next}
# --- 构建 Graph ---
workflow = StateGraph(AgentState)
workflow.add_node("Supervisor", supervisor_node)
workflow.add_node("Designer", designer_node)
workflow.add_node("Researcher", researcher_node)
workflow.add_node("Visualizer", visualizer_node)
workflow.add_edge(START, "Supervisor")
# 这里的逻辑是关键Supervisor 决定去向
workflow.add_conditional_edges(
"Supervisor",
lambda state: state["next"],
{
"Designer": "Designer",
"Researcher": "Researcher",
"Visualizer": "Visualizer",
"FINISH": END
}
)
# 重点修改:专家执行完后,必须回到 Supervisor 进行状态检查
# 如果 Supervisor 发现专家刚说完话,它会触发上面的逻辑返回 FINISH
workflow.add_edge("Designer", "Supervisor")
workflow.add_edge("Researcher", "Supervisor")
workflow.add_edge("Visualizer", "Supervisor")
client = MongoClient(MONGO_URI)
checkpointer = MongoDBSaver(
client=client["furniture_agent_db"],
db_name="langgraph",
collection_name="checkpoints"
)
app = workflow.compile(checkpointer=checkpointer)

View File

@@ -0,0 +1,49 @@
from langchain_core.messages import HumanMessage, AIMessage
from src.server.agent.graph import app
def main():
# 模拟 thread_id 区分不同用户或项目
config = {"configurable": {"thread_id": "project_alpha"}}
while True:
user_input = input("\n👤 设计师 (输入 'history' 定位轮次): ")
# --- 官方推荐的异步回溯逻辑 ---
if user_input.lower() == "history":
print("\n--- 历史记录 ---")
for state in app.get_state_history(config):
# 每一个 state 都是一个 CheckpointTuple
cp_id = state.config["configurable"]["checkpoint_id"]
msg = state.values["messages"][-1].content[:30] if state.values.get("messages") else "Initial"
print(f"ID: {cp_id} | 内容: {msg}...")
target_id = input("\n请输入想要回溯的 Checkpoint ID (直接回车取消): ")
if target_id:
# 重新配置 config指向特定的 checkpoint_id 实现分支
config = {"configurable": {"thread_id": "project_alpha", "checkpoint_id": target_id}}
print(f"✅ 已定位到节点 {target_id},后续对话将从此分叉。")
continue
# --- 官方推荐的 astream 异步流式调用 ---
print("🤖 Agent 思考中...")
for event in app.stream(
{"messages": [HumanMessage(content=user_input)]},
config,
stream_mode="values" # 这里设为 values 可以直接获取当前状态的消息列表
):
# 获取当前节点处理后的最新消息
if "messages" in event:
last_msg = event["messages"][-1]
if isinstance(last_msg, AIMessage):
# 为了极致流式体验,可以在此处对 content 进行打印
pass
# 运行结束后,最新的状态已经自动持久化到 MongoDB
# 我们可以通过 app.get_state(config) 验证
final_state = app.get_state(config)
print(f"\n✅ 最终回复: {final_state.values['messages'][-1].content}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,9 @@
import operator
from typing import Annotated, Sequence, TypedDict, Union
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
# messages 存储完整的对话历史operator.add 表示新消息是追加而不是覆盖
messages: Annotated[Sequence[BaseMessage], operator.add]
# next 存储 Supervisor 决定的下一步是谁
next: str

25
src/server/agent/tools.py Normal file
View File

@@ -0,0 +1,25 @@
from langchain_core.tools import tool
# --- 模拟你已经开发好的报告生成功能 ---
@tool
def generate_2025_report_tool(topic: str) -> str:
"""
专门用于收集信息并生成报告。
当用户询问关于趋势、市场分析、年度报告如2025家具报告时调用此工具。
"""
print(f"\n[系统日志] 正在调用外部模块生成关于 '{topic}' 的报告...")
# 这里对接你实际的代码比如return my_existing_module.run(topic)
return f"【报告生成成功】已生成关于 {topic} 的 PDF 报告。核心洞察2025年趋势倾向于生物嗜好设计(Biophilic Design)和可持续软木材质。"
# --- 绘图工具 ---
@tool
def generate_furniture_sketch(prompt: str) -> str:
"""
用于生成家具草图。输入必须是详细的英文绘画提示词(Prompt)。
"""
print(f"\n[系统日志] 正在调用 Gemini/Imagen 绘图 APIPrompt: {prompt}...")
# 在真实场景中,这里调用 Google Imagen API 或 Midjourney API
# 示例返回一个模拟的图片链接
return "https://furniture-design-db.com/generated_sketch_v1.jpg"