#!/usr/bin/env python3
"""
Direct Integration Test for Slot Resolution System
This test directly tests the slot resolution middleware to verify
it works correctly with entity name-to-ID conversion.
"""
import asyncio
import json
import logging
from config import AppConfig
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def test_direct_slot_resolution():
"""
Test slot resolution middleware directly with real Elasticsearch data.
"""
logger.info("=" * 80)
logger.info("DIRECT SLOT RESOLUTION TEST")
logger.info("=" * 80)
# Initialize components
config = AppConfig.load()
# Initialize search client
from elasticsearch_search_lib import SearchClient
search_client = SearchClient(tenant_id=config.elasticsearch.tenant_id)
logger.info(f"✅ Search client initialized for tenant: {config.elasticsearch.tenant_id}")
# Initialize slot resolution system
from slot_resolution.middleware.slot_resolution_middleware import SlotResolutionMiddleware
from slot_resolution.core.resolver import SlotResolver
from slot_resolution.services.elasticsearch_service import ElasticsearchMatchingService
es_service = ElasticsearchMatchingService(search_client)
resolver = SlotResolver(es_service=es_service)
slot_middleware = SlotResolutionMiddleware(
tenant_id=config.elasticsearch.tenant_id,
resolver=resolver
)
logger.info("✅ Slot resolution middleware initialized")
# Test Case 1: Entity names that should resolve
logger.info("\n" + "=" * 80)
logger.info("TEST 1: Resolve entity names to IDs")
logger.info("=" * 80)
request_payload = {
"subject": "Laptop not working",
"description": "Screen is black",
"impact": "high",
"urgency": "high",
"priority": "high",
}
logger.info(f"Input payload:")
logger.info(json.dumps(request_payload, indent=2))
result = await slot_middleware.resolve_request(
request_payload=request_payload,
module="request",
user_id="test_user"
)
logger.info(f"\nResolution status: {result.status}")
logger.info(f"Resolved payload:")
logger.info(json.dumps(result.payload, indent=2))
if result.status == "READY":
# Check if IDs were added
has_impact_id = "impactId" in result.payload
has_urgency_id = "urgencyId" in result.payload
has_priority_id = "priorityId" in result.payload
logger.info(f"\n✅ Impact resolved: {has_impact_id}")
logger.info(f"✅ Urgency resolved: {has_urgency_id}")
logger.info(f"✅ Priority resolved: {has_priority_id}")
if has_impact_id or has_urgency_id or has_priority_id:
logger.info("\n✅ TEST 1 PASSED: Entity names were resolved to IDs")
else:
logger.warning("\n⚠️ TEST 1 WARNING: No IDs found in resolved payload")
elif result.status == "DISAMBIGUATION_REQUIRED":
logger.info(f"\n⚠️ Disambiguation required:")
logger.info(json.dumps(result.disambiguations, indent=2))
else:
logger.error(f"\n❌ TEST 1 FAILED: Unexpected status {result.status}")
if result.error:
logger.error(f"Error: {result.error}")
# Test Case 2: Search for specific entities
logger.info("\n" + "=" * 80)
logger.info("TEST 2: Search for specific entities in Elasticsearch")
logger.info("=" * 80)
# Test impact search
logger.info("\nSearching for 'high' impact...")
impact_results = await es_service.fuzzy_search(
entity_type="impact",
query="high",
filters={},
limit=5
)
logger.info(f"Found {len(impact_results)} impact results:")
for result in impact_results[:3]:
logger.info(f" - ID: {result['id']}, Name: {result['canonical_name']}, Score: {result['confidence']:.3f}")
# Test urgency search
logger.info("\nSearching for 'high' urgency...")
urgency_results = await es_service.fuzzy_search(
entity_type="urgency",
query="high",
filters={},
limit=5
)
logger.info(f"Found {len(urgency_results)} urgency results:")
for result in urgency_results[:3]:
logger.info(f" - ID: {result['id']}, Name: {result['canonical_name']}, Score: {result['confidence']:.3f}")
# Test priority search
logger.info("\nSearching for 'high' priority...")
priority_results = await es_service.fuzzy_search(
entity_type="priority",
query="high",
filters={},
limit=5
)
logger.info(f"Found {len(priority_results)} priority results:")
for result in priority_results[:3]:
logger.info(f" - ID: {result['id']}, Name: {result['canonical_name']}, Score: {result['confidence']:.3f}")
if impact_results or urgency_results or priority_results:
logger.info("\n✅ TEST 2 PASSED: Elasticsearch search is working")
else:
logger.error("\n❌ TEST 2 FAILED: No results from Elasticsearch")
# Test Case 3: Full request with multiple fields
logger.info("\n" + "=" * 80)
logger.info("TEST 3: Full request with multiple entity fields")
logger.info("=" * 80)
full_request = {
"subject": "Network connectivity issue",
"description": "Cannot access internal servers",
"impact": "medium",
"urgency": "medium",
"priority": "medium",
"category": "network",
"location": "bangalore",
}
logger.info(f"Input payload:")
logger.info(json.dumps(full_request, indent=2))
result = await slot_middleware.resolve_request(
request_payload=full_request,
module="request",
user_id="test_user"
)
logger.info(f"\nResolution status: {result.status}")
logger.info(f"Resolved payload:")
logger.info(json.dumps(result.payload, indent=2))
# Count resolved fields
id_fields = [k for k in result.payload.keys() if k.endswith("Id")]
logger.info(f"\n✅ Resolved {len(id_fields)} entity fields: {id_fields}")
if result.status == "READY" and len(id_fields) > 0:
logger.info("\n✅ TEST 3 PASSED: Multiple entities resolved successfully")
else:
logger.warning(f"\n⚠️ TEST 3 WARNING: Status={result.status}, Resolved fields={len(id_fields)}")
logger.info("\n" + "=" * 80)
logger.info("DIRECT SLOT RESOLUTION TEST COMPLETE")
logger.info("=" * 80)
if __name__ == "__main__":
asyncio.run(test_direct_slot_resolution())