Files
FiDA_Python/src/server/deep_agent/tools/research_tool.py
2026-04-30 17:32:35 +08:00

153 lines
4.4 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# import asyncio
# import json
# from datetime import datetime
# from typing import List, Set, Optional
# from langchain_core.tools import tool
# from tavily import TavilyClient
#
# from src.core.config import settings
#
# # 模拟配置加载
# TAVILY_API_KEY = settings.TAVILY_API_KEY
#
#
# @tool
# async def topic_research(topic: list[str], max_urls: int = 5) -> str:
# """
# 深度调研工具。该工具会利用 Tavily 搜索引擎针对特定主题进行多维度搜索。
# 它会自动生成针对性的搜索词(包含年份和趋势),并返回去重后的高质量 URL 列表。
# """
# if not TAVILY_API_KEY:
# return "❌ 错误: 未配置 TAVILY_API_KEY。"
#
# client = TavilyClient(api_key=TAVILY_API_KEY)
#
# # 1. 自动生成多维度搜索词 (在工具内部快速生成)
#
# # 2. 并行执行搜索
# async def perform_search(q: str):
# # 使用 asyncio.to_thread 运行同步的 Tavily SDK
# def sync_search():
# try:
# response = client.search(
# query=q,
# search_depth="advanced",
# max_results=5,
# include_answer=False
# )
# return response.get('results', [])
# except Exception as e:
# print(f"Search error: {e}")
# return []
#
# return await asyncio.to_thread(sync_search)
#
# search_tasks = [perform_search(q) for q in topic]
# search_results_list = await asyncio.gather(*search_tasks)
#
# # 3. 结果去重与过滤
# seen_urls: Set[str] = set()
# final_urls = []
#
# # 常见的非内容页面过滤
# skip_extensions = ('.pdf', '.jpg', '.png', '.zip', '.exe')
#
# for results in search_results_list:
# for item in results:
# url = item.get('url')
# if url and url not in seen_urls:
# if not url.lower().endswith(skip_extensions):
# seen_urls.add(url)
# final_urls.append(url)
#
# # 4. 结果截断
# selected_urls = final_urls[:max_urls]
#
# # 返回 JSON 字符串,便于 Agent 下一步调用批量爬虫 (Crawl4ai)
# return json.dumps(selected_urls, ensure_ascii=False)
#
import asyncio
import json
import logging
from typing import List, Set
from urllib.parse import urlsplit

from ddgs import DDGS
from langchain.tools import tool
_logger = logging.getLogger(__name__)

# File extensions of binary / non-article resources that are never returned.
_SKIP_EXTENSIONS = ('.pdf', '.jpg', '.png', '.zip', '.exe')

# Ask for slightly more hits per query than are usually needed, because some
# are discarded by de-duplication and the extension filter.
_RESULTS_PER_QUERY = 8


def _is_non_content_url(url: str) -> bool:
    """Return True when *url* points at a skipped file type.

    The raw URL is checked first, then the URL path alone, so that a query
    string or fragment (``.../report.pdf?download=1``) cannot hide the
    extension.
    """
    lowered = url.lower()
    if lowered.endswith(_SKIP_EXTENSIONS):
        return True
    try:
        path = urlsplit(lowered).path
    except ValueError:
        # Malformed URL that urlsplit cannot parse: the raw check above is
        # all that can be done.
        return False
    return path.endswith(_SKIP_EXTENSIONS)


def _search_hrefs(query: str) -> List[str]:
    """Run one blocking DuckDuckGo text search and return its result links.

    Best-effort by design: any failure is logged and yields an empty list so
    that one failing query does not abort the other concurrent searches.
    """
    try:
        with DDGS() as ddgs:
            hits = ddgs.text(query, max_results=_RESULTS_PER_QUERY)
            return [hit.get("href") for hit in hits if hit.get("href")]
    except Exception:
        _logger.exception("Search error for query %r", query)
        return []


@tool
async def topic_research(topic: List[str], max_urls: int = 5) -> str:
    """Deep research tool (DuckDuckGo version).

    Searches the web for each of the given topic keywords and returns a
    de-duplicated list of result URLs as a JSON array string.

    Args:
        topic: Search queries; each one is searched independently.
        max_urls: Maximum number of URLs to return. Values <= 0 yield "[]".

    Returns:
        A JSON-encoded list of URL strings, in first-seen order.
    """
    if max_urls <= 0:
        # A negative slice bound would silently drop results from the tail
        # instead of returning nothing; skip the network calls entirely.
        return json.dumps([], ensure_ascii=False)

    # Drop blank and repeated queries (order preserved) so that no redundant
    # searches are issued.
    queries = list(dict.fromkeys(q for q in topic if q and q.strip()))

    # The DuckDuckGo client is synchronous, so each search runs in a worker
    # thread and all queries are awaited concurrently.
    per_query_urls = await asyncio.gather(
        *(asyncio.to_thread(_search_hrefs, q) for q in queries)
    )

    # De-duplicate across queries while filtering out non-content resources.
    seen_urls: Set[str] = set()
    final_urls: List[str] = []
    for urls in per_query_urls:
        for url in urls:
            if not url or url in seen_urls or _is_non_content_url(url):
                continue
            seen_urls.add(url)
            final_urls.append(url)

    selected_urls = final_urls[:max_urls]
    _logger.debug("topic_research selected URLs: %s", selected_urls)
    return json.dumps(selected_urls, ensure_ascii=False)
# import asyncio
#
#
# # 假设你已经定义了 topic_research
#
# async def test():
# topics = [
# "modern furniture design trends 2025",
# "scandinavian furniture materials",
# "minimalist living room furniture ideas"
# ]
#
# result = await topic_research.ainvoke({
# "topic": topics,
# "max_urls": 5
# })
#
# print("结果👇")
# print(result)
#
#
# asyncio.run(test())