使用ddgs代替TAVILY
This commit is contained in:
@@ -1,67 +1,152 @@
|
||||
# import asyncio
|
||||
# import json
|
||||
# from datetime import datetime
|
||||
# from typing import List, Set, Optional
|
||||
# from langchain_core.tools import tool
|
||||
# from tavily import TavilyClient
|
||||
#
|
||||
# from src.core.config import settings
|
||||
#
|
||||
# # 模拟配置加载
|
||||
# TAVILY_API_KEY = settings.TAVILY_API_KEY
|
||||
#
|
||||
#
|
||||
# @tool
|
||||
# async def topic_research(topic: list[str], max_urls: int = 5) -> str:
|
||||
# """
|
||||
# 深度调研工具。该工具会利用 Tavily 搜索引擎针对特定主题进行多维度搜索。
|
||||
# 它会自动生成针对性的搜索词(包含年份和趋势),并返回去重后的高质量 URL 列表。
|
||||
# """
|
||||
# if not TAVILY_API_KEY:
|
||||
# return "❌ 错误: 未配置 TAVILY_API_KEY。"
|
||||
#
|
||||
# client = TavilyClient(api_key=TAVILY_API_KEY)
|
||||
#
|
||||
# # 1. 自动生成多维度搜索词 (在工具内部快速生成)
|
||||
#
|
||||
# # 2. 并行执行搜索
|
||||
# async def perform_search(q: str):
|
||||
# # 使用 asyncio.to_thread 运行同步的 Tavily SDK
|
||||
# def sync_search():
|
||||
# try:
|
||||
# response = client.search(
|
||||
# query=q,
|
||||
# search_depth="advanced",
|
||||
# max_results=5,
|
||||
# include_answer=False
|
||||
# )
|
||||
# return response.get('results', [])
|
||||
# except Exception as e:
|
||||
# print(f"Search error: {e}")
|
||||
# return []
|
||||
#
|
||||
# return await asyncio.to_thread(sync_search)
|
||||
#
|
||||
# search_tasks = [perform_search(q) for q in topic]
|
||||
# search_results_list = await asyncio.gather(*search_tasks)
|
||||
#
|
||||
# # 3. 结果去重与过滤
|
||||
# seen_urls: Set[str] = set()
|
||||
# final_urls = []
|
||||
#
|
||||
# # 常见的非内容页面过滤
|
||||
# skip_extensions = ('.pdf', '.jpg', '.png', '.zip', '.exe')
|
||||
#
|
||||
# for results in search_results_list:
|
||||
# for item in results:
|
||||
# url = item.get('url')
|
||||
# if url and url not in seen_urls:
|
||||
# if not url.lower().endswith(skip_extensions):
|
||||
# seen_urls.add(url)
|
||||
# final_urls.append(url)
|
||||
#
|
||||
# # 4. 结果截断
|
||||
# selected_urls = final_urls[:max_urls]
|
||||
#
|
||||
# # 返回 JSON 字符串,便于 Agent 下一步调用批量爬虫 (Crawl4ai)
|
||||
# return json.dumps(selected_urls, ensure_ascii=False)
|
||||
#
|
||||
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from datetime import datetime
|
||||
from typing import List, Set, Optional
|
||||
from langchain_core.tools import tool
|
||||
from tavily import TavilyClient
|
||||
from typing import List, Set
|
||||
|
||||
from src.core.config import settings
|
||||
|
||||
# 模拟配置加载
|
||||
TAVILY_API_KEY = settings.TAVILY_API_KEY
|
||||
from ddgs import DDGS
|
||||
from langchain.tools import tool
|
||||
|
||||
|
||||
@tool
|
||||
async def topic_research(topic: list[str], max_urls: int = 5) -> str:
|
||||
async def topic_research(topic: List[str], max_urls: int = 5) -> str:
|
||||
"""
|
||||
深度调研工具。该工具会利用 Tavily 搜索引擎针对特定主题进行多维度搜索。
|
||||
它会自动生成针对性的搜索词(包含年份和趋势),并返回去重后的高质量 URL 列表。
|
||||
深度调研工具(DuckDuckGo版本)。
|
||||
根据多个主题关键词进行搜索,返回去重后的高质量 URL 列表(JSON字符串)。
|
||||
"""
|
||||
if not TAVILY_API_KEY:
|
||||
return "❌ 错误: 未配置 TAVILY_API_KEY。"
|
||||
|
||||
client = TavilyClient(api_key=TAVILY_API_KEY)
|
||||
|
||||
# 1. 自动生成多维度搜索词 (在工具内部快速生成)
|
||||
|
||||
# 2. 并行执行搜索
|
||||
async def perform_search(q: str):
|
||||
# 使用 asyncio.to_thread 运行同步的 Tavily SDK
|
||||
def sync_search():
|
||||
try:
|
||||
response = client.search(
|
||||
query=q,
|
||||
search_depth="advanced",
|
||||
max_results=5,
|
||||
include_answer=False
|
||||
# DuckDuckGo 是同步库,需要丢到线程池
|
||||
def sync_search(query: str):
|
||||
try:
|
||||
with DDGS() as ddgs:
|
||||
results = ddgs.text(
|
||||
query,
|
||||
max_results=8 # 稍微多一点,后面会过滤
|
||||
)
|
||||
return response.get('results', [])
|
||||
except Exception as e:
|
||||
print(f"Search error: {e}")
|
||||
return []
|
||||
return [r.get("href") for r in results if r.get("href")]
|
||||
except Exception as e:
|
||||
print(f"Search error: {e}")
|
||||
return []
|
||||
|
||||
return await asyncio.to_thread(sync_search)
|
||||
async def perform_search(q: str):
|
||||
return await asyncio.to_thread(sync_search, q)
|
||||
|
||||
# 并发执行多个 query
|
||||
search_tasks = [perform_search(q) for q in topic]
|
||||
search_results_list = await asyncio.gather(*search_tasks)
|
||||
|
||||
# 3. 结果去重与过滤
|
||||
# ========================
|
||||
# 去重 + 过滤
|
||||
# ========================
|
||||
seen_urls: Set[str] = set()
|
||||
final_urls = []
|
||||
|
||||
# 常见的非内容页面过滤
|
||||
skip_extensions = ('.pdf', '.jpg', '.png', '.zip', '.exe')
|
||||
|
||||
for results in search_results_list:
|
||||
for item in results:
|
||||
url = item.get('url')
|
||||
if url and url not in seen_urls:
|
||||
if not url.lower().endswith(skip_extensions):
|
||||
seen_urls.add(url)
|
||||
final_urls.append(url)
|
||||
for url in results:
|
||||
if not url:
|
||||
continue
|
||||
|
||||
# 4. 结果截断
|
||||
if url not in seen_urls and not url.lower().endswith(skip_extensions):
|
||||
seen_urls.add(url)
|
||||
final_urls.append(url)
|
||||
|
||||
# ========================
|
||||
# 截断结果
|
||||
# ========================
|
||||
selected_urls = final_urls[:max_urls]
|
||||
|
||||
# 返回 JSON 字符串,便于 Agent 下一步调用批量爬虫 (Crawl4ai)
|
||||
print(f" topic research !!!!!!!!!!!!!!!!!!!!! {selected_urls}")
|
||||
return json.dumps(selected_urls, ensure_ascii=False)
|
||||
|
||||
|
||||
# import asyncio
|
||||
#
|
||||
#
|
||||
# # 假设你已经定义了 topic_research
|
||||
#
|
||||
# async def test():
|
||||
# topics = [
|
||||
# "modern furniture design trends 2025",
|
||||
# "scandinavian furniture materials",
|
||||
# "minimalist living room furniture ideas"
|
||||
# ]
|
||||
#
|
||||
# result = await topic_research.ainvoke({
|
||||
# "topic": topics,
|
||||
# "max_urls": 5
|
||||
# })
|
||||
#
|
||||
# print("结果👇")
|
||||
# print(result)
|
||||
#
|
||||
#
|
||||
# asyncio.run(test())
|
||||
|
||||
Reference in New Issue
Block a user