copy_data
Insert rows into a Vertica table using the COPY command. Provide schema, table, and row data to bulk load records efficiently.
Instructions
Copy data into a Vertica table using COPY command.
Args:
ctx: FastMCP context for progress reporting and logging
schema: vertica schema to execute the copy against
table: Target table name
data: List of rows to insert
Returns:
Status message indicating success or failureInput Schema
| Name | Required | Description | Default |
|---|---|---|---|
| schema | Yes | ||
| table | Yes | ||
| data | Yes |
Implementation Reference
- src/mcp_vertica/mcp.py:308-364 (handler)The async function `copy_data` that implements the tool logic. It accepts schema, table, and data (list of rows), converts data to CSV, then uses Vertica's COPY FROM STDIN command to bulk-insert rows into the specified table, with permission checking and connection management.
@mcp.tool() async def copy_data( ctx: Context, schema: str, table: str, data: List[List[Any]], ) -> str: """Copy data into a Vertica table using COPY command. Args: ctx: FastMCP context for progress reporting and logging schema: vertica schema to execute the copy against table: Target table name data: List of rows to insert Returns: Status message indicating success or failure """ await ctx.info(f"Copying {len(data)} rows to table: {table}") # Get or create connection manager manager = await get_or_create_manager(ctx) if not manager: return "Error: Failed to initialize database connection. Check configuration." # Check operation permissions if not manager.is_operation_allowed(schema, OperationType.INSERT): error_msg = f"INSERT operation not allowed for database {schema}" await ctx.error(error_msg) return error_msg conn = None cursor = None try: conn = manager.get_connection() cursor = conn.cursor() # Convert data to CSV string output = io.StringIO() writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) writer.writerows(data) output.seek(0) # Create COPY command copy_query = f"""COPY {table} FROM STDIN DELIMITER ',' ENCLOSED BY '\"'""" cursor.copy(copy_query, output.getvalue()) conn.commit() success_msg = f"Successfully copied {len(data)} rows to {table}" await ctx.info(success_msg) return success_msg except Exception as e: error_msg = f"Error copying data: {str(e)}" await ctx.error(error_msg) return error_msg finally: if cursor: cursor.close() if conn: manager.release_connection(conn) - src/mcp_vertica/mcp.py:308-309 (registration)The `@mcp.tool()` decorator registers `copy_data` as an MCP tool on the FastMCP instance.
@mcp.tool() async def copy_data( - src/mcp_vertica/mcp.py:309-311 (schema)The function signature serves as the schema: parameters are `ctx: Context`, `schema: str`, `table: str`, `data: List[List[Any]]` (the rows to insert), and returns `str`.
async def copy_data( ctx: Context, schema: str, table: str, data: List[List[Any]], ) -> str: - src/mcp_vertica/mcp.py:361-364 (helper)The `finally` block ensures proper cleanup of the cursor and connection back to the pool.
if cursor: cursor.close() if conn: manager.release_connection(conn)