Tavily MCP Server

  • src
  • mcp_server_tavily
import asyncio
import json
from typing import Annotated, Literal

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.shared.exceptions import McpError
from mcp.types import (
    INTERNAL_ERROR,
    INVALID_PARAMS,
    ErrorData,
    GetPromptResult,
    Prompt,
    PromptArgument,
    PromptMessage,
    TextContent,
    Tool,
)
from pydantic import BaseModel, Field, field_validator
from tavily import InvalidAPIKeyError, TavilyClient, UsageLimitExceededError

# Tool descriptions advertised to MCP clients by list_tools().
_WEB_SEARCH_DESCRIPTION = (
    "Performs a comprehensive web search using Tavily's AI-powered search engine. "
    "Excels at extracting and summarizing relevant content from web pages, "
    "making it ideal for research, fact-finding, and gathering detailed information. "
    "Can run in either 'basic' mode for faster, simpler searches "
    "or 'advanced' mode for more thorough analysis. "
    "Basic is cheaper and good for most use cases. "
    "Supports filtering results by including or excluding specific domains. "
    "Use include_domains/exclude_domains parameters to filter by specific websites. "
    "Returns multiple search results with AI-extracted relevant content."
)

_ANSWER_SEARCH_DESCRIPTION = (
    "Performs a web search using Tavily's AI search engine and generates a direct answer "
    "to the query, along with supporting search results. "
    "Best used for questions that need concrete answers backed by current web sources. "
    "Uses advanced search depth by default for comprehensive analysis.\n"
    "Features powerful source control through domain filtering:\n"
    '- For academic research: exclude_domains=["wikipedia.org"] for more scholarly sources\n'
    '- For financial analysis: include_domains=["wsj.com", "bloomberg.com", "ft.com"]\n'
    '- For technical documentation: include_domains=["docs.python.org", "developer.mozilla.org"]\n'
    '- For scientific papers: include_domains=["nature.com", "sciencedirect.com"]\n'
    "- Can combine includes and excludes to fine-tune your sources\n"
    "Particularly effective for factual queries, technical questions, "
    "and queries requiring synthesis of multiple sources."
)

_NEWS_SEARCH_DESCRIPTION = (
    "Searches recent news articles using Tavily's specialized news search functionality. "
    "Ideal for current events, recent developments, and trending topics. "
    "Can filter results by recency (number of days back to search) "
    "and by including or excluding specific news domains.\n"
    "Powerful domain filtering for news sources:\n"
    '- For mainstream news: include_domains=["reuters.com", "apnews.com", "bbc.com"]\n'
    '- For financial news: include_domains=["bloomberg.com", "wsj.com", "ft.com"]\n'
    '- For tech news: include_domains=["techcrunch.com", "theverge.com"]\n'
    '- To exclude paywalled content: exclude_domains=["wsj.com", "ft.com"]\n'
    '- To focus on specific regions: include_domains=["bbc.co.uk"] for UK news\n'
    "Returns news articles with publication dates and relevant excerpts."
)


def _clean_domains(items: list) -> list[str]:
    """Strip whitespace from each domain and drop empty entries.

    Args:
        items: Candidate domain entries.

    Returns:
        The non-empty, stripped domain strings in their original order.

    Raises:
        ValueError: If any entry is not a string.
    """
    cleaned = []
    for item in items:
        if not isinstance(item, str):
            raise ValueError(
                f"Domain entries must be strings, got {type(item).__name__}: {item!r}"
            )
        item = item.strip()
        if item:
            cleaned.append(item)
    return cleaned


class SearchBase(BaseModel):
    """Base parameters for Tavily search."""

    query: Annotated[str, Field(description="Search query")]
    max_results: Annotated[
        int,
        Field(
            default=5,
            description="Maximum number of results to return",
            gt=0,
            lt=20,
        ),
    ]
    include_domains: Annotated[
        list[str] | None,
        Field(
            default=None,
            description="List of domains to specifically include in the search results (e.g. ['example.com', 'test.org'] or 'example.com')",
        ),
    ]
    exclude_domains: Annotated[
        list[str] | None,
        Field(
            default=None,
            description="List of domains to specifically exclude from the search results (e.g. ['example.com', 'test.org'] or 'example.com')",
        ),
    ]

    @field_validator("include_domains", "exclude_domains", mode="before")
    @classmethod
    def parse_domains_list(cls, v):
        """Parse domain lists from various input formats.

        Handles:
        - None -> []
        - String containing a JSON array -> list of domains
        - String containing a JSON string -> single-element list
        - Comma-separated string -> list of domains
        - Single domain string -> [string]
        - List of domains -> stripped, with empty entries removed

        Any other input type yields [].

        Raises:
            ValueError: If a list entry is not a string, or if a string
                parses as JSON that is neither a list, a string, nor null.
        """
        if v is None:
            return []
        if isinstance(v, list):
            return _clean_domains(v)
        if not isinstance(v, str):
            return []

        v = v.strip()
        if not v:
            return []

        try:
            parsed = json.loads(v)
        except json.JSONDecodeError:
            # Not JSON: treat as a comma-separated list or a single domain.
            if "," in v:
                return _clean_domains(v.split(","))
            return [v]

        if isinstance(parsed, list):
            return _clean_domains(parsed)
        if isinstance(parsed, str):
            return _clean_domains([parsed])
        if parsed is None:
            return []
        # Numbers, booleans and objects have no `.strip()`; reject them
        # explicitly so the caller sees a validation error, not a crash.
        raise ValueError(f"Cannot interpret {v!r} as a domain or list of domains")


class GeneralSearch(SearchBase):
    """Parameters for general web search."""

    search_depth: Annotated[
        Literal["basic", "advanced"],
        Field(
            default="basic",
            description="Depth of search - 'basic' or 'advanced'",
        ),
    ]


class AnswerSearch(SearchBase):
    """Parameters for search with answer."""

    search_depth: Annotated[
        Literal["basic", "advanced"],
        Field(
            default="advanced",
            description="Depth of search - 'basic' or 'advanced'",
        ),
    ]


class NewsSearch(SearchBase):
    """Parameters for news search."""

    days: Annotated[
        int | None,
        Field(
            default=None,
            description="Number of days back to search (default is 3)",
            gt=0,
            le=365,
        ),
    ]


async def serve(api_key: str) -> None:
    """Run the Tavily MCP server over stdio.

    Registers three tools and three prompts (web, answer and news search)
    and serves them until the stdio streams close.

    Args:
        api_key: Tavily API key
    """
    # Ensure we don't have any lingering tasks
    for task in asyncio.all_tasks():
        if task is not asyncio.current_task() and task.get_name().startswith("tavily_"):
            task.cancel()

    server = Server("mcp-tavily")
    client = TavilyClient(api_key=api_key)

    async def run_search(**kwargs) -> dict:
        """Run the synchronous Tavily search in a worker thread.

        Calling `client.search` directly from a coroutine would stall the
        event loop for the duration of the HTTP request.
        """
        return await asyncio.to_thread(client.search, **kwargs)

    @server.list_tools()
    async def list_tools() -> list[Tool]:
        """Return the tool definitions exposed by this server."""
        return [
            Tool(
                name="tavily_web_search",
                description=_WEB_SEARCH_DESCRIPTION,
                inputSchema=GeneralSearch.model_json_schema(),
            ),
            Tool(
                name="tavily_answer_search",
                description=_ANSWER_SEARCH_DESCRIPTION,
                inputSchema=AnswerSearch.model_json_schema(),
            ),
            Tool(
                name="tavily_news_search",
                description=_NEWS_SEARCH_DESCRIPTION,
                inputSchema=NewsSearch.model_json_schema(),
            ),
        ]

    @server.list_prompts()
    async def list_prompts() -> list[Prompt]:
        """Return the prompt definitions exposed by this server."""
        return [
            Prompt(
                name="tavily_web_search",
                description="Search the web using Tavily's AI-powered search engine",
                arguments=[
                    PromptArgument(
                        name="query",
                        description="Search query",
                        required=True,
                    ),
                    PromptArgument(
                        name="include_domains",
                        description="Optional list of domains to specifically include (e.g., 'wsj.com,bloomberg.com' for financial sources, 'nature.com,sciencedirect.com' for scientific sources)",
                        required=False,
                    ),
                    PromptArgument(
                        name="exclude_domains",
                        description="Optional list of domains to exclude from results (e.g., 'wikipedia.org' to exclude Wikipedia, or 'wsj.com,ft.com' to exclude paywalled sources)",
                        required=False,
                    ),
                ],
            ),
            Prompt(
                name="tavily_answer_search",
                description="Search the web and get an AI-generated answer with supporting evidence",
                arguments=[
                    PromptArgument(
                        name="query",
                        description="Search query",
                        required=True,
                    ),
                    PromptArgument(
                        name="include_domains",
                        description="Optional comma-separated list of domains to include",
                        required=False,
                    ),
                    PromptArgument(
                        name="exclude_domains",
                        description="Optional comma-separated list of domains to exclude",
                        required=False,
                    ),
                ],
            ),
            Prompt(
                name="tavily_news_search",
                description="Search recent news articles with Tavily's news search",
                arguments=[
                    PromptArgument(
                        name="query",
                        description="Search query",
                        required=True,
                    ),
                    PromptArgument(
                        name="days",
                        description="Number of days back to search",
                        required=False,
                    ),
                    PromptArgument(
                        name="include_domains",
                        description="Optional comma-separated list of domains to include",
                        required=False,
                    ),
                    PromptArgument(
                        name="exclude_domains",
                        description="Optional comma-separated list of domains to exclude",
                        required=False,
                    ),
                ],
            ),
        ]

    def format_results(response: dict) -> str:
        """Format Tavily search results into a readable string.

        Args:
            response: Search response dict. Reads the optional keys
                "included_domains", "excluded_domains", "answer" and
                "results"; each result needs "title", "url" and "content",
                and may carry "published_date".

        Returns:
            A newline-joined, human-readable summary.
        """
        output = []
        # A response without a "results" key is treated as having no results.
        results = response.get("results") or []

        # Add domain filter information if present
        included = response.get("included_domains")
        excluded = response.get("excluded_domains")
        if included or excluded:
            output.append("Search Filters:")
            if included:
                output.append(f"Including domains: {', '.join(included)}")
            if excluded:
                output.append(f"Excluding domains: {', '.join(excluded)}")
            output.append("")  # Empty line for separation

        if response.get("answer"):
            output.append(f"Answer: {response['answer']}")
            output.append("\nSources:")
            # Add immediate source references for the answer
            output.extend(f"- {result['title']}: {result['url']}" for result in results)
            output.append("")  # Empty line for separation

        output.append("Detailed Results:")
        for result in results:
            output.append(f"\nTitle: {result['title']}")
            output.append(f"URL: {result['url']}")
            output.append(f"Content: {result['content']}")
            if result.get("published_date"):
                output.append(f"Published: {result['published_date']}")

        return "\n".join(output)

    @server.call_tool()
    async def call_tool(name: str, arguments: dict) -> list[TextContent]:
        """Validate the arguments for the named tool, run the search, format it.

        Raises:
            McpError: INVALID_PARAMS for an unknown tool or invalid
                arguments; INTERNAL_ERROR for an invalid API key or an
                exceeded usage limit.
        """
        try:
            # Each tool validates with its own model and adds its own
            # search options; the remaining arguments are shared.
            if name == "tavily_web_search":
                args = GeneralSearch(**arguments)
                extra = {"search_depth": args.search_depth}
            elif name == "tavily_answer_search":
                args = AnswerSearch(**arguments)
                extra = {"search_depth": args.search_depth, "include_answer": True}
            elif name == "tavily_news_search":
                args = NewsSearch(**arguments)
                extra = {
                    "topic": "news",
                    "days": args.days if args.days is not None else 3,
                }
            else:
                raise ValueError(f"Unknown tool: {name}")

            response = await run_search(
                query=args.query,
                max_results=args.max_results,
                include_domains=args.include_domains or [],  # Convert None to empty list
                exclude_domains=args.exclude_domains or [],  # Convert None to empty list
                **extra,
            )

            # Add domain filter information to response for formatting
            if args.include_domains:
                response["included_domains"] = args.include_domains
            if args.exclude_domains:
                response["excluded_domains"] = args.exclude_domains

        except (InvalidAPIKeyError, UsageLimitExceededError) as e:
            raise McpError(ErrorData(code=INTERNAL_ERROR, message=str(e))) from e
        except ValueError as e:
            # Also covers pydantic validation errors, which subclass ValueError.
            raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e))) from e

        return [TextContent(type="text", text=format_results(response))]

    @server.get_prompt()
    async def get_prompt(name: str, arguments: dict | None) -> GetPromptResult:
        """Run the search behind the named prompt and return it as a user message.

        Raises:
            McpError: INVALID_PARAMS if the query is missing, the prompt name
                is unknown, or `days` / the domain filters cannot be parsed.
        """
        if not arguments or "query" not in arguments:
            raise McpError(ErrorData(code=INVALID_PARAMS, message="Query is required"))

        search_kwargs = {"query": arguments["query"]}
        if name == "tavily_web_search":
            pass
        elif name == "tavily_answer_search":
            search_kwargs.update(include_answer=True, search_depth="advanced")
        elif name == "tavily_news_search":
            days = arguments.get("days")
            try:
                search_kwargs.update(topic="news", days=int(days) if days else 3)
            except (TypeError, ValueError) as e:
                raise McpError(
                    ErrorData(code=INVALID_PARAMS, message=f"Invalid days value: {days!r}")
                ) from e
        else:
            raise McpError(ErrorData(code=INVALID_PARAMS, message=f"Unknown prompt: {name}"))

        # Parse domain filters if provided
        include_domains = None
        exclude_domains = None
        try:
            if "include_domains" in arguments:
                include_domains = SearchBase.parse_domains_list(arguments["include_domains"])
            if "exclude_domains" in arguments:
                exclude_domains = SearchBase.parse_domains_list(arguments["exclude_domains"])
        except ValueError as e:
            raise McpError(ErrorData(code=INVALID_PARAMS, message=str(e))) from e

        try:
            response = await run_search(
                include_domains=include_domains or [],  # Convert None to empty list
                exclude_domains=exclude_domains or [],  # Convert None to empty list
                **search_kwargs,
            )
        except (InvalidAPIKeyError, UsageLimitExceededError) as e:
            # Report the failure as prompt content rather than raising.
            return GetPromptResult(
                description=f"Failed to search: {str(e)}",
                messages=[
                    PromptMessage(
                        role="user",
                        content=TextContent(type="text", text=str(e)),
                    )
                ],
            )

        # Add domain filter information to response for formatting
        if include_domains:
            response["included_domains"] = include_domains
        if exclude_domains:
            response["excluded_domains"] = exclude_domains

        return GetPromptResult(
            description=f"Search results for: {arguments['query']}",
            messages=[
                PromptMessage(
                    role="user",
                    content=TextContent(type="text", text=format_results(response)),
                )
            ],
        )

    options = server.create_initialization_options()
    async with stdio_server() as (read_stream, write_stream):
        try:
            await server.run(read_stream, write_stream, options, raise_exceptions=True)
        finally:
            # Clean up any lingering tasks
            for task in asyncio.all_tasks():
                if task is not asyncio.current_task() and task.get_name().startswith("tavily_"):
                    task.cancel()
                    try:
                        await asyncio.wait_for(task, timeout=0.1)
                    except (asyncio.CancelledError, asyncio.TimeoutError):
                        # Best-effort cleanup: a task that was cancelled or
                        # is slow to finish must not block shutdown.
                        pass


if __name__ == "__main__":
    import os

    from dotenv import load_dotenv

    # Load environment variables from .env file
    load_dotenv()

    api_key = os.getenv("TAVILY_API_KEY")
    if not api_key:
        raise ValueError("TAVILY_API_KEY environment variable is required")

    asyncio.run(serve(api_key))