create_graph

Create a new knowledge graph with a unique ID and title to organize structured data for AI agents, using the Mnemosyne MCP server.

Instructions

Creates a new knowledge graph with the given ID, title, and optional description. The graph_id should be a URL-safe identifier (e.g., 'my-project', 'research-notes').
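
The handler listed under Implementation Reference only rejects empty values, so a caller may want to check URL-safety before invoking the tool. A minimal sketch, assuming "URL-safe" means lowercase letters, digits, and hyphens as in the two examples above; the pattern and the name `is_url_safe_graph_id` are illustrative assumptions, not part of the Mnemosyne server:

```python
import re

# Assumed definition of "URL-safe": lowercase letters and digits, with single
# hyphens between segments (matches 'my-project' and 'research-notes').
_GRAPH_ID_PATTERN = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def is_url_safe_graph_id(graph_id: str) -> bool:
    """Return True if graph_id, after trimming whitespace, fits the assumed pattern."""
    return _GRAPH_ID_PATTERN.fullmatch(graph_id.strip()) is not None
```

The value is stripped first because the handler also strips graph_id before sending it to the backend.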

Input Schema

Name         Required  Description  Default
graph_id     Yes       -            -
title        Yes       -            -
description  No        -            -
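
The table gives no descriptions or defaults. As a rough sketch, the three parameters imply a JSON Schema along these lines, written here as a Python dict with string types taken from the handler signature. The exact schema the server emits may differ, and `missing_required` and `example_arguments` are hypothetical names used only for illustration:

```python
# Sketch of the input schema implied by the table above; the schema actually
# emitted by the server may differ (for example in how it marks optional values).
CREATE_GRAPH_INPUT_SCHEMA = {
    "type": "object",
    "properties": {
        "graph_id": {"type": "string"},
        "title": {"type": "string"},
        "description": {"type": "string"},
    },
    "required": ["graph_id", "title"],
}


def missing_required(arguments: dict) -> list[str]:
    """Return the required argument names that are absent from `arguments`."""
    return [
        name
        for name in CREATE_GRAPH_INPUT_SCHEMA["required"]
        if name not in arguments
    ]


# Illustrative arguments for a create_graph call (values are made up).
example_arguments = {
    "graph_id": "research-notes",
    "title": "Research Notes",
    "description": "Reading notes for the current project",
}
```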

Implementation Reference

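  • The core handler function for the 'create_graph' tool. It requires an authenticated context, checks that graph_id and title are non-empty (it does not check that graph_id is URL-safe), submits a 'create_graph' job to the backend API, waits for completion over a WebSocket stream or by polling, and returns the result as a JSON string.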
    @server.tool(
        name="create_graph",
        title="Create Knowledge Graph",
        description=(
            "Creates a new knowledge graph with the given ID, title, and optional description. "
            "The graph_id should be a URL-safe identifier (e.g., 'my-project', 'research-notes')."
        ),
    )
    async def create_graph_tool(
        graph_id: str,
        title: str,
        description: Optional[str] = None,
        context: Context | None = None,
    ) -> str:
        """Create a new knowledge graph."""
        auth = MCPAuthContext.from_context(context)
        auth.require_auth()
    
        if not graph_id or not graph_id.strip():
            raise ValueError("graph_id is required and cannot be empty")
        if not title or not title.strip():
            raise ValueError("title is required and cannot be empty")
    
        payload = {
            "graph_id": graph_id.strip(),
            "title": title.strip(),
        }
        if description:
            payload["description"] = description.strip()
    
        metadata = await submit_job(
            base_url=backend_config.base_url,
            auth=auth,
            task_type="create_graph",
            payload=payload,
        )
    
        if context:
            await context.report_progress(10, 100)
    
        result = await _wait_for_job_result(
            job_stream, metadata, context, auth
        )
    
        return _render_json({
            "success": True,
            "graph_id": graph_id.strip(),
            "title": title.strip(),
            "description": description.strip() if description else None,
            "job_id": metadata.job_id,
            **result,
        })
  • Registration of graph operations tools, including 'create_graph', on the MCP server instance in the standalone server setup.
    register_basic_tools(mcp_server)
    register_graph_ops_tools(mcp_server)
    register_hocuspocus_tools(mcp_server)
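  • Helper function used by create_graph_tool to wait for the job result. It tries the realtime WebSocket stream first, scanning the received events newest-first for a terminal status, and falls back to polling the status link when no stream is available or it yields no events.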
    async def _wait_for_job_result(
        job_stream: Optional[RealtimeJobClient],
        metadata: JobSubmitMetadata,
        context: Optional[Context],
        auth: MCPAuthContext,
    ) -> JsonDict:
        """Wait for job completion via WebSocket or polling, return result info including detail."""
        events = None
        if job_stream and metadata.links.websocket:
            events = await stream_job(job_stream, metadata, timeout=STREAM_TIMEOUT_SECONDS)
    
        if events:
            if context:
                await context.report_progress(80, 100)
            # Check for completion status in events and extract result
            for event in reversed(events):
                event_type = event.get("type", "")
                if event_type in ("job_completed", "completed", "succeeded"):
                    if context:
                        await context.report_progress(100, 100)
                    # Extract result from event payload
                    result: JsonDict = {"status": "succeeded", "events": len(events)}
                    payload = event.get("payload", {})
                    if isinstance(payload, dict):
                        detail = payload.get("detail")
                        if detail:
                            result["detail"] = detail
                    return result
                if event_type in ("failed", "error"):
                    error = event.get("error", "Job failed")
                    return {"status": "failed", "error": error}
            return {"status": "unknown", "event_count": len(events)}
    
        # Fall back to polling
        status_payload = (
            await poll_job_until_terminal(metadata.links.status, auth)
            if metadata.links.status
            else None
        )
    
        if context:
            await context.report_progress(100, 100)
    
        if status_payload:
            status = status_payload.get("status", "unknown")
            detail = status_payload.get("detail")
            if status == "failed":
                error = status_payload.get("error") or (detail.get("error") if isinstance(detail, dict) else None)
                return {"status": "failed", "error": error}
            # Include full detail in result for successful jobs
            result: JsonDict = {"status": status}
            if detail:
                result["detail"] = detail
            return result
    
        return {"status": "unknown"}
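
The streaming branch of `_wait_for_job_result` can be tried in isolation. The sketch below reimplements only the newest-first event scan as a pure function over plain dicts. `summarize_events` is a hypothetical name, the sample events are made up for illustration, and real event payloads may carry other fields:

```python
from typing import Any

JsonDict = dict[str, Any]

# Event type names copied from the helper above.
SUCCESS_TYPES = ("job_completed", "completed", "succeeded")
FAILURE_TYPES = ("failed", "error")


def summarize_events(events: list[JsonDict]) -> JsonDict:
    """Scan events newest-first and return the first terminal status found."""
    for event in reversed(events):
        event_type = event.get("type", "")
        if event_type in SUCCESS_TYPES:
            result: JsonDict = {"status": "succeeded", "events": len(events)}
            payload = event.get("payload", {})
            # Only copy the detail when the payload is a dict that has one.
            if isinstance(payload, dict) and payload.get("detail"):
                result["detail"] = payload["detail"]
            return result
        if event_type in FAILURE_TYPES:
            return {"status": "failed", "error": event.get("error", "Job failed")}
    # No terminal event was seen in the stream.
    return {"status": "unknown", "event_count": len(events)}


# Hypothetical event stream for a job that finished successfully.
sample_events = [
    {"type": "started"},
    {"type": "progress", "payload": {"percent": 50}},
    {"type": "succeeded", "payload": {"detail": {"graph_id": "research-notes"}}},
]
summary = summarize_events(sample_events)
```

In the handler excerpt, this dict is merged into a response that always sets "success" to True, so a caller reading the returned JSON would look at "status" (and "error") rather than "success" to learn whether the graph was actually created.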

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sophia-labs/mnemosyne-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.