# mcp-server.py (7.25 kB)
import logging
import os
import sys
from typing import Any
from urllib.parse import quote

import httpx
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
# Populate the process environment via python-dotenv before any key is read.
load_dotenv()

# Both API keys are mandatory: fail at import time instead of on the first
# tool call.
ALPHA_VANTAGE_API_KEY = os.getenv('ALPHA_VANTAGE_API_KEY')
if not ALPHA_VANTAGE_API_KEY:
    raise ValueError("ALPHA_VANTAGE_API_KEY environment variable is not set")

NEWS_API_KEY = os.getenv('NEWS_API_KEY')
if not NEWS_API_KEY:
    raise ValueError("NEWS_API_KEY environment variable is not set")

# Server instance that the @mcp.tool() decorators below register against.
mcp = FastMCP('mcpServer')

# Base URLs carry no trailing slash because callers append paths that begin
# with "/" (a trailing slash here would yield "//v2/..." request URLs).
NWS_BASE_URL = 'https://api.weather.gov'
AV_BASE_URL = 'https://www.alphavantage.co'
NEWS_BASE_URL = 'https://newsapi.org'

# Sent as the User-Agent header on every outgoing request.
USER_AGENT = 'mcp-app/1.0'
async def make_nws_request(url: str) -> dict[str, Any] | None:
    """Request an NWS API URL and return the decoded JSON body.

    Args:
        url: Absolute URL of the endpoint to fetch.

    Returns:
        The parsed JSON payload, or None when the request, the HTTP status
        check or the JSON decoding fails.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/geo+json",
    }
    # httpx leaves redirects unfollowed by default and raise_for_status()
    # treats the resulting 3xx as an error; follow them so a redirected
    # URL still resolves to its final resource.
    async with httpx.AsyncClient(follow_redirects=True) as client:
        try:
            response = await client.get(url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            # Best-effort by design: callers map None to a user-facing
            # message. Record the cause instead of discarding it.
            logging.getLogger(__name__).warning(
                "NWS request to %s failed: %r", url, exc
            )
            return None
async def make_av_request(url: str) -> dict[str, Any] | None:
    """Request an Alpha Vantage URL with the API key appended.

    Args:
        url: Endpoint URL without the ``apikey`` parameter; it may or may
            not already contain a query string.

    Returns:
        The parsed JSON payload, or None when the request, the HTTP status
        check or the JSON decoding fails.
    """
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json",
    }
    # Pick the separator from the URL itself so the key is appended correctly
    # whether or not the caller already supplied query parameters.
    separator = "&" if "?" in url else "?"
    request_url = f"{url}{separator}apikey={ALPHA_VANTAGE_API_KEY}"
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(request_url, headers=headers, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception as exc:
            # Best-effort by design: callers map None to a user-facing
            # message. Only the exception type is logged because httpx error
            # messages embed the request URL, which contains the API key.
            logging.getLogger(__name__).warning(
                "Alpha Vantage request failed: %s", type(exc).__name__
            )
            return None
def format_alert(feature: dict) -> str:
    """Format an alert into a readable string.

    Args:
        feature: Alert mapping whose "properties" mapping supplies the
            ``event``, ``areaDesc``, ``severity``, ``description`` and
            ``instruction`` values.

    Returns:
        Text that starts and ends with a newline and holds one
        "Label: value" line per field. A field that is missing, null or
        empty is replaced by its placeholder text.
    """
    props = feature.get("properties") or {}

    def field(key: str, fallback: str) -> str:
        # `or` instead of a .get() default: a key that is present with a
        # null value must fall back too, otherwise "None" is printed.
        return props.get(key) or fallback

    lines = [
        "",
        f"Event: {field('event', 'Unknown')}",
        f"Area: {field('areaDesc', 'Unknown')}",
        f"Severity: {field('severity', 'Unknown')}",
        f"Description: {field('description', 'No additional details')}",
        f"Instruction: {field('instruction', 'No specific instructions provided')}",
        "",
    ]
    return "\n".join(lines)
@mcp.tool()
async def get_weather_alerts(state: str) -> str:
    """Get weather alerts for a US state.

    Args:
        state: Two-letter US state code (e.g., CA, NY)

    Returns:
        The formatted active alerts separated by "---" lines, or a short
        message when the code is invalid, the request fails or there are no
        alerts.
    """
    # Normalise and validate before interpolating into the URL path so that
    # lower-case codes work and arbitrary path text cannot be injected.
    area = state.strip().upper()
    if len(area) != 2 or not (area.isascii() and area.isalpha()):
        return "Invalid state code. Please provide a two-letter US state code (e.g., CA, NY)."

    url = f"{NWS_BASE_URL}/alerts/active/area/{area}"
    data = await make_nws_request(url)
    if not data or "features" not in data:
        return "Unable to fetch weather alerts or no alerts found."

    features = data["features"]
    if not features:
        return "No active weather alerts for this state."

    return "\n---\n".join(format_alert(feature) for feature in features)
@mcp.tool()
async def get_forecast(latitude: float, longitude: float) -> str:
    """Get the weather forecast for a specific location.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location

    Returns:
        Up to five formatted forecast periods separated by "---" lines, or a
        short message when the forecast cannot be retrieved.
    """
    failure = "Unable to fetch forecast or invalid location."

    # Step 1: the points lookup supplies the URL of the forecast resource.
    points_url = f"{NWS_BASE_URL}/points/{latitude},{longitude}"
    points_data = await make_nws_request(points_url)
    if not points_data:
        return failure

    # Guard the nested lookups: a payload without the expected keys should
    # produce the same friendly message rather than a KeyError.
    forecast_url = (points_data.get("properties") or {}).get("forecast")
    if not forecast_url:
        return failure

    # Step 2: fetch the forecast itself.
    forecast_data = await make_nws_request(forecast_url)
    if not forecast_data:
        return failure

    periods = (forecast_data.get("properties") or {}).get("periods") or []
    if not periods:
        return "No forecast periods available for this location."

    # Only the first five periods are reported to keep the reply short.
    forecasts = [
        f"\n{period.get('name', 'Unknown')}:\n"
        f"Temperature: {period.get('temperature', 'Unknown')}°{period.get('temperatureUnit', '')}\n"
        f"Wind: {period.get('windSpeed', 'Unknown')} {period.get('windDirection', '')}\n"
        f"Forecast: {period.get('detailedForecast', 'Unknown')}\n"
        for period in periods[:5]
    ]
    return "\n---\n".join(forecasts)
def _format_mover(entry: dict) -> str:
    """Render one gainer/loser record as a multi-line text block."""
    return (
        f"\n{entry.get('ticker', 'Unknown')}:\n"
        f"Price: {entry.get('price', 'Unknown')}\n"
        f"Change Amount: {entry.get('change_amount', 'Unknown')}\n"
        f"Change Percentage: {entry.get('change_percentage', 'Unknown')}\n"
        f"Volume: {entry.get('volume', 'Unknown')}\n"
    )


@mcp.tool()
async def get_top_gainers_losers() -> str:
    """Get the top gainers and losers in the US stock market

    Args:
        None

    Returns:
        A "Top Gainers" section followed by a "Top Losers" section, each
        listing its entries separated by "---" lines, or a short message when
        no market data is available.
    """
    url = f"{AV_BASE_URL}/query?function=TOP_GAINERS_LOSERS"
    gainers_losers_data = await make_av_request(url)
    if not gainers_losers_data:
        return "Unable to fetch stock market data."

    gainers = gainers_losers_data.get('top_gainers') or []
    losers = gainers_losers_data.get('top_losers') or []
    # A payload carrying neither list has nothing to report; say so instead
    # of returning two empty sections.
    if not gainers and not losers:
        return "Unable to fetch stock market data."

    # Join outside the f-string: a backslash inside an f-string replacement
    # field is a SyntaxError on Python versions before 3.12.
    separator = "\n---\n"
    gainers_text = separator.join(_format_mover(entry) for entry in gainers)
    losers_text = separator.join(_format_mover(entry) for entry in losers)
    return f"Top Gainers: {gainers_text}\nTop Losers: {losers_text}"
@mcp.tool()
async def get_news_and_sentiments(ticker: str) -> str:
    """Get the news and sentiments of a particular stock

    Args:
        ticker: Stock ticker/symbol (eg. AAPL)

    Returns:
        Up to five formatted news items separated by "---" lines, or a short
        message when no sentiment data is available.
    """
    # Percent-encode the caller-supplied value so it cannot inject extra query
    # parameters; "," and ":" stay literal so ticker lists and prefixed
    # symbols pass through unchanged.
    safe_ticker = quote(ticker.strip(), safe=",:")
    url = f"{AV_BASE_URL}/query?function=NEWS_SENTIMENT&tickers={safe_ticker}"
    sentiment_data = await make_av_request(url)
    if not sentiment_data:
        return "Unable to fetch stock sentiment data"

    # A payload without a "feed" list is reported as a failed fetch instead
    # of raising KeyError.
    if "feed" not in sentiment_data:
        return "Unable to fetch stock sentiment data"
    feed = sentiment_data["feed"] or []
    if not feed:
        return "No news sentiment found for this ticker."

    # Only the first five items are reported to keep the reply short.
    sentiments = [
        f"\nTitle: {item.get('title', 'Unknown')}\n"
        f"Time published: {item.get('time_published', 'Unknown')}\n"
        f"Summary: {item.get('summary', 'Unknown')}\n"
        f"Sentiment: {item.get('overall_sentiment_label', 'Unknown')}\n"
        f"URL: {item.get('url', 'Unknown')}\n"
        f"Source: {item.get('source', 'Unknown')}\n"
        for item in feed[:5]
    ]
    return "\n---\n".join(sentiments)
@mcp.tool()
async def get_news_today(keyword: str, date: str) -> str:
    """Get the news headlines for a specific keyword and date

    Args:
        keyword: Keyword to search for in the news
        date: Date in YYYY-MM-DD format

    Returns:
        Up to five formatted articles separated by "---" lines, or a short
        message when nothing is found or the request fails.
    """
    # Strip any trailing slash from the base so the path never contains "//".
    endpoint = f"{NEWS_BASE_URL.rstrip('/')}/v2/everything"
    # Passing the values as params lets httpx URL-encode them, so spaces or
    # "&" in the keyword cannot corrupt the query string.
    params = {
        "q": keyword,
        "from": date,
        "to": date,
        "sortBy": "popularity",
    }
    headers = {
        "User-Agent": USER_AGENT,
        "Accept": "application/json",
        # The key travels in a header rather than the query string so it
        # cannot appear in the URL that httpx includes in error messages,
        # which are returned to the caller below.
        "X-Api-Key": NEWS_API_KEY,
    }
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(
                endpoint, params=params, headers=headers, timeout=30.0
            )
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            return f"Error fetching news: {str(e)}"

    found = data.get("articles") or []
    if not found:
        return "No news articles found for this keyword and date."

    # Only the first five articles are reported to keep the reply short.
    articles = [
        f"\nTitle: {article.get('title', 'Unknown')}\n"
        f"Description: {article.get('description') or 'No description available'}\n"
        f"URL: {article.get('url', 'Unknown')}\n"
        f"Source: {(article.get('source') or {}).get('name', 'Unknown')}\n"
        f"Published At: {article.get('publishedAt', 'Unknown')}\n"
        for article in found[:5]
    ]
    return "\n---\n".join(articles)
def main() -> None:
    """Start the MCP server on the stdio transport."""
    mcp.run(transport='stdio')


# Start the server only when executed as a script, not when imported.
if __name__ == "__main__":
    main()