Skip to main content
Glama

gm_lag

Analyze spatial relationships in cross-sectional data using a spatial lag model to estimate coefficients, fit metrics, and spatial diagnostics for geographic datasets.

Instructions

Run spreg.GM_Lag (spatial 2SLS / GMM-IV spatial lag model) on a cross-section.

Returns coefficients with SE/z/p, fit metrics, (optional) AK test, and a small data preview.

Input Schema

JSON Schema (table form)

| Name               | Required | Description                                | Default     |
|--------------------|----------|--------------------------------------------|-------------|
| shapefile_path     | Yes      |                                            |             |
| y_col              | Yes      |                                            |             |
| x_cols             | Yes      |                                            |             |
| target_crs         | No       |                                            | EPSG:4326   |
| weights_method     | No       |                                            | queen       |
| distance_threshold | No       |                                            |             |
| w_lags             | No       |                                            |             |
| lag_q              | No       |                                            |             |
| yend_cols          | No       |                                            |             |
| q_cols             | No       |                                            |             |
| robust             | No       |                                            |             |
| hac_bandwidth      | No       |                                            |             |
| spat_diag          | No       |                                            |             |
| sig2n_k            | No       |                                            |             |
| drop_na            | No       |                                            |             |

Implementation Reference

  • The handler function implementing the 'gm_lag' tool, which fits a spatial lag model using spreg.GM_Lag (GMM/2SLS). It handles data loading, spatial weights construction (Queen, Rook, DistanceBand), instrument generation, robust inference, and spatial diagnostics.
    @gis_mcp.tool()
    def gm_lag(
        shapefile_path: str,
        y_col: str,                               # dependent variable
        x_cols: Union[str, List[str]],            # exogenous regressors (no constant)
        target_crs: str = "EPSG:4326",
        weights_method: str = "queen",            # 'queen'|'rook'|'distance'
        distance_threshold: float = 100000,       # meters; auto→degrees for EPSG:4326
        # IV/GMM config
        w_lags: int = 1,                          # instruments: WX, WWX, ...
        lag_q: bool = True,                       # also lag external instruments q
        yend_cols: Union[None, str, List[str]] = None,  # other endogenous regressors (optional)
        q_cols: Union[None, str, List[str]] = None,     # their external instruments (optional)
        # Inference options
        robust: Union[None, str] = None,          # None | 'white' | 'hac'
        hac_bandwidth: Union[None, float] = None, # only used if robust='hac'
        spat_diag: bool = True,                   # AK test
        sig2n_k: bool = False,                    # variance uses n-k if True
        drop_na: bool = True                      # drop rows with NA in y/x/yend/q
    ) -> Dict[str, Any]:
        """
        Run spreg.GM_Lag (spatial 2SLS / GMM-IV spatial lag model) on a cross-section.

        Returns coefficients with SE/z/p, fit metrics, (optional) AK test, and a small data preview.

        Parameters mirror spreg.GM_Lag where applicable; column arguments accept either
        a list of names or a single comma-separated string. On any failure a
        {"status": "error", "message": ...} dict is returned instead of raising.
        """
        try:
            # --- sanitize inputs ---
            # NOTE: assigning into locals() inside a function is a no-op in CPython,
            # so we strip backticks by reassigning the names directly.
            shapefile_path = shapefile_path.replace("`", "")
            target_crs = target_crs.replace("`", "")
            weights_method = weights_method.replace("`", "")

            def _as_list(v: Union[None, str, List[str]]) -> Union[None, List[str]]:
                # Normalize a column spec: None stays None, a comma-separated
                # string becomes a stripped list, any other iterable is listed.
                if v is None:
                    return None
                if isinstance(v, str):
                    return [c.strip() for c in v.split(",") if c.strip()]
                return list(v)

            x_cols_list = _as_list(x_cols) or []
            yend_cols_list = _as_list(yend_cols)
            q_cols_list = _as_list(q_cols)

            if not os.path.exists(shapefile_path):
                return {"status": "error", "message": f"Shapefile not found: {shapefile_path}"}
            if not x_cols_list:
                return {"status": "error", "message": "x_cols must include at least one regressor."}

            # --- load + project ---
            gdf = gpd.read_file(shapefile_path)
            needed = [y_col] + x_cols_list + (yend_cols_list or []) + (q_cols_list or [])
            missing = [c for c in needed if c not in gdf.columns]
            if missing:
                return {"status": "error", "message": f"Columns not found: {missing}"}

            gdf = gdf.to_crs(target_crs)

            # --- coerce numerics & subset ---
            gdf[needed] = gdf[needed].apply(pd.to_numeric, errors="coerce")
            # `data` carries everything downstream steps need (model columns + geometry),
            # so weights, HAC kernel, and the preview all come from one aligned frame.
            data = gdf[needed + [gdf.geometry.name]].copy()

            if drop_na:
                before = data.shape[0]
                data = data.dropna(subset=needed)
                if data.shape[0] == 0:
                    return {"status": "error", "message": "All rows dropped due to NA in y/x/yend/q."}
                if data.shape[0] < before:
                    data = data.reset_index(drop=True)

            # --- arrays for spreg ---
            y = data[[y_col]].to_numpy(dtype=float)              # (n,1)
            X = data[x_cols_list].to_numpy(dtype=float)          # (n,k), no constant
            YEND = None if not yend_cols_list else data[yend_cols_list].to_numpy(dtype=float)
            Q = None if not q_cols_list else data[q_cols_list].to_numpy(dtype=float)

            # --- spatial weights ---
            import libpysal
            wm = weights_method.lower()

            # For EPSG:4326 the threshold is given in meters but coordinates are in
            # degrees; 1 degree ≈ 111 km at the equator, so convert approximately.
            def _threshold() -> float:
                if target_crs.upper() == "EPSG:4326":
                    return distance_threshold / 111000.0
                return distance_threshold

            def _build_weights(frame):
                # Build row-standardized weights for `frame`; returns None on an
                # unknown method (only reachable on the first call).
                if wm == "queen":
                    w_ = libpysal.weights.Queen.from_dataframe(frame, use_index=True)
                elif wm == "rook":
                    w_ = libpysal.weights.Rook.from_dataframe(frame, use_index=True)
                elif wm == "distance":
                    w_ = libpysal.weights.DistanceBand.from_dataframe(
                        frame, threshold=_threshold(), binary=False, use_index=True
                    )
                else:
                    return None
                w_.transform = "r"
                return w_

            w = _build_weights(data)
            if w is None:
                return {"status": "error", "message": f"Unknown weights_method: {weights_method}"}

            # Drop islands and re-align data if needed
            if w.islands:
                # With use_index=True the island ids are index labels, so compare
                # against data.index rather than positional integers.
                island_ids = set(w.islands)
                keep = [pos for pos, idx in enumerate(data.index) if idx not in island_ids]
                if not keep:
                    return {"status": "error", "message": "All units are islands under current weights."}
                # Re-subset every aligned object (bug fix: `data` must shrink too,
                # otherwise the HAC kernel and preview use the pre-drop sample).
                y = y[keep, :]
                X = X[keep, :]
                if YEND is not None:
                    YEND = YEND[keep, :]
                if Q is not None:
                    Q = Q[keep, :]
                data = data.iloc[keep].reset_index(drop=True)
                w = _build_weights(data)

            # --- HAC kernel weights if requested ---
            gwk = None
            bw = None
            if robust == "hac":
                # Kernel weights on centroids of the estimation sample; diagonal=True
                # puts ones on the diagonal as spreg's HAC estimator expects.
                cent = data.geometry.centroid
                coords = np.column_stack([cent.x, cent.y])
                # Default bandwidth: ~5% of the combined coordinate extents.
                bw = hac_bandwidth or (np.ptp(coords[:, 0]) + np.ptp(coords[:, 1])) / 20.0
                gwk = libpysal.weights.Kernel(coords, bandwidth=bw, fixed=True, function="triangular", diagonal=True)
                gwk.transform = "r"

            # --- fit GM_Lag ---
            try:
                from spreg import GM_Lag
            except ModuleNotFoundError:
                return {"status": "error", "message": "The 'spreg' package is not installed. Install with: pip install spreg"}

            reg = GM_Lag(
                y, X,
                yend=YEND, q=Q,
                w=w,
                w_lags=int(w_lags),
                lag_q=bool(lag_q),
                robust=robust,               # None | 'white' | 'hac'
                gwk=gwk,                     # required if robust='hac'
                sig2n_k=bool(sig2n_k),
                spat_diag=bool(spat_diag),
                name_y=y_col,
                name_x=x_cols_list,
                name_yend=(yend_cols_list or None),
                name_q=(q_cols_list or None),
                name_w=f"{weights_method}"
            )

            # --- package outputs ---
            def arr(x):
                # None-safe ndarray -> nested list conversion for JSON output.
                return None if x is None else np.asarray(x).tolist()

            def zpack(z_list):
                # z_stat is a list of (z, p) pairs.
                return [{"z": float(zp[0]), "p": float(zp[1])} for zp in (z_list or [])]

            # tiny preview (avoid geometry dtype issues by exporting WKT)
            preview = data[[y_col, *x_cols_list, data.geometry.name]].head(5).copy()
            preview["geometry_wkt"] = preview[data.geometry.name].apply(lambda g: g.wkt if g is not None else None)
            preview = pd.DataFrame(preview.drop(columns=[data.geometry.name])).to_dict(orient="records")

            betas_flat = np.asarray(reg.betas).flatten()
            result = {
                "n_obs": int(reg.n),
                "k_vars": int(reg.k),  # includes constant internally
                "dependent": y_col,
                "exog": x_cols_list,
                "endog": yend_cols_list,
                "instruments": q_cols_list,
                "weights_method": wm,
                "spec": {"w_lags": int(w_lags), "lag_q": bool(lag_q), "robust": robust, "sig2n_k": bool(sig2n_k)},
                "betas": [float(b) for b in betas_flat],
                # Order follows spreg: constant, X, other endogenous, then W*y last.
                "beta_names": (["const"] + x_cols_list + (yend_cols_list or []) + ["W_y"])[:len(betas_flat)],
                "std_err": arr(reg.std_err),
                "z_stats": zpack(getattr(reg, "z_stat", None)),
                "pseudo_r2": float(getattr(reg, "pr2", np.nan)),
                "pseudo_r2_reduced": float(getattr(reg, "pr2_e", np.nan)),
                "sig2": float(getattr(reg, "sig2", np.nan)),
                "ssr": float(getattr(reg, "utu", np.nan)),
                "ak_test": arr(getattr(reg, "ak_test", None)),  # [stat, p] if spat_diag=True
                "pred_y_head": [float(v) for v in np.asarray(reg.predy).flatten()[:5]],
                "data_preview": preview
            }

            msg = "GM_Lag estimation completed successfully"
            if wm == "distance" and target_crs.upper() == "EPSG:4326":
                msg += f" (threshold interpreted as {distance_threshold/111000.0:.6f} degrees)."
            if robust == "hac":
                msg += f" (HAC bandwidth ~ {bw:.3f})."

            return {"status": "success", "message": msg, "result": result}

        except Exception as e:
            return {"status": "error", "message": f"Failed to run GM_Lag: {str(e)}"}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mahdin75/gis-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server