/**
* ProbLog Integration
* Provides TypeScript interface to ProbLog via Python subprocess
*
* This module manages communication with the Python ProbLog library,
* handling request/response via JSON over stdin/stdout.
*/
import { spawn, ChildProcess } from 'child_process';
import { resolve, dirname } from 'path';
import { fileURLToPath } from 'url';
import { v4 as uuidv4 } from 'uuid';
import { EventEmitter } from 'events';
import {
ProbLogRequest,
ProbLogResponse,
QueryResult,
ValidationResult,
LearningResult,
SamplingResult,
MPEResult,
ExplanationResult,
TrainingExample
} from './types.js';
import { Loggers } from '../../utils/logger.js';
const logger = Loggers.manager;
// Get directory name in ES modules
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
/**
* Configuration options for ProbLog solver
*/
export interface ProbLogSolverConfig {
timeout?: number; // Default timeout in milliseconds (default: 10000)
pythonPath?: string; // Path to Python interpreter (default: 'python3')
maxSamples?: number; // Maximum samples for sampling (default: 10000)
inferenceMethod?: 'exact' | 'approximate'; // Inference method preference
}
/**
* Pending request in the queue
*/
interface PendingRequest {
id: string;
operation: string;
resolve: (result: any) => void;
reject: (error: Error) => void;
timeout: NodeJS.Timeout;
timestamp: number;
}
/**
* ProbLog Solver integration class
* Manages Python subprocess and request/response queue
*/
export class ProbLogSolver extends EventEmitter {
private pythonPath: string;
private scriptPath: string;
private defaultTimeout: number;
private maxSamples: number;
private inferenceMethod: 'exact' | 'approximate';
private pythonProcess: ChildProcess | null = null;
private requestQueue: Map<string, PendingRequest> = new Map();
private responseBuffer: string = '';
private isInitialized: boolean = false;
/**
* Create a new ProbLog solver instance
* @param config Configuration options
*/
constructor(config: ProbLogSolverConfig = {}) {
super();
// Use pyenv Python if available, fallback to system python3
const pyenvPython = '/Users/russellsmith/.pyenv/shims/python3';
this.pythonPath = config.pythonPath || pyenvPython;
this.scriptPath = resolve(__dirname, 'problogBridge.py');
this.defaultTimeout = config.timeout || 10000; // 10 seconds default for probabilistic inference
this.maxSamples = config.maxSamples || 10000;
this.inferenceMethod = config.inferenceMethod || 'exact';
}
/**
* Initialize the Python bridge
* Spawns Python process and sets up communication
*/
async initialize(): Promise<void> {
if (this.isInitialized) {
return;
}
try {
// Spawn Python process
this.pythonProcess = spawn(this.pythonPath, [this.scriptPath], {
stdio: ['pipe', 'pipe', 'pipe']
});
// Set up event handlers
this.pythonProcess.stdout?.on('data', (data: Buffer) => {
this.handleStdout(data);
});
this.pythonProcess.stderr?.on('data', (data: Buffer) => {
// Emit for debugging if needed
this.emit('stderr', data.toString());
});
this.pythonProcess.on('error', (error: Error) => {
this.handleProcessError(error);
});
this.pythonProcess.on('exit', (code: number | null) => {
this.handleProcessExit(code);
});
this.isInitialized = true;
this.emit('initialized');
} catch (error) {
throw new Error(`Failed to initialize ProbLog bridge: ${error}`);
}
}
/**
* Query probabilities
* @param program ProbLog program text
* @param queries Optional list of query predicates
* @param timeout Optional timeout override
* @returns Query results with probabilities
*/
async query(
program: string,
queries?: string[],
timeout?: number
): Promise<QueryResult> {
return this.executeRequest<QueryResult>('query', {
program,
queries
}, timeout);
}
/**
* Validate a ProbLog program
* @param program ProbLog program text
* @param timeout Optional timeout override
* @returns Validation result
*/
async validate(
program: string,
timeout?: number
): Promise<ValidationResult> {
return this.executeRequest<ValidationResult>('validate', {
program
}, timeout);
}
/**
* Learn parameters from examples (LFI - Learning from Interpretations)
* @param program ProbLog program with learnable parameters (t(_)::fact)
* @param examples Training examples
* @param timeout Optional timeout override
* @returns Learned weights
*/
async learn(
program: string,
examples: TrainingExample[],
timeout?: number
): Promise<LearningResult> {
return this.executeRequest<LearningResult>('learn', {
program,
examples
}, timeout);
}
/**
* Generate samples from the distribution
* @param program ProbLog program
* @param n Number of samples to generate
* @param timeout Optional timeout override
* @returns Sampling results
*/
async sample(
program: string,
n: number,
timeout?: number
): Promise<SamplingResult> {
const numSamples = Math.min(n, this.maxSamples);
return this.executeRequest<SamplingResult>('sample', {
program,
n: numSamples
}, timeout);
}
/**
* Compute most probable explanation (MPE)
* @param program ProbLog program
* @param evidence Observed evidence
* @param timeout Optional timeout override
* @returns Most probable explanation
*/
async mpe(
program: string,
evidence: Record<string, boolean>,
timeout?: number
): Promise<MPEResult> {
return this.executeRequest<MPEResult>('mpe', {
program,
evidence
}, timeout);
}
/**
* Generate explanation/derivation trace
* @param program ProbLog program
* @param queries Queries to explain
* @param timeout Optional timeout override
* @returns Explanation with derivation traces
*/
async explain(
program: string,
queries: string[],
timeout?: number
): Promise<ExplanationResult> {
return this.executeRequest<ExplanationResult>('explain', {
program,
queries
}, timeout);
}
/**
* Execute a generic request
* @param operation Operation name
* @param params Operation parameters
* @param timeout Optional timeout override
* @returns Result of the operation
*/
private async executeRequest<T>(
operation: string,
params: any,
timeout?: number
): Promise<T> {
if (!this.isInitialized) {
await this.initialize();
}
const requestId = uuidv4();
const timeoutMs = timeout || this.defaultTimeout;
return new Promise<T>((resolve, reject) => {
// Set up timeout
const timeoutHandle = setTimeout(() => {
this.requestQueue.delete(requestId);
reject(new Error(`ProbLog request timeout after ${timeoutMs}ms`));
}, timeoutMs);
// Store pending request
this.requestQueue.set(requestId, {
id: requestId,
operation,
resolve,
reject,
timeout: timeoutHandle,
timestamp: Date.now()
});
// Build request
const request: ProbLogRequest = {
id: requestId,
operation: operation as any,
params,
options: {
timeout: timeoutMs,
inference: this.inferenceMethod
}
};
// Send request to Python
const requestJson = JSON.stringify(request) + '\n';
this.pythonProcess?.stdin?.write(requestJson);
});
}
/**
* Handle stdout data from Python process
* @param data Buffer containing response data
*/
private handleStdout(data: Buffer): void {
// Append to buffer
this.responseBuffer += data.toString();
// Process complete lines (each line is a JSON response)
const lines = this.responseBuffer.split('\n');
this.responseBuffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (line.trim()) {
this.handleResponse(line);
}
}
}
/**
* Handle a complete JSON response line
* @param line JSON response string
*/
private handleResponse(line: string): void {
try {
const response: ProbLogResponse = JSON.parse(line);
const pending = this.requestQueue.get(response.id);
if (!pending) {
logger.warn('Received response for unknown request', { requestId: response.id });
return;
}
// Clear timeout
clearTimeout(pending.timeout);
this.requestQueue.delete(response.id);
// Resolve or reject based on status
if (response.status === 'success') {
pending.resolve(response.result);
} else {
const error = new Error(response.error || 'Unknown error');
(error as any).type = response.errorType;
(error as any).traceback = response.traceback;
pending.reject(error);
}
} catch (error) {
logger.error('Failed to parse ProbLog response', { line, error });
}
}
/**
* Handle Python process error
* @param error Error object
*/
private handleProcessError(error: Error): void {
logger.error('Python process error', { error: error.message });
// Reject all pending requests
for (const [id, pending] of this.requestQueue.entries()) {
clearTimeout(pending.timeout);
pending.reject(new Error(`Python process error: ${error.message}`));
}
this.requestQueue.clear();
this.isInitialized = false;
this.emit('error', error);
}
/**
* Handle Python process exit
* @param code Exit code
*/
private handleProcessExit(code: number | null): void {
logger.warn('Python process exited', { exitCode: code });
// Reject all pending requests
for (const [id, pending] of this.requestQueue.entries()) {
clearTimeout(pending.timeout);
pending.reject(new Error(`Python process exited with code ${code}`));
}
this.requestQueue.clear();
this.isInitialized = false;
this.pythonProcess = null;
this.emit('exit', code);
}
/**
* Shutdown the bridge
* Terminates the Python process gracefully
*/
async shutdown(): Promise<void> {
if (this.pythonProcess) {
this.pythonProcess.kill('SIGTERM');
// Wait for graceful shutdown
await new Promise<void>((resolve) => {
const timeout = setTimeout(() => {
this.pythonProcess?.kill('SIGKILL');
resolve();
}, 2000);
this.once('exit', () => {
clearTimeout(timeout);
resolve();
});
});
}
this.isInitialized = false;
}
/**
* Test ProbLog installation and Python script availability
* @returns true if ProbLog is available, false otherwise
*/
async testConnection(): Promise<boolean> {
try {
const program = '0.5::test.\nquery(test).';
const result = await this.query(program, ['test'], 1000);
return result.probabilities.test === 0.5;
} catch (error) {
return false;
}
}
/**
* Get ProbLog version information
* @returns Version string or error message
*/
async getVersion(): Promise<string> {
return new Promise((resolve, reject) => {
const python = spawn(this.pythonPath, [
'-c',
'from problog import __version__; print(__version__)'
]);
let output = '';
python.stdout?.on('data', (data: Buffer) => {
output += data.toString();
});
python.on('close', (code: number | null) => {
if (code === 0) {
resolve(output.trim());
} else {
reject(new Error('Failed to get ProbLog version'));
}
});
python.on('error', (error: Error) => {
reject(new Error(`Failed to check ProbLog: ${error.message}`));
});
setTimeout(() => {
python.kill();
reject(new Error('Timeout checking ProbLog version'));
}, 2000);
});
}
}
/**
* Create a solver instance with default configuration
*/
export function createProbLogSolver(config?: ProbLogSolverConfig): ProbLogSolver {
return new ProbLogSolver(config);
}
/**
* Helper function to create probabilistic fact
*/
export function probFact(probability: number, predicate: string): string {
return `${probability}::${predicate}.`;
}
/**
* Helper function to create query
*/
export function query(predicate: string): string {
return `query(${predicate}).`;
}
/**
* Helper function to create evidence
*/
export function evidence(predicate: string, value: boolean): string {
return `evidence(${predicate}, ${value}).`;
}
/**
* Helper function to create annotated disjunction
*/
export function annotatedDisjunction(alternatives: Array<{ prob: number; pred: string }>): string {
const parts = alternatives.map(a => `${a.prob}::${a.pred}`);
return parts.join('; ') + '.';
}
/**
* Helper function to create probabilistic rule
*/
export function probRule(probability: number, head: string, body: string): string {
return `${probability}::${head} :- ${body}.`;
}
/**
* Helper function to create deterministic rule
*/
export function rule(head: string, body: string): string {
return `${head} :- ${body}.`;
}
/**
* Helper function to build a complete ProbLog program
*/
export function buildProgram(parts: {
facts?: string[];
rules?: string[];
queries?: string[];
evidence?: string[];
}): string {
const sections: string[] = [];
if (parts.facts && parts.facts.length > 0) {
sections.push('% Facts');
sections.push(...parts.facts);
sections.push('');
}
if (parts.rules && parts.rules.length > 0) {
sections.push('% Rules');
sections.push(...parts.rules);
sections.push('');
}
if (parts.evidence && parts.evidence.length > 0) {
sections.push('% Evidence');
sections.push(...parts.evidence);
sections.push('');
}
if (parts.queries && parts.queries.length > 0) {
sections.push('% Queries');
sections.push(...parts.queries);
}
return sections.join('\n');
}