Skip to main content
Glama
Rootly-AI-Labs

Rootly MCP server

Official

createIncident

Generate a new incident with details like title, severity, and status using Rootly MCP server. Provides structured responses for success, unauthorized access, or invalid data.

Instructions

Creates a new incident from provided data

Responses:

  • 201 (Success): incident created

    • Content-Type: application/vnd.api+json

    • Example:

{
  "key": "value"
}
  • 401: responds with unauthorized for invalid token

    • Content-Type: application/vnd.api+json

    • Example:

{
  "key": "value"
}
  • 422: invalid causes association

    • Content-Type: application/vnd.api+json

    • Example:

{
  "key": "value"
}

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
dataYes

Implementation Reference

  • Registration of dynamic OpenAPI tools via FastMCP.from_openapi. The 'createIncident' tool is auto-generated from the POST /v1/incidents endpoint in the Rootly OpenAPI swagger spec.
    # Create the MCP server using OpenAPI integration
    # By default, all routes become tools which is what we want
    mcp = FastMCP.from_openapi(
        openapi_spec=filtered_spec,
        client=http_client.client,
        name=name,
        timeout=30.0,
  • Custom HTTPX client wrapper used by all generated OpenAPI tools (including createIncident) to make authenticated requests to the Rootly API.
    class AuthenticatedHTTPXClient:
        """An HTTPX client wrapper that handles Rootly API authentication and parameter transformation."""
    
        def __init__(self, base_url: str = "https://api.rootly.com", hosted: bool = False, parameter_mapping: Optional[Dict[str, str]] = None):
            self._base_url = base_url
            self.hosted = hosted
            self._api_token = None
            self.parameter_mapping = parameter_mapping or {}
    
            if not self.hosted:
                self._api_token = self._get_api_token()
    
            # Create the HTTPX client  
            headers = {
                "Content-Type": "application/vnd.api+json", 
                "Accept": "application/vnd.api+json"
                # Let httpx handle Accept-Encoding automatically with all supported formats
            }
            if self._api_token:
                headers["Authorization"] = f"Bearer {self._api_token}"
    
            self.client = httpx.AsyncClient(
                base_url=base_url,
                headers=headers,
                timeout=30.0,
                follow_redirects=True,
                # Ensure proper handling of compressed responses
                limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
            )
    
        def _get_api_token(self) -> Optional[str]:
            """Get the API token from environment variables."""
            api_token = os.getenv("ROOTLY_API_TOKEN")
            if not api_token:
                logger.warning("ROOTLY_API_TOKEN environment variable is not set")
                return None
            return api_token
    
        def _transform_params(self, params: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
            """Transform sanitized parameter names back to original names."""
            if not params or not self.parameter_mapping:
                return params
    
            transformed = {}
            for key, value in params.items():
                # Use the original name if we have a mapping, otherwise keep the sanitized name
                original_key = self.parameter_mapping.get(key, key)
                transformed[original_key] = value
                if original_key != key:
                    logger.debug(f"Transformed parameter: '{key}' -> '{original_key}'")
            return transformed
    
        async def request(self, method: str, url: str, **kwargs):
            """Override request to transform parameters."""
            # Transform query parameters
            if 'params' in kwargs:
                kwargs['params'] = self._transform_params(kwargs['params'])
    
            # Call the underlying client's request method and let it handle everything
            return await self.client.request(method, url, **kwargs)
    
        async def get(self, url: str, **kwargs):
            """Proxy to request with GET method."""
            return await self.request('GET', url, **kwargs)
    
        async def post(self, url: str, **kwargs):
            """Proxy to request with POST method."""
            return await self.request('POST', url, **kwargs)
    
        async def put(self, url: str, **kwargs):
            """Proxy to request with PUT method."""
            return await self.request('PUT', url, **kwargs)
    
        async def patch(self, url: str, **kwargs):
            """Proxy to request with PATCH method."""
            return await self.request('PATCH', url, **kwargs)
    
        async def delete(self, url: str, **kwargs):
            """Proxy to request with DELETE method."""
            return await self.request('DELETE', url, **kwargs)
    
        async def __aenter__(self):
            return self
    
        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pass
    
        def __getattr__(self, name):
            # Delegate all other attributes to the underlying client, except for request methods
            if name in ['request', 'get', 'post', 'put', 'patch', 'delete']:
                # Use our overridden methods instead
                return getattr(self, name)
            return getattr(self.client, name)
        
        @property 
        def base_url(self):
            return self._base_url
            
        @property
        def headers(self):
            return self.client.headers
  • Utility to sanitize parameter names in the OpenAPI spec for MCP compliance, applied to the spec before tool generation including createIncident schema.
    def sanitize_parameters_in_spec(spec: Dict[str, Any]) -> Dict[str, str]:
        """
  • Filters and sanitizes the OpenAPI spec, defining the input schema for createIncident from the POST /v1/incidents path.
    def _filter_openapi_spec(spec: Dict[str, Any], allowed_paths: List[str]) -> Dict[str, Any]:
        """
        Filter an OpenAPI specification to only include specified paths and clean up schema references.
    
        Args:
            spec: The original OpenAPI specification.
            allowed_paths: List of paths to include.
    
        Returns:
            A filtered OpenAPI specification with cleaned schema references.
        """
        # Use deepcopy to ensure all nested structures are properly copied
        filtered_spec = deepcopy(spec)
    
        # Filter paths
        original_paths = filtered_spec.get("paths", {})
        filtered_paths = {
            path: path_item
            for path, path_item in original_paths.items()
            if path in allowed_paths
        }
    
        filtered_spec["paths"] = filtered_paths
    
        # Clean up schema references that might be broken
        # Remove problematic schema references from request bodies and parameters
        for path, path_item in filtered_paths.items():
            for method, operation in path_item.items():
                if method.lower() not in ["get", "post", "put", "delete", "patch"]:
                    continue
    
                # Clean request body schemas
                if "requestBody" in operation:
                    request_body = operation["requestBody"]
                    if "content" in request_body:
                        for content_type, content_info in request_body["content"].items():
                            if "schema" in content_info:
                                schema = content_info["schema"]
                                # Remove problematic $ref references
                                if "$ref" in schema and "incident_trigger_params" in schema["$ref"]:
                                    # Replace with a generic object schema
                                    content_info["schema"] = {
                                        "type": "object",
                                        "description": "Request parameters for this endpoint",
                                        "additionalProperties": True
                                    }
    
                # Remove response schemas to avoid validation issues
                # FastMCP will still return the data, just without strict validation
                if "responses" in operation:
                    for status_code, response in operation["responses"].items():
                        if "content" in response:
                            for content_type, content_info in response["content"].items():
                                if "schema" in content_info:
                                    # Replace with a simple schema that accepts any response
                                    content_info["schema"] = {
                                        "type": "object",
                                        "additionalProperties": True
                                    }
    
                # Clean parameter schemas (parameter names are already sanitized)
                if "parameters" in operation:
                    for param in operation["parameters"]:
                        if "schema" in param and "$ref" in param["schema"]:
                            ref_path = param["schema"]["$ref"]
                            if "incident_trigger_params" in ref_path:
                                # Replace with a simple string schema
                                param["schema"] = {
                                    "type": "string",
                                    "description": param.get("description", "Parameter value")
                                }
    
                # Add/modify pagination limits to alerts and incident-related endpoints to prevent infinite loops
                if method.lower() == "get" and ("alerts" in path.lower() or "incident" in path.lower()):
                    if "parameters" not in operation:
                        operation["parameters"] = []
                    
                    # Find existing pagination parameters and update them with limits
                    page_size_param = None
                    page_number_param = None
                    
                    for param in operation["parameters"]:
                        if param.get("name") == "page[size]":
                            page_size_param = param
                        elif param.get("name") == "page[number]":
                            page_number_param = param
                    
                    # Update or add page[size] parameter with limits
                    if page_size_param:
                        # Update existing parameter with limits
                        if "schema" not in page_size_param:
                            page_size_param["schema"] = {}
                        page_size_param["schema"].update({
                            "type": "integer",
                            "default": 10,
                            "minimum": 1,
                            "maximum": 20,
                            "description": "Number of results per page (max: 20)"
                        })
                    else:
                        # Add new parameter
                        operation["parameters"].append({
                            "name": "page[size]",
                            "in": "query",
                            "required": False,
                            "schema": {
                                "type": "integer",
                                "default": 10,
                                "minimum": 1,
                                "maximum": 20,
                                "description": "Number of results per page (max: 20)"
                            }
                        })
                    
                    # Update or add page[number] parameter with defaults
                    if page_number_param:
                        # Update existing parameter 
                        if "schema" not in page_number_param:
                            page_number_param["schema"] = {}
                        page_number_param["schema"].update({
                            "type": "integer",
                            "default": 1,
                            "minimum": 1,
                            "description": "Page number to retrieve"
                        })
                    else:
                        # Add new parameter
                        operation["parameters"].append({
                            "name": "page[number]",
                            "in": "query", 
                            "required": False,
                            "schema": {
                                "type": "integer",
                                "default": 1,
                                "minimum": 1,
                                "description": "Page number to retrieve"
                            }
                        })
                    
                    # Add sparse fieldsets for alerts endpoints to reduce payload size
                    if "alert" in path.lower():
                        # Add fields[alerts] parameter with essential fields only - make it required with default
                        operation["parameters"].append({
                            "name": "fields[alerts]",
                            "in": "query",
                            "required": True,
                            "schema": {
                                "type": "string",
                                "default": "id,summary,status,started_at,ended_at,short_id,alert_urgency_id,source,noise",
                                "description": "Comma-separated list of alert fields to include (reduces payload size)"
                            }
                        })
                    
                    # Add include parameter for alerts endpoints to minimize relationships
                    if "alert" in path.lower():
                        # Check if include parameter already exists
                        include_param_exists = any(param.get("name") == "include" for param in operation["parameters"])
                        if not include_param_exists:
                            operation["parameters"].append({
                                "name": "include",
                                "in": "query",
                                "required": True,
                                "schema": {
                                    "type": "string",
                                    "default": "",
                                    "description": "Related resources to include (empty for minimal payload)"
                                }
                            })
                    
                    # Add sparse fieldsets for incidents endpoints to reduce payload size
                    if "incident" in path.lower():
                        # Add fields[incidents] parameter with essential fields only - make it required with default
                        operation["parameters"].append({
                            "name": "fields[incidents]", 
                            "in": "query",
                            "required": True,
                            "schema": {
                                "type": "string",
                                "default": "id,title,summary,status,severity,created_at,updated_at,url,started_at",
                                "description": "Comma-separated list of incident fields to include (reduces payload size)"
                            }
                        })
                    
                    # Add include parameter for incidents endpoints to minimize relationships
                    if "incident" in path.lower():
                        # Check if include parameter already exists
                        include_param_exists = any(param.get("name") == "include" for param in operation["parameters"])
                        if not include_param_exists:
                            operation["parameters"].append({
                                "name": "include",
                                "in": "query",
                                "required": True,
                                "schema": {
                                    "type": "string", 
                                    "default": "",
                                    "description": "Related resources to include (empty for minimal payload)"
                                }
                            })
    
        # Also clean up any remaining broken references in components
        if "components" in filtered_spec and "schemas" in filtered_spec["components"]:
            schemas = filtered_spec["components"]["schemas"]
            # Remove or fix any schemas that reference missing components
            schemas_to_remove = []
            for schema_name, schema_def in schemas.items():
                if isinstance(schema_def, dict) and _has_broken_references(schema_def):
                    schemas_to_remove.append(schema_name)
    
            for schema_name in schemas_to_remove:
                logger.warning(f"Removing schema with broken references: {schema_name}")
                del schemas[schema_name]
    
        # Clean up any operation-level references to removed schemas
        removed_schemas = set()
        if "components" in filtered_spec and "schemas" in filtered_spec["components"]:
            removed_schemas = {"new_workflow", "update_workflow", "workflow", "workflow_task", 
                              "workflow_response", "workflow_list", "new_workflow_task", 
                              "update_workflow_task", "workflow_task_response", "workflow_task_list"}
        
        for path, path_item in filtered_spec.get("paths", {}).items():
            for method, operation in path_item.items():
                if method.lower() not in ["get", "post", "put", "delete", "patch"]:
                    continue
                    
                # Clean request body references
                if "requestBody" in operation:
                    request_body = operation["requestBody"]
                    if "content" in request_body:
                        for content_type, content_info in request_body["content"].items():
                            if "schema" in content_info and "$ref" in content_info["schema"]:
                                ref_path = content_info["schema"]["$ref"]
                                schema_name = ref_path.split("/")[-1]
                                if schema_name in removed_schemas:
                                    # Replace with generic object schema
                                    content_info["schema"] = {
                                        "type": "object",
                                        "description": "Request data for this endpoint",
                                        "additionalProperties": True
                                    }
                                    logger.debug(f"Cleaned broken reference in {method.upper()} {path} request body: {ref_path}")
                
                # Clean response references 
                if "responses" in operation:
                    for status_code, response in operation["responses"].items():
                        if "content" in response:
                            for content_type, content_info in response["content"].items():
                                if "schema" in content_info and "$ref" in content_info["schema"]:
                                    ref_path = content_info["schema"]["$ref"]
                                    schema_name = ref_path.split("/")[-1]
                                    if schema_name in removed_schemas:
                                        # Replace with generic object schema
                                        content_info["schema"] = {
                                            "type": "object",
                                            "description": "Response data from this endpoint",
                                            "additionalProperties": True
                                        }
                                        logger.debug(f"Cleaned broken reference in {method.upper()} {path} response: {ref_path}")
    
        return filtered_spec
  • Test confirming the presence of the createIncident tool among expected OpenAPI-generated tools.
    expected_tools = [
        "search_incidents",  # Our custom tool
        "listIncidents",     # OpenAPI generated
        "createIncident", 

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Rootly-AI-Labs/Rootly-MCP-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server