Skip to main content
Glama

KevoDB MCP Server

Official
by KevoDB
main.py16 kB
# ABOUTME: MCP server for KevoDB using the Python SDK # ABOUTME: Exposes KevoDB operations as MCP tools from fastmcp import FastMCP from kevo import Client as KevoClient, ClientOptions, ScanOptions, BatchOperation from pydantic import BaseModel from typing import Dict, List, Optional, Union import logging import os import sys import traceback # Configure logging logging.basicConfig( level=logging.WARN, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('kevo_mcp_server.log') ] ) logger = logging.getLogger('kevo_mcp') # Initialize FastMCP server mcp = FastMCP("KevoMCP") # Initialize KevoDB client host = os.getenv("KEVO_HOST", "localhost") port = os.getenv("KEVO_PORT", "50051") kevo_client = KevoClient(ClientOptions(endpoint=f"{host}:{port}")) # Models for request/response data class KeyValue(BaseModel): key: str value: str class ScanRequest(BaseModel): prefix: Optional[str] = None suffix: Optional[str] = None start_key: Optional[str] = None end_key: Optional[str] = None limit: Optional[int] = None reverse: bool = False @mcp.tool() def connect() -> bool: """Connect to the KevoDB server""" logger.info("Attempting to connect to KevoDB server") try: kevo_client.connect() logger.info("Successfully connected to KevoDB server") return True except Exception as e: logger.error(f"Failed to connect to KevoDB server: {str(e)}") logger.error(f"Stack trace: {traceback.format_exc()}") return False @mcp.tool() def get(key: str) -> Dict[str, Union[str, bool]]: """Get a value by key from KevoDB""" logger.info(f"GET operation started for key: {key[:10]}...") try: # Convert string key to bytes logger.debug(f"Converting key to bytes: {key[:30]}...") key_bytes = key.encode('utf-8') logger.debug(f"Key length: {len(key_bytes)} bytes") logger.info("Sending GET request to KevoDB for key") value, found = kevo_client.get(key_bytes) if found: logger.info(f"Key found, value length: {len(value)} bytes") # Convert bytes value to string for JSON transport decoded_value = value.decode('utf-8') logger.debug(f"Decoded value length: {len(decoded_value)}") return {"value": decoded_value, "found": True} else: logger.info("Key not found") return {"value": "", "found": False} except Exception as e: logger.error(f"GET operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return {"value": "", "found": False, "error": str(e)} @mcp.tool() def put(key: str, value: str) -> bool: """Store a key-value pair in KevoDB""" logger.info(f"PUT operation started for key: {key[:10]}...") try: # Convert string key/value to bytes directly logger.debug(f"Converting key to bytes: {key[:30]}...") key_bytes = key.encode('utf-8') logger.debug(f"Key length: {len(key_bytes)} bytes") logger.debug(f"Converting value to bytes: {value[:30]}...") value_bytes = value.encode('utf-8') logger.debug(f"Value length: {len(value_bytes)} bytes") logger.info( f"Sending PUT request to KevoDB for key length: {len(key_bytes)}, value length: {len(value_bytes)}") kevo_client.put(key_bytes, value_bytes) logger.info("PUT operation completed successfully") return True except Exception as e: logger.error(f"PUT operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") # Log the key/value details that caused the error try: logger.debug(f"Failed key: {key}") logger.debug(f"Failed value: {value[:100]}") except Exception as log_e: logger.error(f"Error while logging key/value: {str(log_e)}") return False @mcp.tool() def delete(key: str) -> bool: """Delete a key-value pair from KevoDB""" logger.info(f"DELETE operation started for key: {key[:10]}...") try: logger.debug(f"Converting key to bytes: {key[:30]}...") key_bytes = key.encode('utf-8') logger.debug(f"Key length: {len(key_bytes)} bytes") kevo_client.delete(key_bytes) logger.info("DELETE operation completed successfully") return True except Exception as e: logger.error(f"DELETE operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return False @mcp.tool() def scan(scan_options: ScanRequest) -> List[Dict[str, str]]: """Scan keys in KevoDB with options""" logger.info("SCAN operation started with options") try: results = [] logger.info("Executing scan operation") # Create proper ScanOptions object from our ScanRequest model from kevo import ScanOptions options = ScanOptions( prefix=scan_options.prefix.encode('utf-8') if scan_options.prefix else None, suffix=scan_options.suffix.encode('utf-8') if scan_options.suffix else None, start_key=scan_options.start_key.encode('utf-8') if scan_options.start_key else None, end_key=scan_options.end_key.encode('utf-8') if scan_options.end_key else None, limit=scan_options.limit if scan_options.limit else 0 ) for kv in kevo_client.scan(options): try: # Decode bytes to strings key_str = kv.key.decode('utf-8') value_str = kv.value.decode('utf-8') results.append({ "key": key_str, "value": value_str }) except UnicodeDecodeError: logger.warning( "Found binary data that couldn't be decoded as UTF-8, skipping") # Skip binary data that can't be decoded as UTF-8 continue logger.info(f"Scan completed, found {len(results)} results") return results except Exception as e: logger.error(f"SCAN operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return [] @mcp.tool() def batch_write(operations: List[str]) -> bool: """Perform multiple operations in a batch. Each operation should be a string in one of these formats: - "put key value" - Store a key-value pair - "delete key" - Delete a key Example: batch_write([ "put user:123 {\"name\": \"John\"}", "delete user:456" ]) """ logger.info( f"BATCH_WRITE operation started with {len(operations)} operations") try: batch_ops: BatchOperation = [] for i, op_str in enumerate(operations): try: # Split the operation string into parts parts = op_str.split(maxsplit=2) op_type = parts[0].lower() logger.debug( f"Processing operation {i+1}/{len(operations)}: {op_str[:50]}...") if op_type == "put" and len(parts) == 3: op = BatchOperation( op_type=BatchOperation.Type.PUT, key=parts[1].encode('utf-8'), value=parts[2].encode('utf-8'), ) logger.debug( f"PUT - Key length: {len(op.key)} bytes, Value length: {len(op.value)} bytes") batch_ops.append(op) elif op_type == "delete" and len(parts) == 2: op = BatchOperation( op_type=BatchOperation.Type.DELETE, key=parts[1].encode('utf-8'), ) logger.debug(f"DELETE - Key length: {len(op.key)} bytes") batch_ops.append(op) else: logger.warning( f"Invalid operation format: {op_str[:50]}...") raise ValueError( f"Invalid operation format. Must be 'put key value' or 'delete key'") except Exception as e: logger.error(f"Error processing operation {i+1}: {str(e)}") raise ValueError( f"Failed to process operation {i+1}: {str(e)}") if not batch_ops: logger.warning("No valid operations to process") return False logger.info( f"Sending batch with {len(batch_ops)} operations to KevoDB") kevo_client.batch_write(batch_ops) logger.info("Batch write completed successfully") return True except Exception as e: logger.error(f"BATCH_WRITE operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return False @mcp.tool() def get_stats() -> Dict: """Get database statistics""" try: stats = kevo_client.get_stats() return stats.__dict__ except Exception as e: return {"error": str(e)} @mcp.tool() def begin_transaction() -> str: """Begin a new transaction and return transaction ID""" try: # Store transaction in a dict to keep track of it tx = kevo_client.begin_transaction() # Generate a unique ID for this transaction import uuid tx_id = str(uuid.uuid4()) # Store transaction in a global variable (in a real app, use a proper store) # This is just a simple example - in production, use a proper transaction store if not hasattr(begin_transaction, "transactions"): begin_transaction.transactions = {} begin_transaction.transactions[tx_id] = tx logger.info(f"Transaction started with ID: {tx_id}") return tx_id except Exception as e: logger.error(f"Failed to begin transaction: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return "" @mcp.tool() def commit_transaction(tx_id: str) -> bool: """Commit a transaction by ID""" try: if not hasattr(begin_transaction, "transactions"): return False tx = begin_transaction.transactions.get(tx_id) if not tx: return False tx.commit() del begin_transaction.transactions[tx_id] return True except Exception as e: return False @mcp.tool() def rollback_transaction(tx_id: str) -> bool: """Roll back a transaction by ID""" try: if not hasattr(begin_transaction, "transactions"): return False tx = begin_transaction.transactions.get(tx_id) if not tx: return False tx.rollback() del begin_transaction.transactions[tx_id] return True except Exception as e: return False @mcp.tool() def tx_put(tx_id: str, key: str, value: str) -> bool: """Store a key-value pair within a transaction""" logger.info( f"TX_PUT operation started for transaction: {tx_id}, key: {key[:10]}...") try: if not hasattr(begin_transaction, "transactions"): logger.error("Transaction store not initialized") return False tx = begin_transaction.transactions.get(tx_id) if not tx: logger.error(f"Transaction not found with ID: {tx_id}") return False # Convert string to bytes key_bytes = key.encode('utf-8') value_bytes = value.encode('utf-8') logger.debug( f"Key length: {len(key_bytes)} bytes, Value length: {len(value_bytes)} bytes") tx.put(key_bytes, value_bytes) logger.info("TX_PUT operation completed successfully") return True except Exception as e: logger.error(f"TX_PUT operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return False @mcp.tool() def tx_get(tx_id: str, key: str) -> Dict[str, Union[str, bool]]: """Get a value by key within a transaction""" logger.info( f"TX_GET operation started for transaction: {tx_id}, key: {key[:10]}...") try: if not hasattr(begin_transaction, "transactions"): logger.error("Transaction store not initialized") return {"value": "", "found": False} tx = begin_transaction.transactions.get(tx_id) if not tx: logger.error(f"Transaction not found with ID: {tx_id}") return {"value": "", "found": False} # Convert string to bytes key_bytes = key.encode('utf-8') logger.debug(f"Key length: {len(key_bytes)} bytes") value, found = tx.get(key_bytes) if found: # Convert bytes to string value_str = value.decode('utf-8') logger.info( f"Key found, value length: {len(value_str)} characters") return {"value": value_str, "found": True} else: logger.info("Key not found in transaction") return {"value": "", "found": False} except Exception as e: logger.error(f"TX_GET operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return {"value": "", "found": False, "error": str(e)} @mcp.tool() def tx_delete(tx_id: str, key: str) -> bool: """Delete a key-value pair within a transaction""" logger.info( f"TX_DELETE operation started for transaction: {tx_id}, key: {key[:10]}...") try: if not hasattr(begin_transaction, "transactions"): logger.error("Transaction store not initialized") return False tx = begin_transaction.transactions.get(tx_id) if not tx: logger.error(f"Transaction not found with ID: {tx_id}") return False # Convert string to bytes key_bytes = key.encode('utf-8') logger.debug(f"Key length: {len(key_bytes)} bytes") tx.delete(key_bytes) logger.info("TX_DELETE operation completed successfully") return True except Exception as e: logger.error(f"TX_DELETE operation failed with error: {str(e)}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Stack trace: {traceback.format_exc()}") return False @mcp.tool() def cleanup() -> bool: """Close the KevoDB connection""" try: kevo_client.close() return True except Exception as e: return False if __name__ == "__main__": logger.info("Starting KevoDB MCP server") # Attempt to connect to KevoDB when server starts try: logger.info("Attempting initial connection to KevoDB") kevo_client.connect() logger.info("Successfully connected to KevoDB") print("Successfully connected to KevoDB") except Exception as e: logger.error(f"Error connecting to KevoDB: {str(e)}") logger.error(f"Stack trace: {traceback.format_exc()}") print(f"Error connecting to KevoDB: {e}") print("Server will still start, but connections will need to be initialized") # Log client configuration logger.info( f"KevoDB client configured with endpoint: {kevo_client.options.endpoint}") # Run the MCP server logger.info("Starting MCP server") mcp.run() logger.info("MCP server stopped")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KevoDB/kevo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server