Skip to main content
Glama
manage-stops-workflow.pyโ€ข18.5 kB
# -*- coding: utf-8 -*- """ Sistema di gestione workflow per abilitazione/disabilitazione fermate Esegue task sequenziali su un set di linee con parametri configurabili """ import sys import os from datetime import datetime # ============================================================================ # CONFIGURAZIONE LOG # ============================================================================ # Abilita log su file (utile quando eseguito da Procedure Sequence) ENABLE_FILE_LOG = True LOG_DIR = r"H:\go\trenord_2025\logs" LOG_FILE = None if ENABLE_FILE_LOG: # Crea directory log se non esiste if not os.path.exists(LOG_DIR): os.makedirs(LOG_DIR) # Nome file log con timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") LOG_FILE = os.path.join(LOG_DIR, "workflow_%s.log" % timestamp) # Apri file log log_file_handle = open(LOG_FILE, "w", encoding="utf-8") # Classe per scrivere sia su console che su file class TeeOutput: def __init__(self, *files): self.files = files def write(self, text): for f in self.files: f.write(text) f.flush() def flush(self): for f in self.files: f.flush() # Reindirizza stdout original_stdout = sys.stdout sys.stdout = TeeOutput(original_stdout, log_file_handle) print("=" * 80) print("LOG ABILITATO: %s" % LOG_FILE) print("=" * 80) print() # ============================================================================ # CONFIGURAZIONE GLOBALE # ============================================================================ # Parametri di default per abilitazione fermate DEFAULT_STOP_TIME = 60 # Tempo di sosta in secondi (1 minuto) DEFAULT_PRE_RUN_ADD = 30 # Offset PreRunTime in secondi DEFAULT_POST_RUN_ADD = 30 # Offset PostRunTime in secondi # Set di linee da processare # Formato: "LineName:LineRouteName" per specificare linea e percorso # ATTENZIONE: Case-sensitive! Usare esattamente come in Visum # Nota: Possono esistere LineRoute con stesso nome in linee diverse! TARGET_LINEROUTES = [ "R17_2022:R17_2", # LineRoute R17_2 della linea R17_2022 (ID: 121097.0) "R17_2022:R17_3", "R17_2022:R17_4", "R17_2022:R17_5", "RE7_2022:RE_7", "RE7_2022:RE_7_1", "RE7_2022:RE_7_2", "RE7_2022:RE_7_3", # Aggiungi altre linee qui ] # ============================================================================ # DEFINIZIONE TASK # ============================================================================ # Task da eseguire in sequenza TASKS = [ { "id": 1, "name": "Abilita tutte le fermate disabilitate", "action": "enable_all_disabled", "lineroutes": TARGET_LINEROUTES, "params": { "stop_time": DEFAULT_STOP_TIME, "pre_run_add": DEFAULT_PRE_RUN_ADD, "post_run_add": DEFAULT_POST_RUN_ADD } }, # Aggiungi altri task qui ] # ============================================================================ # FUNZIONI HELPER (da enable-stop-STEP1-WORKING.py) # ============================================================================ def get_lr_stop_sequence(lr_items): """Ottieni sequenza fermate dal LineRoute con distanze""" stops = [] for item in lr_items: try: stop_point_no = item.AttValue("StopPointNo") if stop_point_no and stop_point_no > 0: stops.append({ 'item': item, 'stop': int(stop_point_no), 'is_route': item.AttValue("IsRoutePoint"), 'index': item.AttValue("Index"), 'accum_length': item.AttValue("AccumLength") }) except: pass stops.sort(key=lambda x: x['index']) return stops def abilita_fermata(tp, stops, stop_no, stop_time, pre_run_add=0, post_run_add=0): """ Abilita una fermata disabilitata nel TimeProfile con tempi personalizzati. Returns: True se successo, False se errore """ print(" Abilitazione fermata %d (StopTime=%d, PreRun+%d, PostRun+%d)..." % (stop_no, stop_time, pre_run_add, post_run_add)) # Trova fermata nella sequenza stop_idx = None for i, s in enumerate(stops): if s['stop'] == stop_no: stop_idx = i break if stop_idx is None: print(" ERRORE: Fermata non trovata!") return False if stop_idx == 0 or stop_idx == len(stops) - 1: print(" SKIP: Prima/ultima fermata") return False current = stops[stop_idx] # Verifica che NON sia giร  abilitata controllando IsRoutePoint if current['is_route']: print(" SKIP - IsRoutePoint gia' True") return True # Non รจ un errore, semplicemente giร  abilitata # Verifica anche che non esista giร  nel TimeProfile for tpi in tp.TimeProfileItems: s = tpi.AttValue("StopPointNo") if s and int(s) == stop_no: print(" SKIP - TimeProfileItem gia' esistente") return True # Trova prev/next ABILITATI prev_stop = None next_stop = None for i in range(stop_idx - 1, -1, -1): if stops[i]['is_route']: prev_stop = stops[i] break for i in range(stop_idx + 1, len(stops)): if stops[i]['is_route']: next_stop = stops[i] break if not prev_stop or not next_stop: print(" ERRORE: Fermate adiacenti non trovate!") return False # Leggi tempi da TimeProfile prev_dep = None next_arr = None for tpi in tp.TimeProfileItems: s = tpi.AttValue("StopPointNo") if s and int(s) == prev_stop['stop']: prev_dep = tpi.AttValue("Dep") elif s and int(s) == next_stop['stop']: next_arr = tpi.AttValue("Arr") if prev_dep is None or next_arr is None: print(" ERRORE: Tempi prev/next non trovati!") return False time_total = next_arr - prev_dep # Abilita IsRoutePoint if current['is_route']: current['item'].SetAttValue("IsRoutePoint", False) current['item'].SetAttValue("IsRoutePoint", True) # RILEGGI AccumLength (ora aggiornato!) prev_accum = prev_stop['item'].AttValue("AccumLength") curr_accum = current['item'].AttValue("AccumLength") next_accum = next_stop['item'].AttValue("AccumLength") # Calcola PreRunTime con interpolazione distanze dist_prev_curr = curr_accum - prev_accum dist_total = next_accum - prev_accum if abs(dist_total) < 0.001: pre_run_time = time_total / 2.0 else: pre_run_time = (dist_prev_curr / dist_total) * time_total # Applica offset pre_run_time_final = pre_run_time + pre_run_add # Calcola Arr e Dep arr = prev_dep + pre_run_time_final dep = arr + stop_time # Crea TimeProfileItem try: new_tpi = tp.AddTimeProfileItem(current['item']) except Exception as e: error_msg = str(e) if "already in TimeProfile" in error_msg: print(" ERRORE: TimeProfileItem gia' esistente (nonostante i controlli)") print(" Tento di recuperare il TimeProfileItem esistente...") # Cerca il TimeProfileItem esistente existing_tpi = None for tpi in tp.TimeProfileItems: s = tpi.AttValue("StopPointNo") if s and int(s) == stop_no: existing_tpi = tpi break if existing_tpi: print(" TimeProfileItem trovato, aggiorno i tempi...") new_tpi = existing_tpi else: print(" ERRORE: Non trovo il TimeProfileItem!") return False else: print(" ERRORE: %s" % error_msg) return False new_tpi.SetAttValue("Arr", arr) new_tpi.SetAttValue("Dep", dep) # Aggiorna fermata successiva per PostRunTime if post_run_add != 0: next_tpi = None for tpi in tp.TimeProfileItems: s = tpi.AttValue("StopPointNo") if s and int(s) == next_stop['stop']: next_tpi = tpi break if next_tpi: next_arr_old = next_tpi.AttValue("Arr") next_dep_old = next_tpi.AttValue("Dep") next_stoptime = next_dep_old - next_arr_old next_arr_new = next_arr_old + post_run_add next_dep_new = next_arr_new + next_stoptime next_tpi.SetAttValue("Arr", next_arr_new) next_tpi.SetAttValue("Dep", next_dep_new) # Verifica stoptime_read = new_tpi.AttValue("StopTime") if abs(stoptime_read - stop_time) < 0.1: print(" OK - StopTime: %.1f sec" % stoptime_read) else: print(" ATTENZIONE: StopTime = %.1f (atteso %.1f)" % (stoptime_read, stop_time)) return True # ============================================================================ # TASK HANDLERS # ============================================================================ def task_enable_all_disabled(lineroutes, params): """ Task: Abilita tutte le fermate disabilitate su un set di linee Processa le fermate in sequenza temporale per aggiornare i tempi in modo congruente """ print("\n" + "=" * 80) print("TASK: ABILITA TUTTE LE FERMATE DISABILITATE") print("=" * 80) stop_time = params.get("stop_time", DEFAULT_STOP_TIME) pre_run_add = params.get("pre_run_add", DEFAULT_PRE_RUN_ADD) post_run_add = params.get("post_run_add", DEFAULT_POST_RUN_ADD) print("\nParametri:") print(" StopTime: %d sec" % stop_time) print(" PreRun add: +%d sec" % pre_run_add) print(" PostRun add: +%d sec" % post_run_add) print(" Linee target: %s" % ", ".join(lineroutes)) print() total_enabled = 0 total_errors = 0 # Processa ogni linea for lr_spec in lineroutes: # lr_spec puo' essere: # - "LineName:LineRouteName" (es. "R17:R17_2") # - "LineRouteName" (es. "R17_2") - cerca in tutte le linee if ":" in lr_spec: line_name, lr_name = lr_spec.split(":", 1) else: line_name = None lr_name = lr_spec print("\n" + "-" * 80) if line_name: print("Linea: %s, LineRoute: %s" % (line_name, lr_name)) else: print("LineRoute: %s" % lr_name) print("-" * 80) # Trova LineRoute target_lr = None if line_name: # Cerca in una linea specifica line_found = False for line in Visum.Net.Lines: current_line_name = line.AttValue("Name") if current_line_name == line_name: line_found = True print(" Linea '%s' trovata, cerco LineRoute '%s'..." % (line_name, lr_name)) # Debug: mostra tutti i LineRoute disponibili available_lrs = [] for lr in line.LineRoutes: lr_name_found = lr.AttValue("Name") available_lrs.append(lr_name_found) if lr_name_found == lr_name: target_lr = lr break if not target_lr: print(" LineRoute disponibili in linea '%s': %s" % (line_name, ", ".join(available_lrs))) break if not line_found: print(" DEBUG: Linea '%s' non trovata. Cerco linee simili..." % line_name) similar = [] for line in Visum.Net.Lines: ln = line.AttValue("Name") if line_name in ln or ln in line_name: similar.append(ln) if similar: print(" Linee simili trovate: %s" % ", ".join(similar[:5])) else: # Cerca in tutte le linee for lr in Visum.Net.LineRoutes: if lr.AttValue("Name") == lr_name: target_lr = lr break if target_lr is None: if line_name: print(" ERRORE: LineRoute '%s' nella linea '%s' non trovato!" % (lr_name, line_name)) else: print(" ERRORE: LineRoute '%s' non trovato!" % lr_name) total_errors += 1 continue # Ottieni TimeProfile # Conta quanti TimeProfiles ci sono tp_list = [] for time_profile in target_lr.TimeProfiles: tp_list.append(time_profile) if not tp_list: print(" ERRORE: Nessun TimeProfile trovato!") total_errors += 1 continue print(" TimeProfiles trovati: %d" % len(tp_list)) # Ottieni sequenza fermate (comune a tutti i TimeProfiles) stops = get_lr_stop_sequence(target_lr.LineRouteItems) # Trova fermate disabilitate (escludendo prima e ultima) disabled_stops = [] for i, s in enumerate(stops): if i > 0 and i < len(stops) - 1: # Non prima/ultima if not s['is_route']: disabled_stops.append(s) if not disabled_stops: print(" Nessuna fermata disabilitata trovata (escluse prima/ultima)") continue print("\n Fermate disabilitate da abilitare: %d" % len(disabled_stops)) for s in disabled_stops: print(" - Fermata %d (Index %d)" % (s['stop'], s['index'])) # PROCESSA TUTTI I TimeProfiles for tp_idx, tp in enumerate(tp_list): tp_name = tp.AttValue("Name") print("\n " + "~" * 70) print(" TimeProfile [%d/%d]: %s" % (tp_idx + 1, len(tp_list), tp_name)) print(" " + "~" * 70) print("\n Abilitazione in sequenza temporale...") # Abilita in sequenza (le fermate sono gia' ordinate per index) enabled_count = 0 error_count = 0 for disabled in disabled_stops: print("\n Processando fermata %d..." % disabled['stop']) # Rileggi sequenza aggiornata dopo ogni abilitazione stops = get_lr_stop_sequence(target_lr.LineRouteItems) # Debug: verifica stato attuale current_state = None for s in stops: if s['stop'] == disabled['stop']: current_state = s['is_route'] break print(" Stato IsRoutePoint attuale: %s" % current_state) success = abilita_fermata(tp, stops, disabled['stop'], stop_time, pre_run_add, post_run_add) if success: enabled_count += 1 else: error_count += 1 print("\n Risultato TimeProfile %s:" % tp_name) print(" Abilitate: %d" % enabled_count) print(" Errori: %d" % error_count) total_enabled += enabled_count total_errors += error_count # Riepilogo finale print("\n" + "=" * 80) print("TASK COMPLETATO") print("=" * 80) print("Totale fermate abilitate: %d" % total_enabled) print("Totale errori: %d" % total_errors) print("=" * 80) return { "enabled": total_enabled, "errors": total_errors } # ============================================================================ # WORKFLOW EXECUTOR # ============================================================================ def execute_workflow(tasks): """Esegue tutti i task in sequenza""" print("\n" + "=" * 80) print("INIZIO WORKFLOW - %d TASK" % len(tasks)) print("=" * 80) results = [] for task in tasks: task_id = task.get("id", "?") task_name = task.get("name", "Unnamed") task_action = task.get("action") print("\n\n") print("#" * 80) print("# TASK %s: %s" % (task_id, task_name)) print("#" * 80) # Esegui task appropriato if task_action == "enable_all_disabled": result = task_enable_all_disabled( task.get("lineroutes", []), task.get("params", {}) ) results.append({ "task_id": task_id, "task_name": task_name, "result": result }) else: print("\nERRORE: Action '%s' non riconosciuta!" % task_action) results.append({ "task_id": task_id, "task_name": task_name, "result": {"error": "Unknown action"} }) # Riepilogo finale workflow print("\n\n") print("=" * 80) print("WORKFLOW COMPLETATO") print("=" * 80) for r in results: print("\nTask %s: %s" % (r["task_id"], r["task_name"])) print(" Risultato: %s" % r["result"]) print("=" * 80) return results # ============================================================================ # MAIN # ============================================================================ try: results = execute_workflow(TASKS) print("\n\nPer vedere le modifiche nella GUI:") print(" - Chiudi e riapri Edit > Time Profiles") print(" - Oppure salva e riapri il progetto") except Exception as e: print("\nERRORE WORKFLOW: %s" % str(e)) import traceback traceback.print_exc() finally: # Chiudi file log se aperto if ENABLE_FILE_LOG and LOG_FILE: print("\n" + "=" * 80) print("LOG SALVATO: %s" % LOG_FILE) print("=" * 80) sys.stdout = original_stdout # Ripristina stdout originale log_file_handle.close()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/multiluca2020/visum-thinker-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server