Zerodha MCP Integration

by aptro
Verified
from typing import Any, Dict, List, Optional, AsyncIterator import os import httpx from contextlib import asynccontextmanager from dataclasses import dataclass from threading import Thread import webbrowser import uvicorn from fastapi import FastAPI, HTTPException from fastapi.responses import HTMLResponse from mcp.server.fastmcp import FastMCP, Context from kiteconnect import KiteConnect from dotenv import load_dotenv # Load environment variables from .env file load_dotenv() # Constants KITE_API_KEY = os.getenv("KITE_API_KEY") KITE_API_SECRET = os.getenv("KITE_API_SECRET") REDIRECT_URL = "http://127.0.0.1:5000/zerodha/auth/redirect" TOKEN_STORE_PATH = os.path.join(os.path.dirname(__file__), ".tokens") # Initialize FastAPI app for handling redirect app = FastAPI(title="Zerodha Login Handler") # Global variables for auth flow _request_token: Optional[str] = None @dataclass class ZerodhaContext: """Typed context for the Zerodha MCP server""" kite: KiteConnect api_key: str api_secret: str app: FastAPI server_thread: Optional[Thread] = None def load_stored_token() -> Optional[str]: """Load stored access token if it exists""" try: if os.path.exists(TOKEN_STORE_PATH): with open(TOKEN_STORE_PATH, "r") as f: return f.read().strip() except Exception: return None return None def save_access_token(token: str): """Save access token to file""" try: with open(TOKEN_STORE_PATH, "w") as f: f.write(token) except Exception as e: print(f"Warning: Could not save access token: {e}") def start_server(): """Start the FastAPI server""" print("Starting FastAPI server on http://127.0.0.1:5000") uvicorn.run(app, host="127.0.0.1", port=5000, log_level="error") @asynccontextmanager async def zerodha_lifespan(server: FastMCP) -> AsyncIterator[ZerodhaContext]: """Manage application lifecycle for Zerodha integration""" # Initialize Kite Connect print("Initializing Zerodha context...") if not KITE_API_KEY or not KITE_API_SECRET: raise ValueError( "KITE_API_KEY and KITE_API_SECRET must be set in the .env file" ) kite = KiteConnect(api_key=KITE_API_KEY) # Try to load existing token stored_token = load_stored_token() if stored_token: try: kite.set_access_token(stored_token) # Verify token is still valid with a simple API call kite.margins() print("Successfully restored previous session") except Exception: print("Stored token is invalid, will wait for new login...") if os.path.exists(TOKEN_STORE_PATH): os.remove(TOKEN_STORE_PATH) # Create context ctx = ZerodhaContext( kite=kite, api_key=KITE_API_KEY, api_secret=KITE_API_SECRET, app=app, ) try: # Setup FastAPI endpoint for auth callback @app.get("/zerodha/auth/redirect") async def callback(request_token: str = None, status: str = None): """Handle the redirect from Zerodha login""" global _request_token if status != "success": print(f"Login failed with status: {status}") raise HTTPException( status_code=400, detail=f"Login failed with status: {status}" ) if not request_token: print("No request token received") raise HTTPException(status_code=400, detail="No request token received") try: # Generate session print("Generating session with request token") data = ctx.kite.generate_session( request_token, api_secret=ctx.api_secret ) access_token = data["access_token"] # Save and set the access token print("Saving and setting access token") save_access_token(access_token) ctx.kite.set_access_token(access_token) _request_token = request_token print("Login successful") return HTMLResponse( content=""" <html> <body style="font-family: Arial, sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; background-color: #f5f5f5;"> <div style="text-align: center; padding: 2rem; background-color: white; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1);"> <h1 style="color: #2ecc71;">Login Successful!</h1> <p>You can close this window now.</p> </div> </body> </html> """ ) except Exception as e: error_msg = f"Failed to generate session: {str(e)}" print(error_msg) raise HTTPException(status_code=500, detail=error_msg) # Yield the context to the tools yield ctx finally: # Cleanup on shutdown print("Shutting down Zerodha context...") # Additional cleanup could go here if needed # Initialize FastMCP server with lifespan and dependencies mcp = FastMCP( "zerodha", lifespan=zerodha_lifespan, dependencies=["kiteconnect", "fastapi", "uvicorn", "python-dotenv", "httpx"], ) @mcp.tool() def initiate_login(ctx: Context) -> Dict[str, Any]: """ Start the Zerodha login flow by opening the login URL in a browser and starting a local server to handle the redirect """ try: # Reset the request token global _request_token _request_token = None print("Initiating Zerodha login flow") # Get strongly typed context zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context # Start the local server in a separate thread if not already running if not zerodha_ctx.server_thread or not zerodha_ctx.server_thread.is_alive(): server_thread = Thread(target=start_server) server_thread.daemon = True server_thread.start() zerodha_ctx.server_thread = server_thread # Get the login URL login_url = zerodha_ctx.kite.login_url() print(f"Generated login URL: {login_url}") # Open the login URL in browser webbrowser.open(login_url) print("Opened login URL in browser") return { "message": "Login page opened in browser. Please complete the login process." } except Exception as e: error_msg = f"Error initiating login: {str(e)}" print(error_msg) return {"error": error_msg} @mcp.tool() def get_request_token(ctx: Context) -> Dict[str, Any]: """Get the current request token after login redirect""" if _request_token: return {"request_token": _request_token} return { "error": "No request token available. Please complete the login process first." } @mcp.tool() def get_holdings(ctx: Context) -> List[Dict[str, Any]]: """Get user's holdings/portfolio""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.holdings() except Exception as e: return {"error": str(e)} @mcp.tool() def get_positions(ctx: Context) -> Dict[str, Any]: """Get user's positions""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.positions() except Exception as e: return {"error": str(e)} @mcp.tool() def get_margins(ctx: Context) -> Dict[str, Any]: """Get account margins""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.margins() except Exception as e: return {"error": str(e)} @mcp.tool() def place_order( ctx: Context, tradingsymbol: str, exchange: str, transaction_type: str, quantity: int, product: str, order_type: str, price: Optional[float] = None, trigger_price: Optional[float] = None, ) -> Dict[str, Any]: """ Place an order on Zerodha Args: tradingsymbol: Trading symbol (e.g., 'INFY') exchange: Exchange (NSE, BSE, NFO, etc.) transaction_type: BUY or SELL quantity: Number of shares/units product: Product code (CNC, MIS, NRML) order_type: Order type (MARKET, LIMIT, SL, SL-M) price: Price for LIMIT orders trigger_price: Trigger price for SL orders """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.place_order( variety="regular", exchange=exchange, tradingsymbol=tradingsymbol, transaction_type=transaction_type, quantity=quantity, product=product, order_type=order_type, price=price, trigger_price=trigger_price, ) except Exception as e: return {"error": str(e)} @mcp.tool() def get_quote(ctx: Context, symbols: List[str]) -> Dict[str, Any]: """ Get quote for symbols Args: symbols: List of symbols (e.g., ['NSE:INFY', 'BSE:RELIANCE']) """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.quote(symbols) except Exception as e: return {"error": str(e)} @mcp.tool() def get_historical_data( ctx: Context, instrument_token: int, from_date: str, to_date: str, interval: str ) -> List[Dict[str, Any]]: """ Get historical data for an instrument Args: instrument_token: Instrument token from_date: From date (format: 2024-01-01) to_date: To date (format: 2024-03-13) interval: Candle interval (minute, day, 3minute, etc.) """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.historical_data( instrument_token=instrument_token, from_date=from_date, to_date=to_date, interval=interval, ) except Exception as e: return {"error": str(e)} @mcp.tool() def check_and_authenticate(ctx: Context) -> Dict[str, Any]: """ Check if Kite is authenticated and initiate authentication if needed. Returns the authentication status and any relevant messages. """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context # First try to load existing token stored_token = load_stored_token() if stored_token: try: zerodha_ctx.kite.set_access_token(stored_token) # Verify token is still valid with a simple API call zerodha_ctx.kite.margins() return { "status": "authenticated", "message": "Already authenticated with valid token", } except Exception: print("Stored token is invalid, will initiate new login...") if os.path.exists(TOKEN_STORE_PATH): os.remove(TOKEN_STORE_PATH) # If we reach here, we need to authenticate # Call the existing initiate_login function login_result = initiate_login(ctx) if "error" in login_result: return {"status": "error", "message": login_result["error"]} return {"status": "login_initiated", "message": login_result["message"]} except Exception as e: error_msg = f"Error checking/initiating authentication: {str(e)}" print(error_msg) return {"status": "error", "message": error_msg} # Mutual Fund Tools @mcp.tool() def get_mf_orders(ctx: Context) -> List[Dict[str, Any]]: """Get all mutual fund orders""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.mf_orders() except Exception as e: return {"error": str(e)} @mcp.tool() def place_mf_order( ctx: Context, tradingsymbol: str, transaction_type: str, amount: float, tag: Optional[str] = None, ) -> Dict[str, Any]: """ Place a mutual fund order Args: tradingsymbol: Trading symbol (e.g., 'INF090I01239') transaction_type: BUY or SELL amount: Amount to invest or redeem tag: Optional tag for the order """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.place_mf_order( tradingsymbol=tradingsymbol, transaction_type=transaction_type, amount=amount, tag=tag, ) except Exception as e: return {"error": str(e)} @mcp.tool() def cancel_mf_order(ctx: Context, order_id: str) -> Dict[str, Any]: """ Cancel a mutual fund order Args: order_id: Order ID to cancel """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.cancel_mf_order(order_id=order_id) except Exception as e: return {"error": str(e)} @mcp.tool() def get_mf_instruments(ctx: Context) -> List[Dict[str, Any]]: """Get all available mutual fund instruments""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.mf_instruments() except Exception as e: return {"error": str(e)} @mcp.tool() def get_mf_holdings(ctx: Context) -> List[Dict[str, Any]]: """Get user's mutual fund holdings""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.mf_holdings() except Exception as e: return {"error": str(e)} @mcp.tool() def get_mf_sips(ctx: Context) -> List[Dict[str, Any]]: """Get all mutual fund SIPs""" try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.mf_sips() except Exception as e: return {"error": str(e)} @mcp.tool() def place_mf_sip( ctx: Context, tradingsymbol: str, amount: float, instalments: int, frequency: str, initial_amount: Optional[float] = None, instalment_day: Optional[int] = None, tag: Optional[str] = None, ) -> Dict[str, Any]: """ Place a mutual fund SIP (Systematic Investment Plan) Args: tradingsymbol: Trading symbol (e.g., 'INF090I01239') amount: Amount per instalment instalments: Number of instalments (minimum 6) frequency: weekly, monthly, or quarterly initial_amount: Optional initial amount instalment_day: Optional day of month/week for instalment (1-31 for monthly, 1-7 for weekly) tag: Optional tag for the SIP """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.place_mf_sip( tradingsymbol=tradingsymbol, amount=amount, instalments=instalments, frequency=frequency, initial_amount=initial_amount, instalment_day=instalment_day, tag=tag, ) except Exception as e: return {"error": str(e)} @mcp.tool() def modify_mf_sip( ctx: Context, sip_id: str, amount: Optional[float] = None, frequency: Optional[str] = None, instalments: Optional[int] = None, instalment_day: Optional[int] = None, status: Optional[str] = None, ) -> Dict[str, Any]: """ Modify a mutual fund SIP Args: sip_id: SIP ID to modify amount: New amount per instalment frequency: New frequency (weekly, monthly, or quarterly) instalments: New number of instalments instalment_day: New day of month/week for instalment status: SIP status (active or paused) """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.modify_mf_sip( sip_id=sip_id, amount=amount, frequency=frequency, instalments=instalments, instalment_day=instalment_day, status=status, ) except Exception as e: return {"error": str(e)} @mcp.tool() def cancel_mf_sip(ctx: Context, sip_id: str) -> Dict[str, Any]: """ Cancel a mutual fund SIP Args: sip_id: SIP ID to cancel """ try: zerodha_ctx: ZerodhaContext = ctx.request_context.lifespan_context return zerodha_ctx.kite.cancel_mf_sip(sip_id=sip_id) except Exception as e: return {"error": str(e)} if __name__ == "__main__": # We don't need the main function anymore since MCP handles the lifecycle print("Starting Zerodha MCP server...") mcp.run()