#!/usr/bin/env python3
import asyncio
import json
import math
import os
import sys
import time
import hashlib
import re
import pickle
from pathlib import Path
from typing import Any, Dict, List, Optional
from datetime import datetime
from collections import defaultdict
import PyPDF2
from mcp.server.models import InitializationOptions
from mcp.server import Server, NotificationOptions
from mcp.server.stdio import stdio_server
import mcp.types as types
# Add error output for debugging
print("Starting QuantConnect PDF MCP Server with Enhanced Search...", file=sys.stderr)
class SearchIndex:
"""Simple inverted index for fast text search"""
def __init__(self):
self.word_to_docs = defaultdict(list) # word -> [(doc_id, page, position)]
self.doc_metadata = {} # doc_id -> metadata
self.word_idf = {} # word -> inverse document frequency
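        # Illustrative shapes of the index structures (values are examples only):
        #   word_to_docs["portfolio"] -> [("Algorithm-Guide.md", 3, 142), ...]
        #   doc_metadata["Algorithm-Guide.md"] -> {"title": ..., "total_pages": 12, "word_count": 4800}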
def add_document(self, doc_id: str, title: str, pages: List[Dict], full_content: str):
"""Add a document to the search index"""
# Store metadata
self.doc_metadata[doc_id] = {
'title': title,
'total_pages': len(pages),
'word_count': len(full_content.split())
}
# Index each page
for page_info in pages:
page_num = page_info['page']
content = page_info['content']
# Tokenize and index words
words = self._tokenize(content)
for position, word in enumerate(words):
if len(word) > 2: # Skip very short words
self.word_to_docs[word].append((doc_id, page_num, position))
def finalize_index(self):
"""Calculate IDF scores after all documents have been added"""
total_docs = len(self.doc_metadata)
if total_docs == 0:
return
# Calculate IDF for each word
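        # IDF = ln(total_docs / docs_containing_word); a word present in every
        # document gets an IDF of 0 and therefore contributes nothing to scores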
for word in self.word_to_docs:
docs_with_word = len(set(doc_id for doc_id, _, _ in self.word_to_docs[word]))
if docs_with_word > 0:
self.word_idf[word] = math.log(total_docs / docs_with_word)
else:
self.word_idf[word] = 0
def _tokenize(self, text: str) -> List[str]:
"""Tokenize text into words"""
# Convert to lowercase and split by non-alphanumeric characters
text = text.lower()
words = re.findall(r'\b[a-z0-9]+\b', text)
return words
def search(self, query: str, max_results: int = 5) -> List[Dict]:
"""Search for documents matching the query"""
query_words = self._tokenize(query)
if not query_words:
return []
# Score documents
doc_scores = defaultdict(float)
doc_matches = defaultdict(list)
for word in query_words:
if word in self.word_to_docs:
# Get IDF weight
idf = self.word_idf.get(word, 1.0)
# Score each document containing this word
word_docs = defaultdict(list)
for doc_id, page, position in self.word_to_docs[word]:
word_docs[doc_id].append((page, position))
for doc_id, occurrences in word_docs.items():
# TF-IDF scoring
tf = len(occurrences) / self.doc_metadata[doc_id]['word_count']
score = tf * idf
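                    # Worked example: a word occurring 4 times in a 2,000-word
                    # document with IDF ~1.6 contributes (4 / 2000) * 1.6 = 0.0032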
# Boost score if words appear close together
if len(query_words) > 1:
score *= self._calculate_proximity_boost(doc_id, query_words)
doc_scores[doc_id] += score
doc_matches[doc_id].extend([(word, page, pos) for page, pos in occurrences])
# Sort by score
sorted_docs = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
# Build results
results = []
for doc_id, score in sorted_docs[:max_results]:
results.append({
'doc_id': doc_id,
'title': self.doc_metadata[doc_id]['title'],
'score': score,
'matches': doc_matches[doc_id][:5] # Limit matches per doc
})
return results
def _calculate_proximity_boost(self, doc_id: str, query_words: List[str]) -> float:
"""Calculate proximity boost for multi-word queries"""
# Simple proximity boost - could be enhanced
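        # Example: first occurrences at positions 10 and 14 give an average gap
        # of 4, so the boost is max(1.0, 10 / (4 + 1)) = 2.0; gaps of 9 or more
        # collapse to the neutral boost of 1.0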
positions = []
for word in query_words:
if word in self.word_to_docs:
word_positions = [pos for d, _, pos in self.word_to_docs[word] if d == doc_id]
if word_positions:
positions.append(min(word_positions))
if len(positions) > 1:
# Calculate average distance between words
positions.sort()
avg_distance = sum(positions[i+1] - positions[i] for i in range(len(positions)-1)) / (len(positions)-1)
# Boost inversely proportional to distance
return max(1.0, 10.0 / (avg_distance + 1))
return 1.0
class QuantConnectPDFServer:
    def __init__(self, pdf_folder: str, markdown_folder: Optional[str] = None):
self.pdf_folder = Path(pdf_folder)
self.markdown_folder = Path(markdown_folder) if markdown_folder else self.pdf_folder / "markdown"
self.cache_file = self.markdown_folder / ".pdf_cache.json"
self.index_file = self.markdown_folder / ".search_index.pkl"
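        # .pdf_cache.json maps each PDF name to its content hash, generated
        # markdown filename, processing date and size; .search_index.pkl holds
        # the pickled SearchIndex so it survives restarts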
self.documents = {}
self.search_index = SearchIndex()
self.server = Server("quantconnect-pdf-server")
self.is_initialized = False
# Create markdown folder if it doesn't exist
        self.markdown_folder.mkdir(parents=True, exist_ok=True)
# Setup handlers first
self._setup_handlers()
print(f"Server initialized with PDF folder: {self.pdf_folder}", file=sys.stderr)
print(f"Markdown cache folder: {self.markdown_folder}", file=sys.stderr)
def _get_file_hash(self, file_path: Path) -> str:
"""Calculate hash of file to detect changes"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def _load_cache(self) -> dict:
"""Load PDF processing cache"""
if self.cache_file.exists():
try:
with open(self.cache_file, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading cache: {e}", file=sys.stderr)
return {}
def _save_cache(self, cache: dict):
"""Save PDF processing cache"""
try:
with open(self.cache_file, 'w') as f:
json.dump(cache, f, indent=2)
except Exception as e:
print(f"Error saving cache: {e}", file=sys.stderr)
def _save_search_index(self):
"""Save search index to disk"""
try:
with open(self.index_file, 'wb') as f:
pickle.dump(self.search_index, f)
print("Search index saved", file=sys.stderr)
except Exception as e:
print(f"Error saving search index: {e}", file=sys.stderr)
def _load_search_index(self) -> bool:
"""Load search index from disk"""
if self.index_file.exists():
try:
with open(self.index_file, 'rb') as f:
self.search_index = pickle.load(f)
print("Search index loaded from cache", file=sys.stderr)
return True
except Exception as e:
print(f"Error loading search index: {e}", file=sys.stderr)
return False
    def _extract_key_sections(self, text: str) -> Dict[str, Any]:
"""Extract key sections from markdown text for better search"""
sections = {
'title': '',
'headers': [],
'code_blocks': [],
'important_terms': []
}
lines = text.split('\n')
in_code_block = False
for line in lines:
# Extract headers
if line.startswith('#'):
header_text = line.strip('#').strip()
sections['headers'].append(header_text)
if line.startswith('# '):
sections['title'] = header_text
# Extract code blocks
if line.strip().startswith('```'):
in_code_block = not in_code_block
elif in_code_block:
sections['code_blocks'].append(line)
# Extract QuantConnect-specific terms
qc_terms = re.findall(r'\b(?:Algorithm|Portfolio|Securities|OnData|Initialize|Schedule|Symbol|Resolution|Market|Order|Indicator)\b', line)
sections['important_terms'].extend(qc_terms)
return sections
def _pdf_to_markdown(self, pdf_path: Path) -> str:
"""Convert PDF to markdown format with better structure"""
markdown_lines = []
try:
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
total_pages = len(pdf_reader.pages)
markdown_lines.append(f"# {pdf_path.stem}")
markdown_lines.append(f"\n*Source: {pdf_path.name}*")
markdown_lines.append(f"*Total Pages: {total_pages}*")
markdown_lines.append(f"*Processed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*")
markdown_lines.append("\n---\n")
for page_num in range(total_pages):
try:
page = pdf_reader.pages[page_num]
text = page.extract_text()
# Clean up text for markdown
text = text.strip()
if text:
markdown_lines.append(f"\n## Page {page_num + 1}\n")
# Process text to improve markdown formatting
lines = text.split('\n')
                            for raw_line in lines:
                                line = raw_line.strip()
                                if line:
                                    # Detect code patterns (Python keywords or original indentation) and format as code
                                    if re.match(r'^(class|def|import|from)\s', line) or raw_line.startswith('    '):
                                        markdown_lines.append(f"    {line}")
else:
markdown_lines.append(line)
if line.endswith('.') or line.endswith(':'):
markdown_lines.append("") # Add blank line
except Exception as e:
print(f"Error extracting page {page_num + 1} from {pdf_path.name}: {e}", file=sys.stderr)
markdown_lines.append(f"\n## Page {page_num + 1}\n")
markdown_lines.append(f"*[Error extracting page: {e}]*\n")
except Exception as e:
print(f"Error processing PDF {pdf_path}: {e}", file=sys.stderr)
markdown_lines = [f"# Error Processing {pdf_path.name}\n\n{str(e)}"]
return '\n'.join(markdown_lines)
async def _process_new_pdfs(self):
"""Process new or updated PDFs and convert to markdown"""
if not self.pdf_folder.exists():
print(f"Warning: PDF folder {self.pdf_folder} does not exist", file=sys.stderr)
return
# Load cache
cache = self._load_cache()
pdf_files = list(self.pdf_folder.glob("*.pdf"))
print(f"Found {len(pdf_files)} PDF files in folder", file=sys.stderr)
processed_count = 0
for pdf_file in pdf_files:
try:
# Check if PDF needs processing
file_hash = self._get_file_hash(pdf_file)
cached_info = cache.get(pdf_file.name, {})
if cached_info.get('hash') == file_hash and cached_info.get('markdown_file'):
# Check if markdown file still exists
markdown_path = self.markdown_folder / cached_info['markdown_file']
if markdown_path.exists():
print(f"Skipping {pdf_file.name} - already processed", file=sys.stderr)
continue
# Process PDF
print(f"Processing {pdf_file.name}...", file=sys.stderr)
start_time = time.time()
# Convert to markdown
markdown_content = self._pdf_to_markdown(pdf_file)
# Save markdown file
markdown_filename = f"{pdf_file.stem}.md"
markdown_path = self.markdown_folder / markdown_filename
with open(markdown_path, 'w', encoding='utf-8') as f:
f.write(markdown_content)
# Update cache
cache[pdf_file.name] = {
'hash': file_hash,
'markdown_file': markdown_filename,
'processed_date': datetime.now().isoformat(),
'size': pdf_file.stat().st_size
}
process_time = time.time() - start_time
print(f"Processed {pdf_file.name} in {process_time:.2f}s", file=sys.stderr)
processed_count += 1
except Exception as e:
print(f"Error processing {pdf_file}: {e}", file=sys.stderr)
# Save updated cache
self._save_cache(cache)
print(f"Processed {processed_count} new/updated PDFs", file=sys.stderr)
async def _load_markdown_files(self):
"""Load all markdown files and build search index"""
# Try to load existing search index
index_loaded = self._load_search_index()
markdown_files = list(self.markdown_folder.glob("*.md"))
print(f"Loading {len(markdown_files)} markdown files...", file=sys.stderr)
        # Check if we need to rebuild the index; when rebuilding, start from a
        # fresh index so postings from a stale cached index are not duplicated
        rebuild_index = not index_loaded or len(self.search_index.doc_metadata) != len(markdown_files)
        if rebuild_index:
            self.search_index = SearchIndex()
for md_file in markdown_files:
            # Skip hidden helper files (e.g. cache files) in case they match the glob
            if md_file.name.startswith('.'):
                continue
try:
with open(md_file, 'r', encoding='utf-8') as f:
content = f.read()
# Parse markdown into pages
pages = []
current_page = None
current_content = []
for line in content.split('\n'):
if line.startswith('## Page '):
# Save previous page
if current_page is not None:
pages.append({
'page': current_page,
'content': '\n'.join(current_content).strip()
})
# Start new page
try:
current_page = int(line.split('Page ')[1].strip())
current_content = []
                        except (IndexError, ValueError):
                            pass
else:
current_content.append(line)
# Save last page
if current_page is not None:
pages.append({
'page': current_page,
'content': '\n'.join(current_content).strip()
})
# Extract metadata and sections
lines = content.split('\n')
title = lines[0].replace('# ', '').strip() if lines else md_file.stem
sections = self._extract_key_sections(content)
self.documents[md_file.name] = {
'path': str(md_file),
'title': title,
'pages': pages,
'total_pages': len(pages),
'full_content': content,
'sections': sections
}
# Add to search index if rebuilding
if rebuild_index:
self.search_index.add_document(md_file.name, title, pages, content)
print(f"Loaded {md_file.name} ({len(pages)} pages)", file=sys.stderr)
except Exception as e:
print(f"Error loading markdown file {md_file}: {e}", file=sys.stderr)
# Finalize search index if rebuilt (calculate IDF scores)
if rebuild_index:
print("Finalizing search index (calculating IDF scores)...", file=sys.stderr)
self.search_index.finalize_index()
self._save_search_index()
self.is_initialized = True
print(f"Successfully loaded {len(self.documents)} documents", file=sys.stderr)
def _get_context_around_match(self, content: str, query: str, context_size: int = 500) -> str:
"""Extract context around a match with highlighting"""
query_lower = query.lower()
content_lower = content.lower()
pos = content_lower.find(query_lower)
if pos == -1:
return ""
# Find sentence boundaries
start = max(0, pos - context_size)
end = min(len(content), pos + len(query) + context_size)
# Extend to sentence boundaries
while start > 0 and content[start] not in '.!?\n':
start -= 1
while end < len(content) and content[end] not in '.!?\n':
end += 1
context = content[start:end].strip()
# Highlight the match
highlighted = re.sub(
f'({re.escape(query)})',
r'**\1**',
context,
flags=re.IGNORECASE
)
return f"...{highlighted}..."
def _setup_handlers(self):
"""Setup MCP protocol handlers"""
@self.server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
"""List available tools"""
return [
types.Tool(
name="search_quantconnect_docs",
description="Search through QuantConnect PDF documentation for specific topics or keywords",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query or topic to look for in QuantConnect documentation"
},
"max_results": {
"type": "integer",
"description": "Maximum number of results to return (default: 5)",
"default": 5
}
},
"required": ["query"]
}
),
types.Tool(
name="list_quantconnect_docs",
description="List all available QuantConnect PDF documents",
inputSchema={
"type": "object",
"properties": {}
}
),
types.Tool(
name="get_document_content",
description="Get the full content of a specific QuantConnect PDF document",
inputSchema={
"type": "object",
"properties": {
"filename": {
"type": "string",
"description": "The filename of the document (with or without .md extension)"
},
"page_number": {
"type": "integer",
"description": "Specific page number to retrieve (optional)"
}
},
"required": ["filename"]
}
)
]
@self.server.call_tool()
async def handle_call_tool(
name: str,
arguments: dict[str, Any]
) -> list[types.TextContent]:
"""Handle tool calls"""
print(f"Tool called: {name} with arguments: {arguments}", file=sys.stderr)
# Wait for initialization if needed
if not self.is_initialized:
return [types.TextContent(
type="text",
text="Server is still initializing. Please try again in a moment."
)]
if name == "search_quantconnect_docs":
start_time = time.time()
results = self._search_documents_enhanced(
arguments["query"],
arguments.get("max_results", 5)
)
search_time = time.time() - start_time
print(f"Search completed in {search_time:.3f}s", file=sys.stderr)
return [types.TextContent(
type="text",
text=json.dumps(results, indent=2)
)]
elif name == "list_quantconnect_docs":
doc_list = [
{
"filename": doc["title"],
"markdown_file": filename,
"total_pages": doc["total_pages"]
}
for filename, doc in self.documents.items()
]
return [types.TextContent(
type="text",
text=json.dumps(doc_list, indent=2)
)]
elif name == "get_document_content":
content = self._get_document_content(
arguments["filename"],
arguments.get("page_number")
)
return [types.TextContent(
type="text",
text=content
)]
else:
raise ValueError(f"Unknown tool: {name}")
def _search_documents_enhanced(self, query: str, max_results: int) -> list[dict[str, Any]]:
"""Enhanced search using the search index"""
# Use the search index for fast lookup
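        # Over-fetch candidates (max_results * 2) so entries whose markdown is
        # no longer loaded in self.documents can be skipped without shrinking
        # the final result list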
index_results = self.search_index.search(query, max_results * 2)
# Build detailed results
results = []
for idx_result in index_results[:max_results]:
doc_id = idx_result['doc_id']
if doc_id not in self.documents:
continue
doc = self.documents[doc_id]
# Get best matching context
contexts = []
seen_pages = set() # Avoid duplicate contexts from same page
for word, page, _ in idx_result['matches'][:3]:
if page in seen_pages:
continue
seen_pages.add(page)
# Find the page content
page_content = None
for page_info in doc['pages']:
if page_info['page'] == page:
page_content = page_info['content']
break
if page_content:
# Try to get context for the specific matching word first
context = self._get_context_around_match(page_content, word)
# If no context for individual word, try full query
                    if not context:
                        context = self._get_context_around_match(page_content, query)
                    if context:
contexts.append({
'page': page,
'context': context,
'matching_word': word
})
results.append({
'filename': doc['title'],
'markdown_file': doc_id,
'score': idx_result['score'],
'contexts': contexts,
'headers': doc['sections']['headers'][:3] # Include relevant headers
})
return results
    def _get_document_content(self, filename: str, page_number: Optional[int] = None) -> str:
"""Get content from a specific document"""
# Handle both with and without .md extension
if not filename.endswith('.md'):
# Try to find matching document
for doc_name, doc in self.documents.items():
if doc['title'] == filename or doc_name.startswith(filename):
filename = doc_name
break
if filename not in self.documents:
available = [f"{doc['title']} ({fname})" for fname, doc in self.documents.items()]
return f"Document '{filename}' not found. Available documents: {available}"
doc = self.documents[filename]
if page_number is not None:
# Find specific page
for page_info in doc['pages']:
if page_info['page'] == page_number:
return f"# {doc['title']} - Page {page_number}\n\n{page_info['content']}"
return f"Page {page_number} not found. Document has {doc['total_pages']} pages."
else:
# Return full document content
return doc['full_content']
async def run(self):
"""Run the MCP server"""
print("Starting server...", file=sys.stderr)
# Initialize in background but start server immediately
# The server will respond with "initializing" message until ready
async def initialize():
try:
await self._process_new_pdfs()
await self._load_markdown_files()
print("Server initialization completed", file=sys.stderr)
            except Exception as e:
                print(f"Server initialization failed: {e}", file=sys.stderr)
                # is_initialized stays False, so tool calls keep returning the
                # "still initializing" message instead of crashing the server
# Start initialization task
init_task = asyncio.create_task(initialize())
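        # Keeping a reference to init_task prevents the background task from
        # being garbage-collected before initialization finishes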
# Run the server using stdio
async with stdio_server() as (read_stream, write_stream):
print("Server running with stdio", file=sys.stderr)
await self.server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="quantconnect-pdf-server",
server_version="0.3.0",
capabilities=self.server.get_capabilities(
notification_options=NotificationOptions(),
experimental_capabilities={},
),
),
)
async def main():
# Get configuration from environment
pdf_folder = os.environ.get("QUANTCONNECT_PDF_FOLDER", "./quantconnect-docs")
markdown_folder = os.environ.get("QUANTCONNECT_MARKDOWN_FOLDER", None)
print(f"PDF folder: {pdf_folder}", file=sys.stderr)
if markdown_folder:
print(f"Markdown folder: {markdown_folder}", file=sys.stderr)
# Create server instance
server = QuantConnectPDFServer(pdf_folder, markdown_folder)
# Run the server
await server.run()
if __name__ == "__main__":
# Run the async main function
asyncio.run(main())
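
# Typical invocation (script name and paths are illustrative):
#   QUANTCONNECT_PDF_FOLDER=/path/to/pdfs python quantconnect_pdf_server.py
# An MCP client can also launch this script directly and talk to it over stdio.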