
E-commerce Local MCP Server

query.py (10.7 kB)
from fastapi import APIRouter, HTTPException, Header
from fastapi.responses import JSONResponse
from src.models.api import (
    QueryRequest, QueryResponse, QueryMetadata, StructuredData,
    TokenUsage, UserTokenInfo, SubscriptionInfo
)
from src.services.query_processor import query_processor
from src.services.llm_query_processor import llm_query_processor
from src.services.universal_llm_processor import universal_llm_processor
from src.services.auth_service import auth_service
from src.services.token_service import token_service
from src.services.subscription_service import subscription_service
from src.services.conversation_service import conversation_service
from src.config.settings import settings
import logging

router = APIRouter()
logger = logging.getLogger(__name__)


@router.post("/query", response_model=QueryResponse)
async def process_query(
    request: QueryRequest,
    authorization: str = Header(..., description="Bearer token for authentication")
):
    """Process natural language queries with subscription and token management."""
    logger.info(f"Processing authenticated query: {request.query}")

    # Step 1: Authenticate and get user context
    context = await auth_service.authenticate_request(authorization)
    logger.info(f"Authenticated user: {context.user_id}, shop: {context.shop_id}")

    # Step 2: Check subscription and token limits
    estimated_tokens = token_service.estimate_query_tokens(
        request.query,
        request.options.model_dump() if request.options else None
    )
    can_proceed, token_info = await token_service.check_token_availability(
        context.user_id, context.shop_id, estimated_tokens
    )

    if not can_proceed:
        # Return a token-limit error with a custom response structure
        if token_info.get("error") == "NO_SUBSCRIPTION":
            return JSONResponse(
                status_code=402,
                content={
                    "error": "NO_SUBSCRIPTION",
                    "message": token_info.get("message"),
                    "subscribe_url": "/api/v1/subscription"
                }
            )
        elif token_info.get("error") == "TOKEN_LIMIT_EXCEEDED":
            return JSONResponse(
                status_code=429,
                content={
                    "error": "TOKEN_LIMIT_EXCEEDED",
                    "message": token_info.get("message"),
                    "current_usage": token_info.get("current_usage"),
                    "allocated_tokens": token_info.get("allocated_tokens"),
                    "remaining_tokens": token_info.get("remaining_tokens")
                }
            )
        else:
            return JSONResponse(
                status_code=503,
                content={
                    "error": "TOKEN_CHECK_FAILED",
                    "message": token_info.get("message", "Unable to verify token availability")
                }
            )

    # Step 3: Handle conversation (ChatGPT-like flow)
    if request.conversation_id:
        # Continue an existing conversation
        conversation = await conversation_service.get_conversation(
            request.conversation_id, context.user_id
        )
        if not conversation:
            return JSONResponse(
                status_code=404,
                content={
                    "error": "CONVERSATION_NOT_FOUND",
                    "message": "Conversation not found or access denied"
                }
            )
    else:
        # Create a new conversation (first query in a new chat session)
        conversation = await conversation_service.create_conversation(
            context.user_id, context.shop_id, request.query
        )

    conversation_id = (
        conversation["conversation_id"]
        if isinstance(conversation, dict)
        else conversation.conversation_id
    )

    # Step 4: Add the user message to the conversation
    user_message = await conversation_service.add_message(
        conversation_id,
        "user",
        request.query,
        tokens_used=0  # User messages don't consume tokens
    )

    try:
        # Step 5: Select and run the appropriate query processor
        use_universal = settings.USE_UNIVERSAL_PROCESSOR

        # Allow override via a request option
        if hasattr(request, "processor"):
            if request.processor == "universal":
                use_universal = True
            elif request.processor == "specific":
                use_universal = False

        query_lower = request.query.lower()
        simple_greetings = [
            "hi", "hello", "hey", "good morning", "good afternoon", "good evening"
        ]

        if query_lower.strip() in simple_greetings:
            # Use pattern-based processing for simple greetings
            logger.info("Using pattern-based query processor for greeting")
            result = await query_processor.process_query(
                query=request.query,
                context=context.model_dump()
            )
        elif use_universal:
            # Use the new universal processor
            logger.info("Using UNIVERSAL LLM query processor")
            result = await universal_llm_processor.process_query(
                query=request.query,
                context=context.model_dump()
            )
        else:
            # Use the original LLM processor with specific tools
            logger.info("Using specific-tools LLM query processor")
            result = await llm_query_processor.process_query(
                query=request.query,
                context=context.model_dump()
            )

        # Step 6: On success, determine actual token usage
        if result["success"]:
            token_usage = None
            actual_tokens_used = 0
            if result["metadata"].get("token_usage"):
                token_usage_data = result["metadata"]["token_usage"]
                actual_tokens_used = token_usage_data.get("total_tokens", estimated_tokens)
                token_usage = TokenUsage(
                    prompt_tokens=token_usage_data.get("prompt_tokens", 0),
                    completion_tokens=token_usage_data.get("completion_tokens", 0),
                    total_tokens=actual_tokens_used
                )
            else:
                # Fall back to the estimate if no actual usage was reported
                actual_tokens_used = estimated_tokens

            # Step 7: Update token usage in the database
            usage_update_result = await token_service.update_token_usage(
                context.user_id,
                context.shop_id,
                actual_tokens_used,
                {"query": request.query[:100]}  # First 100 chars for reference
            )
            if not usage_update_result.get("success"):
                logger.warning(
                    f"Failed to update token usage for user {context.user_id}: "
                    f"{usage_update_result.get('error')}"
                )

            # Step 8: Get updated user token and subscription info
            user_token_info = await token_service.get_user_token_info(
                context.user_id, context.shop_id, actual_tokens_used
            )
            subscription_info = await token_service.get_subscription_info(
                context.user_id, context.shop_id
            )

            metadata = QueryMetadata(
                model_used=result["metadata"].get("model_used", "phi-3-mini"),
                execution_time_ms=result["metadata"]["execution_time_ms"],
                tools_called=result["metadata"]["tools_called"],
                confidence_score=result["metadata"]["confidence_score"],
                query_intent=result["metadata"].get("query_intent"),
                extracted_entities=result["metadata"].get("extracted_entities", []),
                token_usage=token_usage,
                tokens_per_second=result["metadata"].get("tokens_per_second")
            )

            # Convert structured data, if any
            structured_data = None
            if result.get("structured_data"):
                structured_data = StructuredData(**result["structured_data"])

            # Step 9: Add the assistant message to the conversation
            assistant_message = await conversation_service.add_message(
                conversation_id,
                "assistant",
                result["response"],
                tokens_used=actual_tokens_used,
                execution_time_ms=result["metadata"]["execution_time_ms"],
                model_used=result["metadata"].get("model_used"),
                structured_data=structured_data.model_dump() if structured_data else None,
                metadata=result.get("debug", {})
            )

            logger.info(
                f"Query processed successfully for user {context.user_id}, "
                f"tokens used: {actual_tokens_used}"
            )

            return QueryResponse(
                success=True,
                response=result["response"],
                structured_data=structured_data,
                metadata=metadata,
                debug=result.get("debug"),
                user_token_info=user_token_info,
                subscription_info=subscription_info,
                # Conversation fields
                conversation_id=conversation_id,
                message_index=assistant_message.message_index
            )
        else:
            return QueryResponse(
                success=False,
                response=result["response"],
                metadata=QueryMetadata(
                    model_used="error",
                    execution_time_ms=result["metadata"]["execution_time_ms"],
                    tools_called=[],
                    confidence_score=0.0
                ),
                error=result.get("error")
            )

    except HTTPException:
        # Let HTTP exceptions bubble up to FastAPI's exception handlers
        raise
    except Exception as e:
        logger.error(f"API error processing query: {e}", exc_info=True)
        return QueryResponse(
            success=False,
            response="I encountered an error while processing your request. Please try again.",
            metadata=QueryMetadata(
                model_used="error",
                execution_time_ms=0,
                tools_called=[],
                confidence_score=0.0
            ),
            error=str(e)
        )
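Below is a minimal client-side sketch of how this endpoint might be called. The host, port, and route prefix are assumptions (the router's mount point is not shown in this file), and the JSON fields are inferred from how QueryRequest is used above; treat them as hypothetical until checked against src/models/api.py.

# Hypothetical usage sketch: assumes the service listens on localhost:8000
# and the router is mounted at the root path.
import requests

resp = requests.post(
    "http://localhost:8000/query",
    headers={"Authorization": "Bearer <access-token>"},
    json={
        "query": "How many orders did my shop get last week?",
        "processor": "universal",       # optional override: "universal" or "specific"
        # "conversation_id": "abc123",  # include to continue an existing conversation
    },
    timeout=60,
)

if resp.status_code == 200:
    body = resp.json()
    print(body["response"])           # natural-language answer
    print(body["conversation_id"])    # reuse this to continue the chat
elif resp.status_code == 402:
    print("No active subscription:", resp.json().get("message"))
elif resp.status_code == 429:
    print("Token limit exceeded, remaining:", resp.json().get("remaining_tokens"))
else:
    print("Error:", resp.status_code, resp.text)

Note the error contract visible in the handler: 402 (NO_SUBSCRIPTION), 429 (TOKEN_LIMIT_EXCEEDED), 503 (TOKEN_CHECK_FAILED), and 404 (CONVERSATION_NOT_FOUND) are returned as JSON error bodies, while query-processing failures come back as a 200 response with success=False in the QueryResponse.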
