get_storage_operations
Analyze pumped storage operations to identify energy arbitrage opportunities and assess storage efficiency by tracking daily pumping consumption and turbining activities.
Instructions
Get pumped storage operations for a day.
Shows pumping consumption (storing energy) and turbining (releasing energy) to identify arbitrage opportunities and storage efficiency.
Args: date: Date in YYYY-MM-DD format
Returns: JSON string with storage operations and efficiency metrics.
Examples:
Get storage operations for Oct 8:
>>> await get_storage_operations("2025-10-08")
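The storing/releasing distinction comes down to comparing the two readings for each hour. Below is a minimal sketch of that labelling, mirroring the comparison made in the handler under Implementation Reference. The function name `classify_operation` and the sample readings are made up for illustration and are not part of the tool.

```python
def classify_operation(pump_mw: float, turb_mw: float) -> str:
    """Label one hour by comparing pumping and turbining power (MW)."""
    if pump_mw > turb_mw:
        return "storing"      # more energy drawn for pumping than generated
    if turb_mw > pump_mw:
        return "releasing"    # more energy generated than drawn for pumping
    return "idle"             # equal readings, including both at zero


# Hypothetical hourly readings: (hour, pumping MW, turbining MW).
sample_hours = [
    ("03:00", 850.0, 0.0),
    ("13:00", 0.0, 0.0),
    ("20:00", 0.0, 1200.0),
]

labels = {hour: classify_operation(p, t) for hour, p, t in sample_hours}
print(labels)
```

Hours labelled "storing" and "releasing" are the ones to line up against prices when looking for arbitrage; the tool itself does not return price data.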
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| date | Yes | Date in YYYY-MM-DD format | |
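The handler turns the `date` argument into a start and end timestamp through `DateTimeHelper.build_day_range`, whose implementation is not shown here. The sketch below is a hypothetical stand-in that illustrates the idea with the standard library only. The return type, the end-of-day value, and the lack of timezone handling are all assumptions.

```python
from datetime import datetime, time


def build_day_range(date: str) -> tuple[datetime, datetime]:
    """Hypothetical stand-in for DateTimeHelper.build_day_range (assumed behavior)."""
    # strptime raises ValueError for malformed or impossible dates, e.g. "2025-13-40".
    day = datetime.strptime(date, "%Y-%m-%d").date()
    start = datetime.combine(day, time.min)         # 00:00:00 on the given day
    end = datetime.combine(day, time(23, 59, 59))   # last second of the same day
    return start, end


start, end = build_day_range("2025-10-08")
print(start.isoformat(), end.isoformat())
```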
Implementation Reference
- The primary handler for the 'get_storage_operations' MCP tool. The async function is decorated with @mcp.tool(), so the same definition both registers the tool and implements it. It fetches hourly pumping consumption and turbining generation for pumped hydro from the REE APIs, labels each hour as storing, releasing, or idle, and computes daily totals, the net balance, and a round-trip efficiency estimate (the day's turbined energy divided by its pumped energy). The result is returned as a formatted JSON response.

```python
@mcp.tool()
async def get_storage_operations(date: str) -> str:
    """Get pumped storage operations for a day.

    Shows pumping consumption (storing energy) and turbining (releasing energy)
    to identify arbitrage opportunities and storage efficiency.

    Args:
        date: Date in YYYY-MM-DD format

    Returns:
        JSON string with storage operations and efficiency metrics.

    Examples:
        Get storage operations for Oct 8:
        >>> await get_storage_operations("2025-10-08")
    """
    try:
        start_date, end_date = DateTimeHelper.build_day_range(date)

        async with ToolExecutor() as executor:
            use_case = executor.create_get_indicator_data_use_case()

            # Get pumping consumption
            pumping_request = GetIndicatorDataRequest(
                indicator_id=IndicatorIDs.PUMPING_CONSUMPTION.id,
                start_date=start_date,
                end_date=end_date,
                time_granularity="hour",
            )
            pumping_response = await use_case.execute(pumping_request)
            pumping_data = pumping_response.model_dump()

            # Get turbining
            turbining_request = GetIndicatorDataRequest(
                indicator_id=IndicatorIDs.PUMPED_TURBINING.id,
                start_date=start_date,
                end_date=end_date,
                time_granularity="hour",
            )
            turbining_response = await use_case.execute(turbining_request)
            turbining_data = turbining_response.model_dump()

            # Combine data
            pumping_values = pumping_data.get("values", [])
            turbining_values = turbining_data.get("values", [])

            operations = []
            total_pumping_mwh = 0.0
            total_turbining_mwh = 0.0

            # strict=False: if the two series differ in length, the extra
            # hours of the longer one are dropped silently.
            for pumping, turbining in zip(pumping_values, turbining_values, strict=False):
                pump_mw = pumping["value"]
                turb_mw = turbining["value"]
                net_mw = turb_mw - pump_mw

                operations.append(
                    {
                        "datetime": pumping["datetime"],
                        "pumping_mw": pump_mw,
                        "turbining_mw": turb_mw,
                        "net_storage_mw": round(net_mw, 2),
                        "operation": (
                            "storing"
                            if pump_mw > turb_mw
                            else "releasing"
                            if turb_mw > pump_mw
                            else "idle"
                        ),
                    }
                )

                # Hourly MW readings, so each value also counts as MWh for that hour
                total_pumping_mwh += pump_mw
                total_turbining_mwh += turb_mw

            # Calculate efficiency (typical pumped storage is 70-85%)
            efficiency_pct = (
                (total_turbining_mwh / total_pumping_mwh * 100) if total_pumping_mwh > 0 else 0
            )

            result = {
                "date": date,
                "operations": operations,
                "summary": {
                    "total_energy_stored_mwh": round(total_pumping_mwh, 2),
                    "total_energy_released_mwh": round(total_turbining_mwh, 2),
                    "net_energy_balance_mwh": round(total_turbining_mwh - total_pumping_mwh, 2),
                    "efficiency_percentage": round(efficiency_pct, 2),
                    "efficiency_assessment": ("normal" if 70 <= efficiency_pct <= 85 else "check_data"),
                },
            }

            return ResponseFormatter.success(result)

    except Exception as e:
        return ResponseFormatter.unexpected_error(e, context="Error getting storage operations")
```
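To make the summary arithmetic easier to check by hand, here is a small standalone sketch of the same calculation without the REE API calls. The helper name `summarize` and the sample readings are hypothetical. It also assumes both series have the same length and raises otherwise, whereas the handler pairs them with `zip(..., strict=False)` and silently truncates.

```python
def summarize(pumping_mw: list[float], turbining_mw: list[float]) -> dict:
    """Recompute the handler's daily summary from two hourly MW series."""
    if len(pumping_mw) != len(turbining_mw):
        # Assumption of this sketch: mismatched series are treated as an error.
        raise ValueError("pumping and turbining series must have the same length")

    # With hourly granularity each MW reading covers one hour, so the sum is MWh.
    stored = sum(pumping_mw)
    released = sum(turbining_mw)
    efficiency = released / stored * 100 if stored > 0 else 0.0

    return {
        "total_energy_stored_mwh": round(stored, 2),
        "total_energy_released_mwh": round(released, 2),
        "net_energy_balance_mwh": round(released - stored, 2),
        "efficiency_percentage": round(efficiency, 2),
        "efficiency_assessment": "normal" if 70 <= efficiency <= 85 else "check_data",
    }


# Hypothetical readings: two pumping hours followed by two turbining hours.
summary = summarize([1000.0, 800.0, 0.0, 0.0], [0.0, 0.0, 700.0, 650.0])
print(summary["efficiency_percentage"], summary["efficiency_assessment"])  # 75.0 normal
```

Because the ratio only covers one calendar day, energy pumped on one day and turbined on another can push it outside the 70-85% band. That is presumably why out-of-band values are labelled "check_data" rather than treated as errors.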