# stock.py
import logging
import os
from typing import Any

import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Let python-dotenv populate the environment before the key is read.
load_dotenv()

ALPHA_VANTAGE_API_KEY = os.environ.get('ALPHA_VANTAGE_API_KEY')
if ALPHA_VANTAGE_API_KEY is None or ALPHA_VANTAGE_API_KEY == '':
    # Fail at import time: every request URL built below embeds this key.
    raise ValueError("ALPHA_VANTAGE_API_KEY environment variable is not set")

# Root of the Alpha Vantage site and the User-Agent sent with each request.
AV_BASE_URL = 'https://www.alphavantage.co'
USER_AGENT = 'stock-app/1.0'

# Server object whose ``tool()`` decorator registers the functions below.
mcp = FastMCP('stock')
async def make_av_request(url: str) -> dict[str, Any] | None:
    """Send a GET request to Alpha Vantage and return the decoded JSON body.

    The request carries this module's ``USER_AGENT`` and an
    ``Accept: application/json`` header and uses a 30-second timeout.

    Args:
        url: Fully-formed request URL, including its query string.

    Returns:
        The decoded JSON payload, or ``None`` when the request raised, the
        response had an unsuccessful HTTP status, or the body could not be
        decoded as JSON.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json",
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            # Best-effort contract: callers receive None rather than an
            # exception, so the broad catch is kept, but the failure is
            # no longer silent. Only the exception type is logged because
            # the exception text may include the request URL, and callers
            # in this module put the API key in that URL.
            logging.getLogger(__name__).warning(
                "Alpha Vantage request failed (%s)", type(exc).__name__
            )
            return None
@mcp.tool()
async def get_top_gainers_losers() -> dict[str, Any] | None:
    """Get Top Gainers and Losers in the US stock market.

    Takes no arguments.

    Returns:
        The decoded JSON response from Alpha Vantage's
        ``TOP_GAINERS_LOSERS`` query, or ``None`` if the request failed.
    """
    # The query parameter must be spelled ``function=<NAME>``; the missing
    # ``=`` previously produced a URL with no ``function`` parameter at all.
    url = f'{AV_BASE_URL}/query?function=TOP_GAINERS_LOSERS&apikey={ALPHA_VANTAGE_API_KEY}'
    # Previously the URL was built and then discarded, so the tool always
    # returned None; issue the request and hand the payload back.
    return await make_av_request(url)