# Deepresearch/utils.py
import os
import asyncio
import requests
import random
import concurrent.futures
import aiohttp
import time
import logging
from typing import List, Optional, Dict, Any, Union
from urllib.parse import unquote
from exa_py import Exa
from linkup import LinkupClient
from tavily import AsyncTavilyClient
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from langchain_community.retrievers import ArxivRetriever
from langchain_community.utilities.pubmed import PubMedAPIWrapper
from langsmith import traceable
from state import Section, ReportState
def compile_completed_sections(state: ReportState) -> str:
"""
    Join the list of completed sections into a single, numbered article string.
"""
compiled = []
for number, section in enumerate(state["completed_sections"], 1):
title = section.name
content = section.content.strip()
compiled.append(f"# {number}. {title}\n\n{content}\n")
return "\n".join(compiled).strip()
def get_config_value(value):
"""
Helper function to handle both string and enum cases of configuration values
"""
return value if isinstance(value, str) else value.value
def get_search_params(search_api: str, search_api_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""
Filters the search_api_config dictionary to include only parameters accepted by the specified search API.
Args:
search_api (str): The search API identifier (e.g., "exa", "tavily").
search_api_config (Optional[Dict[str, Any]]): The configuration dictionary for the search API.
Returns:
Dict[str, Any]: A dictionary of parameters to pass to the search function.
"""
# Define accepted parameters for each search API
SEARCH_API_PARAMS = {
"exa": ["max_characters", "num_results", "include_domains", "exclude_domains", "subpages"],
"tavily": [], # Tavily currently accepts no additional parameters
"perplexity": [], # Perplexity accepts no additional parameters
"arxiv": ["load_max_docs", "get_full_documents", "load_all_available_meta"],
"pubmed": ["top_k_results", "email", "api_key", "doc_content_chars_max"],
"linkup": ["depth"],
}
# Get the list of accepted parameters for the given search API
accepted_params = SEARCH_API_PARAMS.get(search_api, [])
# If no config provided, return an empty dict
if not search_api_config:
return {}
# Filter the config to only include accepted parameters
return {k: v for k, v in search_api_config.items() if k in accepted_params}
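# Illustrative usage sketch (added for clarity, not called anywhere in this module):
# get_search_params drops any keys the chosen API does not accept, so one shared
# config dictionary can be reused across providers.
#
#   get_search_params("exa", {"num_results": 3, "depth": "deep"})
#   # -> {"num_results": 3}   ("depth" is only accepted by linkup, so it is filtered out)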
def deduplicate_sources(search_response) -> dict[str, dict[str, str]]:
# Collect all results
sources_list = []
for response in search_response:
sources_list.extend(response['results'])
# Deduplicate by URL
unique_sources = {source['url']: source for source in sources_list}
return unique_sources
def format_sources(unique_sources: dict, max_tokens_per_source=4000, handle_raw_content=False) -> str:
formatted_text = "Content from sources:\n"
for _, source in enumerate(unique_sources.values(), 1):
formatted_text += f"TITILE: {source['title']}\n"
formatted_text += f"URL: {source['url']}\n"
formatted_text += f"CONTENT: {source['content']}\n\n"
char_limit = max_tokens_per_source * 4
if handle_raw_content:
            raw_content = source.get('raw_content', '')
            if raw_content is None:
                raw_content = ''
                print(f"Warning: No raw_content found for source {source['url']}")
if len(raw_content) > char_limit:
raw_content = raw_content[:char_limit] + "... [truncated]"
formatted_text += f"Full source content limited to {max_tokens_per_source} tokens: {raw_content}\n\n"
return formatted_text.strip()
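# Illustrative usage sketch (added for clarity; the response dicts follow the Tavily-style
# format documented in the search functions below):
#
#   responses = [
#       {"results": [{"title": "A", "url": "https://a.example", "content": "snippet"}]},
#       {"results": [{"title": "A", "url": "https://a.example", "content": "snippet"}]},
#   ]
#   unique = deduplicate_sources(responses)   # keyed by URL, duplicates collapsed
#   prompt_context = format_sources(unique, max_tokens_per_source=1000)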
def format_sections(sections: list[Section]) -> str:
""" Format a list of sections into a string """
formatted_str = ""
for idx, section in enumerate(sections, 1):
formatted_str += f"""
{'='*60}
Section {idx}: {section.name}
{'='*60}
Description:
{section.description}
Requires Research:
{section.research}
Content:
{section.content if section.content else '[Not yet written]'}
"""
return formatted_str
@traceable
async def tavily_search_async(search_queries):
"""
Performs concurrent web searches using the Tavily API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
Returns:
List[dict]: List of search responses from Tavily API, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the webpage
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full page content if available
},
...
]
}
"""
tavily_async_client = AsyncTavilyClient()
search_tasks = []
for query in search_queries:
search_tasks.append(
tavily_async_client.search(
query,
max_results=5,
include_raw_content=True,
topic="general"
)
)
# Execute all searches concurrently
search_docs = await asyncio.gather(*search_tasks)
return search_docs
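# Illustrative usage sketch (assumes a Tavily API key is available to AsyncTavilyClient,
# typically via the TAVILY_API_KEY environment variable):
#
#   docs = asyncio.run(tavily_search_async(["langgraph deep research"]))
#   urls = [r["url"] for r in docs[0]["results"]]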
@traceable
def perplexity_search(search_queries):
"""Search the web using the Perplexity API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
Returns:
List[dict]: List of search responses from Perplexity API, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full content or None for secondary citations
},
...
]
}
"""
headers = {
"accept": "application/json",
"content-type": "application/json",
"Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}"
}
search_docs = []
for query in search_queries:
payload = {
"model": "sonar-pro",
"messages": [
{
"role": "system",
"content": "Search the web and provide factual information with sources."
},
{
"role": "user",
"content": query
}
]
}
response = requests.post(
"https://api.perplexity.ai/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status() # Raise exception for bad status codes
# Parse the response
data = response.json()
content = data["choices"][0]["message"]["content"]
citations = data.get("citations", ["https://perplexity.ai"])
# Create results list for this query
results = []
# First citation gets the full content
results.append({
"title": f"Perplexity Search, Source 1",
"url": citations[0],
"content": content,
"raw_content": content,
"score": 1.0 # Adding score to match Tavily format
})
# Add additional citations without duplicating content
for i, citation in enumerate(citations[1:], start=2):
results.append({
"title": f"Perplexity Search, Source {i}",
"url": citation,
"content": "See primary source for full content",
"raw_content": None,
"score": 0.5 # Lower score for secondary sources
})
# Format response to match Tavily structure
search_docs.append({
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": results
})
return search_docs
@traceable
async def exa_search(search_queries, max_characters: Optional[int] = None, num_results=5,
include_domains: Optional[List[str]] = None,
exclude_domains: Optional[List[str]] = None,
subpages: Optional[int] = None):
"""Search the web using the Exa API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
max_characters (int, optional): Maximum number of characters to retrieve for each result's raw content.
If None, the text parameter will be set to True instead of an object.
num_results (int): Number of search results per query. Defaults to 5.
include_domains (List[str], optional): List of domains to include in search results.
When specified, only results from these domains will be returned.
exclude_domains (List[str], optional): List of domains to exclude from search results.
Cannot be used together with include_domains.
subpages (int, optional): Number of subpages to retrieve per result. If None, subpages are not retrieved.
Returns:
List[dict]: List of search responses from Exa API, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': list,
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
'score': float, # Relevance score
'raw_content': str|None # Full content or None for secondary citations
},
...
]
}
"""
# Check that include_domains and exclude_domains are not both specified
if include_domains and exclude_domains:
raise ValueError("Cannot specify both include_domains and exclude_domains")
# Initialize Exa client (API key should be configured in your .env file)
    exa = Exa(api_key=os.getenv("EXA_API_KEY"))
# Define the function to process a single query
async def process_query(query):
# Use run_in_executor to make the synchronous exa call in a non-blocking way
loop = asyncio.get_event_loop()
# Define the function for the executor with all parameters
def exa_search_fn():
# Build parameters dictionary
kwargs = {
# Set text to True if max_characters is None, otherwise use an object with max_characters
"text": True if max_characters is None else {"max_characters": max_characters},
"summary": True, # This is an amazing feature by EXA. It provides an AI generated summary of the content based on the query
"num_results": num_results
}
# Add optional parameters only if they are provided
if subpages is not None:
kwargs["subpages"] = subpages
if include_domains:
kwargs["include_domains"] = include_domains
elif exclude_domains:
kwargs["exclude_domains"] = exclude_domains
return exa.search_and_contents(query, **kwargs)
response = await loop.run_in_executor(None, exa_search_fn)
# Format the response to match the expected output structure
formatted_results = []
seen_urls = set() # Track URLs to avoid duplicates
# Helper function to safely get value regardless of if item is dict or object
def get_value(item, key, default=None):
            if isinstance(item, dict):
                return item.get(key, default)
            return getattr(item, key, default)
# Access the results from the SearchResponse object
results_list = get_value(response, 'results', [])
# First process all main results
for result in results_list:
# Get the score with a default of 0.0 if it's None or not present
score = get_value(result, 'score', 0.0)
# Combine summary and text for content if both are available
text_content = get_value(result, 'text', '')
summary_content = get_value(result, 'summary', '')
content = text_content
if summary_content:
if content:
content = f"{summary_content}\n\n{content}"
else:
content = summary_content
title = get_value(result, 'title', '')
url = get_value(result, 'url', '')
# Skip if we've seen this URL before (removes duplicate entries)
if url in seen_urls:
continue
seen_urls.add(url)
# Main result entry
result_entry = {
"title": title,
"url": url,
"content": content,
"score": score,
"raw_content": text_content
}
# Add the main result to the formatted results
formatted_results.append(result_entry)
# Now process subpages only if the subpages parameter was provided
if subpages is not None:
for result in results_list:
subpages_list = get_value(result, 'subpages', [])
for subpage in subpages_list:
# Get subpage score
subpage_score = get_value(subpage, 'score', 0.0)
# Combine summary and text for subpage content
subpage_text = get_value(subpage, 'text', '')
subpage_summary = get_value(subpage, 'summary', '')
subpage_content = subpage_text
if subpage_summary:
if subpage_content:
subpage_content = f"{subpage_summary}\n\n{subpage_content}"
else:
subpage_content = subpage_summary
subpage_url = get_value(subpage, 'url', '')
# Skip if we've seen this URL before
if subpage_url in seen_urls:
continue
seen_urls.add(subpage_url)
formatted_results.append({
"title": get_value(subpage, 'title', ''),
"url": subpage_url,
"content": subpage_content,
"score": subpage_score,
"raw_content": subpage_text
})
# Collect images if available (only from main results to avoid duplication)
images = []
for result in results_list:
image = get_value(result, 'image')
if image and image not in images: # Avoid duplicate images
images.append(image)
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": images,
"results": formatted_results
}
# Process all queries sequentially with delay to respect rate limit
search_docs = []
for i, query in enumerate(search_queries):
try:
# Add delay between requests (0.25s = 4 requests per second, well within the 5/s limit)
if i > 0: # Don't delay the first request
await asyncio.sleep(0.25)
result = await process_query(query)
search_docs.append(result)
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing query '{query}': {str(e)}")
# Add a placeholder result for failed queries to maintain index alignment
search_docs.append({
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": [],
"error": str(e)
})
# Add additional delay if we hit a rate limit error
if "429" in str(e):
print("Rate limit exceeded. Adding additional delay...")
await asyncio.sleep(1.0) # Add a longer delay if we hit a rate limit
return search_docs
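# Illustrative usage sketch (assumes EXA_API_KEY is set in the environment):
#
#   docs = asyncio.run(exa_search(
#       ["retrieval augmented generation survey"],
#       max_characters=2000,
#       num_results=3,
#       include_domains=["arxiv.org"],
#   ))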
@traceable
async def arxiv_search_async(search_queries, load_max_docs=5, get_full_documents=True, load_all_available_meta=True):
"""
Performs concurrent searches on arXiv using the ArxivRetriever.
Args:
search_queries (List[str]): List of search queries or article IDs
load_max_docs (int, optional): Maximum number of documents to return per query. Default is 5.
get_full_documents (bool, optional): Whether to fetch full text of documents. Default is True.
load_all_available_meta (bool, optional): Whether to load all available metadata. Default is True.
Returns:
List[dict]: List of search responses from arXiv, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [ # List of search results
{
'title': str, # Title of the paper
'url': str, # URL (Entry ID) of the paper
'content': str, # Formatted summary with metadata
'score': float, # Relevance score (approximated)
'raw_content': str|None # Full paper content if available
},
...
]
}
"""
async def process_single_query(query):
try:
# Create retriever for each query
retriever = ArxivRetriever(
load_max_docs=load_max_docs,
get_full_documents=get_full_documents,
load_all_available_meta=load_all_available_meta
)
# Run the synchronous retriever in a thread pool
loop = asyncio.get_event_loop()
docs = await loop.run_in_executor(None, lambda: retriever.invoke(query))
results = []
# Assign decreasing scores based on the order
base_score = 1.0
score_decrement = 1.0 / (len(docs) + 1) if docs else 0
for i, doc in enumerate(docs):
# Extract metadata
metadata = doc.metadata
# Use entry_id as the URL (this is the actual arxiv link)
url = metadata.get('entry_id', '')
# Format content with all useful metadata
content_parts = []
# Primary information
if 'Summary' in metadata:
content_parts.append(f"Summary: {metadata['Summary']}")
if 'Authors' in metadata:
content_parts.append(f"Authors: {metadata['Authors']}")
# Add publication information
published = metadata.get('Published')
published_str = published.isoformat() if hasattr(published, 'isoformat') else str(published) if published else ''
if published_str:
content_parts.append(f"Published: {published_str}")
# Add additional metadata if available
if 'primary_category' in metadata:
content_parts.append(f"Primary Category: {metadata['primary_category']}")
if 'categories' in metadata and metadata['categories']:
content_parts.append(f"Categories: {', '.join(metadata['categories'])}")
if 'comment' in metadata and metadata['comment']:
content_parts.append(f"Comment: {metadata['comment']}")
if 'journal_ref' in metadata and metadata['journal_ref']:
content_parts.append(f"Journal Reference: {metadata['journal_ref']}")
if 'doi' in metadata and metadata['doi']:
content_parts.append(f"DOI: {metadata['doi']}")
# Get PDF link if available in the links
pdf_link = ""
if 'links' in metadata and metadata['links']:
for link in metadata['links']:
if 'pdf' in link:
pdf_link = link
content_parts.append(f"PDF: {pdf_link}")
break
# Join all content parts with newlines
content = "\n".join(content_parts)
result = {
'title': metadata.get('Title', ''),
'url': url, # Using entry_id as the URL
'content': content,
'score': base_score - (i * score_decrement),
'raw_content': doc.page_content if get_full_documents else None
}
results.append(result)
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing arXiv query '{query}': {str(e)}")
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
}
# Process queries sequentially with delay to respect arXiv rate limit (1 request per 3 seconds)
search_docs = []
for i, query in enumerate(search_queries):
try:
# Add delay between requests (3 seconds per ArXiv's rate limit)
if i > 0: # Don't delay the first request
await asyncio.sleep(3.0)
result = await process_single_query(query)
search_docs.append(result)
except Exception as e:
# Handle exceptions gracefully
print(f"Error processing arXiv query '{query}': {str(e)}")
search_docs.append({
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
})
# Add additional delay if we hit a rate limit error
if "429" in str(e) or "Too Many Requests" in str(e):
print("ArXiv rate limit exceeded. Adding additional delay...")
await asyncio.sleep(5.0) # Add a longer delay if we hit a rate limit
return search_docs
@traceable
async def pubmed_search_async(search_queries, top_k_results=5, email=None, api_key=None, doc_content_chars_max=4000):
"""
Performs concurrent searches on PubMed using the PubMedAPIWrapper.
Args:
search_queries (List[str]): List of search queries
top_k_results (int, optional): Maximum number of documents to return per query. Default is 5.
email (str, optional): Email address for PubMed API. Required by NCBI.
api_key (str, optional): API key for PubMed API for higher rate limits.
doc_content_chars_max (int, optional): Maximum characters for document content. Default is 4000.
Returns:
List[dict]: List of search responses from PubMed, one per query. Each response has format:
{
'query': str, # The original search query
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [ # List of search results
{
'title': str, # Title of the paper
'url': str, # URL to the paper on PubMed
'content': str, # Formatted summary with metadata
'score': float, # Relevance score (approximated)
'raw_content': str # Full abstract content
},
...
]
}
"""
async def process_single_query(query):
try:
# print(f"Processing PubMed query: '{query}'")
# Create PubMed wrapper for the query
wrapper = PubMedAPIWrapper(
top_k_results=top_k_results,
doc_content_chars_max=doc_content_chars_max,
email=email if email else "your_email@example.com",
api_key=api_key if api_key else ""
)
# Run the synchronous wrapper in a thread pool
loop = asyncio.get_event_loop()
# Use wrapper.lazy_load instead of load to get better visibility
docs = await loop.run_in_executor(None, lambda: list(wrapper.lazy_load(query)))
print(f"Query '{query}' returned {len(docs)} results")
results = []
# Assign decreasing scores based on the order
base_score = 1.0
score_decrement = 1.0 / (len(docs) + 1) if docs else 0
for i, doc in enumerate(docs):
# Format content with metadata
content_parts = []
if doc.get('Published'):
content_parts.append(f"Published: {doc['Published']}")
if doc.get('Copyright Information'):
content_parts.append(f"Copyright Information: {doc['Copyright Information']}")
if doc.get('Summary'):
content_parts.append(f"Summary: {doc['Summary']}")
# Generate PubMed URL from the article UID
uid = doc.get('uid', '')
url = f"https://pubmed.ncbi.nlm.nih.gov/{uid}/" if uid else ""
# Join all content parts with newlines
content = "\n".join(content_parts)
result = {
'title': doc.get('Title', ''),
'url': url,
'content': content,
'score': base_score - (i * score_decrement),
'raw_content': doc.get('Summary', '')
}
results.append(result)
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
except Exception as e:
# Handle exceptions with more detailed information
error_msg = f"Error processing PubMed query '{query}': {str(e)}"
print(error_msg)
import traceback
print(traceback.format_exc()) # Print full traceback for debugging
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
}
# Process all queries with a reasonable delay between them
search_docs = []
# Start with a small delay that increases if we encounter rate limiting
delay = 1.0 # Start with a more conservative delay
for i, query in enumerate(search_queries):
try:
# Add delay between requests
if i > 0: # Don't delay the first request
# print(f"Waiting {delay} seconds before next query...")
await asyncio.sleep(delay)
result = await process_single_query(query)
search_docs.append(result)
# If query was successful with results, we can slightly reduce delay (but not below minimum)
if result.get('results') and len(result['results']) > 0:
delay = max(0.5, delay * 0.9) # Don't go below 0.5 seconds
except Exception as e:
# Handle exceptions gracefully
error_msg = f"Error in main loop processing PubMed query '{query}': {str(e)}"
print(error_msg)
search_docs.append({
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [],
'error': str(e)
})
# If we hit an exception, increase delay for next query
delay = min(5.0, delay * 1.5) # Don't exceed 5 seconds
return search_docs
@traceable
async def linkup_search(search_queries, depth: Optional[str] = "standard"):
"""
Performs concurrent web searches using the Linkup API.
Args:
search_queries (List[SearchQuery]): List of search queries to process
depth (str, optional): "standard" (default) or "deep". More details here https://docs.linkup.so/pages/documentation/get-started/concepts
Returns:
List[dict]: List of search responses from Linkup API, one per query. Each response has format:
{
'results': [ # List of search results
{
'title': str, # Title of the search result
'url': str, # URL of the result
'content': str, # Summary/snippet of content
},
...
]
}
"""
client = LinkupClient()
search_tasks = []
for query in search_queries:
search_tasks.append(
client.async_search(
query,
depth,
output_type="searchResults",
)
)
search_results = []
for response in await asyncio.gather(*search_tasks):
search_results.append(
{
"results": [
{"title": result.name, "url": result.url, "content": result.content}
for result in response.results
],
}
)
return search_results
@traceable
async def duckduckgo_search(search_queries):
"""Perform searches using DuckDuckGo
Args:
search_queries (List[str]): List of search queries to process
Returns:
List[dict]: List of search results
"""
async def process_single_query(query):
# Execute synchronous search in the event loop's thread pool
loop = asyncio.get_event_loop()
def perform_search():
results = []
with DDGS() as ddgs:
ddg_results = list(ddgs.text(query, max_results=5))
# Format results
for i, result in enumerate(ddg_results):
results.append({
'title': result.get('title', ''),
                        'url': result.get('href', ''),
'content': result.get('body', ''),
'score': 1.0 - (i * 0.1), # Simple scoring mechanism
'raw_content': result.get('body', '')
})
return {
'query': query,
'follow_up_questions': None,
'answer': None,
'images': [],
'results': results
}
return await loop.run_in_executor(None, perform_search)
# Execute all queries concurrently
tasks = [process_single_query(query) for query in search_queries]
search_docs = await asyncio.gather(*tasks)
return search_docs
@traceable
async def google_search_async(search_queries: Union[str, List[str]], max_results: int = 5, include_raw_content: bool = True):
"""
Performs concurrent web searches using Google.
Uses Google Custom Search API if environment variables are set, otherwise falls back to web scraping.
Args:
search_queries (List[str]): List of search queries to process
max_results (int): Maximum number of results to return per query
include_raw_content (bool): Whether to fetch full page content
Returns:
List[dict]: List of search responses from Google, one per query
"""
# Check for API credentials from environment variables
api_key = os.environ.get("GOOGLE_API_KEY")
cx = os.environ.get("GOOGLE_CX")
use_api = bool(api_key and cx)
# Handle case where search_queries is a single string
if isinstance(search_queries, str):
search_queries = [search_queries]
# Define user agent generator
def get_useragent():
"""Generates a random user agent string."""
lynx_version = f"Lynx/{random.randint(2, 3)}.{random.randint(8, 9)}.{random.randint(0, 2)}"
libwww_version = f"libwww-FM/{random.randint(2, 3)}.{random.randint(13, 15)}"
ssl_mm_version = f"SSL-MM/{random.randint(1, 2)}.{random.randint(3, 5)}"
openssl_version = f"OpenSSL/{random.randint(1, 3)}.{random.randint(0, 4)}.{random.randint(0, 9)}"
return f"{lynx_version} {libwww_version} {ssl_mm_version} {openssl_version}"
# Create executor for running synchronous operations
executor = None if use_api else concurrent.futures.ThreadPoolExecutor(max_workers=5)
# Use a semaphore to limit concurrent requests
semaphore = asyncio.Semaphore(5 if use_api else 2)
async def search_single_query(query):
async with semaphore:
try:
results = []
# API-based search
if use_api:
# The API returns up to 10 results per request
for start_index in range(1, max_results + 1, 10):
# Calculate how many results to request in this batch
num = min(10, max_results - (start_index - 1))
# Make request to Google Custom Search API
params = {
'q': query,
'key': api_key,
'cx': cx,
'start': start_index,
'num': num
}
print(f"Requesting {num} results for '{query}' from Google API...")
async with aiohttp.ClientSession() as session:
async with session.get('https://www.googleapis.com/customsearch/v1', params=params) as response:
if response.status != 200:
error_text = await response.text()
print(f"API error: {response.status}, {error_text}")
break
data = await response.json()
# Process search results
for item in data.get('items', []):
result = {
"title": item.get('title', ''),
"url": item.get('link', ''),
"content": item.get('snippet', ''),
"score": None,
"raw_content": item.get('snippet', '')
}
results.append(result)
# Respect API quota with a small delay
await asyncio.sleep(0.2)
# If we didn't get a full page of results, no need to request more
if not data.get('items') or len(data.get('items', [])) < num:
break
# Web scraping based search
else:
# Add delay between requests
await asyncio.sleep(0.5 + random.random() * 1.5)
print(f"Scraping Google for '{query}'...")
# Define scraping function
def google_search(query, max_results):
try:
lang = "en"
safe = "active"
start = 0
fetched_results = 0
fetched_links = set()
search_results = []
while fetched_results < max_results:
# Send request to Google
resp = requests.get(
url="https://www.google.com/search",
headers={
"User-Agent": get_useragent(),
"Accept": "*/*"
},
params={
"q": query,
"num": max_results + 2,
"hl": lang,
"start": start,
"safe": safe,
},
cookies={
'CONSENT': 'PENDING+987', # Bypasses the consent page
'SOCS': 'CAESHAgBEhIaAB',
}
)
resp.raise_for_status()
# Parse results
soup = BeautifulSoup(resp.text, "html.parser")
result_block = soup.find_all("div", class_="ezO2md")
new_results = 0
for result in result_block:
link_tag = result.find("a", href=True)
title_tag = link_tag.find("span", class_="CVA68e") if link_tag else None
description_tag = result.find("span", class_="FrIlee")
if link_tag and title_tag and description_tag:
link = unquote(link_tag["href"].split("&")[0].replace("/url?q=", ""))
if link in fetched_links:
continue
fetched_links.add(link)
title = title_tag.text
description = description_tag.text
# Store result in the same format as the API results
search_results.append({
"title": title,
"url": link,
"content": description,
"score": None,
"raw_content": description
})
fetched_results += 1
new_results += 1
if fetched_results >= max_results:
break
if new_results == 0:
break
start += 10
time.sleep(1) # Delay between pages
return search_results
except Exception as e:
print(f"Error in Google search for '{query}': {str(e)}")
return []
# Execute search in thread pool
loop = asyncio.get_running_loop()
search_results = await loop.run_in_executor(
executor,
lambda: google_search(query, max_results)
)
# Process the results
results = search_results
# If requested, fetch full page content asynchronously (for both API and web scraping)
if include_raw_content and results:
content_semaphore = asyncio.Semaphore(3)
async with aiohttp.ClientSession() as session:
fetch_tasks = []
async def fetch_full_content(result):
async with content_semaphore:
url = result['url']
headers = {
'User-Agent': get_useragent(),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
}
try:
await asyncio.sleep(0.2 + random.random() * 0.6)
async with session.get(url, headers=headers, timeout=10) as response:
if response.status == 200:
# Check content type to handle binary files
content_type = response.headers.get('Content-Type', '').lower()
# Handle PDFs and other binary files
if 'application/pdf' in content_type or 'application/octet-stream' in content_type:
# For PDFs, indicate that content is binary and not parsed
result['raw_content'] = f"[Binary content: {content_type}. Content extraction not supported for this file type.]"
else:
try:
# Try to decode as UTF-8 with replacements for non-UTF8 characters
html = await response.text(errors='replace')
soup = BeautifulSoup(html, 'html.parser')
result['raw_content'] = soup.get_text()
except UnicodeDecodeError as ude:
# Fallback if we still have decoding issues
result['raw_content'] = f"[Could not decode content: {str(ude)}]"
except Exception as e:
print(f"Warning: Failed to fetch content for {url}: {str(e)}")
result['raw_content'] = f"[Error fetching content: {str(e)}]"
return result
for result in results:
fetch_tasks.append(fetch_full_content(result))
updated_results = await asyncio.gather(*fetch_tasks)
results = updated_results
print(f"Fetched full content for {len(results)} results")
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": results
}
except Exception as e:
print(f"Error in Google search for query '{query}': {str(e)}")
return {
"query": query,
"follow_up_questions": None,
"answer": None,
"images": [],
"results": []
}
try:
# Create tasks for all search queries
search_tasks = [search_single_query(query) for query in search_queries]
# Execute all searches concurrently
search_results = await asyncio.gather(*search_tasks)
return search_results
finally:
# Only shut down executor if it was created
if executor:
executor.shutdown(wait=False)
async def select_and_execute_search(search_api: str, query_list: list[str], params_to_pass: dict) -> dict[str, dict[str, Any]]:
    """
    Dispatch the query list to the selected search API and return the deduplicated sources keyed by URL.
    """
    if search_api == "tavily":
search_results = await tavily_search_async(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "perplexity":
search_results = perplexity_search(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "exa":
search_results = await exa_search(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "arxiv":
search_results = await arxiv_search_async(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "pubmed":
search_results = await pubmed_search_async(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "linkup":
search_results = await linkup_search(query_list, **params_to_pass)
return deduplicate_sources(search_results)
elif search_api == "duckduckgo":
search_results = await duckduckgo_search(query_list)
return deduplicate_sources(search_results)
elif search_api == "googlesearch":
search_results = await google_search_async(query_list, **params_to_pass)
return deduplicate_sources(search_results)
else:
raise ValueError(f"Unsupported search API: {search_api}")