get_peak_analysis
Analyze daily maximum and minimum electricity demand patterns and calculate load factors for specified date ranges to identify peak usage trends.
Instructions
Get peak demand analysis over a period.
Analyzes daily maximum and minimum demand to identify patterns and calculate load factors.
Args: start_date — Start date in YYYY-MM-DD format; end_date — End date in YYYY-MM-DD format
Returns: JSON string with peak demand analysis.
Examples: Get peak analysis for a week: >>> await get_peak_analysis("2025-10-01", "2025-10-07")
Get peak analysis for a month:
>>> await get_peak_analysis("2025-10-01", "2025-10-31")
Input Schema
Table / JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| end_date | Yes | End date in YYYY-MM-DD format | |
| start_date | Yes | Start date in YYYY-MM-DD format | |
Implementation Reference
# Implementation of the get_peak_analysis MCP tool handler. This async function
# fetches daily maximum and minimum demand data from REE indicators, computes
# per-day peaks, valleys, average demand and load factors, plus period-wide
# statistics, and returns a formatted JSON analysis. Registered via @mcp.tool().


def _pair_by_date(
    max_values: list[dict], min_values: list[dict]
) -> list[tuple[str, dict, dict]]:
    """Pair max/min demand entries that belong to the same calendar date.

    Positional zip silently misaligns the two series when one indicator is
    missing a day; keying on the YYYY-MM-DD date prefix guarantees each pair
    describes the same day. Days present in only one series are skipped.

    Args:
        max_values: Daily maximum-demand entries, each with "datetime" and "value".
        min_values: Daily minimum-demand entries with the same shape.

    Returns:
        List of (date, max_entry, min_entry) tuples in max_values order.
    """
    min_by_date = {entry["datetime"][:10]: entry for entry in min_values}
    pairs = []
    for max_entry in max_values:
        date = max_entry["datetime"][:10]
        min_entry = min_by_date.get(date)
        if min_entry is not None:
            pairs.append((date, max_entry, min_entry))
    return pairs


def _day_metrics(date: str, max_mw: float, min_mw: float) -> tuple[dict, float]:
    """Compute one day's demand metrics.

    Args:
        date: Date string (YYYY-MM-DD).
        max_mw: Peak demand for the day, in MW.
        min_mw: Minimum demand for the day, in MW.

    Returns:
        (daily analysis dict, unrounded load factor percentage).
    """
    avg_mw = (max_mw + min_mw) / 2
    # Load factor = average / peak; guards against division by zero.
    load_factor = (avg_mw / max_mw * 100) if max_mw > 0 else 0
    entry = {
        "date": date,
        "peak_demand_mw": max_mw,
        "minimum_demand_mw": min_mw,
        "average_demand_mw": round(avg_mw, 2),
        "peak_to_valley_mw": round(max_mw - min_mw, 2),
        "load_factor_percentage": round(load_factor, 2),
    }
    return entry, load_factor


def _period_statistics(peak_demands: list[float], load_factors: list[float]) -> dict:
    """Aggregate period-wide statistics; empty dict when there is no data."""
    if not peak_demands:
        return {}
    return {
        "highest_peak_mw": max(peak_demands),
        "lowest_peak_mw": min(peak_demands),
        "average_peak_mw": round(sum(peak_demands) / len(peak_demands), 2),
        "average_load_factor_percentage": round(
            sum(load_factors) / len(load_factors), 2
        ),
        "interpretation": {
            "high_load_factor": "> 70% (efficient, stable demand)",
            "medium_load_factor": "50-70% (moderate variability)",
            "low_load_factor": "< 50% (high variability, inefficient)",
        },
    }


@mcp.tool()
async def get_peak_analysis(start_date: str, end_date: str) -> str:
    """Get peak demand analysis over a period.

    Analyzes daily maximum and minimum demand to identify patterns and
    calculate load factors.

    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format

    Returns:
        JSON string with peak demand analysis.

    Examples:
        Get peak analysis for a week:
        >>> await get_peak_analysis("2025-10-01", "2025-10-07")

        Get peak analysis for a month:
        >>> await get_peak_analysis("2025-10-01", "2025-10-31")
    """
    try:
        async with ToolExecutor() as executor:
            use_case = executor.create_get_indicator_data_use_case()

            async def fetch_daily(indicator_id) -> dict:
                # Both series share identical request parameters except the
                # indicator id, so build the request in one place.
                request = GetIndicatorDataRequest(
                    indicator_id=indicator_id,
                    start_date=start_date,
                    end_date=end_date,
                    time_granularity="day",
                )
                response = await use_case.execute(request)
                return response.model_dump()

            max_data = await fetch_daily(IndicatorIDs.MAX_DAILY_DEMAND.id)
            min_data = await fetch_daily(IndicatorIDs.MIN_DAILY_DEMAND.id)

            daily_analysis = []
            peak_demands = []
            load_factors = []

            # Pair entries by date rather than by position so a missing day in
            # one series cannot shift every subsequent comparison.
            for date, max_entry, min_entry in _pair_by_date(
                max_data.get("values", []), min_data.get("values", [])
            ):
                entry, load_factor = _day_metrics(
                    date, max_entry["value"], min_entry["value"]
                )
                daily_analysis.append(entry)
                peak_demands.append(max_entry["value"])
                load_factors.append(load_factor)

            result = {
                "period": {"start": start_date, "end": end_date},
                "daily_analysis": daily_analysis,
                "period_statistics": _period_statistics(peak_demands, load_factors),
            }

            return ResponseFormatter.success(result)

    except Exception as e:
        return ResponseFormatter.unexpected_error(e, context="Error analyzing peaks")