Skip to main content
Glama
service.py29.1 kB
"""Service layer for Kanka API operations.""" # mypy: warn_return_any=False import logging import os from typing import Any from kanka import KankaClient from kanka.exceptions import KankaException from kanka.models import ( Character, Creature, Entity, Journal, Location, Note, Organisation, Quest, Race, Tag, ) from .converter import ContentConverter from .types import EntityType logger = logging.getLogger(__name__) class KankaService: """Service layer wrapping the python-kanka client.""" # Map entity types to their model classes ENTITY_TYPE_MAP = { "character": Character, "creature": Creature, "location": Location, "organization": Organisation, # Note: Kanka uses "organisation" "race": Race, "note": Note, "journal": Journal, "quest": Quest, } # Map entity types to their Kanka API endpoints API_ENDPOINT_MAP = { "character": "characters", "creature": "creatures", "location": "locations", "organization": "organisations", # API uses British spelling "race": "races", "note": "notes", "journal": "journals", "quest": "quests", } def __init__(self) -> None: """Initialize the service with Kanka client.""" token = os.getenv("KANKA_TOKEN") campaign_id = os.getenv("KANKA_CAMPAIGN_ID") if not token or not campaign_id: raise ValueError( "KANKA_TOKEN and KANKA_CAMPAIGN_ID environment variables are required" ) self.client = KankaClient(token=token, campaign_id=int(campaign_id)) self.converter = ContentConverter() self._tag_cache: dict[str, Tag] = {} def search_entities( self, query: str, entity_type: EntityType | None = None, limit: int = 100, ) -> list[dict[str, Any]]: """ Search for entities by name using list endpoints with filtering. This uses the list endpoints with name filtering instead of the search API, as they provide the same partial matching capability but with more control. Args: query: Search query (matches partial names) entity_type: Optional entity type filter limit: Maximum results Returns: List of minimal entity data """ try: entities = [] if entity_type: # Search specific entity type manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Use name filter to search - it does partial matching! results = manager.list(name=query, limit=limit) for entity in results: entities.append( { "entity_id": entity.entity_id, "name": entity.name, "entity_type": entity_type, } ) else: # Search across all entity types # We'll need to query each type separately remaining_limit = limit for our_type, manager_name in self.API_ENDPOINT_MAP.items(): if remaining_limit <= 0: break manager = getattr(self.client, manager_name) # Get up to remaining_limit results from this type type_limit = min(remaining_limit, 100) # API max is 100 try: results = manager.list(name=query, limit=type_limit) for entity in results: entities.append( { "entity_id": entity.entity_id, "name": entity.name, "entity_type": our_type, } ) remaining_limit -= len(results) except Exception as e: # Some entity types might not be available in the campaign logger.debug(f"Could not search {our_type}: {e}") continue return entities except KankaException as e: logger.error(f"Search failed: {e}") raise def list_entities( self, entity_type: EntityType, page: int = 1, limit: int = 100, last_sync: str | None = None, related: bool = False, ) -> list[Entity]: """ List entities of a specific type. Args: entity_type: Entity type to list page: Page number limit: Results per page (0 for all) last_sync: ISO 8601 timestamp to get only entities modified after this time related: Include related data (posts, attributes, etc.) Returns: List of entity objects """ try: manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Build filters filters = {} if last_sync: filters["lastSync"] = last_sync if limit == 0: # Get all results by paginating through all API pages # Use the proper pagination info from the SDK all_entities = [] current_page = 1 logger.debug( f"Starting pagination for {entity_type} with related={related}" ) while True: logger.debug(f"Fetching page {current_page}") try: batch = manager.list( page=current_page, related=related, **filters ) logger.debug( f"Page {current_page} returned {len(batch)} entities" ) # Add current page results all_entities.extend(batch) # Check if there's a next page using SDK pagination info if not manager.has_next_page: logger.debug("No more pages, stopping pagination") break current_page += 1 # Safety limit to prevent infinite loops if current_page > 50: logger.warning( f"Hit safety limit of 50 pages for {entity_type}" ) break except Exception as e: logger.error( f"Error fetching page {current_page} for {entity_type}: {e}" ) break logger.debug( f"Pagination complete for {entity_type}: {len(all_entities)} total entities" ) entities = all_entities else: # Get limited results (client-side limiting) # Fetch pages until we have enough entities all_entities = [] current_page = page # Start from requested page logger.debug( f"Fetching for client-side limit of {limit} {entity_type}s starting from page {page}" ) while len(all_entities) < limit: try: batch = manager.list( page=current_page, related=related, **filters ) all_entities.extend(batch) # Stop if no more pages or we have enough if not manager.has_next_page or len(all_entities) >= limit: break current_page += 1 # Safety limit if current_page > 50: logger.warning( f"Hit safety limit of 50 pages for {entity_type}" ) break except Exception as e: logger.error( f"Error fetching page {current_page} for {entity_type}: {e}" ) break # Apply client-side limit entities = all_entities[:limit] return list(entities) except KankaException as e: logger.error(f"List entities failed: {e}") raise def get_entity_by_id( self, entity_id: int, include_posts: bool = False ) -> dict[str, Any] | None: """ Get a specific entity by its entity_id. Args: entity_id: Entity ID include_posts: Whether to include posts Returns: Entity data with converted content """ try: # Use the direct entity endpoint found_entity = self.client.entity(entity_id) if not found_entity: # Entity not found return None # Get entity type - it's in the 'type' field entity_type = found_entity.get("type") # Map to our internal type our_type = None if entity_type == "character": our_type = "character" elif entity_type == "creature": our_type = "creature" elif entity_type == "location": our_type = "location" elif entity_type == "organisation": our_type = "organization" elif entity_type == "race": our_type = "race" elif entity_type == "note": our_type = "note" elif entity_type == "journal": our_type = "journal" elif entity_type == "quest": our_type = "quest" else: return None # The entity endpoint returns the data in 'child' field child_data = found_entity.get("child") if not child_data: return None # Get the type-specific ID type_id = child_data.get("id") if not type_id: return None # Now use the type-specific manager to get a proper entity object # This gives us consistent data format with datetime objects manager = getattr(self.client, self.API_ENDPOINT_MAP[our_type]) entity = manager.get(type_id) # Use _entity_to_dict to handle all conversions consistently result = self._entity_to_dict(entity, our_type) # Get posts if requested if include_posts: try: # Get the manager for this entity type manager = getattr(self.client, self.API_ENDPOINT_MAP[our_type]) # Use entity_id, not the type-specific id posts = manager.list_posts(entity_id, limit=100) result["posts"] = [self._post_to_dict(post) for post in posts] except Exception as e: logger.warning(f"Failed to get posts for entity {entity_id}: {e}") result["posts"] = [] return result except Exception as e: logger.error(f"Get entity failed for {entity_id}: {e}") return None def create_entity( self, entity_type: EntityType, name: str, type: str | None = None, entry: str | None = None, tags: list[str] | None = None, is_hidden: bool | None = None, is_completed: bool | None = None, image_uuid: str | None = None, header_uuid: str | None = None, ) -> dict[str, Any]: """ Create a new entity. Args: entity_type: Type of entity name: Entity name type: Entity subtype entry: Description in Markdown tags: List of tag names is_hidden: Whether entity should be hidden from players (admin-only) is_completed: Whether quest is completed (quests only) image_uuid: Image gallery UUID for entity image header_uuid: Image gallery UUID for entity header Returns: Created entity data """ try: manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Prepare data data: dict[str, Any] = {"name": name} if type is not None: data["type"] = type if entry is not None: # Convert markdown to HTML data["entry"] = self.converter.markdown_to_html(entry) # Set privacy based on is_hidden # For entities, use is_private (not visibility_id) if is_hidden is not None: data["is_private"] = is_hidden elif entity_type == "note": # Notes default to private data["is_private"] = True else: # Default to public data["is_private"] = False # Handle tags if tags: tag_ids = self._get_or_create_tag_ids(tags) data["tags"] = tag_ids # Handle quest-specific field if entity_type == "quest" and is_completed is not None: data["is_completed"] = is_completed # Handle image fields if image_uuid is not None: data["image_uuid"] = image_uuid if header_uuid is not None: data["header_uuid"] = header_uuid # Create entity entity = manager.create(**data) # Convert to our format result = self._entity_to_dict(entity, entity_type) result["mention"] = f"[entity:{entity.entity_id}]" # If we explicitly set privacy, ensure it's reflected in the result # The API might not return is_private in the create response if "is_private" in data: result["is_hidden"] = data["is_private"] return result except KankaException as e: logger.error(f"Create entity failed: {e}") raise def update_entity( self, entity_id: int, name: str, type: str | None = None, entry: str | None = None, tags: list[str] | None = None, is_hidden: bool | None = None, is_completed: bool | None = None, image_uuid: str | None = None, header_uuid: str | None = None, ) -> bool: """ Update an existing entity. Args: entity_id: Entity ID name: Entity name (required by API) type: Entity subtype entry: Description in Markdown tags: List of tag names is_hidden: Whether entity should be hidden from players (admin-only) is_completed: Whether quest is completed (quests only) image_uuid: Image gallery UUID for entity image header_uuid: Image gallery UUID for entity header Returns: True if successful """ try: # First get the entity to find its type entity_data = self.get_entity_by_id(entity_id) if not entity_data: raise ValueError(f"Entity {entity_id} not found") entity_type = entity_data["entity_type"] manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Prepare update data data: dict[str, Any] = {"name": name} if type is not None: data["type"] = type if entry is not None: # Convert markdown to HTML data["entry"] = self.converter.markdown_to_html(entry) # Handle privacy # For entities, use is_private (not visibility_id) if is_hidden is not None: data["is_private"] = is_hidden # Handle tags if tags is not None: tag_ids = self._get_or_create_tag_ids(tags) data["tags"] = tag_ids # Handle quest-specific field if entity_type == "quest" and is_completed is not None: data["is_completed"] = is_completed # Handle image fields if image_uuid is not None: data["image_uuid"] = image_uuid if header_uuid is not None: data["header_uuid"] = header_uuid # Update entity manager.update(entity_data["id"], **data) return True except Exception as e: logger.error(f"Update entity failed for {entity_id}: {e}") raise def delete_entity(self, entity_id: int) -> bool: """ Delete an entity. Args: entity_id: Entity ID Returns: True if successful """ try: # First get the entity to find its type entity_data = self.get_entity_by_id(entity_id) if not entity_data: raise ValueError(f"Entity {entity_id} not found") entity_type = entity_data["entity_type"] manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Delete entity manager.delete(entity_data["id"]) return True except Exception as e: logger.error(f"Delete entity failed for {entity_id}: {e}") raise def create_post( self, entity_id: int, name: str, entry: str | None = None, is_hidden: bool = False, ) -> dict[str, Any]: """ Create a post on an entity. Args: entity_id: Entity ID name: Post title entry: Post content in Markdown is_hidden: Whether post should be hidden from players (admin-only) Returns: Created post data """ try: # Get entity to find its type entity_data = self.get_entity_by_id(entity_id) if not entity_data: raise ValueError(f"Entity {entity_id} not found") entity_type = entity_data["entity_type"] manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Convert markdown to HTML if entry provided html_entry = self.converter.markdown_to_html(entry) if entry else None # Set visibility based on is_hidden visibility_id = 2 if is_hidden else 1 # Create post - use entity_id, not the type-specific id post = manager.create_post( entity_id, name=name, entry=html_entry or "", visibility_id=visibility_id, ) return { "post_id": post.id, "entity_id": entity_id, } except Exception as e: logger.error(f"Create post failed: {e}") raise def update_post( self, entity_id: int, post_id: int, name: str, entry: str | None = None, is_hidden: bool | None = None, ) -> bool: """ Update a post. Args: entity_id: Entity ID post_id: Post ID name: Post title (required by API) entry: Post content in Markdown is_hidden: Whether post should be hidden from players (admin-only) Returns: True if successful """ try: # Get entity to find its type entity_data = self.get_entity_by_id(entity_id) if not entity_data: raise ValueError(f"Entity {entity_id} not found") entity_type = entity_data["entity_type"] manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Prepare update data kwargs: dict[str, Any] = {"name": name} if entry is not None: kwargs["entry"] = self.converter.markdown_to_html(entry) # Handle visibility # For posts, use visibility_id visibility_id = None if is_hidden is not None: visibility_id = 2 if is_hidden else 1 # Update post - use entity_id, not the type-specific id manager.update_post( entity_id, post_id, visibility_id=visibility_id, **kwargs ) return True except Exception as e: logger.error(f"Update post failed: {e}") raise def delete_post(self, entity_id: int, post_id: int) -> bool: """ Delete a post. Args: entity_id: Entity ID post_id: Post ID Returns: True if successful """ try: # Get entity to find its type entity_data = self.get_entity_by_id(entity_id) if not entity_data: raise ValueError(f"Entity {entity_id} not found") entity_type = entity_data["entity_type"] manager = getattr(self.client, self.API_ENDPOINT_MAP[entity_type]) # Delete post - use entity_id, not the type-specific id manager.delete_post(entity_id, post_id) return True except Exception as e: logger.error(f"Delete post failed: {e}") raise def _get_or_create_tag_ids(self, tag_names: list[str]) -> list[int]: """ Get or create tags by name. Args: tag_names: List of tag names Returns: List of tag IDs """ # Load tag cache if needed if not self._tag_cache: self._load_tag_cache() tag_ids = [] for name in tag_names: name_lower = name.lower() # Check cache if name_lower in self._tag_cache: tag_ids.append(self._tag_cache[name_lower].id) else: # Create new tag try: tag = self.client.tags.create(name=name) self._tag_cache[name_lower] = tag tag_ids.append(tag.id) except Exception as e: logger.warning(f"Failed to create tag '{name}': {e}") return tag_ids def _load_tag_cache(self) -> None: """Load all tags into cache.""" self._tag_cache = {} try: # Get all tags by paginating through them current_page = 1 while True: batch = self.client.tags.list(page=current_page, limit=100) if not batch: break for tag in batch: self._tag_cache[tag.name.lower()] = tag if len(batch) < 100: break current_page += 1 except Exception as e: logger.warning(f"Failed to load tag cache: {e}") def _resolve_tag_names(self, raw_tags: list[Any]) -> list[str]: """ Resolve tag IDs to tag names. Args: raw_tags: List of tag IDs or tag objects Returns: List of tag names """ if not raw_tags or not isinstance(raw_tags, list): return [] # Ensure tag cache is loaded if not self._tag_cache: self._load_tag_cache() tag_names = [] for tag_item in raw_tags: if isinstance(tag_item, int | str): # It's a tag ID, need to look it up tag_id = int(tag_item) if isinstance(tag_item, str) else tag_item # Check cache first tag_name = None for _cached_name, cached_tag in self._tag_cache.items(): if cached_tag.id == tag_id: tag_name = cached_tag.name break if tag_name: tag_names.append(tag_name) else: # Not in cache, try to fetch it try: tag = self.client.tags.get(tag_id) tag_names.append(tag.name) # Add to cache for future lookups self._tag_cache[tag.name.lower()] = tag except Exception as e: logger.warning(f"Failed to resolve tag ID {tag_id}: {e}") # If we can't resolve it, keep the ID as string tag_names.append(str(tag_id)) elif hasattr(tag_item, "name"): # It's a tag object tag_names.append(tag_item.name) else: # Unknown format, keep as string tag_names.append(str(tag_item)) return tag_names def _entity_to_dict(self, entity: Entity, entity_type: str) -> dict[str, Any]: """ Convert entity object to dictionary. Args: entity: Entity object entity_type: Our entity type string Returns: Dictionary representation """ result = { "id": entity.id, "entity_id": entity.entity_id, "name": entity.name, "entity_type": entity_type, "type": getattr(entity, "type", None), "tags": [], "created_at": ( entity.created_at.isoformat() if hasattr(entity, "created_at") and entity.created_at else None ), "updated_at": ( entity.updated_at.isoformat() if hasattr(entity, "updated_at") and entity.updated_at else None ), } # Handle visibility - translate is_private to is_hidden # Entities use is_private field is_private = getattr(entity, "is_private", None) if is_private is not None: result["is_hidden"] = is_private else: # Default to visible if no is_private field result["is_hidden"] = False # Convert HTML entry to Markdown if hasattr(entity, "entry") and entity.entry: result["entry"] = self.converter.html_to_markdown(entity.entry) else: result["entry"] = None # Extract tag names using helper method if hasattr(entity, "tags"): result["tags"] = self._resolve_tag_names(entity.tags) # Handle posts if present (when related=True) if hasattr(entity, "posts") and entity.posts is not None: result["posts"] = [self._post_to_dict(post) for post in entity.posts] # Handle quest-specific fields if entity_type == "quest": result["is_completed"] = getattr(entity, "is_completed", None) # Handle image fields - always include all 5 fields result["image"] = getattr(entity, "image", None) result["image_full"] = getattr(entity, "image_full", None) result["image_thumb"] = getattr(entity, "image_thumb", None) result["image_uuid"] = getattr(entity, "image_uuid", None) result["header_uuid"] = getattr(entity, "header_uuid", None) return result def _post_to_dict(self, post: Any) -> dict[str, Any]: """ Convert post object to dictionary. Args: post: Post object Returns: Dictionary representation """ result = { "id": post.id, "name": post.name, } # Handle visibility - translate visibility_id to is_hidden # Posts use visibility_id field visibility_id = getattr(post, "visibility_id", None) if visibility_id is not None: # visibility_id 2 = admin only (hidden from players) result["is_hidden"] = visibility_id == 2 else: # Default to visible if no visibility_id result["is_hidden"] = False # Convert HTML entry to Markdown if hasattr(post, "entry") and post.entry: result["entry"] = self.converter.html_to_markdown(post.entry) else: result["entry"] = None return result # Global service instance (initialized on first use) _service: KankaService | None = None def get_service() -> KankaService: """Get or create the Kanka service instance.""" global _service if _service is None: _service = KankaService() return _service

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ervwalter/mcp-kanka'

If you have feedback or need assistance with the MCP directory API, please join our Discord server