import json
import networkx as nx
def _load_json_dict(path: str, label: str):
    """Load *path* as JSON and return it if it is a dict; otherwise print an error and return None.

    *label* is interpolated into the error message ("network data" /
    "cascading failure data") so the printed text matches the original wording.
    """
    with open(path, 'r', encoding='utf-8') as fh:
        data = json.load(fh)
    if not isinstance(data, dict):
        print(f"Error: The {label} is not in the correct format.")
        return None
    return data


def _largest_component_diameter(graph):
    """Return the diameter of *graph*'s largest connected component.

    Returns float('inf') for an empty graph (nothing left after failures).
    The induced subgraph of a connected component is always connected, so
    nx.diameter never raises here for a non-empty graph.
    """
    if graph.number_of_nodes() == 0:
        return float('inf')
    if nx.is_connected(graph):
        return nx.diameter(graph)
    largest_cc = max(nx.connected_components(graph), key=len)
    return nx.diameter(graph.subgraph(largest_cc))


def evaluate_ULS_performance_by_diameter(
    global_json_path: str,
    output_json_path: str = 'evaluate_ULS_performance_by_diameter.json',
):
    """Evaluate ULS network performance via diameter-based resilience analysis.

    Reads the network file path and the cascading-failure file path from the
    global data JSON, builds an undirected graph of the infrastructure network,
    computes the diameter of the largest connected component before and after
    the recorded cascading failures, saves the resilience ratio
    (diameter_before / diameter_after) to *output_json_path*, and registers
    that path back into the global data file.

    Args:
        global_json_path: Path to Global_Data.json holding the input file paths.
        output_json_path: Destination for the assessment result. Defaults to
            the original hard-coded location, so existing callers are unaffected.

    Returns:
        None on success or on path/format errors (which are printed), or an
        error-message string for incomplete input data — this mixed contract is
        preserved from the original implementation for caller compatibility.
    """
    # Load the global data file to resolve the two input file paths.
    with open(global_json_path, 'r', encoding='utf-8') as file:
        file_paths = json.load(file)
    network_file = file_paths.get('interdependent_infrastructure_networks')
    cascading_failure_file = file_paths.get('cascading_failure_identification_by_big_nodes_attacks')
    if not network_file or not cascading_failure_file:
        print("Error: Network file or cascading failure file not found in Global_Data.json.")
        return

    # Load and validate the network description.
    network_data = _load_json_dict(network_file, "network data")
    if network_data is None:
        return
    nodes = network_data.get('nodes', [])
    edges = network_data.get('edges', [])
    if not nodes or not edges:
        return "Error: Network data is incomplete."

    # Load and validate the cascading-failure record.
    cascading_failure_data = _load_json_dict(cascading_failure_file, "cascading failure data")
    if cascading_failure_data is None:
        return
    failed_nodes = cascading_failure_data.get('failed_nodes', [])
    remaining_nodes = cascading_failure_data.get('remaining_nodes', [])
    if not failed_nodes or not remaining_nodes:
        return "Error: Cascading failure data is incomplete."

    # Construct an undirected graph for diameter analysis.
    # NOTE(review): assumes each node dict has a 'Code' key and each edge dict
    # has 'Start'/'End' keys — matches the upstream network file schema.
    G = nx.Graph()
    for node in nodes:
        G.add_node(node['Code'], **node)
    for edge in edges:
        G.add_edge(edge['Start'], edge['End'], **edge)
    if G.number_of_nodes() == 0:
        return "Error: The network is empty."

    # Diameter before failures (largest component if the graph is disconnected).
    diameter_before = _largest_component_diameter(G)

    # Diameter after removing the failed nodes; remove_nodes_from silently
    # ignores nodes that are not present in the graph.
    G_after_failure = G.copy()
    G_after_failure.remove_nodes_from(failed_nodes)
    diameter_after = _largest_component_diameter(G_after_failure)

    # Resilience ratio based on diameter; 0 when the post-failure diameter is
    # zero (single-node component) or infinite (network fully destroyed).
    if diameter_after > 0 and diameter_after != float('inf'):
        network_resilience = diameter_before / diameter_after
    else:
        network_resilience = 0

    result = {
        'initial_attack_nodes': cascading_failure_data.get('initial_attack_nodes', []),  # List of initial attack nodes
        'all_failed_nodes': failed_nodes,  # Nodes failed due to cascading failure
        'number_of_failed_nodes': len(failed_nodes),  # Total number of failed nodes
        'remaining_nodes': remaining_nodes,  # Nodes remaining after cascading failure
        'number_of_remaining_nodes': len(remaining_nodes),  # Total number of remaining nodes
        'diameter_before': diameter_before,  # Network diameter before failure
        'diameter_after': diameter_after,  # Network diameter after failure
        'network_resilience': network_resilience,  # Resilience ratio based on diameter
    }
    with open(output_json_path, 'w', encoding='utf-8') as outfile:
        json.dump(result, outfile, indent=4)
    print(f"Network resilience assessment results saved to {output_json_path}")

    # Register the result path back into Global_Data.json for downstream steps.
    file_paths['evaluate_ULS_performance_by_diameter'] = output_json_path
    with open(global_json_path, 'w', encoding='utf-8') as file:
        json.dump(file_paths, file, indent=4)
    print("Global_Data.json updated with network resilience result path.")