server_web.py•9.4 kB
#!/usr/bin/env python3
"""
MCP Server for Energy Information Administration (EIA) API - Web Version
Provides access to comprehensive energy data from the EIA via HTTP endpoints.
"""
import asyncio
import json
import os
from typing import Any, Optional
import httpx
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
# EIA API Configuration
EIA_API_KEY = os.getenv("EIA_API_KEY", "BPlYS1rJRCA0cyGaS6c2JP0rl3wDgUv0QPZQf1o0")
EIA_BASE_URL = "https://api.eia.gov/v2"
# Initialize FastAPI
app = FastAPI(
title="MCP Energy Server",
description="Energy Information Administration (EIA) data API",
version="0.1.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class EIARequest(BaseModel):
frequency: str = "monthly"
data_type: Optional[str] = None
facets: Optional[dict] = None
async def fetch_eia_data(route: str, params: dict[str, Any]) -> dict[str, Any]:
"""
Fetch data from EIA API
Args:
route: API endpoint route
params: Query parameters
Returns:
API response data
"""
params["api_key"] = EIA_API_KEY
url = f"{EIA_BASE_URL}/{route}"
async with httpx.AsyncClient(timeout=30.0) as client:
try:
response = await client.get(url, params=params)
response.raise_for_status()
return response.json()
except httpx.HTTPError as e:
return {
"error": str(e),
"status_code": getattr(e.response, "status_code", None) if hasattr(e, "response") else None
}
@app.get("/")
async def root():
"""Root endpoint with API information"""
return {
"name": "MCP Energy Server",
"version": "0.1.0",
"description": "Energy Information Administration (EIA) data API",
"endpoints": {
"/electricity": "Electricity generation, consumption, sales, revenue",
"/natural-gas": "Natural gas production, consumption, prices, storage",
"/petroleum": "Petroleum production, consumption, imports, exports, prices",
"/coal": "Coal production, consumption, prices",
"/renewable": "Renewable energy generation (solar, wind, hydro, etc.)",
"/co2-emissions": "CO2 emissions by state and sector",
"/steo": "Short-Term Energy Outlook forecasts",
"/international": "International energy statistics"
},
"docs": "/docs"
}
@app.get("/health")
async def health_check():
"""Health check endpoint for monitoring"""
return {"status": "healthy", "service": "mcp-energy"}
@app.post("/electricity")
async def get_electricity_data(request: EIARequest):
"""Get electricity generation, consumption, sales, or revenue data"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
data_type = request.data_type or "generation"
route = f"electricity/{data_type}/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/natural-gas")
async def get_natural_gas_data(request: EIARequest):
"""Get natural gas production, consumption, prices, or storage data"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
data_type = request.data_type or "consumption"
route = f"natural-gas/{data_type}/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/petroleum")
async def get_petroleum_data(request: EIARequest):
"""Get petroleum production, consumption, imports, exports, prices, or stocks data"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
data_type = request.data_type or "production"
route = f"petroleum/{data_type}/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/coal")
async def get_coal_data(request: EIARequest):
"""Get coal production, consumption, or price data"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
data_type = request.data_type or "production"
route = f"coal/{data_type}/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/renewable")
async def get_renewable_data(request: EIARequest):
"""Get renewable energy generation data (solar, wind, hydro, etc.)"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
route = "electricity/generation/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/co2-emissions")
async def get_co2_emissions_data(request: EIARequest):
"""Get CO2 emissions data by state and sector"""
params = {
"frequency": request.frequency,
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
route = "co2-emissions/co2-emissions-aggregates/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/steo")
async def get_steo_data(request: EIARequest):
"""Get Short-Term Energy Outlook (STEO) forecasts"""
params = {
"frequency": "monthly",
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
route = "steo/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
@app.post("/international")
async def get_international_data(request: EIARequest):
"""Get international energy statistics"""
params = {
"frequency": "annual",
"data": ["value"],
"sort[0][column]": "period",
"sort[0][direction]": "desc",
"length": 100
}
if request.facets:
for key, value in request.facets.items():
if isinstance(value, list):
params[f"facets[{key}][]"] = value
else:
params[f"facets[{key}]"] = value
route = "international/data"
result = await fetch_eia_data(route, params)
if "error" in result:
raise HTTPException(status_code=500, detail=result["error"])
return result
if __name__ == "__main__":
port = int(os.getenv("PORT", 10000))
uvicorn.run(app, host="0.0.0.0", port=port)