spatial_markov

Analyze spatial-temporal patterns in regional panel data using Markov chain methods to identify transitions and dependencies across geographic areas over time.

Instructions

Run giddy Spatial Markov on a panel (n regions x t periods) from a shapefile.
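The tool is a thin wrapper around giddy.markov.Spatial_Markov. As a rough illustration of what that underlying call computes (not part of this server), here is a minimal sketch on a synthetic 3x3 lattice; the data, weights, and class counts are invented for illustration:

    import numpy as np
    import libpysal
    from giddy.markov import Spatial_Markov

    # toy panel: 9 regions on a 3x3 lattice, 5 time periods (rows = regions, columns = periods)
    rng = np.random.default_rng(0)
    y = rng.lognormal(mean=0.0, sigma=0.3, size=(9, 5))

    # rook-contiguity weights for the lattice, row-standardized
    w = libpysal.weights.lat2W(3, 3)
    w.transform = "r"

    # classify values and spatial lags into 3 quantile classes each
    sm = Spatial_Markov(y, w, fixed=True, k=3, m=3)
    print(sm.p)  # pooled 3x3 transition matrix
    print(sm.P)  # three conditional 3x3 matrices, one per spatial-lag class

Each of the m conditional matrices describes how regions move between value classes given the class of their neighbors' (spatially lagged) values.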

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| shapefile_path | Yes | Path to the input shapefile | — |
| value_columns | Yes | Time-ordered value columns (oldest to newest), as a list or a comma-separated string | — |
| target_crs | No | CRS the data is reprojected to | EPSG:4326 |
| weights_method | No | Spatial weights scheme: queen, rook, or distance | queen |
| distance_threshold | No | Distance-band threshold in meters (converted to degrees when target_crs is EPSG:4326) | 100000 |
| k | No | Number of classes for the values (quantile bins if continuous) | 5 |
| m | No | Number of classes for the spatial lags | 5 |
| fixed | No | Pool quantiles across all periods | true |
| permutations | No | Permutations for randomization p-values (0 disables) | 0 |
| relative | No | Divide each period by its cross-sectional mean | true |
| drop_na | No | Drop features with any missing value across the time columns | true |
| fill_empty_classes | No | Make empty classes self-absorbing | true |
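As a sketch only, a client might pass arguments like the following; the shapefile path and column names are hypothetical placeholders, and any omitted parameters fall back to the defaults listed above:

    # hypothetical arguments for the spatial_markov tool; the shapefile path and
    # column names are placeholders, not files shipped with the server
    arguments = {
        "shapefile_path": "/data/regions.shp",
        "value_columns": "gdp_2000,gdp_2005,gdp_2010,gdp_2015",
        "weights_method": "queen",
        "k": 5,
        "m": 5,
        "fixed": True,
        "permutations": 99,
        "relative": True,
    }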

Implementation Reference

  • The core handler function implementing the 'spatial_markov' MCP tool. It loads panel data from a shapefile, constructs spatial weights, discretizes values into Markov classes, computes conditional transition probabilities, steady states, and performs inference tests (chi2, Q, LR).
    # module-level imports used by the handler (not shown in the original listing)
    import os
    from typing import Any, Dict, List, Union

    import geopandas as gpd
    import numpy as np
    import pandas as pd


    def spatial_markov(
        shapefile_path: str,
        value_columns: Union[str, List[str]],  # time-ordered, oldest -> newest
        target_crs: str = "EPSG:4326",
        weights_method: str = "queen",         # 'queen' | 'rook' | 'distance'
        distance_threshold: float = 100000,    # meters (converted to degrees if EPSG:4326)
        k: int = 5,                            # classes for y (quantile bins if continuous)
        m: int = 5,                            # classes for spatial lags
        fixed: bool = True,                    # pooled quantiles across all periods
        permutations: int = 0,                 # >0 to get randomization p-values for x2
        relative: bool = True,                 # divide each period by its mean
        drop_na: bool = True,                  # drop features with any NA across time columns
        fill_empty_classes: bool = True        # handle empty bins by making them self-absorbing
    ) -> Dict[str, Any]:
        """Run giddy Spatial Markov on a panel (n regions x t periods) from a shapefile."""
        try:
            # --- sanitize: strip stray backticks from string arguments ---
            shapefile_path = shapefile_path.replace("`", "")
            target_crs = target_crs.replace("`", "")
            weights_method = weights_method.replace("`", "")
            if isinstance(value_columns, str):
                value_cols = [c.strip() for c in value_columns.replace("`", "").split(",") if c.strip()]
            else:
                value_cols = list(value_columns)

            if not os.path.exists(shapefile_path):
                return {"status": "error", "message": f"Shapefile not found: {shapefile_path}"}
            if len(value_cols) < 2:
                return {"status": "error",
                        "message": "value_columns must include at least 2 time steps (wide format)."}

            # --- load + project ---
            gdf = gpd.read_file(shapefile_path)
            missing = [c for c in value_cols if c not in gdf.columns]
            if missing:
                return {"status": "error", "message": f"Columns not found: {missing}"}
            gdf = gdf.to_crs(target_crs)

            # ensure numeric
            gdf[value_cols] = gdf[value_cols].apply(pd.to_numeric, errors="coerce")

            # --- prepare Y (n x t) ---
            Y = gdf[value_cols].to_numpy(copy=True).astype(float)  # shape (n, t)
            if drop_na:
                mask = ~np.any(np.isnan(Y), axis=1)
                if mask.sum() < Y.shape[0]:
                    gdf = gdf.loc[mask].reset_index(drop=True)
                    Y = Y[mask, :]

            # optional relative values (period-wise)
            if relative:
                col_means = Y.mean(axis=0)
                col_means[col_means == 0] = 1.0
                Y = Y / col_means

            # --- spatial weights ---
            import libpysal

            wm = weights_method.lower()
            if wm == "queen":
                w = libpysal.weights.Queen.from_dataframe(gdf, use_index=True)
            elif wm == "rook":
                w = libpysal.weights.Rook.from_dataframe(gdf, use_index=True)
            elif wm == "distance":
                thr = distance_threshold
                if target_crs.upper() == "EPSG:4326":
                    thr = distance_threshold / 111000.0  # meters -> degrees
                w = libpysal.weights.DistanceBand.from_dataframe(
                    gdf, threshold=thr, binary=False, use_index=True
                )
            else:
                return {"status": "error", "message": f"Unknown weights_method: {weights_method}"}
            w.transform = "r"

            # handle islands by dropping rows and rebuilding weights
            if w.islands:
                keep_idx = [i for i in range(gdf.shape[0]) if i not in set(w.islands)]
                if not keep_idx:
                    return {"status": "error",
                            "message": "All units are islands under current weights; adjust weights_method/threshold."}
                gdf = gdf.iloc[keep_idx].reset_index(drop=True)
                Y = Y[keep_idx, :]
                if wm == "queen":
                    w = libpysal.weights.Queen.from_dataframe(gdf, use_index=True)
                elif wm == "rook":
                    w = libpysal.weights.Rook.from_dataframe(gdf, use_index=True)
                else:
                    thr = distance_threshold if target_crs.upper() != "EPSG:4326" else distance_threshold / 111000.0
                    w = libpysal.weights.DistanceBand.from_dataframe(gdf, threshold=thr, binary=False, use_index=True)
                w.transform = "r"

            # --- Spatial Markov ---
            try:
                from giddy.markov import Spatial_Markov
            except ModuleNotFoundError:
                return {"status": "error",
                        "message": "The 'giddy' package is not installed. Install with: pip install giddy"}

            sm = Spatial_Markov(
                Y, w, k=int(k), m=int(m),
                permutations=int(permutations),
                fixed=bool(fixed),
                discrete=False,
                fill_empty_classes=bool(fill_empty_classes),
                variable_name=";".join(value_cols)
            )

            # --- package results (JSON-safe) ---
            def tolist(x):
                """Recursively convert numpy arrays and nested structures to lists."""
                if x is None:
                    return None
                try:
                    if isinstance(x, np.ndarray):
                        return x.tolist()
                    elif isinstance(x, (list, tuple)):
                        return [tolist(xx) for xx in x]
                    elif isinstance(x, dict):
                        return {key: tolist(v) for key, v in x.items()}
                    elif isinstance(x, (np.integer, np.floating)):
                        return float(x) if isinstance(x, np.floating) else int(x)
                    else:
                        return x
                except Exception:
                    # fallback: try to convert to a native Python type
                    try:
                        return float(x) if isinstance(x, (np.floating, float)) else int(x) if isinstance(x, (np.integer, int)) else str(x)
                    except Exception:
                        return x

            # safer preview: write WKT to a new column, then drop the geometry column
            preview = gdf[[*value_cols, "geometry"]].head(5).copy()
            preview["geometry_wkt"] = preview["geometry"].apply(
                lambda g: g.wkt if g is not None and not g.is_empty else None
            )
            preview = pd.DataFrame(preview.drop(columns="geometry")).to_dict(orient="records")

            result = {
                "n_regions": int(Y.shape[0]),
                "n_periods": int(Y.shape[1]),
                "k_classes_y": int(k),
                "m_classes_lag": int(m),
                "weights_method": weights_method.lower(),
                "value_columns": value_cols,
                "discretization": {
                    "cutoffs_y": tolist(getattr(sm, "cutoffs", None)),
                    "cutoffs_lag": tolist(getattr(sm, "lag_cutoffs", None)),
                    "fixed": bool(fixed)
                },
                "global_transition_prob_p": tolist(sm.p),        # (k x k)
                "conditional_transition_prob_P": tolist(sm.P),   # (m x k x k)
                "global_steady_state_s": tolist(sm.s),           # (k,)
                "conditional_steady_states_S": tolist(sm.S),     # (m x k)
                "tests": {
                    "chi2_total_x2": float(getattr(sm, "x2", np.nan)),
                    "chi2_df": int(getattr(sm, "x2_dof", -1)),
                    "chi2_pvalue": float(getattr(sm, "x2_pvalue", np.nan)),
                    "Q": float(getattr(sm, "Q", np.nan)),
                    "Q_p_value": float(getattr(sm, "Q_p_value", np.nan)),
                    "LR": float(getattr(sm, "LR", np.nan)),
                    "LR_p_value": float(getattr(sm, "LR_p_value", np.nan)),
                },
                "data_preview": preview,
            }

            msg = "Spatial Markov completed successfully"
            if wm == "distance" and target_crs.upper() == "EPSG:4326":
                msg += f" (threshold interpreted as {distance_threshold / 111000.0:.6f} degrees)."

            # apply tolist conversion recursively to the entire result
            result = tolist(result)
            return {"status": "success", "message": msg, "result": result}
        except Exception as e:
            return {"status": "error", "message": f"Failed to run Spatial Markov: {str(e)}"}
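A minimal usage sketch, assuming the handler above is importable and reusing the same hypothetical shapefile and columns as in the earlier example; it shows where the pooled and conditional transition matrices land in the returned payload:

    # call the handler directly; paths and column names are placeholders
    out = spatial_markov(
        shapefile_path="/data/regions.shp",
        value_columns="gdp_2000,gdp_2005,gdp_2010,gdp_2015",
        weights_method="queen",
        permutations=99,
    )
    if out["status"] == "success":
        res = out["result"]
        print(res["global_transition_prob_p"])        # pooled k x k transition matrix
        for i, P_i in enumerate(res["conditional_transition_prob_P"]):
            print(f"lag class {i}:", P_i)             # k x k matrix conditioned on the neighbors' class
        print(res["tests"]["LR_p_value"])             # likelihood-ratio test of spatial homogeneity
    else:
        print(out["message"])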

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mahdin75/gis-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.