Skip to main content
Glama

Sumanshu Arora

server.py • 29.2 kB
#!/usr/bin/env python3
"""
Zendesk MCP Server - Comprehensive Implementation

A comprehensive Zendesk MCP server that provides:
1. Complete ticket management (CRUD operations)
2. User and organization management
3. Knowledge base article access
4. Analytics and reporting capabilities
5. Comment and note management
6. Rate limiting and caching for optimal performance

This server uses FastMCP for modern MCP protocol implementation.
"""

import asyncio
import json
import logging
import os
import sys
import time
from collections import Counter
from dataclasses import dataclass
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

import aiohttp
from fastmcp import FastMCP

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from .config import ZendeskServerConfig
except ImportError:
    try:
        from config import ZendeskServerConfig
    except ImportError:
        # Fallback for Docker or direct script execution
        sys.path.append(os.path.dirname(__file__))
        from config import ZendeskServerConfig

# Single source of truth for the HTTP port used when neither MCP_PORT nor the
# template's transport configuration provides one.
DEFAULT_HTTP_PORT = 7071

# Largest page size requested from the Zendesk search endpoints.
ZENDESK_MAX_PER_PAGE = 100

# Seconds to wait after an HTTP 429 when Retry-After is missing or unparsable.
DEFAULT_RETRY_AFTER_SECONDS = 60

# Length of the sliding window used by RateLimiter, in seconds.
RATE_LIMIT_WINDOW_SECONDS = 60


@dataclass
class CacheEntry:
    """Cache entry with expiration."""

    data: Any  # the decoded JSON payload that was cached
    expires_at: float  # time.time() value after which the entry is stale


class RateLimiter:
    """Simple sliding-window rate limiter for API requests."""

    def __init__(self, requests_per_minute: int = 200):
        self.requests_per_minute = requests_per_minute
        # time.time() stamps of the requests made within the current window.
        self.requests = []

    def _prune(self, now: float) -> None:
        """Forget requests that have left the sliding window."""
        self.requests = [
            req_time
            for req_time in self.requests
            if now - req_time < RATE_LIMIT_WINDOW_SECONDS
        ]

    async def wait_if_needed(self):
        """Sleep until a request slot is free, then record the request."""
        now = time.time()
        self._prune(now)

        # `self.requests` must be non-empty before min() is called; this also
        # keeps a non-positive limit from raising on the first request.
        if self.requests and len(self.requests) >= self.requests_per_minute:
            oldest_request = min(self.requests)
            wait_time = RATE_LIMIT_WINDOW_SECONDS - (now - oldest_request)
            if wait_time > 0:
                logger.info(f"Rate limit reached, waiting {wait_time:.2f} seconds")
                await asyncio.sleep(wait_time)
                # Record when the request is actually sent, not when the wait
                # started; otherwise the window expires too early.
                now = time.time()
                self._prune(now)

        self.requests.append(now)


class ZendeskMCPServer:
    """
    Comprehensive Zendesk MCP Server implementation using FastMCP.

    This server provides full Zendesk integration capabilities including:
    - Ticket management (create, read, update, search)
    - User and organization management
    - Knowledge base access
    - Comments and notes
    - Analytics and reporting
    - Rate limiting and caching
    """

    def __init__(self, config_dict: Optional[dict] = None):
        """Initialize the Zendesk MCP Server with configuration."""
        self.config = ZendeskServerConfig(config_dict=config_dict or {})

        # Standard configuration data from config_schema
        self.config_data = self.config.get_template_config()

        # Full template data (potentially modified by double underscore notation)
        self.template_data = self.config.get_template_data()

        self.logger = self.config.logger

        # The HTTP session is created lazily on first use (or by `async with`)
        # because aiohttp sessions must be created inside a running event loop.
        self.session = None

        # Initialize rate limiter
        rate_config = self.config.get_rate_limit_config()
        self.rate_limiter = RateLimiter(rate_config["requests_per_minute"])
        self.timeout = rate_config["timeout_seconds"]

        # Initialize cache
        cache_config = self.config.get_cache_config()
        self.cache_enabled = cache_config["enabled"]
        self.cache_ttl = cache_config["ttl_seconds"]
        self.cache = {}

        # Zendesk configuration
        self.base_url = self.config.get_zendesk_url()
        self.api_url = f"{self.base_url}/api/v2"
        self.auth_headers = self.config.get_auth_headers()
        self.default_ticket_config = self.config.get_default_ticket_config()

        self.mcp = FastMCP(
            name=self.template_data.get("name", "zendesk-server"),
            instructions="Comprehensive Zendesk integration server",
            version=self.template_data.get("version", "1.0.0"),
            host=os.getenv("MCP_HOST", "0.0.0.0"),
            port=self._resolve_port(),
        )
        logger.info(
            "Zendesk MCP server %s created for %s", self.mcp.name, self.base_url
        )
        self.register_tools()

    # Transport / lifecycle helpers

    def _resolve_transport(self) -> str:
        """Return the transport name: MCP_TRANSPORT, template default, or 'http'."""
        return os.getenv(
            "MCP_TRANSPORT",
            self.template_data.get("transport", {}).get("default", "http"),
        )

    def _resolve_port(self) -> Optional[int]:
        """Return the HTTP port to listen on, or None for the stdio transport."""
        if self._resolve_transport() == "stdio":
            return None
        return int(
            os.getenv(
                "MCP_PORT",
                self.template_data.get("transport", {}).get(
                    "port", DEFAULT_HTTP_PORT
                ),
            )
        )

    async def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the shared HTTP session, creating it if missing or closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                headers=self.auth_headers,
            )
        return self.session

    async def close(self) -> None:
        """Close the HTTP session if one is open."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def __aenter__(self):
        """Async context manager entry."""
        await self._ensure_session()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
        await self.close()

    def register_tools(self):
        """Register all tools with the MCP server."""
        # Ticket management tools
        self.mcp.tool(self.create_ticket, tags=["tickets", "create"])
        self.mcp.tool(self.get_ticket, tags=["tickets", "read"])
        self.mcp.tool(self.update_ticket, tags=["tickets", "update"])
        self.mcp.tool(self.search_tickets, tags=["tickets", "search"])
        self.mcp.tool(self.add_ticket_comment, tags=["tickets", "comments"])

        # User management tools
        self.mcp.tool(self.create_user, tags=["users", "create"])
        self.mcp.tool(self.get_user, tags=["users", "read"])
        self.mcp.tool(self.search_users, tags=["users", "search"])

        # Knowledge base tools
        self.mcp.tool(self.search_articles, tags=["knowledge", "search"])
        self.mcp.tool(self.get_article, tags=["knowledge", "read"])

        # Analytics and reporting tools
        self.mcp.tool(self.get_ticket_metrics, tags=["analytics", "metrics"])

        # Organization tools
        self.mcp.tool(self.list_organizations, tags=["organizations", "list"])

        logger.info("All tools registered with Zendesk MCP server")

    # Cache helpers

    def _get_cache_key(
        self, method: str, url: str, params: Optional[dict] = None
    ) -> str:
        """Generate cache key for request."""
        key_parts = [method, url]
        if params:
            # Sort so that parameter order does not produce distinct keys.
            key_parts.append(urlencode(sorted(params.items())))
        return "|".join(key_parts)

    def _get_cached_data(self, cache_key: str) -> Optional[Any]:
        """Get data from cache if available and not expired."""
        if not self.cache_enabled:
            return None

        entry = self.cache.get(cache_key)
        if entry is None:
            return None
        if time.time() < entry.expires_at:
            self.logger.debug(f"Cache hit for {cache_key}")
            return entry.data

        # Expired entry
        del self.cache[cache_key]
        return None

    def _cache_data(self, cache_key: str, data: Any) -> None:
        """Cache data with expiration."""
        if self.cache_enabled:
            expires_at = time.time() + self.cache_ttl
            self.cache[cache_key] = CacheEntry(data=data, expires_at=expires_at)
            self.logger.debug(f"Cached data for {cache_key}")

    @staticmethod
    def _parse_retry_after(value: Optional[str]) -> int:
        """Convert a Retry-After header to seconds, tolerating bad values."""
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            # Header missing, or not a plain integer (e.g. an HTTP date).
            return DEFAULT_RETRY_AFTER_SECONDS

    async def _make_request(
        self,
        method: str,
        endpoint: str,
        params: Optional[dict] = None,
        data: Optional[dict] = None,
        use_cache: bool = True,
    ) -> Dict[str, Any]:
        """
        Make authenticated request to Zendesk API with rate limiting and caching.

        Args:
            method: HTTP method (GET, POST, PUT, ...)
            endpoint: Path relative to the API root, e.g. "tickets/1.json"
            params: Optional query-string parameters
            data: Optional JSON request body
            use_cache: Whether a GET may be served from / stored in the cache

        Returns:
            The decoded JSON response ({} when the response body is empty)

        Raises:
            aiohttp.ClientError: On transport errors and HTTP status >= 400
            asyncio.TimeoutError: When the request exceeds the configured timeout
            json.JSONDecodeError: When the response body is not valid JSON
        """
        session = await self._ensure_session()
        url = f"{self.api_url}/{endpoint.lstrip('/')}"
        is_get = method.upper() == "GET"

        # Check cache for GET requests
        cache_key = None
        if is_get and use_cache:
            cache_key = self._get_cache_key(method, url, params)
            cached_data = self._get_cached_data(cache_key)
            # Compare with None so an empty cached payload is still a hit.
            if cached_data is not None:
                return cached_data

        # Apply rate limiting
        await self.rate_limiter.wait_if_needed()

        kwargs = {}
        if params:
            kwargs["params"] = params
        if data:
            kwargs["json"] = data

        try:
            self.logger.debug(f"Making {method} request to {url}")
            result: Dict[str, Any] = {}

            # At most two attempts: the original one plus a single retry
            # after a 429 response.
            for attempt in range(2):
                async with session.request(method, url, **kwargs) as response:
                    response_text = await response.text()

                    if response.status == 429 and attempt == 0:
                        retry_after = self._parse_retry_after(
                            response.headers.get("Retry-After")
                        )
                    else:
                        if response.status >= 400:
                            self.logger.error(
                                f"API error {response.status}: {response_text}"
                            )
                            response.raise_for_status()
                        result = (
                            json.loads(response_text)
                            if response_text.strip()
                            else {}
                        )
                        break

                # Sleep only after the first response has been released so the
                # connection is not held open while waiting.
                self.logger.warning(f"Rate limited, waiting {retry_after} seconds")
                await asyncio.sleep(retry_after)

            if is_get:
                # Cache successful GET requests
                if use_cache and cache_key:
                    self._cache_data(cache_key, result)
            else:
                # A successful write may invalidate any cached read.
                self.cache.clear()

            return result

        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            self.logger.error(f"HTTP request failed: {e}")
            raise
        except json.JSONDecodeError as e:
            self.logger.error(f"Failed to parse JSON response: {e}")
            raise

    # Ticket Management Tools

    async def create_ticket(
        self,
        subject: str,
        description: str,
        requester_email: Optional[str] = None,
        priority: Optional[str] = None,
        type: Optional[str] = None,
        tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Create a new support ticket in Zendesk.

        Args:
            subject: Subject/title of the ticket
            description: Initial description or comment for the ticket
            requester_email: Email address of the person requesting support
            priority: Priority level (low, normal, high, urgent)
            type: Type of ticket (incident, problem, question, task)
            tags: List of tags to associate with the ticket

        Returns:
            Dictionary containing the created ticket information
        """
        self.logger.info(f"Creating ticket: {subject}")

        ticket_data = {
            "subject": subject,
            "comment": {"body": description},
            "priority": priority or self.default_ticket_config["priority"],
            "type": type or self.default_ticket_config["type"],
        }

        if requester_email:
            ticket_data["requester"] = {"email": requester_email}

        if tags:
            ticket_data["tags"] = tags

        result = await self._make_request(
            "POST", "tickets.json", data={"ticket": ticket_data}, use_cache=False
        )

        ticket = result.get("ticket", {})
        self.logger.info(f"Created ticket #{ticket.get('id')}")

        return {
            "ticket_id": ticket.get("id"),
            "url": ticket.get("url"),
            "status": ticket.get("status"),
            "priority": ticket.get("priority"),
            "created_at": ticket.get("created_at"),
            "ticket": ticket,
        }

    async def get_ticket(
        self, ticket_id: int, include_comments: bool = True
    ) -> Dict[str, Any]:
        """
        Retrieve detailed information about a specific ticket.

        Args:
            ticket_id: ID of the ticket to retrieve
            include_comments: Whether to include ticket comments

        Returns:
            Dictionary containing ticket information and optionally comments
        """
        self.logger.info(f"Retrieving ticket #{ticket_id}")

        # Get ticket details
        result = await self._make_request("GET", f"tickets/{ticket_id}.json")
        ticket = result.get("ticket", {})

        response = {
            "ticket": ticket,
            "ticket_id": ticket.get("id"),
            "subject": ticket.get("subject"),
            "status": ticket.get("status"),
            "priority": ticket.get("priority"),
            "requester_id": ticket.get("requester_id"),
            "assignee_id": ticket.get("assignee_id"),
            "created_at": ticket.get("created_at"),
            "updated_at": ticket.get("updated_at"),
        }

        # Get comments if requested
        if include_comments:
            try:
                comments_result = await self._make_request(
                    "GET", f"tickets/{ticket_id}/comments.json"
                )
                response["comments"] = comments_result.get("comments", [])
                response["comment_count"] = len(response["comments"])
            except Exception as e:
                # Deliberately best-effort: the ticket itself was retrieved,
                # so a comment failure must not fail the whole call.
                self.logger.warning(
                    f"Failed to retrieve comments for ticket #{ticket_id}: {e}"
                )
                response["comments"] = []
                response["comment_count"] = 0

        return response

    async def update_ticket(
        self,
        ticket_id: int,
        status: Optional[str] = None,
        priority: Optional[str] = None,
        assignee_id: Optional[int] = None,
        tags: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        """
        Update an existing ticket's properties.

        Args:
            ticket_id: ID of the ticket to update
            status: New status (new, open, pending, hold, solved, closed)
            priority: New priority (low, normal, high, urgent)
            assignee_id: ID of the agent to assign the ticket to
            tags: Tags to set on the ticket (sent as the ticket's `tags`
                property, which replaces the existing tag list)

        Returns:
            Dictionary containing the updated ticket information

        Raises:
            ValueError: If no field to update was provided
        """
        self.logger.info(f"Updating ticket #{ticket_id}")

        candidate_fields = {
            "status": status,
            "priority": priority,
            "assignee_id": assignee_id,
            "tags": tags,
        }
        # Only truthy values are sent; empty/None arguments mean "unchanged".
        update_data = {
            field: value for field, value in candidate_fields.items() if value
        }

        if not update_data:
            raise ValueError("At least one field must be provided for update")

        result = await self._make_request(
            "PUT",
            f"tickets/{ticket_id}.json",
            data={"ticket": update_data},
            use_cache=False,
        )

        ticket = result.get("ticket", {})
        self.logger.info(f"Updated ticket #{ticket_id}")

        return {
            "ticket_id": ticket.get("id"),
            "status": ticket.get("status"),
            "priority": ticket.get("priority"),
            "assignee_id": ticket.get("assignee_id"),
            "updated_at": ticket.get("updated_at"),
            "ticket": ticket,
        }

    async def search_tickets(
        self,
        query: Optional[str] = None,
        status: Optional[str] = None,
        priority: Optional[str] = None,
        requester_email: Optional[str] = None,
        created_after: Optional[str] = None,
        limit: int = 25,
    ) -> Dict[str, Any]:
        """
        Search for tickets using various criteria.

        Args:
            query: Search query using Zendesk search syntax
            status: Filter by ticket status
            priority: Filter by priority
            requester_email: Filter by requester email
            created_after: Filter tickets created after this date (ISO format)
            limit: Maximum number of tickets to return (clamped to 1-100)

        Returns:
            Dictionary containing search results
        """
        self.logger.info("Searching tickets")

        search_terms = []

        if query:
            search_terms.append(query)
        if status:
            search_terms.append(f"status:{status}")
        if priority:
            search_terms.append(f"priority:{priority}")
        if requester_email:
            search_terms.append(f"requester:{requester_email}")
        if created_after:
            search_terms.append(f"created>{created_after}")

        # Add type filter to only get tickets
        search_terms.append("type:ticket")

        search_query = " ".join(search_terms)

        params = {
            "query": search_query,
            # Clamp to a valid page size for the Zendesk API.
            "per_page": max(1, min(limit, ZENDESK_MAX_PER_PAGE)),
        }

        result = await self._make_request("GET", "search.json", params=params)

        tickets = result.get("results", [])

        return {
            "query": search_query,
            "total_count": result.get("count", 0),
            "tickets": tickets,
            "ticket_count": len(tickets),
        }

    async def add_ticket_comment(
        self,
        ticket_id: int,
        body: str,
        public: bool = True,
        author_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Add a comment to an existing ticket.

        Args:
            ticket_id: ID of the ticket to comment on
            body: Content of the comment
            public: Whether comment is public (visible to requester) or internal
            author_id: ID of the comment author

        Returns:
            Dictionary containing the comment information
        """
        self.logger.info(f"Adding comment to ticket #{ticket_id}")

        comment_data = {"body": body, "public": public}

        if author_id:
            comment_data["author_id"] = author_id

        update_data = {"comment": comment_data}

        result = await self._make_request(
            "PUT",
            f"tickets/{ticket_id}.json",
            data={"ticket": update_data},
            use_cache=False,
        )

        # Get the latest comment from the audit attached to the response
        audit = result.get("audit") or {}
        comment_events = [
            event
            for event in audit.get("events") or []
            if event.get("type") == "Comment"
        ]
        latest_comment = comment_events[-1] if comment_events else {}

        return {
            "ticket_id": ticket_id,
            "comment_id": latest_comment.get("id"),
            "body": body,
            "public": public,
            # Fall back to the enclosing audit when the event itself does not
            # carry the field.
            "created_at": latest_comment.get("created_at")
            or audit.get("created_at"),
            "author_id": latest_comment.get("author_id") or audit.get("author_id"),
        }

    # User Management Tools

    async def create_user(
        self,
        name: str,
        email: str,
        role: str = "end-user",
        organization_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Create a new user in Zendesk.

        Args:
            name: Full name of the user
            email: Email address of the user
            role: Role for the user (end-user, agent, admin)
            organization_id: ID of organization to associate user with

        Returns:
            Dictionary containing the created user information
        """
        self.logger.info(f"Creating user: {email}")

        user_data = {"name": name, "email": email, "role": role}

        if organization_id:
            user_data["organization_id"] = organization_id

        result = await self._make_request(
            "POST", "users.json", data={"user": user_data}, use_cache=False
        )

        user = result.get("user", {})
        self.logger.info(f"Created user #{user.get('id')}")

        return {
            "user_id": user.get("id"),
            "name": user.get("name"),
            "email": user.get("email"),
            "role": user.get("role"),
            "organization_id": user.get("organization_id"),
            "created_at": user.get("created_at"),
            "user": user,
        }

    async def get_user(
        self, user_id: Optional[int] = None, email: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Retrieve information about a specific user.

        Args:
            user_id: ID of the user to retrieve
            email: Email address of the user to retrieve

        Returns:
            Dictionary containing user information

        Raises:
            ValueError: If neither user_id nor email is given, or if no user
                matches the given email
        """
        if not user_id and not email:
            raise ValueError("Either user_id or email must be provided")

        if user_id:
            self.logger.info(f"Retrieving user #{user_id}")
            result = await self._make_request("GET", f"users/{user_id}.json")
            user = result.get("user", {})
        else:
            self.logger.info(f"Retrieving user by email: {email}")
            # Restrict the search to users so other record types that mention
            # the address do not crowd the result page.
            params = {"query": f"type:user email:{email}"}
            result = await self._make_request("GET", "search.json", params=params)
            users = [
                r for r in result.get("results", []) if r.get("result_type") == "user"
            ]
            if not users:
                raise ValueError(f"User not found with email: {email}")
            user = users[0]

        return {
            "user_id": user.get("id"),
            "name": user.get("name"),
            "email": user.get("email"),
            "role": user.get("role"),
            "organization_id": user.get("organization_id"),
            "active": user.get("active"),
            "created_at": user.get("created_at"),
            "updated_at": user.get("updated_at"),
            "user": user,
        }

    async def search_users(
        self,
        query: Optional[str] = None,
        role: Optional[str] = None,
        organization_id: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Search for users in Zendesk.

        Args:
            query: Search query for users
            role: Filter by user role
            organization_id: Filter by organization ID

        Returns:
            Dictionary containing search results
        """
        self.logger.info("Searching users")

        search_terms = ["type:user"]

        if query:
            search_terms.append(query)
        if role:
            search_terms.append(f"role:{role}")
        if organization_id:
            search_terms.append(f"organization_id:{organization_id}")

        search_query = " ".join(search_terms)
        params = {"query": search_query}

        result = await self._make_request("GET", "search.json", params=params)

        users = [r for r in result.get("results", []) if r.get("result_type") == "user"]

        return {
            "query": search_query,
            "total_count": result.get("count", 0),
            "users": users,
            "user_count": len(users),
        }

    # Knowledge Base Tools

    async def search_articles(
        self, query: str, locale: str = "en-us", section_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """
        Search knowledge base articles.

        Args:
            query: Search query for articles
            locale: Language locale for articles
            section_id: Filter by specific section ID

        Returns:
            Dictionary containing article search results
        """
        self.logger.info(f"Searching articles: {query}")

        params = {"query": query, "locale": locale}

        if section_id:
            params["section"] = section_id

        result = await self._make_request(
            "GET", "help_center/articles/search.json", params=params
        )

        articles = result.get("results", [])

        return {
            "query": query,
            "locale": locale,
            "section_id": section_id,
            "articles": articles,
            "article_count": len(articles),
        }

    async def get_article(
        self, article_id: int, locale: str = "en-us"
    ) -> Dict[str, Any]:
        """
        Retrieve a specific knowledge base article.

        Args:
            article_id: ID of the article to retrieve
            locale: Language locale for the article

        Returns:
            Dictionary containing article information
        """
        self.logger.info(f"Retrieving article #{article_id}")

        params = {"locale": locale}
        result = await self._make_request(
            "GET", f"help_center/articles/{article_id}.json", params=params
        )

        article = result.get("article", {})

        return {
            "article_id": article.get("id"),
            "title": article.get("title"),
            "body": article.get("body"),
            "section_id": article.get("section_id"),
            "locale": article.get("locale"),
            "created_at": article.get("created_at"),
            "updated_at": article.get("updated_at"),
            "article": article,
        }

    # Analytics and Reporting Tools

    @staticmethod
    def _metric_group_key(ticket: Dict[str, Any], group_by: str) -> str:
        """Return the bucket label a ticket falls into for the given grouping."""
        if group_by in ("assignee", "priority", "status"):
            field = "assignee_id" if group_by == "assignee" else group_by
            value = ticket.get(field)
            return "unknown" if value is None else str(value)

        # Date-based groupings use the leading YYYY-MM-DD of created_at.
        day = (ticket.get("created_at") or "")[:10]
        if not day:
            return "unknown"
        if group_by == "month":
            return day[:7]
        if group_by == "week":
            try:
                iso_year, iso_week, _ = date.fromisoformat(day).isocalendar()
            except ValueError:
                return "unknown"
            return f"{iso_year}-W{iso_week:02d}"
        return day

    async def get_ticket_metrics(
        self,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        group_by: str = "day",
    ) -> Dict[str, Any]:
        """
        Get metrics and analytics for tickets.

        Only the first page of search results (up to 100 tickets) is
        analysed; "truncated" reports whether more tickets matched.

        Args:
            start_date: Start date for metrics (ISO format)
            end_date: End date for metrics (ISO format)
            group_by: Group metrics by field (day, week, month, assignee,
                priority, status); unrecognised values fall back to day

        Returns:
            Dictionary containing ticket metrics
        """
        self.logger.info("Retrieving ticket metrics")

        valid_groupings = ("day", "week", "month", "assignee", "priority", "status")
        if group_by not in valid_groupings:
            self.logger.warning(
                f"Unknown group_by '{group_by}', falling back to 'day'"
            )
            group_by = "day"

        # Set default date range if not provided. Timestamps are UTC with
        # second precision so the search syntax receives a plain ISO value.
        now = datetime.now(timezone.utc)
        timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
        if not end_date:
            end_date = now.strftime(timestamp_format)
        if not start_date:
            start_date = (now - timedelta(days=30)).strftime(timestamp_format)

        # Build search query for date range
        search_query = f"type:ticket created>{start_date} created<{end_date}"

        params = {"query": search_query, "per_page": ZENDESK_MAX_PER_PAGE}

        result = await self._make_request("GET", "search.json", params=params)
        tickets = result.get("results", [])
        total_count = result.get("count") or len(tickets)

        # `or "unknown"` also covers fields that are present but null.
        status_counts = Counter(t.get("status") or "unknown" for t in tickets)
        priority_counts = Counter(t.get("priority") or "unknown" for t in tickets)
        type_counts = Counter(t.get("type") or "unknown" for t in tickets)
        group_counts = Counter(
            self._metric_group_key(t, group_by) for t in tickets
        )

        solved_count = status_counts["solved"] + status_counts["closed"]
        open_count = sum(
            status_counts[name] for name in ("new", "open", "pending", "hold")
        )

        return {
            "date_range": {"start_date": start_date, "end_date": end_date},
            "total_tickets": len(tickets),
            "total_count": total_count,
            "truncated": total_count > len(tickets),
            "group_by": group_by,
            "groups": dict(group_counts),
            "status_breakdown": dict(status_counts),
            "priority_breakdown": dict(priority_counts),
            "type_breakdown": dict(type_counts),
            "resolution_stats": {
                "solved_count": solved_count,
                "resolution_rate": (
                    solved_count / len(tickets) * 100 if tickets else 0
                ),
                "open_count": open_count,
            },
        }

    # Organization Tools

    async def list_organizations(self, query: Optional[str] = None) -> Dict[str, Any]:
        """
        List organizations in Zendesk.

        Args:
            query: Search query for organizations

        Returns:
            Dictionary containing organization list
        """
        self.logger.info("Listing organizations")

        if query:
            params = {"query": f"type:organization {query}"}
            result = await self._make_request("GET", "search.json", params=params)
            organizations = [
                r
                for r in result.get("results", [])
                if r.get("result_type") == "organization"
            ]
        else:
            result = await self._make_request("GET", "organizations.json")
            organizations = result.get("organizations", [])

        return {
            "query": query,
            "organizations": organizations,
            "organization_count": len(organizations),
        }

    def run(self):
        """Run the MCP server with the configured transport and port."""
        transport = self._resolve_transport()
        run_kwargs: Dict[str, Any] = {"transport": transport}

        # Network options only apply to HTTP-style transports; the stdio
        # transport takes no port.
        if transport != "stdio":
            run_kwargs["port"] = self._resolve_port()
            run_kwargs["log_level"] = self.config_data.get("log_level", "info")

        self.mcp.run(**run_kwargs)


# Create the server instance
server = ZendeskMCPServer(config_dict={})

if __name__ == "__main__":
    server.run()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Data-Everything/mcp-server-templates'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.