MCP Personal Assistant Agent

  • modules
from typing import List, Dict, Any, Optional, Tuple
import inspect
import io
import json
import logging
import os
from urllib.parse import quote_plus

import httpx
import matplotlib.pyplot as plt
import numpy as np

# Import the MCP server instance from the main file
from mcp_server import mcp, Context
from mcp.server.fastmcp import Image

logger = logging.getLogger("mcp-pa-agent.knowledge")

# Chart kinds accepted by create_chart, in the order shown in error messages.
_VALID_CHART_TYPES = ("bar", "line", "pie", "scatter")


# Internal helpers

async def _notify(ctx: Optional[Context], level: str, message: str) -> None:
    """Send *message* through ``ctx.<level>`` when a context is available.

    The logging methods are awaited when they return an awaitable, so the
    message is actually delivered if ``Context`` exposes coroutines, while a
    synchronous implementation keeps working unchanged.
    """
    if not ctx:
        return
    outcome = getattr(ctx, level)(message)
    if inspect.isawaitable(outcome):
        await outcome


async def _fail(ctx: Optional[Context], message: str) -> str:
    """Report *message* as an error on *ctx* and return it for the caller."""
    await _notify(ctx, "error", message)
    return message


def _value_or(record: Dict[str, Any], key: str, fallback: str) -> Any:
    """Return ``record[key]``, or *fallback* when the key is missing or null."""
    value = record.get(key)
    return fallback if value is None else value


def _format_record(pairs: List[Tuple[str, Any]]) -> str:
    """Render ``(label, value)`` pairs as a newline-wrapped 'Label: value' block."""
    body = "\n".join(f"{label}: {value}" for label, value in pairs)
    return f"\n{body}\n"


def _format_weather(report: Dict[str, Any], note: str = "") -> str:
    """Render a weather report; a missing key raises ``KeyError`` for the caller."""
    pairs = [
        ("Temperature", report["temperature"]),
        ("Condition", report["condition"]),
        ("Humidity", report["humidity"]),
        ("Wind", report["wind"]),
        ("Forecast", report["forecast"]),
    ]
    lines = [f"Weather for {report['location']}:"]
    lines.extend(f"{label}: {value}" for label, value in pairs)
    body = "\n".join(lines)
    if note:
        body += f"\n\n{note}"
    return f"\n{body}\n"


async def _fetch_search_payload(query: str, api_key: str) -> Any:
    """Call the search endpoint and return the decoded JSON payload.

    Raises whatever ``httpx`` raises (timeouts, HTTP status errors, ...).
    """
    # This is a demonstration URL - DuckDuckGo doesn't actually have this API structure
    url = f"https://api.duckduckgo.com/?q={quote_plus(query)}&format=json&key={api_key}"
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        # This is simplified as the actual DuckDuckGo API response would be different
        return response.json()


def _parse_data_points(raw: str) -> List[float]:
    """Parse a JSON array or a comma-separated list into floats.

    Raises:
        json.JSONDecodeError: the bracketed input is not valid JSON.
        ValueError, TypeError: an element cannot be converted to a number.
    """
    text = raw.strip()
    if text.startswith("[") and text.endswith("]"):
        values = json.loads(text)
    else:
        values = text.split(",")
    return [float(value) for value in values]


def _render_chart(data: List[float], kind: str, title: str) -> bytes:
    """Draw *data* as a *kind* chart and return the PNG bytes.

    The figure is always closed, so neither successful nor failed renders
    accumulate in pyplot's global figure registry.
    """
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        positions = list(range(len(data)))
        if kind == "bar":
            ax.bar(positions, data)
            ax.set_xticks(positions)
            ax.set_xticklabels([f"Item {i + 1}" for i in positions])
        elif kind == "line":
            ax.plot(data, marker="o")
            ax.set_xticks(positions)
            ax.set_xticklabels([f"Point {i + 1}" for i in positions])
        elif kind == "pie":
            ax.pie(data, autopct="%1.1f%%", labels=[f"Slice {i + 1}" for i in positions])
        elif kind == "scatter":
            # An even count is read as interleaved (x, y) pairs; otherwise the
            # values are plotted against their index.
            if len(data) % 2 == 0:
                ax.scatter(data[0::2], data[1::2])
            else:
                ax.scatter(positions, data)

        ax.set_title(title)
        ax.grid(True, linestyle="--", alpha=0.7)

        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=100, bbox_inches="tight")
        return buf.getvalue()
    finally:
        plt.close(fig)


# Resources

@mcp.resource("search://{query}")
async def search_resource(query: str) -> str:
    """Provide search results as a resource

    Returns the first five results as pretty-printed JSON, or a plain-text
    message when the API key is missing, nothing is found, or the request fails.
    """
    api_key = os.getenv("DUCKDUCKGO_API_KEY")
    if not api_key:
        return "Web search is not available. Please set the DUCKDUCKGO_API_KEY environment variable."

    try:
        payload = await _fetch_search_payload(query, api_key)
        if not payload or "results" not in payload:
            return "No search results found."
        return json.dumps(payload["results"][:5], indent=2)
    except Exception as e:
        logger.error("Error performing search: %s", e)
        return f"Error performing search: {str(e)}"


# Prompts

@mcp.prompt()
def analyze_news_prompt(topic: str) -> str:
    """Create a prompt for news analysis"""
    return f"Please analyze the latest news about '{topic}' and provide a short summary of the key developments and their potential implications."


# Tool functions

@mcp.tool()
async def web_search(query: str, num_results: int = 5, ctx: Context = None) -> str:
    """Search the web for information.

    Args:
        query: The search query
        num_results: Number of results to return (default 5)

    Returns:
        The formatted results separated by '---' lines, or an error message.
    """
    if not query or not query.strip():
        return await _fail(ctx, "Error: Search query cannot be empty.")

    await _notify(ctx, "info", f"Searching for: {query}")

    api_key = os.getenv("DUCKDUCKGO_API_KEY")
    if not api_key:
        return await _fail(
            ctx,
            "Web search is not available. Please set the DUCKDUCKGO_API_KEY environment variable.",
        )

    # A negative count would slice from the wrong end; treat it as 1.
    limit = max(1, num_results)

    try:
        # Log progress
        await _notify(ctx, "info", "Connecting to search API...")
        payload = await _fetch_search_payload(query, api_key)

        if not payload or "results" not in payload:
            return "No search results found."

        results = payload["results"][:limit]
        if not results:
            return "No search results found."

        formatted_results = []
        for i, result in enumerate(results):
            if ctx:
                await ctx.report_progress(i, len(results))
            formatted_results.append(_format_record([
                ("Title", _value_or(result, "title", "No title")),
                ("URL", _value_or(result, "url", "No URL")),
                ("Description", _value_or(result, "description", "No description")),
            ]))

        return "\n---\n".join(formatted_results)
    except httpx.TimeoutException:
        return await _fail(ctx, "Search request timed out. Please try again later.")
    except httpx.HTTPStatusError as e:
        return await _fail(ctx, f"Search API error: HTTP {e.response.status_code}")
    except Exception as e:
        return await _fail(ctx, f"Error performing web search: {str(e)}")


@mcp.tool()
async def get_weather(location: str, ctx: Context = None) -> str:
    """Get current weather information for a location.

    Args:
        location: The location to get weather for (city name or coordinates)

    Returns:
        A formatted weather report (simulated when WEATHER_API_KEY is unset),
        or an error message.
    """
    if not location or not location.strip():
        return await _fail(ctx, "Error: Location cannot be empty.")

    await _notify(ctx, "info", f"Getting weather for: {location}")

    try:
        api_key = os.getenv("WEATHER_API_KEY")
        if not api_key:
            # If no API key, return mock data but with a warning
            await _notify(ctx, "warning", "Using mock weather data - set WEATHER_API_KEY for real data")

            # Create mock data based on the location
            mock_response = {
                "location": location,
                "temperature": "72°F",
                "condition": "Partly Cloudy",
                "humidity": "45%",
                "wind": "5 mph NW",
                "forecast": "Clear skies with occasional clouds. Low chance of precipitation.",
            }
            return _format_weather(
                mock_response,
                note="Note: This is simulated weather data. Set the WEATHER_API_KEY environment variable for real data.",
            )

        # With a real API key, we would make a real request
        url = f"https://api.example-weather-service.com/current?location={quote_plus(location)}&key={api_key}"
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
            response.raise_for_status()
            weather_data = response.json()

        # A payload missing an expected key surfaces through the generic handler below.
        return _format_weather(weather_data)
    except httpx.TimeoutException:
        return await _fail(ctx, "Weather request timed out. Please try again later.")
    except httpx.HTTPStatusError as e:
        return await _fail(ctx, f"Weather API error: HTTP {e.response.status_code}")
    except Exception as e:
        return await _fail(ctx, f"Error getting weather information: {str(e)}")


@mcp.tool()
async def get_news(topic: str = "", num_results: int = 5, ctx: Context = None) -> str:
    """Get latest news, optionally filtered by topic.

    Args:
        topic: Topic to filter news by (optional)
        num_results: Number of news items to return (default 5)

    Returns:
        The formatted news items separated by '---' lines (simulated when
        NEWS_API_KEY is unset), or an error message.
    """
    if topic:
        await _notify(ctx, "info", f"Getting news about: {topic}")
    else:
        await _notify(ctx, "info", "Getting latest news")

    # A negative count would slice from the wrong end; treat it as 1.
    limit = max(1, num_results)

    try:
        api_key = os.getenv("NEWS_API_KEY")
        if not api_key:
            # If no API key, return mock data but with a warning
            await _notify(ctx, "warning", "Using mock news data - set NEWS_API_KEY for real data")

            # Create mock news
            mock_news = [
                {
                    "title": f"Example News Headline About {topic if topic else 'Current Events'} 1",
                    "source": "Example News",
                    "description": f"This is a sample news description about {topic if topic else 'current events'} for demonstration purposes.",
                    "url": "https://example-news.com/article1",
                    "published_at": "2023-07-01T12:00:00Z",
                },
                {
                    "title": f"Example News Headline About {topic if topic else 'Current Events'} 2",
                    "source": "Sample News Network",
                    "description": f"Another mock news article description about {topic if topic else 'current events'}.",
                    "url": "https://sample-news.com/article2",
                    "published_at": "2023-07-01T11:30:00Z",
                },
            ]

            formatted_news = [
                _format_record([
                    ("Title", _value_or(news, "title", "No title")),
                    ("Source", _value_or(news, "source", "Unknown source")),
                    ("Published", _value_or(news, "published_at", "Unknown date")),
                    ("Description", _value_or(news, "description", "No description")),
                    ("URL", _value_or(news, "url", "No URL")),
                ])
                for news in mock_news[:limit]
            ]
            result = "\n---\n".join(formatted_news)
            result += "\n\nNote: This is simulated news data. Set the NEWS_API_KEY environment variable for real data."
            return result

        # With a real API key
        # NOTE(review): the top-headlines endpoint may reject a request that
        # carries none of country/category/sources/q - confirm against the
        # NewsAPI documentation and consider a default for the topic-less case.
        url_path = f"top-headlines?apiKey={api_key}"
        if topic:
            url_path += f"&q={quote_plus(topic)}"

        url = f"https://newsapi.org/v2/{url_path}"
        async with httpx.AsyncClient() as client:
            response = await client.get(url, timeout=10.0)
            response.raise_for_status()
            news_data = response.json()

        if not news_data or "articles" not in news_data or not news_data["articles"]:
            return f"No news found{' for topic: ' + topic if topic else ''}."

        formatted_news = []
        for article in news_data["articles"][:limit]:
            # "source" may be present but null, so guard before reading its name.
            source = article.get("source")
            source_name = _value_or(source, "name", "Unknown source") if isinstance(source, dict) else "Unknown source"
            formatted_news.append(_format_record([
                ("Title", _value_or(article, "title", "No title")),
                ("Source", source_name),
                ("Published", _value_or(article, "publishedAt", "Unknown date")),
                ("Description", _value_or(article, "description", "No description")),
                ("URL", _value_or(article, "url", "No URL")),
            ]))

        return "\n---\n".join(formatted_news)
    except httpx.TimeoutException:
        return await _fail(ctx, "News request timed out. Please try again later.")
    except httpx.HTTPStatusError as e:
        return await _fail(ctx, f"News API error: HTTP {e.response.status_code}")
    except Exception as e:
        return await _fail(ctx, f"Error getting news information: {str(e)}")


@mcp.tool()
async def create_chart(data_points: str, chart_type: str = "bar", title: str = "Chart", ctx: Context = None) -> Image:
    """Create a chart from data points.

    Args:
        data_points: Comma-separated numbers or JSON array of numbers
        chart_type: Type of chart ('bar', 'line', 'pie', 'scatter')
        title: Title for the chart

    Returns:
        A PNG image of the chart. On invalid input or a rendering failure the
        error message string is returned instead, as before.
    """
    await _notify(ctx, "info", f"Creating {chart_type} chart with title: {title}")

    # Parse data. json is the module-level import: a function-local import here
    # would make the name unbound whenever the comma-separated branch is taken.
    try:
        data = _parse_data_points(data_points)
    except json.JSONDecodeError:
        return await _fail(ctx, "Error: Invalid JSON format. Use format [1,2,3] or comma-separated values.")
    except (ValueError, TypeError):
        return await _fail(ctx, "Error: Invalid number format. Ensure all values are numbers.")

    if not data:
        return await _fail(ctx, "Error: No valid data points provided.")

    # Check chart type
    kind = chart_type.lower()
    if kind not in _VALID_CHART_TYPES:
        return await _fail(ctx, f"Error: Invalid chart type. Use one of: {', '.join(_VALID_CHART_TYPES)}")

    if kind == "pie" and not all(value >= 0 for value in data):
        return await _fail(ctx, "Error: Pie charts require non-negative values.")

    # Create chart
    try:
        png_bytes = _render_chart(data, kind, title)
        # Convert to MCP Image
        return Image(data=png_bytes, format="png")
    except Exception as e:
        return await _fail(ctx, f"Error creating chart: {str(e)}")