sparql_query
Execute SPARQL queries against knowledge graphs to retrieve structured data in JSON, CSV, or XML formats with configurable timeouts.
Instructions
Execute SPARQL query against user's graph via API server.
Args:
graph_id: Graph ID to query
query: SPARQL query to execute
result_format: Result format (json, csv, xml)
timeout_seconds: Query timeout in seconds
Returns:
JSON string with query results
Input Schema
Table | JSON Schema
| Name | Required | Description | Default |
|---|---|---|---|
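| graph_id | Yes | Graph ID to query | |
| query | Yes | SPARQL query to execute | |
| result_format | No | Result format (json, csv, xml) | json |
| timeout_seconds | No | Query timeout in seconds | 30 |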
Implementation Reference
# Core handler function for the sparql_query tool. Includes registration via
# @mcp_server.tool(), input parameter definitions serving as schema, access
# validation, API call to execute SPARQL query, response transformation, and
# detailed error handling with context-specific advice.


def _build_query_error_help(exc, graph_id, timeout_seconds):
    """Classify a failed query and build its troubleshooting payload.

    Args:
        exc: The exception raised while running the query.
        graph_id: Graph ID the query targeted (echoed into the payload).
        timeout_seconds: Timeout that was in effect (echoed for timeouts).

    Returns:
        An ``(error_type, help_info)`` tuple, where ``error_type`` is either
        ``"QUERY_TIMEOUT"`` or ``"SPARQL_QUERY_FAILED"``.
    """
    # Local import keeps this block self-contained without touching the
    # file-level import section.
    import asyncio

    # Timeout exceptions frequently carry an empty message (for example
    # ``asyncio.TimeoutError()``), so check the exception type as well as
    # the message text.
    is_timeout = (
        isinstance(exc, (TimeoutError, asyncio.TimeoutError))
        or "timed out" in str(exc).lower()
    )

    if is_timeout:
        return "QUERY_TIMEOUT", {
            "graph_id": graph_id,
            "timeout_seconds": timeout_seconds,
            "suggestions": [
                "Check SPARQL syntax - invalid syntax can cause timeouts",
                "Verify graph exists and is accessible",
                "Try adding LIMIT clause to reduce result set",
                "Simplify query by removing complex joins or filters",
            ],
            "common_timeout_causes": [
                "Missing object in WHERE clause (e.g., '{ ?s ?p }' should be '{ ?s ?p ?o }')",
                "Querying nonexistent graph",
                "Invalid SPARQL syntax",
                "Overly complex query without constraints",
            ],
        }

    return "SPARQL_QUERY_FAILED", {
        "graph_id": graph_id,
        "suggestions": [
            "Check SPARQL syntax for missing brackets or semicolons",
            "Verify graph access permissions",
            "Try a simpler query to test connectivity",
        ],
        "common_fixes": [
            "Ensure all variables start with ?",
            "Check that PREFIX declarations are correct",
            "Verify bracket matching: { }",
        ],
    }


@mcp_server.tool()
async def sparql_query(
    graph_id: str,
    query: str,
    ctx: Context,
    result_format: str = "json",
    timeout_seconds: int = 30
) -> str:
    """
    Execute SPARQL query against user's graph via API server.

    Args:
        graph_id: Graph ID to query
        query: SPARQL query to execute
        result_format: Result format (json, csv, xml)
        timeout_seconds: Query timeout in seconds

    Returns:
        JSON string with query results
    """
    # NOTE(review): the docstring above appears to be surfaced as the tool's
    # user-facing instructions, so its wording is deliberately left untouched.
    # NOTE(review): result_format is accepted but never forwarded to
    # api_client.query_graph below; every path returns a JSON string.
    try:
        # Get session context (cached or fresh)
        auth_token, session_data = await get_session_context(ctx)

        # Validate graph access only when session data is available; without
        # it the request goes straight to the API call.
        if session_data:
            accessible_graphs = session_data.get("accessible_graphs", [])
            if graph_id not in accessible_graphs:
                error_result = {
                    "success": False,
                    "error": f"Graph '{graph_id}' not accessible to user",
                    "error_type": "GRAPH_ACCESS_DENIED",
                    "accessible_graphs": accessible_graphs
                }
                return json.dumps(error_result, indent=2)
            logger.debug("Graph '%s' validated via session cache", graph_id)

        logger.info("Executing SPARQL query on graph '%s' via API", graph_id)

        # Call API server
        api_response = await api_client.query_graph(
            graph_id, query, auth_token, timeout_seconds
        )

        # Transform to enhanced MCP response
        mcp_response = McpSparqlResponse.from_api_response(api_response, query, graph_id)
        return mcp_response.to_json()

    except Exception as e:
        # Tool boundary: every failure is converted into a JSON error payload
        # rather than propagated. logger.exception keeps the traceback.
        logger.exception("SPARQL query failed: %s", e)

        # Determine error type and provide specific help
        error_type, help_info = _build_query_error_help(e, graph_id, timeout_seconds)
        error_response = McpErrorResponse.from_exception(
            e, error_type=error_type, help_info=help_info
        )
        return error_response.to_json()
# Defines the output schema and transformation logic for sparql_query
# responses. Structures results into summary, formatted table, raw data, and
# analytical insights. Used by the handler to format API responses.
class McpSparqlResponse(McpResponseObject):
    """Enhanced SPARQL query response with formatting and insights"""

    # Counts, timing, graph ID, detected result type and query complexity.
    query_summary: McpQuerySummary
    # Markdown table, raw bindings and a short sample of the bindings.
    results: McpQueryResults
    # Derived hints about the result set and the query.
    insights: McpQueryInsights
    # Left at its default by from_api_response.
    success: bool = True

    @classmethod
    def from_api_response(cls, api_response: Dict[str, Any], query: str, graph_id: str) -> 'McpSparqlResponse':
        """Transform API SPARQL response to enhanced MCP format.

        Args:
            api_response: Decoded API payload; the result rows are read from
                ``api_response["data"]["results"]``.
            query: The SPARQL text that was executed, used for complexity
                analysis and optimization hints.
            graph_id: Graph ID the query ran against.

        Returns:
            A populated ``McpSparqlResponse``.
        """
        # API returns nested structure: {"success": true, "data": {"results": [...], "count": N}}
        # ``or`` also covers keys that are present but null, which the
        # ``.get(key, default)`` form would pass through and crash on.
        data = api_response.get("data") or {}
        raw_results = data.get("results") or []
        result_count = len(raw_results)

        # Build response components
        query_summary = McpQuerySummary(
            result_count=result_count,
            query_time_ms=data.get("query_time_ms", 0),
            graph_id=graph_id,
            result_type=_detect_result_type(raw_results),
            query_complexity=_analyze_query_complexity(query)
        )

        results = McpQueryResults(
            formatted_table=_format_as_markdown_table(raw_results),
            raw_bindings=raw_results,
            # Slicing an empty list already yields [], so no guard is needed.
            sample_results=raw_results[:5]
        )

        insights = McpQueryInsights(
            result_type=query_summary.result_type,
            # NOTE(review): 100 presumably mirrors a server-side row cap;
            # confirm against the API before relying on this flag.
            has_more_data=result_count >= 100,
            common_variables=_extract_common_variables(raw_results),
            data_patterns=_analyze_data_patterns(raw_results),
            optimization_hints=_generate_optimization_hints(query, raw_results)
        )

        return cls(
            query_summary=query_summary,
            results=results,
            insights=insights
        )