Skip to main content
Glama
tool_services.py25.4 kB
"""Service classes for complex tool operations. This module contains business logic services that handle multi-indicator data fetching and complex calculations, following SRP and DRY principles. """ from typing import Any from ..application.dtos import GetIndicatorDataRequest from ..application.use_cases import GetIndicatorDataUseCase from .indicator_config import IndicatorIDs, IndicatorMetadata class DataFetcher: """Service for fetching data from multiple indicators. This class eliminates the repeated pattern of fetching data from multiple indicators in parallel and handling errors individually. """ def __init__(self, use_case: GetIndicatorDataUseCase): """Initialize the data fetcher. Args: use_case: Use case for getting indicator data """ self.use_case = use_case async def fetch_single_indicator( self, indicator: IndicatorMetadata, start_date: str, end_date: str, time_granularity: str = "hour", ) -> dict[str, Any] | None: """Fetch data for a single indicator. Args: indicator: Indicator metadata start_date: Start datetime in ISO format end_date: End datetime in ISO format time_granularity: Time aggregation level Returns: Response dictionary or None if error occurred """ try: request = GetIndicatorDataRequest( indicator_id=indicator.id, start_date=start_date, end_date=end_date, time_granularity=time_granularity, ) response = await self.use_case.execute(request) return response.model_dump() except Exception: return None async def fetch_multiple_indicators( self, indicators: dict[str, IndicatorMetadata], start_date: str, end_date: str, time_granularity: str = "hour", ) -> dict[str, dict[str, Any]]: """Fetch data for multiple indicators. Args: indicators: Dictionary mapping names to indicator metadata start_date: Start datetime in ISO format end_date: End datetime in ISO format time_granularity: Time aggregation level Returns: Dictionary mapping names to response data (or error info) """ results: dict[str, dict[str, Any]] = {} for name, indicator in indicators.items(): response = await self.fetch_single_indicator( indicator, start_date, end_date, time_granularity ) if response is None: results[name] = {"error": f"Failed to fetch data for {indicator.name}"} else: results[name] = response return results async def fetch_value_at_time( self, indicator: IndicatorMetadata, start_date: str, end_date: str, time_granularity: str = "hour", ) -> float | None: """Fetch single value for an indicator at a specific time. Args: indicator: Indicator metadata start_date: Start datetime in ISO format end_date: End datetime in ISO format time_granularity: Time aggregation level Returns: First value from response or None if not available """ response = await self.fetch_single_indicator( indicator, start_date, end_date, time_granularity ) if response and "values" in response and response["values"]: return response["values"][0]["value"] # type: ignore[no-any-return] return None class GenerationMixService: """Service for generation mix analysis. Handles fetching and aggregating generation data from multiple sources. """ def __init__(self, data_fetcher: DataFetcher): """Initialize the generation mix service. Args: data_fetcher: Data fetcher instance """ self.data_fetcher = data_fetcher async def get_generation_mix(self, start_date: str, end_date: str) -> dict[str, Any]: """Get generation mix for a specific time. Args: start_date: Start datetime in ISO format end_date: End datetime in ISO format Returns: Generation mix data with sources and values """ sources = IndicatorIDs.get_generation_mix_sources() raw_data = await self.data_fetcher.fetch_multiple_indicators( sources, start_date, end_date, "hour" ) generation_mix: dict[str, Any] = { "datetime": start_date, "sources": {}, } for source_name, response_data in raw_data.items(): if "error" in response_data: generation_mix["sources"][source_name] = response_data else: values = response_data.get("values", []) if values: generation_mix["sources"][source_name] = { "value_mw": values[0]["value"], "unit": response_data["indicator"]["unit"], } else: generation_mix["sources"][source_name] = {"error": "No data available"} return generation_mix async def get_generation_mix_timeline( self, start_date: str, end_date: str, time_granularity: str = "hour" ) -> dict[str, Any]: """Get generation mix over a period. Args: start_date: Start datetime in ISO format end_date: End datetime in ISO format time_granularity: Time aggregation level Returns: Timeline data with generation mix at each point """ sources = IndicatorIDs.get_generation_mix_sources() raw_data = await self.data_fetcher.fetch_multiple_indicators( sources, start_date, end_date, time_granularity ) result: dict[str, Any] = { "period": { "start": start_date, "end": end_date, "granularity": time_granularity, }, "timeline": [], } # Build timeline by combining data points source_data: dict[str, list[dict[str, Any]]] = {} for source_name, response_data in raw_data.items(): if "error" not in response_data: source_data[source_name] = response_data.get("values", []) else: source_data[source_name] = [] if source_data: # Use first source to get timestamps first_source_values = next(iter(source_data.values())) for i, value_point in enumerate(first_source_values): timestamp = value_point["datetime"] generation_point: dict[str, Any] = { "datetime": timestamp, "sources": {}, "total_mw": 0.0, } for source_name, values in source_data.items(): if i < len(values): mw_value = values[i]["value"] generation_point["sources"][source_name] = mw_value generation_point["total_mw"] += mw_value else: generation_point["sources"][source_name] = 0.0 generation_point["total_mw"] = round(generation_point["total_mw"], 2) result["timeline"].append(generation_point) return result class RenewableAnalysisService: """Service for renewable energy analysis. Handles renewable generation aggregation and percentage calculations. """ def __init__(self, data_fetcher: DataFetcher): """Initialize the renewable analysis service. Args: data_fetcher: Data fetcher instance """ self.data_fetcher = data_fetcher async def get_renewable_summary(self, start_date: str, end_date: str) -> dict[str, Any]: """Get renewable generation summary. Args: start_date: Start datetime in ISO format end_date: End datetime in ISO format Returns: Renewable summary with breakdowns and percentages """ renewable_sources = IndicatorIDs.get_renewable_sources() raw_data = await self.data_fetcher.fetch_multiple_indicators( renewable_sources, start_date, end_date, "hour" ) result: dict[str, Any] = { "datetime": start_date, "renewable_sources": {}, "summary": {}, } total_renewable_mw = 0.0 variable_renewable_mw = 0.0 # Process renewable sources for source_name, response_data in raw_data.items(): if "error" in response_data: result["renewable_sources"][source_name] = response_data else: values = response_data.get("values", []) if values: value_mw = values[0]["value"] is_variable = source_name in [ "wind_national", "solar_pv_national", "solar_thermal_national", ] result["renewable_sources"][source_name] = { "value_mw": value_mw, "type": "variable" if is_variable else "synchronous", } total_renewable_mw += value_mw if is_variable: variable_renewable_mw += value_mw else: result["renewable_sources"][source_name] = {"error": "No data available"} # Get total demand for percentage calculation demand_mw = await self.data_fetcher.fetch_value_at_time( IndicatorIDs.REAL_DEMAND_NATIONAL, start_date, end_date, "hour" ) if demand_mw and demand_mw > 0: renewable_pct = (total_renewable_mw / demand_mw) * 100 variable_pct = (variable_renewable_mw / demand_mw) * 100 result["summary"] = { "total_renewable_mw": round(total_renewable_mw, 2), "variable_renewable_mw": round(variable_renewable_mw, 2), "synchronous_renewable_mw": round(total_renewable_mw - variable_renewable_mw, 2), "total_demand_mw": round(demand_mw, 2), "renewable_percentage": round(renewable_pct, 2), "variable_renewable_percentage": round(variable_pct, 2), } else: result["summary"] = {"error": "Could not calculate percentages: No demand data"} return result class GridStabilityService: """Service for grid stability analysis. Analyzes balance between synchronous generation (provides inertia) and variable renewables (no inertia). """ def __init__(self, data_fetcher: DataFetcher): """Initialize the grid stability service. Args: data_fetcher: Data fetcher instance """ self.data_fetcher = data_fetcher async def get_grid_stability(self, start_date: str, end_date: str) -> dict[str, Any]: """Get grid stability metrics. Args: start_date: Start datetime in ISO format end_date: End datetime in ISO format Returns: Grid stability analysis with synchronous/variable breakdown """ synchronous = IndicatorIDs.get_synchronous_sources() variable_renewables = IndicatorIDs.get_variable_renewable_sources() # Fetch all data sync_data = await self.data_fetcher.fetch_multiple_indicators( synchronous, start_date, end_date, "hour" ) var_data = await self.data_fetcher.fetch_multiple_indicators( variable_renewables, start_date, end_date, "hour" ) result: dict[str, Any] = { "datetime": start_date, "synchronous_generation": {}, "variable_renewables": {}, "analysis": {}, } # Process synchronous generation total_synchronous_mw = 0.0 for source_name, response_data in sync_data.items(): if "error" not in response_data: values = response_data.get("values", []) if values: value_mw = values[0]["value"] result["synchronous_generation"][source_name] = {"value_mw": value_mw} total_synchronous_mw += value_mw else: result["synchronous_generation"][source_name] = {"error": "No data"} else: result["synchronous_generation"][source_name] = response_data # Process variable renewables total_variable_mw = 0.0 for source_name, response_data in var_data.items(): if "error" not in response_data: values = response_data.get("values", []) if values: value_mw = values[0]["value"] result["variable_renewables"][source_name] = {"value_mw": value_mw} total_variable_mw += value_mw else: result["variable_renewables"][source_name] = {"error": "No data"} else: result["variable_renewables"][source_name] = response_data # Calculate analysis metrics demand_mw = await self.data_fetcher.fetch_value_at_time( IndicatorIDs.REAL_DEMAND_NATIONAL, start_date, end_date, "hour" ) if demand_mw and demand_mw > 0: synchronous_pct = (total_synchronous_mw / demand_mw) * 100 variable_pct = (total_variable_mw / demand_mw) * 100 inertia_ratio = ( (total_synchronous_mw / total_variable_mw) if total_variable_mw > 0 else float("inf") ) # Stability assessment if synchronous_pct >= 70: stability_level = "excellent" elif synchronous_pct >= 50: stability_level = "good" elif synchronous_pct >= 30: stability_level = "moderate" else: stability_level = "concerning" result["analysis"] = { "total_synchronous_mw": round(total_synchronous_mw, 2), "total_variable_renewable_mw": round(total_variable_mw, 2), "total_demand_mw": round(demand_mw, 2), "synchronous_percentage": round(synchronous_pct, 2), "variable_renewable_percentage": round(variable_pct, 2), "inertia_ratio": ( round(inertia_ratio, 2) if inertia_ratio != float("inf") else "infinite" ), "stability_level": stability_level, "interpretation": { "excellent": ">=70% synchronous (high inertia)", "good": "50-70% synchronous (adequate inertia)", "moderate": "30-50% synchronous (requires monitoring)", "concerning": "<30% synchronous (stability risk)", }, } else: result["analysis"] = {"error": "Could not calculate analysis: No demand data"} return result class InternationalExchangeService: """Service for international electricity exchange analysis.""" def __init__(self, data_fetcher: DataFetcher): """Initialize the international exchange service. Args: data_fetcher: Data fetcher instance """ self.data_fetcher = data_fetcher async def get_international_exchanges(self, start_date: str, end_date: str) -> dict[str, Any]: """Get international electricity exchanges. Args: start_date: Start datetime in ISO format end_date: End datetime in ISO format Returns: Exchange data by country with net balance """ exchanges = IndicatorIDs.get_international_exchanges() result: dict[str, Any] = { "datetime": start_date, "exchanges": {}, "totals": {"total_exports_mw": 0.0, "total_imports_mw": 0.0, "net_balance_mw": 0.0}, } for country, indicators in exchanges.items(): # Fetch export and import data export_mw = await self.data_fetcher.fetch_value_at_time( indicators["export"], start_date, end_date, "hour" ) import_mw = await self.data_fetcher.fetch_value_at_time( indicators["import"], start_date, end_date, "hour" ) if export_mw is not None and import_mw is not None: net_mw = import_mw - export_mw result["exchanges"][country] = { "export_mw": export_mw, "import_mw": import_mw, "net_balance_mw": net_mw, "net_flow": ( "import" if net_mw > 0 else "export" if net_mw < 0 else "balanced" ), } result["totals"]["total_exports_mw"] += export_mw result["totals"]["total_imports_mw"] += import_mw else: result["exchanges"][country] = {"error": "Could not fetch exchange data"} result["totals"]["net_balance_mw"] = ( result["totals"]["total_imports_mw"] - result["totals"]["total_exports_mw"] ) return result class DemandAnalysisService: """Service for demand pattern analysis. Handles daily demand statistics and volatility calculations. """ def __init__(self, data_fetcher: DataFetcher): """Initialize the demand analysis service. Args: data_fetcher: Data fetcher instance """ self.data_fetcher = data_fetcher async def get_daily_demand_statistics(self, start_date: str, end_date: str) -> dict[str, Any]: """Get daily demand statistics. Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format Returns: Daily demand statistics with max, min, and sum of generation """ # Fetch all three demand indicators indicators = { "max_daily": IndicatorIDs.MAX_DAILY_DEMAND, "min_daily": IndicatorIDs.MIN_DAILY_DEMAND, "sum_generation": IndicatorIDs.REAL_DEMAND_SUM_GENERATION, } raw_data = await self.data_fetcher.fetch_multiple_indicators( indicators, start_date, end_date, "day" ) result: dict[str, Any] = { "period": {"start": start_date, "end": end_date}, "daily_statistics": [], } # Extract values by date max_values = raw_data.get("max_daily", {}).get("values", []) min_values = raw_data.get("min_daily", {}).get("values", []) sum_values = raw_data.get("sum_generation", {}).get("values", []) # Combine data by date for i, max_point in enumerate(max_values): date = max_point["datetime"][:10] # Extract YYYY-MM-DD max_mw = max_point["value"] min_mw = min_values[i]["value"] if i < len(min_values) else None sum_mw = sum_values[i]["value"] if i < len(sum_values) else None daily_stat: dict[str, Any] = { "date": date, "max_demand_mw": max_mw, "min_demand_mw": min_mw, "sum_generation_mw": sum_mw, } # Calculate load factor if we have both max and min if min_mw is not None and max_mw > 0: daily_stat["load_factor"] = round((min_mw / max_mw) * 100, 2) daily_stat["daily_swing_mw"] = round(max_mw - min_mw, 2) else: daily_stat["load_factor"] = None daily_stat["daily_swing_mw"] = None result["daily_statistics"].append(daily_stat) # Calculate overall statistics if result["daily_statistics"]: max_values_list = [s["max_demand_mw"] for s in result["daily_statistics"]] min_values_list = [ s["min_demand_mw"] for s in result["daily_statistics"] if s["min_demand_mw"] is not None ] load_factors = [ s["load_factor"] for s in result["daily_statistics"] if s["load_factor"] is not None ] result["summary"] = { "peak_demand_mw": round(max(max_values_list), 2), "lowest_demand_mw": round(min(min_values_list), 2) if min_values_list else None, "average_max_demand_mw": round(sum(max_values_list) / len(max_values_list), 2), "average_load_factor_pct": ( round(sum(load_factors) / len(load_factors), 2) if load_factors else None ), "days_analyzed": len(result["daily_statistics"]), } return result async def analyze_demand_volatility(self, start_date: str, end_date: str) -> dict[str, Any]: """Analyze demand volatility patterns. Args: start_date: Start date in YYYY-MM-DD format end_date: End date in YYYY-MM-DD format Returns: Volatility analysis with swings and stability metrics """ # Fetch max and min daily demand indicators = { "max_daily": IndicatorIDs.MAX_DAILY_DEMAND, "min_daily": IndicatorIDs.MIN_DAILY_DEMAND, } raw_data = await self.data_fetcher.fetch_multiple_indicators( indicators, start_date, end_date, "day" ) result: dict[str, Any] = { "period": {"start": start_date, "end": end_date}, "daily_volatility": [], "analysis": {}, } max_values = raw_data.get("max_daily", {}).get("values", []) min_values = raw_data.get("min_daily", {}).get("values", []) daily_swings = [] load_factors = [] # Calculate daily volatility for i, max_point in enumerate(max_values): if i >= len(min_values): break date = max_point["datetime"][:10] max_mw = max_point["value"] min_mw = min_values[i]["value"] swing_mw = max_mw - min_mw swing_pct = (swing_mw / max_mw) * 100 if max_mw > 0 else 0 load_factor = (min_mw / max_mw) * 100 if max_mw > 0 else 0 volatility_data = { "date": date, "max_demand_mw": round(max_mw, 2), "min_demand_mw": round(min_mw, 2), "daily_swing_mw": round(swing_mw, 2), "swing_percentage": round(swing_pct, 2), "load_factor_pct": round(load_factor, 2), } # Volatility classification if swing_pct < 20: volatility_data["volatility_level"] = "low" elif swing_pct < 40: volatility_data["volatility_level"] = "moderate" elif swing_pct < 60: volatility_data["volatility_level"] = "high" else: volatility_data["volatility_level"] = "very_high" result["daily_volatility"].append(volatility_data) daily_swings.append(swing_mw) load_factors.append(load_factor) # Overall analysis if daily_swings: avg_swing = sum(daily_swings) / len(daily_swings) max_swing = max(daily_swings) min_swing = min(daily_swings) avg_load_factor = sum(load_factors) / len(load_factors) # Count days by volatility level volatility_counts = {"low": 0, "moderate": 0, "high": 0, "very_high": 0} for day_data in result["daily_volatility"]: level = day_data["volatility_level"] volatility_counts[level] += 1 result["analysis"] = { "average_daily_swing_mw": round(avg_swing, 2), "max_daily_swing_mw": round(max_swing, 2), "min_daily_swing_mw": round(min_swing, 2), "average_load_factor_pct": round(avg_load_factor, 2), "volatility_distribution": volatility_counts, "stability_assessment": ( "excellent" if avg_load_factor >= 80 else "good" if avg_load_factor >= 70 else "moderate" if avg_load_factor >= 60 else "concerning" ), "interpretation": { "load_factor": "Higher load factor (closer to 100%) indicates more stable demand", "volatility_levels": { "low": "<20% swing", "moderate": "20-40% swing", "high": "40-60% swing", "very_high": ">60% swing", }, }, } return result

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ESJavadex/ree-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server