Skip to main content
Glama
config.py15.7 kB
"""Jentic project management module.""" import json import logging from pathlib import Path from typing import Any, Dict, List, Optional, Union from arazzo_runner.auth.auth_processor import AuthProcessor from arazzo_runner.extractor.openapi_extractor import extract_operation_io from arazzo_runner import ArazzoRunner from jentic.lib.agent_runtime.api_hub import JenticAPIClient from jentic.lib.models import GetFilesResponse, LoadResponse logger = logging.getLogger(__name__) def load_json_file(filepath: str | Path) -> dict[str, Any]: """Load a JSON file. Args: filepath: Path to the JSON file Returns: The contents of the JSON file """ with open(filepath) as f: return json.load(f) class JenticConfig: """A class that loads and manages a Jentic project.""" def __init__(self, config_path: str = "./jentic.json") -> None: """Initialize the Jentic project. Args: config_path: Path to the Jentic config file """ self.config_path = Path(config_path) logger.debug(f"Initializing JenticConfig at {self.config_path}") # Load project configuration self.config: dict[str, Any] = self._load_project_config(self.config_path) logger.debug(f"Loaded project configuration with {len(self.config)} entries") self._workflows: dict[str, dict[str, Any]] = {} self._operations: dict[str, Any] = {} self._extract_workflows() self._extract_operations() logger.debug(f"Extracted {len(self._workflows)} workflows from API specifications") logger.debug(f"Extracted {len(self._operations)} operations from config") def _load_project_config(self, config_path: str | Path) -> dict[str, Any]: """Load project configuration from jentic.json. Returns: Project configuration """ if not config_path.exists(): logger.warning(f"Project configuration file not found: {config_path}") return {} try: return load_json_file(config_path) except Exception as e: logger.error(f"Error loading project configuration: {e}") return {} def get_workflows(self) -> dict[str, dict[str, Any]]: """Get the workflows defined in the project. Returns: Dictionary of workflows """ return self._workflows def _extract_workflows(self) -> None: """Extract workflows from the project configuration. This method extracts all workflows from a top-level 'workflows' key only (new style). """ self._workflows = {} top_level_workflows = self.config.get("workflows", {}) for workflow_id, workflow in top_level_workflows.items(): self._workflows[workflow_id] = workflow def _extract_operations(self) -> None: """Extract top-level operations from the project configuration.""" # Operations are any top-level key not named 'workflows' if self.config.get("operations"): self._operations = self.config["operations"] def get_operations(self) -> dict[str, Any]: """Return the extracted operations.""" return self._operations @staticmethod def _extract_workflow_details(arazzo_doc: dict[str, Any]) -> dict[str, dict[str, Any]]: """ Extract workflow details from an Arazzo document. Args: arazzo_doc: The Arazzo document containing workflow definitions Returns: Dictionary mapping workflow IDs to their details (inputs, outputs, description) """ workflow_details = {} if not arazzo_doc or "workflows" not in arazzo_doc: return workflow_details for workflow in arazzo_doc.get("workflows", []): workflow_id = workflow.get("workflowId") if not workflow_id: continue # Extract the relevant details workflow_details[workflow_id] = { "description": workflow.get("description", ""), "summary": workflow.get("summary", ""), "inputs": workflow.get("inputs", {}), "outputs": workflow.get("outputs", {}), } return workflow_details @staticmethod def _flatten_security_requirements(security_requirements: dict) -> dict: """ Converts the security_requirements dict from {source: [SecurityOption, ...]} to {source: [flattened dict, ...]} where each flattened dict is a SecurityRequirement as a dict (model_dump), with no extra 'requirements' nesting. """ flattened = {} for k, v in security_requirements.items(): # v is a list of SecurityOption objects (or similar) # Each SecurityOption has a 'requirements' field, which is a list of SecurityRequirement objects # We want: {k: [req.model_dump() for s in v for req in s.requirements]} flattened[k] = [ req.model_dump() if hasattr(req, "model_dump") else dict(req) for s in v for req in getattr(s, "requirements", []) ] return flattened @staticmethod async def generate_config_from_uuids( api_hub_client: JenticAPIClient, workflow_uuids: Optional[List[str]], operation_uuids: Optional[List[str]], api_name: Optional[str] = None, ) -> Dict[str, Any]: """ Generate a runtime configuration object from a list of workflow UUIDs and/or operation UUIDs. """ if not (workflow_uuids or operation_uuids): raise ValueError("No workflow or operation UUIDs provided.") logger.info( f"Generating config for workflow UUIDs: {workflow_uuids} and operation UUIDs: {operation_uuids}, API name: {api_name}" ) # Step 1: Fetch execution files for both workflows and operations exec_files_response = await JenticConfig._fetch_execution_files( api_hub_client, workflow_uuids, operation_uuids ) # Step 2: Collect OpenAPI specs all_openapi_specs = JenticConfig._collect_openapi_specs(exec_files_response) # Step 3: Extract workflow details all_arazzo_specs, extracted_workflow_details = JenticConfig._extract_all_workflow_details( exec_files_response, workflow_uuids ) # Step 4: Extract operation details if present extracted_operation_details = {} if operation_uuids: extracted_operation_details = JenticConfig._extract_all_operation_details( exec_files_response, operation_uuids ) # Step 5: Process authentication requirements env_mappings = ArazzoRunner.generate_env_mappings( arazzo_docs=all_arazzo_specs, source_descriptions=all_openapi_specs ) # Step 6: Compose final config final_config = { "version": "1.0", "workflows": extracted_workflow_details, "operations": extracted_operation_details, "environment_variable_mappings": env_mappings, } logger.info("Successfully generated runtime configuration.") return final_config @staticmethod async def _fetch_execution_files( api_hub_client: JenticAPIClient, workflow_uuids: List[str], operation_uuids: List[str] = None, ): try: return await api_hub_client.get_execution_files( workflow_ids=workflow_uuids, operation_uuids=operation_uuids or [] ) except Exception as e: logger.error(f"Failed to fetch execution files: {e}") raise ValueError(f"Failed to fetch execution files: {e}") from e @staticmethod def _collect_openapi_specs(exec_files_response) -> Dict[str, dict]: all_openapi_specs = {} for openapi_file_id, openapi_file_entry in exec_files_response.files.get( "open_api", {} ).items(): openapi_content = openapi_file_entry.content title = None if isinstance(openapi_content, dict): title = openapi_content.get("info", {}).get("title") key = title if title else openapi_file_entry.filename all_openapi_specs[key] = openapi_content return all_openapi_specs @staticmethod def _extract_all_workflow_details( exec_files_response: Union[GetFilesResponse, LoadResponse], workflow_uuids: list[str] ) -> tuple[list[dict], dict[str, dict]]: all_arazzo_specs: list[dict] = [] extracted_workflow_details: dict[str, dict] = {} if not workflow_uuids: return [], {} for workflow_id in workflow_uuids: if workflow_id not in exec_files_response.workflows: logger.error(f"Workflow ID {workflow_id} not found in execution files response.") raise ValueError( f"Workflow ID {workflow_id} not found in execution files response." ) workflow_entry = exec_files_response.workflows[workflow_id] # Extract Arazzo doc if not workflow_entry.files.arazzo: logger.error(f"Missing Arazzo document for workflow_id: {workflow_id}") raise ValueError(f"Missing Arazzo document for workflow_id: {workflow_id}") arazzo_file_id = workflow_entry.files.arazzo[0].id arazzo_files_dict = exec_files_response.files.get("arazzo", {}) if arazzo_file_id not in arazzo_files_dict: logger.error( f"Arazzo file content not found for ID {arazzo_file_id} in workflow {workflow_id}." ) raise ValueError( f"Arazzo file content not found for ID {arazzo_file_id} in workflow {workflow_id}." ) arazzo_doc = arazzo_files_dict[arazzo_file_id].content all_arazzo_specs.append(arazzo_doc) # Build source_descriptions relevant to this workflow/arazzo doc source_descriptions: dict[str, dict] = {} for openapi_ref in workflow_entry.files.open_api: file_id = openapi_ref.id openapi_files_dict = exec_files_response.files.get("open_api", {}) if file_id in openapi_files_dict: openapi_file = openapi_files_dict[file_id] source_descriptions[openapi_file.filename] = openapi_file.content # Extract all workflows defined within this specific Arazzo document workflows_in_doc = JenticConfig._extract_workflow_details(arazzo_doc) if workflow_entry.workflow_id and workflow_entry.workflow_id in workflows_in_doc: workflow_details = workflows_in_doc[workflow_entry.workflow_id] workflow_details["id"] = workflow_id workflow_details["api_names"] = [ ref.api_name for ref in workflow_entry.api_references ] workflow_details["security_requirements"] = ( JenticConfig._flatten_security_requirements( AuthProcessor.get_security_requirements_for_workflow( workflow_id=workflow_entry.workflow_id, arazzo_spec=arazzo_doc, source_descriptions=source_descriptions, ) ) ) extracted_workflow_details[workflow_entry.workflow_id] = workflow_details else: logger.error( f"Requested workflow UUID {workflow_id} with friendly_id '{workflow_entry.workflow_id}' " f"not found within its own Arazzo document's extracted details." ) return all_arazzo_specs, extracted_workflow_details @staticmethod def _extract_all_operation_details( exec_files_response: Union[GetFilesResponse, LoadResponse], operation_uuids: list[str] ) -> dict[str, dict]: extracted_operation_details: dict[str, dict] = {} if not hasattr(exec_files_response, "operations") or not exec_files_response.operations: return extracted_operation_details for operation_uuid in operation_uuids: if operation_uuid not in exec_files_response.operations: logger.error( f"Operation ID {operation_uuid} not found in execution files response." ) continue operation_entry = exec_files_response.operations[operation_uuid] # Find the first OpenAPI file associated with this operation openapi_spec = None if operation_entry.files.open_api: openapi_file_id = operation_entry.files.open_api[0].id openapi_file_entry = exec_files_response.files.get("open_api", {}).get( openapi_file_id ) if openapi_file_entry: openapi_spec = openapi_file_entry.content # Extract inputs and outputs using extract_operation_io io_details = None if ( openapi_spec and getattr(operation_entry, "path", None) and getattr(operation_entry, "method", None) ): try: logger.info(f"Extracting operation IO for {operation_uuid}") io_details = extract_operation_io( openapi_spec, operation_entry.path, operation_entry.method.lower(), input_max_depth=6, output_max_depth=2, ) except Exception as e: logger.error(f"Failed to extract operation IO for {operation_uuid}: {e}") extracted_operation_details[operation_uuid] = { "id": operation_uuid, "method": getattr(operation_entry, "method", None), "path": getattr(operation_entry, "path", None), "summary": getattr(operation_entry, "summary", None), "api_name": getattr(operation_entry, "api_name", None), "inputs": io_details["inputs"] if io_details and "inputs" in io_details else None, "outputs": ( io_details["outputs"] if io_details and "outputs" in io_details else None ), "security_requirements": ( io_details["security_requirements"] if io_details and "security_requirements" in io_details else None ), } return extracted_operation_details @staticmethod def _process_auth(all_openapi_specs, all_arazzo_specs): logger.debug( f"Processing auth for {len(all_openapi_specs)} unique OpenAPI specs and {len(all_arazzo_specs)} Arazzo docs." ) auth_processor = AuthProcessor() try: auth_data = auth_processor.process_api_auth( openapi_specs=all_openapi_specs, arazzo_specs=all_arazzo_specs ) logger.info("Auth processing completed.") except Exception as e: logger.error(f"Auth processing failed: {e}") raise ValueError("Auth processing failed during config generation") from e if auth_data.get("auth_workflows"): del auth_data["auth_workflows"] return auth_data.get("env_mappings", {})

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jentic/jentic-tools'

If you have feedback or need assistance with the MCP directory API, please join our Discord server