#!/usr/bin/env node
import { z } from "zod";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import * as fs from "fs/promises";
import * as path from "path";
import * as os from "os";
import { spawn } from "child_process";
import { SimpleVisumController } from "./simple-visum-controller.js";
import { PersistentVisumController } from "./persistent-visum-controller.js";
import { ProjectInstanceManager } from "./project-instance-manager.js";
import { ProjectServerManager } from "./project-server-manager.js";
// =============================================================================
// INITIALIZATION
// =============================================================================
// Initialize controllers with singleton pattern
const visumController = PersistentVisumController.getInstance();
const legacyController = SimpleVisumController.getInstance(); // For backward compatibility
const projectManager = ProjectInstanceManager.getInstance(); // Project-specific instances
const serverManager = ProjectServerManager.getInstance(); // TCP server manager
// π§ CONFIGURAZIONE VISUM MCP TOOLS
const VISUM_MCP_CONFIG = {
PYTHON_PATH: "H:\\Program Files\\PTV Vision\\PTV Visum 2025\\Exe\\Python\\python.exe",
VISUM_EXE: "H:\\Program Files\\PTV Vision\\PTV Visum 2025\\Exe\\Visum250.exe",
TEMP_DIR: "C:\\temp\\mcp_visum"
};
// =============================================================================
// STORAGE & STATE MANAGEMENT
// =============================================================================
interface ThinkingStep {
id: number;
thought: string;
reasoning?: string;
timestamp: string;
category?: string;
}
interface ThinkingSession {
currentSteps: ThinkingStep[];
totalSteps: number;
isComplete: boolean;
summary?: string;
metadata?: Record<string, any>;
pdfContext?: {
filename?: string;
content?: string;
metadata?: Record<string, any>;
};
}
let thinkingSession: ThinkingSession = {
currentSteps: [],
totalSteps: 0,
isComplete: false
};
const STORAGE_FILE = path.join(os.homedir(), '.mcp-sequential-thinking.json');
async function initializeStorage() {
try {
const data = await fs.readFile(STORAGE_FILE, 'utf-8');
const saved = JSON.parse(data);
thinkingSession = { ...thinkingSession, ...saved };
console.error(`LOADED: thinking state: ${thinkingSession.currentSteps.length} steps`);
} catch (error) {
console.error("INIT: Starting with fresh thinking state");
}
}
async function saveThinkingState() {
try {
await fs.writeFile(STORAGE_FILE, JSON.stringify(thinkingSession, null, 2));
} catch (error) {
console.error("β οΈ Failed to save thinking state:", error);
}
}
async function loadThinkingState() {
return thinkingSession;
}
// =============================================================================
// MCP SERVER SETUP
// =============================================================================
const server = new McpServer({
name: "visum-thinker",
version: "2.0.0",
});
// =============================================================================
// SEQUENTIAL THINKING TOOLS
// =============================================================================
// Sequential thinking tool
server.tool(
"sequential_thinking",
"Engage in systematic step-by-step thinking to analyze complex problems, make decisions, or explore ideas. Each thought builds on the previous ones, creating a chain of reasoning.",
{
thought: z.string().describe("Your current thought or analysis step"),
reasoning: z.string().optional().describe("Optional: Explain why this thought follows from previous ones"),
category: z.string().optional().describe("Optional: Categorize this thought (analysis, synthesis, evaluation, etc.)"),
revise_step: z.number().optional().describe("Optional: Revise a previous step by its number"),
branch_from_step: z.number().optional().describe("Optional: Create a new reasoning branch from a specific step number"),
target_steps: z.number().optional().describe("Optional: Target number of thinking steps for this session"),
complete_thinking: z.boolean().optional().describe("Optional: Mark the thinking session as complete")
},
async ({ thought, reasoning, category, revise_step, branch_from_step, target_steps, complete_thinking }) => {
try {
const timestamp = new Date().toISOString();
// Handle revision of existing step
if (revise_step !== undefined) {
const stepIndex = thinkingSession.currentSteps.findIndex(step => step.id === revise_step);
if (stepIndex !== -1) {
thinkingSession.currentSteps[stepIndex] = {
...thinkingSession.currentSteps[stepIndex],
thought,
reasoning,
category,
timestamp
};
} else {
return {
content: [{
type: "text",
text: `β Step ${revise_step} not found. Available steps: ${thinkingSession.currentSteps.map(s => s.id).join(', ')}`
}]
};
}
}
// Handle branching from existing step
else if (branch_from_step !== undefined) {
const branchPoint = thinkingSession.currentSteps.find(step => step.id === branch_from_step);
if (!branchPoint) {
return {
content: [{
type: "text",
text: `β Cannot branch from step ${branch_from_step}. Step not found.`
}]
};
}
// Create new branch
thinkingSession.totalSteps++;
const newStep: ThinkingStep = {
id: thinkingSession.totalSteps,
thought: `[Branch from Step ${branch_from_step}] ${thought}`,
reasoning,
category,
timestamp
};
thinkingSession.currentSteps.push(newStep);
}
// Add new step
else {
thinkingSession.totalSteps++;
const newStep: ThinkingStep = {
id: thinkingSession.totalSteps,
thought,
reasoning,
category,
timestamp
};
thinkingSession.currentSteps.push(newStep);
}
// Handle completion
if (complete_thinking) {
thinkingSession.isComplete = true;
thinkingSession.summary = `Sequential thinking completed with ${thinkingSession.currentSteps.length} steps.`;
}
// Update target if provided
if (target_steps) {
thinkingSession.metadata = { ...thinkingSession.metadata, target_steps };
}
// Auto-save state
await saveThinkingState();
// Generate progress report
const progress = target_steps ? ` (${thinkingSession.currentSteps.length}/${target_steps})` : '';
const recentSteps = thinkingSession.currentSteps.slice(-3);
let content = `π§ **Sequential Thinking${progress}**\n\n`;
if (revise_step !== undefined) {
content += `π **Step ${revise_step} Revised**\n\n`;
} else if (branch_from_step !== undefined) {
content += `πΏ **New Branch from Step ${branch_from_step}**\n\n`;
} else {
content += `π **Step ${thinkingSession.totalSteps} Added**\n\n`;
}
content += "**Recent Thinking Chain:**\n";
recentSteps.forEach(step => {
const categoryLabel = step.category ? ` [${step.category}]` : '';
content += `**${step.id}.${categoryLabel}** ${step.thought}\n`;
if (step.reasoning) {
content += ` *Reasoning: ${step.reasoning}*\n`;
}
content += '\n';
});
if (thinkingSession.isComplete) {
content += `β
**Thinking Complete**: ${thinkingSession.summary}\n\n`;
}
content += `*Continue with next thought or use 'get_thinking_summary' to review all steps*`;
return {
content: [{
type: "text",
text: content
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `β **Error in sequential thinking**: ${error instanceof Error ? error.message : String(error)}`
}]
};
}
}
);
// Reset thinking tool
server.tool(
"reset_thinking",
"Clear the current thinking session and start fresh",
{},
async () => {
try {
const previousSteps = thinkingSession.currentSteps.length;
thinkingSession = {
currentSteps: [],
totalSteps: 0,
isComplete: false
};
await saveThinkingState();
return {
content: [{
type: "text",
text: `ποΈ **Thinking Reset**\n\nPrevious session cleared (${previousSteps} steps removed).\nReady for fresh sequential thinking.`
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `β **Error resetting thinking**: ${error instanceof Error ? error.message : String(error)}`
}]
};
}
}
);
// Get thinking summary tool
server.tool(
"get_thinking_summary",
"Get a complete summary of the current thinking session with all steps and analysis",
{},
async () => {
try {
if (thinkingSession.currentSteps.length === 0) {
return {
content: [{
type: "text",
text: "π **No Active Thinking Session**\n\nUse 'sequential_thinking' to start your first thought."
}]
};
}
const targetSteps = thinkingSession.metadata?.target_steps;
const progress = targetSteps ? ` (${thinkingSession.currentSteps.length}/${targetSteps})` : '';
let content = `π **Thinking Session Summary${progress}**\n\n`;
content += `**Status**: ${thinkingSession.isComplete ? 'Complete β
' : 'In Progress π'}\n`;
content += `**Total Steps**: ${thinkingSession.currentSteps.length}\n`;
content += `**Started**: ${thinkingSession.currentSteps[0]?.timestamp ? new Date(thinkingSession.currentSteps[0].timestamp).toLocaleString() : 'Unknown'}\n\n`;
content += "**Complete Thinking Chain:**\n";
thinkingSession.currentSteps.forEach((step, index) => {
const categoryLabel = step.category ? ` [${step.category}]` : '';
content += `**${step.id}.${categoryLabel}** ${step.thought}\n`;
if (step.reasoning) {
content += ` *Reasoning: ${step.reasoning}*\n`;
}
content += '\n';
});
if (thinkingSession.summary) {
content += `**Session Summary**: ${thinkingSession.summary}\n`;
}
if (thinkingSession.pdfContext) {
content += `**PDF Context**: ${thinkingSession.pdfContext.filename || 'Document loaded'}\n`;
}
return {
content: [{
type: "text",
text: content
}]
};
} catch (error) {
return {
content: [{
type: "text",
text: `β **Error getting thinking summary**: ${error instanceof Error ? error.message : String(error)}`
}]
};
}
}
);
// =============================================================================
// VISUM INTEGRATION TOOLS - Using SimpleVisumController with Persistent VisumPy
// =============================================================================
// Project Launch Tool - DEPRECATED! Use project_open instead
server.tool(
"visum_launch_project",
"β οΈ DEPRECATED: Use 'project_open' tool instead. This tool is obsolete and slower than the new TCP-based project_open tool.",
{
projectPath: z.string().describe("Full path to the Visum project file (.ver)")
},
async ({ projectPath }) => {
try {
const result = await visumController.executeVisumAnalysis(
`# Load specific Visum project
import time
try:
start_time = time.time()
visum.LoadVersion(r"${projectPath}")
load_time = time.time() - start_time
# Get basic network info
num_nodes = visum.Net.Nodes.Count
num_links = visum.Net.Links.Count
num_zones = visum.Net.Zones.Count
result = {
'project_path': r"${projectPath}",
'loaded_successfully': True,
'load_time_seconds': round(load_time, 3),
'network_summary': {
'nodes': num_nodes,
'links': num_links,
'zones': num_zones
}
}
except Exception as e:
result = {
'project_path': r"${projectPath}",
'loaded_successfully': False,
'error': str(e)
}`,
`Loading Visum project: ${projectPath}`
);
if (result.success && result.result?.loaded_successfully) {
const info = result.result;
return {
content: [
{
type: "text",
text: `β
**Progetto Visum Caricato**\n\n` +
`**Progetto:** \`${info.project_path}\`\n` +
`**Tempo di Caricamento:** ${info.load_time_seconds}s\n\n` +
`**Statistiche Rete:**\n` +
`β’ **Nodi:** ${info.network_summary?.nodes?.toLocaleString() || 'N/A'}\n` +
`β’ **Link:** ${info.network_summary?.links?.toLocaleString() || 'N/A'}\n` +
`β’ **Zone:** ${info.network_summary?.zones?.toLocaleString() || 'N/A'}\n\n` +
`**Performance:**\n` +
`β’ **Tempo Esecuzione:** ${result.executionTimeMs?.toFixed(3) || 'N/A'}ms\n\n` +
`*Progetto pronto per l'analisi della rete*`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Caricamento Progetto**\n\n` +
`**Progetto:** \`${projectPath}\`\n` +
`**Errore:** ${result.result?.error || result.error || 'Errore sconosciuto'}\n\n` +
`*Verificare che il percorso del file sia corretto e che il progetto sia valido*`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore durante il caricamento:**\n\n${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Network Analysis Tool
server.tool(
"visum_network_analysis",
"Analyze the loaded Visum network with detailed statistics and performance metrics",
{
includeGeometry: z.boolean().optional().default(false).describe("Include geometric analysis of network elements"),
sampleSize: z.number().optional().default(50).describe("Number of sample elements to analyze (default: 50)")
},
async ({ includeGeometry, sampleSize }) => {
try {
let analysisCode = `
# Comprehensive network analysis
import time
try:
start_time = time.time()
# Basic network statistics
num_nodes = visum.Net.Nodes.Count
num_links = visum.Net.Links.Count
num_zones = visum.Net.Zones.Count
num_stops = visum.Net.Stops.Count if hasattr(visum.Net, 'Stops') else 0
num_lines = visum.Net.Lines.Count if hasattr(visum.Net, 'Lines') else 0
# Sample node analysis
sample_nodes = []
if num_nodes > 0:
node_iter = visum.Net.Nodes.Iterator
count = 0
while node_iter.Valid and count < ${sampleSize}:
node = node_iter.Item
sample_nodes.append({
'id': node.AttValue('No'),
'x': node.AttValue('XCoord') if hasattr(node, 'AttValue') else None,
'y': node.AttValue('YCoord') if hasattr(node, 'AttValue') else None
})
node_iter.Next()
count += 1
# Sample link analysis
sample_links = []
if num_links > 0:
link_iter = visum.Net.Links.Iterator
count = 0
while link_iter.Valid and count < ${sampleSize}:
link = link_iter.Item
sample_links.append({
'from_node': link.AttValue('FromNodeNo'),
'to_node': link.AttValue('ToNodeNo'),
'length': link.AttValue('Length') if hasattr(link, 'AttValue') else None
})
link_iter.Next()
count += 1
`;
if (includeGeometry) {
analysisCode += `
# Geometric analysis
total_length = 0.0
if num_links > 0:
link_iter = visum.Net.Links.Iterator
while link_iter.Valid:
try:
length = link_iter.Item.AttValue('Length')
if length:
total_length += length
except:
pass
link_iter.Next()
`;
}
analysisCode += `
analysis_time = time.time() - start_time
result = {
'network_statistics': {
'nodes': num_nodes,
'links': num_links,
'zones': num_zones,
'stops': num_stops,
'lines': num_lines
},
'sample_analysis': {
'nodes': sample_nodes[:10], # Limit output
'links': sample_links[:10] # Limit output
},${includeGeometry ? `
'geometric_analysis': {
'total_network_length_km': round(total_length / 1000, 2) if 'total_length' in locals() else None
},` : ''}
'performance': {
'analysis_time_seconds': round(analysis_time, 3)
},
'analysis_successful': True
}
except Exception as e:
result = {
'analysis_successful': False,
'error': str(e)
}`;
const result = await visumController.executeCustomCode(
analysisCode,
"Analisi completa della rete Visum"
);
if (result.success && result.result?.analysis_successful) {
const analysis = result.result;
const stats = analysis.network_statistics;
let geometryInfo = '';
if (includeGeometry && analysis.geometric_analysis) {
geometryInfo = `**Analisi Geometrica:**\n` +
`β’ **Lunghezza Totale Rete:** ${analysis.geometric_analysis.total_network_length_km || 'N/A'} km\n\n`;
}
let sampleInfo = '';
if (analysis.sample_analysis) {
const sampleNodes = analysis.sample_analysis.nodes?.length || 0;
const sampleLinks = analysis.sample_analysis.links?.length || 0;
sampleInfo = `**Analisi Campionaria:**\n` +
`β’ **Nodi Analizzati:** ${sampleNodes}\n` +
`β’ **Link Analizzati:** ${sampleLinks}\n\n`;
}
return {
content: [
{
type: "text",
text: `β
**Analisi Rete Completata**\n\n` +
`**Statistiche Rete:**\n` +
`β’ **Nodi:** ${stats.nodes?.toLocaleString() || 'N/A'}\n` +
`β’ **Link:** ${stats.links?.toLocaleString() || 'N/A'}\n` +
`β’ **Zone:** ${stats.zones?.toLocaleString() || 'N/A'}\n` +
`β’ **Fermate:** ${stats.stops?.toLocaleString() || 'N/A'}\n` +
`β’ **Linee:** ${stats.lines?.toLocaleString() || 'N/A'}\n\n` +
sampleInfo +
geometryInfo +
`**Performance:**\n` +
`β’ **Tempo Analisi:** ${analysis.performance?.analysis_time_seconds || 'N/A'}s\n` +
`β’ **Tempo Esecuzione Tool:** ${result.executionTimeMs?.toFixed(3) || 'N/A'}ms\n\n` +
`*Analisi della rete completata con successo*`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Analisi Rete**\n\n` +
`**Errore:** ${result.result?.error || result.error || 'Errore sconosciuto'}\n\n` +
`*Assicurarsi che un progetto Visum sia caricato correttamente*`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore durante l'analisi:**\n\n${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Custom Python Analysis Tool
server.tool(
"visum_custom_analysis",
"Execute custom Python code with access to the active Visum instance",
{
pythonCode: z.string().describe("Python code to execute. The 'visum' variable contains the active VisumPy instance. Store results in 'result' dictionary."),
description: z.string().optional().describe("Optional description of the analysis being performed")
},
async ({ pythonCode, description }) => {
try {
const result = await visumController.executeCustomCode(pythonCode, description);
if (result.success) {
let analysisResults = '';
if (result.result) {
analysisResults = `**Risultati Analisi:**\n\`\`\`json\n${JSON.stringify(result.result, null, 2)}\n\`\`\`\n\n`;
}
let executionOutput = '';
if (result.output) {
const outputLines = result.output.split('\n').filter(line =>
line.trim() &&
!line.includes('=====') &&
!line.includes('Executing analysis') &&
!line.includes('SUCCESS: Tool call completed')
);
if (outputLines.length > 0) {
executionOutput = `**Output Esecuzione:**\n\`\`\`\n${outputLines.slice(-20).join('\n')}\n\`\`\`\n\n`;
}
}
return {
content: [
{
type: "text",
text: `β
**Analisi Personalizzata Completata**\n\n` +
`**Descrizione:** ${description || 'Analisi Python personalizzata'}\n\n` +
analysisResults +
executionOutput +
`**Performance:**\n` +
`β’ **Tempo Esecuzione:** ${result.executionTimeMs?.toFixed(3) || 'N/A'}ms\n\n` +
`*Eseguito su istanza VisumPy persistente*`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Analisi personalizzata fallita**\n\n` +
`**Errore:** ${result.error || 'Errore sconosciuto'}\n\n` +
`**Codice tentato:**\n\`\`\`python\n${pythonCode}\n\`\`\`\n\n` +
`*Controllare la sintassi Python e l'uso corretto della variabile 'visum'*`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore esecuzione analisi personalizzata:**\n\n${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Network Statistics Tool
server.tool(
"visum_network_stats",
"Get comprehensive network statistics from the loaded Visum project",
{},
async () => {
try {
const result = await visumController.getNetworkStats();
if (result.success) {
return {
content: [
{
type: "text",
text: `β
**Statistiche Rete PERSISTENTE**\n\n` +
`**Riepilogo Rete:**\n` +
`β’ **Nodi:** ${result.result?.nodes?.toLocaleString() || 'N/A'}\n` +
`β’ **Link:** ${result.result?.links?.toLocaleString() || 'N/A'}\n` +
`β’ **Zone:** ${result.result?.zones?.toLocaleString() || 'N/A'}\n\n` +
`**Performance ULTRA-VELOCE:**\n` +
`β’ **Tempo Query:** ${result.result?.query_time_ms?.toFixed(3) || 'N/A'}ms\n` +
`β’ **Tempo Totale:** ${result.executionTimeMs?.toFixed(3) || 'N/A'}ms\n` +
`β’ **Persistente:** ${result.result?.persistent ? 'β
SΓ' : 'β NO'}\n\n` +
`*Dati recuperati da istanza VisumPy PERSISTENTE - Ultra-veloce!*`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Impossibile ottenere statistiche rete**\n\n` +
`Il progetto Visum potrebbe non essere caricato o accessibile.\n` +
`La prima chiamata potrebbe richiedere piΓΉ tempo per inizializzare l'istanza VisumPy.`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore ottenimento statistiche rete:**\n\n${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Health Check Tool
server.tool(
"visum_health_check",
"Check the health and status of the VisumPy instance",
{},
async () => {
try {
// First check persistent process health
const healthResult = await visumController.checkInstanceHealth();
const statsResult = await visumController.getNetworkStats();
if (statsResult.success && healthResult.success) {
const nodes = statsResult.result?.nodes || 0;
const isHealthy = nodes > 0;
const isPersistent = statsResult.result?.persistent === true;
const queryTime = statsResult.result?.query_time_ms || 0;
const performance = queryTime < 50 ? 'Ultra-Veloce π' :
queryTime < 200 ? 'Veloce β‘' :
queryTime < 1000 ? 'Normale' : 'Lenta';
return {
content: [
{
type: "text",
text: `${isHealthy ? 'π' : 'β οΈ'} **Controllo Salute Istanza VisumPy PERSISTENTE**\n\n` +
`**Stato:** ${isHealthy ? 'ATTIVO e PERSISTENTE β
' : 'Attenzione β οΈ'}\n` +
`**Performance:** ${performance}\n` +
`**Tempo Query:** ${queryTime.toFixed(1)}ms\n` +
`**Tempo Totale:** ${statsResult.executionTimeMs?.toFixed(3) || 'N/A'}ms\n` +
`**Persistente:** ${isPersistent ? 'β
SΓ' : 'β NO'}\n\n` +
`**Dettagli Istanza:**\n` +
`β’ **Nodi Disponibili:** ${nodes.toLocaleString()}\n` +
`β’ **Link Disponibili:** ${statsResult.result?.links?.toLocaleString() || 'N/A'}\n` +
`β’ **Zone Disponibili:** ${statsResult.result?.zones?.toLocaleString() || 'N/A'}\n` +
`β’ **Richieste Processate:** ${healthResult.result?.requestCount || 0}\n` +
`β’ **Progetto Caricato:** ${healthResult.result?.projectLoaded ? 'β
SΓ' : 'β NO'}\n\n` +
`*${isHealthy && isPersistent ? 'π Istanza persistente pronta - Performance ultra-veloce garantita!' : 'Istanza potrebbe necessitare reinizializzazione'}*`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Istanza VisumPy Non Sana**\n\n` +
`L'istanza VisumPy non risponde o non Γ¨ inizializzata.\n` +
`Prova a eseguire visum_network_stats per inizializzare l'istanza.`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore controllo salute:**\n\n${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// GLOBAL LAYOUTS MANAGEMENT TOOLS
// =============================================================================
// List Available Global Layout Files (.lay) in project directory
server.tool(
"project_list_available_layouts",
"π List all Global Layout files (.lay) available in the project directory. Shows filename, size, and full path. ALWAYS use this before loading a layout to show available options to the user.",
{
projectId: z.string().describe("Project identifier returned by project_open (e.g. S000009result_1278407893)")
},
async ({ projectId }) => {
try {
// Python code to search for .lay files in project directory
const pythonCode = `
import os
result = {'layouts': [], 'count': 0, 'project_dir': None}
try:
# Get project path - try different methods
project_path = None
try:
# Method 1: Try GetPath with file type parameter (1 = .ver file)
project_path = visum.GetPath(1)
except:
try:
# Method 2: Try without parameters
project_path = visum.GetPath()
except:
# Method 3: Use IO.Path if available
if hasattr(visum, 'IO') and hasattr(visum.IO, 'Path'):
project_path = visum.IO.Path
if project_path:
project_dir = os.path.dirname(project_path)
result['project_dir'] = project_dir
result['project_path'] = project_path
# Search for .lay files in project directory
lay_files = []
if os.path.exists(project_dir):
for file in os.listdir(project_dir):
if file.endswith('.lay'):
full_path = os.path.join(project_dir, file)
file_size = os.path.getsize(full_path)
lay_files.append({
'filename': file,
'path': full_path,
'size_bytes': file_size,
'size_mb': round(file_size / (1024 * 1024), 2)
})
# Sort by filename
lay_files.sort(key=lambda x: x['filename'])
result['layouts'] = lay_files
result['count'] = len(lay_files)
else:
result['error'] = 'Cannot determine project path'
except Exception as e:
result['error'] = str(e)
result
`;
// Use ProjectServerManager to execute Python inside the project TCP server
const execResponse: any = await serverManager.executeCommand(projectId, pythonCode, "List available Global Layout files");
if (!execResponse || execResponse.type === 'error') {
return {
content: [
{
type: 'text',
text: `β Errore durante la ricerca: ${execResponse?.message || 'Errore sconosciuto'}`
}
]
};
}
const result = execResponse.result || {};
if (result.error) {
return {
content: [
{
type: 'text',
text: `β οΈ Errore durante la ricerca dei file .lay\nErrore: ${result.error}`
}
]
};
}
let output = `οΏ½ **Global Layout Files Disponibili**\n\n`;
output += `π **Directory:** ${result.project_dir}\n`;
output += `π **Totale file .lay:** ${result.count}\n\n`;
if (result.count === 0) {
output += 'βΉοΈ Nessun file .lay trovato nella directory del progetto.\n';
} else {
result.layouts.forEach((l: any, idx: number) => {
output += `${idx + 1}. **${l.filename}**\n`;
output += ` π Dimensione: ${l.size_mb} MB (${l.size_bytes.toLocaleString()} bytes)\n`;
output += ` π Path: \`${l.path}\`\n\n`;
});
}
output += '\nπ‘ **Uso:** Usa `project_load_global_layout` per caricare uno di questi layout nel progetto.';
return {
content: [
{
type: 'text',
text: output
}
]
};
} catch (error) {
return {
content: [
{
type: 'text',
text: `β Errore: ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Load Global Layout Tool - Load a .lay file into the current Visum project
server.tool(
"project_load_global_layout",
"π¨ Load a Global Layout (.lay file) into an opened Visum project. The layout file must exist in the project directory or provide full path. ALWAYS use project_list_available_layouts first to show available options to the user.",
{
projectId: z.string().describe("Project identifier returned by project_open"),
layoutFile: z.string().describe("Full path to the .lay file OR just the filename if in project directory (e.g. 'tabelle_report.lay' or 'H:\\path\\to\\layout.lay')")
},
async ({ projectId, layoutFile }) => {
try {
// Get project info to determine project directory
const pythonCode = `
import os
result = {}
try:
layout_file = r'${layoutFile.replace(/\\/g, '\\\\')}'
# Se layoutFile non ha path completo, cerca nella directory del progetto
if not os.path.isabs(layout_file):
# Ottieni directory del progetto corrente - try different methods
project_path = None
try:
project_path = visum.GetPath(1) # 1 = .ver file type
except:
try:
if hasattr(visum, 'IO') and hasattr(visum.IO, 'Path'):
project_path = visum.IO.Path
except:
pass
if project_path:
project_dir = os.path.dirname(project_path)
layout_file = os.path.join(project_dir, layout_file)
else:
result['error'] = 'Cannot determine project path'
result['status'] = 'PATH_ERROR'
if 'status' not in result:
# Verifica esistenza file
if not os.path.exists(layout_file):
result['error'] = f'File .lay non trovato: {layout_file}'
result['status'] = 'FILE_NOT_FOUND'
else:
result['file_exists'] = True
result['file_path'] = layout_file
result['file_size'] = os.path.getsize(layout_file)
# Carica il Global Layout usando visum.LoadGlobalLayout()
try:
visum.LoadGlobalLayout(layout_file)
result['status'] = 'SUCCESS'
result['message'] = 'Global Layout caricato con successo'
result['loaded_file'] = os.path.basename(layout_file)
except Exception as e:
result['error'] = str(e)
result['status'] = 'LOAD_FAILED'
except Exception as e:
result = {'error': str(e), 'status': 'EXCEPTION'}
result
`;
const execResponse: any = await serverManager.executeCommand(projectId, pythonCode, `Load Global Layout: ${layoutFile}`);
if (!execResponse || !execResponse.success) {
return {
content: [
{
type: "text",
text: `β Impossibile caricare Global Layout per progetto ${projectId}: ${execResponse?.error || 'Errore sconosciuto'}`
}
]
};
}
const result = execResponse.result || {};
if (result.status === 'SUCCESS') {
return {
content: [
{
type: "text",
text: `β
**Global Layout Caricato**\n\nπ **File:** ${result.loaded_file}\nπ **Path:** ${result.file_path}\nπ **Dimensione:** ${(result.file_size / 1024 / 1024).toFixed(2)} MB\n\nπ¨ Il layout Γ¨ ora attivo nel progetto Visum.`
}
]
};
} else if (result.status === 'FILE_NOT_FOUND') {
return {
content: [
{
type: "text",
text: `β **File non trovato**\n\n${result.error}\n\nπ‘ **Suggerimento:** Usa \`project_list_available_layouts\` per vedere i file .lay disponibili.`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore durante il caricamento**\n\n${result.error}\n\nStatus: ${result.status}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β Errore: ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Export Visible Tables from Layout - Export only tables visible in a Global Layout
server.tool(
"project_export_visible_tables",
"π Export ONLY tables visible in a Global Layout (.lay file) to CSV files. Maintains exact column order and includes sub-attributes (formula columns). WORKFLOW: 1) List layouts with project_list_available_layouts, 2) User selects layout, 3) Load with project_load_global_layout, 4) Export tables with this tool.",
{
projectId: z.string().describe("Project identifier returned by project_open"),
layoutFile: z.string().describe("Full path to the .lay file OR just filename if in project directory (e.g. 'tabelle_report.lay')")
},
async ({ projectId, layoutFile }) => {
try {
const pythonCode = `
import xml.etree.ElementTree as ET
import os
layout_file = r'${layoutFile.replace(/\\/g, '\\\\')}'
result = {}
try:
# Se layoutFile non ha path completo, cerca nella directory del progetto
if not os.path.isabs(layout_file):
project_path = visum.GetPath(1)
project_dir = os.path.dirname(project_path)
layout_file = os.path.join(project_dir, layout_file)
project_name = os.path.basename(project_path).replace('.ver', '')
else:
project_path = visum.GetPath(1)
project_name = os.path.basename(project_path).replace('.ver', '')
output_dir = os.path.dirname(project_path)
# Verifica esistenza file layout
if not os.path.exists(layout_file):
result['error'] = f'File .lay non trovato: {layout_file}'
result['status'] = 'FILE_NOT_FOUND'
else:
# Parse layout XML
tree = ET.parse(layout_file)
root = tree.getroot()
# Find all visible tables
tables_info = []
for list_item in root.iter('listLayoutItem'):
graphic = list_item.find('.//listGraphicParameterLayoutItems')
if graphic is not None:
net_obj_type = graphic.get('netObjectType')
if net_obj_type:
# Get table name
table_name_elem = list_item.find('.//caption')
table_name = table_name_elem.get('text', net_obj_type) if table_name_elem is not None else net_obj_type
# Get all column definitions
col_defs = []
for attr_def in list_item.iter('attributeDefinition'):
col_defs.append(attr_def.attrib)
tables_info.append({
'name': table_name,
'type': net_obj_type,
'columns': col_defs
})
# Map net object types to Visum collections
type_to_collection = {
'LINK': 'visum.Net.Links',
'NODE': 'visum.Net.Nodes',
'ZONE': 'visum.Net.Zones',
'ODPAIR': 'visum.Net.ODPairs',
'LINE': 'visum.Net.Lines',
'LINEROUTE': 'visum.Net.LineRoutes',
'TIMEPROFILE': 'visum.Net.TimeProfiles',
'TIMEPROFILEITEM': 'visum.Net.TimeProfileItems',
'VEHJOURNEYSECTION': 'visum.Net.VehicleJourneySections',
'STOP': 'visum.Net.Stops',
'STOPPOINTAREA': 'visum.Net.StopAreas',
'CONNECTOR': 'visum.Net.Connectors',
'TURN': 'visum.Net.Turns',
'MAINZONE': 'visum.Net.MainZones'
}
# Export each table
results = []
for table in tables_info:
table_type = table['type']
table_name = table['name']
# Get collection
collection_path = type_to_collection.get(table_type)
if not collection_path:
results.append({'table': table_name, 'status': 'SKIPPED', 'reason': 'Unknown type'})
continue
try:
collection = eval(collection_path)
count = collection.Count
except Exception as e:
results.append({'table': table_name, 'status': 'ERROR', 'reason': str(e)})
continue
# Build attribute list with sub-attributes
full_attrs = []
headers = []
for col in table['columns']:
attr_id = col['attributeID']
sub1 = col.get('subAttributeID1', '')
sub2 = col.get('subAttributeID2', '')
sub3 = col.get('subAttributeID3', '')
# Build full attribute name
if sub1 or sub2 or sub3:
subs = [s for s in [sub1, sub2, sub3] if s]
full_attr = attr_id + '(' + ','.join(subs) + ')'
# Create readable header
header = attr_id + '_' + '_'.join(subs)
else:
full_attr = attr_id
header = attr_id
full_attrs.append(full_attr)
headers.append(header)
# Get data
try:
data = collection.GetMultipleAttributes(full_attrs)
# Build CSV
lines = [';'.join(headers)]
for row_tuple in data:
lines.append(';'.join(str(v) for v in row_tuple))
# Write file
safe_name = table_name.replace('/', '_').replace('\\\\', '_').replace(' ', '_')
output_file = os.path.join(output_dir, f'{project_name}_{safe_name}.csv')
text = '\\n'.join(lines)
with open(output_file, 'w', encoding='utf-8', newline='') as f:
f.write(text)
size_mb = os.path.getsize(output_file) / (1024 * 1024)
results.append({
'table': table_name,
'type': table_type,
'status': 'SUCCESS',
'file': output_file,
'rows': len(data),
'cols': len(full_attrs),
'size_mb': round(size_mb, 2)
})
except Exception as e:
results.append({
'table': table_name,
'status': 'ERROR',
'reason': str(e)[:100]
})
result = {
'total_tables': len(tables_info),
'successful': len([r for r in results if r['status'] == 'SUCCESS']),
'errors': len([r for r in results if r['status'] == 'ERROR']),
'skipped': len([r for r in results if r['status'] == 'SKIPPED']),
'details': results,
'layout_file': layout_file,
'output_dir': output_dir
}
except Exception as e:
result = {'error': str(e), 'status': 'EXCEPTION'}
result
`;
const execResponse: any = await serverManager.executeCommand(projectId, pythonCode, `Export visible tables from ${layoutFile}`);
if (!execResponse || !execResponse.success) {
return {
content: [
{
type: "text",
text: `β Impossibile esportare tabelle per progetto ${projectId}: ${execResponse?.error || 'Errore sconosciuto'}`
}
]
};
}
const result = execResponse.result || {};
if (result.status === 'FILE_NOT_FOUND') {
return {
content: [
{
type: "text",
text: `β **File Layout non trovato**\n\n${result.error}\n\nπ‘ **Suggerimento:** Usa \`project_list_available_layouts\` per vedere i file .lay disponibili.`
}
]
};
} else if (result.status === 'EXCEPTION') {
return {
content: [
{
type: "text",
text: `β **Errore durante l'export**\n\n${result.error}`
}
]
};
}
// Format success response
const success = result.details?.filter((r: any) => r.status === 'SUCCESS') || [];
const errors = result.details?.filter((r: any) => r.status === 'ERROR') || [];
const skipped = result.details?.filter((r: any) => r.status === 'SKIPPED') || [];
let output = `β
**Tabelle Esportate da Layout**\n\n`;
output += `π **Layout:** ${path.basename(result.layout_file || layoutFile)}\n`;
output += `π **Directory:** ${result.output_dir}\n\n`;
output += `π **Riepilogo:**\n`;
output += `- β
Successo: ${result.successful}/${result.total_tables}\n`;
if (result.errors > 0) output += `- β Errori: ${result.errors}\n`;
if (result.skipped > 0) output += `- β οΈ Saltate: ${result.skipped}\n`;
if (success.length > 0) {
output += `\nπ **File Creati:**\n`;
for (const t of success) {
output += `\n**${t.table}** (${t.type})\n`;
output += ` - Righe: ${t.rows.toLocaleString()}\n`;
output += ` - Colonne: ${t.cols}\n`;
output += ` - Dimensione: ${t.size_mb} MB\n`;
output += ` - File: \`${path.basename(t.file)}\`\n`;
}
}
if (errors.length > 0) {
output += `\nβ **Errori:**\n`;
for (const t of errors) {
output += ` - ${t.table}: ${t.reason}\n`;
}
}
if (skipped.length > 0) {
output += `\nβ οΈ **Tabelle Saltate:**\n`;
for (const t of skipped) {
output += ` - ${t.table}: ${t.reason}\n`;
}
}
return {
content: [
{
type: "text",
text: output
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β Errore: ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Export Graphic Layout to PNG/SVG - Load .gpa and export as image
server.tool(
"project_export_graphic_layout",
"πΊοΈ Load a Graphic Parameter file (.gpa) and export the network view as PNG/SVG. WORKFLOW: 1) List available .gpa files, 2) User selects layout, 3) Export as PNG (raster) or SVG (vector). Supports paper formats (A5, A4, A3) in landscape or portrait orientation. SVG is vector format (scalable, editable) but requires Visum GUI visible.",
{
projectId: z.string().describe("Project identifier returned by project_open"),
gpaFile: z.string().describe("Filename of .gpa file (e.g., 'Flussogramma_tpb.gpa') or full path"),
outputFile: z.string().optional().describe("Output filename (default: {gpaName}_export.png or .svg)"),
format: z.enum(['png', 'jpg', 'svg']).optional().describe("Export format: png (default, raster), jpg (raster, smaller), svg (vector, scalable, requires GUI). SVG is resolution-independent and editable in Illustrator/Inkscape."),
paperFormat: z.enum(['A5', 'A5_portrait', 'A4', 'A4_portrait', 'A3', 'A3_portrait', 'custom']).optional().describe("Paper format for export. A4=210Γ297mm landscape (1240Γ1754px@150dpi), A4_portrait=297Γ210mm. Overrides width parameter. Only for raster formats (png/jpg). Default: custom (use width)"),
width: z.number().optional().describe("Image width in pixels (default: 1920). For raster formats only. Ignored if paperFormat specified (except 'custom')"),
dpi: z.number().optional().describe("Resolution in DPI (default: 150). For raster formats only. Higher DPI = larger file. 96=screen, 150=print, 300=high quality"),
quality: z.number().optional().describe("JPEG quality 0-100 (default: 95, only for .jpg)"),
svgNonScalingStroke: z.boolean().optional().describe("For SVG only: keep line widths constant when scaling (default: true)")
},
async ({ projectId, gpaFile, outputFile, format, paperFormat, width, dpi, quality, svgNonScalingStroke }) => {
try {
const exportFormat = format || 'png';
const imageWidth = width || 1920;
const imageDpi = dpi || 150;
const imageQuality = quality || 95;
const usePaperFormat = paperFormat || 'custom';
const svgNonScaling = svgNonScalingStroke !== undefined ? svgNonScalingStroke : true;
const pythonCode = `
import os
result = {}
# Paper formats (width, height in mm)
PAPER_FORMATS = {
'A5': (148, 210),
'A5_portrait': (210, 148),
'A4': (210, 297),
'A4_portrait': (297, 210),
'A3': (297, 420),
'A3_portrait': (420, 297)
}
def calculate_pixels_from_paper(paper_format, dpi):
"""Calculate image dimensions from paper format."""
if paper_format not in PAPER_FORMATS:
return None, None
width_mm, height_mm = PAPER_FORMATS[paper_format]
width_inches = width_mm / 25.4
height_inches = height_mm / 25.4
width_px = int(width_inches * dpi)
height_px = int(height_inches * dpi)
return width_px, height_px
try:
# Get project info
project_path = visum.GetPath(1)
project_dir = os.path.dirname(project_path)
project_name = os.path.splitext(os.path.basename(project_path))[0]
# Resolve GPA file path
gpa_input = r'${gpaFile.replace(/\\/g, '\\\\')}'
if not os.path.isabs(gpa_input):
gpa_path = os.path.join(project_dir, gpa_input)
else:
gpa_path = gpa_input
if not os.path.exists(gpa_path):
result['error'] = f'GPA file not found: {gpa_path}'
result['status'] = 'FILE_NOT_FOUND'
else:
# Load GPA file
visum.Net.GraphicParameters.Open(gpa_path)
result['gpa_loaded'] = os.path.basename(gpa_path)
# Get network bounds from PrintArea
printArea = visum.Net.PrintParameters.PrintArea
left = printArea.AttValue('LEFTMARGIN')
bottom = printArea.AttValue('BOTTOMMARGIN')
right = printArea.AttValue('RIGHTMARGIN')
top = printArea.AttValue('TOPMARGIN')
result['bounds'] = {
'left': left,
'bottom': bottom,
'right': right,
'top': top
}
# Calculate aspect ratio
width_net = right - left
height_net = top - bottom
aspect_ratio = height_net / width_net if width_net > 0 else 1.0
result['aspect_ratio'] = round(aspect_ratio, 3)
# Determine export format
export_format = '${exportFormat}'.lower()
result['format'] = export_format
# Determine output filename
${outputFile ? `output_file = r'${outputFile.replace(/\\/g, '\\\\')}'` : `
gpa_basename = os.path.splitext(os.path.basename(gpa_path))[0]
default_ext = 'svg' if export_format == 'svg' else 'png'
output_file = os.path.join(project_dir, f'{gpa_basename}_export.{default_ext}')`}
if not os.path.isabs(output_file):
output_file = os.path.join(project_dir, output_file)
if export_format == 'svg':
# SVG EXPORT (vector format)
result['format_type'] = 'vector'
# Set the view window to PrintArea bounds
visum.Graphic.SetWindow(left, bottom, right, top)
# Export SVG
visum.Graphic.WriteSVG(
output_file,
UseNonScalingStroke=${svgNonScaling},
CopyPictures=False
)
# Verify export
if os.path.exists(output_file):
result['success'] = True
result['output_file'] = output_file
result['size_kb'] = round(os.path.getsize(output_file) / 1024, 2)
result['size_mb'] = round(os.path.getsize(output_file) / (1024 * 1024), 2)
result['scalable'] = True
else:
result['error'] = 'SVG file was not created'
result['status'] = 'EXPORT_FAILED'
else:
# RASTER EXPORT (PNG/JPG)
result['format_type'] = 'raster'
# Determine image dimensions
paper_format = '${usePaperFormat}'
if paper_format != 'custom':
# Use paper format
width_px, height_px = calculate_pixels_from_paper(paper_format, ${imageDpi})
if width_px is None:
result['error'] = f'Invalid paper format: {paper_format}'
result['status'] = 'INVALID_PAPER_FORMAT'
else:
result['paper_format'] = paper_format
paper_width_mm, paper_height_mm = PAPER_FORMATS[paper_format]
result['paper_size_mm'] = f'{paper_width_mm}Γ{paper_height_mm}mm'
else:
# Use custom width, calculate height from aspect ratio
width_px = ${imageWidth}
height_px = int(width_px * aspect_ratio)
result['paper_format'] = 'custom'
if result.get('status') != 'INVALID_PAPER_FORMAT':
result['width_px'] = width_px
result['height_px'] = height_px
# Export network image
visum.Graphic.ExportNetworkImageFile(
output_file,
left,
bottom,
right,
top,
width_px,
${imageDpi},
${imageQuality}
)
# Verify export
if os.path.exists(output_file):
result['success'] = True
result['output_file'] = output_file
result['size_kb'] = round(os.path.getsize(output_file) / 1024, 2)
result['size_mb'] = round(os.path.getsize(output_file) / (1024 * 1024), 2)
result['dpi'] = ${imageDpi}
else:
result['error'] = 'Image file was not created'
result['status'] = 'EXPORT_FAILED'
result['status'] = result.get('status', 'SUCCESS')
except Exception as e:
result['error'] = str(e)
result['status'] = 'EXCEPTION'
result
`;
const execResponse: any = await serverManager.executeCommand(projectId, pythonCode, `Export graphic layout: ${gpaFile}`);
if (!execResponse || !execResponse.success) {
return {
content: [
{
type: "text",
text: `β Impossibile esportare layout grafico: ${execResponse?.error || 'Errore sconosciuto'}`
}
]
};
}
const result = execResponse.result || {};
if (result.status === 'FILE_NOT_FOUND') {
return {
content: [
{
type: "text",
text: `β **File GPA non trovato**\n\n${result.error}\n\nπ‘ **Suggerimento:** Verifica il nome del file o usa il path completo.`
}
]
};
} else if (result.status === 'INVALID_PAPER_FORMAT') {
return {
content: [
{
type: "text",
text: `β **Formato carta non valido**\n\n${result.error}\n\nπ **Formati disponibili:** A5, A5_portrait, A4, A4_portrait, A3, A3_portrait, custom`
}
]
};
} else if (result.status === 'EXCEPTION' || result.status === 'EXPORT_FAILED') {
return {
content: [
{
type: "text",
text: `β **Errore durante l'export**\n\n${result.error}\n\nStatus: ${result.status}`
}
]
};
}
// Success response
let output = `β
**Layout Grafico Esportato**\n\n`;
output += `πΊοΈ **GPA caricato:** ${result.gpa_loaded}\n`;
output += `π **Coordinate rete:**\n`;
output += ` - Left: ${result.bounds?.left?.toFixed(6)}\n`;
output += ` - Bottom: ${result.bounds?.bottom?.toFixed(6)}\n`;
output += ` - Right: ${result.bounds?.right?.toFixed(6)}\n`;
output += ` - Top: ${result.bounds?.top?.toFixed(6)}\n\n`;
if (result.format === 'svg') {
// SVG format
output += `π¨ **Immagine vettoriale generata:**\n`;
output += ` - File: \`${path.basename(result.output_file)}\`\n`;
output += ` - Percorso: ${result.output_file}\n`;
output += ` - Formato: **SVG** (vettoriale, scalabile)\n`;
output += ` - Aspect ratio: ${result.aspect_ratio}\n`;
output += ` - Dimensione file: ${result.size_kb} KB (${result.size_mb} MB)\n`;
output += `\n⨠**Vantaggi SVG:**\n`;
output += ` - Scalabile senza perdita di qualitΓ \n`;
output += ` - Modificabile in Illustrator/Inkscape\n`;
output += ` - Convertibile in PDF con strumenti esterni\n`;
output += ` - File piΓΉ piccolo rispetto a PNG ad alta risoluzione`;
} else {
// Raster format (PNG/JPG)
output += `πΌοΈ **Immagine raster generata:**\n`;
output += ` - File: \`${path.basename(result.output_file)}\`\n`;
output += ` - Percorso: ${result.output_file}\n`;
output += ` - Formato: **${result.format?.toUpperCase()}** (raster)\n`;
if (result.paper_format !== 'custom') {
output += ` - π Formato carta: **${result.paper_format}** (${result.paper_size_mm})\n`;
}
output += ` - Dimensioni: ${result.width_px} Γ ${result.height_px} px\n`;
output += ` - Aspect ratio: ${result.aspect_ratio}\n`;
output += ` - Risoluzione: ${result.dpi} DPI\n`;
output += ` - Dimensione file: ${result.size_kb} KB (${result.size_mb} MB)\n`;
if (result.paper_format !== 'custom') {
output += `\nπ‘ L'immagine Γ¨ ottimizzata per stampa su formato **${result.paper_format}** a ${result.dpi} DPI`;
}
}
return {
content: [
{
type: "text",
text: output
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β Errore: ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// TABLE EXPORT TOOLS
// =============================================================================
// Export All Tables Tool - Export all Visum tables to CSV files
server.tool(
"project_export_all_tables",
"π Export all Visum tables (except Network Editor) to CSV files. Each table is saved as ProjectName_TableName.csv in the project directory.",
{
projectId: z.string().describe("Project identifier returned by project_open"),
maxRowsPerTable: z.number().optional().describe("Maximum rows to export per table (default: all rows)")
},
async ({ projectId, maxRowsPerTable }) => {
try {
const pythonCode = `
import os
import csv
import time
result = {
'exported_tables': [],
'failed_tables': [],
'total_tables': 0,
'total_rows': 0,
'total_files': 0
}
try:
# Get project info
project_path = visum.GetPath(1)
project_dir = os.path.dirname(project_path)
project_name = os.path.basename(project_path).replace('.ver', '')
result['project_name'] = project_name
result['output_dir'] = project_dir
# Define tables to export (main collections from visum.Net)
# Exclude Network Editor and very large/complex tables
tables_to_export = [
('Zones', ['No', 'Name', 'Code', 'XCoord', 'YCoord']),
('Nodes', ['No', 'Name', 'XCoord', 'YCoord', 'TypeNo']),
('Links', ['No', 'Name', 'FromNodeNo', 'ToNodeNo', 'Length', 'TypeNo']),
('Turns', ['No', 'FromLinkNo', 'ToLinkNo', 'ViaNodeNo']),
('MainZones', ['No', 'Name', 'Code']),
('Territories', ['No', 'Name', 'Code']),
('POICategories', ['No', 'Name', 'Code']),
('POIs', ['No', 'Name', 'Code', 'XCoord', 'YCoord']),
('StopAreas', ['No', 'Name', 'Code']),
('StopPoints', ['No', 'Name', 'Code', 'XCoord', 'YCoord']),
('TimeSeriesCont', ['No', 'Name']),
('VehicleJourneys', ['No', 'Name', 'LineNo']),
('Lines', ['No', 'Name', 'TSysCode']),
('LineRoutes', ['No', 'Name', 'LineNo']),
('TimeProfiles', ['No', 'Name', 'Code']),
('DemandSegments', ['Code', 'Name', 'Mode']),
('Modes', ['Code', 'Name', 'Type']),
('TSystems', ['Code', 'Name', 'Type'])
]
max_rows = ${maxRowsPerTable || 'None'}
# Export each table
for table_name, attributes in tables_to_export:
try:
# Get collection
if hasattr(visum.Net, table_name):
collection = getattr(visum.Net, table_name)
count = collection.Count
if count == 0:
result['failed_tables'].append({
'table': table_name,
'reason': 'Empty table (0 rows)'
})
continue
# CSV file path
csv_file = os.path.join(project_dir, f'{project_name}_{table_name}.csv')
# Write CSV
with open(csv_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f, delimiter=';')
# Header
writer.writerow(attributes)
# Data rows
rows_exported = 0
limit = min(count, max_rows) if max_rows else count
for i in range(limit):
try:
item = collection.ItemByKey(i + 1)
row = []
for attr in attributes:
try:
value = item.AttValue(attr)
row.append(value if value is not None else '')
except:
row.append('') # Attribute not available
writer.writerow(row)
rows_exported += 1
except:
# Item not found or error, skip
continue
# Record success
file_size = os.path.getsize(csv_file)
result['exported_tables'].append({
'table': table_name,
'rows': rows_exported,
'file': os.path.basename(csv_file),
'size_kb': round(file_size / 1024, 2)
})
result['total_rows'] += rows_exported
result['total_files'] += 1
except Exception as e:
result['failed_tables'].append({
'table': table_name,
'reason': str(e)
})
result['total_tables'] = len(tables_to_export)
result['status'] = 'SUCCESS'
except Exception as e:
result['status'] = 'FAILED'
result['error'] = str(e)
result
`;
const execResponse: any = await serverManager.executeCommand(projectId, pythonCode, "Export all tables to CSV");
if (!execResponse || !execResponse.success) {
return {
content: [
{
type: "text",
text: `β Impossibile esportare tabelle per progetto ${projectId}: ${execResponse?.error || 'Errore sconosciuto'}`
}
]
};
}
const result = execResponse.result || {};
if (result.status === 'SUCCESS') {
let output = `β
**Tabelle Esportate con Successo**\n\n`;
output += `π **Progetto:** ${result.project_name}\n`;
output += `π **Directory:** ${result.output_dir}\n\n`;
output += `π **Statistiche:**\n`;
output += `- Tabelle processate: ${result.total_tables}\n`;
output += `- File CSV creati: ${result.total_files}\n`;
output += `- Totale righe esportate: ${result.total_rows.toLocaleString()}\n\n`;
if (result.exported_tables && result.exported_tables.length > 0) {
output += `**Tabelle esportate:**\n\n`;
result.exported_tables.forEach((table: any) => {
output += `β
**${table.table}**\n`;
output += ` π File: ${table.file}\n`;
output += ` π Righe: ${table.rows.toLocaleString()}\n`;
output += ` πΎ Dimensione: ${table.size_kb} KB\n\n`;
});
}
if (result.failed_tables && result.failed_tables.length > 0) {
output += `\nβ οΈ **Tabelle non esportate:**\n\n`;
result.failed_tables.forEach((table: any) => {
output += `β ${table.table}: ${table.reason}\n`;
});
}
return {
content: [
{
type: "text",
text: output
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore durante l'export**\n\n${result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β Errore: ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// =============================================================================
// PROJECT-SPECIFIC INSTANCE MANAGEMENT TOOLS
// =============================================================================
// Start Project Instance Tool
server.tool(
"project_start_instance",
"Start dedicated persistent instance for specific Visum project",
{
projectId: z.string().describe("Project identifier (campoleone, testProject, etc.)")
},
async ({ projectId }) => {
try {
const result = await projectManager.startProjectInstance(projectId);
if (result.success) {
return {
content: [
{
type: "text",
text: `π **Istanza Progetto Avviata**\n\nβ
${result.message}\n\nπ **Network Stats:**\n- Nodi: ${result.stats?.nodes}\n- Link: ${result.stats?.links}\n- Zone: ${result.stats?.zones}\n\nπ L'istanza Γ¨ ora attiva e pronta per ricevere comandi.`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Avvio Istanza**\n\n${result.message}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
/**
* Sanitizza le stringhe per l'uso sicuro nel codice Python generato
*/
function sanitizeForPython(str: string): string {
return str
.replace(/\\/g, '\\\\') // Escape backslashes
.replace(/'/g, "\\'") // Escape single quotes
.replace(/"/g, '\\"') // Escape double quotes
.replace(/\n/g, '\\n') // Escape newlines
.replace(/\r/g, '\\r'); // Escape carriage returns
}
/**
* Genera codice Python automaticamente basato sulla richiesta di analisi
*/
function generateAnalysisCode(analysisRequest: string, returnFormat: string): string {
const request = analysisRequest.toLowerCase();
const sanitizedRequest = sanitizeForPython(analysisRequest);
// ============= FASE 2: ASSIGNMENT E PERCORSI =============
// Flexible PrT Assignment - Supports all Visum assignment methods with user-configured VDF
if (request.includes('prt assignment') || request.includes('car assignment') || request.includes('private transport assignment') ||
request.includes('equilibrium assignment') || request.includes('user equilibrium') ||
request.includes('bpr assignment') || request.includes('boyce assignment') ||
request.includes('sue assignment') || request.includes('stochastic user equilibrium') ||
request.includes('luce assignment') || request.includes('tapias assignment') ||
request.includes('incremental assignment') || request.includes('msa assignment') ||
request.includes('assignment') || request.includes('assegnazione equilibrio') ||
request.includes('assegnazione trasporto privato')) {
// Determine assignment method from request
let assignmentMethod = 'PrTAssignmentBPR'; // default
let methodDescription = 'BPR (Bureau of Public Roads)';
if (request.includes('boyce') || request.includes('metodo boyce')) {
assignmentMethod = 'PrTAssignmentBoyce';
methodDescription = 'Boyce Method';
} else if (request.includes('sue') || request.includes('stochastic')) {
assignmentMethod = 'PrTAssignmentSUE';
methodDescription = 'Stochastic User Equilibrium';
} else if (request.includes('luce')) {
assignmentMethod = 'PrTAssignmentLuce';
methodDescription = 'Luce Method';
} else if (request.includes('tapias')) {
assignmentMethod = 'PrTAssignmentTAPIAS';
methodDescription = 'TAPIAS Method';
} else if (request.includes('incremental')) {
assignmentMethod = 'PrTAssignmentIncremental';
methodDescription = 'Incremental Assignment';
} else if (request.includes('msa') || request.includes('successive averages')) {
assignmentMethod = 'PrTAssignmentMSA';
methodDescription = 'Method of Successive Averages';
}
return `
# Flexible PrT Assignment - ${methodDescription}
try:
import json
print(f"Starting PrT Assignment with ${methodDescription}...")
# === DEMAND SEGMENT CONFIGURATION ===
# Detect and configure demand segments for assignment
demand_segments = []
segment_config = {}
try:
# Get all available demand segments
all_segments = list(visum.Net.DemandSegments)
prt_segments = [seg for seg in all_segments if seg.GetAttValue('Code').startswith('P') or 'PrT' in seg.GetAttValue('Code')]
if not prt_segments:
# If no PrT segments found, use all segments
prt_segments = all_segments[:3] # Limit to first 3 to avoid overload
for seg in prt_segments[:5]: # Limit to 5 segments max
segment_info = {
'code': seg.GetAttValue('Code'),
'name': seg.GetAttValue('Name') if hasattr(seg, 'GetAttValue') else seg.GetAttValue('Code'),
'mode': seg.GetAttValue('Mode') if hasattr(seg, 'GetAttValue') else 'PrT'
}
demand_segments.append(segment_info)
segment_config = {
'segments_found': len(demand_segments),
'segments_selected': demand_segments,
'auto_detected': True
}
print(f"Found {len(demand_segments)} demand segments for assignment:")
for seg in demand_segments:
print(f" - {seg['code']}: {seg['name']} ({seg['mode']})")
except Exception as e:
print(f"Warning: Could not detect demand segments: {e}")
# Fallback: try to use default segment configuration
segment_config = {
'segments_found': 0,
'segments_selected': [],
'auto_detected': False,
'note': 'Using Visum default segment configuration'
}
# Read Volume Delay Function (VDF) configuration from user's General Procedure Settings
# This respects user's impedance function configuration instead of hardcoding BPR
vdf_config = {}
try:
# Get VDF settings from user's project
vdf_functions = visum.Net.Links.GetAttValues('VolCapFormula')
vdf_types = set(vdf_functions) if vdf_functions else {'BPR'}
vdf_config['functions_in_use'] = list(vdf_types)
# Check if custom VDF parameters are defined
try:
vdf_params = visum.Procedures.Functions.${assignmentMethod}.GetAttValue('UseUserDefinedVDF')
vdf_config['user_defined'] = vdf_params
except:
vdf_config['user_defined'] = False
except Exception as e:
print(f"Warning: Could not read VDF configuration: {e}")
vdf_config = {'functions_in_use': ['Default'], 'user_defined': False}
# === ASSIGNMENT CONFIGURATION ===
# Configure assignment method with flexible parameters
assignment_function = visum.Procedures.Functions.${assignmentMethod}
# Set demand segments if detected
if demand_segments and len(demand_segments) > 0:
try:
# Set the demand segments for assignment (Visum-specific syntax may vary)
segment_codes = [seg['code'] for seg in demand_segments]
print(f"Configuring assignment for segments: {', '.join(segment_codes)}")
# Note: Actual segment configuration depends on Visum version and setup
# The assignment function will use the segments configured in the network
except Exception as e:
print(f"Warning: Could not configure demand segments: {e}")
# Set standard convergence parameters (user can modify these in Visum GUI)
try:
assignment_function.SetAttValue('MaxIter', 20)
assignment_function.SetAttValue('GapCriterion', 0.01)
# Method-specific parameters
if '${assignmentMethod}' == 'PrTAssignmentBPR':
assignment_function.SetAttValue('InnerIterations', 10)
elif '${assignmentMethod}' == 'PrTAssignmentSUE':
assignment_function.SetAttValue('Theta', 1.0) # Perception parameter
elif '${assignmentMethod}' == 'PrTAssignmentMSA':
assignment_function.SetAttValue('MSAParameters', 'Default')
except Exception as e:
print(f"Warning: Could not set all parameters: {e}")
# === PROCEDURE SEQUENCE INTEGRATION ===
# Instead of executing immediately, add to Procedure Sequence for user control
procedure_added = False
procedure_line = 0
try:
# Get current procedure sequence
procedure_sequence = visum.Procedures.ProcedureSequence
current_items = procedure_sequence.Count
# Add assignment procedure to sequence
procedure_sequence.AddProcedure(visum.Procedures.Functions.${assignmentMethod})
procedure_line = current_items + 1
procedure_added = True
print(f"β
Assignment procedure added to Procedure Sequence at line {procedure_line}")
print(f"π Procedure: {assignmentMethod} - ${methodDescription}")
print(f"β οΈ Please review the procedure settings in Visum GUI before execution")
print(f"π To execute: Go to Procedures > Procedure Sequence > Run from line {procedure_line}")
except Exception as e:
print(f"Warning: Could not add to Procedure Sequence: {e}")
print("Falling back to direct execution...")
# Fallback: Direct execution if Procedure Sequence fails
try:
assignment_function.Execute()
print("β
Assignment executed directly (Procedure Sequence not available)")
except Exception as exec_error:
print(f"β Direct execution also failed: {exec_error}")
raise exec_error
# === RESULTS COLLECTION ===
# Collect results regardless of execution method
if procedure_added:
# If added to sequence, provide preview without executing
print("π Collecting network preview (assignment not yet executed)...")
result = {
'assignment_type': '${assignmentMethod}',
'method_description': '${methodDescription}',
'status': 'prepared_in_sequence',
'demand_segments': segment_config,
'vdf_configuration': vdf_config,
'procedure_sequence': {
'added_to_sequence': True,
'procedure_line': procedure_line,
'total_procedures': procedure_line,
'execution_instructions': f'Go to Procedures > Procedure Sequence > Run from line {procedure_line}'
},
'user_instructions': {
'step_1': 'Review procedure settings in Visum GUI',
'step_2': 'Check demand segments and VDF configuration',
'step_3': f'Execute Procedure Sequence from line {procedure_line}',
'step_4': 'Run this analysis again after execution to see results'
},
'convergence_info': {
'method_configured': '${methodDescription}',
'uses_user_vdf': vdf_config.get('user_defined', False),
'vdf_functions': vdf_config.get('functions_in_use', ['Default']),
'segments_configured': len(demand_segments)
}
}
else:
# If executed directly, collect actual results
print("π Collecting assignment results...")
total_volume = sum(link.GetAttValue('VolPrT(AP)') for link in visum.Net.Links)
total_vmt = sum(link.GetAttValue('VolPrT(AP)') * link.GetAttValue('Length') for link in visum.Net.Links)
avg_speed = sum(link.GetAttValue('Length') / max(link.GetAttValue('tCur_PrTSys(c)'), 0.01) for link in visum.Net.Links) / visum.Net.Links.Count
# Advanced congestion analysis
congestion_levels = {'low': 0, 'medium': 0, 'high': 0, 'severe': 0}
vc_ratios = []
for link in visum.Net.Links:
volume = link.GetAttValue('VolPrT(AP)')
capacity = link.GetAttValue('VolCapPrT')
if capacity > 0:
vc_ratio = volume / capacity
vc_ratios.append(vc_ratio)
if vc_ratio < 0.5:
congestion_levels['low'] += 1
elif vc_ratio < 0.8:
congestion_levels['medium'] += 1
elif vc_ratio < 1.0:
congestion_levels['high'] += 1
else:
congestion_levels['severe'] += 1
# Calculate network-level performance indicators
avg_vc = sum(vc_ratios) / len(vc_ratios) if vc_ratios else 0
max_vc = max(vc_ratios) if vc_ratios else 0
result = {
'assignment_type': '${assignmentMethod}',
'method_description': '${methodDescription}',
'status': 'completed',
'demand_segments': segment_config,
'vdf_configuration': vdf_config,
'network_performance': {
'total_volume': round(total_volume, 0),
'total_vmt': round(total_vmt, 2),
'average_speed': round(avg_speed, 2),
'average_vc_ratio': round(avg_vc, 3),
'max_vc_ratio': round(max_vc, 3)
},
'congestion_analysis': congestion_levels,
'convergence_info': {
'method_used': '${methodDescription}',
'uses_user_vdf': vdf_config.get('user_defined', False),
'vdf_functions': vdf_config.get('functions_in_use', ['Default']),
'segments_processed': len(demand_segments)
}
}
except Exception as e:
result = {
'error': str(e),
'assignment_type': '${assignmentMethod}',
'method_description': '${methodDescription}',
'status': 'failed'
}
`;
}
// PuT Assignment
if (request.includes('put assignment') || request.includes('transit assignment') || request.includes('public transport assignment')) {
return `
# PuT Assignment - Public Transport
try:
print("Starting PuT Assignment...")
# Configure PuT assignment parameters
visum.Procedures.Functions.PuTAssignment.SetAttValue('MaxIter', 10)
visum.Procedures.Functions.PuTAssignment.SetAttValue('ShareOfSearch', 1.0)
visum.Procedures.Functions.PuTAssignment.SetAttValue('ConnectionScanAlgorithm', True)
# Execute assignment
visum.Procedures.Functions.PuTAssignment.Execute()
# Collect results
total_passengers = sum(line.GetAttValue('PassTransfer(AP)') for line in visum.Net.Lines if line.GetAttValue('PassTransfer(AP)') is not None)
total_boardings = sum(stop.GetAttValue('PassBoard(AP)') for stop in visum.Net.StopPoints if stop.GetAttValue('PassBoard(AP)') is not None)
result = {
'assignment_type': 'PuT',
'status': 'completed',
'transit_performance': {
'total_passengers': round(total_passengers, 0),
'total_boardings': round(total_boardings, 0),
'lines_count': visum.Net.Lines.Count,
'stops_count': visum.Net.StopPoints.Count
}
}
except Exception as e:
result = {'error': str(e), 'assignment_type': 'PuT', 'status': 'failed'}
`;
}
// Shortest Path Analysis
if (request.includes('shortest path') || request.includes('path analysis') || request.includes('route analysis') || request.includes('percorso minimo')) {
return `
# Shortest Path Analysis
try:
print("Performing Shortest Path Analysis...")
# Get sample zone pairs for analysis
zones = list(visum.Net.Zones)
if len(zones) < 2:
raise Exception("Need at least 2 zones for path analysis")
sample_paths = []
max_samples = min(10, len(zones))
for i in range(max_samples):
for j in range(i+1, min(i+4, len(zones))): # Limited pairs to avoid timeout
try:
orig_zone = zones[i].GetAttValue('No')
dest_zone = zones[j].GetAttValue('No')
# Calculate shortest path
path_result = visum.Analysis.PrTShortestPath.CreatePrTShortestPath(orig_zone, dest_zone)
sample_paths.append({
'origin': orig_zone,
'destination': dest_zone,
'distance_km': round(path_result.GetAttValue('Distance'), 2),
'travel_time_min': round(path_result.GetAttValue('tCur_PrTSys(c)'), 2),
'generalized_cost': round(path_result.GetAttValue('ImpPrT(c)'), 2)
})
except:
continue
result = {
'analysis_type': 'shortest_path',
'status': 'completed',
'sample_paths': sample_paths,
'total_zones': len(zones),
'paths_analyzed': len(sample_paths)
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'shortest_path', 'status': 'failed'}
`;
}
// Skim Matrix Creation
if (request.includes('skim matrix') || request.includes('travel time matrix') || request.includes('distance matrix') || request.includes('cost matrix')) {
return `
# Skim Matrix Creation
try:
print("Creating Skim Matrices...")
# Travel Time Skim Matrix
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixNumber', 901)
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('Operation', 'Set')
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixName', 'TravelTime_Skim')
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('ImpedanceAttribute', 'tCur_PrTSys(c)')
visum.Procedures.Functions.PrTCreateSkimMatrix.Execute()
# Distance Skim Matrix
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixNumber', 902)
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixName', 'Distance_Skim')
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('ImpedanceAttribute', 'Distance')
visum.Procedures.Functions.PrTCreateSkimMatrix.Execute()
# Generalized Cost Skim Matrix
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixNumber', 903)
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('MatrixName', 'GenCost_Skim')
visum.Procedures.Functions.PrTCreateSkimMatrix.SetAttValue('ImpedanceAttribute', 'ImpPrT(c)')
visum.Procedures.Functions.PrTCreateSkimMatrix.Execute()
# Get matrix statistics
tt_matrix = visum.Net.Matrices.ItemByKey(901)
dist_matrix = visum.Net.Matrices.ItemByKey(902)
cost_matrix = visum.Net.Matrices.ItemByKey(903)
result = {
'analysis_type': 'skim_matrices',
'status': 'completed',
'matrices_created': {
'travel_time': {
'matrix_number': 901,
'average_value': round(tt_matrix.GetAttValue('AvgValue'), 2),
'max_value': round(tt_matrix.GetAttValue('MaxValue'), 2)
},
'distance': {
'matrix_number': 902,
'average_value': round(dist_matrix.GetAttValue('AvgValue'), 2),
'max_value': round(dist_matrix.GetAttValue('MaxValue'), 2)
},
'generalized_cost': {
'matrix_number': 903,
'average_value': round(cost_matrix.GetAttValue('AvgValue'), 2),
'max_value': round(cost_matrix.GetAttValue('MaxValue'), 2)
}
}
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'skim_matrices', 'status': 'failed'}
`;
}
// Demand Segments Analysis
if (request.includes('demand segment') || request.includes('segmenti domanda') || request.includes('segments analysis') ||
request.includes('demand configuration') || request.includes('matrix segments') || request.includes('segmenti matrice')) {
return `
# Demand Segments Analysis
try:
print("Analyzing Demand Segments configuration...")
# Get all demand segments
all_segments = []
segment_details = {}
try:
segments_list = list(visum.Net.DemandSegments)
for seg in segments_list:
segment_info = {
'code': seg.GetAttValue('Code'),
'name': seg.GetAttValue('Name') if hasattr(seg, 'GetAttValue') else 'N/A',
'mode': seg.GetAttValue('Mode') if hasattr(seg, 'GetAttValue') else 'Unknown',
'demand_matrices': []
}
# Get associated matrices for this segment
try:
matrices = seg.GetAttValue('DemandMatrixNumbers') if hasattr(seg, 'GetAttValue') else []
if matrices:
segment_info['demand_matrices'] = matrices
except:
segment_info['demand_matrices'] = ['Matrix info not available']
all_segments.append(segment_info)
# Categorize segments by mode
prt_segments = [seg for seg in all_segments if 'P' in seg.get('mode', '') or 'PrT' in seg.get('code', '')]
put_segments = [seg for seg in all_segments if 'Pu' in seg.get('mode', '') or 'PuT' in seg.get('code', '')]
other_segments = [seg for seg in all_segments if seg not in prt_segments and seg not in put_segments]
segment_details = {
'total_segments': len(all_segments),
'prt_segments': prt_segments,
'put_segments': put_segments,
'other_segments': other_segments,
'segments_by_mode': {
'private_transport': len(prt_segments),
'public_transport': len(put_segments),
'other': len(other_segments)
}
}
except Exception as e:
print(f"Warning: Could not analyze demand segments: {e}")
segment_details = {
'error': str(e),
'note': 'Could not access demand segments - check project configuration'
}
# Check matrix configuration
matrix_info = {}
try:
all_matrices = list(visum.Net.Matrices)
matrix_count = len(all_matrices)
# Sample first few matrices
sample_matrices = []
for i, matrix in enumerate(all_matrices[:10]): # First 10 matrices
matrix_data = {
'number': matrix.GetAttValue('No'),
'name': matrix.GetAttValue('Name') if hasattr(matrix, 'GetAttValue') else f'Matrix_{i}',
'type': matrix.GetAttValue('MatrixType') if hasattr(matrix, 'GetAttValue') else 'Unknown'
}
sample_matrices.append(matrix_data)
matrix_info = {
'total_matrices': matrix_count,
'sample_matrices': sample_matrices
}
except Exception as e:
matrix_info = {'error': f'Could not analyze matrices: {e}'}
result = {
'analysis_type': 'demand_segments',
'status': 'completed',
'segment_configuration': segment_details,
'matrix_information': matrix_info,
'recommendations': {
'assignment_ready': len(segment_details.get('prt_segments', [])) > 0,
'suggested_segments': [seg['code'] for seg in segment_details.get('prt_segments', [])[:3]],
'configuration_notes': [
'PrT segments found' if len(segment_details.get('prt_segments', [])) > 0 else 'No PrT segments detected',
'Multiple segments available' if segment_details.get('total_segments', 0) > 1 else 'Limited segments found',
'Matrix configuration seems valid' if not matrix_info.get('error') else 'Matrix configuration may need review'
]
}
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'demand_segments', 'status': 'failed'}
`;
}
// Volume Delay Function Analysis
if (request.includes('vdf analysis') || request.includes('volume delay function') || request.includes('impedance function') ||
request.includes('funzione impedenza') || request.includes('funzione ritardo') || request.includes('congestion function')) {
return `
# Volume Delay Function (VDF) Analysis
try:
print("Analyzing Volume Delay Functions and impedance configuration...")
# Get VDF configuration from links
vdf_analysis = {}
# Analyze VDF formulas used in the network
link_vdf_formulas = visum.Net.Links.GetMultiAttValues(['No', 'VolCapFormula', 'FreeFlowSpeed', 'VolCapPrT'])
vdf_types = {}
sample_links = []
for i in range(min(50, len(link_vdf_formulas[0]))):
link_no = link_vdf_formulas[0][i]
formula = link_vdf_formulas[1][i]
free_speed = link_vdf_formulas[2][i]
capacity = link_vdf_formulas[3][i]
# Count VDF types
vdf_types[formula] = vdf_types.get(formula, 0) + 1
# Sample links with different VDF types
if len(sample_links) < 20:
sample_links.append({
'link_no': link_no,
'vdf_formula': formula,
'free_flow_speed': free_speed,
'capacity': capacity
})
# Get VDF parameters if available
vdf_parameters = {}
try:
# Try to access VDF parameter settings from General Procedure Settings
# These may be stored in different places depending on Visum version
vdf_parameters['bpr_alpha'] = visum.Net.NetPara.GetAttValue('BPRAlpha') if hasattr(visum.Net.NetPara, 'GetAttValue') else 'N/A'
vdf_parameters['bpr_beta'] = visum.Net.NetPara.GetAttValue('BPRBeta') if hasattr(visum.Net.NetPara, 'GetAttValue') else 'N/A'
except:
vdf_parameters = {'note': 'VDF parameters configured in General Procedure Settings'}
result = {
'analysis_type': 'vdf_analysis',
'status': 'completed',
'vdf_distribution': vdf_types,
'total_links_analyzed': len(link_vdf_formulas[0]),
'vdf_parameters': vdf_parameters,
'sample_links': sample_links,
'supported_vdf_types': [
'BPR (Bureau of Public Roads)',
'Davidson Function',
'Akcelik Function',
'Custom VDF',
'Conical Function',
'Polynomial Function'
]
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'vdf_analysis', 'status': 'failed'}
`;
}
// Procedure Sequence Management
if (request.includes('procedure sequence') || request.includes('sequence management') || request.includes('procedura sequenza') ||
request.includes('check sequence') || request.includes('execute sequence') || request.includes('run sequence')) {
return `
# Procedure Sequence Management
try:
print("Managing Procedure Sequence...")
# Get current procedure sequence
procedure_sequence = visum.Procedures.ProcedureSequence
sequence_count = procedure_sequence.Count
# List all procedures in sequence
sequence_procedures = []
for i in range(sequence_count):
try:
proc_item = procedure_sequence.Item(i)
proc_info = {
'line_number': i + 1,
'procedure_name': proc_item.GetAttValue('ProcedureName') if hasattr(proc_item, 'GetAttValue') else f'Procedure_{i+1}',
'procedure_type': str(type(proc_item).__name__),
'is_enabled': proc_item.GetAttValue('Enabled') if hasattr(proc_item, 'GetAttValue') else True
}
sequence_procedures.append(proc_info)
except Exception as e:
sequence_procedures.append({
'line_number': i + 1,
'procedure_name': f'Unknown_Procedure_{i+1}',
'error': str(e)
})
# Check for assignment procedures
assignment_procedures = [proc for proc in sequence_procedures if 'Assignment' in proc.get('procedure_name', '')]
# Execution instructions
execution_instructions = {
'manual_execution': 'Go to Procedures > Procedure Sequence in Visum GUI',
'from_line_execution': f'Use "Execute from line X" to run specific procedures',
'full_sequence': 'Use "Execute All" to run entire sequence',
'selective_execution': 'Enable/disable procedures as needed before execution'
}
result = {
'analysis_type': 'procedure_sequence',
'status': 'completed',
'sequence_info': {
'total_procedures': sequence_count,
'procedures_list': sequence_procedures,
'assignment_procedures_found': len(assignment_procedures),
'assignment_procedures': assignment_procedures
},
'execution_options': execution_instructions,
'user_actions': {
'step_1': 'Review procedures in Visum GUI: Procedures > Procedure Sequence',
'step_2': 'Enable/disable procedures as needed',
'step_3': 'Execute selected procedures using GUI controls',
'step_4': 'Monitor execution progress in Visum'
}
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'procedure_sequence', 'status': 'failed'}
`;
}
// Critical Link Analysis (CLA) with Flow Bundle
if (request.includes('critical link') || request.includes('cla analysis') || request.includes('flow bundle') || request.includes('network vulnerability') || request.includes('bottleneck analysis')) {
return `
# Critical Link Analysis with Flow Bundle
try:
print("Performing Critical Link Analysis...")
# Execute Flow Bundle analysis
visum.Procedures.Functions.FlowBundle.SetAttValue('FlowBundleType', 'Volume')
visum.Procedures.Functions.FlowBundle.SetAttValue('MinVolume', 100) # Minimum volume threshold
visum.Procedures.Functions.FlowBundle.Execute()
# Analyze link criticality
critical_links = []
total_volume = sum(link.GetAttValue('VolPrT(AP)') for link in visum.Net.Links)
for link in visum.Net.Links:
volume = link.GetAttValue('VolPrT(AP)')
capacity = link.GetAttValue('VolCapPrT')
length = link.GetAttValue('Length')
if volume > 0 and capacity > 0:
vc_ratio = volume / capacity
volume_share = volume / total_volume if total_volume > 0 else 0
criticality_index = vc_ratio * volume_share * 100 # Combined criticality
if criticality_index > 0.1: # Threshold for critical links
critical_links.append({
'from_node': link.GetAttValue('FromNodeNo'),
'to_node': link.GetAttValue('ToNodeNo'),
'volume': round(volume, 0),
'capacity': round(capacity, 0),
'vc_ratio': round(vc_ratio, 3),
'length_km': round(length, 2),
'volume_share_pct': round(volume_share * 100, 2),
'criticality_index': round(criticality_index, 3)
})
# Sort by criticality
critical_links.sort(key=lambda x: x['criticality_index'], reverse=True)
# Network vulnerability metrics
high_vc_links = sum(1 for link in visum.Net.Links if link.GetAttValue('VolCapPrT') > 0 and link.GetAttValue('VolPrT(AP)') / link.GetAttValue('VolCapPrT') > 0.8)
overloaded_links = sum(1 for link in visum.Net.Links if link.GetAttValue('VolCapPrT') > 0 and link.GetAttValue('VolPrT(AP)') / link.GetAttValue('VolCapPrT') > 1.0)
result = {
'analysis_type': 'critical_link_analysis',
'status': 'completed',
'network_vulnerability': {
'total_links': visum.Net.Links.Count,
'critical_links_count': len(critical_links),
'high_vc_links': high_vc_links,
'overloaded_links': overloaded_links,
'vulnerability_ratio': round(len(critical_links) / visum.Net.Links.Count, 3)
},
'top_critical_links': critical_links[:10], # Top 10 most critical
'flow_concentration': {
'total_network_volume': round(total_volume, 0),
'top_10_links_volume': round(sum(link['volume'] for link in critical_links[:10]), 0)
}
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'critical_link_analysis', 'status': 'failed'}
`;
}
// ============= FASE 1: STATISTICHE BASE (EXISTING) =============
// Network Statistics
if (request.includes('statistic') || request.includes('network') || request.includes('count') || request.includes('summary')) {
return `
# Network Statistics Analysis
try:
num_nodes = visum.Net.Nodes.Count
num_links = visum.Net.Links.Count
num_zones = visum.Net.Zones.Count
num_connectors = getattr(visum.Net, 'Connectors', None)
connector_count = num_connectors.Count if num_connectors else 0
result = {
'network_summary': {
'nodes': num_nodes,
'links': num_links,
'zones': num_zones,
'connectors': connector_count
},
'network_density': round(num_links / max((num_nodes * (num_nodes - 1) / 2), 1), 6),
'avg_degree': round((num_links * 2) / max(num_nodes, 1), 2)
}
except Exception as e:
result = {'error': str(e)}
`;
}
// Node Analysis
if (request.includes('node') || request.includes('nod')) {
const sampleSize = returnFormat === 'detailed' ? '100' : '10';
return `
# Node Distribution Analysis
try:
nodes_data = visum.Net.Nodes.GetMultiAttValues(['No', 'XCoord', 'YCoord'])
total_nodes = len(nodes_data[0])
sample_size = min(${sampleSize}, total_nodes)
# Sample nodes
sample_nodes = []
for i in range(sample_size):
sample_nodes.append({
'id': nodes_data[0][i],
'x': nodes_data[1][i],
'y': nodes_data[2][i]
})
# Bounding box
x_coords = [x for x in nodes_data[1] if x is not None]
y_coords = [y for y in nodes_data[2] if y is not None]
result = {
'total_nodes': total_nodes,
'sample_nodes': sample_nodes,
'bounding_box': {
'min_x': min(x_coords) if x_coords else None,
'max_x': max(x_coords) if x_coords else None,
'min_y': min(y_coords) if y_coords else None,
'max_y': max(y_coords) if y_coords else None
} if x_coords and y_coords else None
}
except Exception as e:
result = {'error': str(e)}
`;
}
// Link Analysis
if (request.includes('link') || request.includes('edge') || request.includes('connection')) {
const sampleSize = returnFormat === 'detailed' ? '50' : '10';
return `
# Link Analysis
try:
links_data = visum.Net.Links.GetMultiAttValues(['No', 'FromNodeNo', 'ToNodeNo', 'Length'])
total_links = len(links_data[0])
sample_size = min(${sampleSize}, total_links)
sample_links = []
lengths = []
for i in range(sample_size):
length = links_data[3][i]
sample_links.append({
'id': links_data[0][i],
'from_node': links_data[1][i],
'to_node': links_data[2][i],
'length': length
})
if length is not None:
lengths.append(length)
result = {
'total_links': total_links,
'sample_links': sample_links,
'length_stats': {
'avg_length': round(sum(lengths) / len(lengths), 2) if lengths else None,
'min_length': min(lengths) if lengths else None,
'max_length': max(lengths) if lengths else None,
'total_length': round(sum(lengths), 2) if lengths else None
} if lengths else None
}
except Exception as e:
result = {'error': str(e)}
`;
}
// Zone Analysis
if (request.includes('zone') || request.includes('zon')) {
return `
# Zone Analysis
try:
zones_data = visum.Net.Zones.GetMultiAttValues(['No', 'XCoord', 'YCoord'])
total_zones = len(zones_data[0])
sample_size = min(20, total_zones)
sample_zones = []
for i in range(sample_size):
sample_zones.append({
'id': zones_data[0][i],
'x': zones_data[1][i],
'y': zones_data[2][i]
})
result = {
'total_zones': total_zones,
'sample_zones': sample_zones
}
except Exception as e:
result = {'error': str(e)}
`;
}
// Default: comprehensive analysis
return `
# Comprehensive Network Analysis
try:
# Basic counts
num_nodes = visum.Net.Nodes.Count
num_links = visum.Net.Links.Count
num_zones = visum.Net.Zones.Count
# Sample data
if num_nodes > 0:
nodes_sample = visum.Net.Nodes.GetMultiAttValues(['No', 'XCoord', 'YCoord'])
sample_nodes = [{
'id': nodes_sample[0][i],
'x': nodes_sample[1][i],
'y': nodes_sample[2][i]
} for i in range(min(5, len(nodes_sample[0])))]
else:
sample_nodes = [] {"jsonrpc": "2.0", "id": 3, "method": "tools/call", "params": {"name": "instance_cleanup", "arguments": {"force": true}}}
result = {
'analysis_type': 'comprehensive',
'request': '${sanitizedRequest}',
'network_summary': {
'nodes': num_nodes,
'links': num_links,
'zones': num_zones
},
'sample_data': {
'nodes': sample_nodes
}
}
except Exception as e:
result = {'error': str(e), 'analysis_type': 'failed', 'request': '${sanitizedRequest}'}
`;
}
// Execute Project Analysis Tool
server.tool(
"project_execute_analysis",
"Execute intelligent analysis on specific project instance with ultra-fast performance. Automatically generates Python code based on analysis request.",
{
projectId: z.string().describe("Project identifier to execute analysis on"),
analysisRequest: z.string().describe("Natural language description of the analysis you want to perform (e.g., 'get network statistics', 'analyze node distribution', 'check link lengths')"),
returnFormat: z.enum(["summary", "detailed", "raw"]).optional().default("summary").describe("Format of results: summary (key metrics), detailed (comprehensive), raw (full data)")
},
async ({ projectId, analysisRequest, returnFormat = "summary" }) => {
try {
// Generate appropriate Python code based on the analysis request
const analysisCode = generateAnalysisCode(analysisRequest, returnFormat);
const result = await projectManager.executeProjectAnalysis(projectId, analysisCode, analysisRequest);
if (result.success) {
return {
content: [
{
type: "text",
text: `π **Analisi Completata** (${result.projectInfo?.projectName})\n\nπ **Richiesta:** ${analysisRequest}\n\nβ‘ **Tempo esecuzione:** ${result.executionTimeMs}ms\n\nπ **Risultati:**\n\`\`\`json\n${JSON.stringify(result.result, null, 2)}\n\`\`\``
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Analisi**\n\n**Richiesta:** ${analysisRequest}\n**Errore:** ${result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Get Instances Status Tool
server.tool(
"project_instances_status",
"Get status of all active project instances",
{},
async () => {
try {
const status = projectManager.getInstancesStatus();
const instanceCount = Object.keys(status).length;
if (instanceCount === 0) {
return {
content: [
{
type: "text",
text: `π **Status Istanze Progetto**\n\nβ Nessuna istanza attiva.`
}
]
};
}
let statusText = `π **Status Istanze Progetto** (${instanceCount} attive)\n\n`;
for (const [projectId, info] of Object.entries(status)) {
const uptime = Math.floor((info.uptime || 0) / 1000);
const lastUsed = info.lastUsed ? Math.floor((Date.now() - info.lastUsed) / 1000) : 'Mai';
statusText += `π§ **${info.name}**\n`;
statusText += ` β’ ID: ${projectId}\n`;
statusText += ` β’ Status: ${info.isActive ? 'β
Attiva' : 'β Inattiva'}\n`;
statusText += ` β’ Uptime: ${uptime}s\n`;
statusText += ` β’ Ultimo uso: ${lastUsed}s fa\n`;
statusText += ` β’ Network: ${info.stats?.nodes} nodi, ${info.stats?.links} link\n\n`;
}
return {
content: [
{
type: "text",
text: statusText
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Project Health Check Tool
server.tool(
"project_health_check",
"Check health of specific project instance",
{
projectId: z.string().describe("Project identifier to check health for")
},
async ({ projectId }) => {
try {
const result = await projectManager.checkProjectHealth(projectId);
if (result.success) {
const health = result.health;
const uptime = Math.floor((health.uptime || 0) / 1000);
return {
content: [
{
type: "text",
text: `π **Health Check - ${health.projectName}**\n\nβ
**Status:** Salutare\nπ **Uptime:** ${uptime}s\nβ‘ **Performance:** ${health.response_time_ms}ms\nπ **Memory Usage:** ${health.memory_mb}MB\nπ **Progetto Caricato:** ${health.project_loaded ? 'β
' : 'β'}\nπ **Network:** ${health.network_ready ? 'β
' : 'β'}`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Health Check Fallito**\n\n${result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Shutdown Project Instance Tool
server.tool(
"project_shutdown_instance",
"Shutdown specific project instance",
{
projectId: z.string().describe("Project identifier to shutdown")
},
async ({ projectId }) => {
try {
const result = await projectManager.shutdownProjectInstance(projectId);
return {
content: [
{
type: "text",
text: result.success ?
`π **Istanza Terminata**\n\nβ
${result.message}` :
`β **Errore Terminazione**\n\n${result.message}`
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// PROJECT TCP SERVER MANAGEMENT TOOLS
// =============================================================================
// Instance Diagnosis and Repair Tool - ALWAYS RUN FIRST WHEN ERRORS OCCUR
server.tool(
"instance_diagnosis",
"π§ DIAGNOSTIC TOOL: Run this first when encountering errors. Diagnoses and repairs existing Visum instances instead of creating new ones.",
{},
async () => {
try {
console.error(`π§ INSTANCE_DIAGNOSIS CHIAMATO: ${new Date().toISOString()}`);
// Check all active instances and their health
const instancesStatus = projectManager.getInstancesStatus();
const activeProjects = serverManager.getActiveProjects();
const diagnosis = {
persistent_controller: { status: 'unknown', health: 'unknown' },
project_instances: Object.keys(instancesStatus).length,
tcp_servers: activeProjects.length,
issues_found: [] as string[],
repair_actions: [] as string[],
recommendations: [] as string[]
};
// Test persistent controller
try {
const healthResult = await visumController.checkInstanceHealth();
const statsResult = await visumController.getNetworkStats();
if (healthResult.success && statsResult.success) {
diagnosis.persistent_controller.status = 'healthy';
diagnosis.persistent_controller.health = 'good';
diagnosis.recommendations.push('β
Persistent controller is healthy - use visum_custom_analysis or visum_network_stats');
} else {
diagnosis.persistent_controller.status = 'unhealthy';
diagnosis.persistent_controller.health = 'poor';
diagnosis.issues_found.push('β Persistent controller not responding');
diagnosis.repair_actions.push('π§ Restart persistent controller recommended');
}
} catch (error) {
diagnosis.persistent_controller.status = 'error';
diagnosis.issues_found.push(`β Persistent controller error: ${error instanceof Error ? error.message : String(error)}`);
diagnosis.repair_actions.push('π§ Reinitialize persistent controller');
}
// Check project instances
for (const [projectId, info] of Object.entries(instancesStatus)) {
try {
const healthCheck = await projectManager.checkProjectHealth(projectId);
if (!healthCheck.success) {
diagnosis.issues_found.push(`β Project instance '${projectId}' unhealthy: ${healthCheck.error}`);
diagnosis.repair_actions.push(`π§ Consider shutting down and restarting instance '${projectId}'`);
}
} catch (error) {
diagnosis.issues_found.push(`β Cannot check instance '${projectId}': ${error instanceof Error ? error.message : String(error)}`);
diagnosis.repair_actions.push(`π§ Force shutdown instance '${projectId}' if necessary`);
}
}
// Check TCP servers
activeProjects.forEach(project => {
if (project.status !== 'active') {
diagnosis.issues_found.push(`β TCP server for '${project.projectName}' status: ${project.status}`);
diagnosis.repair_actions.push(`π§ Restart TCP server for project '${project.projectId}'`);
}
});
// Provide clear recommendations
if (diagnosis.issues_found.length === 0) {
diagnosis.recommendations.push('β
All systems healthy - proceed with normal operations');
diagnosis.recommendations.push('π‘ Use existing instances instead of creating new ones');
} else {
diagnosis.recommendations.push('β οΈ Issues found - repair before creating new instances');
diagnosis.recommendations.push('π οΈ Use repair actions listed above');
diagnosis.recommendations.push('π« AVOID creating new instances until issues are resolved');
}
return {
content: [
{
type: "text",
text: `π§ **Diagnosi Istanze Visum**\n\n` +
`**Controller Persistente:** ${diagnosis.persistent_controller.status}\n` +
`**Istanze Progetto:** ${diagnosis.project_instances}\n` +
`**Server TCP:** ${diagnosis.tcp_servers}\n\n` +
`**β Problemi Rilevati (${diagnosis.issues_found.length}):**\n` +
(diagnosis.issues_found.length > 0 ?
diagnosis.issues_found.map(issue => `β’ ${issue}`).join('\n') + '\n\n' :
'β’ Nessun problema rilevato\n\n') +
`**π§ Azioni Riparazione:**\n` +
(diagnosis.repair_actions.length > 0 ?
diagnosis.repair_actions.map(action => `β’ ${action}`).join('\n') + '\n\n' :
'β’ Nessuna riparazione necessaria\n\n') +
`**π‘ Raccomandazioni:**\n` +
diagnosis.recommendations.map(rec => `β’ ${rec}`).join('\n') + '\n\n' +
`**π¨ IMPORTANTE:** Prima di aprire nuove istanze, risolvi i problemi sopra elencati!`
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore Diagnosi:** ${error instanceof Error ? error.message : String(error)}\n\n` +
`**Suggerimento:** Prova a riavviare completamente il server MCP.`
}
]
};
}
}
);
// Instance Cleanup and Repair Tool
server.tool(
"instance_cleanup",
"π§Ή CLEANUP TOOL: Safely shutdown problematic instances and clean up resources. Use after instance_diagnosis identifies issues.",
{
force: z.boolean().optional().default(false).describe("Force cleanup even if instances appear healthy")
},
async ({ force = false }) => {
try {
console.error(`π§Ή INSTANCE_CLEANUP CHIAMATO: force=${force}, ${new Date().toISOString()}`);
const cleanupResults = {
instances_shutdown: 0,
tcp_servers_closed: 0,
errors_encountered: [] as string[],
actions_taken: [] as string[]
};
// Get current state
const instancesStatus = projectManager.getInstancesStatus();
const activeProjects = serverManager.getActiveProjects();
// Shutdown problematic project instances
for (const [projectId, info] of Object.entries(instancesStatus)) {
try {
const shouldCleanup = force || !info.isActive;
if (shouldCleanup) {
const shutdownResult = await projectManager.shutdownProjectInstance(projectId);
if (shutdownResult.success) {
cleanupResults.instances_shutdown++;
cleanupResults.actions_taken.push(`β
Shutdown project instance: ${projectId}`);
} else {
cleanupResults.errors_encountered.push(`β Failed to shutdown ${projectId}: ${shutdownResult.message}`);
}
}
} catch (error) {
cleanupResults.errors_encountered.push(`β Error shutting down ${projectId}: ${error instanceof Error ? error.message : String(error)}`);
}
}
// Close problematic TCP servers
for (const project of activeProjects) {
try {
const shouldClose = force || project.status !== 'active';
if (shouldClose) {
const closeResult = await serverManager.closeProject(project.projectId, false);
if (closeResult.success) {
cleanupResults.tcp_servers_closed++;
cleanupResults.actions_taken.push(`β
Closed TCP server: ${project.projectName}`);
} else {
cleanupResults.errors_encountered.push(`β Failed to close TCP server ${project.projectId}: ${closeResult.message}`);
}
}
} catch (error) {
cleanupResults.errors_encountered.push(`β Error closing TCP server ${project.projectId}: ${error instanceof Error ? error.message : String(error)}`);
}
}
// Reset persistent controller if forced or if there were issues
if (force) {
try {
// Note: We don't have a direct reset method, but we can check if it needs reinitialization
cleanupResults.actions_taken.push(`βΉοΈ Persistent controller status checked (reset not available)`);
} catch (error) {
cleanupResults.errors_encountered.push(`β Error checking persistent controller: ${error instanceof Error ? error.message : String(error)}`);
}
}
const success = cleanupResults.errors_encountered.length === 0;
const totalActions = cleanupResults.instances_shutdown + cleanupResults.tcp_servers_closed;
return {
content: [
{
type: "text",
text: `π§Ή **Pulizia Istanze Completata**\n\n` +
`**Risultati:**\n` +
`β’ Istanze chiuse: ${cleanupResults.instances_shutdown}\n` +
`β’ Server TCP chiusi: ${cleanupResults.tcp_servers_closed}\n` +
`β’ Errori riscontrati: ${cleanupResults.errors_encountered.length}\n\n` +
`**Azioni Eseguite:**\n` +
(cleanupResults.actions_taken.length > 0 ?
cleanupResults.actions_taken.map(action => `β’ ${action}`).join('\n') + '\n\n' :
'β’ Nessuna azione necessaria\n\n') +
(cleanupResults.errors_encountered.length > 0 ?
`**β Errori:**\n${cleanupResults.errors_encountered.map(err => `β’ ${err}`).join('\n')}\n\n` : '') +
`**Status:** ${success ? 'β
Pulizia completata con successo' : 'β οΈ Pulizia completata con alcuni errori'}\n\n` +
`**Prossimo passo:** ${totalActions > 0 ? 'Ora puoi procedere con operazioni normali' : 'Nessuna pulizia necessaria'}`
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore durante pulizia:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// VISUM PROCEDURES CREATION TOOLS
// =============================================================================
// Create Visum Procedure Tool
server.tool(
"visum_create_procedure",
"π― Create a Visum procedure (PrT Assignment, PuT Assignment, etc.) using the verified Visum API. This tool uses the correct visum.Procedures.Operations.AddOperation() method discovered on 2025-10-10.",
{
projectId: z.string().describe("Project ID of the active Visum project"),
procedureType: z.enum(["PrT_Assignment", "PuT_Assignment", "Demand_Model", "Matrix_Calculation"]).describe("Type of procedure to create"),
position: z.number().optional().describe("Position where to insert the procedure (1-20, default: 20 = append at end)"),
parameters: z.record(z.any()).optional().describe("Optional parameters to configure the procedure")
},
async ({ projectId, procedureType, position = 20, parameters = {} }) => {
try {
// Map procedure types to OPERATIONTYPE codes
const operationTypeCodes: Record<string, number> = {
"PrT_Assignment": 101, // OperationTypeAssignmentPrT
"PuT_Assignment": 100, // OperationTypeAssignmentPuT (was 102 - FIXED!)
"Demand_Model": 103, // OperationTypeCalculateSkimMatrixPrT
"Matrix_Calculation": 104
};
const operationCode = operationTypeCodes[procedureType];
// Check if we need to add Delete Assignment Results before PrT/PuT Assignment
const needsDeleteBefore = procedureType === "PrT_Assignment" || procedureType === "PuT_Assignment";
const deleteOperationType = 9; // OperationTypeInitAssignment - deletes previous assignment results
// Generate Python code to create the procedure
const pythonCode = `
try:
operations_container = visum.Procedures.Operations
# Step 0: Find or create "Visum-BOT" group
print("Searching for Visum-BOT group...")
visum_bot_group = None
# Search for existing Visum-BOT group by name
all_ops = list(operations_container.GetAll)
for op in all_ops:
try:
op_type = op.AttValue("OPERATIONTYPE")
if op_type == 75: # Group type
group_params = op.GroupParameters
group_name = group_params.AttValue("Name")
if group_name == "Visum-BOT":
visum_bot_group = op
group_pos = op.AttValue("NO")
print(f"Found existing Visum-BOT group at position {group_pos}")
break
except:
continue
# Create group if not found
if visum_bot_group is None:
print("Creating new Visum-BOT group...")
# Count top-level operations only
top_level_ops = operations_container.GetChildren()
top_level_count = len(list(top_level_ops)) if top_level_ops else 0
visum_bot_group = operations_container.AddOperation(top_level_count + 1)
visum_bot_group.SetAttValue("OPERATIONTYPE", 75) # Group type
# Set group name via GroupParameters
group_params = visum_bot_group.GroupParameters
group_params.SetAttValue("Name", "Visum-BOT")
group_pos = visum_bot_group.AttValue("NO")
print(f"Visum-BOT group created at position {group_pos}")
# Count existing operations in the Visum-BOT group
group_children = operations_container.GetChildren(visum_bot_group)
group_children_count = len(list(group_children)) if group_children else 0
print(f"Visum-BOT group currently has {group_children_count} operations")
${needsDeleteBefore ? `
# Step 1: Create "Initialize Assignment" operation at END of Visum-BOT group
print("Creating Initialize Assignment (DELETE) at end of Visum-BOT group...")
delete_rel_pos = group_children_count + 1
delete_op = operations_container.AddOperation(delete_rel_pos, visum_bot_group)
delete_op.SetAttValue("OPERATIONTYPE", ${deleteOperationType})
delete_position = delete_op.AttValue("NO")
print(f"Initialize Assignment created at position {delete_position} (relative pos {delete_rel_pos} in group)")
` : ''}
# Step 2: Create the assignment operation at END of Visum-BOT group
print(f"Creating ${procedureType} at end of Visum-BOT group...")
# Next position after delete (or after existing operations if no delete)
assignment_rel_pos = group_children_count + ${needsDeleteBefore ? '2' : '1'}
new_op = operations_container.AddOperation(assignment_rel_pos, visum_bot_group)
# Set operation type
new_op.SetAttValue("OPERATIONTYPE", ${operationCode})
# Get ACTUAL position from the operation itself (not from count!)
actual_position = new_op.AttValue("NO")
# VERIFY: Read back the operation type to confirm
verify_type = new_op.AttValue("OPERATIONTYPE")
print(f"${procedureType} operation created at position {actual_position}")
print(f"Verified type code: {verify_type} (expected: ${operationCode})")
# Configure parameters if provided
params_configured = []
${Object.entries(parameters).length > 0 ? `
try:
# Access specific parameters object based on type
${procedureType === 'PrT_Assignment' ? `
params = new_op.PrTAssignmentParameters
eq_params = new_op.PrTEquilibriumAssignmentParameters
# Configure equilibrium parameters
${parameters.numIterations ? `eq_params.SetAttValue("NUMITER", ${parameters.numIterations})
params_configured.append("NUMITER=${parameters.numIterations}")` : ''}
${parameters.precisionDemand ? `eq_params.SetAttValue("PRECISIONDEMAND", ${parameters.precisionDemand})
params_configured.append("PRECISIONDEMAND=${parameters.precisionDemand}")` : ''}
` : ''}
print(f" Parameters configured: {params_configured}")
except Exception as e:
print(f" Warning: Could not configure all parameters: {e}")
` : ''}
# Verify creation using actual position
created_op = visum.Procedures.Operations.ItemByKey(actual_position)
operation_type = created_op.AttValue("OPERATIONTYPE")
result = {
"status": "success",
"procedure_type": "${procedureType}",
"operation_code": ${operationCode},
"requested_position": ${position},
"actual_position": actual_position,
"group_position": group_pos,
"group_name": "Visum-BOT",
${needsDeleteBefore ? `"delete_position": delete_position,` : ''}
"parameters_configured": params_configured,
"verified": operation_type == ${operationCode},
${needsDeleteBefore ?
`"message": f"Visum-BOT group at position {group_pos}. Delete operation at {delete_position}, ${procedureType} at {actual_position} (both inside group)"` :
`"message": f"${procedureType} created at position {actual_position} inside Visum-BOT group (position {group_pos})"`
}
}
except Exception as e:
import traceback
result = {
"status": "error",
"error": str(e),
"traceback": traceback.format_exc(),
"procedure_type": "${procedureType}",
"attempted_position": ${position}
}
`;
// Execute via project_execute
const result = await serverManager.executeCommand(projectId, pythonCode, `Create ${procedureType} procedure`);
if (result.success && result.result?.status === 'success') {
const hasDelete = result.result.delete_position !== undefined;
return {
content: [
{
type: "text",
text: `β
**Procedura Visum Creata nel Gruppo "${result.result.group_name}"**\n\n` +
`π¦ **Gruppo:** ${result.result.group_name}\n` +
` β’ Posizione gruppo: ${result.result.group_position}\n\n` +
(hasDelete ?
`ποΈ **Delete Assignment Results:**\n` +
` β’ Posizione: ${result.result.delete_position}\n` +
` β’ Tipo: Initialize Assignment (code 9)\n` +
` β’ Dentro gruppo: ${result.result.group_name}\n\n` : '') +
`β
**${procedureType}:**\n` +
` β’ Posizione: ${result.result.actual_position}\n` +
` β’ Tipo: ${procedureType.replace('_', ' ')} (code ${result.result.operation_code})\n` +
` β’ Dentro gruppo: ${result.result.group_name}\n` +
` β’ Verificata: ${result.result.verified ? 'β
' : 'β'}\n\n` +
(result.result.parameters_configured.length > 0 ?
`**Parametri Configurati:**\n${result.result.parameters_configured.map((p: string) => `β’ ${p}`).join('\n')}\n\n` : '') +
`β±οΈ **Tempo esecuzione:** ${result.executionTimeMs}ms\n\n` +
`β οΈ **IMPORTANTE:**\n` +
`β’ Tutte le operazioni sono nel gruppo **${result.result.group_name}** (posizione ${result.result.group_position})\n` +
(hasDelete ?
`β’ Delete: posizione **${result.result.delete_position}**\n` +
`β’ Assignment: posizione **${result.result.actual_position}**\n` +
`β’ Usa posizione **${result.result.actual_position}** per configurare DSEGSET!\n\n` :
`β’ Usa posizione **${result.result.actual_position}** per configurare questa procedura!\n\n`) +
`π‘ **Suggerimento:** Tutte le operazioni MCP sono organizzate nel gruppo "${result.result.group_name}" per facile gestione!`
}
]
};
} else {
const errorMsg = result.result?.error || result.error || 'Unknown error';
return {
content: [
{
type: "text",
text: `β **Errore Creazione Procedura**\n\n` +
`**Tipo richiesto:** ${procedureType}\n` +
`**Posizione:** ${position}\n` +
`**Errore:** ${errorMsg}\n\n` +
(result.result?.traceback ? `**Traceback:**\n\`\`\`\n${result.result.traceback}\n\`\`\`\n\n` : '') +
`π‘ **Suggerimento:** Verifica che la posizione sia valida (1-20) e che il progetto sia caricato correttamente.`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}\n\n` +
`Consulta VISUM_PROCEDURES_API.md per la documentazione completa.`
}
]
};
}
}
);
// List Demand Segments Tool
server.tool(
"visum_list_demand_segments",
"π List all available demand segments for PrT (Private Transport) modes in the loaded Visum project. Use this before configuring DSEGSET on a procedure.",
{
projectId: z.string().describe("Project ID of the active Visum project"),
filterMode: z.string().optional().describe("Optional: Filter by mode code (e.g., 'C' for Car, 'H' for HGV)")
},
async ({ projectId, filterMode }) => {
try {
const pythonCode = `
try:
import sys
# Find all PrT Transport Systems
all_tsys = visum.Net.TSystems.GetAll
prt_tsys = []
for tsys in all_tsys:
code = tsys.AttValue("CODE")
name = tsys.AttValue("NAME")
tsys_type = tsys.AttValue("TYPE")
if tsys_type == "PRT":
prt_tsys.append({"code": code, "name": name})
# Find corresponding Modes
all_modes = visum.Net.Modes.GetAll
prt_mode_codes = []
mode_mapping = {}
for mode in all_modes:
mode_code = mode.AttValue("CODE")
mode_name = mode.AttValue("NAME")
for tsys in prt_tsys:
if mode_name.upper() == tsys["name"].upper():
prt_mode_codes.append(mode_code)
mode_mapping[mode_code] = {
"mode_name": mode_name,
"tsys_code": tsys["code"]
}
break
# Collect demand segments by mode
all_segments = visum.Net.DemandSegments.GetAll
segments_by_mode = {}
all_prt_segments = []
for seg in all_segments:
seg_code = seg.AttValue("CODE")
seg_mode = seg.AttValue("MODE")
if seg_mode in prt_mode_codes:
if seg_mode not in segments_by_mode:
segments_by_mode[seg_mode] = []
segments_by_mode[seg_mode].append(seg_code)
all_prt_segments.append(seg_code)
${filterMode ? `
# Filter by specific mode
if "${filterMode}" in segments_by_mode:
filtered_segments = segments_by_mode["${filterMode}"]
dsegset = ",".join(filtered_segments)
else:
filtered_segments = []
dsegset = ""
result = {
"status": "success",
"filter_mode": "${filterMode}",
"segments": filtered_segments,
"dsegset": dsegset,
"total": len(filtered_segments),
"all_modes": list(segments_by_mode.keys())
}
` : `
# Return all PrT segments with numbering
dsegset = ",".join(all_prt_segments)
# Create numbered list for user selection
numbered_segments = []
idx = 1
for mode_code in sorted(segments_by_mode.keys()):
for seg in segments_by_mode[mode_code]:
numbered_segments.append({
"number": idx,
"code": seg,
"mode": mode_code
})
idx += 1
result = {
"status": "success",
"prt_tsys": prt_tsys,
"mode_mapping": mode_mapping,
"segments_by_mode": segments_by_mode,
"numbered_segments": numbered_segments,
"dsegset": dsegset,
"total": len(all_prt_segments),
"modes_available": list(segments_by_mode.keys())
}
`}
except Exception as e:
import traceback
result = {
"status": "error",
"error": str(e),
"traceback": traceback.format_exc()
}
`;
const result = await serverManager.executeCommand(projectId, pythonCode, "List PrT demand segments");
if (result.success && result.result?.status === 'success') {
const res = result.result;
if (filterMode) {
return {
content: [
{
type: "text",
text: `π **Demand Segments - Mode ${filterMode}**\n\n` +
`**Segments trovati:** ${res.total}\n` +
`**Segments:**\n${res.segments.map((s: string) => `β’ ${s}`).join('\n')}\n\n` +
`**DSEGSET string:**\n\`\`\`\n${res.dsegset}\n\`\`\`\n\n` +
`**Modi disponibili:** ${res.all_modes.join(', ')}\n\n` +
`π‘ **Prossimo passo:** Usa \`visum_configure_dsegset\` per applicare questi segments alla procedura`
}
]
};
} else {
// Create numbered list grouped by mode
let numberedList = '';
for (const [mode, segments] of Object.entries(res.segments_by_mode)) {
const modeInfo = res.mode_mapping[mode];
numberedList += `\n**Mode ${mode}** (${modeInfo.mode_name} β TSys ${modeInfo.tsys_code}):\n`;
const modeSegments = res.numbered_segments.filter((s: any) => s.mode === mode);
numberedList += modeSegments.map((s: any) => ` ${s.number}. ${s.code}`).join('\n') + '\n';
}
return {
content: [
{
type: "text",
text: `π **Demand Segments PrT Disponibili**\n\n` +
`**Transport Systems PrT:** ${res.prt_tsys.length}\n` +
`${res.prt_tsys.map((t: any) => `β’ ${t.code}: ${t.name}`).join('\n')}\n\n` +
`**Modi PrT:** ${res.modes_available.join(', ')}\n` +
`**Totale segments:** ${res.total}\n` +
numberedList + '\n' +
`**DSEGSET completo (tutti i ${res.total} segments):**\n\`\`\`\n${res.dsegset}\n\`\`\`\n\n` +
`π‘ **Come procedere:**\n\n` +
`**Opzione 1 - Tutti i segments:**\n` +
`Rispondi: "Usa tutti" o "tutti"\n\n` +
`**Opzione 2 - Solo un modo:**\n` +
`Rispondi: "Solo C" o "Solo H"\n\n` +
`**Opzione 3 - Selezione personalizzata:**\n` +
`Rispondi con i numeri: "1,2,3,5,7" o "1-10,15,20"\n\n` +
`**Opzione 4 - Copia manuale:**\n` +
`Copia i codici segments desiderati dalla lista sopra`
}
]
};
}
} else {
return {
content: [
{
type: "text",
text: `β **Errore nel listare demand segments**\n\n${result.result?.error || result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Configure DSEGSET Tool
server.tool(
"visum_configure_dsegset",
"βοΈ Configure demand segments (DSEGSET) on a PrT Assignment procedure. Use visum_list_demand_segments first to see available segments. Accepts segment codes OR numbers from the numbered list.",
{
projectId: z.string().describe("Project ID of the active Visum project"),
procedurePosition: z.number().describe("Position of the PrT Assignment procedure to configure (typically 20)"),
dsegset: z.string().optional().describe("Comma-separated list of demand segment codes (e.g., 'C_CORRETTA_AM,C_CORRETTA_IP1,...') OR 'ALL' for all segments"),
segmentNumbers: z.string().optional().describe("Alternative: comma-separated numbers from visum_list_demand_segments (e.g., '1,2,3,5-10')"),
filterMode: z.string().optional().describe("Alternative: mode code to use all segments from that mode (e.g., 'C', 'H')"),
additionalParams: z.record(z.any()).optional().describe("Optional additional parameters (NUMITER, PRECISIONDEMAND, etc.)")
},
async ({ projectId, procedurePosition, dsegset, segmentNumbers, filterMode, additionalParams = {} }) => {
try {
// First, resolve the DSEGSET based on input type
let resolvedDsegset = '';
let segmentCount = 0;
if (segmentNumbers) {
// User provided numbers - need to fetch segments and resolve
const listPythonCode = `
try:
# Get all PrT segments with numbers
all_tsys = visum.Net.TSystems.GetAll
prt_tsys = []
for tsys in all_tsys:
if tsys.AttValue("TYPE") == "PRT":
prt_tsys.append({"code": tsys.AttValue("CODE"), "name": tsys.AttValue("NAME")})
all_modes = visum.Net.Modes.GetAll
prt_mode_codes = []
for mode in all_modes:
for tsys in prt_tsys:
if mode.AttValue("NAME").upper() == tsys["name"].upper():
prt_mode_codes.append(mode.AttValue("CODE"))
break
all_segments = visum.Net.DemandSegments.GetAll
segments_by_mode = {}
all_prt_segments = []
for seg in all_segments:
seg_code = seg.AttValue("CODE")
seg_mode = seg.AttValue("MODE")
if seg_mode in prt_mode_codes:
if seg_mode not in segments_by_mode:
segments_by_mode[seg_mode] = []
segments_by_mode[seg_mode].append(seg_code)
all_prt_segments.append(seg_code)
# Create numbered list
numbered_segments = []
idx = 1
for mode_code in sorted(segments_by_mode.keys()):
for seg in segments_by_mode[mode_code]:
numbered_segments.append({"number": idx, "code": seg})
idx += 1
result = {"status": "success", "numbered_segments": numbered_segments, "all_segments": all_prt_segments}
except Exception as e:
result = {"status": "error", "error": str(e)}
`;
const listResult = await serverManager.executeCommand(projectId, listPythonCode, "Get numbered segments");
if (!listResult.success || listResult.result?.status !== 'success') {
throw new Error(`Failed to resolve segment numbers: ${listResult.result?.error || listResult.error}`);
}
// Parse segment numbers (supports "1,2,3" and "1-5" notation)
const numberedSegs = listResult.result.numbered_segments;
const selectedNumbers: number[] = [];
segmentNumbers.split(',').forEach(part => {
part = part.trim();
if (part.includes('-')) {
const [start, end] = part.split('-').map(n => parseInt(n.trim()));
for (let i = start; i <= end; i++) {
selectedNumbers.push(i);
}
} else {
selectedNumbers.push(parseInt(part));
}
});
// Get segment codes for selected numbers
const selectedCodes = numberedSegs
.filter((s: any) => selectedNumbers.includes(s.number))
.map((s: any) => s.code);
resolvedDsegset = selectedCodes.join(',');
segmentCount = selectedCodes.length;
} else if (filterMode) {
// User wants all segments from a specific mode
const listPythonCode = `
try:
all_tsys = visum.Net.TSystems.GetAll
prt_tsys = []
for tsys in all_tsys:
if tsys.AttValue("TYPE") == "PRT":
prt_tsys.append({"code": tsys.AttValue("CODE"), "name": tsys.AttValue("NAME")})
all_modes = visum.Net.Modes.GetAll
prt_mode_codes = []
for mode in all_modes:
for tsys in prt_tsys:
if mode.AttValue("NAME").upper() == tsys["name"].upper():
prt_mode_codes.append(mode.AttValue("CODE"))
break
all_segments = visum.Net.DemandSegments.GetAll
mode_segments = []
for seg in all_segments:
seg_code = seg.AttValue("CODE")
seg_mode = seg.AttValue("MODE")
if seg_mode == "${filterMode}":
mode_segments.append(seg_code)
result = {"status": "success", "segments": mode_segments}
except Exception as e:
result = {"status": "error", "error": str(e)}
`;
const listResult = await serverManager.executeCommand(projectId, listPythonCode, "Get mode segments");
if (!listResult.success || listResult.result?.status !== 'success') {
throw new Error(`Failed to get segments for mode ${filterMode}: ${listResult.result?.error || listResult.error}`);
}
resolvedDsegset = listResult.result.segments.join(',');
segmentCount = listResult.result.segments.length;
} else if (dsegset === 'ALL' || dsegset === 'all' || dsegset === 'tutti') {
// User wants all segments
const listPythonCode = `
try:
all_tsys = visum.Net.TSystems.GetAll
prt_tsys = []
for tsys in all_tsys:
if tsys.AttValue("TYPE") == "PRT":
prt_tsys.append({"code": tsys.AttValue("CODE")})
all_modes = visum.Net.Modes.GetAll
prt_mode_codes = []
for mode in all_modes:
for tsys in prt_tsys:
if mode.AttValue("NAME").upper() == tsys["name"].upper():
prt_mode_codes.append(mode.AttValue("CODE"))
break
all_segments = visum.Net.DemandSegments.GetAll
all_prt_segments = []
for seg in all_segments:
if seg.AttValue("MODE") in prt_mode_codes:
all_prt_segments.append(seg.AttValue("CODE"))
result = {"status": "success", "segments": all_prt_segments}
except Exception as e:
result = {"status": "error", "error": str(e)}
`;
const listResult = await serverManager.executeCommand(projectId, listPythonCode, "Get all segments");
if (!listResult.success || listResult.result?.status !== 'success') {
throw new Error(`Failed to get all segments: ${listResult.result?.error || listResult.error}`);
}
resolvedDsegset = listResult.result.segments.join(',');
segmentCount = listResult.result.segments.length;
} else if (dsegset) {
// User provided explicit segment codes
resolvedDsegset = dsegset;
segmentCount = dsegset.split(',').length;
} else {
throw new Error("Must provide either 'dsegset', 'segmentNumbers', 'filterMode', or dsegset='ALL'");
}
// Now configure the procedure with the resolved DSEGSET
const pythonCode = `
try:
# Access the procedure operation
operation = visum.Procedures.Operations.ItemByKey(${procedurePosition})
# Verify it's a PrT Assignment
op_type = operation.AttValue("OPERATIONTYPE")
if op_type != 101:
raise Exception(f"Operation at position ${procedurePosition} is not a PrT Assignment (type {op_type}, expected 101)")
# Access PrT Assignment parameters
params = operation.PrTAssignmentParameters
# Configure DSEGSET
dsegset_value = """${resolvedDsegset}"""
segment_count = ${segmentCount}
print(f"Configuring DSEGSET with {segment_count} segments...")
params.SetAttValue("DSEGSET", dsegset_value)
print(f"DSEGSET configured successfully")
# Configure additional parameters
params_configured = ["DSEGSET"]
${Object.entries(additionalParams).map(([key, value]) => `
try:
${key === 'NUMITER' || key === 'PRECISIONDEMAND' ?
`eq_params = operation.PrTEquilibriumAssignmentParameters
eq_params.SetAttValue("${key}", ${typeof value === 'string' ? `"${value}"` : value})` :
`params.SetAttValue("${key}", ${typeof value === 'string' ? `"${value}"` : value})`
}
params_configured.append("${key}=${value}")
print(f"Parameter ${key} set to ${value}")
except Exception as e:
print(f"Warning: Could not set ${key}: {e}")
`).join('\n')}
# Verify configuration
try:
configured_dsegset = params.AttValue("DSEGSET")
verified = configured_dsegset == dsegset_value
except:
verified = False
configured_dsegset = "Could not verify"
result = {
"status": "success",
"procedure_position": ${procedurePosition},
"segments_configured": segment_count,
"dsegset_length": len(dsegset_value),
"parameters_set": params_configured,
"verified": verified,
"message": f"DSEGSET configured with {segment_count} demand segments"
}
except Exception as e:
import traceback
result = {
"status": "error",
"error": str(e),
"traceback": traceback.format_exc(),
"procedure_position": ${procedurePosition}
}
`;
const result = await serverManager.executeCommand(projectId, pythonCode, "Configure DSEGSET on PrT Assignment");
if (result.success && result.result?.status === 'success') {
const res = result.result;
return {
content: [
{
type: "text",
text: `β
**DSEGSET Configurato**\n\n` +
`**Procedura:** Posizione ${res.procedure_position}\n` +
`**Segments configurati:** ${res.segments_configured}\n` +
`**Lunghezza DSEGSET:** ${res.dsegset_length} caratteri\n` +
`**Verificato:** ${res.verified ? 'β
SΓ¬' : 'β οΈ Non verificato'}\n\n` +
`**Parametri configurati:**\n${res.parameters_set.map((p: string) => `β’ ${p}`).join('\n')}\n\n` +
`**Messaggio:** ${res.message}\n\n` +
`β±οΈ **Tempo esecuzione:** ${result.executionTimeMs}ms\n\n` +
`π **La procedura Γ¨ ora pronta per l'esecuzione!**\n` +
`Vai in Visum β Procedures β Operations β Posizione ${res.procedure_position} per eseguire.`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Configurazione DSEGSET**\n\n${result.result?.error || result.error}\n\n` +
(result.result?.traceback ? `**Traceback:**\n\`\`\`\n${result.result.traceback}\n\`\`\`\n\n` : '') +
`π‘ Verifica che la procedura alla posizione ${procedurePosition} sia un PrT Assignment (tipo 101)`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Check Assignment Tool - Verifica se un'assegnazione Γ¨ stata eseguita
server.tool(
"visum_check_assignment",
"π Verify if a PrT assignment has been executed successfully. Checks for volume data on links. Returns total volume, number of links with traffic, and confirms assignment completion.",
{
projectId: z.string().describe("Project ID of the active Visum project"),
analysisPeriod: z.string().optional().default("AP").describe("Analysis period code (default: 'AP')")
},
async ({ projectId, analysisPeriod = "AP" }) => {
try {
const pythonCode = `
try:
import sys
# Check if network has links
links = visum.Net.Links
link_count = links.Count
if link_count == 0:
result = {
"status": "no_data",
"exists": False,
"reason": "No links in network",
"link_count": 0
}
else:
# Try to get VolVehPrT attribute
attr_name = f"VolVehPrT(${analysisPeriod})"
try:
# Use GetMultiAttValues to get all volumes at once
volumes_data = links.GetMultiAttValues(attr_name)
# volumes_data is a tuple: (keys, values)
# keys = list of tuples (FromNode, ToNode)
# values = list of volume values
if len(volumes_data) >= 2 and len(volumes_data[1]) > 0:
volumes = volumes_data[1]
# Calculate statistics
total_volume = sum(volumes)
links_with_traffic = sum(1 for v in volumes if v > 0)
max_volume = max(volumes) if volumes else 0
avg_volume = total_volume / len(volumes) if len(volumes) > 0 else 0
# Find congested links (V/C > 0.9)
try:
vc_ratios = links.GetMultiAttValues(f"VolCapRatioPrT(${analysisPeriod})")
if len(vc_ratios) >= 2:
congested_links = sum(1 for vc in vc_ratios[1] if vc > 0.9)
else:
congested_links = None
except:
congested_links = None
result = {
"status": "success",
"exists": True,
"analysis_period": "${analysisPeriod}",
"total_links": len(volumes),
"links_with_traffic": links_with_traffic,
"total_volume": round(total_volume, 2),
"max_volume": round(max_volume, 2),
"avg_volume": round(avg_volume, 2),
"congested_links": congested_links,
"message": f"Assignment found with traffic on {links_with_traffic}/{len(volumes)} links"
}
else:
result = {
"status": "no_data",
"exists": False,
"reason": "Attribute exists but no data returned",
"link_count": link_count
}
except Exception as attr_error:
# Attribute doesn't exist = no assignment executed
result = {
"status": "not_executed",
"exists": False,
"reason": f"Attribute {attr_name} not found - assignment not executed",
"link_count": link_count,
"error_detail": str(attr_error)
}
except Exception as e:
import traceback
result = {
"status": "error",
"exists": False,
"error": str(e),
"traceback": traceback.format_exc()
}
`;
const result = await serverManager.executeCommand(projectId, pythonCode, "Check PrT Assignment");
if (result.success && result.result) {
const res = result.result;
if (res.status === 'success' && res.exists) {
// Assignment exists with data
const congestionInfo = res.congested_links !== null ?
`\n**Archi congestionati (V/C > 0.9):** ${res.congested_links}` : '';
return {
content: [
{
type: "text",
text: `β
**Assegnazione PrT Trovata**\n\n` +
`**Periodo di analisi:** ${res.analysis_period}\n` +
`**Archi totali:** ${res.total_links}\n` +
`**Archi con traffico:** ${res.links_with_traffic} (${(res.links_with_traffic / res.total_links * 100).toFixed(1)}%)\n\n` +
`**Statistiche Volume:**\n` +
`β’ Volume totale: ${res.total_volume.toLocaleString()} veicoli\n` +
`β’ Volume massimo: ${res.max_volume.toLocaleString()}\n` +
`β’ Volume medio: ${res.avg_volume.toFixed(2)}${congestionInfo}\n\n` +
`β
${res.message}\n\n` +
`β±οΈ **Tempo verifica:** ${result.executionTimeMs}ms`
}
]
};
} else if (res.status === 'not_executed') {
// Assignment not executed yet
return {
content: [
{
type: "text",
text: `β οΈ **Assegnazione PrT Non Trovata**\n\n` +
`**Motivo:** ${res.reason}\n` +
`**Archi nella rete:** ${res.link_count}\n\n` +
`π‘ **L'assegnazione non Γ¨ stata ancora eseguita.**\n\n` +
`**Per eseguire l'assegnazione:**\n` +
`1. Crea procedura PrT con \`visum_create_procedure\`\n` +
`2. Configura segments con \`visum_configure_dsegset\`\n` +
`3. Esegui la procedura in Visum\n\n` +
`β±οΈ **Tempo verifica:** ${result.executionTimeMs}ms`
}
]
};
} else {
// No data or other status
return {
content: [
{
type: "text",
text: `β οΈ **Stato Assegnazione PrT**\n\n` +
`**Status:** ${res.status}\n` +
`**Motivo:** ${res.reason || 'Non specificato'}\n` +
`**Archi nella rete:** ${res.link_count || 'N/A'}\n\n` +
`β±οΈ **Tempo verifica:** ${result.executionTimeMs}ms`
}
]
};
}
} else {
// Execution error
const errorInfo = result.result || {};
return {
content: [
{
type: "text",
text: `β **Errore Verifica Assegnazione**\n\n` +
`${errorInfo.error || result.error}\n\n` +
(errorInfo.traceback ? `**Traceback:**\n\`\`\`\n${errorInfo.traceback}\n\`\`\`\n\n` : '') +
`π‘ Verifica che il progetto sia aperto e connesso.`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Open Project with TCP Server Tool - DEFAULT FOR OPENING PROJECTS
server.tool(
"project_open",
"π DEFAULT TOOL for opening Visum projects. Always use this tool when asked to open, load, or launch any Visum project. Creates dedicated TCP server for ultra-fast communication. β οΈ If you encounter errors, run 'instance_diagnosis' first!",
{
projectPath: z.string().describe("Full path to the Visum project file (.ver)")
},
async ({ projectPath }) => {
console.error(`π PROJECT_OPEN CHIAMATO: ${projectPath}`);
console.error(`β° Timestamp: ${new Date().toISOString()}`);
try {
// Pre-flight check: warn if there are existing instances that might conflict
const instancesStatus = projectManager.getInstancesStatus();
const activeProjects = serverManager.getActiveProjects();
const totalExistingInstances = Object.keys(instancesStatus).length + activeProjects.length;
if (totalExistingInstances > 2) {
console.error(`β οΈ WARNING: ${totalExistingInstances} existing instances detected. Consider running instance_diagnosis first.`);
return {
content: [
{
type: "text",
text: `β οΈ **Troppe Istanze Attive**\n\n` +
`Rilevate **${totalExistingInstances} istanze** giΓ attive:\n` +
`β’ Istanze progetto: ${Object.keys(instancesStatus).length}\n` +
`β’ Server TCP: ${activeProjects.length}\n\n` +
`**π§ RACCOMANDAZIONE:**\n` +
`Prima di aprire un nuovo progetto, esegui:\n` +
`1. \`instance_diagnosis\` - Per verificare lo stato delle istanze\n` +
`2. \`instance_cleanup\` - Se necessario, per pulire istanze problematiche\n` +
`3. Poi riprova ad aprire il progetto\n\n` +
`**οΏ½ Questo previene conflitti e migliorare le performance!**`
}
]
};
}
console.error(`οΏ½π Avvio ProjectServerManager.openProject...`);
const result = await serverManager.openProject(projectPath);
console.error(`β
ProjectServerManager.openProject completato: ${result.success}`);
if (result.success) {
return {
content: [
{
type: "text",
text: `π **Progetto Aperto con Server TCP**\n\nβ
${result.message}\n\nπ **Dettagli Server:**\n- **ID Progetto:** ${result.projectId}\n- **Nome:** ${result.serverInfo.projectName}\n- **Porta TCP:** ${result.serverInfo.port}\n- **PID:** ${result.serverInfo.pid}\n- **Status:** ${result.serverInfo.status}\n\nπ **Connessione Client:**\n- Host: localhost\n- Porta: ${result.serverInfo.port}\n\nβ‘ Server pronto per ricevere comandi ultra-veloci dai client TCP!`
}
]
};
} else {
console.error(`β ProjectServerManager.openProject fallito: ${result.message}`);
return {
content: [
{
type: "text",
text: `β **Errore Apertura Progetto**\n\n${result.message}\n\n**π§ Suggerimenti:**\nβ’ Verifica che il file .ver esista\nβ’ Controlla che Visum sia installato correttamente\nβ’ Esegui \`instance_diagnosis\` per verificare lo stato del sistema`
}
]
};
}
} catch (error) {
console.error(`π₯ Eccezione in project_open: ${error instanceof Error ? error.message : String(error)}`);
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}\n\n**π§ Prima di riprovare:**\nβ’ Esegui \`instance_diagnosis\` per identificare problemi\nβ’ Considera \`instance_cleanup\` se ci sono istanze problematiche`
}
]
};
}
}
);
// Save Project Tool
server.tool(
"project_save",
"Save the currently opened project in its TCP server",
{
projectId: z.string().describe("Project ID to save"),
saveAs: z.string().optional().describe("Optional: Save with a different filename")
},
async ({ projectId, saveAs }) => {
try {
const result = await serverManager.saveProject(projectId, saveAs);
if (result.success) {
return {
content: [
{
type: "text",
text: `πΎ **Progetto Salvato**\n\nβ
${result.message}${saveAs ? `\n\nπ Salvato come: ${saveAs}` : ''}`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Salvataggio**\n\n${result.message || result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Close Project Tool
server.tool(
"project_close",
"Close a project TCP server with optional save",
{
projectId: z.string().describe("Project ID to close"),
save: z.boolean().optional().describe("Save project before closing (default: false)")
},
async ({ projectId, save }) => {
try {
const result = await serverManager.closeProject(projectId, save || false);
if (result.success) {
return {
content: [
{
type: "text",
text: `π **Progetto Chiuso**\n\nβ
${result.message}`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Chiusura**\n\n${result.message}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Execute Project Command Tool
server.tool(
"project_execute",
"Execute a command on a project TCP server",
{
projectId: z.string().describe("Project ID to execute command on"),
code: z.string().describe("Python code to execute in the Visum context"),
description: z.string().describe("Description of what the code does")
},
async ({ projectId, code, description }) => {
try {
const result = await serverManager.executeCommand(projectId, code, description);
if (result.success) {
return {
content: [
{
type: "text",
text: `β‘ **Comando Eseguito**\n\nβ
${description}\n\nπ **Risultato:**\n\`\`\`json\n${JSON.stringify(result.result, null, 2)}\n\`\`\`\n\nβ±οΈ **Performance:**\n- Tempo risposta: ${result.responseTimeMs}ms\n- Esecuzione VisumPy: ${result.executionTimeMs}ms`
}
]
};
} else {
return {
content: [
{
type: "text",
text: `β **Errore Esecuzione**\n\n${result.error}`
}
]
};
}
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// Project Status Tool
server.tool(
"project_status",
"Get status of all active project TCP servers",
{},
async () => {
try {
const projects = serverManager.getActiveProjects();
if (projects.length === 0) {
return {
content: [
{
type: "text",
text: `π **Status Server Progetti**\n\nβ Nessun progetto attivo.`
}
]
};
}
let statusText = `π **Status Server Progetti** (${projects.length} attivi)\n\n`;
projects.forEach((project, index) => {
statusText += `**${index + 1}. ${project.projectName}**\n`;
statusText += ` β’ ID: ${project.projectId}\n`;
statusText += ` β’ Porta TCP: ${project.port}\n`;
statusText += ` β’ PID: ${project.pid}\n`;
statusText += ` β’ Status: ${project.status}\n`;
statusText += ` β’ Avviato: ${project.startTime}\n`;
statusText += ` β’ Path: \`${project.projectPath}\`\n\n`;
});
return {
content: [
{
type: "text",
text: statusText
}
]
};
} catch (error) {
return {
content: [
{
type: "text",
text: `β **Errore:** ${error instanceof Error ? error.message : String(error)}`
}
]
};
}
}
);
// =============================================================================
// SERVER STARTUP
// =============================================================================
async function main() {
try {
console.error("INFO: Initializing Sequential Thinking MCP Server with VisumPy Integration...");
// Initialize storage for thinking state
await initializeStorage();
await loadThinkingState(); // Load saved state on startup
console.error("β
Storage and thinking state initialized");
// Start MCP server
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("STARTED: Sequential Thinking MCP Server with VisumPy Integration running on stdio");
console.error("TOOLS Available Tools:");
console.error(" THINKING Tools:");
console.error(" β’ sequential_thinking - Step-by-step reasoning");
console.error(" β’ reset_thinking - Clear thinking state");
console.error(" β’ get_thinking_summary - View current progress");
console.error(" DIAGNOSTIC Tools (NEW - ERROR PREVENTION):");
console.error(" β’ instance_diagnosis - π§ Diagnose existing instances instead of creating new ones");
console.error(" β’ instance_cleanup - π§Ή Clean up problematic instances safely");
console.error(" PROJECT Tools (TCP SERVERS):");
console.error(" β’ project_open - π DEFAULT: Open projects (with pre-flight checks)");
console.error(" β’ project_save - Save project via TCP server");
console.error(" β’ project_close - Close project TCP server");
console.error(" β’ project_execute - Execute commands via TCP");
console.error(" β’ project_status - View all active TCP servers");
console.error(" PROJECT-SPECIFIC Instance Tools:");
console.error(" β’ project_start_instance - Start dedicated project instance");
console.error(" β’ project_execute_analysis - Execute ultra-fast analysis");
console.error(" β’ project_instances_status - View all active instances");
console.error(" β’ project_health_check - Check project instance health");
console.error(" β’ project_shutdown_instance - Shutdown specific instance");
console.error(" β οΈ IMPORTANT: Run 'instance_diagnosis' FIRST when encountering errors!");
} catch (error) {
console.error("β Fatal error starting server:", error);
process.exit(1);
}
}
// Start the server
main();