Skip to main content
Glama
operations.py27.1 kB
"""High-level operations layer for Kanka functionality. This module provides a reusable operations layer that can be used by both MCP tools and external scripts, ensuring consistent behavior and type safety. """ import logging from dataclasses import dataclass from datetime import datetime, timezone from typing import Any from .service import KankaService, get_service from .types import ( CheckEntityUpdatesResult, CreateEntityResult, CreatePostResult, DeleteEntityResult, DeletePostResult, GetEntityResult, UpdateEntityResult, UpdatePostResult, ) from .utils import ( filter_entities_by_name, filter_entities_by_tags, filter_entities_by_type, filter_journals_by_date_range, paginate_results, search_in_content, ) logger = logging.getLogger(__name__) # Result classes with to_dict() methods for MCP compatibility @dataclass class FindEntitiesResult: """Structured result for find_entities operation.""" entities: list[dict[str, Any]] sync_info: dict[str, Any] def to_dict(self) -> dict[str, Any]: """Convert to MCP response format.""" return {"entities": self.entities, "sync_info": self.sync_info} @dataclass class OperationResult: """Generic result for operations that return lists.""" results: list[dict[str, Any]] def to_list(self) -> list[dict[str, Any]]: """Convert to MCP response format.""" return self.results class KankaOperationsError(Exception): """Base exception for operations layer.""" pass class PartialSuccessError(KankaOperationsError): """Some operations succeeded, some failed.""" def __init__(self, successes: list[Any], failures: list[Any]): self.successes = successes self.failures = failures super().__init__( f"Partial success: {len(successes)} succeeded, {len(failures)} failed" ) class KankaOperations: """High-level operations for Kanka, used by both MCP tools and external scripts.""" def __init__(self, service: KankaService | None = None): """Initialize operations with optional service instance. Args: service: Optional KankaService instance. If not provided, creates a new one. """ self.service = service or KankaService() async def find_entities( self, query: str | None = None, entity_type: str | None = None, name: str | None = None, name_exact: bool = False, name_fuzzy: bool = False, type: str | None = None, tags: list[str] | None = None, date_range: dict[str, str] | None = None, include_full: bool = True, page: int = 1, limit: int = 25, last_synced: str | None = None, ) -> dict[str, Any]: """Find entities with search and filtering capabilities. Args: query: Search term for full-text search across names and content entity_type: Type of entity to search for name: Filter by entity name name_exact: Use exact name matching (case-insensitive) name_fuzzy: Use fuzzy name matching (typo-tolerant) type: Filter by custom type field tags: Filter by tags (must have all specified tags) date_range: Date range filter for journals include_full: Whether to include full entity details page: Page number for pagination limit: Number of results per page (0 for all) last_synced: ISO timestamp to get only entities modified after this time Returns: Dictionary with entities and sync_info """ # Validate entity type if provided valid_types = [ "character", "creature", "location", "organization", "race", "note", "journal", "quest", ] if entity_type and entity_type not in valid_types: logger.error( f"Invalid entity_type: {entity_type}. Must be one of: {', '.join(valid_types)}" ) return {"entities": [], "sync_info": {}} try: # Step 1: Get entities if query: # For content search, we need full entities entities = [] if entity_type: # Search specific entity type # Cast to EntityType since we validated it above from typing import cast from .types import EntityType entity_objects = self.service.list_entities( cast(EntityType, entity_type), page=1, limit=0, last_sync=last_synced, related=include_full, ) for obj in entity_objects: entity_dict = self.service._entity_to_dict(obj, entity_type) entities.append(entity_dict) else: # Search across all entity types from .types import EntityType entity_types: list[EntityType] = [ "character", "creature", "location", "organization", "race", "note", "journal", "quest", ] for et in entity_types: try: entity_objects = self.service.list_entities( et, page=1, limit=0, last_sync=last_synced, related=include_full, ) for obj in entity_objects: entity_dict = self.service._entity_to_dict(obj, et) entities.append(entity_dict) except Exception as e: logger.debug(f"Could not search {et}: {e}") continue # Apply content search entities = search_in_content(entities, query) # If not including full details, strip to minimal data if not include_full: minimal_entities = [] for entity in entities: minimal_entities.append( { "entity_id": entity["entity_id"], "name": entity["name"], "entity_type": entity["entity_type"], } ) entities = minimal_entities else: # List entities of specific type (no search) if not entity_type: # No entity type specified, can't list all return {"entities": [], "sync_info": {}} # Get all entities of this type # Cast to EntityType since we validated it above from typing import cast from .types import EntityType entity_objects = self.service.list_entities( cast(EntityType, entity_type), page=1, limit=0, last_sync=last_synced, related=include_full, ) # Convert to dictionaries entities = [] for obj in entity_objects: entity_dict = self.service._entity_to_dict(obj, entity_type) entities.append(entity_dict) # Step 2: Apply client-side filters if name: entities = filter_entities_by_name( entities, name, exact=name_exact, fuzzy=name_fuzzy ) if type: entities = filter_entities_by_type(entities, type) if tags: entities = filter_entities_by_tags(entities, tags) if date_range and entity_type == "journal": start = date_range.get("start") end = date_range.get("end") if start and end: entities = filter_journals_by_date_range(entities, start, end) # Don't apply content search if we already used the search API # The search API already searched content # Step 3: Paginate results paginated, total_pages, total_items = paginate_results( entities, page, limit ) # Step 4: Calculate sync metadata # Find newest updated_at timestamp newest_updated_at = None for entity in paginated: if entity.get("updated_at") and ( newest_updated_at is None or entity["updated_at"] > newest_updated_at ): newest_updated_at = entity["updated_at"] # Build sync info sync_info = { "request_timestamp": datetime.now(timezone.utc).isoformat(), "newest_updated_at": newest_updated_at, "total_count": total_items, "returned_count": len(paginated), } # Step 5: Format results based on include_full if not include_full: # Return minimal data formatted_entities = [ { "entity_id": e["entity_id"], "name": e["name"], "entity_type": e["entity_type"], } for e in paginated ] else: # Return full data formatted_entities = paginated # Return the new response structure return { "entities": formatted_entities, "sync_info": sync_info, } except Exception as e: logger.error(f"find_entities failed: {e}") raise async def create_entities( self, entities: list[dict[str, Any]] ) -> list[CreateEntityResult]: """Create one or more entities. Args: entities: List of entity data to create Returns: List of results, one per entity (success or failure) """ results = [] valid_types = [ "character", "creature", "location", "organization", "race", "note", "journal", "quest", ] for entity_input in entities: entity_type = entity_input.get("entity_type") entity_name = entity_input.get("name", "") # Validate entity type if not entity_type or entity_type not in valid_types: logger.error( f"Invalid entity_type '{entity_type}' for entity '{entity_name}'" ) error_result: CreateEntityResult = { "id": None, "entity_id": None, "name": entity_name, "mention": None, "success": False, "error": f"Invalid entity_type '{entity_type}'. Must be one of: {', '.join(valid_types)}", } results.append(error_result) continue # Validate required fields if not entity_name: name_error: CreateEntityResult = { "id": None, "entity_id": None, "name": "", "mention": None, "success": False, "error": "Name is required", } results.append(name_error) continue try: # Create entity created = self.service.create_entity( entity_type=entity_type, name=entity_name, type=entity_input.get("type"), entry=entity_input.get("entry"), tags=entity_input.get("tags"), is_hidden=entity_input.get("is_hidden"), is_completed=entity_input.get("is_completed"), image_uuid=entity_input.get("image_uuid"), header_uuid=entity_input.get("header_uuid"), ) result: CreateEntityResult = { "id": created["id"], "entity_id": created["entity_id"], "name": created["name"], "mention": created["mention"], "success": True, "error": None, } results.append(result) except Exception as e: logger.error( f"Failed to create entity '{entity_input.get('name')}': {e}" ) create_error: CreateEntityResult = { "id": None, "entity_id": None, "name": entity_input.get("name", ""), "mention": None, "success": False, "error": str(e), } results.append(create_error) return results async def update_entities( self, updates: list[dict[str, Any]] ) -> list[UpdateEntityResult]: """Update one or more entities. Args: updates: List of entity updates to apply Returns: List of results, one per entity (success or failure) """ results = [] for update in updates: entity_id = update.get("entity_id") name = update.get("name") # Validate required fields if not entity_id: id_error: UpdateEntityResult = { "entity_id": 0, "success": False, "error": "entity_id is required", } results.append(id_error) continue if not name: name_error: UpdateEntityResult = { "entity_id": entity_id, "success": False, "error": "name is required for updates (Kanka API requirement)", } results.append(name_error) continue try: # Update entity success = self.service.update_entity( entity_id=entity_id, name=name, type=update.get("type"), entry=update.get("entry"), tags=update.get("tags"), is_hidden=update.get("is_hidden"), is_completed=update.get("is_completed"), image_uuid=update.get("image_uuid"), header_uuid=update.get("header_uuid"), ) result: UpdateEntityResult = { "entity_id": update["entity_id"], "success": success, "error": None, } results.append(result) except Exception as e: logger.error(f"Failed to update entity {update['entity_id']}: {e}") update_error: UpdateEntityResult = { "entity_id": update["entity_id"], "success": False, "error": str(e), } results.append(update_error) return results async def get_entities( self, entity_ids: list[int], include_posts: bool = False ) -> list[GetEntityResult]: """Get specific entities by ID. Args: entity_ids: List of entity IDs to retrieve include_posts: Whether to include posts for each entity Returns: List of results, one per entity """ results = [] for entity_id in entity_ids: try: # Get entity entity = self.service.get_entity_by_id(entity_id, include_posts) if entity: result: GetEntityResult = { "id": entity["id"], "entity_id": entity["entity_id"], "name": entity["name"], "entity_type": entity["entity_type"], "type": entity.get("type"), "entry": entity.get("entry"), "tags": entity.get("tags", []), "is_hidden": entity.get("is_hidden", False), "created_at": entity.get("created_at"), "updated_at": entity.get("updated_at"), "success": True, "error": None, } # Add quest-specific fields if entity.get("entity_type") == "quest": result["is_completed"] = entity.get("is_completed") # Add all image fields (they should always be present from service layer) result["image"] = entity.get("image") result["image_full"] = entity.get("image_full") result["image_thumb"] = entity.get("image_thumb") result["image_uuid"] = entity.get("image_uuid") result["header_uuid"] = entity.get("header_uuid") if include_posts: result["posts"] = entity.get("posts", []) results.append(result) else: not_found_result: GetEntityResult = { "entity_id": entity_id, "success": False, "error": f"Entity {entity_id} not found", } results.append(not_found_result) except Exception as e: logger.error(f"Failed to get entity {entity_id}: {e}") error_result: GetEntityResult = { "entity_id": entity_id, "success": False, "error": str(e), } results.append(error_result) return results async def delete_entities(self, entity_ids: list[int]) -> list[DeleteEntityResult]: """Delete one or more entities. Args: entity_ids: List of entity IDs to delete Returns: List of results, one per entity """ results = [] for entity_id in entity_ids: try: # Delete entity success = self.service.delete_entity(entity_id) result: DeleteEntityResult = { "entity_id": entity_id, "success": success, "error": None, } results.append(result) except Exception as e: logger.error(f"Failed to delete entity {entity_id}: {e}") error_result: DeleteEntityResult = { "entity_id": entity_id, "success": False, "error": str(e), } results.append(error_result) return results async def create_posts(self, posts: list[dict[str, Any]]) -> list[CreatePostResult]: """Create posts on entities. Args: posts: List of post data to create Returns: List of results, one per post """ results = [] for post_input in posts: try: # Create post created = self.service.create_post( entity_id=post_input["entity_id"], name=post_input["name"], entry=post_input.get("entry"), is_hidden=post_input.get("is_hidden", False), ) result: CreatePostResult = { "post_id": created["post_id"], "entity_id": created["entity_id"], "success": True, "error": None, } results.append(result) except Exception as e: logger.error( f"Failed to create post on entity {post_input['entity_id']}: {e}" ) error_result: CreatePostResult = { "post_id": None, "entity_id": post_input["entity_id"], "success": False, "error": str(e), } results.append(error_result) return results async def update_posts( self, updates: list[dict[str, Any]] ) -> list[UpdatePostResult]: """Update existing posts. Args: updates: List of post updates to apply Returns: List of results, one per post """ results = [] for update in updates: try: # Update post success = self.service.update_post( entity_id=update["entity_id"], post_id=update["post_id"], name=update["name"], entry=update.get("entry"), is_hidden=update.get("is_hidden"), ) result: UpdatePostResult = { "entity_id": update["entity_id"], "post_id": update["post_id"], "success": success, "error": None, } results.append(result) except Exception as e: logger.error( f"Failed to update post {update['post_id']} on entity {update['entity_id']}: {e}" ) error_result: UpdatePostResult = { "entity_id": update["entity_id"], "post_id": update["post_id"], "success": False, "error": str(e), } results.append(error_result) return results async def delete_posts( self, deletions: list[dict[str, Any]] ) -> list[DeletePostResult]: """Delete posts from entities. Args: deletions: List of post deletions to perform Returns: List of results, one per post """ results = [] for deletion in deletions: try: # Delete post success = self.service.delete_post( entity_id=deletion["entity_id"], post_id=deletion["post_id"], ) result: DeletePostResult = { "entity_id": deletion["entity_id"], "post_id": deletion["post_id"], "success": success, "error": None, } results.append(result) except Exception as e: logger.error( f"Failed to delete post {deletion['post_id']} from entity {deletion['entity_id']}: {e}" ) error_result: DeletePostResult = { "entity_id": deletion["entity_id"], "post_id": deletion["post_id"], "success": False, "error": str(e), } results.append(error_result) return results async def check_entity_updates( self, entity_ids: list[int], last_synced: str ) -> CheckEntityUpdatesResult: """Check which entities have been modified since last sync. Args: entity_ids: List of entity IDs to check last_synced: ISO timestamp of last sync Returns: Result containing modified and deleted entity IDs """ if not last_synced: raise ValueError("last_synced parameter is required") modified_entity_ids = [] deleted_entity_ids = [] try: # Get all entities using the entities endpoint # This is more efficient than checking each entity individually page = 1 all_entities = {} while page <= 20: # Reasonable limit to avoid infinite loops batch = self.service.client.entities(page=page, limit=100) if not batch: break for entity_data in batch: entity_id = entity_data.get("id") if entity_id: all_entities[entity_id] = entity_data if len(batch) < 100: break page += 1 # Check each requested entity for entity_id in entity_ids: if entity_id in all_entities: entity_data = all_entities[entity_id] updated_at = entity_data.get("updated_at") if updated_at and updated_at > last_synced: modified_entity_ids.append(entity_id) else: # Entity not found - might be deleted deleted_entity_ids.append(entity_id) # Get current timestamp check_timestamp = datetime.now(timezone.utc).isoformat() return { "modified_entity_ids": modified_entity_ids, "deleted_entity_ids": deleted_entity_ids, "check_timestamp": check_timestamp, } except Exception as e: logger.error(f"Check entity updates failed: {e}") raise # Global instance management _operations: KankaOperations | None = None def get_operations() -> KankaOperations: """Get or create the singleton operations instance. Returns: The global KankaOperations instance """ global _operations if _operations is None: _operations = KankaOperations(service=get_service()) return _operations def create_operations(service: KankaService | None = None) -> KankaOperations: """Create a new operations instance for external use. This is useful for scripts that want to manage their own instances or provide a custom service configuration. Args: service: Optional KankaService instance Returns: A new KankaOperations instance """ return KankaOperations(service)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ervwalter/mcp-kanka'

If you have feedback or need assistance with the MCP directory API, please join our Discord server