Skip to main content
Glama
geopandas_functions.py16.5 kB
"""GeoPandas-related MCP tool functions and resource listings.""" import os import logging from typing import Any, Dict, List, Optional from .mcp import gis_mcp import geopandas as gpd import pandas as pd # Configure logging logger = logging.getLogger(__name__) @gis_mcp.resource("gis://geopandas/io") def get_geopandas_io() -> Dict[str, List[str]]: """List available GeoPandas I/O operations.""" return { "operations": [ "read_file_gpd", "to_file_gpd", "overlay_gpd", "dissolve_gpd", "explode_gpd", "clip_vector", "write_file_gpd" ] } @gis_mcp.resource("gis://geopandas/joins") def get_geopandas_joins() -> Dict[str, List[str]]: """List available GeoPandas join operations.""" return { "operations": [ "append_gpd", "merge_gpd", "sjoin_gpd", "sjoin_nearest_gpd", "point_in_polygon" ] } @gis_mcp.tool() def read_file_gpd(file_path: str) -> Dict[str, Any]: """Reads a geospatial file and returns stats and a data preview.""" try: if not os.path.exists(file_path): raise FileNotFoundError(f"File not found: {file_path}") gdf = gpd.read_file(file_path) preview = gdf.head(5).to_dict(orient="records") return { "status": "success", "columns": list(gdf.columns), "column_types": gdf.dtypes.astype(str).to_dict(), "num_rows": len(gdf), "num_columns": gdf.shape[1], "crs": str(gdf.crs), "bounds": gdf.total_bounds.tolist(), # [minx, miny, maxx, maxy] "preview": preview, "message": f"File loaded successfully with {len(gdf)} rows and {gdf.shape[1]} columns" } except Exception as e: logger.error(f"Error reading file: {str(e)}") return { "status": "error", "message": f"Failed to read file: {str(e)}" } @gis_mcp.tool() def append_gpd(shapefile1_path: str, shapefile2_path: str, output_path: str) -> Dict[str, Any]: """ Reads two shapefiles directly, concatenates them vertically.""" try: # Configure a basic logger for demonstration logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Step 1: Read the two shapefiles into GeoDataFrames. logger.info(f"Reading {shapefile1_path}...") gdf1 = gpd.read_file(shapefile1_path) logger.info(f"Reading {shapefile2_path}...") gdf2 = gpd.read_file(shapefile2_path) # Step 2: Ensure the Coordinate Reference Systems (CRS) match. if gdf1.crs != gdf2.crs: logger.warning( f"CRS mismatch: GDF1 has '{gdf1.crs}' and GDF2 has '{gdf2.crs}'. " "Reprojecting GDF2." ) gdf2 = gdf2.to_crs(gdf1.crs) # Step 3: Concatenate the two GeoDataFrames. combined_gdf = pd.concat([gdf1, gdf2], ignore_index=True) # Step 4: Save the combined GeoDataFrame to a new shapefile. logger.info(f"Saving combined shapefile to {output_path}...") combined_gdf.to_file(output_path, driver='ESRI Shapefile') return { "status": "success", "message": f"Shapefiles concatenated successfully into '{output_path}'.", "info": { "output_path": output_path, "num_features": len(combined_gdf), "crs": str(combined_gdf.crs), "columns": list(combined_gdf.columns) } } except Exception as e: logger.error(f"Error processing shapefiles: {str(e)}") raise ValueError(f"Failed to process shapefiles: {str(e)}") @gis_mcp.tool() def merge_gpd(shapefile1_path: str, shapefile2_path: str, output_path: str) -> Dict[str, Any]: """ Merges two shapefiles based on common attribute columns, This function performs a database-style join, not a spatial join. Args: left_shapefile_path: Path to the left shapefile. The geometry from this file is preserved. right_shapefile_path: Path to the right shapefile to merge. output_path: Path to save the merged output shapefile. how: Type of merge. One of 'left', 'right', 'outer', 'inner'. Defaults to 'inner'. on: Column name to join on. Must be found in both shapefiles. left_on: Column name to join on in the left shapefile. right_on: Column name to join on in the right shapefile. suffixes: Suffix to apply to overlapping column names. """ try : # Step 1: Read the two shapefiles directly into GeoDataFrames. logger.info(f"Reading left shapefile: {shapefile1_path}...") left_gdf = gpd.read_file(shapefile1_path) logger.info(f"Reading right shapefile: {shapefile2_path}...") # For an attribute join, we only need the attribute data from the right file. # We can drop its geometry column to make the merge cleaner and more memory-efficient. right_df = pd.DataFrame(gpd.read_file(shapefile2_path).drop(columns='geometry')) # Step 2: Perform the merge operation using the optimized geopandas.merge. # This function correctly handles the geometry of the left GeoDataFrame. logger.info(f"Performing merge...") merged_gdf = gpd.merge( left_gdf, right_df, how='inner', # Default to inner merge suffixes=('_left', '_right') ) if merged_gdf.empty: logger.warning("The merge result is empty. No matching records were found.") # Step 3: Save the merged GeoDataFrame to a new shapefile. logger.info(f"Saving merged shapefile to {output_path}...") merged_gdf.to_file(output_path, driver='ESRI Shapefile') return { "status": "success", "message": f"Shapefiles merged successfully into '{output_path}'.", "info": { "output_path": output_path, "merge_type": 'inner', "num_features": len(merged_gdf), "crs": str(merged_gdf.crs), "columns": list(merged_gdf.columns) } } except Exception as e: logger.error(f"Error merging shapefiles: {str(e)}") raise ValueError(f"Failed to merge shapefiles: {str(e)}") @gis_mcp.tool() def overlay_gpd(gdf1_path: str, gdf2_path: str, how: str = "intersection", output_path: str = None) -> Dict[str, Any]: """ Overlay two GeoDataFrames using geopandas.overlay. Args: gdf1_path: Path to the first geospatial file. gdf2_path: Path to the second geospatial file. how: Overlay method ('intersection', 'union', 'identity', 'symmetric_difference', 'difference'). output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: gdf1 = gpd.read_file(gdf1_path) gdf2 = gpd.read_file(gdf2_path) if gdf1.crs != gdf2.crs: gdf2 = gdf2.to_crs(gdf1.crs) result = gpd.overlay(gdf1, gdf2, how=how) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": f"Overlay ({how}) completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in overlay_gpd: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def dissolve_gpd(gdf_path: str, by: str = None, output_path: str = None) -> Dict[str, Any]: """ Dissolve geometries by attribute using geopandas.dissolve. Args: gdf_path: Path to the geospatial file. by: Column to dissolve by (optional). output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: gdf = gpd.read_file(gdf_path) result = gdf.dissolve(by=by) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": f"Dissolve completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in dissolve_gpd: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def explode_gpd(gdf_path: str, output_path: str = None) -> Dict[str, Any]: """ Split multi-part geometries into single parts using geopandas.explode. Args: gdf_path: Path to the geospatial file. output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: gdf = gpd.read_file(gdf_path) result = gdf.explode(index_parts=True, ignore_index=True) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": "Explode completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in explode_gpd: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def clip_vector(gdf_path: str, clip_path: str, output_path: str = None) -> Dict[str, Any]: """ Clip vector geometries using geopandas.clip. Args: gdf_path: Path to the input geospatial file. clip_path: Path to the clipping geometry file. output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: gdf = gpd.read_file(gdf_path) clip_gdf = gpd.read_file(clip_path) if gdf.crs != clip_gdf.crs: clip_gdf = clip_gdf.to_crs(gdf.crs) result = gpd.clip(gdf, clip_gdf) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": "Clip completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in clip_vector: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def sjoin_gpd(left_path: str, right_path: str, how: str = "inner", predicate: str = "intersects", output_path: str = None) -> Dict[str, Any]: """ Spatial join between two GeoDataFrames using geopandas.sjoin. Args: left_path: Path to the left geospatial file. right_path: Path to the right geospatial file. how: Type of join ('left', 'right', 'inner'). predicate: Spatial predicate ('intersects', 'within', 'contains', etc.). output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: left = gpd.read_file(left_path) right = gpd.read_file(right_path) if left.crs != right.crs: right = right.to_crs(left.crs) result = gpd.sjoin(left, right, how=how, predicate=predicate) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": f"Spatial join ({how}, {predicate}) completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in sjoin_gpd: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def sjoin_nearest_gpd(left_path: str, right_path: str, how: str = "left", max_distance: float = None, output_path: str = None) -> Dict[str, Any]: """ Nearest neighbor spatial join using geopandas.sjoin_nearest. Args: left_path: Path to the left geospatial file. right_path: Path to the right geospatial file. how: Type of join ('left', 'right'). max_distance: Optional maximum search distance. output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: left = gpd.read_file(left_path) right = gpd.read_file(right_path) if left.crs != right.crs: right = right.to_crs(left.crs) kwargs = {"how": how} if max_distance is not None: kwargs["max_distance"] = max_distance result = gpd.sjoin_nearest(left, right, **kwargs) if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": f"Nearest spatial join ({how}) completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in sjoin_nearest_gpd: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def point_in_polygon(points_path: str, polygons_path: str, output_path: str = None) -> Dict[str, Any]: """ Check if points are inside polygons using spatial join (predicate='within'). Args: points_path: Path to the point geospatial file. polygons_path: Path to the polygon geospatial file. output_path: Optional path to save the result. Returns: Dictionary with status, message, and output info. """ try: points = gpd.read_file(points_path) polygons = gpd.read_file(polygons_path) if points.crs != polygons.crs: polygons = polygons.to_crs(points.crs) result = gpd.sjoin(points, polygons, how="left", predicate="within") if output_path: result.to_file(output_path) preview = result.head(5).to_dict(orient="records") return { "status": "success", "message": "Point-in-polygon test completed successfully.", "num_features": len(result), "crs": str(result.crs), "columns": list(result.columns), "preview": preview, "output_path": output_path, } except Exception as e: logger.error(f"Error in point_in_polygon: {str(e)}") return {"status": "error", "message": str(e)} @gis_mcp.tool() def write_file_gpd(gdf_path: str, output_path: str, driver: str = None) -> Dict[str, Any]: """ Export a GeoDataFrame to a file (Shapefile, GeoJSON, GPKG, etc.). Args: gdf_path: Path to the input geospatial file. output_path: Path to save the exported file. driver: Optional OGR driver name (e.g., 'ESRI Shapefile', 'GeoJSON', 'GPKG'). Returns: Dictionary with status and message. """ try: gdf = gpd.read_file(gdf_path) kwargs = {"driver": driver} if driver else {} gdf.to_file(output_path, **kwargs) return { "status": "success", "message": f"GeoDataFrame exported to '{output_path}' successfully.", "output_path": output_path, "crs": str(gdf.crs), "num_features": len(gdf), "columns": list(gdf.columns), } except Exception as e: logger.error(f"Error in write_file_gpd: {str(e)}") return {"status": "error", "message": str(e)}

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/mahdin75/gis-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server