Skip to main content
Glama
Replicant-Partners

Congo River Compositional Intelligence

graph_engine.py14.1 kB
#!/usr/bin/env python3 """ Graph Query Engine SPARQL-like queries over RDF knowledge graphs Implements semantic web operations using rdflib. Supports both structured queries and natural language query translation. """ import json import sys import os from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass, asdict from rdflib import Graph, Namespace, Literal, URIRef, RDF, RDFS from rdflib.plugins.sparql import prepareQuery import psycopg2 from psycopg2.extras import RealDictCursor # Custom namespace for Congo River ontology CR = Namespace("http://congoriver.ai/ontology#") @dataclass class Triple: """RDF Triple representation""" subject: str predicate: str object: str context: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return asdict(self) def to_rdf_tuple(self) -> Tuple[URIRef, URIRef, Any]: """Convert to rdflib tuple format""" subj = URIRef(self.subject) if self.subject.startswith("http") else CR[self.subject] pred = URIRef(self.predicate) if self.predicate.startswith("http") else CR[self.predicate] # Handle object - could be URI or Literal if self.object.startswith("http"): obj = URIRef(self.object) elif self._looks_like_number(self.object): obj = Literal(float(self.object) if "." in self.object else int(self.object)) else: obj = Literal(self.object) return (subj, pred, obj) def _looks_like_number(self, s: str) -> bool: """Check if string represents a number""" try: float(s) return True except ValueError: return False @dataclass class QueryResult: """Result of a graph query""" success: bool query: str query_type: str triples: List[Triple] bindings: List[Dict[str, str]] # For SELECT queries count: int def to_dict(self) -> Dict[str, Any]: return { "success": self.success, "query": self.query, "query_type": self.query_type, "triples": [t.to_dict() for t in self.triples], "bindings": self.bindings, "count": self.count } class GraphEngine: """ Knowledge graph query engine using rdflib. Supports: 1. SPARQL queries (structured) 2. Triple pattern matching 3. Path queries (subject → predicate → object chains) 4. Natural language query translation (basic) 5. Integration with PostgreSQL triple store """ def __init__(self, db_url: Optional[str] = None): """Initialize graph engine with optional database connection""" self.graph = Graph() self.graph.bind("cr", CR) self.graph.bind("rdf", RDF) self.graph.bind("rdfs", RDFS) self.db_url = db_url or os.getenv("CLOUD_DB_URL") # Load triples from database if available if self.db_url: self._load_from_database() def _load_from_database(self): """Load triples from PostgreSQL into in-memory graph""" try: conn = psycopg2.connect(self.db_url) cursor = conn.cursor(cursor_factory=RealDictCursor) cursor.execute("SELECT subject, predicate, object FROM triples LIMIT 10000") rows = cursor.fetchall() for row in rows: triple = Triple( subject=row["subject"], predicate=row["predicate"], object=row["object"] ) self.graph.add(triple.to_rdf_tuple()) cursor.close() conn.close() except Exception as e: print(f"Warning: Could not load from database: {e}", file=sys.stderr) def add_triple(self, triple: Triple): """Add a triple to the in-memory graph""" self.graph.add(triple.to_rdf_tuple()) def add_triples(self, triples: List[Triple]): """Add multiple triples""" for triple in triples: self.add_triple(triple) def query_sparql(self, sparql_query: str) -> QueryResult: """ Execute a SPARQL query on the graph. Args: sparql_query: SPARQL query string Returns: QueryResult with bindings or triples """ try: results = self.graph.query(sparql_query) # Convert results to standard format bindings = [] triples = [] # Check query type query_lower = sparql_query.lower() if "select" in query_lower: # SELECT query - return bindings for row in results: binding = {} for var, value in row.asdict().items(): binding[var] = str(value) bindings.append(binding) return QueryResult( success=True, query=sparql_query, query_type="select", triples=[], bindings=bindings, count=len(bindings) ) elif "construct" in query_lower: # CONSTRUCT query - return triples for s, p, o in results: triples.append(Triple( subject=str(s), predicate=str(p), object=str(o) )) return QueryResult( success=True, query=sparql_query, query_type="construct", triples=triples, bindings=[], count=len(triples) ) else: # ASK query or other return QueryResult( success=True, query=sparql_query, query_type="ask", triples=[], bindings=[{"result": str(results.askAnswer)}] if hasattr(results, 'askAnswer') else [], count=1 ) except Exception as e: return QueryResult( success=False, query=sparql_query, query_type="error", triples=[], bindings=[{"error": str(e)}], count=0 ) def query_pattern( self, subject: Optional[str] = None, predicate: Optional[str] = None, object: Optional[str] = None ) -> QueryResult: """ Query using triple pattern (None = wildcard). Example: query_pattern(subject="consciousness", predicate=None, object=None) Returns all triples about consciousness. """ # Convert to URIRefs/Literals s = CR[subject] if subject else None p = CR[predicate] if predicate else None o = Literal(object) if object and not object.startswith("http") else (URIRef(object) if object else None) triples = [] for subj, pred, obj in self.graph.triples((s, p, o)): triples.append(Triple( subject=str(subj), predicate=str(pred), object=str(obj) )) return QueryResult( success=True, query=f"pattern({subject}, {predicate}, {object})", query_type="pattern", triples=triples, bindings=[], count=len(triples) ) def query_path(self, start: str, path: List[str]) -> QueryResult: """ Follow a path through the graph. Example: query_path("consciousness", ["has_property", "relates_to"]) Returns entities reached by following these predicates. """ current_nodes = [CR[start]] triples = [] for predicate in path: next_nodes = [] pred_uri = CR[predicate] for node in current_nodes: for s, p, o in self.graph.triples((node, pred_uri, None)): triples.append(Triple( subject=str(s), predicate=str(p), object=str(o) )) next_nodes.append(o) current_nodes = next_nodes if not current_nodes: break return QueryResult( success=True, query=f"path({start}, {path})", query_type="path", triples=triples, bindings=[], count=len(triples) ) def query_natural_language(self, nl_query: str) -> QueryResult: """ Translate natural language query to SPARQL (basic implementation). Examples: - "Find all properties of consciousness" - "What relates to awareness?" - "Show everything about qualia" """ nl_lower = nl_query.lower() # Pattern: "find all properties of X" if "properties of" in nl_lower: entity = nl_lower.split("properties of")[-1].strip().strip("?.") return self.query_pattern(subject=entity, predicate="has_property") # Pattern: "what relates to X" elif "relates to" in nl_lower or "related to" in nl_lower: entity = nl_lower.split("relates to")[-1].strip() if "relates to" in nl_lower else nl_lower.split("related to")[-1].strip() entity = entity.strip("?.") # Find both directions result1 = self.query_pattern(subject=entity, predicate="relates_to") result2 = self.query_pattern(object=entity, predicate="relates_to") combined_triples = result1.triples + result2.triples return QueryResult( success=True, query=nl_query, query_type="natural_language", triples=combined_triples, bindings=[], count=len(combined_triples) ) # Pattern: "show everything about X" or "what is X" elif "everything about" in nl_lower or "what is" in nl_lower: if "everything about" in nl_lower: entity = nl_lower.split("everything about")[-1].strip().strip("?.") else: entity = nl_lower.split("what is")[-1].strip().strip("?.") # Get all triples with entity as subject return self.query_pattern(subject=entity) # Pattern: "find X that Y" elif "find" in nl_lower and "that" in nl_lower: # Simple subject-predicate extraction parts = nl_lower.split("that") if len(parts) >= 2: predicate_part = parts[1].strip().strip("?.") # Try to extract predicate words = predicate_part.split() if len(words) >= 1: predicate = "_".join(words[:2]) if len(words) >= 2 else words[0] return self.query_pattern(predicate=predicate) # Fallback: return all triples (limited) return QueryResult( success=False, query=nl_query, query_type="natural_language", triples=[], bindings=[{"error": "Could not parse natural language query. Try SPARQL or pattern matching."}], count=0 ) def get_statistics(self) -> Dict[str, Any]: """Get graph statistics""" return { "triple_count": len(self.graph), "unique_subjects": len(set(s for s, _, _ in self.graph)), "unique_predicates": len(set(p for _, p, _ in self.graph)), "unique_objects": len(set(o for _, _, o in self.graph)) } def main(): """CLI interface for graph queries""" if len(sys.argv) < 2: print("Usage: python graph_engine.py <query>", file=sys.stderr) print(" OR: python graph_engine.py --json '{\"query\": \"...\", \"query_type\": \"sparql|pattern|natural\", ...}'", file=sys.stderr) sys.exit(1) # Parse input if sys.argv[1] == "--json": input_data = json.loads(sys.argv[2]) query = input_data.get("query") query_type = input_data.get("query_type", "natural") triples_data = input_data.get("triples", []) # Initialize engine engine = GraphEngine() # Add any provided triples if triples_data: triples = [Triple(**t) for t in triples_data] engine.add_triples(triples) # Execute query based on type if query_type == "sparql": result = engine.query_sparql(query) elif query_type == "pattern": # Extract pattern components subject = input_data.get("subject") predicate = input_data.get("predicate") obj = input_data.get("object") result = engine.query_pattern(subject, predicate, obj) elif query_type == "path": start = input_data.get("start") path = input_data.get("path", []) result = engine.query_path(start, path) else: # natural language result = engine.query_natural_language(query) else: # Simple text query (natural language) query = " ".join(sys.argv[1:]) engine = GraphEngine() result = engine.query_natural_language(query) # Output as JSON output = { "success": True, "result": result.to_dict(), "statistics": engine.get_statistics() } print(json.dumps(output, indent=2)) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Replicant-Partners/Congo'

If you have feedback or need assistance with the MCP directory API, please join our Discord server