#!/usr/bin/env python3
"""
Interactive Hybrid RAG Demo
Easy-to-use command-line interface for querying your documents.
NO MCP SERVER REQUIRED - just run this script!
"""
import sys
from pathlib import Path
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import yaml
from typing import Dict, Any
from langchain_community.vectorstores import Chroma
from langchain_ollama import OllamaEmbeddings, OllamaLLM
from langchain_core.prompts import ChatPromptTemplate
from langchain_classic.chains import create_retrieval_chain
from langchain_classic.chains.combine_documents import create_stuff_documents_chain
from src.hybrid_rag import (
DocumentLoaderUtility,
configure_logging,
create_document_type_aware_retriever
)
from src.hybrid_rag.query_preprocessor import QueryPreprocessor
class InteractiveRAG:
"""Interactive RAG system with easy command-line interface."""
def __init__(self):
"""Initialize the RAG system."""
print("=" * 70)
print("π HYBRID RAG SYSTEM - INTERACTIVE DEMO")
print("=" * 70)
print("\nInitializing system...\n")
# Configure logging
configure_logging()
# Load configuration
config_path = Path(__file__).parent.parent.parent / "config" / "config.yaml"
with open(config_path, "r") as f:
self.config = yaml.safe_load(f)
print("β
Configuration loaded")
# Initialize query preprocessor for product ID mapping
print("π§ Loading product ID mapping...")
self.query_preprocessor = QueryPreprocessor()
print("β
Query preprocessor ready (product ID expansion enabled)")
# Initialize components
self._load_documents()
self._initialize_ollama()
self._create_vector_store()
self._create_retriever()
self._create_qa_chain()
print("\n" + "=" * 70)
print("β
SYSTEM READY - You can now ask questions!")
print("=" * 70)
def _load_documents(self):
"""Load documents from data directory."""
data_dir = self.config['data']['directory']
data_path = Path(__file__).parent.parent.parent / data_dir
print(f"π Loading documents from: {data_path}")
loader = DocumentLoaderUtility(str(data_path), config=self.config)
self.documents = loader.load_documents()
if not self.documents:
print(f"\nβ οΈ No documents found in '{data_path}'")
print(f"β οΈ Supported formats: {', '.join(loader.get_supported_formats())}")
sys.exit(1)
# Count file types
sources = [doc.metadata.get('source', '') for doc in self.documents]
unique_files = set([Path(s).name for s in sources if s])
print(f"β
Loaded {len(self.documents)} chunks from {len(unique_files)} files")
def _initialize_ollama(self):
"""Connect to Ollama and initialize models."""
try:
ollama_url = self.config['ollama']['base_url']
self.embedding_model = self.config['ollama']['embedding_model']
self.llm_model = self.config['ollama']['llm_model']
self.embeddings = OllamaEmbeddings(
model=self.embedding_model,
base_url=ollama_url
)
self.llm = OllamaLLM(
model=self.llm_model,
base_url=ollama_url
)
print(f"β
Connected to Ollama at {ollama_url}")
print(f" β’ Embedding model: {self.embedding_model}")
print(f" β’ LLM model: {self.llm_model}")
except Exception as e:
print(f"β Error connecting to Ollama: {e}")
print("\nπ‘ Make sure Ollama is running:")
print(" 1. Start Ollama: ollama serve")
print(" 2. Pull models:")
print(f" ollama pull {self.embedding_model}")
print(f" ollama pull {self.llm_model}")
sys.exit(1)
def _create_vector_store(self):
"""Create or load vector store."""
persist_dir = Path(__file__).parent.parent / self.config['vector_store']['persist_directory']
print(f"π§ Creating vector store (this may take a few minutes)...")
self.vectorstore = Chroma.from_documents(
self.documents,
self.embeddings,
persist_directory=str(persist_dir)
)
print(f"β
Vector store created with {len(self.documents)} embeddings")
def _create_retriever(self):
"""Create hybrid retriever."""
print("π§ Creating hybrid retriever...")
self.retriever = create_document_type_aware_retriever(
documents=self.documents,
vectorstore=self.vectorstore,
config=self.config
)
print("β
Hybrid retriever ready (semantic + keyword search)")
def _create_qa_chain(self):
"""Create QA chain."""
prompt = ChatPromptTemplate.from_template("""
You are an expert assistant. Answer the user's question based on the provided context.
If the context doesn't contain enough information, say so clearly.
Keep your answer concise and relevant.
<context>
{context}
</context>
Question: {input}
Answer:""")
document_chain = create_stuff_documents_chain(self.llm, prompt)
self.qa_chain = create_retrieval_chain(self.retriever, document_chain)
print("β
QA chain constructed")
def query(self, question: str, show_sources: bool = True):
"""
Ask a question and get an answer.
Args:
question: The question to ask
show_sources: Whether to show source documents
Returns:
Dictionary with 'answer' and 'context'
"""
try:
# Expand query with product ID mappings
expanded_question = self.query_preprocessor.expand_query(question)
# Show expansion if it happened
if expanded_question != question:
print(f"π Expanded query with product IDs: {expanded_question[:100]}...")
response = self.qa_chain.invoke({"input": expanded_question})
if show_sources:
print("\nπ Sources:")
sources_seen = set()
for i, doc in enumerate(response.get('context', [])[:5], 1):
source = doc.metadata.get('source', 'unknown')
source_file = Path(source).name if source != 'unknown' else 'unknown'
if source_file not in sources_seen:
sources_seen.add(source_file)
print(f" [{i}] {source_file}")
return response
except Exception as e:
print(f"\nβ Error processing query: {e}")
return None
def interactive_mode(self):
"""Run interactive question-answering loop."""
print("\n㪠INTERACTIVE MODE")
print(" β’ Type your questions and press Enter")
print(" β’ Type 'exit' or 'quit' to stop")
print(" β’ Type 'help' for example questions")
print(" β’ Type 'stats' for system statistics")
print()
while True:
try:
# Get user input
question = input("β Your question: ").strip()
if not question:
continue
# Handle commands
if question.lower() in ['exit', 'quit', 'q']:
print("\nπ Goodbye!")
break
elif question.lower() == 'help':
self._show_help()
continue
elif question.lower() == 'stats':
self._show_stats()
continue
# Process question
print("\nπ€ Thinking...")
response = self.query(question, show_sources=True)
if response:
print(f"\nπ‘ Answer:\n{response['answer']}\n")
print("-" * 70)
except KeyboardInterrupt:
print("\n\nπ Goodbye!")
break
except Exception as e:
print(f"\nβ Error: {e}\n")
def _show_help(self):
"""Show example questions."""
print("\nπ EXAMPLE QUESTIONS:")
print()
print("General:")
print(" β’ What information is available in these documents?")
print(" β’ Summarize the main topics covered")
print()
print("Specific (adjust based on your data):")
print(" β’ What OLED TVs are available?")
print(" β’ Which products are low in stock?")
print(" β’ Show me the largest orders in November")
print(" β’ What are common warranty claim types?")
print(" β’ Which supplier has the best ratings?")
print(" β’ What customer feedback mentions delivery?")
print()
def _show_stats(self):
"""Show system statistics."""
print("\nπ SYSTEM STATISTICS:")
print()
# Document statistics
sources = [doc.metadata.get('source', '') for doc in self.documents]
unique_files = set([Path(s).name for s in sources if s])
file_types = {}
for source in sources:
if source:
ext = Path(source).suffix
file_types[ext] = file_types.get(ext, 0) + 1
print(f"Documents:")
print(f" β’ Total chunks: {len(self.documents)}")
print(f" β’ Unique files: {len(unique_files)}")
print(f" β’ File types:")
for ext, count in sorted(file_types.items()):
print(f" - {ext}: {count} chunks")
print()
print(f"Models:")
print(f" β’ Embedding: {self.embedding_model}")
print(f" β’ LLM: {self.llm_model}")
print()
print(f"Configuration:")
print(f" β’ Vector search k: {self.config['retrieval']['vector_search_k']}")
print(f" β’ Keyword search k: {self.config['retrieval']['keyword_search_k']}")
print(f" β’ CSV weight: {self.config['retrieval']['csv_weight']}")
print(f" β’ Text weight: {self.config['retrieval']['text_weight']}")
print()
def main():
"""Main entry point."""
# Parse command line arguments
import argparse
parser = argparse.ArgumentParser(
description="Interactive Hybrid RAG Demo",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Interactive mode (default)
python interactive_demo.py
# Ask a single question
python interactive_demo.py --query "What OLED TVs are available?"
# Ask without showing sources
python interactive_demo.py --query "Show me products" --no-sources
"""
)
parser.add_argument(
'--query', '-q',
type=str,
help='Ask a single question (non-interactive mode)'
)
parser.add_argument(
'--no-sources',
action='store_true',
help='Hide source documents in output'
)
args = parser.parse_args()
# Initialize system
try:
rag = InteractiveRAG()
except Exception as e:
print(f"\nβ Failed to initialize system: {e}")
sys.exit(1)
# Single query mode
if args.query:
print(f"\nβ Question: {args.query}")
response = rag.query(args.query, show_sources=not args.no_sources)
if response:
print(f"\nπ‘ Answer:\n{response['answer']}")
sys.exit(0)
# Interactive mode
rag.interactive_mode()
if __name__ == "__main__":
main()