"""
Column Generation Optimization Tool
optimize_column_gen: Large-scale optimization using column generation.
Use cases:
- Cutting stock problems (minimize waste)
- Bin packing (pack items into containers)
- Crew scheduling (assign crew to flights/shifts)
- Vehicle routing with large route sets
"""
from typing import Dict, List, Any, Optional, Callable
import pulp as pl
from ..solvers.pulp_solver import PuLPSolver
from ..solvers.base_solver import ObjectiveSense
def optimize_column_gen(
master_problem: Dict[str, Any],
pricing_problem: Dict[str, Any],
initial_columns: Optional[List[Dict[str, Any]]] = None,
max_iterations: int = 100,
optimality_gap: float = 1e-6,
solver_options: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Column generation for large-scale structured optimization.
Iteratively generates promising columns (variables) instead of enumerating all upfront.
Useful when problem has exponentially many variables but most will be zero in optimal solution.
Args:
master_problem: Master problem specification:
- "constraints": [{"name": str, "type": str, "rhs": float}, ...]
- "objective": "minimize" or "maximize"
pricing_problem: Pricing subproblem specification:
- "type": "knapsack" | "shortest_path" | "custom"
- "parameters": {...} (problem-specific)
initial_columns: Starting feasible columns:
- [{"id": str, "cost": float, "coefficients": {...}}, ...]
max_iterations: Maximum column generation iterations
optimality_gap: Convergence tolerance (reduced cost threshold)
solver_options: Optional solver settings
Returns:
Dict with:
- status: "optimal" or "converged"
- optimal_solution: Selected columns and weights
- objective_value: Final objective value
- column_count: Total columns generated
- iterations: Number of CG iterations
- convergence_history: Bound progression
- monte_carlo_compatible: MC output
Example:
result = optimize_column_gen(
master_problem={
"constraints": [
{"name": "demand_A", "type": ">=", "rhs": 10},
{"name": "demand_B", "type": ">=", "rhs": 15}
],
"objective": "minimize"
},
pricing_problem={
"type": "knapsack",
"parameters": {"capacity": 100, "items": [...]}
},
initial_columns=[
{"id": "col_0", "cost": 50, "coefficients": {"demand_A": 5, "demand_B": 0}},
{"id": "col_1", "cost": 60, "coefficients": {"demand_A": 0, "demand_B": 7}}
],
max_iterations=50
)
"""
# Extract solver options
solver_opts = solver_options or {}
time_limit = solver_opts.get("time_limit", None)
verbose = solver_opts.get("verbose", False)
# Initialize columns
columns = initial_columns if initial_columns else _generate_trivial_initial_columns(master_problem)
if len(columns) == 0:
raise ValueError("No initial columns provided and could not generate trivial columns")
# Track initial column count before generation loop
initial_column_count = len(columns)
convergence_history = []
# Column generation main loop
for iteration in range(max_iterations):
if verbose:
print(f"\nIteration {iteration + 1}: {len(columns)} columns")
# Solve Restricted Master Problem (RMP)
rmp_result = _solve_rmp(
master_problem,
columns,
time_limit,
verbose
)
if rmp_result["status"] != "optimal":
return {
"status": rmp_result["status"],
"message": f"RMP infeasible at iteration {iteration}",
"iterations": iteration,
"column_count": len(columns)
}
# Record bounds
convergence_history.append({
"iteration": iteration,
"lower_bound": rmp_result["objective_value"],
"num_columns": len(columns)
})
# Extract dual values
duals = rmp_result.get("duals", {})
# Solve pricing problem
new_columns = _solve_pricing(
pricing_problem,
duals,
optimality_gap,
verbose
)
# Check convergence
if not new_columns:
if verbose:
print(f" Converged! No improving columns found.")
break
# Check if best column has negative reduced cost
best_reduced_cost = min(col.get("reduced_cost", 0) for col in new_columns)
if best_reduced_cost >= -optimality_gap:
if verbose:
print(f" Converged! Best reduced cost: {best_reduced_cost:.6f}")
break
# Add new columns
columns.extend(new_columns)
if verbose:
print(f" Added {len(new_columns)} new columns (reduced cost: {best_reduced_cost:.6f})")
# Build final result
obj_value = rmp_result.get("objective_value")
result = {
"solver": "column_generation",
"status": "failed" if obj_value is None else ("converged" if iteration < max_iterations - 1 else "optimal"),
"objective_value": obj_value,
"column_count": len(columns),
"iterations": iteration + 1,
"convergence_history": convergence_history,
"num_initial_columns": initial_column_count,
"num_generated_columns": len(columns) - initial_column_count
}
# Extract solution (selected columns)
selected_columns = []
column_solution = rmp_result.get("solution", {})
for col_idx, column in enumerate(columns):
col_var_name = f"lambda_{col_idx}"
if col_var_name in column_solution:
weight = column_solution[col_var_name]
if weight > 1e-6:
selected_columns.append({
"column_id": column.get("id", f"col_{col_idx}"),
"weight": weight,
"cost": column.get("cost", 0),
"coefficients": column.get("coefficients", {})
})
result["optimal_solution"] = selected_columns
# Simplified MC output
obj_value = result.get('objective_value')
if obj_value is not None:
outcome_str = f"Column generation: {len(selected_columns)} columns selected, cost={obj_value:.2f}"
else:
outcome_str = f"Column generation: {len(selected_columns)} columns selected, cost=unknown (solver failed)"
result["monte_carlo_compatible"] = {
"decision_variables": {f"column_{i}": col["weight"] for i, col in enumerate(selected_columns)},
"assumptions": [],
"outcome_function": outcome_str,
"recommended_next_tool": "validate_reasoning_confidence"
}
return result
def _solve_rmp(
master_problem: Dict[str, Any],
columns: List[Dict[str, Any]],
time_limit: Optional[float],
verbose: bool
) -> Dict[str, Any]:
"""Solve Restricted Master Problem with current column set."""
solver = PuLPSolver(problem_name="rmp")
# Create variables (one per column)
col_names = [f"lambda_{i}" for i in range(len(columns))]
variables = solver.create_variables(
names=col_names,
var_type="continuous",
bounds={name: (0, None) for name in col_names} # Non-negative
)
# Objective: minimize sum of column costs
obj_expr = pl.lpSum([
columns[i].get("cost", 0.0) * variables[col_names[i]]
for i in range(len(columns))
])
sense = (
ObjectiveSense.MINIMIZE if master_problem.get("objective") == "minimize"
else ObjectiveSense.MAXIMIZE
)
solver.set_objective(obj_expr, sense)
# Add covering constraints
# Handle both "constraints" list and "demands" dict formats
constraints = master_problem.get("constraints", [])
demands = master_problem.get("demands", {})
# Convert demands dict to constraints list if needed
if demands and not constraints:
for name, rhs in demands.items():
constraints.append({
"name": name,
"rhs": rhs,
"type": ">=" # Set covering default
})
for constr in constraints:
constr_name = constr.get("name", "constraint")
constr_type = constr.get("type", ">=")
rhs = constr.get("rhs", 0)
# Sum of column coefficients (support both "coefficients" and "coverage" keys)
expr = pl.lpSum([
(columns[i].get("coefficients") or columns[i].get("coverage", {})).get(constr_name, 0.0) * variables[col_names[i]]
for i in range(len(columns))
])
if constr_type == ">=":
solver.add_constraint(expr >= rhs, name=constr_name)
elif constr_type == "<=":
solver.add_constraint(expr <= rhs, name=constr_name)
else:
solver.add_constraint(expr == rhs, name=constr_name)
# Solve
status = solver.solve(time_limit=time_limit, verbose=verbose)
if not solver.is_feasible():
return {"status": status.value}
# Extract duals (shadow prices)
duals = solver.get_shadow_prices()
return {
"status": "optimal",
"objective_value": solver.get_objective_value(),
"solution": solver.get_solution(),
"duals": duals
}
def _solve_pricing(
pricing_problem: Dict[str, Any],
duals: Dict[str, float],
optimality_gap: float,
verbose: bool
) -> List[Dict[str, Any]]:
"""
Solve pricing problem to find columns with negative reduced cost.
Args:
pricing_problem: Pricing problem specification
duals: Dual values from RMP
optimality_gap: Threshold for improvement
verbose: Print output
Returns:
List of new columns with negative reduced cost
"""
problem_type = pricing_problem.get("type", "custom")
if problem_type == "knapsack":
return _solve_knapsack_pricing(pricing_problem, duals, optimality_gap)
elif problem_type == "shortest_path":
return _solve_shortest_path_pricing(pricing_problem, duals, optimality_gap)
else:
# No new columns (pricing not implemented)
if verbose:
print(f" Pricing type '{problem_type}' not implemented, stopping")
return []
def _solve_knapsack_pricing(
pricing_problem: Dict[str, Any],
duals: Dict[str, float],
optimality_gap: float
) -> List[Dict[str, Any]]:
"""
Solve knapsack pricing subproblem for cutting stock.
Finds patterns with negative reduced cost using dynamic programming.
Reduced cost = pattern_cost - sum(dual_i * coverage_i)
"""
# Get capacity (support multiple key names)
capacity = pricing_problem.get("stock_length") or pricing_problem.get("capacity", 100)
# Get item sizes (support multiple formats)
items = pricing_problem.get("items", {})
item_lengths = pricing_problem.get("item_lengths", items)
# Build item list with dual values
item_list = []
for name, size_info in item_lengths.items():
# Handle both direct size and dict format
if isinstance(size_info, dict):
size = size_info.get("size", size_info.get("length", 0))
else:
size = int(size_info)
# Find dual value (try multiple key formats)
dual_value = duals.get(name, 0)
if dual_value == 0:
dual_value = duals.get(f"demand_{name}", 0)
if dual_value == 0:
dual_value = duals.get(f"item_{name}", 0)
if size > 0:
item_list.append({
"name": name,
"size": size,
"value": max(dual_value, 0.001) # Ensure positive value for DP
})
if not item_list:
return []
# Ensure capacity is an integer
capacity = int(capacity)
# DP for unbounded knapsack: maximize sum(dual_i * count_i)
dp = [0.0] * (capacity + 1)
choice = [None] * (capacity + 1)
for c in range(1, capacity + 1):
for item in item_list:
size = item["size"]
value = item["value"]
if size <= c and dp[c - size] + value > dp[c]:
dp[c] = dp[c - size] + value
choice[c] = item["name"]
# Reconstruct solution pattern
pattern = {}
remaining = capacity
while remaining > 0 and choice[remaining] is not None:
item_name = choice[remaining]
pattern[item_name] = pattern.get(item_name, 0) + 1
# Find item size
item_size = next(i["size"] for i in item_list if i["name"] == item_name)
remaining -= item_size
if not pattern:
return []
# Calculate reduced cost: pattern_cost - sum(dual * coverage)
pattern_cost = 1.0 # Standard cutting stock: 1 stock used per pattern
total_dual_value = dp[capacity]
reduced_cost = pattern_cost - total_dual_value
# Only return if improving (negative reduced cost)
if reduced_cost >= -optimality_gap:
return []
# Return new column
return [{
"id": f"gen_{abs(hash(str(pattern))) % 100000}",
"cost": pattern_cost,
"coefficients": pattern,
"coverage": pattern, # Alias for compatibility
"reduced_cost": reduced_cost
}]
def _solve_shortest_path_pricing(
pricing_problem: Dict[str, Any],
duals: Dict[str, float],
optimality_gap: float
) -> List[Dict[str, Any]]:
"""Solve shortest path pricing subproblem."""
# Simplified implementation
return []
def _generate_trivial_initial_columns(
master_problem: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Generate trivial initial columns (one per constraint/demand)."""
constraints = master_problem.get("constraints", [])
demands = master_problem.get("demands", {})
# Convert demands dict to constraints list if needed
if demands and not constraints:
for name, rhs in demands.items():
constraints.append({
"name": name,
"rhs": rhs,
"type": ">="
})
columns = []
for i, constr in enumerate(constraints):
constr_name = constr.get("name", f"constr_{i}")
rhs = constr.get("rhs", 1)
# Create column that satisfies just this constraint
columns.append({
"id": f"initial_{constr_name}",
"cost": 1000.0, # High cost (will be replaced by better columns)
"coefficients": {constr_name: rhs},
"coverage": {constr_name: rhs} # Alias for compatibility
})
return columns