Skip to main content
Glama

Financial MCP Server

by osamadev
server.py (8.98 kB)
"""Financial MCP server: exposes market-context, portfolio and alert tools over stdio."""

import json
import logging
import sys

from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP

from services.fetcher import fetch_web_data
from services.summarizer import summarize_articles
from services.context_builder import build_final_prompt
from services.intent_parser import extract_financial_entities
from services.portfolio import load_portfolio, add_ticker, remove_ticker
from services.alerts import check_alerts, check_trading_opportunities, send_trading_alert
from services.market_summary import get_market_wrap

# Configure logging. Log records go to stderr and a file, never stdout:
# the server is started with the stdio transport below.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stderr),
        logging.FileHandler('financial_mcp.log'),
    ],
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

mcp = FastMCP("Financial-MCP-Server")


def _to_log_json(payload) -> str:
    """Render *payload* as indented JSON for debug logs.

    Falls back to ``repr`` when the payload is not JSON-serializable, so that
    a logging statement can never turn a successful tool call into an error.
    """
    try:
        return json.dumps(payload, indent=2)
    except (TypeError, ValueError):
        return repr(payload)


@mcp.tool()
async def financial_context(query: str) -> dict:
    """Build a financial context payload for a free-text query.

    Pipeline: extract entities from the query, fetch articles, summarize them,
    then build the final prompt.

    Args:
        query: The user's question; must be a non-empty string.

    Returns:
        dict: Always contains ``query``, ``tickers``, ``keywords``, ``context``
        and ``final_prompt``. An ``error`` key is added when a stage fails;
        failures are reported in the payload rather than raised.
    """
    try:
        if not query or not isinstance(query, str):
            logger.error(f"Invalid query input: {query}")
            return {
                "error": "Invalid query input",
                "query": str(query),
                "tickers": [],
                "keywords": [],
                "context": [],
                "final_prompt": "",
            }

        logger.debug(f"Processing financial context query: {query}")

        try:
            parsed = extract_financial_entities(query)
            logger.debug(f"Parsed entities: {parsed}")
        except Exception as e:
            logger.error(f"Error parsing entities: {str(e)}", exc_info=True)
            return {
                "error": f"Failed to parse query: {str(e)}",
                "query": query,
                "tickers": [],
                "keywords": [],
                "context": [],
                "final_prompt": "",
            }

        try:
            articles = await fetch_web_data(query, tickers=parsed["tickers"])
            logger.debug(f"Fetched {len(articles)} articles")
        except Exception as e:
            logger.error(f"Error fetching articles: {str(e)}", exc_info=True)
            return {
                "error": f"Failed to fetch articles: {str(e)}",
                "query": query,
                "tickers": parsed["tickers"],
                "keywords": parsed["keywords"],
                "context": [],
                "final_prompt": f"Error fetching market data: {str(e)}",
            }

        if not articles:
            logger.warning("No articles found")
            return {
                "query": query,
                "tickers": parsed["tickers"],
                "keywords": parsed["keywords"],
                "context": [],
                "final_prompt": "No recent market data found for the query.",
            }

        try:
            summaries = await summarize_articles(articles)
        except Exception as e:
            logger.error(f"Error summarizing articles: {str(e)}", exc_info=True)
            return {
                "error": f"Failed to summarize articles: {str(e)}",
                "query": query,
                "tickers": parsed["tickers"],
                "keywords": parsed["keywords"],
                "context": articles,  # Return raw articles if summarization fails
                "final_prompt": f"Error summarizing market data: {str(e)}",
            }
        # Logged outside the try so a non-serializable summary is not
        # misreported as a summarization failure.
        logger.debug(f"Generated summaries: {_to_log_json(summaries)}")

        final_prompt = build_final_prompt(query, summaries)
        response = {
            "query": query,
            "tickers": parsed["tickers"],
            "keywords": parsed["keywords"],
            "context": summaries,
            "final_prompt": final_prompt,
        }
        logger.debug(f"Returning response: {_to_log_json(response)}")
        return response

    except Exception as e:
        # Last-resort boundary: report the failure in the payload.
        logger.error(f"Unhandled error in financial_context: {str(e)}", exc_info=True)
        return {
            "error": str(e),
            "query": query if isinstance(query, str) else str(query),
            "tickers": [],
            "keywords": [],
            "context": [],
            "final_prompt": f"Unhandled error: {str(e)}",
        }


@mcp.tool()
def market_summary() -> dict:
    """Return the market wrap produced by ``get_market_wrap``."""
    return get_market_wrap()


@mcp.tool()
def get_portfolio() -> dict:
    """Return the portfolio produced by ``load_portfolio``."""
    logger.debug("Loading portfolio data from resource endpoint")
    return load_portfolio()


# NOTE(review): the two resource functions below deliberately reuse the names
# of the tools above. Each decorator is applied when its ``def`` executes, so
# both get decorated; the module-level names end up bound to the resource
# versions. Renaming them would presumably change the registered resource
# names -- confirm before changing.
@mcp.resource("financial://market-summary")
def market_summary() -> dict:
    """Return the market wrap produced by ``get_market_wrap``."""
    return get_market_wrap()


@mcp.resource("portfolio://data")
def get_portfolio() -> dict:
    """Return the portfolio produced by ``load_portfolio``."""
    logger.debug("Loading portfolio data from resource endpoint")
    return load_portfolio()


# Cache invalidation helper
def invalidate_portfolio_cache():
    """Drop the ``portfolio://data`` entry from the server's resource cache.

    NOTE(review): ``_resource_cache`` is a private attribute that may not
    exist on the FastMCP instance; when it is absent this is a no-op.
    """
    logger.debug("Invalidating portfolio cache")
    if hasattr(mcp, '_resource_cache'):
        mcp._resource_cache.pop('portfolio://data', None)


@mcp.tool()
def add_stock(ticker: str) -> dict:
    """Add *ticker* to the portfolio and return the reloaded portfolio.

    Returns:
        dict: The result of ``load_portfolio()`` after the add, or
        ``{"error": ..., "tickers": []}`` if any step raises.
    """
    try:
        logger.debug(f"Adding stock {ticker}")
        result = add_ticker(ticker)
        logger.debug(f"Add stock result: {result}")

        # Invalidate the portfolio cache
        invalidate_portfolio_cache()

        # Verify the change was saved
        current = load_portfolio()
        logger.debug(f"Current portfolio after add: {current}")
        return current
    except Exception as e:
        logger.error(f"Error in add_stock: {str(e)}", exc_info=True)
        return {"error": str(e), "tickers": []}


@mcp.tool()
def remove_stock(ticker: str) -> dict:
    """Remove *ticker* from the portfolio and return the reloaded portfolio.

    Returns:
        dict: The result of ``load_portfolio()`` after the removal, or
        ``{"error": ..., "tickers": []}`` if any step raises.
    """
    try:
        logger.debug(f"Removing stock {ticker}")
        result = remove_ticker(ticker)
        logger.debug(f"Remove stock result: {result}")

        # Invalidate the portfolio cache
        invalidate_portfolio_cache()

        # Verify the change was saved
        current = load_portfolio()
        logger.debug(f"Current portfolio after remove: {current}")
        return current
    except Exception as e:
        logger.error(f"Error in remove_stock: {str(e)}", exc_info=True)
        return {"error": str(e), "tickers": []}


@mcp.tool()
async def portfolio_alerts(random_string: str) -> dict:
    """
    Check alerts for stocks. Can handle both all stocks and specific tickers.

    Args:
        random_string: Can be either "all" for all stocks, or a specific ticker symbol

    Returns:
        dict: Dictionary containing alerts
    """
    try:
        # Surrounding whitespace is ignored so that " all " still means
        # "all stocks" and a padded ticker is passed on clean.
        requested = random_string.strip() if random_string else ""

        # If a specific ticker is requested, check trading opportunities
        if requested and requested.upper() != "ALL":
            opportunities = await check_trading_opportunities(requested)
            return {"alerts": opportunities}

        # Otherwise get alerts for all stocks
        alerts = await check_alerts(None)
        return {"alerts": alerts}
    except Exception as e:
        logger.error(f"Error in portfolio_alerts: {str(e)}", exc_info=True)
        return {"alerts": [f"⚠️ Error: {str(e)}"]}


@mcp.tool()
async def check_stock_alerts(ticker: str) -> dict:
    """
    Check alerts for a specific stock.

    Args:
        ticker: The stock ticker to check alerts for

    Returns:
        dict: Dictionary containing alerts for the specified stock
    """
    try:
        # Normalize once and use the same symbol for the lookup and the
        # filter; filtering on the unstripped input would drop every alert
        # whenever the caller passes surrounding whitespace.
        symbol = ticker.strip().upper() if ticker else ""
        if not symbol:
            return {"alerts": ["⚠️ No ticker provided"]}

        alerts = await check_alerts([symbol])

        # Filter alerts to only show those for the requested ticker
        filtered_alerts = [alert for alert in alerts if symbol in alert]
        return {"alerts": filtered_alerts}
    except Exception as e:
        logger.error(f"Error in check_stock_alerts: {str(e)}", exc_info=True)
        return {"alerts": [f"⚠️ Error checking {ticker}: {str(e)}"]}


@mcp.tool()
async def single_stock_alert(ticker: str) -> dict:
    """
    Check alerts for a specific stock only.

    Args:
        ticker: The stock ticker to check alerts for

    Returns:
        dict: Dictionary containing alerts for the specified stock
    """
    try:
        symbol = ticker.strip().upper() if ticker else ""
        if not symbol:
            return {"alerts": ["⚠️ No ticker provided"]}

        alerts = await check_alerts([symbol])
        return {"alerts": alerts}
    except Exception as e:
        logger.error(f"Error in single_stock_alert: {str(e)}", exc_info=True)
        return {"alerts": [f"⚠️ Error: {str(e)}"]}


if __name__ == "__main__":
    logger.debug("Starting Financial-MCP-Server...")
    try:
        logger.info("Initializing MCP server with stdio transport")
        mcp.run(transport="stdio")
        # run() only returns once the server has shut down, so this line
        # marks the stop, not the start.
        logger.debug("MCP server stopped")
    except json.JSONDecodeError as je:
        logger.error(f"JSON decode error: {str(je)}", exc_info=True)
        raise
    except Exception as e:
        logger.error(f"Failed to start MCP server: {str(e)}", exc_info=True)
        raise

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/osamadev/financial_mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.