Skip to main content
Glama
forecaster.py10.1 kB
import json from datetime import datetime from typing import Annotated, List, Dict, Union from mcp.server.fastmcp import Context from pydantic import Field from ._types import InputField, DataRow, generate_model_from_fields from chronulus import Session from chronulus.estimator import NormalizedForecaster from chronulus.prediction import RescaledForecast async def create_forecasting_agent_and_get_forecast( session_id: Annotated[str, Field(description="The session_id for the forecasting or prediction use case")], input_data_model: Annotated[List[InputField], Field( description="""Metadata on the fields you will include in the input_data.""" )], input_data: Annotated[Dict[str, Union[str, dict, List[dict]]], Field(description="The forecast inputs that you will pass to the chronulus agent to make the prediction. The keys of the dict should correspond to the InputField name you provided in input_fields.")], forecast_start_dt_str: Annotated[str, Field(description="The datetime str in '%Y-%m-%d %H:%M:%S' format of the first value in the forecast horizon.")], ctx: Context, time_scale: Annotated[str, Field(description="The times scale of the forecast horizon. Valid time scales are 'hours', 'days', and 'weeks'.", default="days")], horizon_len: Annotated[int, Field(description="The integer length of the forecast horizon. Eg., 60 if a 60 day forecast was requested.", default=60)], ) -> Union[str, Dict[str, Union[dict, str]]]: """Queues and retrieves a forecast from Chronulus with a predefined session_id This tool creates a NormalizedForecaster agent and then provides a forecast input to the agent and returns the prediction data and text explanation from the agent. Args: session_id (str): The session_id for the forecasting or prediction use case. input_data_model (List[InputField]): Metadata on the fields you will include in the input_data. Eg., for a field named "brand", add a description like "the brand of the product to forecast" input_data (Dict[str, Union[str, dict, List[dict]]]): The forecast inputs that you will pass to the chronulus agent to make the prediction. The keys of the dict should correspond to the InputField name you provided in input_fields. forecast_start_dt_str (str): The datetime str in '%Y-%m-%d %H:%M:%S' format of the first value in the forecast horizon." ctx (Context): Context object providing access to MCP capabilities. time_scale (str): The times scale of the forecast horizon. Valid time scales are 'hours', 'days', and 'weeks'. horizon_len (int): The integer length of the forecast horizon. Eg., 60 if a 60 day forecast was requested. Returns: Union[str, Dict[str, Union[dict, str]]]: a dictionary with prediction data, a text explanation of the predictions, estimator_id, and the prediction id. """ try: chronulus_session = Session.load_from_saved_session(session_id=session_id, verbose=False) except Exception as e: error_message = f"Failed to retrieve session with session_id: {session_id}\n\n{e}" _ = await ctx.error( message=error_message) return error_message try: InputItem = generate_model_from_fields("InputItem", input_data_model) except Exception as e: error_message = f"Failed to create InputItem model with input data model: {json.dumps(input_data_model, indent=2)}\n\n{e}" _ = await ctx.error(message=error_message) return error_message try: item = InputItem(**input_data) except Exception as e: error_message = f"Failed to validate the input_data with the generated InputItem model. \n\n{e}" _ = await ctx.error(message=error_message) return error_message try: nf_agent = NormalizedForecaster( session=chronulus_session, input_type=InputItem, verbose=False, ) except Exception as e: return f"""Error at nf_agent: {str(e)} input_fields = {input_data_model} input_data = {json.dumps(input_data, indent=2)} input_type = {str(type(InputItem))} """ try: forecast_start_dt = datetime.fromisoformat(forecast_start_dt_str) horizon_params = { 'start_dt': forecast_start_dt, time_scale: horizon_len } req = nf_agent.queue(item, **horizon_params) except Exception as e: return f"""Error at nf_agent: {str(e)}""" try: predictions = nf_agent.get_predictions(req.request_id) prediction = predictions[0] return { "agent_id": nf_agent.estimator_id, "prediction_id": prediction.id, 'data': prediction.to_json(orient='rows'), 'explanation': prediction.text} except Exception as e: return f"""Error on prediction: {str(e)}""" async def reuse_forecasting_agent_and_get_forecast( agent_id: Annotated[str, Field(description="The agent_id for the forecasting or prediction use case and previously defined input_data_model")], input_data: Annotated[Dict[str, Union[str, dict, List[dict]]], Field( description="The forecast inputs that you will pass to the chronulus agent to make the prediction. The keys of the dict should correspond to the InputField name you provided in input_fields.")], forecast_start_dt_str: Annotated[str, Field( description="The datetime str in '%Y-%m-%d %H:%M:%S' format of the first value in the forecast horizon.")], time_scale: Annotated[str, Field( description="The times scale of the forecast horizon. Valid time scales are 'hours', 'days', and 'weeks'.", default="days")], horizon_len: Annotated[int, Field( description="The integer length of the forecast horizon. Eg., 60 if a 60 day forecast was requested.", default=60)], ) -> Union[str, Dict[str, Union[dict, str]]]: """Queues and retrieves a forecast from Chronulus with a previously created agent_id This tool provides a forecast input to a previous created Chronulus NormalizedForecaster agent and returns the prediction data and text explanation from the agent. Args: agent_id (str): The agent_id for the forecasting or prediction use case and previously defined input_data_model input_data (Dict[str, Union[str, dict, List[dict]]]): The forecast inputs that you will pass to the chronulus agent to make the prediction. The keys of the dict should correspond to the InputField name you provided in input_fields. forecast_start_dt_str (str): The datetime str in '%Y-%m-%d %H:%M:%S' format of the first value in the forecast horizon." time_scale (str): The times scale of the forecast horizon. Valid time scales are 'hours', 'days', and 'weeks'. horizon_len (int): The integer length of the forecast horizon. Eg., 60 if a 60 day forecast was requested. Returns: Union[str, Dict[str, Union[dict, str]]]: a dictionary with prediction data, a text explanation of the predictions, agent_id, and the prediction id. """ nf_agent = NormalizedForecaster.load_from_saved_estimator(estimator_id=agent_id, verbose=False) item = nf_agent.input_type(**input_data) try: forecast_start_dt = datetime.fromisoformat(forecast_start_dt_str) horizon_params = { 'start_dt': forecast_start_dt, time_scale: horizon_len } req = nf_agent.queue(item, **horizon_params) except Exception as e: return f"""Error at nf_agent: {str(e)}""" try: predictions = nf_agent.get_predictions(req.request_id) prediction = predictions[0] return { "agent_id": nf_agent.estimator_id, "prediction_id": prediction.id, 'data': prediction.to_json(orient='rows'), 'explanation': prediction.text} except Exception as e: return f"""Error on prediction: {str(e)}""" async def rescale_forecast( prediction_id: Annotated[str, Field(description="The prediction_id from a prediction result")], y_min: Annotated[float, Field(description="The expected smallest value for the use case. E.g., for product sales, 0 would be the least possible value for sales.")], y_max: Annotated[float, Field(description="The expected largest value for the use case. E.g., for product sales, 0 would be the largest possible value would be given by the user or determined from this history of sales for the product in question or a similar product.")], invert_scale: Annotated[bool, Field(description="Set this flag to true if the scale of the new units will run in the opposite direction from the inputs.", default=False)], ) -> List[dict]: """Rescales prediction data from the NormalizedForecaster agent Args: prediction_id (str) : The prediction_id for the prediction you would like to rescale as returned by the forecasting agent y_min (float) : The expected smallest value for the use case. E.g., for product sales, 0 would be the least possible value for sales. y_max (float) : The expected largest value for the use case. E.g., for product sales, 0 would be the largest possible value would be given by the user or determined from this history of sales for the product in question or a similar product. invert_scale (bool): Set this flag to true if the scale of the new units will run in the opposite direction from the inputs. Returns: List[dict] : The prediction data rescaled to suit the use case """ normalized_forecast = NormalizedForecaster.get_prediction_static(prediction_id) rescaled_forecast = RescaledForecast.from_forecast( forecast=normalized_forecast, y_min=y_min, y_max=y_max, invert_scale=invert_scale ) return [DataRow(dt=row.get('date',row.get('datetime')), y_hat=row.get('y_hat')).model_dump() for row in rescaled_forecast.to_json(orient='rows')]

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ChronulusAI/chronulus-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server