feat 接入report

This commit is contained in:
zcr
2026-03-04 19:03:12 +08:00
parent 62b4505261
commit 972c3803a7
6 changed files with 246 additions and 261 deletions

View File

@@ -2,7 +2,7 @@ from pathlib import Path
from typing import AsyncGenerator, Dict, Any
from deepagents import create_deep_agent
from deepagents.backends import FilesystemBackend
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage, AIMessage
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage, AIMessage, AIMessageChunk
from langchain_core.runnables import RunnableConfig
from langchain_qwq import ChatQwen
@@ -26,6 +26,7 @@ MAIN_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = MAIN_DIR
model = ChatQwen(
enable_thinking=False,
model="qwen3.5-flash",
max_tokens=3_000,
timeout=None,
@@ -48,6 +49,7 @@ research_agent = create_deep_agent(
def get_model(config: RunnableConfig):
temp = config["configurable"].get("llm_temperature", 0.5)
return ChatQwen(
enable_thinking=False,
model="qwen3.5-flash",
max_tokens=3_000,
timeout=None,
@@ -114,110 +116,69 @@ async def researcher_node(
)]
}
async for event in research_agent.astream_events(
async for chunk in research_agent.astream(
{"messages": messages[-12:]},
version="v2",
config=config,
stream_subgraphs=True
config=config
):
event_type = event["event"]
name = event.get("name", "未知")
if event["event"] == "on_custom_event":
custom_data = event["data"]
# 你的 writer 发的是 dict所以这里 custom_data 就是你写的 {"type": "report_delta", "delta": "..."}
if isinstance(custom_data, dict) and custom_data.get("type") == "report_delta":
delta = custom_data.get("delta", "")
print(delta, end="", flush=True) # 实时打印,不换行
# ────────────── 工具结束事件:重点处理并 yield 输出 ──────────────
if event["event"] in {"on_tool_start", "on_tool_end"}:
tool_name = event.get("name", "未知")
is_start = event["event"] == "on_tool_start"
if is_start:
tool_input = event["data"].get("input", {})
current_step = f"正在執行工具:{tool_name}"
print(f"| {current_step} | {tool_input}")
yield {
"messages": [AIMessage(
content=full_content,
name="Researcher",
additional_kwargs={
"current_step": current_step,
"tool_name": tool_name,
"tool_input": tool_input,
"tool_status": "start",
"streaming": True
}
)]
}
else: # on_tool_end
tool_output = event["data"].get("output", "")
current_step = f"工具 {tool_name} 已完成"
print(f"| {current_step} | {tool_output}")
yield {
"messages": [AIMessage(
content=full_content,
name="Researcher",
additional_kwargs={
"current_step": current_step,
"tool_name": tool_name,
"tool_output": tool_output,
"tool_status": "end",
"streaming": True
}
)]
}
# ────────────── LLM 内容生成(保持原有逻辑) ──────────────
elif event_type == "on_chat_model_stream":
chunk = event["data"]["chunk"].content or ""
if chunk:
print(chunk, end="", flush=True)
full_content += chunk
if "\n" in chunk or len(full_content) % 4 == 0:
yield {
"messages": [AIMessage(
content=full_content,
name="Researcher",
additional_kwargs={
"current_step": current_step,
"streaming": True
}
)]
}
# ────────────── 其他链路事件(可选补充) ──────────────
elif event_type in ("on_chain_start", "on_chain_end"):
status = "开始" if event_type == "on_chain_start" else "完成"
current_step = f"[{status}] {name.upper()}"
if "messages" in chunk and isinstance(chunk["messages"], AIMessageChunk):
yield {
"messages": [AIMessage(
content=full_content,
name="Researcher",
additional_kwargs={
"current_step": current_step,
"streaming": True
}
)]
"messages": chunk["messages"], # 逐 token 追加
# 可以額外 yield 一些 metadata例如
# "node": "Researcher",
# "status": "thinking"
}
# 最终输出
yield {
"messages": [AIMessage(
content=full_content.strip() or "报告生成完成",
name="Researcher",
additional_kwargs={
"current_step": "报告已完成",
"streaming": False
}
)],
"next": "Suggester"
}
else:
# 其他類型的 chunk
yield chunk
#
# async def researcher_node(
# state: AgentState,
# config: RunnableConfig
# ) -> Dict[str, Any]:
# """
# 薄節點:只判斷是否要跑深度報告,並準備初始訊息
# 真正的 report 生成與 streaming 交給外層或子圖處理
# """
# use_report = config["configurable"].get("use_report", False)
#
# if not use_report:
# return {
# "messages": [AIMessage(
# content="深度報告功能未啟用,請通過前端按鈕觸發。",
# name="Researcher"
# )],
# "next": "Supervisor"
# }
#
# # 發送初始訊息,讓前端馬上看到「正在啟動」
# # initial_msg = AIMessage(
# # content="正在啟動深度報告生成...",
# # name="Researcher",
# # additional_kwargs={
# # "current_step": "正在啟動深度報告生成...",
# # "streaming": True
# # }
# # )
#
# # 方式一:最簡單,直接把 research_agent 當作下一個要執行的東西
# # (假設 research_agent 已 compile 好,且支援 astream
# # return {
# # "messages": state["messages"] + [initial_msg],
# # # 可以選擇加一個自訂 key 標記
# # "report_in_progress": True,
# # # next 留空或回 Supervisor由 conditional edges 決定
# # }
#
# # 方式二:如果你想更明確(推薦用 Send未來好擴充
# return Send(
# "research_sub_agent", # 你要在 graph.add_node("research_sub_agent", research_agent)
# {
# "messages": state["messages"][-12:],
# "configurable": config["configurable"]
# }
# )
# --- 3. Visualizer Agent (视觉专家) ---
async def visualizer_node(state: AgentState, config: RunnableConfig):
"""负责将自然语言转化为绘图 Prompt 并调用绘图工具"""

View File

@@ -2,6 +2,7 @@ from typing import Literal
from langchain_core.messages import AIMessage
from langchain_core.runnables import RunnableConfig
from langchain_qwq import ChatQwen
from langgraph.checkpoint.serde.jsonplus import JsonPlusSerializer
from langgraph.graph import StateGraph, END, START
from pydantic import BaseModel
from pymongo import MongoClient
@@ -91,6 +92,7 @@ client = MongoClient(MONGO_URI)
checkpointer = MongoDBSaver(
client=client["furniture_agent_db"],
db_name="langgraph",
collection_name="checkpoints"
collection_name="checkpoints",
serde=JsonPlusSerializer(pickle_fallback=True), # ← 關鍵這一行
)
app = workflow.compile(checkpointer=checkpointer)

View File

@@ -3,6 +3,7 @@ import json
import re
from typing import Optional, List, Dict
from langchain_qwq import ChatQwen
from langgraph.config import get_stream_writer
from pydantic import BaseModel, Field
from langchain_core.tools import tool
from langchain_core.messages import SystemMessage, HumanMessage
@@ -15,6 +16,7 @@ from src.core.config import settings
llm = ChatQwen(
enable_thinking=False,
model="qwen3.5-flash",
temperature=0.2,
max_tokens=3_000,
@@ -47,7 +49,7 @@ class ReportInput(BaseModel):
# =========================
@tool("report_generator", args_schema=ReportInput)
async def report_generator(
def report_generator(
report_topic: str,
structured_data: List[Dict],
language: str = "English"
@@ -57,11 +59,11 @@ async def report_generator(
directly from structured retrieval results.
"""
writer = get_stream_writer()
if not structured_data:
return {
"status": "error",
"message": "No structured data provided."
}
error_msg = "Error: No structured data provided."
writer({"type": "report_error", "message": error_msg})
return error_msg
collected_data_str = json.dumps(
structured_data,
@@ -103,55 +105,40 @@ Input Data:
# =========================
# 调用 LLM
# =========================
writer({"type": "report_start", "topic": report_topic, "language": language})
full_report = ""
try:
response = await llm.ainvoke([
for chunk in llm.stream([
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
])
report_content = response.content.strip()
# 清理 markdown block 包裹
report_content = (
report_content
.replace("```markdown", "")
.replace("```", "")
.strip()
)
]):
if chunk.content: # Gemini 返回的 chunk.content
delta = chunk.content
full_report += delta
writer({"type": "report_delta", "delta": delta}) # ← 实时推送给前端
except Exception as e:
return {
"status": "error",
"message": f"LLM generation failed: {str(e)}"
}
error_msg = f"LLM generation failed: {str(e)}"
writer({"type": "report_error", "message": error_msg})
return error_msg
report_content = full_report.strip()
# =========================
# 保存报告
# =========================
output_dir = "workspace/reports"
os.makedirs(output_dir, exist_ok=True)
safe_topic = re.sub(
r'[\\/*?:"<>|]',
"",
report_topic.replace(" ", "_")
)
safe_topic = re.sub(r'[\\/*?:"<>|]', "", report_topic.replace(" ", "_"))
filename = f"{output_dir}/{safe_topic}.md"
try:
with open(filename, "w", encoding="utf-8") as f:
f.write(report_content)
writer({"type": "report_complete", "file_path": filename})
except Exception as e:
return {
"status": "error",
"message": f"Failed to save report: {str(e)}"
}
writer({"type": "report_save_warning", "message": str(e)})
return {
"status": "success",
"file_path": filename,
"message": "Report generated successfully."
}
# 返回完整内容(作为 tool result同时正文已通过 delta 流式输出
return report_content + f"\n\n✅ Report saved to: {filename}"