Skip to main content
Glama

Government of Canada Open Data MCP Servers

by krunal16-c
api_client.py68.5 kB
""" API client for Canadian transportation infrastructure data. Fetches actual infrastructure records from Open Government Canada datasets. Parses GeoJSON, CSV, and queries ESRI REST APIs to return structured infrastructure data. """ import csv import io import json import logging import ssl import urllib.request from typing import Any, Optional from urllib.parse import urlencode from gov_ca_transportation.http_client import HTTPClient, RetryConfig logger = logging.getLogger(__name__) class TransportationAPIClient: """Client for Canadian transportation infrastructure data.""" BASE_URL = "https://open.canada.ca/data/api" # Known dataset resource URLs for direct data access across Canada # Each source includes province/city scope and data format info KNOWN_RESOURCES = { "bridges": { "montreal": { "province": "Quebec", "city": "Montreal", "geojson_url": "https://donnees.montreal.ca/dataset/74143072-dd9a-4309-95b4-da9a81a96d52/resource/81b60248-8e94-4dd2-ad4b-66f22daf8c9b/download/2021-liste-structure-donnees-ouvertes.geojson", "name": "Montreal Road Structures 2021", "format": "geojson", }, "ontario": { "province": "Ontario", "city": None, # Province-wide "csv_url": "https://data.ontario.ca/dataset/37a472f6-b7ea-4a41-9d4b-64a0c8e5025a/resource/703cdf01-ff09-4b86-b017-6e8d87b11fd2/download/bridge_condition_open_data_2020_en.csv", "name": "Ontario Bridge Conditions 2020", "format": "csv", }, "nova_scotia": { "province": "Nova Scotia", "city": None, "csv_url": "https://data.novascotia.ca/api/views/gs26-c3fm/rows.csv?accessType=DOWNLOAD", "name": "Nova Scotia Structures Database", "format": "csv", }, }, "railways": { "national": { "province": None, # National scope "geojson_url": "https://gnb.socrata.com/api/geospatial/v8y5-3vbg?method=export&format=GeoJSON", "name": "National Railway Network (NRWN)", "format": "geojson", }, }, "airports": { "quebec": { "province": "Quebec", "geojson_url": 
"https://www.donneesquebec.ca/recherche/dataset/06b37043-e3d8-46e5-8ffc-759bcc8ccecf/resource/51f16d10-e7ed-4d38-848e-c68a75c0d5fd/download/lieux-aerodrome.geojson", "name": "Quebec Airports and Aerodromes", "format": "geojson", }, }, } def __init__(self, retry_config: Optional[RetryConfig] = None): self.client = HTTPClient(self.BASE_URL, retry_config or RetryConfig()) # SSL context for fetching data (some gov sites have cert issues) self._ssl_ctx = ssl.create_default_context() self._ssl_ctx.check_hostname = False self._ssl_ctx.verify_mode = ssl.CERT_NONE def _fetch_geojson(self, url: str) -> dict[str, Any]: """Fetch and parse GeoJSON data from a URL.""" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0 (compatible; GovMCP/1.0)"}) with urllib.request.urlopen(req, timeout=30, context=self._ssl_ctx) as response: data = response.read().decode("utf-8") return json.loads(data) except Exception as e: logger.error(f"Error fetching GeoJSON from {url}: {e}") raise def _fetch_csv(self, url: str) -> list[dict[str, Any]]: """Fetch and parse CSV data from a URL.""" try: req = urllib.request.Request(url, headers={"User-Agent": "Mozilla/5.0 (compatible; GovMCP/1.0)"}) with urllib.request.urlopen(req, timeout=30, context=self._ssl_ctx) as response: data = response.read().decode("utf-8") reader = csv.DictReader(io.StringIO(data)) return list(reader) except Exception as e: logger.error(f"Error fetching CSV from {url}: {e}") raise def _query_esri_rest(self, url: str, where: str = "1=1", limit: int = 100) -> dict[str, Any]: """Query an ArcGIS/ESRI REST API endpoint.""" try: params = { "where": where, "outFields": "*", "returnGeometry": "true", "f": "geojson", "resultRecordCount": limit, } full_url = f"{url}?{urlencode(params)}" req = urllib.request.Request(full_url, headers={"User-Agent": "Mozilla/5.0 (compatible; GovMCP/1.0)"}) with urllib.request.urlopen(req, timeout=30, context=self._ssl_ctx) as response: data = response.read().decode("utf-8") return 
json.loads(data) except Exception as e: logger.error(f"Error querying ESRI REST at {url}: {e}") raise def _extract_coordinates(self, geometry: dict) -> dict[str, Any]: """Extract centroid or representative coordinates from GeoJSON geometry.""" geom_type = geometry.get("type", "") coords = geometry.get("coordinates", []) if geom_type == "Point": return {"longitude": coords[0], "latitude": coords[1]} if len(coords) >= 2 else {} elif geom_type == "LineString" and coords: mid = len(coords) // 2 return {"longitude": coords[mid][0], "latitude": coords[mid][1]} elif geom_type == "Polygon" and coords and coords[0]: ring = coords[0] avg_lon = sum(p[0] for p in ring) / len(ring) avg_lat = sum(p[1] for p in ring) / len(ring) return {"longitude": avg_lon, "latitude": avg_lat} elif geom_type == "MultiPolygon" and coords and coords[0] and coords[0][0]: ring = coords[0][0] avg_lon = sum(p[0] for p in ring) / len(ring) avg_lat = sum(p[1] for p in ring) / len(ring) return {"longitude": avg_lon, "latitude": avg_lat} elif geom_type == "MultiLineString" and coords and coords[0]: line = coords[0] mid = len(line) // 2 return {"longitude": line[mid][0], "latitude": line[mid][1]} return {} def _map_condition(self, condition_index: Any) -> str: """Map numeric condition index to rating.""" if condition_index is None: return "unknown" try: idx = float(condition_index) if idx >= 80: return "good" elif idx >= 60: return "fair" elif idx >= 40: return "poor" else: return "critical" except (ValueError, TypeError): return str(condition_index) if condition_index else "unknown" def _search_datasets( self, query: str, filters: Optional[dict] = None, limit: int = 100, ) -> dict[str, Any]: """Search for transportation datasets.""" params = { "q": query, "rows": limit, } if filters: fq_parts = [] for key, value in filters.items(): if value: fq_parts.append(f'{key}:"{value}"') if fq_parts: params["fq"] = " AND ".join(fq_parts) try: response = self.client.get("3/action/package_search", params=params) 
return response.get("result", {}) except Exception as e: logger.error(f"Error searching datasets: {e}") return {"results": [], "count": 0} def _query_datastore( self, resource_id: str, filters: Optional[dict] = None, limit: int = 100, ) -> dict[str, Any]: """Query datastore for a specific resource.""" params = { "resource_id": resource_id, "limit": limit, } if filters: params["filters"] = filters try: response = self.client.get("3/action/datastore_search", params=params) return response.get("result", {}) except Exception as e: logger.error(f"Error querying datastore: {e}") return {"records": [], "total": 0} def query_bridges( self, province: Optional[str] = None, city: Optional[str] = None, condition: Optional[str] = None, capacity_min: Optional[float] = None, built_after: Optional[int] = None, limit: int = 100, ) -> dict[str, Any]: """ Search and filter bridge infrastructure across Canada. Uses Statistics Canada Core Public Infrastructure data for all provinces, supplemented by detailed provincial/municipal data where available. Args: province: Filter by province (e.g., 'Ontario', 'British Columbia', 'Saskatchewan') city: Filter by city name (for detailed data where available) condition: Filter by condition rating (good, fair, poor, critical) capacity_min: Minimum capacity in tonnes (not available in StatCan data) built_after: Filter bridges built after this year (for detailed data only) limit: Maximum records to return (default 100) Returns: Bridge records with condition data from Statistics Canada, plus detailed records from provincial sources where available. 
""" # Normalize province name province_normalized = self.PROVINCE_NAMES.get(province.lower(), province) if province else None result = { "province": province_normalized, "city": city, "bridges": [], "condition_summary": None, "sources": [], "filters_applied": { "province": province, "city": city, "condition": condition, "capacity_min": capacity_min, "built_after": built_after, }, } # First, get StatCan aggregate data for the province/region try: statcan_data = self._fetch_statcan_bridge_inventory(province_normalized or "Canada") if statcan_data: result["condition_summary"] = statcan_data.get("condition_distribution") result["statcan_data"] = { "source": "Statistics Canada - Core Public Infrastructure Survey", "table_id": self.STATCAN_BRIDGE_INVENTORY["table_id"], "reference_year": statcan_data.get("reference_year", "2022"), "condition_distribution": statcan_data.get("condition_distribution"), "by_owner": statcan_data.get("by_owner"), } result["sources"].append("Statistics Canada CCPI Survey") except Exception as e: logger.warning(f"Could not fetch StatCan bridge data: {e}") # Then, try to get detailed bridge records from provincial/municipal sources detailed_bridges = self._fetch_detailed_bridge_records( province, city, condition, built_after, limit ) result["bridges"] = detailed_bridges.get("bridges", []) result["count"] = len(result["bridges"]) result["sources"].extend(detailed_bridges.get("sources", [])) # Add note about data availability if not result["bridges"] and result.get("statcan_data"): result["note"] = ( f"Detailed bridge records not available for {province_normalized or 'all provinces'}. " "StatCan aggregate condition data is provided. " "Detailed records available for: Ontario, Quebec/Montreal, Nova Scotia." ) return result def _fetch_statcan_bridge_inventory(self, location: str) -> dict[str, Any]: """ Fetch bridge inventory/condition distribution from Statistics Canada. Returns percentage distribution by condition for the specified province. 
""" import zipfile import tempfile import os try: zip_url = self.STATCAN_BRIDGE_INVENTORY["url"] asset_filter = self.STATCAN_BRIDGE_INVENTORY["asset_filter"] logger.info(f"Downloading StatCan bridge inventory from {zip_url}") req = urllib.request.Request( zip_url, headers={"User-Agent": "Mozilla/5.0 (compatible; GovMCP/1.0)"} ) with urllib.request.urlopen(req, context=self._ssl_ctx, timeout=60) as response: zip_data = response.read() with tempfile.TemporaryDirectory() as tmpdir: zip_path = os.path.join(tmpdir, "data.zip") with open(zip_path, "wb") as f: f.write(zip_data) with zipfile.ZipFile(zip_path, "r") as zf: csv_files = [n for n in zf.namelist() if n.endswith(".csv") and "metadata" not in n.lower()] if not csv_files: return {} with zf.open(csv_files[0]) as csvfile: content = csvfile.read().decode("utf-8-sig") return self._parse_statcan_bridge_inventory_csv(content, asset_filter, location) except Exception as e: logger.error(f"Error fetching StatCan bridge inventory: {e}") return {} def _parse_statcan_bridge_inventory_csv( self, csv_content: str, asset_filter: str, location: str, ) -> dict[str, Any]: """ Parse Statistics Canada bridge inventory CSV. Returns condition distribution percentages by province. 
""" result = { "reference_year": "2022", "condition_distribution": {}, "by_owner": {}, } reader = csv.DictReader(io.StringIO(csv_content)) # Condition mapping conditions = { "Very poor": "very_poor", "Poor": "poor", "Fair": "fair", "Good": "good", "Very good": "very_good", "Physical condition unknown": "unknown", } for row in reader: geo = row.get("GEO", "") asset_type = row.get("Core public infrastructure assets", "") owner = row.get("Public organizations", "") condition = row.get("Overall physical condition of assets", "") value_str = row.get("VALUE", "") # Filter for bridges and location if asset_filter.lower() not in asset_type.lower(): continue if location.lower() not in geo.lower(): continue # Parse percentage value try: value = float(value_str) if value_str and value_str.strip() else 0 except ValueError: continue # Get reference year ref_period = row.get("REF_DATE", "") if ref_period: result["reference_year"] = ref_period # Store condition distribution for "All public organizations" if "All public organizations" in owner: cond_key = conditions.get(condition) if cond_key: result["condition_distribution"][cond_key] = { "percentage": value, "label": condition, } # Store by owner type (for all conditions - look for total) # We'll aggregate later or just capture key owner types if condition == "Very poor" or condition == "Poor": owner_key = owner.lower().replace(" ", "_").replace(",", "") if owner_key not in result["by_owner"]: result["by_owner"][owner_key] = {"poor_and_very_poor": 0} # This is simplified - in reality we'd need more passes return result def _fetch_detailed_bridge_records( self, province: Optional[str], city: Optional[str], condition: Optional[str], built_after: Optional[int], limit: int, ) -> dict[str, Any]: """ Fetch detailed bridge records from provincial/municipal open data sources. This supplements the StatCan aggregate data with individual bridge records. 
""" bridges = [] sources_queried = [] # Normalize province/city for matching prov_lower = province.lower() if province else None city_lower = city.lower() if city else None # Determine which sources to query based on filters sources_to_query = [] for key, resource in self.KNOWN_RESOURCES.get("bridges", {}).items(): res_province = (resource.get("province") or "").lower() res_city = (resource.get("city") or "").lower() # If no filters, query all sources if not province and not city: sources_to_query.append((key, resource)) # If province matches elif prov_lower and prov_lower in res_province.lower(): sources_to_query.append((key, resource)) # If city matches elif city_lower and city_lower in res_city.lower(): sources_to_query.append((key, resource)) # Province abbreviation matching elif prov_lower in ["qc", "québec"] and "quebec" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["on"] and "ontario" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["ns"] and "nova scotia" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["sk"] and "saskatchewan" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["ab"] and "alberta" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["bc"] and "british columbia" in res_province: sources_to_query.append((key, resource)) elif prov_lower in ["mb"] and "manitoba" in res_province: sources_to_query.append((key, resource)) # Calculate per-source limit num_sources = len(sources_to_query) if num_sources > 0 and not province and not city: per_source_limit = max(limit // num_sources, 10) else: per_source_limit = limit # Query each applicable source for key, resource in sources_to_query: if len(bridges) >= limit: break remaining = limit - len(bridges) source_limit = min(per_source_limit, remaining) try: data_format = resource.get("format", "").lower() if data_format == "geojson" and resource.get("geojson_url"): 
bridges.extend(self._parse_bridge_geojson( resource, condition, built_after, source_limit )) sources_queried.append(resource["name"]) elif data_format == "csv" and resource.get("csv_url"): bridges.extend(self._parse_bridge_csv( resource, city, condition, built_after, source_limit )) sources_queried.append(resource["name"]) except Exception as e: logger.warning(f"Could not fetch data from {resource.get('name')}: {e}") return { "bridges": bridges[:limit], "sources": list(set(sources_queried)), } def _parse_bridge_geojson( self, resource: dict, condition: Optional[str], built_after: Optional[int], limit: int ) -> list[dict]: """Parse bridge data from GeoJSON source (e.g., Montreal).""" bridges = [] geojson = self._fetch_geojson(resource["geojson_url"]) res_city = resource.get("city", "") res_province = resource.get("province", "") for feature in geojson.get("features", []): if len(bridges) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) # Handle Montreal-specific French field names if "montreal" in resource.get("name", "").lower(): structure_type = props.get("Type structure", "") bridge_types = ["pont", "bridge", "viaduc", "passerelle", "footbridge"] if not any(bt in structure_type.lower() for bt in bridge_types): continue icg = props.get("ICG") condition_rating = self._map_condition(icg) if condition and condition.lower() != condition_rating: continue year_built = props.get("Année de construction") if built_after and year_built: try: if int(year_built) < built_after: continue except (ValueError, TypeError): pass bridges.append({ "id": props.get("No structure") or props.get("IDE_STRCT"), "name": props.get("Nom route") or props.get("Nom obstacle"), "structure_type": structure_type, "owner": props.get("Responsablilté de gestion"), "condition_index": icg, "condition_rating": condition_rating, "condition_category": props.get("Catégorie ICG"), "year_built": year_built, "deck_area_m2": 
props.get("Superficie du tablier"), "location": { "city": res_city, "province": res_province, "coordinates": coords, }, "geometry": geom, "source": resource["name"], }) else: # Generic GeoJSON parsing bridges.append({ "id": props.get("id") or props.get("ID"), "name": props.get("name") or props.get("NAME") or props.get("nom"), "structure_type": props.get("type") or props.get("TYPE") or "bridge", "condition_rating": props.get("condition"), "location": { "city": res_city, "province": res_province, "coordinates": coords, }, "properties": props, "geometry": geom, "source": resource["name"], }) return bridges def _parse_bridge_csv( self, resource: dict, city: Optional[str], condition: Optional[str], built_after: Optional[int], limit: int ) -> list[dict]: """Parse bridge data from CSV source (e.g., Ontario, Nova Scotia).""" bridges = [] csv_data = self._fetch_csv(resource["csv_url"]) res_province = resource.get("province", "") for row in csv_data: if len(bridges) >= limit: break # Handle Ontario-specific fields if "ontario" in resource.get("name", "").lower(): # Apply city/county filter county = row.get("COUNTY", "") if city and city.lower() not in county.lower(): continue # Apply condition filter (BCI = Bridge Condition Index) bci = row.get("CURRENT BCI") condition_rating = self._map_condition_bci(bci) if condition and condition.lower() != condition_rating: continue # Apply built_after filter year_built = row.get("YEAR BUILT") if built_after and year_built: try: if int(year_built) < built_after: continue except (ValueError, TypeError): pass lat = row.get("LATITUDE") lon = row.get("LONGITUDE") coords = {} if lat and lon: try: coords = {"latitude": float(lat), "longitude": float(lon)} except (ValueError, TypeError): pass bridges.append({ "id": row.get("ID (SITE N°)"), "name": row.get("STRUCTURE NAME"), "highway": row.get("HIGHWAY NAME"), "structure_type": row.get("TYPE 1") or row.get("CATEGORY"), "category": row.get("CATEGORY"), "subcategory": row.get("SUBCATEGORY 1"), 
"material": row.get("MATERIAL 1"), "condition_index": bci, "condition_rating": condition_rating, "year_built": year_built, "last_major_rehab": row.get("LAST MAJOR REHAB"), "last_inspection": row.get("LAST INSPECTION DATE"), "span_count": row.get("NUMBER OF SPAN / CELLS"), "length_m": row.get("DECK / CULVERTS LENGTH (m)"), "width_m": row.get("WIDTH TOTAL (m)"), "owner": row.get("OWNER"), "status": row.get("OPERATION STATUS"), "location": { "region": row.get("REGION"), "county": county, "province": res_province, "coordinates": coords, }, "source": resource["name"], }) elif "nova scotia" in resource.get("name", "").lower(): # Nova Scotia has simpler fields lat = row.get("Latitude") lon = row.get("Longitude") coords = {} if lat and lon: try: coords = {"latitude": float(lat), "longitude": float(lon)} except (ValueError, TypeError): pass bridges.append({ "id": row.get("StructureID"), "name": row.get("StructureName"), "location": { "province": res_province, "coordinates": coords, }, "source": resource["name"], }) else: # Generic CSV parsing bridges.append({ "id": row.get("id") or row.get("ID"), "name": row.get("name") or row.get("NAME"), "location": {"province": res_province}, "properties": row, "source": resource["name"], }) return bridges def _map_condition_bci(self, bci_value) -> str: """Map Ontario Bridge Condition Index (0-100) to rating.""" if bci_value is None: return "unknown" try: bci = float(bci_value) if bci >= 80: return "good" elif bci >= 60: return "fair" elif bci >= 40: return "poor" else: return "critical" except (ValueError, TypeError): return "unknown" def query_tunnels( self, province: Optional[str] = None, city: Optional[str] = None, length_min: Optional[float] = None, tunnel_type: Optional[str] = None, limit: int = 100, ) -> dict[str, Any]: """ Search and filter tunnel infrastructure. Returns actual tunnel records with location and metadata. 
""" tunnels = [] sources_queried = [] # Montreal road structures include tunnels query_montreal = ( province is None or province.lower() in ["quebec", "québec", "qc"] or (city and city.lower() in ["montreal", "montréal"]) ) if query_montreal: try: resource = self.KNOWN_RESOURCES["bridges"]["montreal"] geojson = self._fetch_geojson(resource["geojson_url"]) sources_queried.append(resource["name"]) for feature in geojson.get("features", []): props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) # Montreal uses French field names structure_type = props.get("Type structure", "") # Filter to tunnels only if "tunnel" not in structure_type.lower(): continue # Apply tunnel_type filter if tunnel_type and tunnel_type.lower() not in structure_type.lower(): continue # Apply length_min filter length = props.get("Superficie du tablier") # Using deck area as proxy if length_min and length: try: if float(length) < length_min: continue except (ValueError, TypeError): pass tunnel_record = { "id": props.get("No structure") or props.get("IDE_STRCT"), "name": props.get("Nom route") or props.get("Nom obstacle"), "tunnel_type": structure_type, "owner": props.get("Responsablilté de gestion"), "condition_index": props.get("ICG"), "condition_rating": self._map_condition(props.get("ICG")), "condition_category": props.get("Catégorie ICG"), "year_built": props.get("Année de construction"), "deck_area_m2": length, "status": props.get("Statut"), "location": { "city": "Montreal", "province": "Quebec", "coordinates": coords, }, "geometry": geom, "source": resource["name"], } tunnels.append(tunnel_record) if len(tunnels) >= limit: break except Exception as e: logger.warning(f"Could not fetch Montreal tunnel data: {e}") # Search for additional tunnel datasets if len(tunnels) < limit: search_query = "tunnel infrastructure" if province: search_query += f" {province}" search_results = self._search_datasets(search_query, limit=10) for ds in 
search_results.get("results", []): if len(tunnels) >= limit: break for resource in ds.get("resources", []): fmt = resource.get("format", "").upper() url = resource.get("url", "") if fmt == "GEOJSON" and url: try: geojson = self._fetch_geojson(url) sources_queried.append(ds.get("title", "Unknown")) for feature in geojson.get("features", []): if len(tunnels) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) tunnel_record = { "id": props.get("id") or props.get("ID"), "name": props.get("name") or props.get("NAME") or props.get("nom"), "tunnel_type": props.get("type") or "tunnel", "location": {"coordinates": coords}, "properties": props, "geometry": geom, "source": ds.get("title"), } tunnels.append(tunnel_record) except Exception: continue return { "count": len(tunnels), "tunnels": tunnels[:limit], "sources": list(set(sources_queried)), "filters_applied": { "province": province, "city": city, "length_min": length_min, "tunnel_type": tunnel_type, }, } def query_ports_airports( self, facility_type: str, province: Optional[str] = None, capacity: Optional[str] = None, services: Optional[list[str]] = None, limit: int = 100, ) -> dict[str, Any]: """ Query ports, marinas, and airports. Returns actual facility records with location and metadata. 
""" facilities = [] sources_queried = [] # Query Quebec airports/aerodromes for airport types if facility_type.lower() in ["airport", "aerodrome", "heliport"]: try: resource = self.KNOWN_RESOURCES["airports"].get("quebec") if resource and resource.get("geojson_url"): geojson = self._fetch_geojson(resource["geojson_url"]) sources_queried.append(resource["name"]) for feature in geojson.get("features", []): if len(facilities) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) facility = { "id": props.get("id") or props.get("ID") or props.get("code"), "name": props.get("name") or props.get("nom") or props.get("NOM"), "facility_type": props.get("type") or facility_type, "icao_code": props.get("icao") or props.get("ICAO"), "iata_code": props.get("iata") or props.get("IATA"), "location": { "province": "Quebec", "coordinates": coords, }, "properties": props, "geometry": geom, "source": resource["name"], } facilities.append(facility) except Exception as e: logger.warning(f"Could not fetch Quebec airport data: {e}") # Search for additional datasets via Open Data API if len(facilities) < limit: search_query = f"{facility_type} infrastructure canada" if province: search_query += f" {province}" search_results = self._search_datasets(search_query, limit=10) for ds in search_results.get("results", []): if len(facilities) >= limit: break for resource in ds.get("resources", []): fmt = resource.get("format", "").upper() url = resource.get("url", "") if fmt == "GEOJSON" and url: try: geojson = self._fetch_geojson(url) sources_queried.append(ds.get("title", "Unknown")) for feature in geojson.get("features", []): if len(facilities) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) facility = { "id": props.get("id") or props.get("ID"), "name": props.get("name") or props.get("NAME") or props.get("nom"), "facility_type": 
facility_type, "location": {"coordinates": coords}, "properties": props, "geometry": geom, "source": ds.get("title"), } facilities.append(facility) except Exception: continue return { "count": len(facilities), "facilities": facilities[:limit], "facility_type": facility_type, "sources": list(set(sources_queried)), "filters_applied": { "province": province, "capacity": capacity, "services": services, }, } def query_railways( self, operator: Optional[str] = None, region: Optional[str] = None, rail_type: Optional[str] = None, limit: int = 100, ) -> dict[str, Any]: """ Query railway lines and stations. Returns actual railway records with GeoJSON and metadata. """ railways = [] sources_queried = [] # Query National Railway Network (NRWN) try: resource = self.KNOWN_RESOURCES["railways"]["national"] geojson = self._fetch_geojson(resource["geojson_url"]) sources_queried.append(resource["name"]) for feature in geojson.get("features", []): if len(railways) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = self._extract_coordinates(geom) # NRWN uses specific field names (from NB Socrata export) # trackclass, subdi1name, adminarea, trackname, tracknid, crossintyp, roadclass, levelcross feat_province = props.get("adminarea") or props.get("PROV_EN") or props.get("province") feat_type = props.get("crossintyp") or props.get("RRTYPE_EN") or props.get("type") # Road, Trail, etc. track_class = props.get("trackclass") or props.get("trackclasc") # Main, Siding, etc. 
# Apply region filter if region and feat_province: if region.lower() not in feat_province.lower(): continue # Apply rail_type filter (using track class or crossing type) if rail_type: if track_class and rail_type.lower() in track_class.lower(): pass elif feat_type and rail_type.lower() in feat_type.lower(): pass else: continue railway = { "id": props.get("tracknid") or props.get("nid") or props.get("tcid"), "name": props.get("trackname") or props.get("crosstypnm") or props.get("subdi1name"), # crossing name or subdivision "subdivision": props.get("subdi1name"), # e.g., "Sussex" "subdivision_distance": props.get("subd1dist"), # e.g., 19.57 miles "track_class": track_class, # e.g., "Main" "crossing_type": feat_type, # e.g., "Road" "road_class": props.get("roadclass"), # e.g., "Local/Unknown" "level_crossing": props.get("levelcross"), # e.g., "Under" "crossing_access": props.get("crosacces"), # e.g., "Public" "warning_system": props.get("warningsys"), "location": { "province": feat_province, "coordinates": coords, }, "data_provider": props.get("attprovide") or props.get("geoprovide"), # e.g., "Federal" "geometry": geom, "source": resource["name"], } railways.append(railway) except Exception as e: logger.warning(f"Could not fetch NRWN data: {e}") # Search for additional railway datasets if len(railways) < limit: search_query = "railway rail infrastructure" if operator: search_query += f" {operator}" if region: search_query += f" {region}" search_results = self._search_datasets(search_query, limit=5) for ds in search_results.get("results", []): if len(railways) >= limit: break for res in ds.get("resources", []): fmt = res.get("format", "").upper() url = res.get("url", "") if fmt == "GEOJSON" and url: try: geojson = self._fetch_geojson(url) sources_queried.append(ds.get("title", "Unknown")) for feature in geojson.get("features", []): if len(railways) >= limit: break props = feature.get("properties", {}) geom = feature.get("geometry", {}) coords = 
self._extract_coordinates(geom) railway = { "id": props.get("id") or props.get("ID"), "name": props.get("name") or props.get("NAME"), "operator": props.get("operator") or props.get("OPERATOR"), "rail_type": props.get("type") or props.get("TYPE"), "location": {"coordinates": coords}, "properties": props, "geometry": geom, "source": ds.get("title"), } railways.append(railway) except Exception: continue return { "count": len(railways), "railways": railways[:limit], "sources": list(set(sources_queried)), "filters_applied": { "operator": operator, "region": region, "rail_type": rail_type, }, } def analyze_bridge_conditions( self, region: str, group_by: Optional[str] = None, limit: int = 100, ) -> dict[str, Any]: """ Aggregate bridge condition data for analysis using Statistics Canada Core Public Infrastructure Survey data. Returns statistical analysis of bridge conditions for any province/territory using the unified StatCan approach (Table 34-10-0288-01). Args: region: Province/territory name (e.g., 'Ontario', 'Saskatchewan', 'Canada') group_by: Group results by field (owner, condition) - limited by StatCan data limit: Maximum detailed records to return (default 100) Returns: Condition distribution percentages from Statistics Canada """ # Normalize province name region_normalized = self.PROVINCE_NAMES.get(region.lower(), region) if region else "Canada" # Use unified query_bridges approach which fetches StatCan data bridges_data = self.query_bridges(province=region_normalized, limit=limit) # Get StatCan aggregate condition data (the primary source) statcan_data = bridges_data.get("statcan_data", {}) condition_dist = statcan_data.get("condition_distribution", {}) if not condition_dist: # Try fetching directly if not in bridges_data try: statcan_data = self._fetch_statcan_bridge_inventory(region_normalized) condition_dist = statcan_data.get("condition_distribution", {}) except Exception as e: logger.warning(f"Could not fetch StatCan data for {region_normalized}: {e}") if 
not condition_dist: return { "region": region_normalized, "error": "No bridge condition data available for this region", "sources": ["Statistics Canada - CCPI Survey"], "suggestion": "Try 'Ontario', 'Quebec', 'British Columbia', 'Alberta', 'Saskatchewan', or 'Canada' for available data", } # Build condition summary from StatCan percentages condition_summary = {} total_percentage = 0 # Map StatCan conditions to our categories condition_mapping = { "very_good": "very_good", "good": "good", "fair": "fair", "poor": "poor", "very_poor": "very_poor", "unknown": "unknown", } for key, mapping in condition_mapping.items(): if key in condition_dist: pct = condition_dist[key].get("percentage", 0) condition_summary[mapping] = { "percentage": pct, "label": condition_dist[key].get("label", key.replace("_", " ").title()), } total_percentage += pct # Calculate combined poor/critical percentage poor_pct = condition_summary.get("poor", {}).get("percentage", 0) very_poor_pct = condition_summary.get("very_poor", {}).get("percentage", 0) needs_attention_pct = poor_pct + very_poor_pct # Get detailed bridge records if available detailed_bridges = bridges_data.get("bridges", []) # Group by owner if requested and we have StatCan owner data grouped_data = {} by_owner = statcan_data.get("by_owner", {}) if group_by == "owner" and by_owner: grouped_data = by_owner elif group_by == "condition": grouped_data = condition_summary return { "region": region_normalized, "reference_year": statcan_data.get("reference_year", "2022"), "data_source": { "name": "Statistics Canada - Core Public Infrastructure Survey", "table_id": self.STATCAN_BRIDGE_INVENTORY.get("table_id", "34-10-0288-01"), "description": "Physical condition of core public infrastructure assets by province/territory", }, "condition_summary": condition_summary, "analysis": { "needs_attention_percentage": round(needs_attention_pct, 1), "good_or_better_percentage": round( condition_summary.get("good", {}).get("percentage", 0) + 
condition_summary.get("very_good", {}).get("percentage", 0), 1 ), "total_accounted_percentage": round(total_percentage, 1), }, "grouped_by": group_by, "groups": grouped_data if grouped_data else None, "detailed_records_available": len(detailed_bridges), "detailed_records": detailed_bridges[:10] if detailed_bridges else [], # Sample of detailed records "sources": bridges_data.get("sources", ["Statistics Canada CCPI Survey"]), "note": ( f"Condition percentages from Statistics Canada for {region_normalized}. " f"Detailed bridge records: {len(detailed_bridges)} available from provincial sources." ) if detailed_bridges else ( f"Condition percentages from Statistics Canada for {region_normalized}. " "Detailed bridge records available for: Ontario, Quebec/Montreal, Nova Scotia." ), } # Statistics Canada infrastructure cost data URLs STATCAN_COST_DATA = { "bridge": { "url": "https://www150.statcan.gc.ca/n1/tbl/csv/34100284-eng.zip", "table_id": "34-10-0284-01", "title": "Estimated replacement value of core public infrastructure assets", "asset_filter": "Bridge and tunnel assets", }, "road": { "url": "https://www150.statcan.gc.ca/n1/tbl/csv/34100284-eng.zip", "table_id": "34-10-0284-01", "title": "Estimated replacement value of core public infrastructure assets", "asset_filter": "Road assets", }, "transit": { "url": "https://www150.statcan.gc.ca/n1/tbl/csv/34100284-eng.zip", "table_id": "34-10-0284-01", "title": "Estimated replacement value of core public infrastructure assets", "asset_filter": "Public transit assets", }, } # Statistics Canada bridge inventory/condition distribution data STATCAN_BRIDGE_INVENTORY = { "url": "https://www150.statcan.gc.ca/n1/tbl/csv/34100288-eng.zip", "table_id": "34-10-0288-01", "title": "Inventory distribution of core public infrastructure assets by physical condition rating", "asset_filter": "Bridge and tunnel assets", } # Province name mappings for StatCan data PROVINCE_NAMES = { "ontario": "Ontario", "quebec": "Quebec", "british 
columbia": "British Columbia", "alberta": "Alberta", "manitoba": "Manitoba", "saskatchewan": "Saskatchewan", "nova scotia": "Nova Scotia", "new brunswick": "New Brunswick", "newfoundland": "Newfoundland and Labrador", "pei": "Prince Edward Island", "prince edward island": "Prince Edward Island", "yukon": "Yukon", "northwest territories": "Northwest Territories", "nunavut": "Nunavut", "canada": "Canada", } def get_infrastructure_costs( self, infrastructure_type: str, location: str, limit: int = 50, ) -> dict[str, Any]: """ Get actual cost data for transportation infrastructure from Statistics Canada. Downloads and analyzes the Core Public Infrastructure Survey data. Returns replacement cost estimates by condition rating and owner type. Args: infrastructure_type: Type of infrastructure (bridge, road, transit) location: Province name or 'Canada' for national data limit: Maximum records to return (default 50) Returns: Detailed cost breakdown including: - Total replacement value - Costs by condition (Good, Fair, Poor, Very Poor) - Costs by owner type (Provincial, Municipal, etc.) 
""" infra_lower = infrastructure_type.lower() # Check if we have StatCan data for this infrastructure type if infra_lower not in self.STATCAN_COST_DATA: return self._fallback_cost_search(infrastructure_type, location, limit) statcan_config = self.STATCAN_COST_DATA[infra_lower] # Normalize location name location_normalized = self.PROVINCE_NAMES.get(location.lower(), location) try: # Download and parse StatCan data cost_data = self._fetch_statcan_cost_data( statcan_config["url"], statcan_config["asset_filter"], location_normalized, ) if not cost_data: return self._fallback_cost_search(infrastructure_type, location, limit) return { "infrastructure_type": infrastructure_type, "location": location_normalized, "source": { "name": "Statistics Canada - Core Public Infrastructure Survey", "table_id": statcan_config["table_id"], "title": statcan_config["title"], "reference_year": cost_data.get("reference_year", "2020"), "url": f"https://www150.statcan.gc.ca/t1/tbl1/en/tv.action?pid={statcan_config['table_id'].replace('-', '')}01", }, "total_replacement_value": cost_data.get("total"), "costs_by_condition": cost_data.get("by_condition", {}), "costs_by_owner": cost_data.get("by_owner", {}), "priority_investment_needed": cost_data.get("priority_investment", {}), "unit": "millions CAD", "data_quality": "A (Excellent) - Statistics Canada certified", } except Exception as e: logger.error(f"Error fetching StatCan cost data: {e}") return self._fallback_cost_search(infrastructure_type, location, limit) def _fetch_statcan_cost_data( self, zip_url: str, asset_filter: str, location: str, ) -> dict[str, Any]: """ Download and parse Statistics Canada infrastructure cost CSV from ZIP. 
""" import zipfile import tempfile import os try: # Download the ZIP file logger.info(f"Downloading StatCan cost data from {zip_url}") # Create SSL context that doesn't verify (StatCan has cert issues sometimes) ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE req = urllib.request.Request( zip_url, headers={"User-Agent": "Mozilla/5.0 (compatible; GovMCP/1.0)"} ) with urllib.request.urlopen(req, context=ctx, timeout=60) as response: zip_data = response.read() # Extract and parse CSV from ZIP with tempfile.TemporaryDirectory() as tmpdir: zip_path = os.path.join(tmpdir, "data.zip") with open(zip_path, "wb") as f: f.write(zip_data) with zipfile.ZipFile(zip_path, "r") as zf: # Find the main CSV file (not metadata) csv_files = [n for n in zf.namelist() if n.endswith(".csv") and "metadata" not in n.lower()] if not csv_files: logger.error("No CSV file found in ZIP") return {} csv_filename = csv_files[0] with zf.open(csv_filename) as csvfile: # Read and decode CSV content content = csvfile.read().decode("utf-8-sig") return self._parse_statcan_cost_csv(content, asset_filter, location) except Exception as e: logger.error(f"Error downloading/parsing StatCan ZIP: {e}") return {} def _parse_statcan_cost_csv( self, csv_content: str, asset_filter: str, location: str, ) -> dict[str, Any]: """ Parse Statistics Canada infrastructure cost CSV and extract relevant data. CSV columns (from StatCan table 34-10-0284-01): - REF_DATE: Reference year - GEO: Geography (Canada, province names) - Core public infrastructure assets: Asset type - Financial value of assets: Measure type (Estimated Replacement Value, etc.) 
- Overall physical condition of assets: Condition rating - Public organizations: Owner type - VALUE: Dollar amount in millions """ result = { "reference_year": "2020", "total": None, "by_condition": {}, "by_owner": {}, "priority_investment": {}, } reader = csv.DictReader(io.StringIO(csv_content)) # Track values for aggregation condition_costs = {"Good": 0, "Fair": 0, "Poor": 0, "Very poor": 0} owner_costs = { "Provincial and territorial": 0, "Municipal (all)": 0, "Urban municipalities": 0, "Rural municipalities": 0, "Local and regional": 0, } total_all = None for row in reader: # Get values using actual StatCan column names geo = row.get("GEO", "") asset_type = row.get("Core public infrastructure assets", "") measure = row.get("Financial value of assets", "") condition = row.get("Overall physical condition of assets", "") owner = row.get("Public organizations", "") value_str = row.get("VALUE", "") # Filter for our infrastructure type and location if asset_filter.lower() not in asset_type.lower(): continue if location.lower() not in geo.lower(): continue if "Estimated Replacement Value" not in measure: continue # Parse value (in millions) try: value = float(value_str) if value_str and value_str.strip() else 0 except ValueError: continue # Get reference year ref_period = row.get("REF_DATE", "") if ref_period: result["reference_year"] = ref_period # Extract total if "All physical conditions" in condition and "All public organizations" in owner: total_all = value # Extract by condition (for all public organizations) if "All public organizations" in owner: for cond in ["Good", "Fair", "Poor", "Very poor"]: if condition == cond: condition_costs[cond] = value # Extract by owner (for all conditions) if "All physical conditions" in condition: if "Provincial and territorial organizations" in owner: owner_costs["Provincial and territorial"] = value elif owner == "All municipalities": owner_costs["Municipal (all)"] = value elif "All urban municipalities" in owner: 
owner_costs["Urban municipalities"] = value elif "All rural municipalities" in owner: owner_costs["Rural municipalities"] = value elif "Local and regional organizations" in owner: owner_costs["Local and regional"] = value # Build result result["total"] = { "value": total_all, "formatted": f"${total_all:,.1f} million" if total_all else "N/A", "in_billions": f"${total_all/1000:.1f} billion" if total_all and total_all > 1000 else None, } # Costs by condition with percentages if total_all and total_all > 0: for cond, val in condition_costs.items(): pct = (val / total_all * 100) if val else 0 result["by_condition"][cond.lower().replace(" ", "_")] = { "value_millions": val, "formatted": f"${val:,.1f} million" if val else "N/A", "percentage": round(pct, 1), } # Costs by owner for owner_type, val in owner_costs.items(): if val > 0: result["by_owner"][owner_type.lower().replace(" ", "_").replace("(", "").replace(")", "")] = { "value_millions": val, "formatted": f"${val:,.1f} million" if val else "N/A", } # Priority investment (Poor + Very Poor) poor_total = condition_costs.get("Poor", 0) + condition_costs.get("Very poor", 0) result["priority_investment"] = { "poor_and_very_poor_total": { "value_millions": poor_total, "formatted": f"${poor_total:,.1f} million", "in_billions": f"${poor_total/1000:.2f} billion" if poor_total > 500 else None, }, "description": "Infrastructure in Poor or Very Poor condition requiring priority attention", } return result def _fallback_cost_search( self, infrastructure_type: str, location: str, limit: int, ) -> dict[str, Any]: """ Fallback to dataset search when direct StatCan data is not available. 
""" datasets = [] # Search for cost/investment datasets search_queries = [ f"{infrastructure_type} cost investment infrastructure {location}", f"infrastructure economic accounts {location}", f"{infrastructure_type} replacement value {location}", ] seen_ids = set() for query in search_queries: search_results = self._search_datasets(query, limit=20) for ds in search_results.get("results", []): if ds.get("id") in seen_ids: continue seen_ids.add(ds.get("id")) # Find CSV resources that might contain cost data csv_resources = [ { "name": r.get("name"), "format": r.get("format"), "url": r.get("url"), } for r in ds.get("resources", []) if r.get("format", "").upper() in ["CSV", "XLS", "XLSX"] ] dataset_info = { "id": ds.get("id"), "title": ds.get("title"), "organization": ds.get("organization", {}).get("title"), "description": ds.get("notes", "")[:300] if ds.get("notes") else "", "data_resources": csv_resources[:5], } datasets.append(dataset_info) if len(datasets) >= limit: break if len(datasets) >= limit: break return { "infrastructure_type": infrastructure_type, "location": location, "total_datasets": len(datasets), "datasets": datasets, "note": "Direct cost data not available for this infrastructure type. Dataset search results provided instead.", } def compare_across_regions( self, infrastructure_type: str, regions: list[str], metrics: list[str], limit: int = 50, ) -> dict[str, Any]: """ Compare infrastructure across multiple regions. Returns comparative analysis with actual infrastructure counts and metrics. 
""" comparison = {} for region in regions: region_data = { "region": region, "metrics": {}, "sources": [], } # Query infrastructure based on type if infrastructure_type.lower() == "bridge": data = self.query_bridges(province=region, limit=limit) bridges = data.get("bridges", []) region_data["infrastructure_count"] = len(bridges) region_data["sources"] = data.get("sources", []) if "count" in metrics: region_data["metrics"]["count"] = len(bridges) if "condition" in metrics: conditions = [b.get("condition_rating") for b in bridges if b.get("condition_rating")] region_data["metrics"]["condition_distribution"] = { c: conditions.count(c) for c in set(conditions) } if "age" in metrics: years = [b.get("year_built") for b in bridges if b.get("year_built")] if years: try: numeric_years = [int(y) for y in years if y] if numeric_years: region_data["metrics"]["age_stats"] = { "oldest": min(numeric_years), "newest": max(numeric_years), "average_year": round(sum(numeric_years) / len(numeric_years)), } except (ValueError, TypeError): pass elif infrastructure_type.lower() == "railway": data = self.query_railways(region=region, limit=limit) railways = data.get("railways", []) region_data["infrastructure_count"] = len(railways) region_data["sources"] = data.get("sources", []) if "count" in metrics: region_data["metrics"]["count"] = len(railways) if "operator" in metrics: operators = [r.get("operator") for r in railways if r.get("operator")] region_data["metrics"]["operators"] = list(set(operators)) comparison[region] = region_data return { "infrastructure_type": infrastructure_type, "regions": regions, "metrics_requested": metrics, "comparison": comparison, }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/krunal16-c/gov-ca-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.