Skip to main content
Glama

Typesense MCP Server

main.py (40.6 kB)
# Typesense MCP Server: exposes Typesense collection, document, and search
# operations as MCP tools, with a lifespan-managed Typesense client.
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
import os    # Filesystem checks for CSV import (path vs. raw data)
import json  # Parsing JSON Lines in export/import responses
import csv   # CSV import
import io    # In-memory text buffer for CSV string data
# NOTE(review): a truncated "from string" import fragment was removed here —
# it was a syntax error and the `string` module is never used.

# Typesense client and Pydantic settings.
import typesense
from pydantic_settings import BaseSettings
from pydantic import Field

from mcp.server.fastmcp import Context, FastMCP


class Settings(BaseSettings):
    """Application settings, loaded from environment variables (or a .env file)."""

    typesense_host: str = Field(default='localhost', alias='TYPESENSE_HOST')
    typesense_port: int = Field(default=8108, alias='TYPESENSE_PORT')
    typesense_protocol: str = Field(default='http', alias='TYPESENSE_PROTOCOL')
    # Required (no default): settings construction fails fast if unset.
    typesense_api_key: str = Field(..., alias='TYPESENSE_API_KEY')

    class Config:
        # Allows loading from a .env file if present.
        env_file = '.env'
        extra = 'ignore'  # Ignore extra fields from env


@dataclass
class AppContext:
    """Shared application state, accessible within tools."""
    # The initialized Typesense client.
    client: typesense.Client
    # Keep settings accessible if needed.
    settings: Settings


@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
    """Manage application lifecycle: initialize Typesense client on startup.

    Args:
        server (FastMCP): The FastMCP server instance.

    Yields:
        AppContext: The application context containing the initialized
            Typesense client.
    """
    # Load settings from environment variables.
    settings = Settings()
    # Defensive double-check; Settings already requires the API key.
    if not settings.typesense_api_key:
        raise ValueError("TYPESENSE_API_KEY environment variable not set.")
    client = typesense.Client({
        'nodes': [{
            'host': settings.typesense_host,
            'port': settings.typesense_port,
            'protocol': settings.typesense_protocol
        }],
        'api_key': settings.typesense_api_key,
        'connection_timeout_seconds': 2  # Example timeout
    })
    try:
        # Provide the context to the application.
        yield AppContext(client=client, settings=settings)
    finally:
        # The Typesense client does not require explicit closing.
        print("Shutting down. Typesense client resources managed automatically.")


# Initialize the MCP Server with the lifespan manager.
mcp = FastMCP("Typesense MCP Server", lifespan=app_lifespan)


def _coerce_import_results(response) -> list[dict]:
    """Normalize a Typesense bulk-import response into result dictionaries.

    Depending on client version and input form, ``documents.import_`` may
    return a list of already-parsed dicts, a single JSONL string, bytes, or
    an iterable of JSONL lines. Lines that cannot be decoded are reported as
    failed results rather than raised.

    Args:
        response: The raw value returned by ``documents.import_``.

    Returns:
        list[dict]: One result dict per imported document, each typically
            shaped like {'success': bool, 'error': '...', ...}.
    """
    if isinstance(response, bytes):
        response = response.decode('utf-8')
    if isinstance(response, str):
        response = response.splitlines()
    results: list[dict] = []
    for item in response:
        if isinstance(item, dict):
            # Already parsed by the client library.
            results.append(item)
            continue
        try:
            results.append(json.loads(item))
        except json.JSONDecodeError:
            print(f"Warning: Could not decode JSON line from import response: {item}")
            results.append({'success': False, 'error': 'Failed to decode response line', 'raw_line': item})
    return results


@mcp.tool()
async def check_typesense_health(ctx: Context) -> dict | str:
    """Checks the health status of the configured Typesense server.

    Args:
        ctx (Context): The MCP context, providing access to application resources.

    Returns:
        dict | str: The health status dictionary from Typesense or an error message.
    """
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        # typesense-python operations are synchronous; do not await them.
        health_status = client.health.retrieve()
        return health_status
    except Exception as e:
        # Log the exception ideally.
        print(f"Error checking Typesense health: {e}")
        return f"Error connecting to Typesense or checking health: {e}"


# --- Typesense Collection Tools ---

@mcp.tool()
async def list_collections(ctx: Context) -> list | str:
    """Retrieves a list of all collections in the Typesense server.

    Args:
        ctx (Context): The MCP context.

    Returns:
        list | str: A list of collection schemas or an error message string.
    """
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        # The response is expected to be a list of dictionaries.
        collections = client.collections.retrieve()
        return collections
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error listing collections: {e}")
        return f"Error listing collections: {e}"
    except Exception as e:
        print(f"An unexpected error occurred while listing collections: {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def describe_collection(ctx: Context, collection_name: str) -> dict | str:
    """Retrieves the schema and metadata for a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to describe.

    Returns:
        dict | str: The collection schema dictionary or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        collection_info = client.collections[collection_name].retrieve()
        return collection_info
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error describing collection '{collection_name}': {e}")
        return f"Error describing collection '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while describing collection '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def export_collection(ctx: Context, collection_name: str) -> list[dict] | str:
    """Exports all documents from a specific collection.

    Warning: This can be memory-intensive for very large collections.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to export.

    Returns:
        list[dict] | str: A list of document dictionaries or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    documents = []
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        # Check if collection exists first to give a clearer error.
        _ = client.collections[collection_name].retrieve()
        exported = client.collections[collection_name].documents.export()
        # export() returns a single JSONL string (possibly bytes); split it
        # into lines — iterating the string directly would yield characters.
        if isinstance(exported, bytes):
            exported = exported.decode('utf-8')
        lines = exported.splitlines() if isinstance(exported, str) else exported
        for line in lines:
            try:
                documents.append(json.loads(line))
            except json.JSONDecodeError:
                print(f"Warning: Could not decode JSON line during export: {line}")
                # Skip undecodable lines rather than failing the whole export.
                continue
        return documents
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error exporting collection '{collection_name}': {e}")
        return f"Error exporting collection '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while exporting collection '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


# --- Typesense Search Tools ---

@mcp.tool()
async def search(
    ctx: Context,
    collection_name: str,
    query: str,
    query_by: str,
    filter_by: str | None = None,
    sort_by: str | None = None,
    group_by: str | None = None,
    facet_by: str | None = None,
    per_page: int = 20,
    page: int = 1
) -> dict | str:
    """Performs a keyword search on a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to search within.
        query (str): The search query string. Use '*' for all documents.
        query_by (str): Comma-separated list of fields to search in.
        filter_by (str | None): Filter conditions (e.g., 'price:>100 && category:Electronics'). Defaults to None.
        sort_by (str | None): Sorting criteria (e.g., 'price:asc, rating:desc'). Defaults to None.
        group_by (str | None): Field to group results by. Defaults to None.
        facet_by (str | None): Fields to facet on. Defaults to None.
        per_page (int): Number of results per page. Defaults to 20.
        page (int): Page number to retrieve. Defaults to 1.

    Returns:
        dict | str: The search results dictionary from Typesense or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not query:
        return "Error: query parameter is required."
    if not query_by:
        return "Error: query_by parameter is required."

    search_params = {
        'q': query,
        'query_by': query_by,
        'per_page': per_page,
        'page': page,
    }
    if filter_by:
        search_params['filter_by'] = filter_by
    if sort_by:
        search_params['sort_by'] = sort_by
    if group_by:
        search_params['group_by'] = group_by
    if facet_by:
        search_params['facet_by'] = facet_by

    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        search_results = client.collections[collection_name].documents.search(search_params)
        return search_results
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.RequestMalformed as e:
        return f"Error: Malformed search request for collection '{collection_name}'. Check parameters (query_by fields existence, filter/sort syntax, etc.). Details: {e}"
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error searching collection '{collection_name}': {e}")
        return f"Error searching collection '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while searching collection '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


# NOTE(review): an earlier, duplicate definition of vector_search (dead code —
# it was shadowed by this revised version and contained abandoned draft logic)
# was removed. This is the single canonical definition.
@mcp.tool()
async def vector_search(
    ctx: Context,
    collection_name: str,
    vector_query: str,
    query: str | None = None,       # Optional text query for hybrid search
    query_by: str | None = None,    # Required if 'query' is provided for hybrid search
    filter_by: str | None = None,
    sort_by: str | None = None,
    per_page: int = 10,
    page: int = 1
) -> dict | str:
    """Performs a vector similarity search on a specific collection, with optional hybrid text search.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to search within.
        vector_query (str): The vector query string, formatted as 'vector_field:([v1,v2,...], k: num_neighbors)'.
        query (str | None): Optional: The text query string for hybrid search. Defaults to None.
        query_by (str | None): Optional: Comma-separated list of text fields. Required if 'query' is provided. Defaults to None.
        filter_by (str | None): Filter conditions to apply. Defaults to None.
        sort_by (str | None): Optional sorting criteria. Defaults to None.
        per_page (int): Number of results per page. Defaults to 10.
        page (int): Page number to retrieve. Defaults to 1.

    Returns:
        dict | str: The vector search results dictionary from Typesense or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not vector_query:
        return "Error: vector_query parameter is required (e.g., 'vec:([0.1,...], k:5)')."
    if query and not query_by:
        return "Error: query_by parameter is required when providing a text query for hybrid search."

    search_params = {
        'vector_query': vector_query,
        'per_page': per_page,
        'page': page,
        # Use text query if provided, else '*' for pure vector search.
        'q': query if query else '*'
    }
    if query and query_by:
        search_params['query_by'] = query_by
    if filter_by:
        search_params['filter_by'] = filter_by
    if sort_by:
        search_params['sort_by'] = sort_by

    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        search_results = client.collections[collection_name].documents.search(search_params)
        return search_results
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.RequestMalformed as e:
        # Provide more specific feedback if possible.
        error_message = f"Error: Malformed vector search request for collection '{collection_name}'. Details: {e}"
        if "vector_query" in str(e) and "dimension" in str(e):
            error_message += " Check if vector dimensions match the schema."
        elif "vector_query" in str(e):
            error_message += " Check vector query format 'field:([vector], k:num)'."
        elif "filter_by" in str(e):
            error_message += " Check filter syntax."
        elif query and query_by and any(field in str(e) for field in query_by.split(',')):
            error_message += f" Check if text search fields in 'query_by' ({query_by}) exist in the schema."
        return error_message
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error during vector search in collection '{collection_name}': {e}")
        return f"Error during vector search in collection '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred during vector search in collection '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


# --- Collection Management Tools ---

@mcp.tool()
async def create_collection(ctx: Context, schema: dict) -> dict | str:
    """Creates a new collection with the provided schema.

    Args:
        ctx (Context): The MCP context.
        schema (dict): The collection schema dictionary (must include 'name' and 'fields').

    Returns:
        dict | str: The created collection schema dictionary or an error message string.
    """
    if not isinstance(schema, dict) or 'name' not in schema or 'fields' not in schema:
        return "Error: Invalid schema provided. Must be a dictionary with 'name' and 'fields' keys."
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        # typesense-python create() is synchronous; do not await it.
        created_collection = client.collections.create(schema)
        return created_collection
    except typesense.exceptions.ObjectAlreadyExists:
        return f"Error: Collection '{schema.get('name')}' already exists."
    except typesense.exceptions.RequestMalformed as e:
        return f"Error: Malformed create collection request. Check schema format. Details: {e}"
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error creating collection '{schema.get('name')}': {e}")
        return f"Error creating collection '{schema.get('name')}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while creating collection '{schema.get('name')}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def delete_collection(ctx: Context, collection_name: str) -> dict | str:
    """Deletes a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to delete.

    Returns:
        dict | str: The deleted collection schema dictionary or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        deleted_info = client.collections[collection_name].delete()
        return deleted_info
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error deleting collection '{collection_name}': {e}")
        return f"Error deleting collection '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while deleting collection '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def truncate_collection(ctx: Context, collection_name: str) -> str:
    """Truncates a collection by deleting all documents but keeping the schema.

    Achieved by retrieving the schema, deleting the collection, and recreating it.
    Note: this is NOT atomic — if recreation fails, the collection stays deleted.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection to truncate.

    Returns:
        str: A success or error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    client: typesense.Client = ctx.request_context.lifespan_context.client
    original_schema = None
    try:
        # 1. Retrieve the current schema.
        print(f"Truncating '{collection_name}': Retrieving schema...")
        original_schema = client.collections[collection_name].retrieve()
        # Keep only user-settable schema keys; drop read-only fields such as
        # 'created_at' and 'num_documents' before re-creating.
        schema_to_recreate = {
            key: value for key, value in original_schema.items()
            if key in ['name', 'fields', 'default_sorting_field', 'token_separators',
                       'symbols_to_index', 'enable_nested_fields']
        }
        # 2. Delete the collection.
        print(f"Truncating '{collection_name}': Deleting original collection...")
        client.collections[collection_name].delete()
        # 3. Recreate the collection with the original schema.
        print(f"Truncating '{collection_name}': Recreating collection with schema...")
        client.collections.create(schema_to_recreate)
        return f"Successfully truncated collection '{collection_name}'."
    except typesense.exceptions.ObjectNotFound:
        return f"Error during truncate: Collection '{collection_name}' not found (maybe already deleted?)."
    except typesense.exceptions.TypesenseClientError as e:
        error_message = f"Error during truncate operation on '{collection_name}': {e}"
        print(error_message)
        # Diagnose the failure stage: if the collection no longer exists,
        # deletion succeeded but recreation failed. (The client has no
        # exists() method, so probe with retrieve().)
        if original_schema is not None:
            try:
                client.collections[collection_name].retrieve()
            except typesense.exceptions.ObjectNotFound:
                error_message += " Original collection was deleted but recreation failed."
            except Exception:
                pass  # Best-effort diagnosis only; report the original error.
        return error_message
    except Exception as e:
        print(f"An unexpected error occurred during truncate for '{collection_name}': {e}")
        return f"An unexpected error occurred during truncate: {e}"


# --- Document Management Tools ---

@mcp.tool()
async def create_document(ctx: Context, collection_name: str, document: dict) -> dict | str:
    """Creates a single new document in a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection.
        document (dict): The document data to create (must include an 'id' field unless auto-schema).

    Returns:
        dict | str: The created document dictionary or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not isinstance(document, dict):
        return "Error: document parameter must be a dictionary."
    try:
        print(f"Creating document in collection '{collection_name}' with ID: {document.get('id', 'N/A')}")
        client: typesense.Client = ctx.request_context.lifespan_context.client
        created_doc = client.collections[collection_name].documents.create(document)
        return created_doc
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.ObjectAlreadyExists as e:
        # Occurs if the document ID already exists.
        return f"Error: Document with ID '{document.get('id', 'N/A')}' already exists in collection '{collection_name}'. Use upsert to update. Details: {e}"
    except typesense.exceptions.RequestMalformed as e:
        return f"Error: Malformed create document request for collection '{collection_name}'. Check document structure against schema. Details: {e}"
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error creating document in '{collection_name}': {e}")
        return f"Error creating document in '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while creating document in '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def upsert_document(ctx: Context, collection_name: str, document: dict) -> dict | str:
    """Upserts (creates or updates) a single document in a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection.
        document (dict): The document data to upsert (must include an 'id' field).

    Returns:
        dict | str: The upserted document dictionary or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not isinstance(document, dict) or 'id' not in document:
        return "Error: document parameter must be a dictionary and include an 'id' field."
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        upserted_doc = client.collections[collection_name].documents.upsert(document)
        return upserted_doc
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.RequestMalformed as e:
        return f"Error: Malformed upsert document request for collection '{collection_name}'. Check document structure against schema. Details: {e}"
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error upserting document in '{collection_name}': {e}")
        return f"Error upserting document in '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while upserting document in '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def index_multiple_documents(
    ctx: Context,
    collection_name: str,
    documents: list[dict],
    action: str = 'upsert'
) -> list[dict] | str:
    """Indexes (creates, upserts, or updates) multiple documents in a batch.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection.
        documents (list[dict]): A list of document dictionaries to index.
        action (str): The import action ('create', 'upsert', 'update'). Defaults to 'upsert'.

    Returns:
        list[dict] | str: A list of result dictionaries (one per document) or an error
            message string. Each result dict typically looks like
            {'success': true/false, 'error': '...', 'document': {...}}.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not isinstance(documents, list) or not documents:
        return "Error: documents parameter must be a non-empty list of dictionaries."
    if action not in ['create', 'upsert', 'update']:
        return "Error: action parameter must be one of 'create', 'upsert', 'update'."

    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        import_response = client.collections[collection_name].documents.import_(documents, {'action': action})
        # Response may be pre-parsed dicts or JSON Lines depending on client
        # version; normalize either form.
        return _coerce_import_results(import_response)
    except typesense.exceptions.ObjectNotFound:
        return f"Error: Collection '{collection_name}' not found."
    except typesense.exceptions.RequestMalformed as e:
        return f"Error: Malformed bulk import request for collection '{collection_name}'. Check document structures. Details: {e}"
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error during bulk import to '{collection_name}': {e}")
        return f"Error during bulk import to '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred during bulk import to '{collection_name}': {e}")
        return f"An unexpected error occurred during bulk import: {e}"


@mcp.tool()
async def delete_document(ctx: Context, collection_name: str, document_id: str) -> dict | str:
    """Deletes a single document by its ID from a specific collection.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection.
        document_id (str): The ID of the document to delete.

    Returns:
        dict | str: The deleted document dictionary or an error message string.
    """
    if not collection_name:
        return "Error: collection_name parameter is required."
    if not document_id:
        return "Error: document_id parameter is required."
    try:
        client: typesense.Client = ctx.request_context.lifespan_context.client
        deleted_doc = client.collections[collection_name].documents[document_id].delete()
        return deleted_doc
    except typesense.exceptions.ObjectNotFound:
        # Could be collection or document not found.
        return f"Error: Collection '{collection_name}' or Document ID '{document_id}' not found."
    except typesense.exceptions.TypesenseClientError as e:
        print(f"Error deleting document '{document_id}' from '{collection_name}': {e}")
        return f"Error deleting document '{document_id}' from '{collection_name}': {e}"
    except Exception as e:
        print(f"An unexpected error occurred while deleting document '{document_id}' from '{collection_name}': {e}")
        return f"An unexpected error occurred: {e}"


@mcp.tool()
async def import_documents_from_csv(
    ctx: Context,
    collection_name: str,
    csv_data_or_path: str,
    batch_size: int = 100,
    action: str = 'upsert'
) -> dict:
    """Imports documents from CSV data (as a string) or a file path into a collection.

    Assumes the CSV header row maps directly to Typesense field names.
    Does basic type inference for int/float, otherwise treats values as strings.

    Args:
        ctx (Context): The MCP context.
        collection_name (str): The name of the collection.
        csv_data_or_path (str): Either the raw CSV data as a string or the path to a CSV file.
        batch_size (int): Number of documents to import per batch. Defaults to 100.
        action (str): Import action ('create', 'upsert', 'update'). Defaults to 'upsert'.

    Returns:
        dict: A summary of the import process including total processed, successful,
            failed count, and any errors.
    """
    if not collection_name:
        return {"success": False, "error": "collection_name parameter is required."}
    if not csv_data_or_path:
        return {"success": False, "error": "csv_data_or_path parameter is required."}
    if action not in ['create', 'upsert', 'update']:
        return {"success": False, "error": "action parameter must be one of 'create', 'upsert', 'update'."}

    client: typesense.Client = ctx.request_context.lifespan_context.client
    batch: list[dict] = []
    total_processed = 0
    total_successful = 0
    total_failed = 0
    errors: list[str] = []
    import_results: list[dict] = []

    def _flush_batch(batch_docs: list[dict]) -> None:
        """Import one batch and fold the per-document results into the totals."""
        nonlocal total_successful, total_failed
        try:
            response = client.collections[collection_name].documents.import_(batch_docs, {'action': action})
            for result in _coerce_import_results(response):
                import_results.append(result)
                if result.get('success', False):
                    total_successful += 1
                else:
                    total_failed += 1
                    errors.append(result.get('error', 'Unknown error in batch result'))
        except Exception as batch_error:
            print(f"Error importing batch: {batch_error}")
            total_failed += len(batch_docs)  # Assume all in batch failed
            errors.append(f"Failed to import batch: {batch_error}")

    file_obj = None
    try:
        # Check if collection exists first to give a clearer error.
        _ = client.collections[collection_name].retrieve()

        # Determine whether the input is a file path or raw CSV data.
        if os.path.exists(csv_data_or_path):
            print(f"Importing from file path: {csv_data_or_path}")
            try:
                # Specify encoding; adjust if needed.
                file_obj = open(csv_data_or_path, 'r', newline='', encoding='utf-8')
            except FileNotFoundError:
                return {"success": False, "error": f"CSV file not found at path: {csv_data_or_path}"}
            except Exception as e:
                return {"success": False, "error": f"Error opening or reading CSV file: {e}"}
        else:
            print("Importing from CSV string data.")
            file_obj = io.StringIO(csv_data_or_path)
        reader = csv.DictReader(file_obj)

        if not reader.fieldnames:
            return {"success": False, "error": "Could not read CSV headers or data."}
        print(f"CSV Headers detected: {reader.fieldnames}")

        for row in reader:
            total_processed += 1
            # Basic type inference (can be expanded significantly).
            processed_row = {}
            for key, value in row.items():
                if value is None or value == '':
                    # Skip empty/None values; the correct default depends on the schema.
                    continue
                try:
                    # Try int, then float, then keep as string.
                    processed_row[key] = int(value)
                except ValueError:
                    try:
                        processed_row[key] = float(value)
                    except ValueError:
                        processed_row[key] = value
            batch.append(processed_row)
            if len(batch) >= batch_size:
                print(f"Importing batch of {len(batch)} documents...")
                _flush_batch(batch)
                batch = []

        # Import any remaining documents in the last batch.
        if batch:
            print(f"Importing final batch of {len(batch)} documents...")
            _flush_batch(batch)
    except typesense.exceptions.ObjectNotFound:
        return {"success": False, "error": f"Collection '{collection_name}' not found."}
    except Exception as e:
        print(f"An unexpected error occurred during CSV import preparation or loop for '{collection_name}': {e}")
        return {"success": False, "error": f"An unexpected error occurred during CSV import: {e}"}
    finally:
        if file_obj and not isinstance(file_obj, io.StringIO):
            file_obj.close()  # Close file if opened

    summary = {
        "success": total_failed == 0,
        "total_processed": total_processed,
        "total_successful": total_successful,
        "total_failed": total_failed,
        "errors": errors[:10]  # Limit reported errors
    }
    print(f"CSV Import Summary: {summary}")
    return summary

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/avarant/typesense-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.