Skip to main content
Glama
clean_campaign.py4.42 kB
#!/usr/bin/env python3 """Clean all entities from the test campaign.""" import asyncio import os import sys from pathlib import Path from dotenv import load_dotenv from kanka import KankaClient # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent.parent / "src")) # Load environment variables env_path = Path(__file__).parent / ".env" if env_path.exists(): load_dotenv(env_path) else: load_dotenv() async def clean_campaign(): """Remove all entities from the test campaign.""" token = os.environ.get("KANKA_TOKEN") campaign_id = os.environ.get("KANKA_CAMPAIGN_ID") if not token or not campaign_id: print("Error: KANKA_TOKEN and KANKA_CAMPAIGN_ID must be set") return client = KankaClient(token=token, campaign_id=int(campaign_id)) # Entity types to clean entity_types = [ ("characters", client.characters), ("creatures", client.creatures), ("locations", client.locations), ("organisations", client.organisations), ("races", client.races), ("notes", client.notes), ("journals", client.journals), ("quests", client.quests), ("families", client.families), ("calendars", client.calendars), ("events", client.events), ("tags", client.tags), ] print(f"Cleaning campaign {campaign_id}...") for entity_type_name, manager in entity_types: print(f"\nCleaning {entity_type_name}...") deleted_count = 0 try: # Get all entities of this type page = 1 while True: entities = manager.list(page=page, limit=100) if not entities: break for entity in entities: try: manager.delete(entity.id) deleted_count += 1 print(f" Deleted {entity.name} (ID: {entity.id})") except Exception as e: print(f" Failed to delete {entity.name}: {e}") if len(entities) < 100: break page += 1 print(f" Total {entity_type_name} deleted: {deleted_count}") except Exception as e: print(f" Error listing {entity_type_name}: {e}") print("\nCampaign cleanup complete!") async def clean_test_entities_async(): """Remove only test entities (with 'Integration Test' and 'DELETE ME' in name).""" token = os.environ.get("KANKA_TOKEN") campaign_id = os.environ.get("KANKA_CAMPAIGN_ID") if not token or not campaign_id: return 0 client = KankaClient(token=token, campaign_id=int(campaign_id)) # Entity types to clean entity_types = [ ("characters", client.characters), ("creatures", client.creatures), ("locations", client.locations), ("organisations", client.organisations), ("races", client.races), ("notes", client.notes), ("journals", client.journals), ("quests", client.quests), ("families", client.families), ("calendars", client.calendars), ("events", client.events), ("tags", client.tags), ] total_deleted = 0 for _, manager in entity_types: deleted_count = 0 try: # Get all entities of this type page = 1 while True: entities = manager.list(page=page, limit=100) if not entities: break for entity in entities: # Only delete test entities if "Integration Test" in entity.name and "DELETE ME" in entity.name: try: manager.delete(entity.id) deleted_count += 1 except Exception: pass # Silently skip failures if len(entities) < 100: break page += 1 total_deleted += deleted_count except Exception: pass # Silently skip errors return total_deleted def clean_test_entities(): """Synchronous wrapper that returns count of deleted test entities.""" return asyncio.run(clean_test_entities_async()) if __name__ == "__main__": asyncio.run(clean_campaign())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ervwalter/mcp-kanka'

If you have feedback or need assistance with the MCP directory API, please join our Discord server