1.优化隔离工作目录2.新增图像生成和编辑功能3.生成模型替换为本地flux2 klein

This commit is contained in:
zcr
2026-03-19 17:55:39 +08:00
parent b6ca7ae6ed
commit ac8a5e5a30
18 changed files with 1167 additions and 403 deletions

View File

@@ -1,3 +1,4 @@
import os
import time
import asyncio
from typing import List, Dict, Any
@@ -8,19 +9,6 @@ import uuid
from crawl4ai import AsyncWebCrawler, BrowserConfig, CrawlerRunConfig, CacheMode
from langchain_core.tools import tool
# ─────────────────────────────────────
# 路径配置
# ─────────────────────────────────────
TOOL_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = TOOL_DIR.parent
# DeepAgents 推荐目录
SAVE_DIR = PROJECT_ROOT / "agent_workspace" / "raw_data"
SAVE_DIR.mkdir(parents=True, exist_ok=True)
print(f"tool save : {str(PROJECT_ROOT / "agent_workspace")}")
# ─────────────────────────────────────
# Browser 配置
# ─────────────────────────────────────
@@ -65,7 +53,7 @@ def build_filename(url: str) -> str:
# 单个 URL 抓取
# ─────────────────────────────────────
async def crawl_one(crawler, url: str, sem: asyncio.Semaphore) -> Dict[str, Any]:
async def crawl_one(crawler, url: str, sem: asyncio.Semaphore, save_dir: str) -> Dict[str, Any]:
async with sem:
try:
result = await crawler.arun(url=url, config=run_config)
@@ -87,7 +75,7 @@ async def crawl_one(crawler, url: str, sem: asyncio.Semaphore) -> Dict[str, Any]
}
filename = build_filename(url)
filepath = SAVE_DIR / filename
filepath = os.path.join(save_dir, filename)
header = (
f"<!-- Source: {url} -->\n"
@@ -115,7 +103,7 @@ async def crawl_one(crawler, url: str, sem: asyncio.Semaphore) -> Dict[str, Any]
# Async 主逻辑
# ─────────────────────────────────────
async def _crawl4ai_batch(urls: List[str]) -> Dict[str, Any]:
async def _crawl4ai_batch(urls: List[str], save_dir: str) -> Dict[str, Any]:
urls = list(set(urls)) # 去重
if not urls:
@@ -126,7 +114,7 @@ async def _crawl4ai_batch(urls: List[str]) -> Dict[str, Any]:
async with AsyncWebCrawler(config=browser_config) as crawler:
tasks = [
crawl_one(crawler, url, sem)
crawl_one(crawler, url, sem, save_dir)
for url in urls
]
@@ -150,42 +138,46 @@ async def _crawl4ai_batch(urls: List[str]) -> Dict[str, Any]:
}
# ─────────────────────────────────────
# Tool同步
# ─────────────────────────────────────
@tool
def crawl4ai_batch(urls: List[str]) -> str:
"""
Batch crawl webpages and save their content as markdown files.
def create_crawl4ai_batch_tool(workspace_dir):
@tool
def crawl4ai_batch(urls: List[str]) -> str:
"""
Batch crawl webpages and save their content as markdown files.
Args:
urls: List of webpage URLs to crawl.
Args:
urls: List of webpage URLs to crawl.
Returns:
A summary of crawling results and saved file paths.
"""
Returns:
A summary of crawling results and saved file paths.
"""
try:
result = asyncio.run(_crawl4ai_batch(urls))
try:
save_dir = os.path.join(workspace_dir, "raw_data")
if not os.path.exists(save_dir):
os.makedirs(save_dir, exist_ok=True)
if "error" in result:
return f"❌ Error: {result['error']}"
result = asyncio.run(_crawl4ai_batch(urls, save_dir))
output = [
"### 批量抓取完成 ###",
f"成功保存文件: {result['count']}",
f"保存目录: {SAVE_DIR}",
"",
"抓取详情:"
]
if "error" in result:
return f"❌ Error: {result['error']}"
output.extend(result["summary"])
output = [
"### 批量抓取完成 ###",
f"成功保存文件: {result['count']}",
f"保存目录: {workspace_dir}",
"",
"抓取详情:"
]
if result["saved_files"]:
output.append("\n可读取文件:")
output.extend(result["saved_files"])
output.extend(result["summary"])
return "\n".join(output)
if result["saved_files"]:
output.append("\n可读取文件:")
output.extend(result["saved_files"])
except Exception as e:
return f"🚨 爬虫系统异常: {str(e)}"
return "\n".join(output)
except Exception as e:
return f"🚨 爬虫系统异常: {str(e)}"
return crawl4ai_batch

View File

@@ -1,15 +1,21 @@
import json
import logging
import os
import uuid
from pathlib import Path
from typing import Annotated
import httpx
from google.oauth2 import service_account
from langchain_core.tools import tool
from google import genai
from google.genai.types import GenerateContentConfig, Modality
from langgraph.prebuilt import ToolRuntime
from minio import Minio
from src.core.config import settings
from src.server.utils.new_oss_client import oss_upload_image
from src.server.utils.new_oss_client import oss_upload_image, oss_get_image, is_minio_file_exist, oss_upload_image_file
logger = logging.getLogger(__name__)
# 初始化全局凭证和客户端
@@ -27,47 +33,187 @@ client = genai.Client(
)
@tool
async def generate_furniture(prompt: str) -> str:
"""
使用 Gemini 图像生成模型根据详细的英文提示词生成家具设计草图。
"""
print(f"\n[系统日志] 正在调用 Nano Banana (Gemini Image Gen) ...")
def is_image_path_exist(image_path):
try:
response = client.models.generate_content(
model="gemini-2.5-flash-image",
contents=(f"Generate a professional furniture design sketch: {prompt}"),
config=GenerateContentConfig(
response_modalities=[Modality.TEXT, Modality.IMAGE],
),
)
return Path(image_path).exists()
except:
return False
image_bytes = None
for part in response.candidates[0].content.parts:
if part.inline_data:
image_bytes = part.inline_data.data
break
if not image_bytes:
return "未能生成图像数据。"
object_name = f"furniture/sketches/{uuid.uuid4()}.png"
bucket = "fida-test" # 替换为你的 bucket 名称
# 3. 调用你的上传函数
upload_res = oss_upload_image(
oss_client=minio_client,
bucket=bucket,
object_name=object_name,
image_bytes=image_bytes
)
def create_generate_furniture_tool(workspace_dir, width: int = 1024, height: int = 1024):
@tool
async def generate_furniture(prompt: str, runtime: ToolRuntime) -> str:
"""
使用 Gemini 图像生成模型根据详细的英文提示词生成家具设计草图。
"""
logger.info(f"\n[系统日志] 正在调用 generate_furniture ...")
try:
# 1. 生成图像 - local flux2-klein
object_name = f"furniture/sketches/{uuid.uuid4()}.png"
bucket_name = "fida-test" # 替换为你的 bucket 名称
request_data = {
"prompt": prompt,
"bucket_name": bucket_name,
"object_name": object_name,
"width": width,
"height": height
}
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"http://{settings.FLUX2_GEN_IMG_MODEL_URL}/predict",
json=request_data,
)
result = resp.json()
image_url = result.get("output_path", None)
if upload_res:
# 4. 构造访问链接 (如果是私有 bucket需使用 presigned_get_object)
# 这里简单示例为直接访问地址
image_url = f"{bucket}/{object_name}"
return image_url
else:
return "图片生成成功,但上传至存储服务器失败。"
except Exception as e:
logger.warning(e)
return "绘图流程异常"
if image_url:
filename = os.path.join(workspace_dir, image_url)
# 2. 创建本地目录(确保目录存在)
local_dir = os.path.dirname(filename)
if not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
img = oss_get_image(oss_client=minio_client, bucket=image_url.split('/')[0], object_name=image_url[image_url.find('/') + 1:])
img.save(filename)
return image_url
else:
return f"Image generation failed."
except Exception as e:
logger.warning(f"绘图流程异常:{e}")
return "绘图流程异常"
return generate_furniture
def create_edit_furniture_tool(workspace_dir, width: int = 1024, height: int = 1024):
@tool
async def edit_furniture(prompt: str, input_image_path) -> str:
"""
使用图像生成模型根据详细的英文提示词编辑家具设计草图。
"""
logger.info(f"\n[系统日志] 正在调用 edit_furniture ...")
try:
# 0. 编辑前先检查工作环境和minio上是否存在该图像
input_image_path = input_image_path.lstrip('/')
filename = os.path.join(workspace_dir, input_image_path)
local_exist = is_image_path_exist(filename)
minio_exist = is_minio_file_exist(minio_client=minio_client, bucket_name=input_image_path.split('/')[0], object_name=input_image_path.split('/')[0])
if not local_exist and not minio_exist:
# 两个地方都不存在 直接报错
return f"Image generation failed."
elif local_exist and not minio_exist:
# 把本地的上传到minio
oss_upload_image_file(oss_client=minio_client, bucket=input_image_path.split('/')[0], object_name=input_image_path.split('/')[0], file_path=filename)
elif not local_exist and minio_exist:
# minio的下载到本地
img = oss_get_image(oss_client=minio_client, bucket=input_image_path.split('/')[0], object_name=input_image_path.split('/')[0], )
img.save(filename)
elif minio_exist and local_exist:
# 两个地方都存在 直接跳过
pass
# 1. 生成图像 - local flux2-klein
object_name = f"furniture/sketches/{uuid.uuid4()}.png"
bucket_name = "fida-test" # 替换为你的 bucket 名称
request_data = {
"input_image_paths": [input_image_path],
"prompt": prompt,
"bucket_name": bucket_name,
"object_name": object_name,
"width": width,
"height": height
}
async with httpx.AsyncClient(timeout=120) as client:
resp = await client.post(
f"http://{settings.FLUX2_GEN_IMG_MODEL_URL}/predict",
json=request_data,
)
result = resp.json()
image_url = result.get("output_path", None)
if image_url:
filename = os.path.join(workspace_dir, image_url)
# 2. 创建本地目录(确保目录存在)
local_dir = os.path.dirname(filename)
if not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
img = oss_get_image(oss_client=minio_client, bucket=image_url.split('/')[0], object_name=image_url[image_url.find('/') + 1:])
img.save(filename)
return image_url
else:
return f"Image generation failed."
except Exception as e:
logger.warning(f"edit_furniture error {e}")
return "edit_furniture error"
return edit_furniture
# def create_generate_furniture_tool(workspace_dir):
# @tool
# async def generate_furniture(prompt: str) -> str:
# """
# 使用 Gemini 图像生成模型根据详细的英文提示词生成家具设计草图。
# """
# print(f"\n[系统日志] 正在调用 Nano Banana (Gemini Image Gen) ...")
#
# try:
# response = client.models.generate_content(
# model="gemini-2.5-flash-image",
# contents=(f"Generate a professional furniture design sketch: {prompt}"),
# config=GenerateContentConfig(
# response_modalities=[Modality.TEXT, Modality.IMAGE],
# ),
# )
#
# image_bytes = None
# for part in response.candidates[0].content.parts:
# if part.inline_data:
# image_bytes = part.inline_data.data
# break
#
# if not image_bytes:
# return "未能生成图像数据。"
# # 1. 定义OSS存储路径和本地保存路径
# object_name = f"furniture/sketches/{uuid.uuid4()}.png"
# bucket = "fida-test" # 替换为你的 bucket 名称
# filename = os.path.join(workspace_dir, f"{bucket}/{object_name}")
#
# # 2. 创建本地目录(确保目录存在)
# local_dir = os.path.dirname(filename)
# if not os.path.exists(local_dir):
# os.makedirs(local_dir, exist_ok=True)
#
# # 3. 保存图片到本地文件(新增核心逻辑)
# try:
# with open(filename, "wb") as f:
# f.write(image_bytes)
# print(f"[系统日志] 图片已保存到本地:{filename}")
# except Exception as save_e:
# logger.warning(f"保存图片到本地失败:{save_e}")
# # 本地保存失败不中断上传流程,仅记录日志
#
# # 4. 上传图片到OSS原有逻辑
# upload_res = oss_upload_image(
# oss_client=minio_client,
# bucket=bucket,
# object_name=object_name,
# image_bytes=image_bytes
# )
#
# if upload_res:
# image_url = f"{bucket}/{object_name}"
# return image_url
# else:
# return f"图片生成成功(本地路径:{filename}),但上传至存储服务器失败。"
#
# except Exception as e:
# logger.warning(f"绘图流程异常:{e}")
# return "绘图流程异常"
#
# return generate_furniture

View File

@@ -32,104 +32,108 @@ class ReportInput(BaseModel):
# LangGraph Tool
# =========================
@tool("report_generator", args_schema=ReportInput)
async def report_generator(
report_topic: str,
structured_data: List[Dict],
language: str = "English"
) -> dict:
"""
Generate a professional design/market report
directly from structured retrieval results.
"""
def create_report_generator_tool(workspace_dir):
@tool("report_generator", args_schema=ReportInput)
async def report_generator(
report_topic: str,
structured_data: List[Dict],
language: str = "English"
) -> dict:
"""
Generate a professional design/market report
directly from structured retrieval results.
"""
writer = get_stream_writer()
if not structured_data:
error_msg = "Error: No structured data provided."
writer({"type": "report_error", "message": error_msg})
return error_msg
writer = get_stream_writer()
if not structured_data:
error_msg = "Error: No structured data provided."
writer({"type": "report_error", "message": error_msg})
return error_msg
collected_data_str = json.dumps(
structured_data,
ensure_ascii=False,
indent=2
)
# =========================
# Prompt
# =========================
system_prompt = f"""
You are a professional design trend analyst.
Generate a long, structured Markdown report.
REQUIREMENTS:
1. Follow MECE principle.
2. Embed images ONLY if they start with https://
using: ![alt](url)
3. Insert images inline.
4. Every key insight must cite source:
[Website Name](url)
5. Use Markdown headings.
6. Start directly with title.
7. Be detailed and analytical.
Output Language: {language}
"""
user_prompt = f"""
Topic: {report_topic}
Input Data:
{collected_data_str}
"""
# =========================
# 调用 LLM
# =========================
writer({"type": "report_start", "topic": report_topic, "language": language})
full_report = ""
try:
report_llm = repoer_llm.with_config(
callbacks=[]
collected_data_str = json.dumps(
structured_data,
ensure_ascii=False,
indent=2
)
async for chunk in report_llm.astream(
[
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
):
if chunk.content: # Gemini 返回的 chunk.content
delta = chunk.content
full_report += delta
# return {"type": "report_delta", "delta": delta}
writer({"type": "report_delta", "delta": delta}) # ← 实时推送给前端
writer({"type": "report_stop", "topic": report_topic, "language": language})
except Exception as e:
error_msg = f"LLM generation failed: {str(e)}"
writer({"type": "report_error", "message": error_msg})
return error_msg
report_content = full_report.strip()
# =========================
# Prompt
# =========================
# =========================
# 保存报告
# =========================
output_dir = "workspace/reports"
os.makedirs(output_dir, exist_ok=True)
system_prompt = f"""
You are a professional design trend analyst.
safe_topic = re.sub(r'[\\/*?:"<>|]', "", report_topic.replace(" ", "_"))
filename = f"{output_dir}/{safe_topic}.md"
Generate a long, structured Markdown report.
try:
with open(filename, "w", encoding="utf-8") as f:
f.write(report_content)
writer({"type": "report_complete", "file_path": filename})
except Exception as e:
writer({"type": "report_save_warning", "message": str(e)})
REQUIREMENTS:
# 返回完整内容(作为 tool result同时正文已通过 delta 流式输出
return report_content + f"\n\n✅ Report saved to: {filename}"
1. Follow MECE principle.
2. Embed images ONLY if they start with https://
using: ![alt](url)
3. Insert images inline.
4. Every key insight must cite source:
[Website Name](url)
5. Use Markdown headings.
6. Start directly with title.
7. Be detailed and analytical.
Output Language: {language}
"""
user_prompt = f"""
Topic: {report_topic}
Input Data:
{collected_data_str}
"""
# =========================
# 调用 LLM
# =========================
writer({"type": "report_start", "topic": report_topic, "language": language})
full_report = ""
try:
report_llm = repoer_llm.with_config(
callbacks=[]
)
async for chunk in report_llm.astream(
[
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
]
):
if chunk.content: # Gemini 返回的 chunk.content
delta = chunk.content
full_report += delta
# return {"type": "report_delta", "delta": delta}
writer({"type": "report_delta", "delta": delta}) # ← 实时推送给前端
writer({"type": "report_stop", "topic": report_topic, "language": language})
except Exception as e:
error_msg = f"LLM generation failed: {str(e)}"
writer({"type": "report_error", "message": error_msg})
return error_msg
report_content = full_report.strip()
# =========================
# 保存报告
# =========================
output_dir = os.path.join(workspace_dir, "reports")
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
safe_topic = re.sub(r'[\\/*?:"<>|]', "", report_topic.replace(" ", "_"))
filename = f"{output_dir}/{safe_topic}.md"
try:
with open(filename, "w", encoding="utf-8") as f:
f.write(report_content)
writer({"type": "report_complete", "file_path": filename})
except Exception as e:
writer({"type": "report_save_warning", "message": str(e)})
# 返回完整内容(作为 tool result同时正文已通过 delta 流式输出
return report_content + f"\n\n✅ Report saved to: {filename}"
return report_generator

View File

@@ -32,121 +32,6 @@ class StructuredRetrievalInput(BaseModel):
source_url: Optional[str] = Field(None, description="Optional global source URL")
@tool("structured_retrieval", args_schema=StructuredRetrievalInput)
def structured_retrieval(
file_paths: List[str],
query: str,
source_url: Optional[str] = None
) -> Dict:
"""
Batch structured extraction from markdown files.
- Performs vector search + re-ranking
- Saves extracted structured data as JSON file to disk
- Returns ONLY summary (status, count, file path)
"""
# ── 1. 收集所有文件內容 ──────────────────────────────────────
all_docs_pool: List[Document] = []
for path in file_paths:
if not os.path.exists(path) or not path.endswith((".md", ".markdown")):
continue
file_name = os.path.basename(path)
with open(path, "r", encoding="utf-8") as f:
content = f.read()
current_source = source_url or _extract_source_from_md(content) or "unknown"
sections = _split_markdown_by_headers(content)
for sec in sections:
all_docs_pool.append(
Document(
page_content=sec,
metadata={"source_url": current_source, "file_name": file_name}
)
)
if not all_docs_pool:
return {"status": "no_documents_found", "items_count": 0, "json_path": None}
# ── 2. Vector search ────────────────────────────────────────────
vector_store = FAISS.from_documents(all_docs_pool, _EMBEDDING_MODEL)
retrieved = vector_store.similarity_search(query, k=200)
# ── 3. 提取結構化片段 ──────────────────────────────────────────
structured_items = []
for doc in retrieved:
text = doc.page_content.strip()
if len(text) < 30:
continue
images = list(set(re.findall(r"!\[.*?\]\((.*?)\)", text)))
structured_items.append(
{
"text": text,
"images": images,
"source_url": doc.metadata.get("source_url"),
"file_name": doc.metadata.get("file_name")
}
)
# ── 4. Re-rank ──────────────────────────────────────────────────
if structured_items:
unique_items = {item["text"]: item for item in structured_items}.values()
pairs = [[query, item["text"]] for item in unique_items]
scores = _RERANK_MODEL.predict(pairs)
sorted_items = sorted(
zip(scores, unique_items),
key=lambda x: x[0],
reverse=True
)
top_items = [item for _, item in sorted_items[:50]]
else:
top_items = []
# ── 5. 寫入 JSON 文件 ──────────────────────────────────────────
if not top_items:
return {"status": "no_relevant_content", "items_count": 0, "json_path": None}
# 產生有意義的檔名
safe_query = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '_', query)[:40]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
json_filename = f"extracted_{safe_query}_{timestamp}.json"
# 建議的儲存目錄(與 crawl4ai_batch 對齊)
output_dir = os.path.join(os.path.dirname(file_paths[0]), "..", "extracted")
os.makedirs(output_dir, exist_ok=True)
json_path = os.path.join(output_dir, json_filename)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(
{
"query": query,
"extracted_at": timestamp,
"item_count": len(top_items),
"items": top_items
},
f,
ensure_ascii=False,
indent=2
)
# ── 6. 只回傳摘要 ──────────────────────────────────────────────
return {
"status": "success",
"items_count": len(top_items),
"json_path": json_path,
"summary": f"已提取 {len(top_items)} 個高相關片段,儲存於 {json_path}"
}
def _extract_source_from_md(content: str) -> Optional[str]:
match = re.search(r"<!--\s*Source:\s*(.*?)\s*-->", content)
return match.group(1).strip() if match else None
@@ -223,3 +108,126 @@ def _chunk_text(
start = max(0, end - overlap)
return chunks
def create_structured_retrieval_tool(workspace_dir):
@tool("structured_retrieval", args_schema=StructuredRetrievalInput)
def structured_retrieval(
file_paths: List[str],
query: str,
source_url: Optional[str] = None
) -> Dict:
"""
Batch structured extraction from markdown files.
- Performs vector search + re-ranking
- Saves extracted structured data as JSON file to disk
- Returns ONLY summary (status, count, file path)
"""
# ── 1. 收集所有文件內容 ──────────────────────────────────────
all_docs_pool: List[Document] = []
for path in file_paths:
if not os.path.exists(path) or not path.endswith((".md", ".markdown")):
continue
file_name = os.path.basename(path)
with open(path, "r", encoding="utf-8") as f:
content = f.read()
current_source = source_url or _extract_source_from_md(content) or "unknown"
sections = _split_markdown_by_headers(content)
for sec in sections:
all_docs_pool.append(
Document(
page_content=sec,
metadata={"source_url": current_source, "file_name": file_name}
)
)
if not all_docs_pool:
return {"status": "no_documents_found", "items_count": 0, "json_path": None}
# ── 2. Vector search ────────────────────────────────────────────
vector_store = FAISS.from_documents(all_docs_pool, _EMBEDDING_MODEL)
retrieved = vector_store.similarity_search(query, k=200)
# ── 3. 提取結構化片段 ──────────────────────────────────────────
structured_items = []
for doc in retrieved:
text = doc.page_content.strip()
if len(text) < 30:
continue
images = list(set(re.findall(r"!\[.*?\]\((.*?)\)", text)))
structured_items.append(
{
"text": text,
"images": images,
"source_url": doc.metadata.get("source_url"),
"file_name": doc.metadata.get("file_name")
}
)
# ── 4. Re-rank ──────────────────────────────────────────────────
if structured_items:
unique_items = {item["text"]: item for item in structured_items}.values()
pairs = [[query, item["text"]] for item in unique_items]
scores = _RERANK_MODEL.predict(pairs)
sorted_items = sorted(
zip(scores, unique_items),
key=lambda x: x[0],
reverse=True
)
top_items = [item for _, item in sorted_items[:50]]
else:
top_items = []
# ── 5. 寫入 JSON 文件 ──────────────────────────────────────────
if not top_items:
return {"status": "no_relevant_content", "items_count": 0, "json_path": None}
# 產生有意義的檔名
safe_query = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '_', query)[:40]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
json_filename = f"extracted_{safe_query}_{timestamp}.json"
# 建議的儲存目錄(與 crawl4ai_batch 對齊)
output_dir = os.path.join(workspace_dir, "extracted")
if not os.path.exists(output_dir):
os.makedirs(output_dir, exist_ok=True)
if not os.path.exists(output_dir):
# 2. 不存在则创建makedirs 支持创建多级目录mkdir 只能创建单级)
os.makedirs(output_dir, exist_ok=True)
json_path = os.path.join(output_dir, json_filename)
with open(json_path, "w", encoding="utf-8") as f:
json.dump(
{
"query": query,
"extracted_at": timestamp,
"item_count": len(top_items),
"items": top_items
},
f,
ensure_ascii=False,
indent=2
)
# ── 6. 只回傳摘要 ──────────────────────────────────────────────
return {
"status": "success",
"items_count": len(top_items),
"json_path": json_path,
"summary": f"已提取 {len(top_items)} 個高相關片段,儲存於 {json_path}"
}
return structured_retrieval

View File

@@ -0,0 +1,21 @@
from langchain.tools import tool
from langchain_core.messages import HumanMessage
from PIL import Image
import requests
from io import BytesIO
from src.server.deep_agent.init_llm import vision_llm
@tool
def analyze_image(image_url: str) -> str:
"""分析给定URL的图像。输入图像URL输出图像描述和关键观察。"""
response = requests.get(image_url)
image = Image.open(BytesIO(response.content))
# 这里使用模型直接分析图像(简化示例)
msg = HumanMessage(content=[
{"type": "text", "text": "详细描述这张图像,包括物体、颜色、场景和任何文本。"},
{"type": "image_url", "image_url": {"url": image_url}}
])
result = vision_llm.invoke([msg])
return result.content