MCP Pytest Server

  • src
  • meilisearch_mcp
from typing import Dict, Any, List, Optional, Union from meilisearch import Client class DocumentManager: """Manage documents within Meilisearch indexes""" def __init__(self, client: Client): self.client = client async def get_documents( self, index_uid: str, offset: Optional[int] = None, limit: Optional[int] = None, fields: Optional[List[str]] = None, ) -> Dict[str, Any]: """Get documents from an index""" try: index = self.client.index(index_uid) return index.get_documents( {"offset": offset, "limit": limit, "fields": fields} ) except Exception as e: raise Exception(f"Failed to get documents: {str(e)}") async def get_document( self, index_uid: str, document_id: Union[str, int] ) -> Dict[str, Any]: """Get a single document""" try: index = self.client.index(index_uid) return index.get_document(document_id) except Exception as e: raise Exception(f"Failed to get document: {str(e)}") async def add_documents( self, index_uid: str, documents: List[Dict[str, Any]], primary_key: Optional[str] = None, ) -> Dict[str, Any]: """Add documents to an index""" try: index = self.client.index(index_uid) return index.add_documents(documents, primary_key) except Exception as e: raise Exception(f"Failed to add documents: {str(e)}") async def update_documents( self, index_uid: str, documents: List[Dict[str, Any]] ) -> Dict[str, Any]: """Update documents in an index""" try: index = self.client.index(index_uid) return index.update_documents(documents) except Exception as e: raise Exception(f"Failed to update documents: {str(e)}") async def delete_document( self, index_uid: str, document_id: Union[str, int] ) -> Dict[str, Any]: """Delete a single document""" try: index = self.client.index(index_uid) return index.delete_document(document_id) except Exception as e: raise Exception(f"Failed to delete document: {str(e)}") async def delete_documents( self, index_uid: str, document_ids: List[Union[str, int]] ) -> Dict[str, Any]: """Delete multiple documents by ID""" try: index = self.client.index(index_uid) return index.delete_documents(document_ids) except Exception as e: raise Exception(f"Failed to delete documents: {str(e)}") async def delete_all_documents(self, index_uid: str) -> Dict[str, Any]: """Delete all documents in an index""" try: index = self.client.index(index_uid) return index.delete_all_documents() except Exception as e: raise Exception(f"Failed to delete all documents: {str(e)}")