
SOLVRO MCP - Knowledge Graph RAG System

Official
by Solvro
data_pipe.py (6.12 kB)
import logging
import os
from typing import List

from langchain_neo4j import Neo4jGraph
from langchain_text_splitters import RecursiveCharacterTextSplitter

from .llm_pipe import LLMPipe
from .pdf_loader import PDFLoader


class DataPipe:
    def __init__(
        self,
        url: str,
        username: str,
        password: str,
        api_key: str = None,
        nodes: List[str] = None,
        relations: List[str] = None,
        max_chunk_size: int = 30000,
        chunk_overlap: int = 200,
    ):
        self.docs_data = []

        if not url:
            raise ValueError("Neo4j URL is required")
        if not username:
            raise ValueError("Neo4j username is required")
        if not password:
            raise ValueError("Neo4j password is required")

        logging.info(f"Connecting to Neo4j at: {url}")
        logging.info(f"Username: {username}")

        try:
            self.llm_pipe = LLMPipe(api_key=api_key)  # , nodes=nodes, relations=relations)
            self.graph_db = Neo4jGraph(url=url, username=username, password=password)
            self.graph_db.query("RETURN 1 as test")
            logging.info("Successfully connected to Neo4j database")
        except Exception as e:
            logging.error(f"Failed to connect to Neo4j: {str(e)}")
            logging.error(f"URL: {url}")
            logging.error(f"Username: {username}")
            raise ConnectionError(f"Could not connect to Neo4j database: {str(e)}")

        self.max_chunk_size = max_chunk_size
        self.chunk_overlap = chunk_overlap
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.max_chunk_size,
            chunk_overlap=self.chunk_overlap,
            length_function=len,
        )

        logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

    def _load_data(self, file_path: str) -> None:
        """Load data from a file and append it to the docs_data list."""
        try:
            if not os.path.exists(file_path):
                logging.error(f"File not found: {file_path}")
                return
            if os.path.getsize(file_path) == 0:
                logging.error(f"Empty file: {file_path}")
                return

            logging.info(f"Loading file: {file_path}")
            loader = PDFLoader(file_path)
            content = loader.load_document()

            if content.startswith("ERROR:"):
                logging.error(content)
                return

            if len(content) > self.max_chunk_size:
                logging.info(f"Document is large ({len(content)} chars), splitting into chunks")
                chunks = self.text_splitter.split_text(content)
                for i, chunk in enumerate(chunks):
                    filename = os.path.basename(file_path)
                    part_info = f"[Part {i + 1} of {len(chunks)} from {filename}]"
                    self.docs_data.append(f"{part_info} {chunk}")
                    logging.info(f"Added chunk {i + 1} of {len(chunks)} from {file_path}")
            else:
                self.docs_data.append(content)
                logging.info(f"Successfully loaded: {file_path}")
        except Exception as e:
            logging.error(f"Error loading file {file_path}: {str(e)}")

    def load_data_from_directory(self, directory_path: str) -> None:
        """Load data from all files in a given directory."""
        if not os.path.exists(directory_path):
            logging.error(f"Directory not found: {directory_path}")
            return

        logging.info(f"Loading files from directory: {directory_path}")
        file_count = 0
        for filename in os.listdir(directory_path):
            if filename.endswith(".pdf") or filename.endswith(".txt") or filename.endswith(".docx"):
                self._load_data(os.path.join(directory_path, filename))
                file_count += 1

        logging.info(f"Loaded {len(self.docs_data)} documents/chunks from {file_count} files")

    def clear_database(self) -> None:
        """Clear the Neo4j database."""
        try:
            self.execute_cypher("MATCH (n) DETACH DELETE n")
            logging.info("Database cleared successfully")
        except Exception as e:
            logging.error(f"Error clearing database: {str(e)}")

    def execute_cypher(self, query: str) -> None:
        """Execute a Cypher query on the Neo4j database."""
        if not query or not query.strip():
            logging.error("Empty Cypher query")
            return

        try:
            self.graph_db.query(query)
            logging.info("Cypher query executed successfully")
        except Exception as e:
            logging.error(f"Error executing Cypher query: {str(e)}")
            logging.error(f"Query: {query}")
            raise

    def process_documents(self):
        """Process all loaded documents through the LLM pipe."""
        all_results = []
        for i, doc in enumerate(self.docs_data):
            try:
                logging.info(f"Processing document chunk {i + 1}/{len(self.docs_data)}")
                char_count = len(doc)
                logging.info(f"Document chunk size: {char_count} characters")
                if char_count > self.max_chunk_size * 2:
                    logging.warning(
                        f"Document chunk may be too large for model ({char_count} chars)"
                    )

                cypher_code = "".join([code for code in self.llm_pipe.run(doc)])
                logging.info(f"Generated Cypher code of length {len(cypher_code)}")
                logging.info(
                    "Executing Cypher for document: "
                    + (doc[:50] + "..." if len(doc) > 50 else doc)
                )

                try:
                    self.execute_cypher(cypher_code)
                    all_results.append(cypher_code)
                except Exception as e:
                    logging.error(f"Failed to execute Cypher: {str(e)}")
            except Exception as e:
                logging.error(f"Error processing document chunk {i + 1}: {str(e)}")
        return all_results
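For context, here is a minimal usage sketch of DataPipe. The import path, environment-variable names, and the "docs/" directory are illustrative assumptions, not taken from the repository.

# Usage sketch (assumed module path, env var names, and input directory)
import os

from data_pipe import DataPipe  # hypothetical import path

pipe = DataPipe(
    url=os.environ["NEO4J_URI"],              # e.g. bolt://localhost:7687
    username=os.environ["NEO4J_USERNAME"],
    password=os.environ["NEO4J_PASSWORD"],
    api_key=os.environ.get("LLM_API_KEY"),    # forwarded to LLMPipe
)

pipe.load_data_from_directory("docs/")        # picks up .pdf/.txt/.docx files
cypher_statements = pipe.process_documents()  # LLM -> Cypher -> Neo4j
print(f"Ingested {len(cypher_statements)} document chunks")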


MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Solvro/ml-mcp'
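The same call can be made from Python. This is a minimal sketch using the third-party requests library (an assumption; any HTTP client works):

# Fetch this server's metadata from the MCP directory API
import requests

resp = requests.get("https://glama.ai/api/mcp/v1/servers/Solvro/ml-mcp")
resp.raise_for_status()
print(resp.json())  # server metadata returned by the directory API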

If you have feedback or need assistance with the MCP directory API, please join our Discord server.