weather_server.py•5.23 kB
"""
MCP Weather Server - FastMCP ile Hava Durumu Servisi
Open-Meteo API kullanarak hava durumu bilgisi sağlar.
"""
import httpx
from mcp.server.fastmcp import FastMCP
# FastMCP sunucusu oluştur
mcp = FastMCP(name="Weather Service")
# WMO hava kodu açıklamaları (Türkçe)
WEATHER_CODES = {
0: "Açık",
1: "Çoğunlukla açık",
2: "Parçalı bulutlu",
3: "Bulutlu",
45: "Sisli",
48: "Kırağılı sis",
51: "Hafif çisenti",
53: "Orta çisenti",
55: "Yoğun çisenti",
56: "Hafif dondurucu çisenti",
57: "Yoğun dondurucu çisenti",
61: "Hafif yağmur",
63: "Orta yağmur",
65: "Şiddetli yağmur",
66: "Hafif dondurucu yağmur",
67: "Şiddetli dondurucu yağmur",
71: "Hafif kar",
73: "Orta kar",
75: "Yoğun kar",
77: "Kar taneleri",
80: "Hafif sağanak",
81: "Orta sağanak",
82: "Şiddetli sağanak",
85: "Hafif kar sağanağı",
86: "Yoğun kar sağanağı",
95: "Gök gürültülü fırtına",
96: "Hafif dolu ile fırtına",
99: "Şiddetli dolu ile fırtına",
}
async def get_coordinates(city: str) -> tuple[float, float, str]:
"""
Şehir adından enlem/boylam koordinatlarını al.
Args:
city: Şehir adı
Returns:
(latitude, longitude, display_name) tuple
"""
url = "https://geocoding-api.open-meteo.com/v1/search"
params = {"name": city, "count": 1, "language": "tr"}
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params)
data = response.json()
if "results" not in data or len(data["results"]) == 0:
raise ValueError(f"Şehir bulunamadı: {city}")
result = data["results"][0]
display_name = result.get("name", city)
country = result.get("country", "")
if country:
display_name = f"{display_name}, {country}"
return result["latitude"], result["longitude"], display_name
@mcp.tool()
async def get_weather(city: str) -> str:
"""
Belirtilen şehir için anlık hava durumunu döner.
Args:
city: Şehir adı (örn: Hamburg, Istanbul, Berlin, Ankara)
Returns:
Anlık hava durumu bilgisi (sıcaklık, durum, rüzgar)
"""
try:
lat, lon, display_name = await get_coordinates(city)
except ValueError as e:
return str(e)
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"current_weather": True,
"timezone": "auto"
}
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params)
data = response.json()
if "current_weather" not in data:
return f"Hava durumu verisi alınamadı: {city}"
weather = data["current_weather"]
temp = weather["temperature"]
windspeed = weather["windspeed"]
weather_code = weather.get("weathercode", 0)
condition = WEATHER_CODES.get(weather_code, "Bilinmiyor")
return f"{display_name}: {temp}°C, {condition}, Rüzgar: {windspeed} km/s"
@mcp.tool()
async def get_forecast(city: str, days: int = 3) -> str:
"""
Belirtilen şehir için hava durumu tahminini döner.
Args:
city: Şehir adı (örn: Hamburg, Istanbul, Berlin, Ankara)
days: Kaç günlük tahmin isteniyor (1-7 arası, varsayılan 3)
Returns:
Günlük hava durumu tahmini (tarih, min/max sıcaklık, durum)
"""
try:
lat, lon, display_name = await get_coordinates(city)
except ValueError as e:
return str(e)
# Gün sayısını 1-7 arasında sınırla
days = min(max(days, 1), 7)
url = "https://api.open-meteo.com/v1/forecast"
params = {
"latitude": lat,
"longitude": lon,
"daily": "temperature_2m_max,temperature_2m_min,weathercode",
"timezone": "auto",
"forecast_days": days
}
async with httpx.AsyncClient() as client:
response = await client.get(url, params=params)
data = response.json()
if "daily" not in data:
return f"Tahmin verisi alınamadı: {city}"
daily = data["daily"]
result_lines = [f"{display_name} için {days} günlük tahmin:"]
for i in range(days):
date = daily["time"][i]
max_temp = daily["temperature_2m_max"][i]
min_temp = daily["temperature_2m_min"][i]
weather_code = daily["weathercode"][i] if daily["weathercode"] else 0
condition = WEATHER_CODES.get(weather_code, "")
line = f" {date}: {min_temp}°C - {max_temp}°C"
if condition:
line += f" ({condition})"
result_lines.append(line)
return "\n".join(result_lines)
if __name__ == "__main__":
import sys
if "--http" in sys.argv:
# HTTP mode - OpenAI Responses API için uzak erişim
# Varsayılan: http://localhost:8000/mcp
print("🌐 HTTP Server başlatılıyor: http://localhost:8000/mcp")
print(" Durdurmak için Ctrl+C")
mcp.run(transport="streamable-http")
else:
# Stdio mode - lokal MCP client için
mcp.run()