import_table_file
Imports a local CSV or XLSX file into an existing database table. Specify connection, table, and file path to load data.
Instructions
Import a local CSV or XLSX file into an existing table.
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| connection_id | Yes | ||
| table_name | Yes | ||
| file_path | Yes | ||
| schema | No | ||
| database | No | ||
| sheet_name | No | ||
Implementation Reference
- sql_query_mcp/importer.py:36-119 (handler) Core handler logic for import_table_file: reads CSV/XLSX, validates against target table, builds insert query, executes insert, and audits the result.
def import_table_file(
    self,
    connection_id: str,
    table_name: str,
    file_path: str,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    sheet_name: Optional[str] = None,
) -> Dict[str, object]:
    """Import the rows of a local CSV or XLSX file into an existing table.

    The connection config is looked up by ``connection_id``, the namespace is
    resolved from ``schema``/``database``, the file is parsed by
    ``_read_file``, the headers are validated against the columns reported by
    ``adapter.describe_table``, and the rows are inserted with the query
    produced by ``adapter.build_insert_query``. One audit record is written
    for the outcome, whether it succeeded or failed.

    Args:
        connection_id: Identifier passed to the registry to fetch the
            connection config.
        table_name: Name of the target table; it must already be describable
            through the adapter.
        file_path: Path of the file to import; its lower-cased suffix is
            reported back as ``file_extension``.
        schema: Optional value forwarded to ``resolve_namespace``.
        database: Optional value forwarded to ``resolve_namespace``.
        sheet_name: Optional sheet selector forwarded to ``_read_file``.

    Returns:
        A dict with ``connection_id``, ``engine``, the namespace entry (keyed
        by ``namespace.field_name``), ``table_name``, ``inserted_row_count``,
        ``duration_ms``, ``file_extension`` and ``sheet_name``.

    Raises:
        QueryExecutionError: For every failure raised inside the ``try``
            block. The message is passed through ``sanitize_error_message``
            and the original exception is chained as the cause.
    """
    started = time.perf_counter()
    # These defaults are assigned before the try block so the failure audit
    # below can still be built when an early step raises.
    config = None
    namespace = None
    # NOTE(review): this runs outside the try block, so an error raised here
    # would bypass the audit log and the QueryExecutionError wrapping.
    file_extension = Path(file_path).suffix.lower()
    selected_sheet_name = None
    inserted_row_count = 0
    try:
        config = self._registry.get_connection_config(connection_id)
        namespace = resolve_namespace(config, schema=schema, database=database)
        headers, rows, selected_sheet_name = _read_file(Path(file_path), sheet_name)
        # A file without data rows is rejected before a connection is opened.
        if not rows:
            raise QueryExecutionError("文件没有可导入的数据行。")
        # Hive imports are capped at HIVE_IMPORT_MAX_ROWS rows.
        if config.engine == "hive" and len(rows) > HIVE_IMPORT_MAX_ROWS:
            raise QueryExecutionError(
                f"Hive 导入最多支持 {HIVE_IMPORT_MAX_ROWS} 行;大文件请使用 Hive LOAD DATA、外部表或已有数据入湖链路。"
            )
        with self._registry.connection_from_config(config) as (conn, adapter):
            _apply_statement_timeout(adapter, conn, self._settings.statement_timeout_ms)
            description = adapter.describe_table(conn, namespace.value, table_name)
            # A falsy description is reported as "table not found or not accessible".
            if not description:
                raise QueryExecutionError(
                    f"未找到表 {namespace.value}.{table_name},或当前用户没有访问权限"
                )
            table_columns = [item["column_name"] for item in description["columns"]]
            _validate_headers(headers, table_columns)
            query = adapter.build_insert_query(namespace.value, table_name, headers)
            _execute_insert(conn, config.engine, query, rows)
            # The count is only updated once the insert call has returned, so
            # the failure audit reports 0 rows when the insert itself raises.
            inserted_row_count = len(rows)
        # NOTE(review): the collapsed source does not show where the `with`
        # block ends; the audit/return section is assumed to follow it - confirm.
        duration_ms = _elapsed_ms(started)
        self._audit.log(
            tool="import_table_file",
            connection_id=connection_id,
            success=True,
            duration_ms=duration_ms,
            row_count=inserted_row_count,
            extra=_build_audit_extra(
                config,
                namespace,
                table_name,
                file_extension,
                selected_sheet_name,
            ),
        )
        return {
            "connection_id": connection_id,
            "engine": config.engine,
            # The key name comes from the resolved namespace object.
            namespace.field_name: namespace.value,
            "table_name": table_name,
            "inserted_row_count": inserted_row_count,
            "duration_ms": duration_ms,
            "file_extension": file_extension,
            "sheet_name": selected_sheet_name,
        }
    except Exception as exc:
        duration_ms = _elapsed_ms(started)
        # Only the sanitized message is written to the audit log and re-raised.
        sanitized = sanitize_error_message(str(exc))
        self._audit.log(
            tool="import_table_file",
            connection_id=connection_id,
            success=False,
            duration_ms=duration_ms,
            row_count=inserted_row_count,
            error=sanitized,
            extra=_build_audit_extra(
                config,
                namespace,
                table_name,
                file_extension,
                selected_sheet_name,
            ),
        )
        raise QueryExecutionError(sanitized) from exc
# - sql_query_mcp/app.py:111-131 (registration)FastMCP tool registration via @mcp.tool() decorator, defining the tool name and parameters (connection_id, table_name, file_path, schema, database, sheet_name).
@mcp.tool()
def import_table_file(
    connection_id: str,
    table_name: str,
    file_path: str,
    schema: Optional[str] = None,
    database: Optional[str] = None,
    sheet_name: Optional[str] = None,
) -> dict:
    """Import a local CSV or XLSX file into an existing table."""
    # NOTE(review): the docstring above is kept verbatim; it presumably serves
    # as the tool description exposed to MCP clients - confirm before rewording.

    def _invoke():
        # Forward the arguments unchanged, in the same positional order, to
        # the importer's handler.
        return importer.import_table_file(
            connection_id,
            table_name,
            file_path,
            schema,
            database,
            sheet_name,
        )

    # The call is deferred so that _run_tool controls when it executes.
    return _run_tool(_invoke)
# - sql_query_mcp/importer.py:122-130 (helper)Helper function _read_file that dispatches to _read_csv or _read_xlsx based on file extension.
def _read_file(path: Path, sheet_name: Optional[str]) -> Tuple[List[str], List[Tuple[object, ...]], Optional[str]]:
    """Parse ``path`` with the reader that matches its file extension.

    The extension is compared case-insensitively. ``.xlsx`` files go to
    ``_read_xlsx`` together with ``sheet_name``; ``.csv`` files go to
    ``_read_csv``.

    Args:
        path: Location of the file to read.
        sheet_name: Optional sheet selector; only accepted for ``.xlsx``.

    Returns:
        Whatever the selected reader returns: ``(headers, rows, sheet title)``.

    Raises:
        QueryExecutionError: If the extension is neither ``.csv`` nor
            ``.xlsx``, or if a truthy ``sheet_name`` accompanies a CSV file.
    """
    suffix = path.suffix.lower()
    if suffix == ".xlsx":
        return _read_xlsx(path, sheet_name)
    if suffix != ".csv":
        raise QueryExecutionError("仅支持 .csv 和 .xlsx 文件导入。")
    # Only CSV input reaches this point; a sheet selector is rejected for it.
    if sheet_name:
        raise QueryExecutionError("CSV 文件不支持 sheet_name 参数。")
    return _read_csv(path)
# - sql_query_mcp/importer.py:133-141 (helper)Helper function _read_csv to parse CSV files.
def _read_csv(path: Path) -> Tuple[List[str], List[Tuple[object, ...]], Optional[str]]: with path.open("r", encoding="utf-8-sig", newline="") as handle: reader = csv.reader(handle) try: headers = next(reader) except StopIteration as exc: raise QueryExecutionError("文件表头不能为空。") from exc rows = [_normalize_row(row, len(headers)) for row in reader] return headers, rows, None - sql_query_mcp/importer.py:144-164 (helper)Helper function _read_xlsx to parse XLSX files using openpyxl.
def _read_xlsx(path: Path, sheet_name: Optional[str]) -> Tuple[List[str], List[Tuple[object, ...]], Optional[str]]:
    """Read one worksheet of an XLSX file into headers and normalized rows.

    The workbook is opened through ``load_workbook`` with ``read_only=True``
    and ``data_only=True``, and is always closed before returning or raising.
    The first worksheet row is the header (``None`` cells become ``""``,
    other values are converted with ``str``). Every later row is turned into
    a list and passed to ``_normalize_row`` with the header count.

    Args:
        path: Location of the XLSX file.
        sheet_name: Sheet to read; when falsy, the first worksheet is used.

    Returns:
        ``(headers, rows, title of the worksheet that was read)``.

    Raises:
        QueryExecutionError: If ``load_workbook`` is ``None``, if the
            requested sheet is not in the workbook, or if the worksheet
            yields no rows at all.
    """
    if load_workbook is None:
        raise QueryExecutionError("缺少 openpyxl 依赖,请先安装项目依赖。")
    book = load_workbook(path, read_only=True, data_only=True)
    try:
        if not sheet_name:
            sheet = book.worksheets[0]
        elif sheet_name in book.sheetnames:
            sheet = book[sheet_name]
        else:
            raise QueryExecutionError(f"XLSX 文件中不存在 sheet: {sheet_name}")
        row_values = sheet.iter_rows(values_only=True)
        try:
            first_row = next(row_values)
        except StopIteration as exc:
            raise QueryExecutionError("文件表头不能为空。") from exc
        headers = [str(cell) if cell is not None else "" for cell in first_row]
        width = len(headers)
        rows = []
        # The rows are read before the workbook is closed in the finally clause.
        for values in row_values:
            rows.append(_normalize_row(list(values), width))
        return headers, rows, sheet.title
    finally:
        book.close()