"""Wrapper for Senzing SDK to provide async interface and initialization."""
import asyncio
import json
import os
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from typing import Any, Optional
# Add Senzing SDK to path
SENZING_SDK_PATH = "/data/etl/senzing/er/v4beta/sdk/python"
if SENZING_SDK_PATH not in sys.path:
sys.path.insert(0, SENZING_SDK_PATH)
from senzing import (
SzConfigManager,
SzDiagnostic,
SzEngine,
SzEngineFlags,
SzError,
SzNotFoundError,
SzProduct,
)
from senzing_core import SzAbstractFactoryCore
class SenzingSDKWrapper:
"""Async wrapper for Senzing SDK."""
def __init__(self):
self.factory: Optional[SzAbstractFactoryCore] = None
self.engine: Optional[SzEngine] = None
self.config_manager: Optional[SzConfigManager] = None
self.diagnostic: Optional[SzDiagnostic] = None
self.product: Optional[SzProduct] = None
self.executor = ThreadPoolExecutor(max_workers=10)
self._initialized = False
async def initialize(self):
"""Initialize Senzing SDK from environment variables."""
if self._initialized:
return
# Get configuration from environment
engine_config = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON")
if not engine_config:
raise ValueError(
"SENZING_ENGINE_CONFIGURATION_JSON environment variable not set"
)
module_name = os.getenv("SENZING_MODULE_NAME", "senzing-mcp")
instance_name = os.getenv("SENZING_INSTANCE_NAME", "senzing-mcp-server")
verbose_logging = int(os.getenv("SENZING_LOG_LEVEL", "0"))
# Initialize in thread pool to avoid blocking
await asyncio.get_event_loop().run_in_executor(
self.executor, self._sync_initialize, engine_config, module_name, instance_name, verbose_logging
)
self._initialized = True
def _sync_initialize(self, engine_config: str, module_name: str, instance_name: str, verbose_logging: int):
"""Synchronous initialization of Senzing SDK."""
try:
# Create factory with settings
# The factory automatically initializes all components
self.factory = SzAbstractFactoryCore(
instance_name=instance_name,
settings=engine_config,
verbose_logging=verbose_logging
)
# Create components (already initialized through factory)
self.engine = self.factory.create_engine()
self.config_manager = self.factory.create_configmanager()
self.diagnostic = self.factory.create_diagnostic()
self.product = self.factory.create_product()
except Exception as e:
raise RuntimeError(f"Failed to initialize Senzing SDK: {str(e)}")
async def _run_async(self, func, *args, **kwargs):
"""Run a synchronous SDK function asynchronously."""
if not self._initialized:
raise RuntimeError("SDK not initialized. Call initialize() first.")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor, partial(func, *args, **kwargs)
)
# Entity Operations
async def get_entity_by_entity_id(self, entity_id: int, flags: int = None) -> str:
"""Get entity details by entity ID with comprehensive information (same flags as sz_explorer)."""
try:
# Use the exact same flags as sz_explorer's get command
if flags is None:
flags = (
SzEngineFlags.SZ_ENTITY_INCLUDE_ENTITY_NAME |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_DATA |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_MATCHING_INFO |
SzEngineFlags.SZ_ENTITY_INCLUDE_ALL_RELATIONS |
SzEngineFlags.SZ_ENTITY_INCLUDE_RELATED_ENTITY_NAME |
SzEngineFlags.SZ_ENTITY_INCLUDE_RELATED_MATCHING_INFO |
SzEngineFlags.SZ_ENTITY_INCLUDE_RELATED_RECORD_SUMMARY |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_FEATURES |
SzEngineFlags.SZ_ENTITY_INCLUDE_ALL_FEATURES |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_UNMAPPED_DATA
)
result = await self._run_async(
self.engine.get_entity_by_entity_id, entity_id, flags
)
return result
except SzNotFoundError:
return json.dumps({"error": "Entity not found", "entity_id": entity_id})
except SzError as e:
return json.dumps({"error": str(e)})
async def search_by_attributes(self, attributes: str, flags: int = None) -> str:
"""Search for entities by attributes with comprehensive search information (same flags as sz_explorer).
If fewer than 11 entities are found, automatically performs a second search with full feature details.
"""
try:
# Use the exact same flags as sz_explorer's search command
if flags is None:
base_flags = (
SzEngineFlags.SZ_SEARCH_INCLUDE_ALL_ENTITIES |
SzEngineFlags.SZ_INCLUDE_FEATURE_SCORES |
SzEngineFlags.SZ_ENTITY_INCLUDE_ENTITY_NAME |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_DATA |
SzEngineFlags.SZ_INCLUDE_MATCH_KEY_DETAILS |
SzEngineFlags.SZ_SEARCH_INCLUDE_STATS |
SzEngineFlags.SZ_ENTITY_INCLUDE_ALL_RELATIONS |
SzEngineFlags.SZ_ENTITY_INCLUDE_RELATED_MATCHING_INFO
)
flags = base_flags
# First search with base flags
result = await self._run_async(
self.engine.search_by_attributes, attributes, flags
)
# Parse result to check entity count
result_data = json.loads(result)
entity_count = len(result_data.get("RESOLVED_ENTITIES", []))
# If fewer than 11 entities, redo search with full feature details
if entity_count < 11 and entity_count > 0:
enhanced_flags = flags | SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_FEATURES | SzEngineFlags.SZ_ENTITY_INCLUDE_ALL_FEATURES
result = await self._run_async(
self.engine.search_by_attributes, attributes, enhanced_flags
)
return result
except SzError as e:
return json.dumps({"error": str(e)})
# Relationship Operations
async def find_path_by_entity_id(
self, start_entity_id: int, end_entity_id: int, max_degrees: int, flags: int = 0
) -> str:
"""Find relationship path between two entities."""
try:
result = await self._run_async(
self.engine.find_path_by_entity_id,
start_entity_id,
end_entity_id,
max_degrees,
flags,
)
return result
except SzError as e:
return json.dumps({"error": str(e)})
async def find_network_by_entity_id(
self, entity_list: str, max_degrees: int, build_out_degrees: int, max_entities: int, flags: int = 0
) -> str:
"""Find network of related entities."""
try:
result = await self._run_async(
self.engine.find_network_by_entity_id,
entity_list,
max_degrees,
build_out_degrees,
max_entities,
flags,
)
return result
except SzError as e:
return json.dumps({"error": str(e)})
async def why_entities(
self, entity_id_1: int, entity_id_2: int, flags: int = None
) -> str:
"""Explain why two entities are related (same flags as sz_explorer)."""
try:
# Use the exact same flags as sz_explorer's why command (why_not)
if flags is None:
flags = (
SzEngineFlags.SZ_ENTITY_DEFAULT_FLAGS |
SzEngineFlags.SZ_ENTITY_INCLUDE_INTERNAL_FEATURES |
SzEngineFlags.SZ_ENTITY_INCLUDE_FEATURE_STATS |
SzEngineFlags.SZ_INCLUDE_FEATURE_SCORES |
SzEngineFlags.SZ_INCLUDE_MATCH_KEY_DETAILS
)
result = await self._run_async(
self.engine.why_entities, entity_id_1, entity_id_2, flags
)
return result
except SzError as e:
return json.dumps({"error": str(e)})
async def how_entity_by_entity_id(self, entity_id: int, flags: int = None) -> str:
"""Explain how an entity was resolved (same flags as sz_explorer)."""
try:
# Use the exact same flags as sz_explorer's how command (get_how_data)
if flags is None:
flags = (
SzEngineFlags.SZ_ENTITY_INCLUDE_ENTITY_NAME |
SzEngineFlags.SZ_ENTITY_INCLUDE_ALL_FEATURES |
SzEngineFlags.SZ_ENTITY_INCLUDE_INTERNAL_FEATURES |
SzEngineFlags.SZ_ENTITY_INCLUDE_FEATURE_STATS |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_DATA |
SzEngineFlags.SZ_ENTITY_INCLUDE_RECORD_FEATURES |
SzEngineFlags.SZ_INCLUDE_MATCH_KEY_DETAILS
)
result = await self._run_async(
self.engine.how_entity_by_entity_id, entity_id, flags
)
return result
except SzError as e:
return json.dumps({"error": str(e)})
# Configuration and Diagnostics
async def get_stats(self) -> str:
"""Get Senzing statistics and diagnostics."""
try:
stats = await self._run_async(self.engine.get_stats)
return stats
except SzError as e:
return json.dumps({"error": str(e)})
async def get_active_config_id(self) -> str:
"""Get the active configuration ID."""
try:
config_id = await self._run_async(self.config_manager.get_default_config_id)
return json.dumps({"active_config_id": config_id})
except SzError as e:
return json.dumps({"error": str(e)})
async def get_version(self) -> str:
"""Get Senzing product version."""
try:
version = await self._run_async(self.product.get_version)
return json.dumps({"version": version})
except SzError as e:
return json.dumps({"error": str(e)})
async def cleanup(self):
"""Clean up resources."""
if self._initialized:
await self._run_async(self._sync_cleanup)
self.executor.shutdown(wait=True)
def _sync_cleanup(self):
"""Synchronous cleanup of Senzing SDK."""
if self.factory:
self.factory.destroy()