#!/usr/bin/env python3
"""
ProbLog Bridge Script
Reads JSON requests from stdin, executes ProbLog operations,
and writes JSON responses to stdout.
This script acts as a bridge between the TypeScript MCP server
and the Python ProbLog library.
"""
import json
import sys
import traceback
import time
from typing import Dict, Any, List, Optional
# Check ProbLog availability
try:
from problog import get_evaluatable
from problog.program import PrologString
from problog.learning import lfi
from problog.tasks import sample
from problog.logic import Term
PROBLOG_AVAILABLE = True
except ImportError as e:
PROBLOG_AVAILABLE = False
IMPORT_ERROR = str(e)
def send_response(response: Dict[str, Any]) -> None:
    """Serialize *response* as one line of JSON on stdout and flush immediately.

    Flushing matters: the TypeScript side reads line-delimited responses and
    would otherwise block on buffered output.
    """
    payload = json.dumps(response)
    sys.stdout.write(payload + '\n')
    sys.stdout.flush()
def handle_query(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'query' operation: compute exact probabilities.

    Args:
        request: Request dict; ``params.program`` holds the ProbLog program
            text. Queries are expected to be embedded in the program as
            ``query/1`` directives -- ``evaluate()`` answers all of them.

    Returns:
        On success: dict with ``result.probabilities`` mapping atom strings
        to floats. On failure: status 'error' with message, exception type,
        and traceback. Both carry ``statistics.timeMs``.
    """
    program_text = request['params']['program']
    # NOTE(review): ``params.queries`` and ``options`` were previously read
    # here but never used (ProbLog evaluates the query/1 atoms embedded in
    # the program text), so the dead locals were removed.
    start_time = time.time()
    try:
        # Parse the program and compile it into an evaluatable form.
        program = PrologString(program_text)
        evaluatable = get_evaluatable().create_from(program)
        result = evaluatable.evaluate()
        # Keys are problog Term objects; convert to plain str/float so the
        # response is JSON-serializable.
        probabilities = {str(atom): float(prob) for atom, prob in result.items()}
        elapsed_ms = (time.time() - start_time) * 1000
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'probabilities': probabilities,
                'inferenceMethod': 'exact',  # ProbLog uses exact inference by default
                'timeMs': round(elapsed_ms, 2)
            },
            'statistics': {
                'timeMs': round(elapsed_ms, 2),
                'inferenceMethod': 'sdd'  # Most common method
            }
        }
    except Exception as e:
        elapsed_ms = (time.time() - start_time) * 1000
        return {
            'id': request['id'],
            'status': 'error',
            'error': str(e),
            'errorType': type(e).__name__,
            'traceback': traceback.format_exc(),
            'statistics': {
                'timeMs': round(elapsed_ms, 2)
            }
        }
def handle_validate(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'validate' operation: check that the program parses.

    Args:
        request: Request dict whose ``params.program`` is the program text.

    Returns:
        Response dict. ``result.valid`` is True (with facts/rules/queries
        split out by a line-level textual scan) or False with a list of
        error records. Note the status is 'success' in both cases --
        invalid syntax is a normal validation outcome, not a bridge error.
    """
    source = request['params']['program']
    started = time.time()
    try:
        # Parsing is the validation step; a syntax problem raises here.
        PrologString(source)
        facts = []
        rules = []
        queries = []
        # Heuristic, line-based classification of the program text.
        for raw in source.split('\n'):
            stripped = raw.strip()
            if not stripped or stripped.startswith('%'):
                continue  # blank line or comment
            if '::' in stripped and ':-' not in stripped:
                facts.append(stripped)       # probabilistic fact
            elif ':-' in stripped:
                rules.append(stripped)       # rule
            elif stripped.startswith('query('):
                queries.append(stripped)     # query directive
        elapsed = round((time.time() - started) * 1000, 2)
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'valid': True,
                'facts': facts,
                'rules': rules,
                'queries': queries
            },
            'statistics': {'timeMs': elapsed}
        }
    except Exception as e:
        elapsed = round((time.time() - started) * 1000, 2)
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'valid': False,
                'errors': [{'type': type(e).__name__, 'message': str(e)}]
            },
            'statistics': {'timeMs': elapsed}
        }
def handle_learn(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'learn' operation: parameter learning via LFI
    (Learning From Interpretations).

    Args:
        request: Request dict with ``params.program`` (program text) and
            ``params.examples`` (interpretations, forwarded to problog
            unchanged). ``options.maxIterations`` tunes the convergence
            heuristic (default 100).

    Returns:
        Response dict with learned weights, score, iteration count, and a
        convergence flag on success; an error payload on failure.
    """
    program_text = request['params']['program']
    examples = request['params']['examples']
    options = request.get('options', {})
    started = time.time()
    try:
        parsed = PrologString(program_text)
        # Examples are passed through as-is; problog interprets them.
        score, weights, atoms, iterations, problem = lfi.run_lfi(
            parsed,
            examples
        )
        elapsed = round((time.time() - started) * 1000, 2)
        learned = {str(atom): float(w) for atom, w in zip(atoms, weights)}
        # Heuristic: stopping before the iteration cap implies convergence.
        converged = iterations < options.get('maxIterations', 100)
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'weights': learned,
                'score': float(score),
                'iterations': int(iterations),
                'convergence': converged,
                'timeMs': elapsed
            },
            'statistics': {'timeMs': elapsed}
        }
    except Exception as e:
        elapsed = round((time.time() - started) * 1000, 2)
        return {
            'id': request['id'],
            'status': 'error',
            'error': str(e),
            'errorType': type(e).__name__,
            'traceback': traceback.format_exc(),
            'statistics': {'timeMs': elapsed}
        }
def handle_sample(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'sample' operation: draw n samples from the program.

    Args:
        request: Request dict with ``params.program`` (program text) and
            ``params.n`` (number of samples).

    Returns:
        Response dict with the raw samples plus per-atom frequency
        statistics on success; an error payload on failure.
    """
    program_text = request['params']['program']
    n = request['params']['n']
    started = time.time()
    try:
        parsed = PrologString(program_text)
        result = sample.sample(parsed, n=n, format='dict')
        elapsed = round((time.time() - started) * 1000, 2)
        # NOTE(review): only list and dict results are recognized; any other
        # return shape (e.g. a generator) yields an empty sample list --
        # confirm against the installed problog version's sample API.
        if isinstance(result, list):
            samples = result
        elif isinstance(result, dict):
            samples = [result]  # single sample: wrap into a list
        else:
            samples = []
        statistics = {}
        if samples:
            # Count how often each atom was true across the samples.
            counts = {}
            for drawn in samples:
                for key, value in drawn.items():
                    counts.setdefault(key, 0)
                    if value:
                        counts[key] += 1
            # Normalize counts into empirical probabilities.
            total = len(samples)
            for key in counts:
                counts[key] = counts[key] / total
            statistics = {
                'frequencies': counts,
                'mean': counts,  # For boolean, mean = frequency
            }
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'samples': samples,
                'n': n,
                'statistics': statistics,
                'timeMs': elapsed
            },
            'statistics': {'timeMs': elapsed}
        }
    except Exception as e:
        elapsed = round((time.time() - started) * 1000, 2)
        return {
            'id': request['id'],
            'status': 'error',
            'error': str(e),
            'errorType': type(e).__name__,
            'traceback': traceback.format_exc(),
            'statistics': {'timeMs': elapsed}
        }
def handle_mpe(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'mpe' (Most Probable Explanation) operation.

    Currently a stub: the basic ProbLog API used by this bridge has no
    direct MPE entry point, so every request receives a
    NotImplementedError response.

    Args:
        request: Request dict with ``params.program`` and optional
            ``params.evidence`` (read for interface symmetry with the
            other handlers, but unused by the stub).

    Returns:
        An error response with errorType 'NotImplementedError'.
    """
    program_text = request['params']['program']
    evidence = request['params'].get('evidence', {})
    started = time.time()
    elapsed = round((time.time() - started) * 1000, 2)
    return {
        'id': request['id'],
        'status': 'error',
        'error': 'MPE operation not yet implemented',
        'errorType': 'NotImplementedError',
        'statistics': {'timeMs': elapsed}
    }
def handle_explain(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Handle the 'explain' operation: per-query probability summary.

    Args:
        request: Request dict with ``params.program`` (program text) and
            optional ``params.queries`` (query strings to look up in the
            computed probability table).

    Returns:
        Response dict pairing each requested query with its probability
        (0.0 when the atom is absent from the results). Derivation lists
        are always empty for now -- proof-tree extraction would need
        additional ProbLog machinery not wired into this bridge.
    """
    program_text = request['params']['program']
    queries = request['params'].get('queries', [])
    started = time.time()
    try:
        # Run the same exact inference as the 'query' operation.
        model = PrologString(program_text)
        result = get_evaluatable().create_from(model).evaluate()
        elapsed = round((time.time() - started) * 1000, 2)
        probabilities = {str(atom): float(p) for atom, p in result.items()}
        explanations = [
            {
                'query': q,
                'probability': probabilities.get(q, 0.0),
                'derivations': []  # placeholder until proof trees are supported
            }
            for q in queries
        ]
        return {
            'id': request['id'],
            'status': 'success',
            'result': {
                'explanations': explanations,
                'probabilities': probabilities,
                'timeMs': elapsed
            },
            'statistics': {'timeMs': elapsed}
        }
    except Exception as e:
        elapsed = round((time.time() - started) * 1000, 2)
        return {
            'id': request['id'],
            'status': 'error',
            'error': str(e),
            'errorType': type(e).__name__,
            'traceback': traceback.format_exc(),
            'statistics': {'timeMs': elapsed}
        }
def handle_request(request: Dict[str, Any]) -> Dict[str, Any]:
    """
    Dispatch a request to the handler matching its 'operation' field.

    Args:
        request: Request dict; ``operation`` selects one of the known
            handlers (query, validate, learn, sample, mpe, explain).

    Returns:
        The selected handler's response, or an error response with
        errorType 'ValueError' for an unknown operation.
    """
    dispatch = {
        'query': handle_query,
        'validate': handle_validate,
        'learn': handle_learn,
        'sample': handle_sample,
        'mpe': handle_mpe,
        'explain': handle_explain,
    }
    operation = request.get('operation')
    try:
        handler = dispatch[operation]
    except KeyError:
        return {
            'id': request.get('id'),
            'status': 'error',
            'error': f'Unknown operation: {operation}',
            'errorType': 'ValueError'
        }
    return handler(request)
def main():
    """
    Main event loop.

    Reads one JSON request per stdin line, dispatches it, and writes one
    JSON response per line to stdout. Exits with status 1 up front if the
    problog package failed to import at module load time.
    """
    if not PROBLOG_AVAILABLE:
        send_response({
            'status': 'error',
            'error': 'ProbLog not installed. Run: pip install problog',
            'errorType': 'ImportError',
            'traceback': IMPORT_ERROR
        })
        sys.exit(1)
    # Line-delimited JSON protocol: one request object per line.
    for raw_line in sys.stdin:
        raw_line = raw_line.strip()
        if not raw_line:
            continue  # ignore blank lines
        try:
            send_response(handle_request(json.loads(raw_line)))
        except json.JSONDecodeError as e:
            send_response({
                'status': 'error',
                'error': f'Invalid JSON: {str(e)}',
                'errorType': 'JSONDecodeError'
            })
        except Exception as e:
            # Catch-all boundary: the bridge must never die on a single bad
            # request; report the failure and keep reading.
            send_response({
                'status': 'error',
                'error': str(e),
                'errorType': type(e).__name__,
                'traceback': traceback.format_exc()
            })
# Script entry point: run the event loop only when executed directly,
# not when imported as a module.
if __name__ == '__main__':
    main()