#!/usr/bin/env -S uv run --script
# -*- coding: utf-8 -*-
# /// script
# dependencies = [
# "mcp>=0.3.0",
# "sqlite-vec>=0.1.6",
# "sentence-transformers>=2.2.2",
# "requests>=2.28.0"
# ]
# requires-python = ">=3.10"
# ///
"""
Vector Memory MCP Server - Main Entry Point
===========================================
A secure, vector-based memory server using sqlite-vec for semantic search.
Stores and retrieves coding memories, experiences, and knowledge using
384-dimensional embeddings generated by sentence-transformers.
Usage:
python main.py --working-dir /path/to/project
Memory files stored in: {working_dir}/memory/vector_memory.db
"""
import sys
import re
from pathlib import Path
from typing import Dict, Any
from importlib import resources
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent / "src"))
from mcp.server.fastmcp import FastMCP
# Import our modules
from src.models import Config
from src.security import validate_working_dir, SecurityError
from src.memory_store import VectorMemoryStore
def get_working_dir() -> Path:
    """Resolve the working directory from the --working-dir CLI flag.

    Falls back to the current directory when the flag is absent or has no
    value after it. The chosen path is always passed through
    validate_working_dir, which may raise SecurityError for unsafe paths.
    """
    args = sys.argv
    try:
        flag_pos = args.index("--working-dir")
    except ValueError:
        # Flag not supplied at all -> current directory
        return validate_working_dir(".")
    if flag_pos + 1 < len(args):
        return validate_working_dir(args[flag_pos + 1])
    # Flag present but without a value -> current directory
    return validate_working_dir(".")
def get_memory_limit() -> int:
    """Resolve the memory limit from the --memory-limit CLI flag.

    Values are clamped to the inclusive range [1000, 10_000_000]; a
    warning is printed to stderr when clamping occurs. Unparseable
    values, a missing value, or an absent flag all fall back to
    Config.MAX_TOTAL_MEMORIES.
    """
    args = sys.argv
    if "--memory-limit" not in args:
        # Flag absent -> configured default
        return Config.MAX_TOTAL_MEMORIES
    flag_pos = args.index("--memory-limit")
    if flag_pos + 1 >= len(args):
        # Flag given without a value -> configured default
        return Config.MAX_TOTAL_MEMORIES
    try:
        parsed = int(args[flag_pos + 1])
    except ValueError:
        print(f"Warning: invalid memory-limit value, using default {Config.MAX_TOTAL_MEMORIES}", file=sys.stderr)
        return Config.MAX_TOTAL_MEMORIES
    if parsed < 1000:
        print(f"Warning: memory-limit {parsed} is too low, using minimum 1000", file=sys.stderr)
        return 1000
    if parsed > 10_000_000:
        print(f"Warning: memory-limit {parsed} is too high, using maximum 10,000,000", file=sys.stderr)
        return 10_000_000
    return parsed
def create_server() -> FastMCP:
    """Create and configure the MCP server.

    Builds the VectorMemoryStore (lazy DB/model initialization), then
    registers every MCP tool as an async closure over `memory_store`.
    Exits the process if the store cannot be constructed.
    """
    # Initialize global memory store; tools below capture it via closure.
    try:
        memory_dir = get_working_dir()
        memory_limit = get_memory_limit()
        db_path = memory_dir / Config.DB_NAME
        memory_store = VectorMemoryStore(db_path, memory_limit=memory_limit)
        print(f"Memory database path: {db_path} (lazy initialization)", file=sys.stderr)
        print(f"Memory limit: {memory_limit:,} entries", file=sys.stderr)
    except Exception as e:
        # Fatal: the server is useless without a store, so exit non-zero.
        print(f"Failed to initialize memory store: {e}", file=sys.stderr)
        sys.exit(1)
    # Create FastMCP server
    mcp = FastMCP(Config.SERVER_NAME)
    # ===============================================================================
    # MCP TOOLS IMPLEMENTATION
    # ===============================================================================
    # NOTE(review): several tools declare e.g. `tags: list[str] = None` —
    # implicitly optional without a `| None` annotation; changing this may
    # alter the generated tool schema, so it is left as-is. Confirm with
    # the FastMCP schema output before tightening.
    @mcp.tool()
    async def store_memory(
        content: str,
        category: str = "other",
        tags: list[str] = None
    ) -> dict[str, Any]:
        """
        Store coding memory with vector embedding for semantic search.
        Args:
            content: Memory content (max 10K chars)
            category: code-solution, bug-fix, architecture, learning, tool-usage, debugging, performance, security, other
            tags: Tags for organization (max 10)
        """
        try:
            if tags is None:
                tags = []
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            # Get embedding model asynchronously (lazy loading)
            model = await memory_store.get_embedding_model_async()
            result = memory_store.store_memory(content, category, tags, embedding_model=model)
            return result
        except SecurityError as e:
            return {
                "success": False,
                "error": "Security validation failed",
                "message": str(e)
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Storage failed",
                "message": str(e)
            }
    @mcp.tool()
    async def search_memories(
        query: str,
        limit: int = 10,
        category: str = None,
        offset: int = 0,
        tags: list[str] = None
    ) -> dict[str, Any]:
        """
        Search memories using semantic similarity (vector search).
        Args:
            query: Search query
            limit: Max results (1-50, default 10)
            category: Optional category filter
            offset: Starting position for results (pagination, 0-based index, default 0)
            tags: Optional list of tags to filter by (matches memories containing ANY of the specified tags)
        """
        try:
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            # Get embedding model asynchronously (lazy loading)
            model = await memory_store.get_embedding_model_async()
            # `total` is the count before pagination; `search_results` is the page.
            search_results, total = memory_store.search_memories(query, limit, category, offset, tags, embedding_model=model)
            if not search_results:
                return {
                    "success": True,
                    "results": [],
                    "total": total,
                    "count": 0,
                    "message": "No matching memories found. Try different keywords or broader terms."
                }
            # Convert SearchResult objects to dictionaries
            results = [result.to_dict() for result in search_results]
            return {
                "success": True,
                "query": query,
                "results": results,
                "total": total,
                "count": len(results),
                "message": f"Show {len(results)} of {total} total memories matching filters"
            }
        except SecurityError as e:
            return {
                "success": False,
                "error": "Security validation failed",
                "message": str(e)
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Search failed",
                "message": str(e)
            }
    @mcp.tool()
    async def list_recent_memories(limit: int = 10) -> dict[str, Any]:
        """
        List recent memories in chronological order.
        Args:
            limit: Max results (1-50, default 10)
        """
        try:
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            # Clamp limit to [1, Config.MAX_MEMORIES_PER_SEARCH]
            limit = min(max(1, limit), Config.MAX_MEMORIES_PER_SEARCH)
            memories = memory_store.get_recent_memories(limit)
            # Convert MemoryEntry objects to dictionaries
            memory_dicts = [memory.to_dict() for memory in memories]
            return {
                "success": True,
                "memories": memory_dicts,
                "count": len(memory_dicts),
                "message": f"Retrieved {len(memory_dicts)} recent memories"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to get recent memories",
                "message": str(e)
            }
    @mcp.tool()
    async def get_memory_stats() -> dict[str, Any]:
        """Get database statistics (total memories, categories, usage, health)."""
        try:
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            stats = memory_store.get_stats()
            result = stats.to_dict()
            result["success"] = True
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to get statistics",
                "message": str(e)
            }
    @mcp.tool()
    async def clear_old_memories(
        days_old: int = 30,
        max_to_keep: int = 1000
    ) -> dict[str, Any]:
        """
        Clear old memories to free space (keeps frequently accessed).
        Args:
            days_old: Min age in days (default 30)
            max_to_keep: Max total memories (default 1000)
        """
        try:
            if days_old < 1:
                return {
                    "success": False,
                    "error": "Invalid parameter",
                    "message": "days_old must be at least 1"
                }
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            result = memory_store.clear_old_memories(days_old, max_to_keep)
            return result
        except SecurityError as e:
            return {
                "success": False,
                "error": "Security validation failed",
                "message": str(e)
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Cleanup failed",
                "message": str(e)
            }
    @mcp.tool()
    async def get_by_memory_id(memory_id: int) -> dict[str, Any]:
        """
        Get specific memory by ID.
        Args:
            memory_id: Memory ID to retrieve
        """
        try:
            if not isinstance(memory_id, int) or memory_id < 1:
                return {
                    "success": False,
                    "error": "Invalid parameter",
                    "message": "memory_id must be a positive integer"
                }
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            memory = memory_store.get_memory_by_id(memory_id)
            if memory is None:
                return {
                    "success": False,
                    "error": "Not found",
                    "message": f"Memory with ID {memory_id} not found"
                }
            return {
                "success": True,
                "memory": memory.to_dict(),
                "message": "Memory retrieved successfully"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Retrieval failed",
                "message": str(e)
            }
    @mcp.tool()
    async def delete_by_memory_id(memory_id: int) -> dict[str, Any]:
        """
        Delete memory by ID (permanent, cannot be undone).
        Args:
            memory_id: Memory ID to delete
        """
        try:
            if not isinstance(memory_id, int) or memory_id < 1:
                return {
                    "success": False,
                    "error": "Invalid parameter",
                    "message": "memory_id must be a positive integer"
                }
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            deleted = memory_store.delete_memory(memory_id)
            if not deleted:
                return {
                    "success": False,
                    "error": "Not found",
                    "message": f"Memory with ID {memory_id} not found"
                }
            return {
                "success": True,
                "memory_id": memory_id,
                "message": "Memory deleted successfully from both metadata and vector tables"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Deletion failed",
                "message": str(e)
            }
    @mcp.tool()
    async def get_unique_tags() -> dict[str, Any]:
        """Get all unique tags from memory database."""
        try:
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            tags = memory_store.get_unique_tags()
            return {
                "success": True,
                "tags": tags,
                "count": len(tags),
                "message": f"Retrieved {len(tags)} unique tags"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to retrieve tags",
                "message": str(e)
            }
    @mcp.tool()
    async def get_canonical_tags() -> dict[str, Any]:
        """
        Get all canonical tags (semantic tag clusters).
        Canonical tags are the normalized form of semantically similar tags.
        For example: 'API v2.0', 'API 2.0', 'api version 2' all map to one canonical tag.
        """
        try:
            # Ensure database is initialized (lazy loading)
            await memory_store._ensure_db_initialized_async()
            tags = memory_store.get_canonical_tags()
            return {
                "success": True,
                "tags": tags,
                "count": len(tags),
                "message": f"Retrieved {len(tags)} canonical tags"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to retrieve canonical tags",
                "message": str(e)
            }
    @mcp.tool()
    async def get_tag_frequencies() -> dict[str, Any]:
        """
        Get frequency count for all canonical tags.
        Shows how often each tag is used. Useful for understanding tag popularity
        and for IDF-based weighting.
        """
        try:
            await memory_store._ensure_db_initialized_async()
            frequencies = memory_store.get_tag_frequencies()
            # Sort by frequency descending
            sorted_freq = sorted(frequencies.items(), key=lambda x: -x[1])
            return {
                "success": True,
                "frequencies": dict(sorted_freq),
                "count": len(frequencies),
                "message": f"Retrieved frequencies for {len(frequencies)} tags"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to retrieve tag frequencies",
                "message": str(e)
            }
    @mcp.tool()
    async def get_tag_weights() -> dict[str, Any]:
        """
        Get IDF-based weights for all canonical tags.
        Weight formula: 1 / log(1 + frequency)
        - High frequency tags (common like 'api') → lower weight
        - Low frequency tags (rare like 'module:terminal') → higher weight
        Useful for search relevance tuning.
        """
        try:
            await memory_store._ensure_db_initialized_async()
            weights = memory_store.get_tag_weights()
            # Sort by weight descending (rarest first)
            sorted_weights = sorted(weights.items(), key=lambda x: -x[1])
            return {
                "success": True,
                "weights": dict(sorted_weights),
                "count": len(weights),
                "message": f"Retrieved IDF weights for {len(weights)} tags"
            }
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to retrieve tag weights",
                "message": str(e)
            }
    @mcp.tool()
    async def tag_normalize_preview(
        threshold: float = 0.90,
        max_changes: int = 200
    ) -> dict[str, Any]:
        """
        Preview tag normalization without applying changes.
        Scans all memories and identifies tags that can be merged into
        existing canonical tags based on semantic similarity with guards.
        Returns a deterministic preview_id (same state = same ID).
        No database modifications are made.
        Args:
            threshold: Minimum cosine similarity for merging (0.0-1.0, default 0.90)
            max_changes: Maximum number of tag mappings to propose (default 200)
        """
        try:
            await memory_store._ensure_db_initialized_async()
            model = await memory_store.get_embedding_model_async()
            result = memory_store.tag_normalize_preview(threshold, max_changes, embedding_model=model)
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Preview failed",
                "message": str(e)
            }
    @mcp.tool()
    async def tag_normalize_apply(
        preview_id: str,
        snapshot_id: str,
        threshold: float = 0.90,
        max_changes: int = 200
    ) -> dict[str, Any]:
        """
        Apply tag normalization. Requires both preview_id and snapshot_id.
        Safety requirements:
        - snapshot_id must exist (created via snapshot_create) for rollback
        - preview_id must match current state (no drift since preview)
        Only modifies tags in memory_metadata. Content and embeddings
        are NOT changed — this is a tags-only operation.
        Args:
            preview_id: ID from tag_normalize_preview (ensures no state drift)
            snapshot_id: ID from snapshot_create (ensures rollback is possible)
            threshold: Must match the threshold used in preview (default 0.90)
            max_changes: Must match max_changes used in preview (default 200)
        """
        try:
            await memory_store._ensure_db_initialized_async()
            model = await memory_store.get_embedding_model_async()
            result = memory_store.tag_normalize_apply(
                preview_id, snapshot_id, threshold, max_changes, embedding_model=model
            )
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Apply failed",
                "message": str(e)
            }
    @mcp.tool()
    async def snapshot_create(
        description: str = ""
    ) -> dict[str, Any]:
        """
        Create a snapshot of current memory tags for rollback safety.
        Captures memory_id → tags mapping for all memories.
        Required before running tag_normalize_apply.
        The snapshot_id is a deterministic hash of the current tag state.
        Args:
            description: Optional human-readable description of the snapshot
        """
        try:
            await memory_store._ensure_db_initialized_async()
            result = memory_store.snapshot_create(description)
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Snapshot creation failed",
                "message": str(e)
            }
    @mcp.tool()
    async def snapshot_restore(
        snapshot_id: str
    ) -> dict[str, Any]:
        """
        Restore memory tags from a previously created snapshot.
        Only modifies tags — content and embeddings are untouched.
        Use this to rollback after tag_normalize_apply if needed.
        Args:
            snapshot_id: ID of the snapshot to restore (from snapshot_create)
        """
        try:
            await memory_store._ensure_db_initialized_async()
            result = memory_store.snapshot_restore(snapshot_id)
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Restore failed",
                "message": str(e)
            }
    @mcp.tool()
    async def cookbook(
        level: int = 0,
        include: str = "init",
        case_category: str = None,
        query: str = None,
        priority: str = None,
        cognitive: str = None,
        strict: str = None,
        limit: int = 10,
        offset: int = 0
    ) -> dict[str, Any]:
        """
        CRITICAL: Your essential knowledge base for this MCP.
        READ THIS FIRST before using any other tools.
        Without this, you are operating blind.
        Contains: docs (4 levels), use cases (12 categories), categories reference.
        Args:
            include: What to return (default "init")
                - "init": FIRST READ - quick start + available resources
                - "docs": Documentation by level
                - "cases": Use case scenarios (all or filtered)
                - "categories": List of available case categories
                - "all": Everything combined
            level: Verbosity level 0-3 for docs (default 0)
            case_category: Filter cases by category. Supports comma-separated list (e.g., "store,search,gates-rules")
            query: Text search filter - finds content containing keywords
            priority: Filter by priority level: "critical", "high", or "critical,high"
            cognitive: Filter by cognitive level tags (minimal, standard, deep, exhaustive), CSV for OR
            strict: Filter by strict level tags (relaxed, standard, strict, paranoid), CSV for OR
            limit: Max results/sections to return (default 10, max 50)
            offset: Starting position for pagination (default 0)
        Examples:
            # FIRST: Initialize context
            mcp__vector-memory__cookbook()
            # List available categories with keys
            mcp__vector-memory__cookbook(include="categories")
            # Single category
            mcp__vector-memory__cookbook(include="cases", case_category="gates-rules")
            # Multiple categories
            mcp__vector-memory__cookbook(include="cases", case_category="store,search")
            # Only CRITICAL rules
            mcp__vector-memory__cookbook(include="cases", case_category="gates-rules", priority="critical")
            # CRITICAL or HIGH across all categories
            mcp__vector-memory__cookbook(include="cases", priority="critical,high")
            # Cognitive filter
            mcp__vector-memory__cookbook(include="cases", cognitive="deep,exhaustive")
            # Strict filter
            mcp__vector-memory__cookbook(include="cases", strict="strict,paranoid")
            # Search + priority filter
            mcp__vector-memory__cookbook(include="cases", query="JWT", priority="critical")
            # Documentation by level
            mcp__vector-memory__cookbook(include="docs", level=2)
            # Pagination
            mcp__vector-memory__cookbook(include="cases", priority="critical", limit=5, offset=0)
        """
        try:
            # --- Parameter validation -------------------------------------
            if level not in [0, 1, 2, 3]:
                return {
                    "success": False,
                    "error": "Invalid level",
                    "message": "Level must be 0, 1, 2, or 3"
                }
            if include not in ["init", "docs", "cases", "categories", "all"]:
                return {
                    "success": False,
                    "error": "Invalid include",
                    "message": "include must be 'init', 'docs', 'cases', 'categories', or 'all'"
                }
            # Parse priority filter
            priority_list = None
            if priority:
                priority_list = [p.strip().lower() for p in priority.split(",")]
                valid_priorities = ["critical", "high"]
                invalid = [p for p in priority_list if p not in valid_priorities]
                if invalid:
                    return {
                        "success": False,
                        "error": "Invalid priority",
                        "message": f"Priority must be one of: {valid_priorities}. Got: {invalid}"
                    }
            # Parse cognitive filter
            cognitive_list = None
            if cognitive:
                cognitive_list = [c.strip().lower() for c in cognitive.split(",")]
                valid_cognitive = ["minimal", "standard", "deep", "exhaustive"]
                invalid = [c for c in cognitive_list if c not in valid_cognitive]
                if invalid:
                    return {
                        "success": False,
                        "error": "Invalid cognitive",
                        "message": f"Cognitive must be one of: {valid_cognitive}. Got: {invalid}"
                    }
            # Parse strict filter
            strict_list = None
            if strict:
                strict_list = [s.strip().lower() for s in strict.split(",")]
                valid_strict = ["relaxed", "standard", "strict", "paranoid"]
                invalid = [s for s in strict_list if s not in valid_strict]
                if invalid:
                    return {
                        "success": False,
                        "error": "Invalid strict",
                        "message": f"Strict must be one of: {valid_strict}. Got: {invalid}"
                    }
            # Clamp pagination parameters
            limit = max(1, min(limit, 50))
            offset = max(0, offset)
            # Echo effective parameters back in the result
            result = {
                "success": True,
                "level": level,
                "include": include,
                "query": query,
                "priority": priority,
                "cognitive": cognitive,
                "strict": strict,
                "limit": limit,
                "offset": offset
            }
            def filter_by_tag_pattern(content, tag_type, values):
                """Filter content by tag patterns like cognitive:* or strict:*"""
                if not values or not content:
                    return content
                if isinstance(content, dict):
                    content = str(content.get("content", content))
                # NOTE(review): `re` is already imported at module level;
                # this local import is redundant but harmless.
                import re
                # Sections are separated by horizontal rules (---)
                sections = re.split(r'\n---+\n', content)
                matching = []
                for section in sections:
                    for val in values:
                        pattern = f"{tag_type}:{val}"
                        if pattern.lower() in section.lower():
                            matching.append(section)
                            break
                return "\n\n---\n\n".join(matching) if matching else ""
            # Load bundled markdown resources (may be None if missing)
            docs_content = _load_resource("README_AGENTS.md")
            cases_content = _load_resource("CASES_AGENTS.md")
            case_sections = _list_case_sections(cases_content) if cases_content else []
            # --- "init": quick start + resource directory (early return) --
            if include == "init":
                result["critical"] = "READ THIS FIRST - Your essential knowledge base for this MCP"
                result["warning"] = "Without this cookbook, you are operating blind. Always consult it first."
                quick_start_content = _extract_level(docs_content, 0) if docs_content else {"error": "README_AGENTS.md not found"}
                if query:
                    quick_start_content = _filter_by_query(quick_start_content, query, limit, offset)
                if priority_list:
                    quick_start_content = _filter_by_priority(quick_start_content, priority_list)
                result["quick_start"] = quick_start_content
                result["available_resources"] = {
                    "cookbook_docs": {
                        "levels": [0, 1, 2, 3],
                        "level_names": ["Identity & Quick Start", "Practical Usage", "Advanced Patterns", "Architecture & Internals"],
                        "usage": "mcp__vector-memory__cookbook(include='docs', level=N)"
                    },
                    "cookbook_cases": {
                        "categories": case_sections,
                        "count": len(case_sections),
                        "usage": "mcp__vector-memory__cookbook(include='cases', case_category='key1,key2')"
                    },
                    "cookbook_priority": {
                        "usage": "mcp__vector-memory__cookbook(include='cases', priority='critical') or priority='critical,high'"
                    },
                    "cookbook_cognitive": {
                        "values": ["minimal", "standard", "deep", "exhaustive"],
                        "usage": "mcp__vector-memory__cookbook(include='cases', cognitive='deep,exhaustive')"
                    },
                    "cookbook_strict": {
                        "values": ["relaxed", "standard", "strict", "paranoid"],
                        "usage": "mcp__vector-memory__cookbook(include='cases', strict='strict,paranoid')"
                    },
                    "cookbook_search": {
                        "usage": "mcp__vector-memory__cookbook(include='cases', query='keywords')"
                    },
                    "cookbook_categories": {
                        "usage": "mcp__vector-memory__cookbook(include='categories')"
                    }
                }
                return result
            # --- "docs" / "all": level-filtered documentation -------------
            if include in ["docs", "all"]:
                if docs_content:
                    docs_extracted = _extract_level(docs_content, level)
                    if query:
                        docs_extracted = _filter_by_query(docs_extracted, query, limit, offset)
                    if priority_list:
                        docs_extracted = _filter_by_priority(docs_extracted, priority_list)
                    result["docs"] = {
                        "level": level,
                        "level_name": ["Identity & Quick Start", "Practical Usage", "Advanced Patterns", "Architecture & Internals"][level],
                        "content": docs_extracted
                    }
                else:
                    result["docs"] = {"error": "README_AGENTS.md not found"}
            # --- "categories" / "all": paginated category listing ---------
            if include in ["categories", "all"]:
                categories_filtered = case_sections
                if query:
                    query_lower = query.lower()
                    categories_filtered = [
                        s for s in case_sections
                        if query_lower in s["title"].lower() or query_lower in s.get("description", "").lower()
                    ]
                paginated = categories_filtered[offset:offset + limit]
                result["categories"] = {
                    "categories": paginated,
                    "keys": [s["key"] for s in paginated],
                    "total": len(categories_filtered),
                    "count": len(paginated),
                    "offset": offset,
                    "usage": "Use include='cases' with case_category='key1,key2' for multiple categories"
                }
            # --- "cases" / "all": scenario content, optionally filtered ---
            if include in ["cases", "all"]:
                case_titles = [s["title"] for s in case_sections]
                if not cases_content:
                    result["cases"] = {"error": "CASES_AGENTS.md not found"}
                elif case_category:
                    # Parse multiple categories (comma-separated)
                    category_keys = [c.strip().lower() for c in case_category.split(",")]
                    matched_sections = []
                    for section in case_sections:
                        for cat_key in category_keys:
                            # Match either the kebab-case key or a title substring
                            if cat_key == section["key"] or cat_key in section["title"].lower():
                                matched_sections.append(section["title"])
                                break
                    if matched_sections:
                        # Combine all matched sections
                        combined_content = ""
                        for i, section_title in enumerate(matched_sections):
                            section_content = _extract_case_section(cases_content, section_title)
                            if priority_list:
                                section_content = _filter_by_priority(section_content, priority_list)
                            if cognitive_list:
                                section_content = filter_by_tag_pattern(section_content, "cognitive", cognitive_list)
                            if strict_list:
                                section_content = filter_by_tag_pattern(section_content, "strict", strict_list)
                            combined_content += f"\n\n---\n\n{section_content}"
                        if query:
                            combined_content = _filter_by_query(combined_content, query, limit, offset)
                        result["cases"] = {
                            "categories": matched_sections,
                            "count": len(matched_sections),
                            "content": combined_content.strip()
                        }
                    else:
                        result["cases"] = {
                            "error": f"Categories '{case_category}' not found",
                            "available_keys": [s["key"] for s in case_sections],
                            "available_titles": case_titles
                        }
                else:
                    # No category filter - return all or search
                    filtered_content = cases_content
                    filter_applied = False
                    if priority_list:
                        filtered_content = _filter_by_priority(filtered_content, priority_list)
                        filter_applied = True
                    if cognitive_list:
                        filtered_content = filter_by_tag_pattern(filtered_content, "cognitive", cognitive_list)
                        filter_applied = True
                    if strict_list:
                        filtered_content = filter_by_tag_pattern(filtered_content, "strict", strict_list)
                        filter_applied = True
                    if query:
                        filtered_content = _filter_by_query(filtered_content, query, limit, offset)
                        filter_applied = True
                    if filter_applied:
                        result["cases"] = {
                            "query": query,
                            "priority": priority,
                            "cognitive": cognitive,
                            "strict": strict,
                            "content": filtered_content if isinstance(filtered_content, str) else str(filtered_content),
                            "usage": "Content matching all filters"
                        }
                    else:
                        result["cases"] = {
                            "categories": case_sections,
                            "content": cases_content
                        }
            # Build a human-readable summary of what was retrieved
            msg_parts = [f"Retrieved {include}"]
            if include not in ["categories"]:
                msg_parts.append(f"level {level}")
            if query:
                msg_parts.append(f"query='{query}'")
            if priority:
                msg_parts.append(f"priority='{priority}'")
            if cognitive:
                msg_parts.append(f"cognitive='{cognitive}'")
            if strict:
                msg_parts.append(f"strict='{strict}'")
            result["message"] = " ".join(msg_parts)
            return result
        except Exception as e:
            return {
                "success": False,
                "error": "Failed to retrieve cookbook",
                "message": str(e)
            }
    return mcp
def _load_resource(filename: str) -> str | None:
"""Load resource file from package."""
try:
with resources.files("src").joinpath(filename).open("r", encoding="utf-8") as f:
return f.read()
except Exception:
return None
def _extract_level(content: str, level: int) -> str:
"""Extract documentation for specified level."""
pattern = rf"^## Level {level}: .*$"
lines = content.split("\n")
start_idx = None
end_idx = len(lines)
for i, line in enumerate(lines):
if re.match(pattern, line, re.MULTILINE):
start_idx = i
elif start_idx is not None and re.match(r"^## Level \d+: ", line):
end_idx = i
break
if start_idx is None:
return "Level not found in documentation"
section_lines = lines[start_idx:end_idx]
return "\n".join(section_lines).strip()
def _list_case_sections(content: str) -> list[dict]:
"""List all case sections with keys, titles and descriptions from HTML comments."""
pattern = r"^## ([A-Za-z\s&]+) Scenarios$"
desc_pattern = r"<!-- description: (.+?) -->"
sections = []
lines = content.split("\n")
for i, line in enumerate(lines):
match = re.match(pattern, line, re.MULTILINE)
if match:
name = match.group(1).strip()
title = name + " Scenarios"
description = ""
# Generate key: kebab-case from name (without "Scenarios")
key = name.lower().replace(" & ", "-").replace(" ", "-").replace("--", "-")
# Check next line for description comment
if i + 1 < len(lines):
desc_match = re.search(desc_pattern, lines[i + 1])
if desc_match:
description = desc_match.group(1).strip()
sections.append({
"key": key,
"title": title,
"description": description
})
return sections
def _extract_case_section(content: str, section_name: str) -> str:
"""Extract specific case section by name."""
escaped_name = re.escape(section_name)
pattern = rf"^## {escaped_name}$"
lines = content.split("\n")
start_idx = None
end_idx = len(lines)
for i, line in enumerate(lines):
if re.match(pattern, line, re.MULTILINE):
start_idx = i
elif start_idx is not None and re.match(r"^## [A-Za-z\s]+ Scenarios$", line):
end_idx = i
break
section_lines = lines[start_idx:end_idx]
return "\n".join(section_lines).strip()
def _filter_by_priority(content: str, priority_list: list) -> str:
    """Filter content by priority markers [CRITICAL] and [HIGH].

    Scans line by line, accumulating "blocks": a block starts at the
    first line containing a requested priority marker and ends at the
    next '**...**' subsection heading or '---' separator. All matched
    blocks are concatenated; a fallback message is returned when none
    match.
    """
    if not content or not priority_list:
        return content
    # Build regex pattern for priority markers, e.g. r"\[(CRITICAL|HIGH)\]"
    priority_pattern = r"\[(" + "|".join(p.upper() for p in priority_list) + r")\]"
    lines = content.split("\n")
    matches = []
    current_block_start = None  # index where the current matched block began
    for i, line in enumerate(lines):
        if re.search(priority_pattern, line, re.IGNORECASE):
            # Marker line: open a block unless one is already open
            if current_block_start is None:
                current_block_start = i
        elif current_block_start is not None and line.startswith("**") and line.endswith("**"):
            # End of block (next subsection)
            block_lines = lines[current_block_start:i]
            matches.extend(block_lines)
            current_block_start = None
        elif current_block_start is not None and line.startswith("---"):
            # End of block (separator); keep a blank line between blocks
            block_lines = lines[current_block_start:i]
            matches.extend(block_lines)
            matches.append("")
            current_block_start = None
    # Don't forget last block
    if current_block_start is not None:
        matches.extend(lines[current_block_start:])
    if not matches:
        return f"No content found with priority: {', '.join(priority_list)}"
    return "\n".join(matches)
def _filter_sections_by_priority(sections: list, content: str, priority_list: list) -> list:
    """Keep only sections whose body contains a requested priority marker."""
    if not priority_list:
        return sections
    marker_re = re.compile(
        r"\[(" + "|".join(p.upper() for p in priority_list) + r")\]",
        re.IGNORECASE,
    )
    kept = []
    for entry in sections:
        title = entry.get("section", entry.get("title", ""))
        if marker_re.search(_extract_case_section(content, title)):
            kept.append(entry)
    return kept
def _filter_by_query(content: str, query: str, limit: int, offset: int) -> str:
"""Filter content by query keywords, returning matching lines with context."""
if not content or not query:
return content
keywords = [kw.lower().strip() for kw in query.split() if kw.strip()]
if not keywords:
return content
lines = content.split("\n")
matches = []
for i, line in enumerate(lines):
line_lower = line.lower()
if any(kw in line_lower for kw in keywords):
start = max(0, i - 2)
end = min(len(lines), i + 3)
context = lines[start:end]
matches.append({
"line_number": i + 1,
"matched_line": line,
"context": "\n".join(context)
})
if not matches:
return f"No matches found for query: '{query}'"
total = len(matches)
paginated = matches[offset:offset + limit]
result_lines = [f"Found {total} matches for '{query}'. Showing {len(paginated)} (offset: {offset}, limit: {limit}):\n"]
for m in paginated:
result_lines.append(f"--- Line {m['line_number']} ---")
result_lines.append(m['context'])
result_lines.append("")
return "\n".join(result_lines)
def _search_in_cases(content: str, sections: list, query: str, limit: int, offset: int) -> list:
    """Search every named case section for query keywords.

    Returns a paginated list of per-section summaries, each holding the
    total match count and up to five matched lines truncated to 200
    characters.
    """
    keywords = [word.lower().strip() for word in query.split() if word.strip()]
    if not keywords:
        return []
    summaries = []
    for section_name in sections:
        body = _extract_case_section(content, section_name)
        hits = []
        for line_no, text in enumerate(body.split("\n"), start=1):
            lowered = text.lower()
            if any(kw in lowered for kw in keywords):
                hits.append({
                    "line": line_no,
                    "content": text.strip()[:200]
                })
        if hits:
            summaries.append({
                "section": section_name,
                "match_count": len(hits),
                "matches": hits[:5]
            })
    return summaries[offset:offset + limit]
def main():
    """Entry point: print a startup banner, build the server, and run it."""
    print(f"Starting {Config.SERVER_NAME} v{Config.SERVER_VERSION}", file=sys.stderr)
    try:
        # Get working directory and config
        working_dir = get_working_dir()
        limit = get_memory_limit()
        database = working_dir / Config.DB_NAME
        # Banner goes to stderr so stdout stays free for MCP traffic
        for banner_line in (
            f"Working directory: {working_dir.parent}",
            f"Memory database: {database}",
            f"Memory limit: {limit:,} entries",
            f"Embedding model: {Config.EMBEDDING_MODEL}",
            "=" * 50,
        ):
            print(banner_line, file=sys.stderr)
        # Create and run server (blocks until shutdown)
        server = create_server()
        print("Server ready for connections...", file=sys.stderr)
        server.run()
    except KeyboardInterrupt:
        print("\nServer stopped by user", file=sys.stderr)
    except Exception as e:
        print(f"Server failed to start: {e}", file=sys.stderr)
        sys.exit(1)
# Standard script guard: start the MCP server only when run directly.
if __name__ == "__main__":
    main()