#!/usr/bin/env python3
"""
Z3 SMT Solver Python Script
Reads JSON from stdin, solves constraints using Z3, outputs JSON to stdout
"""
import sys
import json
import signal
from typing import Dict, Any, List, Optional
from z3 import *
# Set up timeout handler
def timeout_handler(signum, frame):
raise TimeoutError("Z3 solver timeout exceeded")
def create_variable(var_def: Dict[str, Any]):
"""
Create a Z3 variable from definition
Args:
var_def: Variable definition with 'name', 'type', and optional parameters
Returns:
Z3 variable
"""
name = var_def['name']
var_type = var_def['type']
if var_type == 'Int':
return Int(name)
elif var_type == 'Real':
return Real(name)
elif var_type == 'Bool':
return Bool(name)
elif var_type == 'String':
return String(name)
elif var_type == 'BitVec':
bit_width = var_def.get('bitWidth', 32)
return BitVec(name, bit_width)
elif var_type == 'Array':
# Default: Int -> Int array
return Array(name, IntSort(), IntSort())
else:
raise ValueError(f"Unsupported variable type: {var_type}")
def parse_constraint(constraint_str: str, var_dict: Dict[str, Any]) -> Any:
"""
Parse a constraint string into a Z3 expression
Args:
constraint_str: String representation of constraint
var_dict: Dictionary of variable name to Z3 variable
Returns:
Z3 constraint expression
"""
# Create a safe evaluation context
safe_dict = {
'__builtins__': {},
'And': And,
'Or': Or,
'Not': Not,
'Implies': Implies,
'ForAll': ForAll,
'Exists': Exists,
'Distinct': Distinct,
'If': If,
**var_dict
}
try:
return eval(constraint_str, safe_dict)
except Exception as e:
raise ValueError(f"Failed to parse constraint '{constraint_str}': {str(e)}")
def solve_constraints(input_data: Dict[str, Any]) -> Dict[str, Any]:
"""
Solve constraints using Z3
Args:
input_data: JSON request with variables, constraints, and optional objective
Returns:
JSON response with status and solution
"""
try:
# Extract input parameters
variables = input_data.get('variables', [])
constraints = input_data.get('constraints', [])
objective = input_data.get('objective')
timeout = input_data.get('timeout', 5000) # Default 5 seconds
find_all = input_data.get('findAll', False)
max_solutions = input_data.get('maxSolutions', 100)
# Set up timeout (convert ms to seconds)
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(int(timeout / 1000) + 1)
# Create variables
var_dict = {}
for var_def in variables:
var_dict[var_def['name']] = create_variable(var_def)
# Choose solver based on whether optimization is needed
if objective:
solver = Optimize()
else:
solver = Solver()
# Set Z3 timeout (in milliseconds)
solver.set("timeout", timeout)
# Add constraints
constraint_objects = []
for constraint in constraints:
constraint_expr = parse_constraint(constraint['expression'], var_dict)
solver.add(constraint_expr)
constraint_objects.append(constraint_expr)
# Add optimization objective if specified
objective_value = None
if objective:
obj_expr = parse_constraint(objective['expression'], var_dict)
if objective['direction'] == 'maximize':
objective_value = solver.maximize(obj_expr)
else:
objective_value = solver.minimize(obj_expr)
# Solve
start_time = time.time() if 'time' in dir() else 0
result = solver.check()
solve_time = (time.time() - start_time) * 1000 if start_time else 0
# Cancel timeout
signal.alarm(0)
# Process result
if result == sat:
model = solver.model()
# Extract solution
solution = {}
for var_name, var_obj in var_dict.items():
if model[var_obj] is not None:
val = model[var_obj]
# Convert Z3 values to Python types
if is_int_value(val):
solution[var_name] = val.as_long()
elif is_rational_value(val):
# Return as decimal string for precision
solution[var_name] = val.as_decimal(10)
elif is_bool(val):
solution[var_name] = is_true(val)
elif is_string_value(val):
solution[var_name] = val.as_string()
else:
solution[var_name] = str(val)
# Build response
response = {
'status': 'sat',
'model': solution,
'statistics': {
'time': solve_time
}
}
# Add objective value if optimization
if objective_value is not None:
opt_val = model.evaluate(objective_value.value())
if is_int_value(opt_val):
response['objectiveValue'] = opt_val.as_long()
elif is_rational_value(opt_val):
response['objectiveValue'] = float(opt_val.as_decimal(10))
else:
response['objectiveValue'] = str(opt_val)
# Enumerate multiple solutions if requested
if find_all and not objective:
solutions = [solution]
while len(solutions) < max_solutions:
# Block current solution
block_constraint = Or([
var_dict[var_name] != model[var_dict[var_name]]
for var_name in var_dict.keys()
if model[var_dict[var_name]] is not None
])
solver.add(block_constraint)
# Check for next solution
if solver.check() == sat:
model = solver.model()
next_solution = {}
for var_name, var_obj in var_dict.items():
if model[var_obj] is not None:
val = model[var_obj]
if is_int_value(val):
next_solution[var_name] = val.as_long()
elif is_rational_value(val):
next_solution[var_name] = val.as_decimal(10)
elif is_bool(val):
next_solution[var_name] = is_true(val)
else:
next_solution[var_name] = str(val)
solutions.append(next_solution)
else:
break
response['models'] = solutions
del response['model']
return response
elif result == unsat:
# Try to get unsat core if solver supports it
unsat_core = None
try:
if hasattr(solver, 'unsat_core'):
core = solver.unsat_core()
core_indices = []
for constraint in core:
for i, c in enumerate(constraint_objects):
if str(constraint) == str(c):
core_indices.append(i)
break
if core_indices:
unsat_core = {
'constraints': core_indices,
'explanation': f"Constraints {core_indices} are inconsistent"
}
except:
pass # Unsat core not available
response = {
'status': 'unsat',
'reason': 'No solution exists - constraints are unsatisfiable',
'statistics': {
'time': solve_time
}
}
if unsat_core:
response['unsatCore'] = unsat_core
return response
else: # unknown
return {
'status': 'unknown',
'reason': solver.reason_unknown() if hasattr(solver, 'reason_unknown') else 'Solver returned unknown',
'statistics': {
'time': solve_time
}
}
except TimeoutError as e:
return {
'status': 'timeout',
'reason': str(e)
}
except Exception as e:
return {
'status': 'error',
'reason': f"Error: {type(e).__name__}: {str(e)}"
}
finally:
# Ensure timeout is cancelled
signal.alarm(0)
def main():
"""
Main entry point - reads JSON from stdin, solves, outputs JSON to stdout
"""
try:
# Read input from stdin
input_str = sys.stdin.read()
input_data = json.loads(input_str)
# Solve constraints
result = solve_constraints(input_data)
# Output result as JSON
print(json.dumps(result))
sys.exit(0)
except json.JSONDecodeError as e:
error_result = {
'status': 'error',
'reason': f"Invalid JSON input: {str(e)}"
}
print(json.dumps(error_result))
sys.exit(1)
except Exception as e:
error_result = {
'status': 'error',
'reason': f"Unexpected error: {type(e).__name__}: {str(e)}"
}
print(json.dumps(error_result))
sys.exit(1)
if __name__ == '__main__':
main()