ServiceNow MCP Server
- mcp_server_servicenow
"""
ServiceNow MCP Server
This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
It allows AI agents to access and manipulate ServiceNow data through a secure API.
"""
import os
import json
import asyncio
import logging
import re
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any, Union, Literal, Tuple
import requests
import httpx
from pydantic import BaseModel, Field, field_validator
from mcp_server_servicenow.nlp import NLPProcessor
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.fastmcp.utilities.logging import get_logger
logger = get_logger(__name__)
# ServiceNow API models
class IncidentState(int, Enum):
NEW = 1
IN_PROGRESS = 2
ON_HOLD = 3
RESOLVED = 6
CLOSED = 7
CANCELED = 8
class IncidentPriority(int, Enum):
CRITICAL = 1
HIGH = 2
MODERATE = 3
LOW = 4
PLANNING = 5
class IncidentUrgency(int, Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
class IncidentImpact(int, Enum):
HIGH = 1
MEDIUM = 2
LOW = 3
class IncidentCreate(BaseModel):
"""Model for creating a new incident"""
short_description: str = Field(..., description="A brief description of the incident")
description: str = Field(..., description="A detailed description of the incident")
caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
category: Optional[str] = Field(None, description="The incident category")
subcategory: Optional[str] = Field(None, description="The incident subcategory")
urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
class IncidentUpdate(BaseModel):
"""Model for updating an existing incident"""
short_description: Optional[str] = Field(None, description="A brief description of the incident")
description: Optional[str] = Field(None, description="A detailed description of the incident")
caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
category: Optional[str] = Field(None, description="The incident category")
subcategory: Optional[str] = Field(None, description="The incident subcategory")
urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
state: Optional[IncidentState] = Field(None, description="The state of the incident")
assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
@field_validator('work_notes', 'comments')
@classmethod
def validate_not_empty(cls, v):
if v is not None and v.strip() == '':
raise ValueError("Cannot be an empty string")
return v
class Config:
use_enum_values = True
class QueryOptions(BaseModel):
"""Options for querying ServiceNow records"""
limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
offset: int = Field(0, description="Number of records to skip", ge=0)
fields: Optional[List[str]] = Field(None, description="List of fields to return")
query: Optional[str] = Field(None, description="ServiceNow encoded query string")
order_by: Optional[str] = Field(None, description="Field to order results by")
order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
class Authentication:
"""Base class for ServiceNow authentication methods"""
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
raise NotImplementedError("Subclasses must implement this method")
class BasicAuth(Authentication):
"""Basic authentication for ServiceNow"""
def __init__(self, username: str, password: str):
self.username = username
self.password = password
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
return {}
def get_auth(self) -> tuple:
"""Get authentication tuple for requests"""
return (self.username, self.password)
class TokenAuth(Authentication):
"""Token authentication for ServiceNow"""
def __init__(self, token: str):
self.token = token
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
return {"Authorization": f"Bearer {self.token}"}
def get_auth(self) -> None:
"""Get authentication tuple for requests"""
return None
class OAuthAuth(Authentication):
"""OAuth authentication for ServiceNow"""
def __init__(self, client_id: str, client_secret: str, username: str, password: str,
instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
token_expiry: Optional[datetime] = None):
self.client_id = client_id
self.client_secret = client_secret
self.username = username
self.password = password
self.instance_url = instance_url
self.token = token
self.refresh_token = refresh_token
self.token_expiry = token_expiry
async def get_headers(self) -> Dict[str, str]:
"""Get authentication headers for ServiceNow API requests"""
if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
await self.refresh()
return {"Authorization": f"Bearer {self.token}"}
def get_auth(self) -> None:
"""Get authentication tuple for requests"""
return None
async def refresh(self):
"""Refresh the OAuth token"""
if self.refresh_token:
# Try refresh flow first
data = {
"grant_type": "refresh_token",
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token
}
else:
# Fall back to password flow
data = {
"grant_type": "password",
"client_id": self.client_id,
"client_secret": self.client_secret,
"username": self.username,
"password": self.password
}
token_url = f"{self.instance_url}/oauth_token.do"
async with httpx.AsyncClient() as client:
response = await client.post(token_url, data=data)
response.raise_for_status()
result = response.json()
self.token = result["access_token"]
self.refresh_token = result.get("refresh_token")
expires_in = result.get("expires_in", 1800) # Default 30 minutes
self.token_expiry = datetime.now().timestamp() + expires_in
class ServiceNowClient:
"""Client for interacting with ServiceNow API"""
def __init__(self, instance_url: str, auth: Authentication):
self.instance_url = instance_url.rstrip('/')
self.auth = auth
self.client = httpx.AsyncClient()
async def close(self):
"""Close the HTTP client"""
await self.client.aclose()
async def request(self, method: str, path: str,
params: Optional[Dict[str, Any]] = None,
json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""Make a request to the ServiceNow API"""
url = f"{self.instance_url}{path}"
headers = await self.auth.get_headers()
headers["Accept"] = "application/json"
if isinstance(self.auth, BasicAuth):
auth = self.auth.get_auth()
else:
auth = None
try:
response = await self.client.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers,
auth=auth
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"ServiceNow API error: {e.response.text}")
raise
async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
"""Get a record by sys_id"""
return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
"""Get records with query options"""
if options is None:
options = QueryOptions()
params = {
"sysparm_limit": options.limit,
"sysparm_offset": options.offset
}
if options.fields:
params["sysparm_fields"] = ",".join(options.fields)
if options.query:
params["sysparm_query"] = options.query
if options.order_by:
direction = "desc" if options.order_direction == "desc" else "asc"
params["sysparm_order_by"] = f"{options.order_by}^{direction}"
return await self.request("GET", f"/api/now/table/{table}", params=params)
async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new record"""
return await self.request("POST", f"/api/now/table/{table}", json_data=data)
async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing record"""
return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
"""Delete a record"""
return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
"""Get an incident by its number"""
result = await self.request("GET", f"/api/now/table/incident",
params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
if result.get("result") and len(result["result"]) > 0:
return result["result"][0]
return None
async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
"""Search for records using text query"""
return await self.request("GET", f"/api/now/table/{table}",
params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
async def get_available_tables(self) -> List[str]:
"""Get a list of available tables"""
result = await self.request("GET", "/api/now/table/sys_db_object",
params={"sysparm_fields": "name,label", "sysparm_limit": 100})
return result.get("result", [])
async def get_table_schema(self, table: str) -> Dict[str, Any]:
"""Get the schema for a table"""
result = await self.request("GET", f"/api/now/ui/meta/{table}")
return result
class ScriptUpdateModel(BaseModel):
"""Model for updating a ServiceNow script"""
name: str = Field(..., description="The name of the script")
script: str = Field(..., description="The script content")
type: str = Field(..., description="The type of script (e.g., sys_script_include)")
description: Optional[str] = Field(None, description="Description of the script")
class ServiceNowMCP:
"""ServiceNow MCP Server"""
def __init__(self,
instance_url: str,
auth: Authentication,
name: str = "ServiceNow MCP"):
self.client = ServiceNowClient(instance_url, auth)
self.mcp = FastMCP(name, dependencies=[
"requests",
"httpx",
"pydantic"
])
# Register resources
self.mcp.resource("servicenow://incidents")(self.list_incidents)
self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
self.mcp.resource("servicenow://users")(self.list_users)
self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
self.mcp.resource("servicenow://tables")(self.get_tables)
self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
# Register tools
self.mcp.tool(name="create_incident")(self.create_incident)
self.mcp.tool(name="update_incident")(self.update_incident)
self.mcp.tool(name="search_records")(self.search_records)
self.mcp.tool(name="get_record")(self.get_record)
self.mcp.tool(name="perform_query")(self.perform_query)
self.mcp.tool(name="add_comment")(self.add_comment)
self.mcp.tool(name="add_work_notes")(self.add_work_notes)
# Register natural language tools
self.mcp.tool(name="natural_language_search")(self.natural_language_search)
self.mcp.tool(name="natural_language_update")(self.natural_language_update)
self.mcp.tool(name="update_script")(self.update_script)
# Register prompts
self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
async def close(self):
"""Close the ServiceNow client"""
await self.client.close()
def run(self, transport: str = "stdio"):
"""Run the ServiceNow MCP server"""
try:
self.mcp.run(transport=transport)
finally:
asyncio.run(self.close())
# Resource handlers
async def list_incidents(self) -> str:
"""List recent incidents in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("incident", options)
return json.dumps(result, indent=2)
async def get_incident(self, number: str) -> str:
"""Get a specific incident by number"""
incident = await self.client.get_incident_by_number(number)
if incident:
return json.dumps({"result": incident}, indent=2)
return json.dumps({"result": "Incident not found"})
async def list_users(self) -> str:
"""List users in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("sys_user", options)
return json.dumps(result, indent=2)
async def list_knowledge(self) -> str:
"""List knowledge articles in ServiceNow"""
options = QueryOptions(limit=10)
result = await self.client.get_records("kb_knowledge", options)
return json.dumps(result, indent=2)
async def get_tables(self) -> str:
"""Get a list of available tables"""
result = await self.client.get_available_tables()
return json.dumps({"result": result}, indent=2)
async def get_table_records(self, table: str) -> str:
"""Get records from a specific table"""
options = QueryOptions(limit=10)
result = await self.client.get_records(table, options)
return json.dumps(result, indent=2)
async def get_table_schema(self, table: str) -> str:
"""Get the schema for a table"""
result = await self.client.get_table_schema(table)
return json.dumps(result, indent=2)
# Tool handlers
async def create_incident(self,
incident: IncidentCreate,
ctx: Context = None) -> str:
"""
Create a new incident in ServiceNow
Args:
incident: The incident details to create
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Creating incident: {incident.short_description}")
data = incident.dict(exclude_none=True)
result = await self.client.create_record("incident", data)
if ctx:
await ctx.info(f"Created incident: {result['result']['number']}")
return json.dumps(result, indent=2)
async def update_incident(self,
number: str,
updates: IncidentUpdate,
ctx: Context = None) -> str:
"""
Update an existing incident in ServiceNow
Args:
number: The incident number (INC0010001)
updates: The fields to update
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
# First, get the sys_id for the incident number
if ctx:
await ctx.info(f"Looking up incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Now update the incident
if ctx:
await ctx.info(f"Updating incident: {number}")
data = updates.dict(exclude_none=True)
result = await self.client.update_record("incident", sys_id, data)
return json.dumps(result, indent=2)
async def search_records(self,
query: str,
table: str = "incident",
limit: int = 10,
ctx: Context = None) -> str:
"""
Search for records in ServiceNow using text query
Args:
query: Text to search for
table: Table to search in
limit: Maximum number of results to return
ctx: Optional context object for progress reporting
Returns:
JSON response containing matching records
"""
if ctx:
await ctx.info(f"Searching {table} for: {query}")
result = await self.client.search(query, table, limit)
return json.dumps(result, indent=2)
async def get_record(self,
table: str,
sys_id: str,
ctx: Context = None) -> str:
"""
Get a specific record by sys_id
Args:
table: Table to query
sys_id: System ID of the record
ctx: Optional context object for progress reporting
Returns:
JSON response containing the record
"""
if ctx:
await ctx.info(f"Getting {table} record: {sys_id}")
result = await self.client.get_record(table, sys_id)
return json.dumps(result, indent=2)
async def perform_query(self,
table: str,
query: str = "",
limit: int = 10,
offset: int = 0,
fields: Optional[List[str]] = None,
ctx: Context = None) -> str:
"""
Perform a query against ServiceNow
Args:
table: Table to query
query: Encoded query string (ServiceNow syntax)
limit: Maximum number of results to return
offset: Number of records to skip
fields: List of fields to return (or all fields if None)
ctx: Optional context object for progress reporting
Returns:
JSON response containing query results
"""
if ctx:
await ctx.info(f"Querying {table} with: {query}")
options = QueryOptions(
limit=limit,
offset=offset,
fields=fields,
query=query
)
result = await self.client.get_records(table, options)
return json.dumps(result, indent=2)
async def add_comment(self,
number: str,
comment: str,
ctx: Context = None) -> str:
"""
Add a comment to an incident (customer visible)
Args:
number: Incident number
comment: Comment to add
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Adding comment to incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Add the comment
update = {"comments": comment}
result = await self.client.update_record("incident", sys_id, update)
return json.dumps(result, indent=2)
async def add_work_notes(self,
number: str,
work_notes: str,
ctx: Context = None) -> str:
"""
Add work notes to an incident (internal)
Args:
number: Incident number
work_notes: Work notes to add
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Adding work notes to incident: {number}")
incident = await self.client.get_incident_by_number(number)
if not incident:
error_message = f"Incident {number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
# Add the work notes
update = {"work_notes": work_notes}
result = await self.client.update_record("incident", sys_id, update)
return json.dumps(result, indent=2)
# Natural language tools
async def natural_language_search(self,
query: str,
ctx: Context = None) -> str:
"""
Search for records using natural language
Examples:
- "find all incidents about SAP"
- "search for incidents related to email"
- "show me all incidents with high priority"
Args:
query: Natural language query
ctx: Optional context object for progress reporting
Returns:
JSON response containing matching records
"""
if ctx:
await ctx.info(f"Processing natural language query: {query}")
# Parse the query
search_params = NLPProcessor.parse_search_query(query)
if ctx:
await ctx.info(f"Searching {search_params['table']} with query: {search_params['query']}")
# Perform the search
options = QueryOptions(
limit=search_params['limit'],
query=search_params['query']
)
result = await self.client.get_records(search_params['table'], options)
return json.dumps(result, indent=2)
async def natural_language_update(self,
command: str,
ctx: Context = None) -> str:
"""
Update a record using natural language
Examples:
- "Update incident INC0010001 saying I'm working on it"
- "Set incident INC0010002 to in progress"
- "Close incident INC0010003 with resolution: fixed the issue"
Args:
command: Natural language update command
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Processing natural language update: {command}")
try:
# Parse the command
record_number, updates = NLPProcessor.parse_update_command(command)
if ctx:
await ctx.info(f"Updating {record_number} with: {updates}")
# Get the record
if record_number.startswith("INC"):
incident = await self.client.get_incident_by_number(record_number)
if not incident:
error_message = f"Incident {record_number} not found"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
sys_id = incident['sys_id']
table = "incident"
else:
# Handle other record types if needed
error_message = f"Record type not supported: {record_number}"
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
# Update the record
result = await self.client.update_record(table, sys_id, updates)
return json.dumps(result, indent=2)
except ValueError as e:
error_message = str(e)
if ctx:
await ctx.error(error_message)
return json.dumps({"error": error_message})
async def update_script(self,
script_update: ScriptUpdateModel,
ctx: Context = None) -> str:
"""
Update a ServiceNow script
Args:
script_update: The script update details
ctx: Optional context object for progress reporting
Returns:
JSON response from ServiceNow
"""
if ctx:
await ctx.info(f"Updating script: {script_update.name}")
# Search for the script by name
table = script_update.type
query = f"name={script_update.name}"
options = QueryOptions(
limit=1,
query=query
)
result = await self.client.get_records(table, options)
if not result.get("result") or len(result["result"]) == 0:
# Script doesn't exist, create it
if ctx:
await ctx.info(f"Script not found, creating new script: {script_update.name}")
data = {
"name": script_update.name,
"script": script_update.script
}
if script_update.description:
data["description"] = script_update.description
result = await self.client.create_record(table, data)
else:
# Script exists, update it
script = result["result"][0]
sys_id = script["sys_id"]
if ctx:
await ctx.info(f"Updating existing script: {script_update.name} ({sys_id})")
data = {
"script": script_update.script
}
if script_update.description:
data["description"] = script_update.description
result = await self.client.update_record(table, sys_id, data)
return json.dumps(result, indent=2)
# Prompt templates
def incident_analysis_prompt(self, incident_number: str) -> str:
"""Create a prompt to analyze a ServiceNow incident
Args:
incident_number: The incident number to analyze (e.g., INC0010001)
Returns:
Prompt text for analyzing the incident
"""
return f"""
Please analyze the following ServiceNow incident {incident_number}.
First, call the appropriate tool to fetch the incident details using get_incident.
Then, provide a comprehensive analysis with the following sections:
1. Summary: A brief overview of the incident
2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
3. Root Cause Analysis: Potential causes based on available information
4. Resolution Recommendations: Suggested next steps to resolve the incident
5. SLA Status: Whether the incident is at risk of breaching SLAs
Use a professional and clear tone appropriate for IT service management.
"""
def create_incident_prompt(self) -> str:
"""Create a prompt for incident creation guidance
Returns:
Prompt text for helping users create an incident
"""
return """
I'll help you create a new ServiceNow incident. Please provide the following information:
1. Short Description: A brief title for the incident (required)
2. Detailed Description: A thorough explanation of the issue (required)
3. Caller: The person reporting the issue (optional)
4. Category and Subcategory: The type of issue (optional)
5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
"""
# Factory functions for creating authentication objects
def create_basic_auth(username: str, password: str) -> BasicAuth:
"""Create BasicAuth object for ServiceNow authentication"""
return BasicAuth(username, password)
def create_token_auth(token: str) -> TokenAuth:
"""Create TokenAuth object for ServiceNow authentication"""
return TokenAuth(token)
def create_oauth_auth(client_id: str, client_secret: str,
username: str, password: str,
instance_url: str) -> OAuthAuth:
"""Create OAuthAuth object for ServiceNow authentication"""
return OAuthAuth(client_id, client_secret, username, password, instance_url)