ServiceNow MCP Server
"""
ServiceNow MCP Server
This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
It allows AI agents to access and manipulate ServiceNow data through a secure API.
"""
import os
import json
import asyncio
import logging
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any, Union, Literal
import requests
import httpx
from pydantic import BaseModel, Field, field_validator
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.fastmcp.utilities.logging import get_logger
logger = get_logger(__name__)
# ServiceNow API models
class IncidentState(int, Enum):
NEW = 1
IN_PROGRESS = 2
ON_HOLD = 3
RESOLVED = 6
CLOSED = 7
CANCELED = 8
class IncidentPriority(int, Enum):
CRITICAL = 1
HIGH = 2
MODERATE = 3
LOW = 4
PLANNING = 5
class IncidentUrgency(int, Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
class IncidentImpact(int, Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
class IncidentCreate(BaseModel):
"""Model for creating a new incident"""
short_description: str = Field(..., description="A brief description of the incident")
description: str = Field(..., description="A detailed description of the incident")
caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
category: Optional[str] = Field(None, description="The incident category")
subcategory: Optional[str] = Field(None, description="The incident subcategory")
urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
class IncidentUpdate(BaseModel):
"""Model for updating an existing incident"""
short_description: Optional[str] = Field(None, description="A brief description of the incident")
description: Optional[str] = Field(None, description="A detailed description of the incident")
caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
category: Optional[str] = Field(None, description="The incident category")
subcategory: Optional[str] = Field(None, description="The incident subcategory")
urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
state: Optional[IncidentState] = Field(None, description="The state of the incident")
assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
@field_validator('work_notes', 'comments')
@classmethod
def validate_not_empty(cls, v):
if v is not None and v.strip() == '':
raise ValueError("Cannot be an empty string")
return v
class Config:
use_enum_values = True
class QueryOptions(BaseModel):
"""Options for querying ServiceNow records"""
limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
offset: int = Field(0, description="Number of records to skip", ge=0)
fields: Optional[List[str]] = Field(None, description="List of fields to return")
query: Optional[str] = Field(None, description="ServiceNow encoded query string")
order_by: Optional[str] = Field(None, description="Field to order results by")
order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
class Authentication:
"""Base class for ServiceNow authentication methods"""
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
raise NotImplementedError("Subclasses must implement this method")
class BasicAuth(Authentication):
"""Basic authentication for ServiceNow"""
def __init__(self, username: str, password: str):
self.username = username
self.password = password
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
return {}
def get_auth(self) -> tuple:
"""Get authentication tuple for requests"""
return (self.username, self.password)
class TokenAuth(Authentication):
"""Token authentication for ServiceNow"""
def __init__(self, token: str):
self.token = token
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
return {"Authorization": f"Bearer {self.token}"}
def get_auth(self) -> None:
"""Get authentication tuple for requests"""
return None
class OAuthAuth(Authentication):
"""OAuth authentication for ServiceNow"""
def __init__(self, client_id: str, client_secret: str, username: str, password: str,
instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
token_expiry: Optional[datetime] = None):
self.client_id = client_id
self.client_secret = client_secret
self.username = username
self.password = password
self.instance_url = instance_url
self.token = token
self.refresh_token = refresh_token
self.token_expiry = token_expiry
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
await self.refresh()
return {"Authorization": f"Bearer {self.token}"}
def get_auth(self) -> None:
"""Get authentication tuple for requests"""
return None
async def refresh(self):
"""Refresh the OAuth token"""
if self.refresh_token:
# Try refresh flow first
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
else:
# Fall back to password flow
data = {
"grant_type": "password",
"client_id": self.client_id,
"client_secret": self.client_secret,
"username": self.username,
"password": self.password
}
token_url = f"{self.instance_url}/oauth_token.do"
async with httpx.AsyncClient() as client:
response = await client.post(token_url, data=data)
response.raise_for_status()
result = response.json()
self.token = result["access_token"]
self.refresh_token = result.get("refresh_token")
expires_in = result.get("expires_in", 1800) # Default 30 minutes
self.token_expiry = datetime.now().timestamp() + expires_in
class ServiceNowClient:
"""Client for interacting with ServiceNow API"""
def __init__(self, instance_url: str, auth: Authentication):
self.instance_url = instance_url.rstrip('/')
self.auth = auth
self.client = httpx.AsyncClient()
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
async def request(self, method: str, path: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Make a request to the ServiceNow API"""
url = f"{self.instance_url}{path}"
headers = await self.auth.get_headers()
headers["Accept"] = "application/json"
if isinstance(self.auth, BasicAuth):
auth = self.auth.get_auth()
else:
auth = None
try:
response = await self.client.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers,
auth=auth
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"ServiceNow API error: {e.response.text}")
raise
async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
"""Get a record by sys_id"""
return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
"""Get records with query options"""
if options is None:
options = QueryOptions()
params = {
"sysparm_limit": options.limit,
"sysparm_offset": options.offset
}
if options.fields:
params["sysparm_fields"] = ",".join(options.fields)
if options.query:
params["sysparm_query"] = options.query
if options.order_by:
direction = "desc" if options.order_direction == "desc" else "asc"
params["sysparm_order_by"] = f"{options.order_by}^{direction}"
return await self.request("GET", f"/api/now/table/{table}", params=params)
async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new record"""
return await self.request("POST", f"/api/now/table/{table}", json_data=data)
async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing record"""
return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
"""Delete a record"""
return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
"""Get an incident by its number"""
result = await self.request("GET", f"/api/now/table/incident",
params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
if result.get("result") and len(result["result"]) > 0:
return result["result"][0]
return None
async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
"""Search for records using text query"""
return await self.request("GET", f"/api/now/table/{table}",
params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
async def get_available_tables(self) -> List[str]:
"""Get a list of available tables"""
result = await self.request("GET", "/api/now/table/sys_db_object",
params={"sysparm_fields": "name,label", "sysparm_limit": 100})
return result.get("result", [])
async def get_table_schema(self, table: str) -> Dict[str, Any]:
"""Get the schema for a table"""
result = await self.request("GET", f"/api/now/ui/meta/{table}")
return result
class ServiceNowMCP:
"""ServiceNow MCP Server"""
def __init__(self,
instance_url: str,
auth: Authentication,
name: str = "ServiceNow MCP"):
self.client = ServiceNowClient(instance_url, auth)
self.mcp = FastMCP(name, dependencies=[
"requests",
"httpx",
"pydantic"
])
# Register resources
self.mcp.resource("servicenow://incidents")(self.list_incidents)
self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
self.mcp.resource("servicenow://users")(self.list_users)
self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
self.mcp.resource("servicenow://tables")(self.get_tables)
self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
# Register tools
self.mcp.tool(name="create_incident")(self.create_incident)
self.mcp.tool(name="update_incident")(self.update_incident)
self.mcp.tool(name="search_records")(self.search_records)
self.mcp.tool(name="get_record")(self.get_record)
self.mcp.tool(name="perform_query")(self.perform_query)
self.mcp.tool(name="add_comment")(self.add_comment)
self.mcp.tool(name="add_work_notes")(self.add_work_notes)
# Register prompts
self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
async def close(self):
"""Close the ServiceNow client"""
await self.client.close()
def run(self, transport: str = "stdio"):
"""Run the ServiceNow MCP server"""
try:
self.mcp.run(transport=transport)
finally:
asyncio.run(self.close())
# Resource handlers
async def list_incidents(self) -> str:
"""List recent incidents in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("incident", options)
return json.dumps(result, indent=2)
async def get_incident(self, number: str) -> str:
"""Get a specific incident by number"""
incident = await self.client.get_incident_by_number(number)
if incident:
return json.dumps({"result": incident}, indent=2)
return json.dumps({"result": "Incident not found"})
async def list_users(self) -> str:
"""List users in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("sys_user", options)
return json.dumps(result, indent=2)
async def list_knowledge(self) -> str:
"""List knowledge articles in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("kb_knowledge", options)
return json.dumps(result, indent=2)
async def get_tables(self) -> str:
"""Get a list of available tables"""
result = await self.client.get_available_tables()
return json.dumps({"result": result}, indent=2)
async def get_table_records(self, table: str) -> str:
"""Get records from a specific table"""
options = QueryOptions(limit=10)
result = await self.client.get_records(table, options)
return json.dumps(result, indent=2)
async def get_table_schema(self, table: str) -> str:
"""Get the schema for a table"""
result = await self.client.get_table_schema(table)
return json.dumps(result, indent=2)
# Tool handlers
async def create_incident(self,
incident: IncidentCreate,
ctx: Context = None) -> str:
"""
Create a new incident in ServiceNow
Args:
incident: The incident details to create
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Creating incident: {incident.short_description}")
data = incident.dict(exclude_none=True)
result = await self.client.create_record("incident", data)
if ctx:
await ctx.info(f"Created incident: {result['result']['number']}")
return json.dumps(result, indent=2)
async def update_incident(self,
number: str,
updates: IncidentUpdate,
ctx: Context = None) -> str:
"""
Update an existing incident in ServiceNow
Args:
number: The incident number (INC0010001)
updates: The fields to update
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
# First, get the sys_id for the incident number
if ctx:
await ctx.info(f"Looking up incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Now update the incident
if ctx:
await ctx.info(f"Updating incident: {number}")
data = updates.dict(exclude_none=True)
result = await self.client.update_record("incident", sys_id, data)
return json.dumps(result, indent=2)
async def search_records(self,
query: str,
table: str = "incident",
limit: int = 10,
ctx: Context = None) -> str:
"""
Search for records in ServiceNow using text query
Args:
query: Text to search for
table: Table to search in
limit: Maximum number of results to return
ctx: Optional context object for progress reporting
Returns:
JSON response containing matching records
"""
if ctx:
await ctx.info(f"Searching {table} for: {query}")
result = await self.client.search(query, table, limit)
return json.dumps(result, indent=2)
async def get_record(self,
table: str,
sys_id: str,
ctx: Context = None) -> str:
"""
Get a specific record by sys_id
Args:
table: Table to query
sys_id: System ID of the record
ctx: Optional context object for progress reporting
Returns:
JSON response containing the record
"""
if ctx:
await ctx.info(f"Getting {table} record: {sys_id}")
result = await self.client.get_record(table, sys_id)
return json.dumps(result, indent=2)
async def perform_query(self,
table: str,
query: str = "",
limit: int = 10,
offset: int = 0,
fields: Optional[List[str]] = None,
ctx: Context = None) -> str:
"""
Perform a query against ServiceNow
Args:
table: Table to query
query: Encoded query string (ServiceNow syntax)
limit: Maximum number of results to return
offset: Number of records to skip
fields: List of fields to return (or all fields if None)
ctx: Optional context object for progress reporting
Returns:
JSON response containing query results
"""
if ctx:
await ctx.info(f"Querying {table} with: {query}")
options = QueryOptions(
limit=limit,
offset=offset,
fields=fields,
query=query
)
result = await self.client.get_records(table, options)
return json.dumps(result, indent=2)
async def add_comment(self,
number: str,
comment: str,
ctx: Context = None) -> str:
"""
Add a comment to an incident (customer visible)
Args:
number: Incident number
comment: Comment to add
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Adding comment to incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Add the comment
update = {"comments": comment}
result = await self.client.update_record("incident", sys_id, update)
return json.dumps(result, indent=2)
async def add_work_notes(self,
number: str,
work_notes: str,
ctx: Context = None) -> str:
"""
Add work notes to an incident (internal)
Args:
number: Incident number
work_notes: Work notes to add
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Adding work notes to incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Add the work notes
update = {"work_notes": work_notes}
result = await self.client.update_record("incident", sys_id, update)
return json.dumps(result, indent=2)
# Prompt templates
def incident_analysis_prompt(self, incident_number: str) -> str:
"""Create a prompt to analyze a ServiceNow incident
Args:
incident_number: The incident number to analyze (e.g., INC0010001)
Returns:
Prompt text for analyzing the incident
"""
return f"""
Please analyze the following ServiceNow incident {incident_number}.
First, call the appropriate tool to fetch the incident details using get_incident.
Then, provide a comprehensive analysis with the following sections:
1. Summary: A brief overview of the incident
2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
3. Root Cause Analysis: Potential causes based on available information
4. Resolution Recommendations: Suggested next steps to resolve the incident
5. SLA Status: Whether the incident is at risk of breaching SLAs
Use a professional and clear tone appropriate for IT service management.
"""
def create_incident_prompt(self) -> str:
"""Create a prompt for incident creation guidance
Returns:
Prompt text for helping users create an incident
"""
return """
I'll help you create a new ServiceNow incident. Please provide the following information:
1. Short Description: A brief title for the incident (required)
2. Detailed Description: A thorough explanation of the issue (required)
3. Caller: The person reporting the issue (optional)
4. Category and Subcategory: The type of issue (optional)
5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
"""
# Factory functions for creating authentication objects
def create_basic_auth(username: str, password: str) -> BasicAuth:
"""Create BasicAuth object for ServiceNow authentication"""
return BasicAuth(username, password)
def create_token_auth(token: str) -> TokenAuth:
"""Create TokenAuth object for ServiceNow authentication"""
return TokenAuth(token)
def create_oauth_auth(client_id: str, client_secret: str,
username: str, password: str,
instance_url: str) -> OAuthAuth:
"""Create OAuthAuth object for ServiceNow authentication"""
return OAuthAuth(client_id, client_secret, username, password, instance_url)
# Main function for running the server from the command line
def main():
"""Run the ServiceNow MCP server from the command line"""
import argparse
import sys
parser = argparse.ArgumentParser(description="ServiceNow MCP Server")
parser.add_argument("--url", help="ServiceNow instance URL", default=os.environ.get("SERVICENOW_INSTANCE_URL"))
parser.add_argument("--transport", help="Transport protocol (stdio or sse)", default="stdio", choices=["stdio", "sse"])
# Authentication options
auth_group = parser.add_argument_group("Authentication")
auth_group.add_argument("--username", help="ServiceNow username", default=os.environ.get("SERVICENOW_USERNAME"))
auth_group.add_argument("--password", help="ServiceNow password", default=os.environ.get("SERVICENOW_PASSWORD"))
auth_group.add_argument("--token", help="ServiceNow token", default=os.environ.get("SERVICENOW_TOKEN"))
auth_group.add_argument("--client-id", help="OAuth client ID", default=os.environ.get("SERVICENOW_CLIENT_ID"))
auth_group.add_argument("--client-secret", help="OAuth client secret", default=os.environ.get("SERVICENOW_CLIENT_SECRET"))
args = parser.parse_args()
# Check required parameters
if not args.url:
print("Error: ServiceNow instance URL is required")
print("Set SERVICENOW_INSTANCE_URL environment variable or use --url")
sys.exit(1)
# Determine authentication method
auth = None
if args.token:
auth = create_token_auth(args.token)
elif args.client_id and args.client_secret and args.username and args.password:
auth = create_oauth_auth(args.client_id, args.client_secret, args.username, args.password, args.url)
elif args.username and args.password:
auth = create_basic_auth(args.username, args.password)
else:
print("Error: Authentication credentials required")
print("Either provide username/password, token, or OAuth credentials")
sys.exit(1)
# Create and run the server
server = ServiceNowMCP(instance_url=args.url, auth=auth)
server.run(transport=args.transport)
# Entry point
if __name__ == "__main__":
main()