StockFlow MCP Server

import logging import asyncio import yfinance as yf from mcp.server import Server from mcp.types import Tool, TextContent from mcp.server.stdio import stdio_server import json import traceback import pandas as pd import datetime from functools import wraps import time from typing import Optional, Dict, Any, List import numpy as np class StockflowJSONEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, pd.Timestamp): return obj.isoformat() if isinstance(obj, pd.Period): return str(obj) if isinstance(obj, datetime.date): # Add this line return obj.isoformat() # Add this line if pd.isna(obj): return None try: return super().default(obj) except TypeError: return str(obj) def convert_df_timestamps(df): """Convert DataFrame timestamps in column names to ISO format strings""" df = df.copy() df.columns = [col.strftime('%Y-%m-%d') if isinstance(col, pd.Timestamp) else col for col in df.columns] return df.to_dict('records') # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler("stockflow_v2.log"), logging.StreamHandler() ] ) logger = logging.getLogger("stockflow-server-v2") class StockflowError(Exception): pass class ValidationError(StockflowError): pass class APIError(StockflowError): pass def retry_on_error(max_retries: int = 3, delay: float = 1.0): def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): last_error = None for attempt in range(max_retries): try: return await func(*args, **kwargs) except Exception as e: last_error = e if attempt < max_retries - 1: wait_time = delay * (2 ** attempt) logger.warning(f"Attempt {attempt + 1} failed, retrying in {wait_time}s: {str(e)}") await asyncio.sleep(wait_time) else: logger.error(f"All {max_retries} attempts failed: {str(e)}\n{traceback.format_exc()}") raise last_error return wrapper return decorator # API response wrapper def format_response(data: Any, error: Optional[str] = None) -> List[TextContent]: response = { "success": error is None, "timestamp": time.time(), "data": data if error is None else None, "error": error } return [TextContent( type="text", text=json.dumps(response, indent=2, cls=StockflowJSONEncoder) )] app = Server("stockflow-server-v2") @app.list_tools() async def list_tools(): return [ Tool( name="get_stock_data_v2", description="Get comprehensive stock data including financials, analyst ratings, and calendar events", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string", "description": "Stock ticker symbol"}, "include_financials": {"type": "boolean", "description": "Include quarterly financials"}, "include_analysis": {"type": "boolean", "description": "Include analyst data"}, "include_calendar": {"type": "boolean", "description": "Include calendar events"} }, "required": ["symbol"] } ), Tool( name="get_historical_data_v2", description="Get historical price data with technical indicators", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string", "description": "Stock ticker symbol"}, "period": { "type": "string", "description": "Time period (1d, 5d, 1mo, 3mo, 6mo, 1y, 2y, 5y, 10y, ytd, max)" }, "interval": { "type": "string", "description": "Data interval (1m, 2m, 5m, 15m, 30m, 60m, 90m, 1h, 1d, 5d, 1wk, 1mo, 3mo)", "default": "1d" }, "prepost": { "type": "boolean", "description": "Include pre and post market data", "default": False } }, "required": ["symbol", "period"] } ), Tool( name="get_options_chain_v2", description="Get options chain data with advanced greeks and analysis", inputSchema={ "type": "object", "properties": { "symbol": {"type": "string", "description": "Stock ticker symbol"}, "expiration_date": {"type": "string", "description": "Options expiration date (YYYY-MM-DD)"}, "include_greeks": {"type": "boolean", "description": "Include options greeks"} }, "required": ["symbol"] } ) ] @app.call_tool() @retry_on_error(max_retries=3, delay=1.0) async def call_tool(name: str, arguments: dict): try: if name == "get_stock_data_v2": symbol = arguments['symbol'].strip().upper() include_financials = arguments.get('include_financials', False) include_analysis = arguments.get('include_analysis', False) include_calendar = arguments.get('include_calendar', False) ticker = yf.Ticker(symbol) # Basic info must be available info = ticker.info if not info: raise APIError(f"No data available for {symbol}") price = info.get('regularMarketPrice') or info.get('currentPrice') if not price: raise APIError(f"No price data available for {symbol}") response = { "basic_info": { "symbol": symbol, "name": info.get("longName", "N/A"), "sector": info.get("sector", "N/A"), "industry": info.get("industry", "N/A"), "description": info.get("longBusinessSummary", "N/A"), "website": info.get("website", "N/A"), "employees": info.get("fullTimeEmployees", 0) }, "market_data": { "price": info.get("currentPrice") or info.get("regularMarketPrice"), "currency": info.get("currency", "USD"), "market_cap": info.get("marketCap"), "float_shares": info.get("floatShares"), "regular_market_open": info.get("regularMarketOpen"), "regular_market_high": info.get("regularMarketDayHigh"), "regular_market_low": info.get("regularMarketDayLow"), "regular_market_volume": info.get("regularMarketVolume"), "regular_market_previous_close": info.get("regularMarketPreviousClose") }, "valuation_metrics": { "pe_ratio": info.get("forwardPE"), "peg_ratio": info.get("pegRatio"), "price_to_book": info.get("priceToBook"), "enterprise_value": info.get("enterpriseValue"), "enterprise_to_revenue": info.get("enterpriseToRevenue"), "enterprise_to_ebitda": info.get("enterpriseToEbitda") }, "trading_info": { "beta": info.get("beta"), "52w_high": info.get("fiftyTwoWeekHigh"), "52w_low": info.get("fiftyTwoWeekLow"), "50d_avg": info.get("fiftyDayAverage"), "200d_avg": info.get("twoHundredDayAverage"), "avg_volume_10d": info.get("averageVolume10days"), "avg_volume": info.get("averageVolume") } } if include_financials: try: financials = { "quarterly_income": convert_df_timestamps(ticker.quarterly_income_stmt), "quarterly_balance": convert_df_timestamps(ticker.quarterly_balance_sheet), "quarterly_cashflow": convert_df_timestamps(ticker.quarterly_cashflow) } response["financials"] = financials except Exception as e: logger.warning(f"Could not fetch financials for {symbol}: {str(e)}") if include_analysis: try: analysis = { "recommendations": ticker.recommendations.to_dict() if hasattr(ticker, 'recommendations') else None, "analyst_price_targets": ticker.analyst_price_targets.to_dict() if hasattr(ticker, 'analyst_price_targets') else None } response["analysis"] = analysis except Exception as e: logger.warning(f"Could not fetch analysis for {symbol}: {str(e)}") if include_calendar: try: calendar = ticker.calendar if calendar is not None: response["calendar"] = calendar.to_dict() except Exception as e: logger.warning(f"Could not fetch calendar for {symbol}: {str(e)}") return format_response(response) elif name == "get_historical_data_v2": symbol = arguments['symbol'].strip().upper() period = arguments['period'] interval = arguments.get('interval', '1d') prepost = arguments.get('prepost', False) valid_periods = ['1d', '5d', '1mo', '3mo', '6mo', '1y', '2y', '5y', '10y', 'ytd', 'max'] valid_intervals = ['1m', '2m', '5m', '15m', '30m', '60m', '90m', '1h', '1d', '5d', '1wk', '1mo', '3mo'] if period not in valid_periods: raise ValidationError(f"Invalid period. Must be one of: {', '.join(valid_periods)}") if interval not in valid_intervals: raise ValidationError(f"Invalid interval. Must be one of: {', '.join(valid_intervals)}") # Use download for single symbol (more efficient) history = yf.download( symbol, period=period, interval=interval, prepost=prepost, progress=False, multi_level_index=False ) if history.empty: raise APIError(f"No historical data available for {symbol}") # Create DataFrame copy for calculations data = history.copy() # Technical indicators data['SMA_20'] = data['Close'].rolling(window=20).mean() data['SMA_50'] = data['Close'].rolling(window=50).mean() data['EMA_12'] = data['Close'].ewm(span=12, adjust=False).mean() data['EMA_26'] = data['Close'].ewm(span=26, adjust=False).mean() # MACD data['MACD'] = data['EMA_12'] - data['EMA_26'] data['Signal_Line'] = data['MACD'].ewm(span=9, adjust=False).mean() # Fixed RSI calculation: delta = data['Close'].diff() gain = delta.where(delta > 0, 0) loss = -delta.where(delta < 0, 0) avg_gain = gain.rolling(window=14, min_periods=1).mean() avg_loss = loss.rolling(window=14, min_periods=1).mean() rs = (avg_gain / avg_loss).replace([np.inf, -np.inf], np.nan) data['RSI'] = 100 - (100 / (1 + rs)) # Convert index to string format for serialization data.index = data.index.strftime('%Y-%m-%d %H:%M:%S') # Convert to dict with records orientation history_dict = data.to_dict(orient='records') # Calculate summary statistics price_change = float(data['Close'].iloc[-1] - data['Close'].iloc[0]) price_change_pct = (price_change / float(data['Close'].iloc[0])) * 100 response = { "symbol": symbol, "period": period, "interval": interval, "prepost": prepost, "data": history_dict, "summary": { "start_date": data.index[0], "end_date": data.index[-1], "total_days": len(data), "price_change": price_change, "price_change_percent": price_change_pct, "volatility": float(data['Close'].pct_change().std() * (252 ** 0.5) * 100), "highest_price": float(data['High'].max()), "lowest_price": float(data['Low'].min()), "average_volume": float(data['Volume'].mean()), "current_rsi": float(data['RSI'].iloc[-1]) if pd.notnull(data['RSI'].iloc[-1]) else None, "current_macd": float(data['MACD'].iloc[-1]) if pd.notnull(data['MACD'].iloc[-1]) else None } } return format_response(response) elif name == "get_options_chain_v2": symbol = arguments['symbol'].strip().upper() include_greeks = arguments.get('include_greeks', False) ticker = yf.Ticker(symbol) # Get available expiration dates exp_dates = ticker.options if not exp_dates: raise APIError(f"No options data available for {symbol}") # If no expiration date provided, use the nearest one expiration_date = arguments.get('expiration_date') if expiration_date: # Validate date format try: exp_date = datetime.datetime.strptime(expiration_date, '%Y-%m-%d') if exp_date < datetime.datetime.now(): raise ValidationError("Expiration date must be in the future") if expiration_date not in exp_dates: raise ValidationError(f"No options available for date {expiration_date}. Available dates: {exp_dates}") except ValueError: raise ValidationError("Invalid date format. Use YYYY-MM-DD") else: expiration_date = exp_dates[0] # Use nearest expiration # Get the stock's current price for moneyness calculation current_price = ticker.info.get('regularMarketPrice') or ticker.info.get('currentPrice') if not current_price: raise APIError("Could not determine current stock price") try: options = ticker.option_chain(expiration_date) if not hasattr(options, 'calls') or not hasattr(options, 'puts'): raise APIError(f"Invalid options data for {symbol}") # Helper function to process option chain def process_chain(chain, option_type): chain['moneyness'] = chain['strike'] / current_price chain['bid_ask_spread'] = chain['ask'] - chain['bid'] chain['bid_ask_spread_pct'] = (chain['bid_ask_spread'] / ((chain['bid'] + chain['ask']) / 2)) * 100 # Convert to records and handle NaN values processed = chain.where(pd.notnull(chain), None).to_dict(orient="records") # Add summary metrics summary = { f"total_{option_type}": len(chain), f"itm_{option_type}": len(chain[chain['inTheMoney']]) if 'inTheMoney' in chain else 0, f"total_volume": int(chain['volume'].sum()), f"total_openInterest": int(chain['openInterest'].sum()), "highest_volume_strikes": chain.nlargest(3, 'volume')[['strike', 'volume', 'openInterest', 'impliedVolatility']].to_dict('records'), "highest_openInterest_strikes": chain.nlargest(3, 'openInterest')[['strike', 'volume', 'openInterest', 'impliedVolatility']].to_dict('records') } return processed, summary # Process calls and puts calls_processed, calls_summary = process_chain(options.calls, "calls") puts_processed, puts_summary = process_chain(options.puts, "puts") # Calculate overall options statistics total_volume = calls_summary['total_volume'] + puts_summary['total_volume'] put_call_ratio = puts_summary['total_volume'] / max(1, calls_summary['total_volume']) response = { "symbol": symbol, "underlying_price": current_price, "expiration_date": expiration_date, "days_to_expiration": (datetime.datetime.strptime(expiration_date, '%Y-%m-%d') - datetime.datetime.now()).days, "available_expiration_dates": exp_dates, "summary": { "total_volume": total_volume, "put_call_ratio": put_call_ratio, "total_calls": calls_summary['total_calls'], "total_puts": puts_summary['total_puts'], "itm_calls": calls_summary['itm_calls'], "itm_puts": puts_summary['itm_puts'], "calls_summary": calls_summary, "puts_summary": puts_summary }, "calls": calls_processed, "puts": puts_processed } return format_response(response) except Exception as e: raise APIError(f"Failed to get options data: {str(e)}") except ValidationError as e: logger.error(f"Validation error in {name}: {str(e)}") return format_response(None, f"Validation error: {str(e)}") except APIError as e: logger.error(f"API error in {name}: {str(e)}\n{traceback.format_exc()}") return format_response(None, f"API error: {str(e)}") except Exception as e: logger.error(f"Unexpected error in {name}: {str(e)}\n{traceback.format_exc()}") return format_response(None, f"Internal error: {str(e)}") async def main(): logger.info("Starting Stockflow server v2...") try: async with stdio_server() as (read_stream, write_stream): await app.run( read_stream, write_stream, app.create_initialization_options() ) except Exception as e: logger.error(f"Server error: {str(e)}\n{traceback.format_exc()}") raise if __name__ == "__main__": asyncio.run(main())