Skip to main content
Glama
zep_cloud_client.py15.3 kB
#!/usr/bin/env python3
"""
Zep Cloud Client

This module provides a client for interacting with the Zep Cloud API using
the official Zep Cloud SDK.
"""

import os
import logging
from typing import Dict, List, Optional, Any, Union
import json
import requests
from dotenv import load_dotenv

# Import the Zep Cloud SDK
try:
    from zep_cloud.client import Zep
    from zep_cloud import User
except ImportError as err:
    # Chain the original error so the real import failure stays visible.
    raise ImportError(
        "zep-cloud SDK not found. Install with: pip install zep-cloud"
    ) from err

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Queries longer than this are truncated before being sent to graph search.
_MAX_QUERY_LENGTH = 8000
# Payloads whose length exceeds this are rejected by add_graph_data.
_MAX_DATA_LENGTH = 100000
# Kept as a list because it is rendered verbatim in an error message.
_VALID_DATA_TYPES = ["text", "json", "message"]


def _user_to_dict(user: Any) -> Dict[str, Any]:
    """Convert an SDK user object into a plain, JSON-serializable dict."""
    return {
        "user_id": user.user_id,
        "metadata": user.metadata if user.metadata else {},
        "first_name": user.first_name,
        "last_name": user.last_name,
        "email": user.email,
    }


def _str_attr_or_none(obj: Any, name: str) -> Optional[str]:
    """Return ``str(obj.<name>)`` if the attribute exists, otherwise None."""
    if hasattr(obj, name):
        return str(getattr(obj, name))
    return None


def _payload_length(data: Any) -> int:
    """Return the length of *data*, using its string form if it is not a str."""
    return len(data) if isinstance(data, str) else len(str(data))


def _normalize_json_payload(data: Any) -> str:
    """
    Validate *data* as JSON and return it as a JSON string.

    Strings are parsed to verify they are valid JSON. A string that fails to
    parse but is wrapped in one extra pair of double quotes around an object
    or array is accepted with the outer quotes removed. Non-string values are
    serialized with ``json.dumps``.

    Raises:
        json.JSONDecodeError: If a string payload is not valid JSON.
        Exception: Whatever ``json.dumps`` raises for unserializable values.
    """
    if not isinstance(data, str):
        parsed = data
        data = json.dumps(data)
        logger.info("Validated JSON data: %s", type(parsed))
        return data

    try:
        parsed = json.loads(data)
    except json.JSONDecodeError as original_error:
        # Check for JSON with extra quotes, e.g. '"{"a": 1}"'.
        quoted = data.startswith('"') and data.endswith('"') and len(data) > 2
        inner = data[1:-1]
        if not (quoted and inner.lstrip().startswith(("{", "["))):
            raise
        try:
            parsed = json.loads(inner)
        except Exception:
            # Stripping the quotes did not help: report the original error.
            raise original_error
        data = inner
        logger.info("Fixed JSON by removing outer quotes: %s", type(parsed))

    logger.info("Validated JSON data: %s", type(parsed))
    return data


class ZepCloudClient:
    """Client for interacting with the Zep Cloud API"""

    def __init__(self):
        """
        Initialize the Zep Cloud client.

        Reads the API key from the ``ZEP_API_KEY`` environment variable.

        Raises:
            ValueError: If ``ZEP_API_KEY`` is not set.
            Exception: Re-raised (after logging) if the SDK client cannot be
                constructed.
        """
        self.api_key = os.getenv("ZEP_API_KEY")
        # Add fallback_mode attribute explicitly
        self.fallback_mode = False

        if not self.api_key:
            raise ValueError("ZEP_API_KEY environment variable not set")

        # Initialize the client
        try:
            self.client = Zep(api_key=self.api_key)
            logger.info("Zep Cloud client initialized successfully")
        except Exception as e:
            logger.error("Failed to initialize Zep Cloud client: %s", e)
            raise

    def list_users(self) -> List[Dict[str, Any]]:
        """
        List all users

        Returns:
            List[Dict[str, Any]]: One dict per user with ``user_id`` and
            ``metadata`` keys; an empty list if the request fails.
        """
        try:
            user_response = self.client.user.list_ordered()
            users = user_response.users or []
            # Convert User objects to dictionaries
            return [
                {
                    "user_id": user.user_id,
                    "metadata": user.metadata if user.metadata else {},
                }
                for user in users
            ]
        except Exception as e:
            logger.error("Error listing users: %s", e)
            return []

    def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """
        Get a user by ID

        Args:
            user_id (str): The user ID

        Returns:
            Optional[Dict[str, Any]]: User object if found, None otherwise
            (including when the request raises).
        """
        try:
            user = self.client.user.get(user_id=user_id)
            return _user_to_dict(user) if user else None
        except Exception as e:
            logger.error("Error getting user %s: %s", user_id, e)
            return None

    def create_user(
        self,
        user_id: str,
        metadata: Optional[Dict[str, Any]] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        email: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Create a new user

        Args:
            user_id (str): The user ID
            metadata (Optional[Dict[str, Any]]): User metadata
            first_name (Optional[str]): User's first name
            last_name (Optional[str]): User's last name
            email (Optional[str]): User's email address

        Returns:
            Optional[Dict[str, Any]]: Created user object if successful,
            None otherwise
        """
        try:
            # Ensure metadata is not None
            metadata_dict = metadata if metadata is not None else {}

            # Create the user
            user = self.client.user.add(
                user_id=user_id,
                metadata=metadata_dict,
                first_name=first_name,
                last_name=last_name,
                email=email,
            )
            return _user_to_dict(user)
        except Exception as e:
            logger.error("Error creating user %s: %s", user_id, e)
            return None

    def update_user(
        self,
        user_id: str,
        metadata: Optional[Dict[str, Any]] = None,
        first_name: Optional[str] = None,
        last_name: Optional[str] = None,
        email: Optional[str] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Update a user's metadata and profile information

        Args:
            user_id (str): The user ID
            metadata (Optional[Dict[str, Any]]): User metadata
            first_name (Optional[str]): User's first name
            last_name (Optional[str]): User's last name
            email (Optional[str]): User's email address

        Returns:
            Optional[Dict[str, Any]]: Updated user object if successful,
            None otherwise
        """
        try:
            # Ensure metadata is not None
            # NOTE(review): an omitted metadata argument is sent as {}; this
            # may clear stored metadata depending on the API -- confirm.
            metadata_dict = metadata if metadata is not None else {}

            # Update the user
            user = self.client.user.update(
                user_id=user_id,
                metadata=metadata_dict,
                first_name=first_name,
                last_name=last_name,
                email=email,
            )
            return _user_to_dict(user)
        except Exception as e:
            logger.error("Error updating user %s: %s", user_id, e)
            return None

    def delete_user(self, user_id: str) -> bool:
        """
        Delete a user

        Args:
            user_id (str): The user ID

        Returns:
            bool: True if successful, False otherwise
        """
        try:
            self.client.user.delete(user_id=user_id)
            return True
        except Exception as e:
            logger.error("Error deleting user %s: %s", user_id, e)
            return False

    def search_graph(
        self, user_id: str, query: str, limit: int = 10
    ) -> Optional[Dict[str, Any]]:
        """
        Search the user's graph with a query

        Args:
            user_id (str): The user ID
            query (str): The search query; truncated to 8000 characters
            limit (int): The maximum number of results to return

        Returns:
            Optional[Dict[str, Any]]: A JSON-serializable dict with ``query``,
            ``user_id``, ``limit``, ``success``, ``results``, ``summary`` and,
            when present, ``edges`` / ``nodes``; None if the search raises.
        """
        try:
            # Keep queries concise as recommended by docs
            if len(query) > _MAX_QUERY_LENGTH:
                logger.warning(
                    "Search query exceeds recommended length. "
                    "Truncating to 8000 characters."
                )
                query = query[:_MAX_QUERY_LENGTH]

            # Call the graph search API
            search_results = self.client.graph.search(
                user_id=user_id,
                query=query,
                limit=limit,
            )

            # Create a digestible result that can be serialized to JSON
            results: Dict[str, Any] = {
                "query": query,
                "user_id": user_id,
                "limit": limit,
                "success": True,
                # Keep generic results array for backward compatibility
                "results": [],
            }

            # Add edges (facts) if they exist
            edges = getattr(search_results, "edges", None)
            if edges:
                results["edges"] = [
                    {
                        "id": getattr(edge, "id", None),
                        "fact": getattr(edge, "fact", None),
                        "created_at": _str_attr_or_none(edge, "created_at"),
                        "updated_at": _str_attr_or_none(edge, "updated_at"),
                        "score": getattr(edge, "score", None),
                    }
                    for edge in edges
                ]
                # Also add to generic results for backward compatibility
                results["results"].extend(results["edges"])

            # Add nodes if they exist
            nodes = getattr(search_results, "nodes", None)
            if nodes:
                results["nodes"] = [
                    {
                        "id": getattr(node, "id", None),
                        "label": getattr(node, "label", None),
                        "attributes": getattr(node, "attributes", {}),
                        "score": getattr(node, "score", None),
                    }
                    for node in nodes
                ]
                # Also add to generic results for backward compatibility
                results["results"].extend(results["nodes"])

            # Add a summary field to help Claude understand the results
            if results["results"]:
                summary = (
                    f"Found {len(results['results'])} results "
                    f"for query '{query}'"
                )
                if results.get("nodes"):
                    summary += f", including {len(results['nodes'])} nodes"
                if results.get("edges"):
                    summary += (
                        f", including {len(results['edges'])} edges/facts"
                    )
                results["summary"] = summary
            else:
                results["summary"] = f"No results found for query '{query}'"

            return results
        except Exception as e:
            logger.error("Error searching graph for user %s: %s", user_id, e)
            return None

    def add_graph_data(
        self, user_id: str, data: Any, data_type: str = "text"
    ) -> Dict[str, Any]:
        """
        Add data to a user's graph in Zep Cloud.

        Args:
            user_id: The unique identifier for the user
            data: The data to add to the graph (string or dict)
            data_type: The type of data, can be "text", "json", or "message"

        Returns:
            A dictionary with information about the added data. On success it
            has ``success: True`` plus ``user_id``, ``data_type``,
            ``data_length`` and a ``response`` sub-dict; on any validation or
            API failure it has ``success: False`` and an ``error`` message.
        """
        if not user_id:
            return {"error": "User ID is required", "success": False}

        if data is None:
            return {"error": "Data is required", "success": False}

        if data_type not in _VALID_DATA_TYPES:
            return {
                "error": (
                    f"Invalid data type: {data_type}. "
                    f"Must be one of {_VALID_DATA_TYPES}"
                ),
                "success": False,
            }

        # Convert dict to JSON string if needed
        if data_type == "json" and isinstance(data, dict):
            try:
                data = json.dumps(data)
                logger.info(
                    "Converted Python dict to JSON string: %s", type(data)
                )
            except Exception as e:
                return {
                    "error": f"Error converting dict to JSON: {str(e)}",
                    "success": False,
                }

        # Check data size limits (after possible conversion)
        data_len = _payload_length(data)
        if data_len > _MAX_DATA_LENGTH:  # 100KB limit
            return {
                "error": f"Data size of {data_len} bytes exceeds 100KB limit",
                "success": False,
            }

        # Additional validation for JSON data
        if data_type == "json":
            try:
                data = _normalize_json_payload(data)
            except json.JSONDecodeError as e:
                return {
                    "error": f"Invalid JSON data: {str(e)}",
                    "success": False,
                }
            except Exception as e:
                return {
                    "error": f"Error processing JSON data: {str(e)}",
                    "success": False,
                }

        # Use the Zep SDK to add the data to the graph
        try:
            # Call the graph add API
            response = self.client.graph.add(
                user_id=user_id,
                type=data_type,
                data=data,
            )

            # Create a return value that can be serialized to JSON.
            # The length uses the same rule as the size check above, so a
            # payload without len() cannot turn a successful add into a
            # reported failure.
            return {
                "success": True,
                "user_id": user_id,
                "data_type": data_type,
                "data_length": _payload_length(data),
                "response": {
                    "uuid": getattr(response, "uuid", None),
                    "content": getattr(response, "content", None),
                    "created_at": _str_attr_or_none(response, "created_at"),
                    "processed": getattr(response, "processed", None),
                },
            }
        except Exception as e:
            error_message = f"Exception adding graph data: {str(e)}"
            logger.error(error_message)
            return {"error": error_message, "success": False}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fshamim/mcp-server-zep-cloud'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.