write_data
Write time-series data to InfluxDB databases using Line Protocol format for GigAPI Timeseries Lake integration.
Instructions
Write data using InfluxDB Line Protocol format.
Input Schema
TableJSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
| database | Yes | ||
| data | Yes |
Implementation Reference
- mcp_gigapi/tools.py:153-177 (handler)The primary handler function for the MCP 'write_data' tool. It invokes the underlying client.write_data method, handles exceptions, and formats the success/error response.def write_data(self, database: str, data: str) -> Dict[str, Any]: """Write data using InfluxDB Line Protocol. Args: database: The database to write to data: Data in InfluxDB Line Protocol format Returns: Write operation result """ try: result = self.client.write_data(database, data) return { "result": result, "success": True, "database": database, "data_lines": len(data.strip().split('\n')) } except GigAPIClientError as e: logger.error(f"Failed to write data: {e}") return { "error": str(e), "success": False, "database": database }
- mcp_gigapi/tools.py:27-32 (schema)Pydantic input schema model matching the write_data handler parameters (database and data).class WriteDataInput(BaseModel): """Input model for data writing operations.""" database: str = Field(..., description="The database to write to") data: str = Field(..., description="Data in InfluxDB Line Protocol format")
- mcp_gigapi/tools.py:248-252 (registration)FastMCP Tool registration for the 'write_data' tool, binding the GigAPITools.write_data method.Tool.from_function( tools_instance.write_data, name="write_data", description="Write data using InfluxDB Line Protocol format.", ),
- mcp_gigapi/client.py:167-188 (helper)Supporting client method that performs the actual HTTP POST request to the GigAPI /write endpoint for writing data.def write_data(self, database: str, data: str) -> Dict[str, Any]: """Write data using InfluxDB Line Protocol. Args: database: Database name data: Data in InfluxDB Line Protocol format Returns: Write response """ params = {"db": database} headers = {"Content-Type": "text/plain"} response = self._make_request( "POST", "/write", data=data, params=params, headers=headers ) return response.json()