"""MCP Server with aggregated Finnhub + Yahoo Finance + Futu providers."""
import asyncio
import json
import os
from typing import Any
from pydantic import ValidationError
from mcp.server import Server
from mcp.types import Tool, TextContent
from .schemas import (
SearchSymbolInput,
GetQuoteInput,
GetHistoryInput,
GetFundamentalsInput,
GetFinancialStatementsInput,
ResolveSymbolInput,
SubscribeQuoteInput,
GetOrderBookInput,
GetTickerInput,
ErrorDetail,
)
from .providers.finnhub_provider import FinnhubProvider
from .providers.yahoo_provider import YahooProvider
from .providers.tushare_provider import TushareProvider
from .providers.futu_provider import FutuProvider
from .providers.symbols import detect_market
from .cache import get_cache
from .util import ProviderError, get_env_int
# Create server instance
app = Server("stock-data-mcp")
# Global providers
finnhub_provider: FinnhubProvider = None # type: ignore
yahoo_provider: YahooProvider = None # type: ignore
tushare_provider: TushareProvider = None # type: ignore
futu_provider: FutuProvider = None # type: ignore
def init_providers():
"""Initialize data providers."""
global finnhub_provider, yahoo_provider, tushare_provider, futu_provider
timeout = get_env_int("STOCK_MCP_TIMEOUT_SECONDS", 10)
try:
finnhub_provider = FinnhubProvider(timeout=timeout)
except ProviderError as e:
print(f"Warning: Finnhub initialization failed: {e.message}", flush=True)
finnhub_provider = None # type: ignore
yahoo_provider = YahooProvider()
try:
tushare_provider = TushareProvider(timeout=timeout)
except ProviderError as e:
print(f"Warning: Tushare initialization failed: {e.message}", flush=True)
tushare_provider = None # type: ignore
try:
# Futu is optional, requires local OpenD
futu_provider = FutuProvider()
except Exception as e:
# FutuProvider init (ensure_context) might fail if OpenD is not up
print(f"Warning: Futu initialization failed: {str(e)}", flush=True)
futu_provider = None
def create_error_response(error: ErrorDetail) -> list[TextContent]:
"""Create a formatted error response."""
error_obj = {
"error": {
"code": error.code,
"message": error.message,
"details": error.details
}
}
return [TextContent(type="text", text=json.dumps(error_obj, indent=2))]
def create_success_response(data: Any) -> list[TextContent]:
"""Create a formatted success response."""
def serialize(obj):
if hasattr(obj, 'model_dump'):
return obj.model_dump()
elif isinstance(obj, list):
return [serialize(item) for item in obj]
elif isinstance(obj, dict):
return {k: serialize(v) for k, v in obj.items()}
return obj
data = serialize(data)
return [TextContent(type="text", text=json.dumps(data, indent=2))]
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="search_symbol",
description=(
"Search for stock symbols across markets. "
"Tries Finnhub first, falls back to Yahoo if needed. "
"Supports US (AAPL), Hong Kong (0700.HK), Shanghai (600519.SS), Shenzhen (000001.SZ)."
),
inputSchema=SearchSymbolInput.model_json_schema()
),
Tool(
name="resolve_symbol",
description=(
"Resolve and normalize stock symbols between formats. "
"Useful for converting between Yahoo (0700.HK) and Futu (HK.00700) formats. "
"Returns canonical market and symbol information."
),
inputSchema=ResolveSymbolInput.model_json_schema()
),
Tool(
name="get_quote",
description=(
"Get real-time quote data for a stock. "
"Strategies: 'auto' (Futu for HK/CN, Yahoo fallback), 'futu', 'yahoo', 'finnhub'. "
"Data is cached for 5 seconds."
),
inputSchema=GetQuoteInput.model_json_schema()
),
Tool(
name="get_history",
description=(
"Get historical OHLCV data for a stock. "
"Supports daily, weekly, and monthly intervals. "
"Strategies: 'auto' (Futu for detailed HK/CN K-line, Yahoo fallback)."
),
inputSchema=GetHistoryInput.model_json_schema()
),
Tool(
name="get_fundamentals",
description="Get fundamental metrics from Finnhub or Tushare (CN).",
inputSchema=GetFundamentalsInput.model_json_schema()
),
Tool(
name="get_financial_statements",
description="Get financial statements from Finnhub or Tushare (CN).",
inputSchema=GetFinancialStatementsInput.model_json_schema()
),
Tool(
name="subscribe_quote",
description="Subscribe/Unsubscribe to Futu real-time quote updates.",
inputSchema=SubscribeQuoteInput.model_json_schema()
),
Tool(
name="get_order_book",
description="Get order book (market depth) from Futu.",
inputSchema=GetOrderBookInput.model_json_schema()
),
Tool(
name="get_ticker",
description="Get real-time tick data from Futu.",
inputSchema=GetTickerInput.model_json_schema()
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls with routing logic."""
cache = get_cache()
try:
if name == "search_symbol":
try:
input_data = SearchSymbolInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details={"validation_errors": e.errors()}))
# Check cache
cached = cache.get_search(input_data.query, input_data.limit)
if cached:
return create_success_response({"results": cached})
# Try Finnhub -> Yahoo
results = []
if finnhub_provider:
try:
results = finnhub_provider.search_symbol(input_data.query)
except:
pass
if not results:
try:
results = yahoo_provider.search_symbol(input_data.query)
except ProviderError as e:
return create_error_response(ErrorDetail(code=e.code, message=e.message, details=e.details))
results = results[:input_data.limit]
cache.set_search(input_data.query, input_data.limit, results)
return create_success_response({"results": results})
elif name == "resolve_symbol":
try:
input_data = ResolveSymbolInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details={"validation_errors": e.errors()}))
if not futu_provider:
# If no Futu, we can still parse strings if we implement simple logic here or use a helper
# But for now, let's rely on FutuProvider's logic if available, or basic check
return create_error_response(ErrorDetail(code="PROVIDER_ERROR", message="Futu provider not available for symbol resolution"))
result = futu_provider.resolve_symbol(input_data.symbol)
return create_success_response(result)
elif name == "get_quote":
try:
input_data = GetQuoteInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details={"validation_errors": e.errors()}))
cached = cache.get_quote(input_data.symbol)
if cached:
return create_success_response(cached)
quote = None
prefer = input_data.prefer or "auto"
# Strategy:
# - futu: Try Futu only.
# - yahoo: Try Yahoo -> Finnhub.
# - auto:
# - HK -> Futu first (if available) -> Yahoo fallback.
# - CN/US -> Yahoo first. (Futu often restricted for these markets).
# Simple Auto Logic
sources_to_try = []
if prefer == "futu":
sources_to_try = ["futu", "yahoo"]
elif prefer == "tushare":
sources_to_try = ["tushare", "yahoo"]
elif prefer == "yahoo":
sources_to_try = ["yahoo"]
elif prefer == "auto":
# Routing Strategy:
# - HK -> Futu -> Yahoo fallback
# - CN (A-shares) -> Tushare -> Yahoo fallback
# - US/Global -> Yahoo
market = detect_market(input_data.symbol)
if market == "HK":
sources_to_try = ["futu", "yahoo"]
elif market in ["CN_SH", "CN_SZ"]:
sources_to_try = ["tushare", "yahoo"]
else:
sources_to_try = ["yahoo"]
last_error = None
for source in sources_to_try:
try:
if source == "futu" and futu_provider:
quote = futu_provider.get_quote(input_data.symbol)
break
elif source == "tushare" and tushare_provider:
quote = tushare_provider.get_quote(input_data.symbol)
break
elif source == "yahoo" and yahoo_provider:
quote = yahoo_provider.get_quote(input_data.symbol)
break
elif source == "finnhub" and finnhub_provider:
quote = finnhub_provider.get_quote(input_data.symbol)
break
except ProviderError as e:
last_error = e
continue
except Exception:
continue
if quote:
cache.set_quote(input_data.symbol, quote)
return create_success_response(quote)
if last_error:
return create_error_response(ErrorDetail(code=last_error.code, message=last_error.message, details=last_error.details))
return create_error_response(ErrorDetail(code="NO_DATA", message=f"Could not fetch quote for {input_data.symbol} from any source"))
elif name == "get_history":
try:
input_data = GetHistoryInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details={"validation_errors": e.errors()}))
cached = cache.get_history(input_data.symbol, input_data.start_date, input_data.end_date, input_data.interval)
if cached:
return create_success_response({"symbol": input_data.symbol, "records": cached})
records = None
market = detect_market(input_data.symbol)
use_futu = futu_provider is not None
use_tushare = tushare_provider is not None
# Routing:
# - auto works by trying preferred provider first.
# - Only prefer Futu for HK.
# - Prefer Tushare for CN.
preferred_source = "yahoo"
if market == "HK":
preferred_source = "futu"
elif market in ["CN_SH", "CN_SZ"]:
preferred_source = "tushare"
# Try preferred first
if preferred_source == "futu" and use_futu:
try:
records = futu_provider.get_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval,
input_data.adjust
)
except:
pass # Fallback to Yahoo
elif preferred_source == "tushare" and use_tushare:
try:
records = tushare_provider.get_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval,
input_data.adjust
)
except:
pass # Fallback to Yahoo
if not records:
# Default/Fallback to Yahoo
try:
records = yahoo_provider.get_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval
)
except ProviderError as e:
return create_error_response(ErrorDetail(code=e.code, message=e.message, details=e.details))
cache.set_history(input_data.symbol, input_data.start_date, input_data.end_date, input_data.interval, records)
return create_success_response({"symbol": input_data.symbol, "records": records})
elif name == "subscribe_quote":
if not futu_provider:
return create_error_response(ErrorDetail(code="PROVIDER_ERROR", message="Futu provider not available"))
try:
input_data = SubscribeQuoteInput(**arguments)
result = futu_provider.subscribe(input_data.symbols, input_data.sub_types, input_data.enable)
return create_success_response(result)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details=e.errors()))
elif name == "get_order_book":
if not futu_provider:
return create_error_response(ErrorDetail(code="PROVIDER_ERROR", message="Futu provider not available"))
try:
input_data = GetOrderBookInput(**arguments)
result = futu_provider.get_order_book(input_data.symbol, input_data.num)
return create_success_response(result)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details=e.errors()))
elif name == "get_ticker":
if not futu_provider:
return create_error_response(ErrorDetail(code="PROVIDER_ERROR", message="Futu provider not available"))
try:
input_data = GetTickerInput(**arguments)
result = futu_provider.get_ticker(input_data.symbol, input_data.num)
return create_success_response(result)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details=e.errors()))
elif name in ["get_fundamentals", "get_financial_statements"]:
# Existing logic for fundamentals (no changes needed as Futu doesn't do fundamentals per plan)
# ... copy existing fundamentals logic ...
# For brevity rewriting basic flow:
if name == "get_fundamentals":
try:
input_data = GetFundamentalsInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details=e.errors()))
market = detect_market(input_data.symbol)
# Check cache
cached = cache.get_fundamentals(input_data.symbol, input_data.period)
if cached: return create_success_response(cached)
if market in ["CN_SH", "CN_SZ"]:
if not tushare_provider:
return create_error_response(ErrorDetail(code="MISSING_API_KEY", message="Tushare token required"))
res = tushare_provider.get_fundamentals(input_data.symbol, input_data.period)
else:
if not finnhub_provider:
return create_error_response(ErrorDetail(code="MISSING_API_KEY", message="Finnhub key required"))
res = finnhub_provider.get_fundamentals(input_data.symbol, input_data.period)
cache.set_fundamentals(input_data.symbol, input_data.period, res)
return create_success_response(res)
elif name == "get_financial_statements":
try:
input_data = GetFinancialStatementsInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message="Invalid input", details=e.errors()))
market = detect_market(input_data.symbol)
cached = cache.get_statements(input_data.symbol, input_data.statement, input_data.period)
if cached: return create_success_response(cached)
if market in ["CN_SH", "CN_SZ"]:
if not tushare_provider:
return create_error_response(ErrorDetail(code="MISSING_API_KEY", message="Tushare token required"))
res = tushare_provider.get_financial_statements(input_data.symbol, input_data.statement, input_data.period)
else:
if not finnhub_provider:
return create_error_response(ErrorDetail(code="MISSING_API_KEY", message="Finnhub key required"))
res = finnhub_provider.get_financial_statements(input_data.symbol, input_data.statement, input_data.period)
cache.set_statements(input_data.symbol, input_data.statement, input_data.period, res)
return create_success_response(res)
else:
return create_error_response(ErrorDetail(code="INVALID_ARGUMENT", message=f"Unknown tool: {name}"))
except ProviderError as e:
return create_error_response(ErrorDetail(code=e.code, message=e.message, details=e.details))
except Exception as e:
return create_error_response(ErrorDetail(code="PROVIDER_ERROR", message=f"Internal error: {str(e)}"))
async def main():
"""Run the MCP server."""
# Load environment variables
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# Initialize providers
init_providers()
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
if __name__ == "__main__":
asyncio.run(main())
import asyncio
import json
import os
from typing import Any
from pydantic import ValidationError
from mcp.server import Server
from mcp.types import Tool, TextContent
from .schemas import (
SearchSymbolInput,
GetQuoteInput,
GetHistoryInput,
GetFundamentalsInput,
GetFinancialStatementsInput,
ErrorDetail,
)
from .providers.finnhub_provider import FinnhubProvider
from .providers.yahoo_provider import YahooProvider
from .providers.tushare_provider import TushareProvider
from .providers.symbols import detect_market
from .cache import get_cache
from .util import ProviderError, get_env_int
# Create server instance
app = Server("stock-data-mcp")
# Global providers
finnhub_provider: FinnhubProvider = None # type: ignore
yahoo_provider: YahooProvider = None # type: ignore
tushare_provider: TushareProvider = None # type: ignore
def init_providers():
"""Initialize data providers."""
global finnhub_provider, yahoo_provider, tushare_provider
timeout = get_env_int("STOCK_MCP_TIMEOUT_SECONDS", 10)
try:
finnhub_provider = FinnhubProvider(timeout=timeout)
except ProviderError as e:
print(f"Warning: Finnhub initialization failed: {e.message}", flush=True)
finnhub_provider = None # type: ignore
yahoo_provider = YahooProvider()
try:
tushare_provider = TushareProvider(timeout=timeout)
except ProviderError as e:
print(f"Warning: Tushare initialization failed: {e.message}", flush=True)
tushare_provider = None # type: ignore
def create_error_response(error: ErrorDetail) -> list[TextContent]:
"""Create a formatted error response."""
error_obj = {
"error": {
"code": error.code,
"message": error.message,
"details": error.details
}
}
return [TextContent(type="text", text=json.dumps(error_obj, indent=2))]
def create_success_response(data: Any) -> list[TextContent]:
"""Create a formatted success response."""
def serialize(obj):
if hasattr(obj, 'model_dump'):
return obj.model_dump()
elif isinstance(obj, list):
return [serialize(item) for item in obj]
elif isinstance(obj, dict):
return {k: serialize(v) for k, v in obj.items()}
return obj
data = serialize(data)
return [TextContent(type="text", text=json.dumps(data, indent=2))]
@app.list_tools()
async def list_tools() -> list[Tool]:
"""List available tools."""
return [
Tool(
name="search_symbol",
description=(
"Search for stock symbols across markets. "
"Tries Finnhub first, falls back to Yahoo if needed. "
"Supports US (AAPL), Hong Kong (0700.HK), Shanghai (600519.SS), Shenzhen (000001.SZ)."
),
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query for stock symbols"
},
"limit": {
"type": "integer",
"description": "Maximum number of results (1-50)",
"default": 10,
"minimum": 1,
"maximum": 50
},
"market_hint": {
"type": "string",
"description": "Market hint (US, HK, CN)",
"enum": ["US", "HK", "CN"]
}
},
"required": ["query"]
}
),
Tool(
name="get_quote",
description=(
"Get real-time quote data for a stock. "
"Defaults to Yahoo Finance (faster, no API limits). "
"Falls back to Finnhub if Yahoo fails. "
"Data is cached for 5 seconds."
),
inputSchema={
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock symbol (e.g., AAPL, 0700.HK, 600519.SS)"
},
"prefer": {
"type": "string",
"description": "Preferred data source",
"enum": ["yahoo", "finnhub"]
}
},
"required": ["symbol"]
}
),
Tool(
name="get_history",
description=(
"Get historical OHLCV data for a stock. "
"Uses Yahoo Finance (most reliable for historical data). "
"Supports daily, weekly, and monthly intervals."
),
inputSchema={
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock symbol"
},
"start_date": {
"type": "string",
"description": "Start date in YYYY-MM-DD format",
"pattern": "^\\d{4}-\\d{2}-\\d{2}$"
},
"end_date": {
"type": "string",
"description": "End date in YYYY-MM-DD format",
"pattern": "^\\d{4}-\\d{2}-\\d{2}$"
},
"interval": {
"type": "string",
"description": "Data interval",
"enum": ["1d", "1wk", "1mo"],
"default": "1d"
}
},
"required": ["symbol", "start_date", "end_date"]
}
),
Tool(
name="get_fundamentals",
description=(
"Get fundamental metrics (ROE, margins, ratios, etc.) from Finnhub. "
"Includes valuation, profitability, and financial health metrics. "
"Data is cached for 1 hour. "
"Requires FINNHUB_API_KEY."
),
inputSchema={
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock symbol"
},
"period": {
"type": "string",
"description": "Period type",
"enum": ["ttm", "annual"],
"default": "ttm"
}
},
"required": ["symbol"]
}
),
Tool(
name="get_financial_statements",
description=(
"Get financial statements (income/balance/cashflow) from Finnhub. "
"Returns historical periods with key metrics. "
"Data is cached for 24 hours. "
"Requires FINNHUB_API_KEY."
),
inputSchema={
"type": "object",
"properties": {
"symbol": {
"type": "string",
"description": "Stock symbol"
},
"statement": {
"type": "string",
"description": "Statement type",
"enum": ["income", "balance", "cashflow"]
},
"period": {
"type": "string",
"description": "Period frequency",
"enum": ["annual", "quarterly"],
"default": "annual"
}
},
"required": ["symbol", "statement"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[TextContent]:
"""Handle tool calls with routing logic."""
cache = get_cache()
try:
if name == "search_symbol":
# Validate input
try:
input_data = SearchSymbolInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message="Invalid input parameters",
details={"validation_errors": e.errors()}
))
# Check cache
cached = cache.get_search(input_data.query, input_data.limit)
if cached:
return create_success_response({"results": cached})
# Try Finnhub first
results = []
if finnhub_provider:
try:
results = finnhub_provider.search_symbol(input_data.query)
except ProviderError:
pass # Fall through to Yahoo
# Fallback to Yahoo if Finnhub failed or returned no results
if not results:
try:
results = yahoo_provider.search_symbol(input_data.query)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Limit results and cache
results = results[:input_data.limit]
cache.set_search(input_data.query, input_data.limit, results)
return create_success_response({"results": results})
elif name == "get_quote":
# Validate input
try:
input_data = GetQuoteInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message="Invalid input parameters",
details={"validation_errors": e.errors()}
))
# Check cache
cached = cache.get_quote(input_data.symbol)
if cached:
return create_success_response(cached)
# Routing: Yahoo first by default
quote = None
prefer = input_data.prefer or "yahoo"
if prefer == "yahoo":
try:
quote = yahoo_provider.get_quote(input_data.symbol)
except ProviderError:
# Fallback to Finnhub
if finnhub_provider:
try:
quote = finnhub_provider.get_quote(input_data.symbol)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
else:
raise
else:
# Prefer Finnhub
if finnhub_provider:
try:
quote = finnhub_provider.get_quote(input_data.symbol)
except ProviderError:
# Fallback to Yahoo
quote = yahoo_provider.get_quote(input_data.symbol)
else:
quote = yahoo_provider.get_quote(input_data.symbol)
# Cache and return
cache.set_quote(input_data.symbol, quote)
return create_success_response(quote)
elif name == "get_history":
# Validate input
try:
input_data = GetHistoryInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message="Invalid input parameters",
details={"validation_errors": e.errors()}
))
# Check cache
cached = cache.get_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval
)
if cached:
return create_success_response({"symbol": input_data.symbol, "records": cached})
# Use Yahoo (most reliable for history)
try:
records = yahoo_provider.get_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval
)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Cache and return
cache.set_history(
input_data.symbol,
input_data.start_date,
input_data.end_date,
input_data.interval,
records
)
return create_success_response({"symbol": input_data.symbol, "records": records})
elif name == "get_fundamentals":
# Validate input
try:
input_data = GetFundamentalsInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message="Invalid input parameters",
details={"validation_errors": e.errors()}
))
# Detect market
market = detect_market(input_data.symbol)
# Route to Tushare for CN markets
if market in ["CN_SH", "CN_SZ"]:
if not tushare_provider:
return create_error_response(ErrorDetail(
code="MISSING_API_KEY",
message="Tushare token required for CN fundamentals. Set TUSHARE_TOKEN environment variable."
))
# Check cache
cached = cache.get_fundamentals(input_data.symbol, input_data.period)
if cached:
return create_success_response(cached)
# Get from Tushare
try:
fundamentals = tushare_provider.get_fundamentals(input_data.symbol, input_data.period)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Cache and return
cache.set_fundamentals(input_data.symbol, input_data.period, fundamentals)
return create_success_response(fundamentals)
else:
# Non-CN markets: use Finnhub
if not finnhub_provider:
return create_error_response(ErrorDetail(
code="MISSING_API_KEY",
message="Finnhub API key required for fundamentals. Set FINNHUB_API_KEY environment variable."
))
# Check cache
cached = cache.get_fundamentals(input_data.symbol, input_data.period)
if cached:
return create_success_response(cached)
# Get from Finnhub
try:
fundamentals = finnhub_provider.get_fundamentals(input_data.symbol, input_data.period)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Cache and return
cache.set_fundamentals(input_data.symbol, input_data.period, fundamentals)
return create_success_response(fundamentals)
elif name == "get_financial_statements":
# Validate input
try:
input_data = GetFinancialStatementsInput(**arguments)
except ValidationError as e:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message="Invalid input parameters",
details={"validation_errors": e.errors()}
))
# Detect market
market = detect_market(input_data.symbol)
# Route to Tushare for CN markets
if market in ["CN_SH", "CN_SZ"]:
if not tushare_provider:
return create_error_response(ErrorDetail(
code="MISSING_API_KEY",
message="Tushare token required for CN statements. Set TUSHARE_TOKEN environment variable."
))
# Check cache
cached = cache.get_statements(
input_data.symbol,
input_data.statement,
input_data.period
)
if cached:
return create_success_response(cached)
# Get from Tushare
try:
statements = tushare_provider.get_financial_statements(
input_data.symbol,
input_data.statement,
input_data.period
)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Cache and return
cache.set_statements(
input_data.symbol,
input_data.statement,
input_data.period,
statements
)
return create_success_response(statements)
else:
# Non-CN markets: use Finnhub
if not finnhub_provider:
return create_error_response(ErrorDetail(
code="MISSING_API_KEY",
message="Finnhub API key required for financial statements. Set FINNHUB_API_KEY environment variable."
))
# Check cache
cached = cache.get_statements(
input_data.symbol,
input_data.statement,
input_data.period
)
if cached:
return create_success_response(cached)
# Get from Finnhub
try:
statements = finnhub_provider.get_financial_statements(
input_data.symbol,
input_data.statement,
input_data.period
)
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
# Cache and return
cache.set_statements(
input_data.symbol,
input_data.statement,
input_data.period,
statements
)
return create_success_response(statements)
else:
return create_error_response(ErrorDetail(
code="INVALID_ARGUMENT",
message=f"Unknown tool: {name}"
))
except ProviderError as e:
return create_error_response(ErrorDetail(
code=e.code,
message=e.message,
details=e.details
))
except Exception as e:
return create_error_response(ErrorDetail(
code="PROVIDER_ERROR",
message=f"Internal error: {str(e)}"
))
async def main():
"""Run the MCP server."""
# Load environment variables from .env file if present
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
# Initialize providers
init_providers()
from mcp.server.stdio import stdio_server
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())