"""Standards search and query tools.
These tools help agents find relevant standards and documentation.
"""
import re
from dataclasses import dataclass
from pathlib import Path
from fastmcp import FastMCP, Context
from ..config import get_config
from ..observability import get_tracer
router = FastMCP("search-tools")
tracer = get_tracer(__name__)
@dataclass
class SearchResult:
"""A single search result."""
uri: str
title: str
excerpt: str
relevance: float
line_number: int | None = None
def to_dict(self) -> dict:
return {
"uri": self.uri,
"title": self.title,
"excerpt": self.excerpt,
"relevance": self.relevance,
"line_number": self.line_number,
}
@router.tool()
async def search_standards(
    query: str,
    category: str | None = None,
    max_results: int = 10,
    ctx: Context = None
) -> dict:
    """Search standards documents for relevant content.

    Performs a case-insensitive, literal full-text search across all
    standards documents and returns relevant excerpts with context.

    Args:
        query: Search query (keywords or phrases; matched literally)
        category: Optional category filter (agents, mcp, infrastructure)
        max_results: Maximum number of results to return (default: 10)
        ctx: MCP context

    Returns:
        Dictionary with search results and metadata. When the standards
        root or the requested category does not exist, the dict carries
        an "error" key and an empty result list.

    Example:
        >>> results = await search_standards("authentication")
        >>> for r in results["results"]:
        ...     print(f"{r['title']}: {r['excerpt']}")
    """
    with tracer.start_as_current_span("tool.search_standards") as span:
        span.set_attribute("search.query", query)
        span.set_attribute("search.category", category or "all")
        if ctx:
            await ctx.info(f"Searching standards for: {query}")
        config = get_config()
        standards_root = config.get_standards_root()
        if not standards_root.exists():
            return {
                "query": query,
                "total": 0,
                "results": [],
                "error": f"Standards directory not found: {standards_root}"
            }
        # Escape the query so it is matched literally, not as a regex.
        search_pattern = re.compile(re.escape(query), re.IGNORECASE)
        # Resolve the search scope. BUG FIX: an unknown category used to
        # fall through silently to a full search; report it instead, in
        # line with get_standards_summary's "Category not found" error.
        search_root = standards_root
        if category:
            search_root = standards_root / category
            if not search_root.exists():
                return {
                    "query": query,
                    "category": category,
                    "total": 0,
                    "results": [],
                    "error": f"Category not found: {category}"
                }
        results: list[SearchResult] = []
        for md_file in search_root.rglob("*.md"):
            # README files are index pages, not standards documents.
            if md_file.name == "README.md":
                continue
            try:
                content = md_file.read_text(encoding="utf-8")
            except Exception as e:
                # Best-effort: warn and keep searching the other files.
                if ctx:
                    await ctx.warning(f"Error reading {md_file}: {e}")
                continue
            matches = list(search_pattern.finditer(content))
            if not matches:
                continue
            # Crude relevance: 0.2 per hit, capped at 1.0.
            relevance = min(1.0, len(matches) * 0.2)
            title = _extract_title(content) or md_file.stem.replace("-", " ").title()
            first_match = matches[0]
            excerpt = _get_excerpt(content, first_match.start(), context_chars=150)
            # 1-based line number of the first hit.
            line_number = content[:first_match.start()].count('\n') + 1
            # Build the standards:// URI from the path relative to the root.
            relative = md_file.relative_to(standards_root)
            parts = list(relative.parts)
            # BUG FIX: strip only the trailing ".md" — str.replace would
            # also mangle ".md" occurring mid-name (e.g. "foo.md.bak.md").
            parts[-1] = parts[-1].removesuffix(".md")
            uri = f"standards://{'/'.join(parts)}"
            results.append(SearchResult(
                uri=uri,
                title=title,
                excerpt=excerpt,
                relevance=relevance,
                line_number=line_number
            ))
        # Highest relevance first; clamp so a negative max_results yields
        # no results instead of a surprising tail slice.
        results.sort(key=lambda r: r.relevance, reverse=True)
        results = results[:max(0, max_results)]
        span.set_attribute("search.results_count", len(results))
        return {
            "query": query,
            "category": category,
            "total": len(results),
            "results": [r.to_dict() for r in results]
        }
@router.tool()
async def list_standards_categories(ctx: Context = None) -> dict:
    """List all available standards categories.

    Returns a list of all top-level categories in the standards
    documentation with descriptions and document counts.

    Args:
        ctx: MCP context

    Returns:
        Dictionary with category information, or an "error" key when
        the standards root does not exist.
    """
    with tracer.start_as_current_span("tool.list_standards_categories"):
        config = get_config()
        standards_root = config.get_standards_root()
        if not standards_root.exists():
            return {"categories": [], "error": "Standards directory not found"}
        categories = []
        for item in sorted(standards_root.iterdir()):
            if not item.is_dir() or item.name.startswith("."):
                continue
            readme = item / "README.md"
            # README is an index page, not a standards document, so it
            # is excluded from the count.
            doc_count = len(list(item.rglob("*.md"))) - (
                1 if readme.exists() else 0
            )
            description = None
            if readme.exists():
                try:
                    # CONSISTENCY FIX: reuse _extract_description (which also
                    # skips YAML frontmatter) instead of a divergent inline
                    # re-implementation of the same first-paragraph scan.
                    description = _extract_description(
                        readme.read_text(encoding="utf-8")
                    )
                except OSError as e:
                    # ROBUSTNESS FIX: an unreadable README used to crash the
                    # whole listing; fall back to the default blurb instead.
                    if ctx:
                        await ctx.warning(f"Error reading {readme}: {e}")
            categories.append({
                "name": item.name,
                "path": f"standards://{item.name}",
                "description": description or f"Standards for {item.name}",
                "document_count": doc_count
            })
        return {"categories": categories}
@router.tool()
async def get_standards_summary(
    category: str,
    ctx: Context = None
) -> dict:
    """Get a summary of all standards in a category.

    Returns titles and descriptions of all documents in the
    specified category for quick reference.

    Args:
        category: Category name (agents, mcp, infrastructure)
        ctx: MCP context

    Returns:
        Dictionary with document summaries, or an "error" key when the
        category does not exist.
    """
    with tracer.start_as_current_span("tool.get_standards_summary") as span:
        span.set_attribute("category", category)
        config = get_config()
        standards_root = config.get_standards_root()
        category_path = standards_root / category
        if not category_path.exists():
            return {
                "category": category,
                "error": f"Category not found: {category}",
                "documents": []
            }
        documents = []
        for md_file in sorted(category_path.rglob("*.md")):
            # README files are index pages, not standards documents.
            if md_file.name == "README.md":
                continue
            try:
                content = md_file.read_text(encoding="utf-8")
            except Exception as e:
                # ROBUSTNESS FIX: one unreadable file used to abort the whole
                # summary; warn and skip it, as search_standards does.
                if ctx:
                    await ctx.warning(f"Error reading {md_file}: {e}")
                continue
            title = _extract_title(content) or md_file.stem.replace("-", " ").title()
            description = _extract_description(content)
            # Build the standards:// URI from the path relative to the root.
            relative = md_file.relative_to(standards_root)
            parts = list(relative.parts)
            # BUG FIX: strip only the trailing ".md" — str.replace would
            # also mangle ".md" occurring mid-name.
            parts[-1] = parts[-1].removesuffix(".md")
            uri = f"standards://{'/'.join(parts)}"
            documents.append({
                "uri": uri,
                "title": title,
                "description": description,
                "filename": md_file.name
            })
        return {
            "category": category,
            "document_count": len(documents),
            "documents": documents
        }
# =============================================================================
# Helper Functions
# =============================================================================
def _extract_title(content: str) -> str | None:
"""Extract title from markdown content."""
for line in content.split("\n"):
if line.startswith("# "):
return line[2:].strip()
return None
def _extract_description(content: str) -> str | None:
"""Extract first paragraph description from markdown."""
lines = content.split("\n")
in_frontmatter = False
past_title = False
for line in lines:
# Skip frontmatter
if line.strip() == "---":
in_frontmatter = not in_frontmatter
continue
if in_frontmatter:
continue
# Skip title
if line.startswith("# "):
past_title = True
continue
# Get first non-empty, non-heading line after title
if past_title and line.strip() and not line.startswith("#"):
return line.strip()
return None
def _get_excerpt(content: str, match_pos: int, context_chars: int = 150) -> str:
"""Get excerpt around a match position."""
start = max(0, match_pos - context_chars)
end = min(len(content), match_pos + context_chars)
excerpt = content[start:end]
# Clean up to word boundaries
if start > 0:
first_space = excerpt.find(" ")
if first_space > 0:
excerpt = "..." + excerpt[first_space + 1:]
if end < len(content):
last_space = excerpt.rfind(" ")
if last_space > 0:
excerpt = excerpt[:last_space] + "..."
# Clean whitespace
excerpt = " ".join(excerpt.split())
return excerpt