import os
import asyncio
import requests
import random
import concurrent.futures
import aiohttp
import time
import logging

from typing import List, Optional, Dict, Any, Union
from urllib.parse import unquote

from exa_py import Exa
from linkup import LinkupClient
from tavily import AsyncTavilyClient
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from langchain_community.retrievers import ArxivRetriever
from langchain_community.utilities.pubmed import PubMedAPIWrapper
from langsmith import traceable

from state import Section, ReportState


def compile_completed_sections(state: ReportState) -> str:
    """Concatenate the completed sections into a single article string."""
    compiled = []
    for number, section in enumerate(state["completed_sections"], 1):
        title = section.name
        content = section.content.strip()
        compiled.append(f"# {number}. {title}\n\n{content}\n")
    return "\n".join(compiled).strip()


def get_config_value(value):
    """Helper function to handle both string and enum cases of configuration values."""
    return value if isinstance(value, str) else value.value


def get_search_params(search_api: str, search_api_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Filters the search_api_config dictionary to include only parameters accepted by the specified search API.

    Args:
        search_api (str): The search API identifier (e.g., "exa", "tavily").
        search_api_config (Optional[Dict[str, Any]]): The configuration dictionary for the search API.

    Returns:
        Dict[str, Any]: A dictionary of parameters to pass to the search function.
    """
    # Define accepted parameters for each search API
    SEARCH_API_PARAMS = {
        "exa": ["max_characters", "num_results", "include_domains", "exclude_domains", "subpages"],
        "tavily": [],      # Tavily currently accepts no additional parameters
        "perplexity": [],  # Perplexity accepts no additional parameters
        "arxiv": ["load_max_docs", "get_full_documents", "load_all_available_meta"],
        "pubmed": ["top_k_results", "email", "api_key", "doc_content_chars_max"],
        "linkup": ["depth"],
    }

    # Get the list of accepted parameters for the given search API
    accepted_params = SEARCH_API_PARAMS.get(search_api, [])

    # If no config provided, return an empty dict
    if not search_api_config:
        return {}

    # Filter the config to only include accepted parameters
    return {k: v for k, v in search_api_config.items() if k in accepted_params}


def deduplicate_sources(search_response) -> dict[str, dict[str, str]]:
    # Collect all results
    sources_list = []
    for response in search_response:
        sources_list.extend(response['results'])

    # Deduplicate by URL
    unique_sources = {source['url']: source for source in sources_list}
    return unique_sources


def format_sources(unique_sources: dict, max_tokens_per_source=4000, handle_raw_content=False) -> str:
    formatted_text = "Content from sources:\n"
    for source in unique_sources.values():
        formatted_text += f"TITLE: {source['title']}\n"
        formatted_text += f"URL: {source['url']}\n"
        formatted_text += f"CONTENT: {source['content']}\n\n"
        # Rough estimate of 4 characters per token
        char_limit = max_tokens_per_source * 4
        if handle_raw_content:
            raw_content = source.get('raw_content', '')
            if raw_content is None:
                raw_content = ''
                print(f"Warning: No raw_content found for source {source['url']}")
            if len(raw_content) > char_limit:
                raw_content = raw_content[:char_limit] + "... [truncated]"
            formatted_text += f"Full source content limited to {max_tokens_per_source} tokens: {raw_content}\n\n"

    return formatted_text.strip()
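
# A minimal usage sketch for the configuration helpers above. The config values
# are hypothetical; it only illustrates how get_search_params() keeps the keys
# the chosen API accepts and drops the rest before they are forwarded.
def _example_filter_search_config() -> Dict[str, Any]:
    raw_config = {"num_results": 3, "max_characters": 2000, "depth": "deep"}  # hypothetical config
    # For "exa" the Exa-specific keys survive; "depth" (a Linkup-only key) is dropped.
    return get_search_params("exa", raw_config)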

def format_sections(sections: list[Section]) -> str:
    """Format a list of sections into a string."""
    formatted_str = ""
    for idx, section in enumerate(sections, 1):
        formatted_str += f"""
{'='*60}
Section {idx}: {section.name}
{'='*60}
Description: {section.description}
Requires Research: {section.research}

Content:
{section.content if section.content else '[Not yet written]'}
"""
    return formatted_str


@traceable
async def tavily_search_async(search_queries):
    """
    Performs concurrent web searches using the Tavily API.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process

    Returns:
        List[dict]: List of search responses from Tavily API, one per query. Each response has format:
            {
                'query': str,                    # The original search query
                'follow_up_questions': None,
                'answer': None,
                'images': list,
                'results': [                     # List of search results
                    {
                        'title': str,            # Title of the webpage
                        'url': str,              # URL of the result
                        'content': str,          # Summary/snippet of content
                        'score': float,          # Relevance score
                        'raw_content': str|None  # Full page content if available
                    },
                    ...
                ]
            }
    """
    tavily_async_client = AsyncTavilyClient()
    search_tasks = []
    for query in search_queries:
        search_tasks.append(
            tavily_async_client.search(
                query,
                max_results=5,
                include_raw_content=True,
                topic="general"
            )
        )

    # Execute all searches concurrently
    search_docs = await asyncio.gather(*search_tasks)
    return search_docs
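
# A minimal sketch of driving the Tavily helper above and funnelling its output
# through deduplicate_sources() and format_sources(). It assumes TAVILY_API_KEY
# is set in the environment; the queries are hypothetical.
async def _example_tavily_pipeline() -> str:
    responses = await tavily_search_async(["langgraph report generation", "tavily api rate limits"])
    unique_sources = deduplicate_sources(responses)
    return format_sources(unique_sources, max_tokens_per_source=2000, handle_raw_content=True)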

@traceable
def perplexity_search(search_queries):
    """Search the web using the Perplexity API.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process

    Returns:
        List[dict]: List of search responses from Perplexity API, one per query. Each response has format:
            {
                'query': str,                    # The original search query
                'follow_up_questions': None,
                'answer': None,
                'images': list,
                'results': [                     # List of search results
                    {
                        'title': str,            # Title of the search result
                        'url': str,              # URL of the result
                        'content': str,          # Summary/snippet of content
                        'score': float,          # Relevance score
                        'raw_content': str|None  # Full content or None for secondary citations
                    },
                    ...
                ]
            }
    """
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}"
    }

    search_docs = []
    for query in search_queries:
        payload = {
            "model": "sonar-pro",
            "messages": [
                {
                    "role": "system",
                    "content": "Search the web and provide factual information with sources."
                },
                {
                    "role": "user",
                    "content": query
                }
            ]
        }

        response = requests.post(
            "https://api.perplexity.ai/chat/completions",
            headers=headers,
            json=payload
        )
        response.raise_for_status()  # Raise exception for bad status codes

        # Parse the response
        data = response.json()
        content = data["choices"][0]["message"]["content"]
        citations = data.get("citations", ["https://perplexity.ai"])

        # Create results list for this query
        results = []

        # First citation gets the full content
        results.append({
            "title": "Perplexity Search, Source 1",
            "url": citations[0],
            "content": content,
            "raw_content": content,
            "score": 1.0  # Adding score to match Tavily format
        })

        # Add additional citations without duplicating content
        for i, citation in enumerate(citations[1:], start=2):
            results.append({
                "title": f"Perplexity Search, Source {i}",
                "url": citation,
                "content": "See primary source for full content",
                "raw_content": None,
                "score": 0.5  # Lower score for secondary sources
            })

        # Format response to match Tavily structure
        search_docs.append({
            "query": query,
            "follow_up_questions": None,
            "answer": None,
            "images": [],
            "results": results
        })

    return search_docs

@traceable
async def exa_search(search_queries, max_characters: Optional[int] = None, num_results=5,
                     include_domains: Optional[List[str]] = None,
                     exclude_domains: Optional[List[str]] = None,
                     subpages: Optional[int] = None):
    """Search the web using the Exa API.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process
        max_characters (int, optional): Maximum number of characters to retrieve for each result's raw content.
            If None, the text parameter will be set to True instead of an object.
        num_results (int): Number of search results per query. Defaults to 5.
        include_domains (List[str], optional): List of domains to include in search results.
            When specified, only results from these domains will be returned.
        exclude_domains (List[str], optional): List of domains to exclude from search results.
            Cannot be used together with include_domains.
        subpages (int, optional): Number of subpages to retrieve per result. If None, subpages are not retrieved.

    Returns:
        List[dict]: List of search responses from Exa API, one per query. Each response has format:
            {
                'query': str,                    # The original search query
                'follow_up_questions': None,
                'answer': None,
                'images': list,
                'results': [                     # List of search results
                    {
                        'title': str,            # Title of the search result
                        'url': str,              # URL of the result
                        'content': str,          # Summary/snippet of content
                        'score': float,          # Relevance score
                        'raw_content': str|None  # Full content or None for secondary citations
                    },
                    ...
                ]
            }
    """
    # Check that include_domains and exclude_domains are not both specified
    if include_domains and exclude_domains:
        raise ValueError("Cannot specify both include_domains and exclude_domains")

    # Initialize Exa client (API key should be configured in your .env file)
    exa = Exa(api_key=f"{os.getenv('EXA_API_KEY')}")

    # Define the function to process a single query
    async def process_query(query):
        # Use run_in_executor to make the synchronous exa call in a non-blocking way
        loop = asyncio.get_event_loop()

        # Define the function for the executor with all parameters
        def exa_search_fn():
            # Build parameters dictionary
            kwargs = {
                # Set text to True if max_characters is None, otherwise use an object with max_characters
                "text": True if max_characters is None else {"max_characters": max_characters},
                # This is an amazing feature by EXA. It provides an AI generated summary
                # of the content based on the query.
                "summary": True,
                "num_results": num_results
            }

            # Add optional parameters only if they are provided
            if subpages is not None:
                kwargs["subpages"] = subpages
            if include_domains:
                kwargs["include_domains"] = include_domains
            elif exclude_domains:
                kwargs["exclude_domains"] = exclude_domains

            return exa.search_and_contents(query, **kwargs)

        response = await loop.run_in_executor(None, exa_search_fn)

        # Format the response to match the expected output structure
        formatted_results = []
        seen_urls = set()  # Track URLs to avoid duplicates

        # Helper function to safely get value regardless of if item is dict or object
        def get_value(item, key, default=None):
            if isinstance(item, dict):
                return item.get(key, default)
            else:
                return getattr(item, key, default) if hasattr(item, key) else default

        # Access the results from the SearchResponse object
        results_list = get_value(response, 'results', [])

        # First process all main results
        for result in results_list:
            # Get the score with a default of 0.0 if it's None or not present
            score = get_value(result, 'score', 0.0)

            # Combine summary and text for content if both are available
            text_content = get_value(result, 'text', '')
            summary_content = get_value(result, 'summary', '')

            content = text_content
            if summary_content:
                if content:
                    content = f"{summary_content}\n\n{content}"
                else:
                    content = summary_content

            title = get_value(result, 'title', '')
            url = get_value(result, 'url', '')

            # Skip if we've seen this URL before (removes duplicate entries)
            if url in seen_urls:
                continue

            seen_urls.add(url)

            # Main result entry
            result_entry = {
                "title": title,
                "url": url,
                "content": content,
                "score": score,
                "raw_content": text_content
            }

            # Add the main result to the formatted results
            formatted_results.append(result_entry)

        # Now process subpages only if the subpages parameter was provided
        if subpages is not None:
            for result in results_list:
                subpages_list = get_value(result, 'subpages', [])
                for subpage in subpages_list:
                    # Get subpage score
                    subpage_score = get_value(subpage, 'score', 0.0)

                    # Combine summary and text for subpage content
                    subpage_text = get_value(subpage, 'text', '')
                    subpage_summary = get_value(subpage, 'summary', '')

                    subpage_content = subpage_text
                    if subpage_summary:
                        if subpage_content:
                            subpage_content = f"{subpage_summary}\n\n{subpage_content}"
                        else:
                            subpage_content = subpage_summary

                    subpage_url = get_value(subpage, 'url', '')

                    # Skip if we've seen this URL before
                    if subpage_url in seen_urls:
                        continue

                    seen_urls.add(subpage_url)

                    formatted_results.append({
                        "title": get_value(subpage, 'title', ''),
                        "url": subpage_url,
                        "content": subpage_content,
                        "score": subpage_score,
                        "raw_content": subpage_text
                    })

        # Collect images if available (only from main results to avoid duplication)
        images = []
        for result in results_list:
            image = get_value(result, 'image')
            if image and image not in images:  # Avoid duplicate images
                images.append(image)

        return {
            "query": query,
            "follow_up_questions": None,
            "answer": None,
            "images": images,
            "results": formatted_results
        }

    # Process all queries sequentially with delay to respect rate limit
    search_docs = []
    for i, query in enumerate(search_queries):
        try:
            # Add delay between requests (0.25s = 4 requests per second, well within the 5/s limit)
            if i > 0:  # Don't delay the first request
                await asyncio.sleep(0.25)

            result = await process_query(query)
            search_docs.append(result)
        except Exception as e:
            # Handle exceptions gracefully
            print(f"Error processing query '{query}': {str(e)}")
            # Add a placeholder result for failed queries to maintain index alignment
            search_docs.append({
                "query": query,
                "follow_up_questions": None,
                "answer": None,
                "images": [],
                "results": [],
                "error": str(e)
            })

            # Add additional delay if we hit a rate limit error
            if "429" in str(e):
                print("Rate limit exceeded. Adding additional delay...")
                await asyncio.sleep(1.0)  # Add a longer delay if we hit a rate limit

    return search_docs
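
# A minimal sketch of calling the Exa helper above, assuming EXA_API_KEY is set.
# The query and the arxiv.org domain filter are illustrative; subpages stays off.
async def _example_exa_search() -> List[dict]:
    return await exa_search(
        ["structured report generation with LLMs"],
        max_characters=1000,
        num_results=3,
        include_domains=["arxiv.org"],
    )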

@traceable
async def arxiv_search_async(search_queries, load_max_docs=5, get_full_documents=True, load_all_available_meta=True):
    """
    Performs concurrent searches on arXiv using the ArxivRetriever.

    Args:
        search_queries (List[str]): List of search queries or article IDs
        load_max_docs (int, optional): Maximum number of documents to return per query. Default is 5.
        get_full_documents (bool, optional): Whether to fetch full text of documents. Default is True.
        load_all_available_meta (bool, optional): Whether to load all available metadata. Default is True.

    Returns:
        List[dict]: List of search responses from arXiv, one per query. Each response has format:
            {
                'query': str,                    # The original search query
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [                     # List of search results
                    {
                        'title': str,            # Title of the paper
                        'url': str,              # URL (entry ID) of the paper
                        'content': str,          # Formatted summary with metadata
                        'score': float,          # Relevance score (approximated)
                        'raw_content': str|None  # Full paper content if available
                    },
                    ...
                ]
            }
    """
    async def process_single_query(query):
        try:
            # Create retriever for each query
            retriever = ArxivRetriever(
                load_max_docs=load_max_docs,
                get_full_documents=get_full_documents,
                load_all_available_meta=load_all_available_meta
            )

            # Run the synchronous retriever in a thread pool
            loop = asyncio.get_event_loop()
            docs = await loop.run_in_executor(None, lambda: retriever.invoke(query))

            results = []

            # Assign decreasing scores based on the order
            base_score = 1.0
            score_decrement = 1.0 / (len(docs) + 1) if docs else 0

            for i, doc in enumerate(docs):
                # Extract metadata
                metadata = doc.metadata

                # Use entry_id as the URL (this is the actual arxiv link)
                url = metadata.get('entry_id', '')

                # Format content with all useful metadata
                content_parts = []

                # Primary information
                if 'Summary' in metadata:
                    content_parts.append(f"Summary: {metadata['Summary']}")

                if 'Authors' in metadata:
                    content_parts.append(f"Authors: {metadata['Authors']}")

                # Add publication information
                published = metadata.get('Published')
                published_str = published.isoformat() if hasattr(published, 'isoformat') else str(published) if published else ''
                if published_str:
                    content_parts.append(f"Published: {published_str}")

                # Add additional metadata if available
                if 'primary_category' in metadata:
                    content_parts.append(f"Primary Category: {metadata['primary_category']}")

                if 'categories' in metadata and metadata['categories']:
                    content_parts.append(f"Categories: {', '.join(metadata['categories'])}")

                if 'comment' in metadata and metadata['comment']:
                    content_parts.append(f"Comment: {metadata['comment']}")

                if 'journal_ref' in metadata and metadata['journal_ref']:
                    content_parts.append(f"Journal Reference: {metadata['journal_ref']}")

                if 'doi' in metadata and metadata['doi']:
                    content_parts.append(f"DOI: {metadata['doi']}")

                # Get PDF link if available in the links
                pdf_link = ""
                if 'links' in metadata and metadata['links']:
                    for link in metadata['links']:
                        if 'pdf' in link:
                            pdf_link = link
                            content_parts.append(f"PDF: {pdf_link}")
                            break

                # Join all content parts with newlines
                content = "\n".join(content_parts)

                result = {
                    'title': metadata.get('Title', ''),
                    'url': url,  # Using entry_id as the URL
                    'content': content,
                    'score': base_score - (i * score_decrement),
                    'raw_content': doc.page_content if get_full_documents else None
                }
                results.append(result)

            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': results
            }
        except Exception as e:
            # Handle exceptions gracefully
            print(f"Error processing arXiv query '{query}': {str(e)}")
            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            }

    # Process queries sequentially with delay to respect arXiv rate limit (1 request per 3 seconds)
    search_docs = []
    for i, query in enumerate(search_queries):
        try:
            # Add delay between requests (3 seconds per arXiv's rate limit)
            if i > 0:  # Don't delay the first request
                await asyncio.sleep(3.0)

            result = await process_single_query(query)
            search_docs.append(result)
        except Exception as e:
            # Handle exceptions gracefully
            print(f"Error processing arXiv query '{query}': {str(e)}")
            search_docs.append({
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            })

            # Add additional delay if we hit a rate limit error
            if "429" in str(e) or "Too Many Requests" in str(e):
                print("ArXiv rate limit exceeded. Adding additional delay...")
                await asyncio.sleep(5.0)  # Add a longer delay if we hit a rate limit

    return search_docs

@traceable
async def pubmed_search_async(search_queries, top_k_results=5, email=None, api_key=None, doc_content_chars_max=4000):
    """
    Performs concurrent searches on PubMed using the PubMedAPIWrapper.

    Args:
        search_queries (List[str]): List of search queries
        top_k_results (int, optional): Maximum number of documents to return per query. Default is 5.
        email (str, optional): Email address for PubMed API. Required by NCBI.
        api_key (str, optional): API key for PubMed API for higher rate limits.
        doc_content_chars_max (int, optional): Maximum characters for document content. Default is 4000.

    Returns:
        List[dict]: List of search responses from PubMed, one per query. Each response has format:
            {
                'query': str,              # The original search query
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [               # List of search results
                    {
                        'title': str,      # Title of the paper
                        'url': str,        # URL to the paper on PubMed
                        'content': str,    # Formatted summary with metadata
                        'score': float,    # Relevance score (approximated)
                        'raw_content': str # Full abstract content
                    },
                    ...
                ]
            }
    """
    async def process_single_query(query):
        try:
            # print(f"Processing PubMed query: '{query}'")

            # Create PubMed wrapper for the query
            wrapper = PubMedAPIWrapper(
                top_k_results=top_k_results,
                doc_content_chars_max=doc_content_chars_max,
                email=email if email else "your_email@example.com",
                api_key=api_key if api_key else ""
            )

            # Run the synchronous wrapper in a thread pool
            loop = asyncio.get_event_loop()

            # Use wrapper.lazy_load instead of load to get better visibility
            docs = await loop.run_in_executor(None, lambda: list(wrapper.lazy_load(query)))

            print(f"Query '{query}' returned {len(docs)} results")

            results = []
            # Assign decreasing scores based on the order
            base_score = 1.0
            score_decrement = 1.0 / (len(docs) + 1) if docs else 0

            for i, doc in enumerate(docs):
                # Format content with metadata
                content_parts = []

                if doc.get('Published'):
                    content_parts.append(f"Published: {doc['Published']}")

                if doc.get('Copyright Information'):
                    content_parts.append(f"Copyright Information: {doc['Copyright Information']}")

                if doc.get('Summary'):
                    content_parts.append(f"Summary: {doc['Summary']}")

                # Generate PubMed URL from the article UID
                uid = doc.get('uid', '')
                url = f"https://pubmed.ncbi.nlm.nih.gov/{uid}/" if uid else ""

                # Join all content parts with newlines
                content = "\n".join(content_parts)

                result = {
                    'title': doc.get('Title', ''),
                    'url': url,
                    'content': content,
                    'score': base_score - (i * score_decrement),
                    'raw_content': doc.get('Summary', '')
                }
                results.append(result)

            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': results
            }
        except Exception as e:
            # Handle exceptions with more detailed information
            error_msg = f"Error processing PubMed query '{query}': {str(e)}"
            print(error_msg)
            import traceback
            print(traceback.format_exc())  # Print full traceback for debugging

            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            }

    # Process all queries with a reasonable delay between them
    search_docs = []

    # Start with a small delay that increases if we encounter rate limiting
    delay = 1.0  # Start with a more conservative delay

    for i, query in enumerate(search_queries):
        try:
            # Add delay between requests
            if i > 0:  # Don't delay the first request
                # print(f"Waiting {delay} seconds before next query...")
                await asyncio.sleep(delay)

            result = await process_single_query(query)
            search_docs.append(result)

            # If query was successful with results, we can slightly reduce delay (but not below minimum)
            if result.get('results') and len(result['results']) > 0:
                delay = max(0.5, delay * 0.9)  # Don't go below 0.5 seconds

        except Exception as e:
            # Handle exceptions gracefully
            error_msg = f"Error in main loop processing PubMed query '{query}': {str(e)}"
            print(error_msg)

            search_docs.append({
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': [],
                'error': str(e)
            })

            # If we hit an exception, increase delay for next query
            delay = min(5.0, delay * 1.5)  # Don't exceed 5 seconds

    return search_docs
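
# A minimal sketch of calling the PubMed helper above. NCBI expects a real
# contact email; the address and query are placeholders, and NCBI_API_KEY is an
# assumed environment variable name for the optional API key.
async def _example_pubmed_search() -> List[dict]:
    return await pubmed_search_async(
        ["mRNA vaccine efficacy"],
        top_k_results=3,
        email="you@example.com",
        api_key=os.getenv("NCBI_API_KEY", ""),
    )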

@traceable
async def linkup_search(search_queries, depth: Optional[str] = "standard"):
    """
    Performs concurrent web searches using the Linkup API.

    Args:
        search_queries (List[SearchQuery]): List of search queries to process
        depth (str, optional): "standard" (default) or "deep". More details here:
            https://docs.linkup.so/pages/documentation/get-started/concepts

    Returns:
        List[dict]: List of search responses from Linkup API, one per query. Each response has format:
            {
                'results': [            # List of search results
                    {
                        'title': str,   # Title of the search result
                        'url': str,     # URL of the result
                        'content': str, # Summary/snippet of content
                    },
                    ...
                ]
            }
    """
    client = LinkupClient()
    search_tasks = []
    for query in search_queries:
        search_tasks.append(
            client.async_search(
                query,
                depth,
                output_type="searchResults",
            )
        )

    search_results = []
    for response in await asyncio.gather(*search_tasks):
        search_results.append(
            {
                "results": [
                    {"title": result.name, "url": result.url, "content": result.content}
                    for result in response.results
                ],
            }
        )

    return search_results


@traceable
async def duckduckgo_search(search_queries):
    """Perform searches using DuckDuckGo.

    Args:
        search_queries (List[str]): List of search queries to process

    Returns:
        List[dict]: List of search results
    """
    async def process_single_query(query):
        # Execute synchronous search in the event loop's thread pool
        loop = asyncio.get_event_loop()

        def perform_search():
            results = []
            with DDGS() as ddgs:
                ddg_results = list(ddgs.text(query, max_results=5))

                # Format results
                for i, result in enumerate(ddg_results):
                    results.append({
                        'title': result.get('title', ''),
                        'url': result.get('link', ''),
                        'content': result.get('body', ''),
                        'score': 1.0 - (i * 0.1),  # Simple scoring mechanism
                        'raw_content': result.get('body', '')
                    })

            return {
                'query': query,
                'follow_up_questions': None,
                'answer': None,
                'images': [],
                'results': results
            }

        return await loop.run_in_executor(None, perform_search)

    # Execute all queries concurrently
    tasks = [process_single_query(query) for query in search_queries]
    search_docs = await asyncio.gather(*tasks)

    return search_docs
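
# A minimal sketch of the DuckDuckGo helper above. It needs no API key, so it is
# a convenient smoke test that the shared response format flows straight into the
# deduplication and formatting helpers; the query is hypothetical.
async def _example_duckduckgo_pipeline() -> str:
    responses = await duckduckgo_search(["open source deep research agents"])
    return format_sources(deduplicate_sources(responses))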

@traceable
async def google_search_async(search_queries: Union[str, List[str]], max_results: int = 5, include_raw_content: bool = True):
    """
    Performs concurrent web searches using Google.
    Uses Google Custom Search API if environment variables are set, otherwise falls back to web scraping.

    Args:
        search_queries (List[str]): List of search queries to process
        max_results (int): Maximum number of results to return per query
        include_raw_content (bool): Whether to fetch full page content

    Returns:
        List[dict]: List of search responses from Google, one per query
    """
    # Check for API credentials from environment variables
    api_key = os.environ.get("GOOGLE_API_KEY")
    cx = os.environ.get("GOOGLE_CX")
    use_api = bool(api_key and cx)

    # Handle case where search_queries is a single string
    if isinstance(search_queries, str):
        search_queries = [search_queries]

    # Define user agent generator
    def get_useragent():
        """Generates a random user agent string."""
        lynx_version = f"Lynx/{random.randint(2, 3)}.{random.randint(8, 9)}.{random.randint(0, 2)}"
        libwww_version = f"libwww-FM/{random.randint(2, 3)}.{random.randint(13, 15)}"
        ssl_mm_version = f"SSL-MM/{random.randint(1, 2)}.{random.randint(3, 5)}"
        openssl_version = f"OpenSSL/{random.randint(1, 3)}.{random.randint(0, 4)}.{random.randint(0, 9)}"
        return f"{lynx_version} {libwww_version} {ssl_mm_version} {openssl_version}"

    # Create executor for running synchronous operations
    executor = None if use_api else concurrent.futures.ThreadPoolExecutor(max_workers=5)

    # Use a semaphore to limit concurrent requests
    semaphore = asyncio.Semaphore(5 if use_api else 2)

    async def search_single_query(query):
        async with semaphore:
            try:
                results = []

                # API-based search
                if use_api:
                    # The API returns up to 10 results per request
                    for start_index in range(1, max_results + 1, 10):
                        # Calculate how many results to request in this batch
                        num = min(10, max_results - (start_index - 1))

                        # Make request to Google Custom Search API
                        params = {
                            'q': query,
                            'key': api_key,
                            'cx': cx,
                            'start': start_index,
                            'num': num
                        }
                        print(f"Requesting {num} results for '{query}' from Google API...")

                        async with aiohttp.ClientSession() as session:
                            async with session.get('https://www.googleapis.com/customsearch/v1', params=params) as response:
                                if response.status != 200:
                                    error_text = await response.text()
                                    print(f"API error: {response.status}, {error_text}")
                                    break

                                data = await response.json()

                                # Process search results
                                for item in data.get('items', []):
                                    result = {
                                        "title": item.get('title', ''),
                                        "url": item.get('link', ''),
                                        "content": item.get('snippet', ''),
                                        "score": None,
                                        "raw_content": item.get('snippet', '')
                                    }
                                    results.append(result)

                        # Respect API quota with a small delay
                        await asyncio.sleep(0.2)

                        # If we didn't get a full page of results, no need to request more
                        if not data.get('items') or len(data.get('items', [])) < num:
                            break

                # Web scraping based search
                else:
                    # Add delay between requests
                    await asyncio.sleep(0.5 + random.random() * 1.5)
                    print(f"Scraping Google for '{query}'...")

                    # Define scraping function
                    def google_search(query, max_results):
                        try:
                            lang = "en"
                            safe = "active"
                            start = 0
                            fetched_results = 0
                            fetched_links = set()
                            search_results = []

                            while fetched_results < max_results:
                                # Send request to Google
                                resp = requests.get(
                                    url="https://www.google.com/search",
                                    headers={
                                        "User-Agent": get_useragent(),
                                        "Accept": "*/*"
                                    },
                                    params={
                                        "q": query,
                                        "num": max_results + 2,
                                        "hl": lang,
                                        "start": start,
                                        "safe": safe,
                                    },
                                    cookies={
                                        'CONSENT': 'PENDING+987',  # Bypasses the consent page
                                        'SOCS': 'CAESHAgBEhIaAB',
                                    }
                                )
                                resp.raise_for_status()

                                # Parse results
                                soup = BeautifulSoup(resp.text, "html.parser")
                                result_block = soup.find_all("div", class_="ezO2md")
                                new_results = 0

                                for result in result_block:
                                    link_tag = result.find("a", href=True)
                                    title_tag = link_tag.find("span", class_="CVA68e") if link_tag else None
                                    description_tag = result.find("span", class_="FrIlee")

                                    if link_tag and title_tag and description_tag:
                                        link = unquote(link_tag["href"].split("&")[0].replace("/url?q=", ""))

                                        if link in fetched_links:
                                            continue

                                        fetched_links.add(link)
                                        title = title_tag.text
                                        description = description_tag.text

                                        # Store result in the same format as the API results
                                        search_results.append({
                                            "title": title,
                                            "url": link,
                                            "content": description,
                                            "score": None,
                                            "raw_content": description
                                        })

                                        fetched_results += 1
                                        new_results += 1

                                        if fetched_results >= max_results:
                                            break

                                if new_results == 0:
                                    break

                                start += 10
                                time.sleep(1)  # Delay between pages

                            return search_results

                        except Exception as e:
                            print(f"Error in Google search for '{query}': {str(e)}")
                            return []

                    # Execute search in thread pool
                    loop = asyncio.get_running_loop()
                    search_results = await loop.run_in_executor(
                        executor,
                        lambda: google_search(query, max_results)
                    )

                    # Process the results
                    results = search_results

                # If requested, fetch full page content asynchronously (for both API and web scraping)
                if include_raw_content and results:
                    content_semaphore = asyncio.Semaphore(3)

                    async with aiohttp.ClientSession() as session:
                        fetch_tasks = []

                        async def fetch_full_content(result):
                            async with content_semaphore:
                                url = result['url']
                                headers = {
                                    'User-Agent': get_useragent(),
                                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
                                }

                                try:
                                    await asyncio.sleep(0.2 + random.random() * 0.6)
                                    async with session.get(url, headers=headers, timeout=10) as response:
                                        if response.status == 200:
                                            # Check content type to handle binary files
                                            content_type = response.headers.get('Content-Type', '').lower()

                                            # Handle PDFs and other binary files
                                            if 'application/pdf' in content_type or 'application/octet-stream' in content_type:
                                                # For PDFs, indicate that content is binary and not parsed
                                                result['raw_content'] = f"[Binary content: {content_type}. Content extraction not supported for this file type.]"
                                            else:
                                                try:
                                                    # Try to decode as UTF-8 with replacements for non-UTF8 characters
                                                    html = await response.text(errors='replace')
                                                    soup = BeautifulSoup(html, 'html.parser')
                                                    result['raw_content'] = soup.get_text()
                                                except UnicodeDecodeError as ude:
                                                    # Fallback if we still have decoding issues
                                                    result['raw_content'] = f"[Could not decode content: {str(ude)}]"
                                except Exception as e:
                                    print(f"Warning: Failed to fetch content for {url}: {str(e)}")
                                    result['raw_content'] = f"[Error fetching content: {str(e)}]"

                                return result

                        for result in results:
                            fetch_tasks.append(fetch_full_content(result))

                        updated_results = await asyncio.gather(*fetch_tasks)
                        results = updated_results
                        print(f"Fetched full content for {len(results)} results")

                return {
                    "query": query,
                    "follow_up_questions": None,
                    "answer": None,
                    "images": [],
                    "results": results
                }
            except Exception as e:
                print(f"Error in Google search for query '{query}': {str(e)}")
                return {
                    "query": query,
                    "follow_up_questions": None,
                    "answer": None,
                    "images": [],
                    "results": []
                }

    try:
        # Create tasks for all search queries
        search_tasks = [search_single_query(query) for query in search_queries]

        # Execute all searches concurrently
        search_results = await asyncio.gather(*search_tasks)

        return search_results
    finally:
        # Only shut down executor if it was created
        if executor:
            executor.shutdown(wait=False)

async def select_and_execute_search(search_api: str, query_list: list[str], params_to_pass: dict) -> dict[str, dict[str, Any]]:
    if search_api == "tavily":
        search_results = await tavily_search_async(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "perplexity":
        search_results = perplexity_search(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "exa":
        search_results = await exa_search(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "arxiv":
        search_results = await arxiv_search_async(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "pubmed":
        search_results = await pubmed_search_async(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "linkup":
        search_results = await linkup_search(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    elif search_api == "duckduckgo":
        search_results = await duckduckgo_search(query_list)
        return deduplicate_sources(search_results)
    elif search_api == "googlesearch":
        search_results = await google_search_async(query_list, **params_to_pass)
        return deduplicate_sources(search_results)
    else:
        raise ValueError(f"Unsupported search API: {search_api}")
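
# A minimal end-to-end sketch of the dispatcher above, assuming TAVILY_API_KEY is
# configured. The raw config dictionary is hypothetical; get_search_params()
# strips the keys Tavily does not accept before the call is dispatched.
if __name__ == "__main__":
    async def _demo():
        raw_config = {"max_characters": 1000, "depth": "deep"}  # hypothetical config
        params = get_search_params("tavily", raw_config)  # -> {} for Tavily
        sources = await select_and_execute_search("tavily", ["LangGraph multi-agent workflows"], params)
        print(format_sources(sources, max_tokens_per_source=1000))

    asyncio.run(_demo())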