
Rootly MCP server

Official

createIncident

Creates a new incident with details such as title, severity, and status using the Rootly MCP server. Returns structured responses for success, unauthorized access, or invalid data.

Instructions

Creates a new incident from provided data

Responses:

  • 201 (Success): incident created

    • Content-Type: application/vnd.api+json

    • Example:

{ "key": "value" }
  • 401 (Unauthorized): responds with unauthorized when the API token is invalid

    • Content-Type: application/vnd.api+json

    • Example:

{ "key": "value" }
  • 422 (Unprocessable Entity): invalid data, e.g. an invalid causes association

    • Content-Type: application/vnd.api+json

    • Example:

{ "key": "value" }

Input Schema

Name    Required    Description    Default
data    Yes
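
The tool takes a single required data argument, which is forwarded as the JSON:API request body for POST /v1/incidents. The sketch below is illustrative only: the client connection URL and the attribute names inside data are assumptions, not the documented Rootly schema.

    import asyncio
    from fastmcp import Client  # assumes the FastMCP client API; adjust to your setup

    async def main():
        # Connect to a running Rootly MCP server (URL is hypothetical)
        async with Client("http://localhost:8000/mcp") as client:
            result = await client.call_tool("createIncident", {
                "data": {
                    "type": "incidents",      # JSON:API resource type (assumed)
                    "attributes": {           # attribute names are illustrative
                        "title": "Database latency spike",
                        "summary": "p95 latency above 2s on primary DB",
                    },
                },
            })
            print(result)

    asyncio.run(main())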

Implementation Reference

  • Registration of dynamic OpenAPI tools via FastMCP.from_openapi. The 'createIncident' tool is auto-generated from the POST /v1/incidents endpoint in the Rootly OpenAPI (Swagger) spec; a wiring sketch showing how these pieces fit together follows this list.
    # Create the MCP server using OpenAPI integration
    # By default, all routes become tools which is what we want
    mcp = FastMCP.from_openapi(
        openapi_spec=filtered_spec,
        client=http_client.client,
        name=name,
        timeout=30.0,
  • Custom HTTPX client wrapper used by all generated OpenAPI tools (including createIncident) to make authenticated requests to the Rootly API.
    class AuthenticatedHTTPXClient:
        """An HTTPX client wrapper that handles Rootly API authentication and parameter transformation."""

        def __init__(self, base_url: str = "https://api.rootly.com", hosted: bool = False,
                     parameter_mapping: Optional[Dict[str, str]] = None):
            self._base_url = base_url
            self.hosted = hosted
            self._api_token = None
            self.parameter_mapping = parameter_mapping or {}
            if not self.hosted:
                self._api_token = self._get_api_token()

            # Create the HTTPX client
            headers = {
                "Content-Type": "application/vnd.api+json",
                "Accept": "application/vnd.api+json",
                # Let httpx handle Accept-Encoding automatically with all supported formats
            }
            if self._api_token:
                headers["Authorization"] = f"Bearer {self._api_token}"

            self.client = httpx.AsyncClient(
                base_url=base_url,
                headers=headers,
                timeout=30.0,
                follow_redirects=True,
                # Ensure proper handling of compressed responses
                limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
            )

        def _get_api_token(self) -> Optional[str]:
            """Get the API token from environment variables."""
            api_token = os.getenv("ROOTLY_API_TOKEN")
            if not api_token:
                logger.warning("ROOTLY_API_TOKEN environment variable is not set")
                return None
            return api_token

        def _transform_params(self, params: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
            """Transform sanitized parameter names back to original names."""
            if not params or not self.parameter_mapping:
                return params
            transformed = {}
            for key, value in params.items():
                # Use the original name if we have a mapping, otherwise keep the sanitized name
                original_key = self.parameter_mapping.get(key, key)
                transformed[original_key] = value
                if original_key != key:
                    logger.debug(f"Transformed parameter: '{key}' -> '{original_key}'")
            return transformed

        async def request(self, method: str, url: str, **kwargs):
            """Override request to transform parameters."""
            # Transform query parameters
            if 'params' in kwargs:
                kwargs['params'] = self._transform_params(kwargs['params'])
            # Call the underlying client's request method and let it handle everything
            return await self.client.request(method, url, **kwargs)

        async def get(self, url: str, **kwargs):
            """Proxy to request with GET method."""
            return await self.request('GET', url, **kwargs)

        async def post(self, url: str, **kwargs):
            """Proxy to request with POST method."""
            return await self.request('POST', url, **kwargs)

        async def put(self, url: str, **kwargs):
            """Proxy to request with PUT method."""
            return await self.request('PUT', url, **kwargs)

        async def patch(self, url: str, **kwargs):
            """Proxy to request with PATCH method."""
            return await self.request('PATCH', url, **kwargs)

        async def delete(self, url: str, **kwargs):
            """Proxy to request with DELETE method."""
            return await self.request('DELETE', url, **kwargs)

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pass

        def __getattr__(self, name):
            # Delegate all other attributes to the underlying client, except for request methods
            if name in ['request', 'get', 'post', 'put', 'patch', 'delete']:
                # Use our overridden methods instead
                return getattr(self, name)
            return getattr(self.client, name)

        @property
        def base_url(self):
            return self._base_url

        @property
        def headers(self):
            return self.client.headers
  • Utility to sanitize parameter names in the OpenAPI spec for MCP compliance, applied before tool generation (including the createIncident schema); a hedged sketch of the idea follows this list.
    def sanitize_parameters_in_spec(spec: Dict[str, Any]) -> Dict[str, str]:
        """
  • Filters and sanitizes the OpenAPI spec, defining the input schema for createIncident from the POST /v1/incidents path.
    def _filter_openapi_spec(spec: Dict[str, Any], allowed_paths: List[str]) -> Dict[str, Any]:
        """
        Filter an OpenAPI specification to only include specified paths and clean up schema references.

        Args:
            spec: The original OpenAPI specification.
            allowed_paths: List of paths to include.

        Returns:
            A filtered OpenAPI specification with cleaned schema references.
        """
        # Use deepcopy to ensure all nested structures are properly copied
        filtered_spec = deepcopy(spec)

        # Filter paths
        original_paths = filtered_spec.get("paths", {})
        filtered_paths = {
            path: path_item
            for path, path_item in original_paths.items()
            if path in allowed_paths
        }
        filtered_spec["paths"] = filtered_paths

        # Clean up schema references that might be broken
        # Remove problematic schema references from request bodies and parameters
        for path, path_item in filtered_paths.items():
            for method, operation in path_item.items():
                if method.lower() not in ["get", "post", "put", "delete", "patch"]:
                    continue

                # Clean request body schemas
                if "requestBody" in operation:
                    request_body = operation["requestBody"]
                    if "content" in request_body:
                        for content_type, content_info in request_body["content"].items():
                            if "schema" in content_info:
                                schema = content_info["schema"]
                                # Remove problematic $ref references
                                if "$ref" in schema and "incident_trigger_params" in schema["$ref"]:
                                    # Replace with a generic object schema
                                    content_info["schema"] = {
                                        "type": "object",
                                        "description": "Request parameters for this endpoint",
                                        "additionalProperties": True
                                    }

                # Remove response schemas to avoid validation issues
                # FastMCP will still return the data, just without strict validation
                if "responses" in operation:
                    for status_code, response in operation["responses"].items():
                        if "content" in response:
                            for content_type, content_info in response["content"].items():
                                if "schema" in content_info:
                                    # Replace with a simple schema that accepts any response
                                    content_info["schema"] = {
                                        "type": "object",
                                        "additionalProperties": True
                                    }

                # Clean parameter schemas (parameter names are already sanitized)
                if "parameters" in operation:
                    for param in operation["parameters"]:
                        if "schema" in param and "$ref" in param["schema"]:
                            ref_path = param["schema"]["$ref"]
                            if "incident_trigger_params" in ref_path:
                                # Replace with a simple string schema
                                param["schema"] = {
                                    "type": "string",
                                    "description": param.get("description", "Parameter value")
                                }

                # Add/modify pagination limits to alerts and incident-related endpoints to prevent infinite loops
                if method.lower() == "get" and ("alerts" in path.lower() or "incident" in path.lower()):
                    if "parameters" not in operation:
                        operation["parameters"] = []

                    # Find existing pagination parameters and update them with limits
                    page_size_param = None
                    page_number_param = None
                    for param in operation["parameters"]:
                        if param.get("name") == "page[size]":
                            page_size_param = param
                        elif param.get("name") == "page[number]":
                            page_number_param = param

                    # Update or add page[size] parameter with limits
                    if page_size_param:
                        # Update existing parameter with limits
                        if "schema" not in page_size_param:
                            page_size_param["schema"] = {}
                        page_size_param["schema"].update({
                            "type": "integer",
                            "default": 10,
                            "minimum": 1,
                            "maximum": 20,
                            "description": "Number of results per page (max: 20)"
                        })
                    else:
                        # Add new parameter
                        operation["parameters"].append({
                            "name": "page[size]",
                            "in": "query",
                            "required": False,
                            "schema": {
                                "type": "integer",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 20,
                                "description": "Number of results per page (max: 20)"
                            }
                        })

                    # Update or add page[number] parameter with defaults
                    if page_number_param:
                        # Update existing parameter
                        if "schema" not in page_number_param:
                            page_number_param["schema"] = {}
                        page_number_param["schema"].update({
                            "type": "integer",
                            "default": 1,
                            "minimum": 1,
                            "description": "Page number to retrieve"
                        })
                    else:
                        # Add new parameter
                        operation["parameters"].append({
                            "name": "page[number]",
                            "in": "query",
                            "required": False,
                            "schema": {
                                "type": "integer",
                                "default": 1,
                                "minimum": 1,
                                "description": "Page number to retrieve"
                            }
                        })

                    # Add sparse fieldsets for alerts endpoints to reduce payload size
                    if "alert" in path.lower():
                        # Add fields[alerts] parameter with essential fields only - make it required with default
                        operation["parameters"].append({
                            "name": "fields[alerts]",
                            "in": "query",
                            "required": True,
                            "schema": {
                                "type": "string",
                                "default": "id,summary,status,started_at,ended_at,short_id,alert_urgency_id,source,noise",
                                "description": "Comma-separated list of alert fields to include (reduces payload size)"
                            }
                        })

                    # Add include parameter for alerts endpoints to minimize relationships
                    if "alert" in path.lower():
                        # Check if include parameter already exists
                        include_param_exists = any(param.get("name") == "include" for param in operation["parameters"])
                        if not include_param_exists:
                            operation["parameters"].append({
                                "name": "include",
                                "in": "query",
                                "required": True,
                                "schema": {
                                    "type": "string",
                                    "default": "",
                                    "description": "Related resources to include (empty for minimal payload)"
                                }
                            })

                    # Add sparse fieldsets for incidents endpoints to reduce payload size
                    if "incident" in path.lower():
                        # Add fields[incidents] parameter with essential fields only - make it required with default
                        operation["parameters"].append({
                            "name": "fields[incidents]",
                            "in": "query",
                            "required": True,
                            "schema": {
                                "type": "string",
                                "default": "id,title,summary,status,severity,created_at,updated_at,url,started_at",
                                "description": "Comma-separated list of incident fields to include (reduces payload size)"
                            }
                        })

                    # Add include parameter for incidents endpoints to minimize relationships
                    if "incident" in path.lower():
                        # Check if include parameter already exists
                        include_param_exists = any(param.get("name") == "include" for param in operation["parameters"])
                        if not include_param_exists:
                            operation["parameters"].append({
                                "name": "include",
                                "in": "query",
                                "required": True,
                                "schema": {
                                    "type": "string",
                                    "default": "",
                                    "description": "Related resources to include (empty for minimal payload)"
                                }
                            })

        # Also clean up any remaining broken references in components
        if "components" in filtered_spec and "schemas" in filtered_spec["components"]:
            schemas = filtered_spec["components"]["schemas"]
            # Remove or fix any schemas that reference missing components
            schemas_to_remove = []
            for schema_name, schema_def in schemas.items():
                if isinstance(schema_def, dict) and _has_broken_references(schema_def):
                    schemas_to_remove.append(schema_name)
            for schema_name in schemas_to_remove:
                logger.warning(f"Removing schema with broken references: {schema_name}")
                del schemas[schema_name]

        # Clean up any operation-level references to removed schemas
        removed_schemas = set()
        if "components" in filtered_spec and "schemas" in filtered_spec["components"]:
            removed_schemas = {"new_workflow", "update_workflow", "workflow", "workflow_task",
                               "workflow_response", "workflow_list", "new_workflow_task",
                               "update_workflow_task", "workflow_task_response", "workflow_task_list"}

        for path, path_item in filtered_spec.get("paths", {}).items():
            for method, operation in path_item.items():
                if method.lower() not in ["get", "post", "put", "delete", "patch"]:
                    continue

                # Clean request body references
                if "requestBody" in operation:
                    request_body = operation["requestBody"]
                    if "content" in request_body:
                        for content_type, content_info in request_body["content"].items():
                            if "schema" in content_info and "$ref" in content_info["schema"]:
                                ref_path = content_info["schema"]["$ref"]
                                schema_name = ref_path.split("/")[-1]
                                if schema_name in removed_schemas:
                                    # Replace with generic object schema
                                    content_info["schema"] = {
                                        "type": "object",
                                        "description": "Request data for this endpoint",
                                        "additionalProperties": True
                                    }
                                    logger.debug(f"Cleaned broken reference in {method.upper()} {path} request body: {ref_path}")

                # Clean response references
                if "responses" in operation:
                    for status_code, response in operation["responses"].items():
                        if "content" in response:
                            for content_type, content_info in response["content"].items():
                                if "schema" in content_info and "$ref" in content_info["schema"]:
                                    ref_path = content_info["schema"]["$ref"]
                                    schema_name = ref_path.split("/")[-1]
                                    if schema_name in removed_schemas:
                                        # Replace with generic object schema
                                        content_info["schema"] = {
                                            "type": "object",
                                            "description": "Response data from this endpoint",
                                            "additionalProperties": True
                                        }
                                        logger.debug(f"Cleaned broken reference in {method.upper()} {path} response: {ref_path}")

        return filtered_spec
  • Test confirming that the createIncident tool is present among the expected OpenAPI-generated tools (a hedged test sketch follows this list).
    expected_tools = [
        "search_incidents",  # Our custom tool
        "listIncidents",     # OpenAPI generated
        "createIncident",
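
Taken together, the excerpts above are wired up roughly as in the sketch below. This is an illustrative reconstruction: the import path, the spec file name, and the allowed-paths list are assumptions; only AuthenticatedHTTPXClient, sanitize_parameters_in_spec, _filter_openapi_spec, and FastMCP.from_openapi come from the listings above.

    import json
    from fastmcp import FastMCP

    # Import path is an assumption about the package layout
    from rootly_mcp_server.server import (
        AuthenticatedHTTPXClient,
        sanitize_parameters_in_spec,
        _filter_openapi_spec,
    )

    # Load the Rootly OpenAPI (Swagger) spec from disk -- the file name is a placeholder
    with open("swagger.json") as f:
        spec = json.load(f)

    # Sanitize parameter names first (e.g. JSON:API names like 'page[size]'),
    # keeping a mapping so the client can restore the originals at request time
    parameter_mapping = sanitize_parameters_in_spec(spec)

    # Keep only the endpoints to expose; POST /v1/incidents yields createIncident
    filtered_spec = _filter_openapi_spec(spec, allowed_paths=["/v1/incidents"])

    # Authenticated HTTPX client (reads ROOTLY_API_TOKEN from the environment)
    http_client = AuthenticatedHTTPXClient(parameter_mapping=parameter_mapping)

    # Every allowed route becomes an MCP tool, named after its operationId
    mcp = FastMCP.from_openapi(
        openapi_spec=filtered_spec,
        client=http_client.client,
        name="Rootly MCP server",
        timeout=30.0,
    )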
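
Only the signature of sanitize_parameters_in_spec is shown above. A minimal sketch of the idea, assuming that sanitization means replacing characters such as [ and ] that are not valid in MCP tool parameter names, might look like this; the real implementation may differ.

    import re
    from typing import Any, Dict

    def sanitize_parameters_in_spec_sketch(spec: Dict[str, Any]) -> Dict[str, str]:
        """Illustrative only: rename query parameters like 'page[size]' to 'page_size'
        and return a mapping of sanitized name -> original name."""
        mapping: Dict[str, str] = {}
        for path_item in spec.get("paths", {}).values():
            for operation in path_item.values():
                if not isinstance(operation, dict):
                    continue
                for param in operation.get("parameters", []):
                    original = param.get("name", "")
                    sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", original).strip("_")
                    if sanitized != original:
                        param["name"] = sanitized
                        mapping[sanitized] = original
        return mapping

The returned mapping is what AuthenticatedHTTPXClient._transform_params uses to restore the original names (e.g. page_size back to page[size]) before the request is sent to the Rootly API.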
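
A hedged sketch of how such a test might assert the tool's presence, assuming pytest-asyncio is installed and FastMCP exposes the generated tools via an async get_tools() accessor (verify against the FastMCP version in use):

    import pytest

    @pytest.mark.asyncio
    async def test_create_incident_tool_is_registered():
        # 'create_rootly_mcp_server' is a placeholder for however the server is built
        mcp = create_rootly_mcp_server()
        tools = await mcp.get_tools()  # assumed FastMCP API; returns a name -> Tool mapping
        assert "createIncident" in tools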

MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Rootly-AI-Labs/Rootly-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.