diff --git a/src/server/deep_agent/tools/research_tool.py b/src/server/deep_agent/tools/research_tool.py
index 49fa09c..0e63eea 100755
--- a/src/server/deep_agent/tools/research_tool.py
+++ b/src/server/deep_agent/tools/research_tool.py
@@ -1,67 +1,57 @@
 import asyncio
 import json
-from datetime import datetime
-from typing import List, Set, Optional
-from langchain_core.tools import tool
-from tavily import TavilyClient
+from typing import List, Set
 
-from src.core.config import settings
-
-# 模拟配置加载
-TAVILY_API_KEY = settings.TAVILY_API_KEY
+from ddgs import DDGS
+from langchain.tools import tool
 
 
 @tool
-async def topic_research(topic: list[str], max_urls: int = 5) -> str:
+async def topic_research(topic: List[str], max_urls: int = 5) -> str:
     """
-    深度调研工具。该工具会利用 Tavily 搜索引擎针对特定主题进行多维度搜索。
-    它会自动生成针对性的搜索词(包含年份和趋势),并返回去重后的高质量 URL 列表。
+    深度调研工具(DuckDuckGo版本)。
+    根据多个主题关键词进行搜索,返回去重后的高质量 URL 列表(JSON字符串)。
     """
-    if not TAVILY_API_KEY:
-        return "❌ 错误: 未配置 TAVILY_API_KEY。"
 
-    client = TavilyClient(api_key=TAVILY_API_KEY)
-
-    # 1. 自动生成多维度搜索词 (在工具内部快速生成)
-
-    # 2. 并行执行搜索
-    async def perform_search(q: str):
-        # 使用 asyncio.to_thread 运行同步的 Tavily SDK
-        def sync_search():
-            try:
-                response = client.search(
-                    query=q,
-                    search_depth="advanced",
-                    max_results=5,
-                    include_answer=False
+    # DDGS is a synchronous library, so run each query in a worker thread.
+    def sync_search(query: str):
+        try:
+            with DDGS() as ddgs:
+                results = ddgs.text(
+                    query,
+                    max_results=8  # fetch a few extra; filtered/deduped below
                 )
-                return response.get('results', [])
-            except Exception as e:
-                print(f"Search error: {e}")
-                return []
+                return [r.get("href") for r in results if r.get("href")]
+        except Exception as e:
+            print(f"Search error: {e}")
+            return []
 
-        return await asyncio.to_thread(sync_search)
+    async def perform_search(q: str):
+        return await asyncio.to_thread(sync_search, q)
 
+    # Run all topic queries concurrently.
     search_tasks = [perform_search(q) for q in topic]
     search_results_list = await asyncio.gather(*search_tasks)
 
-    # 3. 结果去重与过滤
+    # ========================
+    # Deduplicate + filter
+    # ========================
     seen_urls: Set[str] = set()
     final_urls = []
 
-    # 常见的非内容页面过滤
     skip_extensions = ('.pdf', '.jpg', '.png', '.zip', '.exe')
 
     for results in search_results_list:
-        for item in results:
-            url = item.get('url')
-            if url and url not in seen_urls:
-                if not url.lower().endswith(skip_extensions):
-                    seen_urls.add(url)
-                    final_urls.append(url)
+        for url in results:
+            if not url:
+                continue
 
-    # 4. 结果截断
+            if url not in seen_urls and not url.lower().endswith(skip_extensions):
+                seen_urls.add(url)
+                final_urls.append(url)
+
+    # ========================
+    # Truncate results
+    # ========================
     selected_urls = final_urls[:max_urls]
-
-    # 返回 JSON 字符串,便于 Agent 下一步调用批量爬虫 (Crawl4ai)
     return json.dumps(selected_urls, ensure_ascii=False)