From 65f417ce0c881fa77465d9f96fe595b8591ec0c8 Mon Sep 17 00:00:00 2001
From: Element <2401926342@qq.com>
Date: Wed, 20 Aug 2025 20:13:23 +0800
Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E4=BC=A0=E6=96=87=E4=BB=B6=E8=87=B3?=
 =?UTF-8?q?=20/?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 requirements.txt |    8 +
 utils.py         | 1160 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 1168 insertions(+)
 create mode 100644 requirements.txt
 create mode 100644 utils.py

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..0cfba27
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,8 @@
+exa_py
+linkup-sdk
+duckduckgo-search
+langchain-community
+langchain-deepseek
+langgraph
+tavily-python
+streamlit
\ No newline at end of file
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..472eb33
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,1160 @@
+import os
+import asyncio
+import requests
+import random
+import concurrent.futures
+import aiohttp
+import time
+import logging
+from typing import List, Optional, Dict, Any, Union
+from urllib.parse import unquote
+
+from exa_py import Exa
+from linkup import LinkupClient
+from tavily import AsyncTavilyClient
+from duckduckgo_search import DDGS
+from bs4 import BeautifulSoup
+
+from langchain_community.retrievers import ArxivRetriever
+from langchain_community.utilities.pubmed import PubMedAPIWrapper
+from langsmith import traceable
+
+from state import Section, ReportState
+
+
+
+def compile_completed_sections(state: ReportState) -> str:
+    """
+    Concatenate the list of completed sections into a single article string.
+    """
+    compiled = []
+
+    for number, section in enumerate(state["completed_sections"], 1):
+        title = section.name
+        content = section.content.strip()
+        compiled.append(f"# {number}. {title}\n\n{content}\n")
+
+    return "\n".join(compiled).strip()
+
+
+def get_config_value(value):
+    """
+    Helper function to handle both string and enum cases of configuration values
+    """
+    return value if isinstance(value, str) else value.value
+
+
+def get_search_params(search_api: str, search_api_config: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+    """
+    Filters the search_api_config dictionary to include only parameters accepted by the specified search API.
+
+    Args:
+        search_api (str): The search API identifier (e.g., "exa", "tavily").
+        search_api_config (Optional[Dict[str, Any]]): The configuration dictionary for the search API.
+
+    Returns:
+        Dict[str, Any]: A dictionary of parameters to pass to the search function.
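+
+    Example (illustrative only; the config keys shown are hypothetical caller-supplied values):
+        >>> get_search_params("exa", {"num_results": 3, "subpages": 1, "api_key": "ignored"})
+        {'num_results': 3, 'subpages': 1}
+        >>> get_search_params("tavily", {"num_results": 3})
+        {}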
+    """
+    # Define accepted parameters for each search API
+    SEARCH_API_PARAMS = {
+        "exa": ["max_characters", "num_results", "include_domains", "exclude_domains", "subpages"],
+        "tavily": [],  # Tavily currently accepts no additional parameters
+        "perplexity": [],  # Perplexity accepts no additional parameters
+        "arxiv": ["load_max_docs", "get_full_documents", "load_all_available_meta"],
+        "pubmed": ["top_k_results", "email", "api_key", "doc_content_chars_max"],
+        "linkup": ["depth"],
+    }
+
+    # Get the list of accepted parameters for the given search API
+    accepted_params = SEARCH_API_PARAMS.get(search_api, [])
+
+    # If no config provided, return an empty dict
+    if not search_api_config:
+        return {}
+
+    # Filter the config to only include accepted parameters
+    return {k: v for k, v in search_api_config.items() if k in accepted_params}
+
+
+def deduplicate_sources(search_response) -> dict[str, dict[str, str]]:
+    """Merge a list of search responses and deduplicate the combined results by URL."""
+    # Collect all results
+    sources_list = []
+    for response in search_response:
+        sources_list.extend(response['results'])
+
+    # Deduplicate by URL
+    unique_sources = {source['url']: source for source in sources_list}
+
+    return unique_sources
+
+
+def format_sources(unique_sources: dict, max_tokens_per_source=4000, handle_raw_content=False) -> str:
+    """Format deduplicated sources into a single string, optionally appending truncated raw content."""
+    formatted_text = "Content from sources:\n"
+    for source in unique_sources.values():
+        formatted_text += f"TITLE: {source['title']}\n"
+        formatted_text += f"URL: {source['url']}\n"
+        formatted_text += f"CONTENT: {source['content']}\n\n"
+        # Rough estimate of 4 characters per token
+        char_limit = max_tokens_per_source * 4
+        if handle_raw_content:
+            raw_content = source.get('raw_content', '')
+            if raw_content is None:
+                raw_content = ''
+                print(f"Warning: No raw_content found for source {source['url']}")
+            if len(raw_content) > char_limit:
+                raw_content = raw_content[:char_limit] + "... [truncated]"
+            formatted_text += f"Full source content limited to {max_tokens_per_source} tokens: {raw_content}\n\n"
+
+    return formatted_text.strip()
+
+
+def format_sections(sections: list[Section]) -> str:
+    """ Format a list of sections into a string """
+    formatted_str = ""
+    for idx, section in enumerate(sections, 1):
+        formatted_str += f"""
+{'='*60}
+Section {idx}: {section.name}
+{'='*60}
+Description:
+{section.description}
+Requires Research:
+{section.research}
+
+Content:
+{section.content if section.content else '[Not yet written]'}
+
+"""
+    return formatted_str
+
+
+@traceable
+async def tavily_search_async(search_queries):
+    """
+    Performs concurrent web searches using the Tavily API.
+
+    Args:
+        search_queries (List[SearchQuery]): List of search queries to process
+
+    Returns:
+        List[dict]: List of search responses from Tavily API, one per query. Each response has format:
+            {
+                'query': str,                    # The original search query
+                'follow_up_questions': None,
+                'answer': None,
+                'images': list,
+                'results': [                     # List of search results
+                    {
+                        'title': str,            # Title of the webpage
+                        'url': str,              # URL of the result
+                        'content': str,          # Summary/snippet of content
+                        'score': float,          # Relevance score
+                        'raw_content': str|None  # Full page content if available
+                    },
+                    ...
+ ] + } + """ + tavily_async_client = AsyncTavilyClient() + search_tasks = [] + for query in search_queries: + search_tasks.append( + tavily_async_client.search( + query, + max_results=5, + include_raw_content=True, + topic="general" + ) + ) + + # Execute all searches concurrently + search_docs = await asyncio.gather(*search_tasks) + + return search_docs + + +@traceable +def perplexity_search(search_queries): + """Search the web using the Perplexity API. + + Args: + search_queries (List[SearchQuery]): List of search queries to process + + Returns: + List[dict]: List of search responses from Perplexity API, one per query. Each response has format: + { + 'query': str, # The original search query + 'follow_up_questions': None, + 'answer': None, + 'images': list, + 'results': [ # List of search results + { + 'title': str, # Title of the search result + 'url': str, # URL of the result + 'content': str, # Summary/snippet of content + 'score': float, # Relevance score + 'raw_content': str|None # Full content or None for secondary citations + }, + ... + ] + } + """ + + headers = { + "accept": "application/json", + "content-type": "application/json", + "Authorization": f"Bearer {os.getenv('PERPLEXITY_API_KEY')}" + } + + search_docs = [] + for query in search_queries: + + payload = { + "model": "sonar-pro", + "messages": [ + { + "role": "system", + "content": "Search the web and provide factual information with sources." + }, + { + "role": "user", + "content": query + } + ] + } + + response = requests.post( + "https://api.perplexity.ai/chat/completions", + headers=headers, + json=payload + ) + response.raise_for_status() # Raise exception for bad status codes + + # Parse the response + data = response.json() + content = data["choices"][0]["message"]["content"] + citations = data.get("citations", ["https://perplexity.ai"]) + + # Create results list for this query + results = [] + + # First citation gets the full content + results.append({ + "title": f"Perplexity Search, Source 1", + "url": citations[0], + "content": content, + "raw_content": content, + "score": 1.0 # Adding score to match Tavily format + }) + + # Add additional citations without duplicating content + for i, citation in enumerate(citations[1:], start=2): + results.append({ + "title": f"Perplexity Search, Source {i}", + "url": citation, + "content": "See primary source for full content", + "raw_content": None, + "score": 0.5 # Lower score for secondary sources + }) + + # Format response to match Tavily structure + search_docs.append({ + "query": query, + "follow_up_questions": None, + "answer": None, + "images": [], + "results": results + }) + + return search_docs + + +@traceable +async def exa_search(search_queries, max_characters: Optional[int] = None, num_results=5, + include_domains: Optional[List[str]] = None, + exclude_domains: Optional[List[str]] = None, + subpages: Optional[int] = None): + """Search the web using the Exa API. + + Args: + search_queries (List[SearchQuery]): List of search queries to process + max_characters (int, optional): Maximum number of characters to retrieve for each result's raw content. + If None, the text parameter will be set to True instead of an object. + num_results (int): Number of search results per query. Defaults to 5. + include_domains (List[str], optional): List of domains to include in search results. + When specified, only results from these domains will be returned. + exclude_domains (List[str], optional): List of domains to exclude from search results. 
+ Cannot be used together with include_domains. + subpages (int, optional): Number of subpages to retrieve per result. If None, subpages are not retrieved. + + Returns: + List[dict]: List of search responses from Exa API, one per query. Each response has format: + { + 'query': str, # The original search query + 'follow_up_questions': None, + 'answer': None, + 'images': list, + 'results': [ # List of search results + { + 'title': str, # Title of the search result + 'url': str, # URL of the result + 'content': str, # Summary/snippet of content + 'score': float, # Relevance score + 'raw_content': str|None # Full content or None for secondary citations + }, + ... + ] + } + """ + # Check that include_domains and exclude_domains are not both specified + if include_domains and exclude_domains: + raise ValueError("Cannot specify both include_domains and exclude_domains") + + # Initialize Exa client (API key should be configured in your .env file) + exa = Exa(api_key=f"{os.getenv('EXA_API_KEY')}") + + # Define the function to process a single query + async def process_query(query): + # Use run_in_executor to make the synchronous exa call in a non-blocking way + loop = asyncio.get_event_loop() + + # Define the function for the executor with all parameters + def exa_search_fn(): + # Build parameters dictionary + kwargs = { + # Set text to True if max_characters is None, otherwise use an object with max_characters + "text": True if max_characters is None else {"max_characters": max_characters}, + "summary": True, # This is an amazing feature by EXA. It provides an AI generated summary of the content based on the query + "num_results": num_results + } + + # Add optional parameters only if they are provided + if subpages is not None: + kwargs["subpages"] = subpages + + if include_domains: + kwargs["include_domains"] = include_domains + elif exclude_domains: + kwargs["exclude_domains"] = exclude_domains + + return exa.search_and_contents(query, **kwargs) + + response = await loop.run_in_executor(None, exa_search_fn) + + # Format the response to match the expected output structure + formatted_results = [] + seen_urls = set() # Track URLs to avoid duplicates + + # Helper function to safely get value regardless of if item is dict or object + def get_value(item, key, default=None): + if isinstance(item, dict): + return item.get(key, default) + else: + return getattr(item, key, default) if hasattr(item, key) else default + + # Access the results from the SearchResponse object + results_list = get_value(response, 'results', []) + + # First process all main results + for result in results_list: + # Get the score with a default of 0.0 if it's None or not present + score = get_value(result, 'score', 0.0) + + # Combine summary and text for content if both are available + text_content = get_value(result, 'text', '') + summary_content = get_value(result, 'summary', '') + + content = text_content + if summary_content: + if content: + content = f"{summary_content}\n\n{content}" + else: + content = summary_content + + title = get_value(result, 'title', '') + url = get_value(result, 'url', '') + + # Skip if we've seen this URL before (removes duplicate entries) + if url in seen_urls: + continue + + seen_urls.add(url) + + # Main result entry + result_entry = { + "title": title, + "url": url, + "content": content, + "score": score, + "raw_content": text_content + } + + # Add the main result to the formatted results + formatted_results.append(result_entry) + + # Now process subpages only if the subpages parameter was 
provided + if subpages is not None: + for result in results_list: + subpages_list = get_value(result, 'subpages', []) + for subpage in subpages_list: + # Get subpage score + subpage_score = get_value(subpage, 'score', 0.0) + + # Combine summary and text for subpage content + subpage_text = get_value(subpage, 'text', '') + subpage_summary = get_value(subpage, 'summary', '') + + subpage_content = subpage_text + if subpage_summary: + if subpage_content: + subpage_content = f"{subpage_summary}\n\n{subpage_content}" + else: + subpage_content = subpage_summary + + subpage_url = get_value(subpage, 'url', '') + + # Skip if we've seen this URL before + if subpage_url in seen_urls: + continue + + seen_urls.add(subpage_url) + + formatted_results.append({ + "title": get_value(subpage, 'title', ''), + "url": subpage_url, + "content": subpage_content, + "score": subpage_score, + "raw_content": subpage_text + }) + + # Collect images if available (only from main results to avoid duplication) + images = [] + for result in results_list: + image = get_value(result, 'image') + if image and image not in images: # Avoid duplicate images + images.append(image) + + return { + "query": query, + "follow_up_questions": None, + "answer": None, + "images": images, + "results": formatted_results + } + + # Process all queries sequentially with delay to respect rate limit + search_docs = [] + for i, query in enumerate(search_queries): + try: + # Add delay between requests (0.25s = 4 requests per second, well within the 5/s limit) + if i > 0: # Don't delay the first request + await asyncio.sleep(0.25) + + result = await process_query(query) + search_docs.append(result) + except Exception as e: + # Handle exceptions gracefully + print(f"Error processing query '{query}': {str(e)}") + # Add a placeholder result for failed queries to maintain index alignment + search_docs.append({ + "query": query, + "follow_up_questions": None, + "answer": None, + "images": [], + "results": [], + "error": str(e) + }) + + # Add additional delay if we hit a rate limit error + if "429" in str(e): + print("Rate limit exceeded. Adding additional delay...") + await asyncio.sleep(1.0) # Add a longer delay if we hit a rate limit + + return search_docs + + +@traceable +async def arxiv_search_async(search_queries, load_max_docs=5, get_full_documents=True, load_all_available_meta=True): + """ + Performs concurrent searches on arXiv using the ArxivRetriever. + + Args: + search_queries (List[str]): List of search queries or article IDs + load_max_docs (int, optional): Maximum number of documents to return per query. Default is 5. + get_full_documents (bool, optional): Whether to fetch full text of documents. Default is True. + load_all_available_meta (bool, optional): Whether to load all available metadata. Default is True. + + Returns: + List[dict]: List of search responses from arXiv, one per query. Each response has format: + { + 'query': str, # The original search query + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [ # List of search results + { + 'title': str, # Title of the paper + 'url': str, # URL (Entry ID) of the paper + 'content': str, # Formatted summary with metadata + 'score': float, # Relevance score (approximated) + 'raw_content': str|None # Full paper content if available + }, + ... 
+ ] + } + """ + + async def process_single_query(query): + try: + # Create retriever for each query + retriever = ArxivRetriever( + load_max_docs=load_max_docs, + get_full_documents=get_full_documents, + load_all_available_meta=load_all_available_meta + ) + + # Run the synchronous retriever in a thread pool + loop = asyncio.get_event_loop() + docs = await loop.run_in_executor(None, lambda: retriever.invoke(query)) + + results = [] + # Assign decreasing scores based on the order + base_score = 1.0 + score_decrement = 1.0 / (len(docs) + 1) if docs else 0 + + for i, doc in enumerate(docs): + # Extract metadata + metadata = doc.metadata + + # Use entry_id as the URL (this is the actual arxiv link) + url = metadata.get('entry_id', '') + + # Format content with all useful metadata + content_parts = [] + + # Primary information + if 'Summary' in metadata: + content_parts.append(f"Summary: {metadata['Summary']}") + + if 'Authors' in metadata: + content_parts.append(f"Authors: {metadata['Authors']}") + + # Add publication information + published = metadata.get('Published') + published_str = published.isoformat() if hasattr(published, 'isoformat') else str(published) if published else '' + if published_str: + content_parts.append(f"Published: {published_str}") + + # Add additional metadata if available + if 'primary_category' in metadata: + content_parts.append(f"Primary Category: {metadata['primary_category']}") + + if 'categories' in metadata and metadata['categories']: + content_parts.append(f"Categories: {', '.join(metadata['categories'])}") + + if 'comment' in metadata and metadata['comment']: + content_parts.append(f"Comment: {metadata['comment']}") + + if 'journal_ref' in metadata and metadata['journal_ref']: + content_parts.append(f"Journal Reference: {metadata['journal_ref']}") + + if 'doi' in metadata and metadata['doi']: + content_parts.append(f"DOI: {metadata['doi']}") + + # Get PDF link if available in the links + pdf_link = "" + if 'links' in metadata and metadata['links']: + for link in metadata['links']: + if 'pdf' in link: + pdf_link = link + content_parts.append(f"PDF: {pdf_link}") + break + + # Join all content parts with newlines + content = "\n".join(content_parts) + + result = { + 'title': metadata.get('Title', ''), + 'url': url, # Using entry_id as the URL + 'content': content, + 'score': base_score - (i * score_decrement), + 'raw_content': doc.page_content if get_full_documents else None + } + results.append(result) + + return { + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': results + } + except Exception as e: + # Handle exceptions gracefully + print(f"Error processing arXiv query '{query}': {str(e)}") + return { + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [], + 'error': str(e) + } + + # Process queries sequentially with delay to respect arXiv rate limit (1 request per 3 seconds) + search_docs = [] + for i, query in enumerate(search_queries): + try: + # Add delay between requests (3 seconds per ArXiv's rate limit) + if i > 0: # Don't delay the first request + await asyncio.sleep(3.0) + + result = await process_single_query(query) + search_docs.append(result) + except Exception as e: + # Handle exceptions gracefully + print(f"Error processing arXiv query '{query}': {str(e)}") + search_docs.append({ + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [], + 'error': str(e) + }) + + # Add additional delay if we hit a rate limit error + 
if "429" in str(e) or "Too Many Requests" in str(e): + print("ArXiv rate limit exceeded. Adding additional delay...") + await asyncio.sleep(5.0) # Add a longer delay if we hit a rate limit + + return search_docs + + +@traceable +async def pubmed_search_async(search_queries, top_k_results=5, email=None, api_key=None, doc_content_chars_max=4000): + """ + Performs concurrent searches on PubMed using the PubMedAPIWrapper. + + Args: + search_queries (List[str]): List of search queries + top_k_results (int, optional): Maximum number of documents to return per query. Default is 5. + email (str, optional): Email address for PubMed API. Required by NCBI. + api_key (str, optional): API key for PubMed API for higher rate limits. + doc_content_chars_max (int, optional): Maximum characters for document content. Default is 4000. + + Returns: + List[dict]: List of search responses from PubMed, one per query. Each response has format: + { + 'query': str, # The original search query + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [ # List of search results + { + 'title': str, # Title of the paper + 'url': str, # URL to the paper on PubMed + 'content': str, # Formatted summary with metadata + 'score': float, # Relevance score (approximated) + 'raw_content': str # Full abstract content + }, + ... + ] + } + """ + + async def process_single_query(query): + try: + # print(f"Processing PubMed query: '{query}'") + + # Create PubMed wrapper for the query + wrapper = PubMedAPIWrapper( + top_k_results=top_k_results, + doc_content_chars_max=doc_content_chars_max, + email=email if email else "your_email@example.com", + api_key=api_key if api_key else "" + ) + + # Run the synchronous wrapper in a thread pool + loop = asyncio.get_event_loop() + + # Use wrapper.lazy_load instead of load to get better visibility + docs = await loop.run_in_executor(None, lambda: list(wrapper.lazy_load(query))) + + print(f"Query '{query}' returned {len(docs)} results") + + results = [] + # Assign decreasing scores based on the order + base_score = 1.0 + score_decrement = 1.0 / (len(docs) + 1) if docs else 0 + + for i, doc in enumerate(docs): + # Format content with metadata + content_parts = [] + + if doc.get('Published'): + content_parts.append(f"Published: {doc['Published']}") + + if doc.get('Copyright Information'): + content_parts.append(f"Copyright Information: {doc['Copyright Information']}") + + if doc.get('Summary'): + content_parts.append(f"Summary: {doc['Summary']}") + + # Generate PubMed URL from the article UID + uid = doc.get('uid', '') + url = f"https://pubmed.ncbi.nlm.nih.gov/{uid}/" if uid else "" + + # Join all content parts with newlines + content = "\n".join(content_parts) + + result = { + 'title': doc.get('Title', ''), + 'url': url, + 'content': content, + 'score': base_score - (i * score_decrement), + 'raw_content': doc.get('Summary', '') + } + results.append(result) + + return { + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': results + } + except Exception as e: + # Handle exceptions with more detailed information + error_msg = f"Error processing PubMed query '{query}': {str(e)}" + print(error_msg) + import traceback + print(traceback.format_exc()) # Print full traceback for debugging + + return { + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [], + 'error': str(e) + } + + # Process all queries with a reasonable delay between them + search_docs = [] + + # Start with a small delay that increases 
if we encounter rate limiting + delay = 1.0 # Start with a more conservative delay + + for i, query in enumerate(search_queries): + try: + # Add delay between requests + if i > 0: # Don't delay the first request + # print(f"Waiting {delay} seconds before next query...") + await asyncio.sleep(delay) + + result = await process_single_query(query) + search_docs.append(result) + + # If query was successful with results, we can slightly reduce delay (but not below minimum) + if result.get('results') and len(result['results']) > 0: + delay = max(0.5, delay * 0.9) # Don't go below 0.5 seconds + + except Exception as e: + # Handle exceptions gracefully + error_msg = f"Error in main loop processing PubMed query '{query}': {str(e)}" + print(error_msg) + + search_docs.append({ + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': [], + 'error': str(e) + }) + + # If we hit an exception, increase delay for next query + delay = min(5.0, delay * 1.5) # Don't exceed 5 seconds + + return search_docs + + +@traceable +async def linkup_search(search_queries, depth: Optional[str] = "standard"): + """ + Performs concurrent web searches using the Linkup API. + + Args: + search_queries (List[SearchQuery]): List of search queries to process + depth (str, optional): "standard" (default) or "deep". More details here https://docs.linkup.so/pages/documentation/get-started/concepts + + Returns: + List[dict]: List of search responses from Linkup API, one per query. Each response has format: + { + 'results': [ # List of search results + { + 'title': str, # Title of the search result + 'url': str, # URL of the result + 'content': str, # Summary/snippet of content + }, + ... + ] + } + """ + client = LinkupClient() + search_tasks = [] + for query in search_queries: + search_tasks.append( + client.async_search( + query, + depth, + output_type="searchResults", + ) + ) + + search_results = [] + for response in await asyncio.gather(*search_tasks): + search_results.append( + { + "results": [ + {"title": result.name, "url": result.url, "content": result.content} + for result in response.results + ], + } + ) + + return search_results + + +@traceable +async def duckduckgo_search(search_queries): + """Perform searches using DuckDuckGo + + Args: + search_queries (List[str]): List of search queries to process + + Returns: + List[dict]: List of search results + """ + async def process_single_query(query): + # Execute synchronous search in the event loop's thread pool + loop = asyncio.get_event_loop() + + def perform_search(): + results = [] + with DDGS() as ddgs: + ddg_results = list(ddgs.text(query, max_results=5)) + + # Format results + for i, result in enumerate(ddg_results): + results.append({ + 'title': result.get('title', ''), + 'url': result.get('link', ''), + 'content': result.get('body', ''), + 'score': 1.0 - (i * 0.1), # Simple scoring mechanism + 'raw_content': result.get('body', '') + }) + return { + 'query': query, + 'follow_up_questions': None, + 'answer': None, + 'images': [], + 'results': results + } + + return await loop.run_in_executor(None, perform_search) + + # Execute all queries concurrently + tasks = [process_single_query(query) for query in search_queries] + search_docs = await asyncio.gather(*tasks) + + return search_docs + + +@traceable +async def google_search_async(search_queries: Union[str, List[str]], max_results: int = 5, include_raw_content: bool = True): + """ + Performs concurrent web searches using Google. 
+ Uses Google Custom Search API if environment variables are set, otherwise falls back to web scraping. + + Args: + search_queries (List[str]): List of search queries to process + max_results (int): Maximum number of results to return per query + include_raw_content (bool): Whether to fetch full page content + + Returns: + List[dict]: List of search responses from Google, one per query + """ + + # Check for API credentials from environment variables + api_key = os.environ.get("GOOGLE_API_KEY") + cx = os.environ.get("GOOGLE_CX") + use_api = bool(api_key and cx) + + # Handle case where search_queries is a single string + if isinstance(search_queries, str): + search_queries = [search_queries] + + # Define user agent generator + def get_useragent(): + """Generates a random user agent string.""" + lynx_version = f"Lynx/{random.randint(2, 3)}.{random.randint(8, 9)}.{random.randint(0, 2)}" + libwww_version = f"libwww-FM/{random.randint(2, 3)}.{random.randint(13, 15)}" + ssl_mm_version = f"SSL-MM/{random.randint(1, 2)}.{random.randint(3, 5)}" + openssl_version = f"OpenSSL/{random.randint(1, 3)}.{random.randint(0, 4)}.{random.randint(0, 9)}" + return f"{lynx_version} {libwww_version} {ssl_mm_version} {openssl_version}" + + # Create executor for running synchronous operations + executor = None if use_api else concurrent.futures.ThreadPoolExecutor(max_workers=5) + + # Use a semaphore to limit concurrent requests + semaphore = asyncio.Semaphore(5 if use_api else 2) + + async def search_single_query(query): + async with semaphore: + try: + results = [] + + # API-based search + if use_api: + # The API returns up to 10 results per request + for start_index in range(1, max_results + 1, 10): + # Calculate how many results to request in this batch + num = min(10, max_results - (start_index - 1)) + + # Make request to Google Custom Search API + params = { + 'q': query, + 'key': api_key, + 'cx': cx, + 'start': start_index, + 'num': num + } + print(f"Requesting {num} results for '{query}' from Google API...") + + async with aiohttp.ClientSession() as session: + async with session.get('https://www.googleapis.com/customsearch/v1', params=params) as response: + if response.status != 200: + error_text = await response.text() + print(f"API error: {response.status}, {error_text}") + break + + data = await response.json() + + # Process search results + for item in data.get('items', []): + result = { + "title": item.get('title', ''), + "url": item.get('link', ''), + "content": item.get('snippet', ''), + "score": None, + "raw_content": item.get('snippet', '') + } + results.append(result) + + # Respect API quota with a small delay + await asyncio.sleep(0.2) + + # If we didn't get a full page of results, no need to request more + if not data.get('items') or len(data.get('items', [])) < num: + break + + # Web scraping based search + else: + # Add delay between requests + await asyncio.sleep(0.5 + random.random() * 1.5) + print(f"Scraping Google for '{query}'...") + + # Define scraping function + def google_search(query, max_results): + try: + lang = "en" + safe = "active" + start = 0 + fetched_results = 0 + fetched_links = set() + search_results = [] + + while fetched_results < max_results: + # Send request to Google + resp = requests.get( + url="https://www.google.com/search", + headers={ + "User-Agent": get_useragent(), + "Accept": "*/*" + }, + params={ + "q": query, + "num": max_results + 2, + "hl": lang, + "start": start, + "safe": safe, + }, + cookies={ + 'CONSENT': 'PENDING+987', # Bypasses the consent page + 
'SOCS': 'CAESHAgBEhIaAB', + } + ) + resp.raise_for_status() + + # Parse results + soup = BeautifulSoup(resp.text, "html.parser") + result_block = soup.find_all("div", class_="ezO2md") + new_results = 0 + + for result in result_block: + link_tag = result.find("a", href=True) + title_tag = link_tag.find("span", class_="CVA68e") if link_tag else None + description_tag = result.find("span", class_="FrIlee") + + if link_tag and title_tag and description_tag: + link = unquote(link_tag["href"].split("&")[0].replace("/url?q=", "")) + + if link in fetched_links: + continue + + fetched_links.add(link) + title = title_tag.text + description = description_tag.text + + # Store result in the same format as the API results + search_results.append({ + "title": title, + "url": link, + "content": description, + "score": None, + "raw_content": description + }) + + fetched_results += 1 + new_results += 1 + + if fetched_results >= max_results: + break + + if new_results == 0: + break + + start += 10 + time.sleep(1) # Delay between pages + + return search_results + + except Exception as e: + print(f"Error in Google search for '{query}': {str(e)}") + return [] + + # Execute search in thread pool + loop = asyncio.get_running_loop() + search_results = await loop.run_in_executor( + executor, + lambda: google_search(query, max_results) + ) + + # Process the results + results = search_results + + # If requested, fetch full page content asynchronously (for both API and web scraping) + if include_raw_content and results: + content_semaphore = asyncio.Semaphore(3) + + async with aiohttp.ClientSession() as session: + fetch_tasks = [] + + async def fetch_full_content(result): + async with content_semaphore: + url = result['url'] + headers = { + 'User-Agent': get_useragent(), + 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8' + } + + try: + await asyncio.sleep(0.2 + random.random() * 0.6) + async with session.get(url, headers=headers, timeout=10) as response: + if response.status == 200: + # Check content type to handle binary files + content_type = response.headers.get('Content-Type', '').lower() + + # Handle PDFs and other binary files + if 'application/pdf' in content_type or 'application/octet-stream' in content_type: + # For PDFs, indicate that content is binary and not parsed + result['raw_content'] = f"[Binary content: {content_type}. 
Content extraction not supported for this file type.]"
+                                    else:
+                                        try:
+                                            # Try to decode as UTF-8 with replacements for non-UTF8 characters
+                                            html = await response.text(errors='replace')
+                                            soup = BeautifulSoup(html, 'html.parser')
+                                            result['raw_content'] = soup.get_text()
+                                        except UnicodeDecodeError as ude:
+                                            # Fallback if we still have decoding issues
+                                            result['raw_content'] = f"[Could not decode content: {str(ude)}]"
+                            except Exception as e:
+                                print(f"Warning: Failed to fetch content for {url}: {str(e)}")
+                                result['raw_content'] = f"[Error fetching content: {str(e)}]"
+                            return result
+
+                        for result in results:
+                            fetch_tasks.append(fetch_full_content(result))
+
+                        updated_results = await asyncio.gather(*fetch_tasks)
+                        results = updated_results
+                        print(f"Fetched full content for {len(results)} results")
+
+                return {
+                    "query": query,
+                    "follow_up_questions": None,
+                    "answer": None,
+                    "images": [],
+                    "results": results
+                }
+            except Exception as e:
+                print(f"Error in Google search for query '{query}': {str(e)}")
+                return {
+                    "query": query,
+                    "follow_up_questions": None,
+                    "answer": None,
+                    "images": [],
+                    "results": []
+                }
+
+    try:
+        # Create tasks for all search queries
+        search_tasks = [search_single_query(query) for query in search_queries]
+
+        # Execute all searches concurrently
+        search_results = await asyncio.gather(*search_tasks)
+
+        return search_results
+    finally:
+        # Only shut down executor if it was created
+        if executor:
+            executor.shutdown(wait=False)
+
+
+async def select_and_execute_search(search_api: str, query_list: list[str], params_to_pass: dict) -> dict[str, dict[str, Any]]:
+    """Dispatch the query list to the selected search backend and return deduplicated sources keyed by URL."""
+    if search_api == "tavily":
+        search_results = await tavily_search_async(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "perplexity":
+        search_results = perplexity_search(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "exa":
+        search_results = await exa_search(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "arxiv":
+        search_results = await arxiv_search_async(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "pubmed":
+        search_results = await pubmed_search_async(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "linkup":
+        search_results = await linkup_search(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    elif search_api == "duckduckgo":
+        search_results = await duckduckgo_search(query_list)
+        return deduplicate_sources(search_results)
+    elif search_api == "googlesearch":
+        search_results = await google_search_async(query_list, **params_to_pass)
+        return deduplicate_sources(search_results)
+    else:
+        raise ValueError(f"Unsupported search API: {search_api}")
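+
+
+# Minimal usage sketch (illustrative only, not used by the graph): shows how the helpers above
+# are expected to compose. It uses the key-free DuckDuckGo backend; other backends would also
+# need their environment variables (e.g. TAVILY_API_KEY, EXA_API_KEY). The demo query is made up.
+if __name__ == "__main__":
+    async def _demo():
+        params = get_search_params("duckduckgo", {"max_results": 5})  # filtered to {} for this backend
+        sources = await select_and_execute_search("duckduckgo", ["LangGraph report generation"], params)
+        print(format_sources(sources, max_tokens_per_source=1000, handle_raw_content=False))
+
+    asyncio.run(_demo())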