copy_data
Bulk-load rows into a Vertica table using the COPY command. Provide the schema, the target table, and the data rows; the tool reports progress and errors through the FastMCP context and returns a status message.
Instructions
Copy data into a Vertica table using the COPY command.
Args:
    ctx: FastMCP context for progress reporting and logging
    schema: Vertica schema to execute the copy against
    table: Target table name
    data: List of rows to insert
Returns:
    Status message indicating success or failure
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| data | Yes | List of rows to insert | |
| schema | Yes | Vertica schema to execute the copy against | |
| table | Yes | Target table name | |
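To illustrate the shape of the three required arguments, here is a minimal sketch of a client-side check that could run before calling the tool. The helper name `validate_copy_args` and the example schema and table names are hypothetical and not part of mcp_vertica; the equal-row-width rule is an assumption about what a well-formed `data` value looks like.

```python
from typing import Any, List


def validate_copy_args(schema: str, table: str, data: List[List[Any]]) -> dict:
    """Check the three required arguments and return them as a dict.

    Hypothetical helper for illustration; not part of mcp_vertica.
    """
    if not schema or not table:
        raise ValueError("schema and table must be non-empty strings")
    if not data:
        raise ValueError("data must contain at least one row")
    # Assumption: every row has the same number of columns as the first row.
    width = len(data[0])
    for i, row in enumerate(data):
        if len(row) != width:
            raise ValueError(f"row {i} has {len(row)} columns, expected {width}")
    return {"schema": schema, "table": table, "data": data}


# Example arguments for a hypothetical table public.users(id, name).
arguments = validate_copy_args("public", "users", [[1, "Ada"], [2, "Grace"]])
```

The returned dict matches the tool's input fields, so it could be passed as the arguments of a `copy_data` call.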
Implementation Reference
- src/mcp_vertica/mcp.py:308-365 (handler)
  Implementation of the copy_data tool handler. Uses Vertica's COPY command to load a list of data rows into the target table after checking that INSERT is allowed for the given schema. Includes CSV conversion and error handling.

```python
@mcp.tool()
async def copy_data(
    ctx: Context,
    schema: str,
    table: str,
    data: List[List[Any]],
) -> str:
    """Copy data into a Vertica table using COPY command.

    Args:
        ctx: FastMCP context for progress reporting and logging
        schema: vertica schema to execute the copy against
        table: Target table name
        data: List of rows to insert

    Returns:
        Status message indicating success or failure
    """
    await ctx.info(f"Copying {len(data)} rows to table: {table}")

    # Get or create connection manager
    manager = await get_or_create_manager(ctx)
    if not manager:
        return "Error: Failed to initialize database connection. Check configuration."

    # Check operation permissions
    if not manager.is_operation_allowed(schema, OperationType.INSERT):
        error_msg = f"INSERT operation not allowed for database {schema}"
        await ctx.error(error_msg)
        return error_msg

    conn = None
    cursor = None
    try:
        conn = manager.get_connection()
        cursor = conn.cursor()

        # Convert data to CSV string
        output = io.StringIO()
        writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
        writer.writerows(data)
        output.seek(0)

        # Create COPY command
        copy_query = f"""COPY {table} FROM STDIN DELIMITER ',' ENCLOSED BY '\"'"""
        cursor.copy(copy_query, output.getvalue())
        conn.commit()

        success_msg = f"Successfully copied {len(data)} rows to {table}"
        await ctx.info(success_msg)
        return success_msg
    except Exception as e:
        error_msg = f"Error copying data: {str(e)}"
        await ctx.error(error_msg)
        return error_msg
    finally:
        if cursor:
            cursor.close()
        if conn:
            manager.release_connection(conn)
```
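The CSV conversion step of the handler can be tried in isolation with the standard library alone. The sketch below mirrors the handler's `csv.writer(..., quoting=csv.QUOTE_MINIMAL)` call and also shows one way to build a schema-qualified COPY statement. The helper names `rows_to_csv`, `quote_ident`, and `build_copy_statement` are hypothetical. The handler as listed interpolates only `table` into the statement, so the schema-qualified, double-quoted form here is an assumption about what a caller might want, not the project's implementation.

```python
import csv
import io
from typing import Any, List


def rows_to_csv(data: List[List[Any]]) -> str:
    """Serialize rows with csv.QUOTE_MINIMAL, as the handler does."""
    output = io.StringIO()
    writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
    writer.writerows(data)
    return output.getvalue()


def quote_ident(name: str) -> str:
    """Double-quote an SQL identifier, doubling any embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_copy_statement(schema: str, table: str) -> str:
    """Build a schema-qualified COPY ... FROM STDIN statement (assumption)."""
    target = f"{quote_ident(schema)}.{quote_ident(table)}"
    return f"COPY {target} FROM STDIN DELIMITER ',' ENCLOSED BY '\"'"


# A field containing the delimiter is wrapped in double quotes.
payload = rows_to_csv([[1, "Ada"], [2, "Lovelace, A."]])
statement = build_copy_statement("public", "users")
```

In the handler's flow, `statement` and `payload` would take the place of `copy_query` and `output.getvalue()` in the `cursor.copy(...)` call. Quoting the identifiers keeps a table name containing spaces or punctuation from altering the statement, which plain string interpolation does not guard against.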