"""
Minimal FastMCP server that exposes one tool: nrtsearch/search
──────────────────────────────────────────────────────────────
• Accepts **any** Lucene query the caller provides.
• Normalises bare keywords → text:"…" so phrase tests still work.
• Clamps topHits to the range 1-100 to avoid runaway requests.
• Returns {"hits": [{"score": …, "stars": …, "text": …}]} – easy for Copilot to display.
"""
import asyncio
import logging
import os
import re
import time
from typing import List, Optional

import httpx
from fastmcp import FastMCP
from pydantic import BaseModel

from nrtsearch_mcp.logging import setup_logging
from nrtsearch_mcp.settings import Settings
# Module-level singletons, created once at import time and shared by the
# functions below.
settings = Settings()
# setup_logging is project-local; it receives the logger name and the level
# configured in settings (presumably returning a configured logger — verify).
logger = setup_logging("nrtsearch.mcp", settings.log_level)
mcp = FastMCP("nrtsearch") # host / port / path supplied at run()
# ────────── result schema ─────────────────────────────────────────────────────
class Hit(BaseModel):
    """One search hit as returned to the MCP client.

    ``stars`` and ``text`` are optional: ``_search_impl`` initialises both to
    ``None`` and only fills them in when the gateway response actually
    carries the corresponding field value, so the schema must accept ``None``
    instead of failing validation for such hits.
    """

    # Relevance score reported by the gateway for this hit.
    score: float
    # Value read from the hit's "stars" field, or None when absent.
    stars: Optional[int] = None
    # Value read from the hit's "text" field, or None when absent.
    text: Optional[str] = None
class SearchResult(BaseModel):
    """Envelope returned by the search tool: the list of parsed hits."""

    # Hits in the order they were read from the gateway response.
    hits: List[Hit]
# ────────── MCP tool ──────────────────────────────────────────────────────────
# Matches a Lucene field qualifier such as ``text:`` or ``stars:``. The
# look-behind keeps the match anchored at the start of an identifier (so
# ``10:30`` is not a field), and the look-ahead stops ``http://…`` from being
# mistaken for a field named ``http``.
_FIELD_QUALIFIER = re.compile(r"(?<![\w.])[A-Za-z_][\w.]*:(?!//)")


def _normalise_query(queryText: str) -> str:
    """Return ``queryText`` unchanged if it is already fielded or quoted,
    otherwise wrap it as a phrase query on the ``text`` field."""
    if '"' in queryText or _FIELD_QUALIFIER.search(queryText):
        return queryText
    return f'text:"{queryText}"'


def _parse_hit(hit: dict) -> Hit:
    """Build a ``Hit`` from one raw gateway hit; raises if it is malformed."""
    fields = hit["fields"]
    stars = None
    text = None
    # Each field is expected as {"fieldValue": [{"intValue"/"textValue": …}]};
    # only the first value is used.
    if "stars" in fields and fields["stars"].get("fieldValue"):
        stars = fields["stars"]["fieldValue"][0].get("intValue")
    if "text" in fields and fields["text"].get("fieldValue"):
        text = fields["text"]["fieldValue"][0].get("textValue")
    return Hit(score=hit.get("score", 0.0), stars=stars, text=text)


# Private implementation for search logic, for direct unit testing
async def _search_impl(
    index: str,
    queryText: str,
    topHits: int = 10,
    retrieveFields: Optional[List[str]] = None,
    highlight: bool = False,
) -> SearchResult:
    """
    Core search logic for NRTSearch/Lucene index. Used by the tool and for direct unit testing.

    Args:
        index: Name of the index, sent to the gateway as ``indexName``.
        queryText: Lucene query. A query with no field qualifier and no
            double quote is wrapped as ``text:"<query>"``.
        topHits: Requested number of hits; clamped to the range 1-100.
        retrieveFields: Fields to retrieve. When empty or ``None``, falls back
            to ``settings.default_retrieve_fields[index]``, then to
            ``settings.default_retrieve_fields_global``, then to ``[]``.
        highlight: When true, ``highlightFields: ["text"]`` is added to the
            request payload.

    Returns:
        A ``SearchResult`` holding every hit that could be parsed. Malformed
        hits are logged and skipped.

    Raises:
        TimeoutError: If the total retry budget (``settings.total_timeout``,
            default 10s) has elapsed before an attempt starts.
        httpx.RequestError, httpx.HTTPStatusError: Re-raised once
            ``settings.retry_max_attempts`` (default 3) attempts have failed.
    """
    # ── sanity-check inputs ───────────────────────────────────────────────────
    topHits = max(1, min(topHits, 100))
    queryText = _normalise_query(queryText)

    if not retrieveFields:
        # Try the per-index default first, then the global default.
        # ``or {}`` guards against the setting being present but None.
        per_index_defaults = getattr(settings, "default_retrieve_fields", None) or {}
        retrieveFields = per_index_defaults.get(index)
        if retrieveFields is None:
            retrieveFields = getattr(settings, "default_retrieve_fields_global", [])
    if not retrieveFields:
        logger.warning("No retrieveFields specified and no default found for index '%s'.", index)
    # Ensure retrieveFields is always a list
    retrieveFields = list(retrieveFields) if retrieveFields else []

    logger.info("→ search %s | %r | top=%s", index, queryText, topHits)

    # ── call the HTTP wrapper ────────────────────────────────────────────────
    # The payload and URL do not change between attempts, so build them once.
    payload = {
        "indexName": index,
        "queryText": queryText,
        "topHits": topHits,
        "retrieveFields": retrieveFields,
    }
    if highlight:
        payload["highlightFields"] = ["text"]
    url = f"{settings.gateway_url}/v1/search"

    attempt = 0
    max_attempts = getattr(settings, "retry_max_attempts", 3)
    backoff = getattr(settings, "retry_initial_backoff", 0.1)
    total_timeout = getattr(settings, "total_timeout", 10.0)
    per_attempt_timeout = getattr(settings, "per_attempt_timeout", 5.0)
    start_time = time.monotonic()

    # One client for all attempts instead of a new one per retry.
    async with httpx.AsyncClient(timeout=per_attempt_timeout) as client:
        while True:
            # The total budget is checked before each attempt starts.
            if time.monotonic() - start_time > total_timeout:
                logger.error(
                    "Total retry timeout (%.2fs) exceeded for search operation",
                    total_timeout,
                )
                raise TimeoutError(f"Total retry timeout ({total_timeout}s) exceeded")
            try:
                resp = await client.post(url, json=payload)
                resp.raise_for_status()
                raw = resp.json()
                break
            except (httpx.RequestError, httpx.HTTPStatusError) as e:
                attempt += 1
                logger.warning("Gateway call failed (attempt %d): %s", attempt, e)
                if attempt >= max_attempts:
                    logger.error("Max retry attempts reached. Raising error.")
                    raise
                await asyncio.sleep(backoff)
                # Exponential backoff, capped at 0.8s between attempts.
                backoff = min(backoff * 2, 0.8)

    # ── reshape results for Copilot ──────────────────────────────────────────
    hits: List[Hit] = []
    # ``or []`` tolerates a response whose "hits" key is present but null.
    for hit in raw.get("hits") or []:
        try:
            hits.append(_parse_hit(hit))
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # ValueError covers pydantic's ValidationError (a ValueError
            # subclass), so one unparseable hit does not fail the whole search.
            logger.error("Error extracting hit fields: %s", e)
    return SearchResult(hits=hits)
# Public MCP tool: a thin shim over _search_impl so the core logic stays
# callable without going through the FastMCP decorator.
@mcp.tool(
    description="Search an NRTSearch/Lucene index",
    annotations={
        "parameters": {
            "properties": {
                "queryText": {
                    "description": (
                        "Lucene Boolean query **required**.\n"
                        "• Join keywords with AND/OR/NOT → text:(gay AND bar AND sf)\n"
                        "• Use quotes for phrases → text:\"great coffee\"\n"
                        "• Range / wildcard / fuzzy ok → stars:[4 TO 5], bar*, cocktail~1"
                    )
                }
            }
        },
        "examples": [
            {  # Boolean keywords
                "index": "yelp_reviews_staging",
                "queryText": 'text:(irish AND pub AND (texas OR tx))',
                "topHits": 3
            },
            {  # Phrase
                "index": "yelp_reviews_staging",
                "queryText": 'text:"great coffee"',
                "topHits": 5
            }
        ],
    },
)
async def search(
    index: str,
    queryText: str,
    topHits: int = 10,
    retrieveFields: Optional[List[str]] = None,
    highlight: bool = False,
) -> SearchResult:
    """Forward the tool arguments unchanged to ``_search_impl`` and return its result."""
    result = await _search_impl(
        index=index,
        queryText=queryText,
        topHits=topHits,
        retrieveFields=retrieveFields,
        highlight=highlight,
    )
    return result
# ────────── run the server ────────────────────────────────────────────────────
def entrypoint():
    """CLI entry-point for nrtsearch-mcp-server; delegates to ``main``."""
    main()
def main():
    """Run the FastMCP server over the HTTP transport.

    Host and port come from the module-level ``settings``; the endpoint path
    is fixed at ``/`` (i.e. http://host:port/).
    """
    run_options = {
        "transport": "http",
        "host": settings.host,
        "port": settings.port,
        "path": "/",
    }
    mcp.run(**run_options)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    entrypoint()