# local_query.py (5.62 kB)
#!/usr/bin/env python
"""
Local query script that doesn't require a running server.
This directly loads the FAISS index and performs queries against it.
"""
import os
import sys
import logging
import argparse
import numpy as np
from mcp_server.utils.document_processor import DocumentProcessor
from mcp_server.models.vector_store import FAISSVectorStore
# Logging setup for the script: records at INFO and above are emitted, each
# one formatted as "<timestamp> - <logger name> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-scoped logger used by every function in this file.
logger = logging.getLogger(__name__)
def local_query(query_text, index_file="data/faiss_index.bin", top_k=5, show_full=False):
    """
    Query the local vector database without needing a server.

    The index at ``index_file`` is loaded into a ``FAISSVectorStore``, the
    query is embedded with ``DocumentProcessor.get_embedding`` and the hits
    returned by ``vector_store.search`` are printed to stdout. Unless
    ``show_full`` is set, a one-shot menu is then offered on stdin so the
    user can expand a result or view the file it came from.

    Args:
        query_text: The query text
        index_file: Path to the FAISS index file
        top_k: Number of results to return
        show_full: Whether to show full document content (this also
            suppresses the interactive menu)

    Returns:
        The results produced by the vector store search, or an empty list
        when the index file is missing or the search yields nothing.
    """
    # Bail out early: nothing can be searched without an index on disk.
    if not os.path.exists(index_file):
        logger.error(f"Index file not found: {index_file}")
        logger.info("Please run mcp-index first to create an index")
        return []

    # The document processor supplies the query embedding; the vector store
    # holds the index that is searched.
    logger.info("Initializing document processor and vector store")
    doc_processor = DocumentProcessor()
    vector_store = FAISSVectorStore()

    logger.info(f"Loading index from {index_file}")
    vector_store.load(index_file)
    logger.info(f"Loaded index with {len(vector_store.documents)} documents")

    logger.info(f"Processing query: '{query_text}'")
    query_embedding = doc_processor.get_embedding(query_text)

    logger.info(f"Searching for top {top_k} results")
    results = vector_store.search(query_embedding, top_k)

    if not results:
        logger.warning("No results found for the query")
        return []

    logger.info(f"Found {len(results)} results:")
    for position, doc in enumerate(results, start=1):
        _print_result(position, doc, show_full)

    if not show_full:
        print("\n[a] See all full results")
        print("[v] View original file for a result (enter number)")
        print("[q] Continue without viewing more")
        try:
            choice = input("\nEnter choice: ").strip().lower()
        except EOFError:
            # stdin is closed or exhausted (e.g. piped input): behave as if
            # the user chose to continue instead of crashing after the
            # results have already been printed.
            choice = 'q'
        _handle_choice(choice, results)

    return results


# Maximum number of characters of a chunk printed in snippet mode.
_SNIPPET_LENGTH = 1000


def _print_result(position, doc, show_full):
    """Print one search hit, either in full or as a truncated snippet."""
    print(f"\n--- Result {position} (Score: {doc['score']:.4f}) ---")
    print(f"Source: {doc['path']} (Chunk {doc['chunk_index']})")

    content = doc['content']
    if show_full:
        print(f"Content:\n{content}")
        return

    if len(content) > _SNIPPET_LENGTH:
        # Only mark the snippet as truncated when something was cut off.
        print(f"Content (snippet):\n{content[:_SNIPPET_LENGTH]}...")
        print("(Use --full to see complete content)")
    else:
        print(f"Content (snippet):\n{content}")

    # Advertise the per-result menu entry handled by _handle_choice.
    print("\nOptions:")
    print(f" [{position}f] See full content for result {position}")


def _print_full_result(position, doc):
    """Print the banner, source and complete content of one search hit."""
    print(f"\n=== FULL CONTENT FOR RESULT {position} ===")
    print(f"Source: {doc['path']} (Chunk {doc['chunk_index']})")
    print(f"Content:\n{doc['content']}")


def _print_original_file(file_path):
    """Print the file a search hit came from, reporting read failures."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
    except FileNotFoundError:
        print(f"File not found: {file_path}")
    except (OSError, UnicodeDecodeError) as exc:
        # Directories, permission problems and non-UTF-8 files end up here;
        # report them rather than aborting the whole query.
        print(f"Could not read file {file_path}: {exc}")
    else:
        print(f"\n=== ORIGINAL FILE: {file_path} ===")
        print(text)


def _handle_choice(choice, results):
    """Carry out one menu selection; unusable input is reported, not raised."""
    if choice in ('', 'q'):
        return

    if choice == 'a':
        for position, doc in enumerate(results, start=1):
            _print_full_result(position, doc)
            print("=" * 80)
        return

    # "<n>f" expands result n; a bare "<n>" shows the file result n came from.
    wants_chunk = choice.endswith('f')
    number = choice[:-1] if wants_chunk else choice

    # isdecimal() rather than isdigit(): the latter accepts characters such
    # as superscript digits that int() cannot convert.
    if not number.isdecimal():
        print(f"Unrecognized choice: {choice!r}")
        return

    idx = int(number) - 1
    if not 0 <= idx < len(results):
        print(f"No result numbered {number} (valid: 1-{len(results)})")
        return

    if wants_chunk:
        _print_full_result(idx + 1, results[idx])
    else:
        _print_original_file(results[idx]['path'])
def main():
    """
    Command-line entry point: parse arguments and run the query.

    The positional words are joined with single spaces to form the query.
    ``--verbose`` switches the root logger to DEBUG (INFO otherwise). Both
    ``--full`` and ``--no-interactive`` are forwarded as ``show_full=True``.

    Returns:
        Whatever ``local_query`` returns for the parsed arguments.
    """
    cli = argparse.ArgumentParser(description="Query the local vector database")
    cli.add_argument("query", nargs="+", help="The query text")

    # Optional flags, registered in the order they appear in --help.
    option_specs = (
        (("-k", "--top"),
         {"type": int, "default": 5, "help": "Number of results to return"}),
        (("-f", "--full"),
         {"action": "store_true", "help": "Show full document content"}),
        (("-i", "--index"),
         {"default": "data/faiss_index.bin", "help": "Path to the index file"}),
        (("-n", "--no-interactive"),
         {"action": "store_true", "help": "Disable interactive mode"}),
        (("-v", "--verbose"),
         {"action": "store_true", "help": "Enable verbose logging"}),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    opts = cli.parse_args()

    # Set logging level on the root logger.
    root_level = logging.DEBUG if opts.verbose else logging.INFO
    logging.getLogger().setLevel(root_level)

    want_full = opts.full or opts.no_interactive
    return local_query(
        " ".join(opts.query),
        index_file=opts.index,
        top_k=opts.top,
        show_full=want_full,
    )


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()