dynamodb_item_batch_write
Put or delete many items in a specified DynamoDB table with a single tool call.
Instructions
Batch write operations (put/delete) for DynamoDB items
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| items | Yes | Array of items to process | |
| key_attributes | No | For delete operations, specify which attributes form the key | |
| operation | Yes | Type of batch operation (put or delete) | |
| table_name | Yes | Name of the DynamoDB table | |
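
As an illustration of the parameters above, the following sketch builds one `put` payload and one `delete` payload. The table name `Orders` and the attribute names `order_id`, `status`, and `total` are hypothetical; only the four top-level parameter names come from the schema.

```python
import json

# Hypothetical table and attribute names, used only for illustration.
put_arguments = {
    "table_name": "Orders",
    "operation": "put",
    "items": [
        {"order_id": "o-1001", "status": "shipped", "total": 42},
        {"order_id": "o-1002", "status": "pending", "total": 17},
    ],
}

# For deletes, key_attributes names the attributes that form the key,
# so an item may carry extra fields that are not part of the key.
delete_arguments = {
    "table_name": "Orders",
    "operation": "delete",
    "items": [{"order_id": "o-1001", "status": "shipped"}],
    "key_attributes": ["order_id"],
}

print(json.dumps(put_arguments, indent=2))
print(json.dumps(delete_arguments, indent=2))
```
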
Input Schema (JSON Schema)

    {
      "properties": {
        "items": {
          "description": "Array of items to process",
          "type": "array"
        },
        "key_attributes": {
          "description": "For delete operations, specify which attributes form the key",
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        "operation": {
          "description": "Type of batch operation (put or delete)",
          "enum": [
            "put",
            "delete"
          ],
          "type": "string"
        },
        "table_name": {
          "description": "Name of the DynamoDB table",
          "type": "string"
        }
      },
      "required": [
        "table_name",
        "operation",
        "items"
      ],
      "type": "object"
    }

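
The constraints in this schema can be checked before a call is made. The sketch below is a minimal, standard-library-only illustration; `check_arguments` is a hypothetical helper, not part of the server, and it covers only the rules the schema states (required fields, the `operation` enum, and the array types).

```python
ALLOWED_OPERATIONS = ("put", "delete")
REQUIRED_FIELDS = ("table_name", "operation", "items")


def check_arguments(arguments: dict) -> list[str]:
    """Return a list of problems; an empty list means the payload looks valid."""
    problems = []
    for field in REQUIRED_FIELDS:
        if field not in arguments:
            problems.append(f"missing required field: {field}")
    if "table_name" in arguments and not isinstance(arguments["table_name"], str):
        problems.append("table_name must be a string")
    if "operation" in arguments and arguments["operation"] not in ALLOWED_OPERATIONS:
        problems.append("operation must be 'put' or 'delete'")
    if "items" in arguments and not isinstance(arguments["items"], list):
        problems.append("items must be an array")
    key_attributes = arguments.get("key_attributes")
    if key_attributes is not None and not (
        isinstance(key_attributes, list)
        and all(isinstance(name, str) for name in key_attributes)
    ):
        problems.append("key_attributes must be an array of strings")
    return problems


# "upsert" is not in the enum, so one problem is reported.
print(check_arguments({"table_name": "Orders", "operation": "upsert", "items": []}))
```

The schema itself accepts an empty `items` array, but the handler listed under Implementation Reference raises `ValueError("No items provided for batch operation")` in that case, so a caller may want to reject empty arrays as well.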
Implementation Reference
- src/mcp_server_aws/server.py:265-332 (handler): Core handler for the dynamodb_item_batch_write tool. Parses arguments, splits items into batches of at most 25, supports put and delete operations, formats data using get_dynamodb_type, calls batch_write_item, retries unprocessed items up to three times with exponential backoff, and returns a summary of processed and failed items.

      elif name == "dynamodb_item_batch_write":
          table_name = arguments["table_name"]
          operation = arguments["operation"]
          items = arguments["items"]

          if not items:
              raise ValueError("No items provided for batch operation")

          batch_size = 25
          total_items = len(items)
          processed_items = 0
          failed_items = []

          for i in range(0, total_items, batch_size):
              batch = items[i:i + batch_size]
              request_items = {table_name: []}

              for item in batch:
                  if operation == "put":
                      formatted_item = {k: get_dynamodb_type(
                          v) for k, v in item.items()}
                      request_items[table_name].append({
                          'PutRequest': {'Item': formatted_item}
                      })
                  elif operation == "delete":
                      key_attrs = arguments.get(
                          "key_attributes", list(item.keys()))
                      formatted_key = {k: get_dynamodb_type(
                          item[k]) for k in key_attrs}
                      request_items[table_name].append({
                          'DeleteRequest': {'Key': formatted_key}
                      })

              try:
                  response = dynamodb_client.batch_write_item(
                      RequestItems=request_items)
                  processed_items += len(batch) - len(
                      response.get('UnprocessedItems', {}
                                   ).get(table_name, [])
                  )

                  unprocessed = response.get('UnprocessedItems', {})
                  retry_count = 0
                  max_retries = 3
                  while unprocessed and retry_count < max_retries:
                      await asyncio.sleep(2 ** retry_count)
                      retry_response = dynamodb_client.batch_write_item(
                          RequestItems=unprocessed)
                      unprocessed = retry_response.get(
                          'UnprocessedItems', {})
                      retry_count += 1

                  if unprocessed:
                      failed_items.extend([
                          item['PutRequest']['Item'] if 'PutRequest' in item else item['DeleteRequest']['Key']
                          for item in unprocessed.get(table_name, [])
                      ])

              except Exception as e:
                  logger.error(f"Error processing batch: {str(e)}")
                  failed_items.extend(batch)

          response = {
              "total_items": total_items,
              "processed_items": processed_items,
              "failed_items": len(failed_items),
              "failed_items_details": failed_items if failed_items else None
          }

- src/mcp_server_aws/tools.py:356-385 (schema): Input schema definition for the dynamodb_item_batch_write tool, specifying required parameters: table_name, operation (put/delete), items array, and optional key_attributes for deletes.

      Tool(
          name="dynamodb_item_batch_write",
          description="Batch write operations (put/delete) for DynamoDB items",
          inputSchema={
              "type": "object",
              "properties": {
                  "table_name": {
                      "type": "string",
                      "description": "Name of the DynamoDB table"
                  },
                  "operation": {
                      "type": "string",
                      "enum": ["put", "delete"],
                      "description": "Type of batch operation (put or delete)"
                  },
                  "items": {
                      "type": "array",
                      "description": "Array of items to process"
                  },
                  "key_attributes": {
                      "type": "array",
                      "description": "For delete operations, specify which attributes form the key",
                      "items": {
                          "type": "string"
                      }
                  }
              },
              "required": ["table_name", "operation", "items"]
          }
      ),

- src/mcp_server_aws/utils.py:1-17 (helper): Helper function get_dynamodb_type recursively converts Python values to DynamoDB attribute value format (S, N, BOOL, NULL, L, M), used in formatting items and keys for batch_write_item.

      def get_dynamodb_type(value):
          if isinstance(value, str):
              return {'S': value}
          elif isinstance(value, (int, float)):
              return {'N': str(value)}
          elif isinstance(value, bool):
              return {'BOOL': value}
          elif value is None:
              return {'NULL': True}
          elif isinstance(value, list):
              return {'L': [get_dynamodb_type(v) for v in value]}
          elif isinstance(value, dict):
              return {'M': {k: get_dynamodb_type(v) for k, v in value.items()}}
          else:
              raise ValueError(
                  f"Unsupported type for DynamoDB: {type(value)}")

- src/mcp_server_aws/server.py:136-140 (registration): MCP server registration of tools via list_tools handler, which returns get_aws_tools() including the dynamodb_item_batch_write Tool.

      async def list_tools() -> list[Tool]:
          """List available AWS tools"""
          logger.debug("Handling list_tools request")
          return get_aws_tools()
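
The `get_dynamodb_type` helper quoted above tests `isinstance(value, (int, float))` before `isinstance(value, bool)`. Because `bool` is a subclass of `int` in Python, that order serializes `True` as `{'N': 'True'}` and the `BOOL` branch is never reached. The sketch below is an illustrative variant, not the repository's code: `to_attribute_value` and `build_request` are hypothetical names, the `bool` check is moved first, and the handler's put/delete formatting is folded into one function.

```python
def to_attribute_value(value):
    """Convert a Python value to DynamoDB attribute-value format."""
    # bool must be tested before int/float because bool subclasses int.
    if isinstance(value, bool):
        return {"BOOL": value}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}
    if value is None:
        return {"NULL": True}
    if isinstance(value, list):
        return {"L": [to_attribute_value(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_attribute_value(v) for k, v in value.items()}}
    raise ValueError(f"Unsupported type for DynamoDB: {type(value)}")


def build_request(item, operation, key_attributes=None):
    """Build one entry for the request list passed to batch_write_item."""
    if operation == "put":
        formatted = {k: to_attribute_value(v) for k, v in item.items()}
        return {"PutRequest": {"Item": formatted}}
    if operation == "delete":
        # Mirrors the handler's default: without key_attributes, every
        # attribute of the item is treated as part of the key.
        names = key_attributes if key_attributes is not None else list(item.keys())
        return {"DeleteRequest": {"Key": {k: to_attribute_value(item[k]) for k in names}}}
    raise ValueError(f"Unsupported operation: {operation}")


# Hypothetical item, for illustration only.
item = {"order_id": "o-1001", "active": True, "total": 42}
print(build_request(item, "put"))
print(build_request(item, "delete", key_attributes=["order_id"]))
```

Passing `key_attributes` matters for deletes whenever items carry non-key fields, since DynamoDB expects the `Key` of a `DeleteRequest` to contain only the table's primary key attributes.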
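
The handler's batching and retry behaviour can be sketched in isolation. This is a simplified illustration under stated assumptions: `write_in_batches`, `FlakyClient`, and `no_sleep` are hypothetical names, the client is a stand-in that only imitates the `UnprocessedItems` shape returned by `batch_write_item`, and the sleep function is injectable so the example runs without waiting. Unlike the quoted handler, which counts only the items accepted on the first attempt, this variant also counts items that succeed on a retry as processed. Exception handling is omitted for brevity.

```python
import asyncio

BATCH_SIZE = 25  # batch_write_item accepts at most 25 requests per call
MAX_RETRIES = 3


async def write_in_batches(client, table_name, requests, sleep=asyncio.sleep):
    """Send requests in chunks of 25, retrying unprocessed ones with backoff.

    Returns (processed_count, failed_requests).
    """
    processed = 0
    failed = []
    for start in range(0, len(requests), BATCH_SIZE):
        batch = requests[start:start + BATCH_SIZE]
        response = client.batch_write_item(RequestItems={table_name: batch})
        unprocessed = response.get("UnprocessedItems", {})
        retry_count = 0
        while unprocessed and retry_count < MAX_RETRIES:
            await sleep(2 ** retry_count)  # waits 1, 2, then 4 seconds
            response = client.batch_write_item(RequestItems=unprocessed)
            unprocessed = response.get("UnprocessedItems", {})
            retry_count += 1
        # Whatever is still unprocessed after the retries counts as failed.
        leftover = unprocessed.get(table_name, [])
        processed += len(batch) - len(leftover)
        failed.extend(leftover)
    return processed, failed


class FlakyClient:
    """Stand-in client that rejects the last request of each multi-item call."""

    def __init__(self):
        self.calls = 0

    def batch_write_item(self, RequestItems):
        self.calls += 1
        table_name, batch = next(iter(RequestItems.items()))
        if len(batch) > 1:
            return {"UnprocessedItems": {table_name: batch[-1:]}}
        return {"UnprocessedItems": {}}


async def no_sleep(seconds):
    """Replacement for asyncio.sleep so the demo does not wait."""


requests = [{"PutRequest": {"Item": {"id": {"N": str(n)}}}} for n in range(60)]
client = FlakyClient()
processed, failed = asyncio.run(
    write_in_batches(client, "Orders", requests, sleep=no_sleep)
)
print(processed, len(failed), client.calls)
```

With 60 requests the loop sends three batches (25, 25, and 10), and each batch needs one retry against this stand-in client, so all 60 requests end up processed after six calls.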