# QGIS MCP Server
# by jjsantos01
# - qgis_mcp_plugin
import os
import json
import socket
import traceback
from qgis.core import *
from qgis.gui import *
from qgis.PyQt.QtCore import QObject, pyqtSignal, QTimer, Qt, QSize
from qgis.PyQt.QtWidgets import QAction, QDockWidget, QVBoxLayout, QLabel, QPushButton, QSpinBox, QWidget
from qgis.PyQt.QtGui import QIcon, QColor
from qgis.utils import active_plugins
class QgisMCPServer(QObject):
    """Socket server that receives JSON commands and executes them in QGIS.

    The server listens on a TCP socket, accepts a single client at a time and
    is polled from a ``QTimer`` so that all work happens on the thread that
    created the timer. Every request is a JSON object of the form
    ``{"type": <command name>, "params": {...}}``; every reply is a JSON
    object with a ``status`` of ``"success"`` (plus ``result``) or
    ``"error"`` (plus ``message``).
    """

    # Maximum number of bytes read from the client on each timer tick.
    RECV_CHUNK_SIZE = 8192
    # Interval, in milliseconds, of the timer that drives process_server().
    POLL_INTERVAL_MS = 100
    # Upper bound, in seconds, for delivering one complete response.
    SEND_TIMEOUT_S = 10.0
    # Maximum number of layers listed by get_project_info().
    PROJECT_INFO_LAYER_LIMIT = 10
    # Whitespace that JSON allows between documents.
    _JSON_WHITESPACE = " \t\n\r"
    # Shared decoder used to pull one JSON document at a time off the buffer.
    _decoder = json.JSONDecoder()

    def __init__(self, host='localhost', port=9876, iface=None):
        """Store the connection settings; no socket is opened until start().

        :param host: Interface name or address to bind to.
        :param port: TCP port to listen on.
        :param iface: QGIS interface object used by the GUI-related commands.
        """
        super().__init__()
        self.host = host
        self.port = port
        self.iface = iface
        self.running = False
        self.socket = None
        self.client = None
        self.buffer = b''
        self.timer = None

    def start(self):
        """Bind the listening socket and start the polling timer.

        :return: True when the server is running, False when start-up failed
            (the failure is logged and all resources are released).
        """
        if self.running:
            # Binding a second socket would fail and the error path would
            # tear down the server that is already working.
            return True
        self.running = True
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind((self.host, self.port))
            self.socket.listen(1)
            self.socket.setblocking(False)
            # Poll the sockets from a timer instead of blocking on them.
            self.timer = QTimer()
            self.timer.timeout.connect(self.process_server)
            self.timer.start(self.POLL_INTERVAL_MS)
            QgsMessageLog.logMessage(f"QGIS MCP server started on {self.host}:{self.port}", "QGIS MCP")
            return True
        except Exception as e:
            QgsMessageLog.logMessage(f"Failed to start server: {str(e)}", "QGIS MCP", Qgis.Critical)
            self.stop()
            return False

    def stop(self):
        """Stop the timer, close both sockets and discard buffered data."""
        self.running = False
        if self.timer:
            self.timer.stop()
            self.timer = None
        if self.socket:
            self.socket.close()
        if self.client:
            self.client.close()
        self.socket = None
        self.client = None
        # Bytes of a half-received request must not leak into the next run.
        self.buffer = b''
        QgsMessageLog.logMessage("QGIS MCP server stopped", "QGIS MCP")

    def process_server(self):
        """Accept a pending client and service the connected one.

        Called by the timer; never raises, every failure is logged.
        """
        if not self.running:
            return
        try:
            if not self.client and self.socket:
                self._accept_client()
            if self.client:
                self._service_client()
        except Exception as e:
            QgsMessageLog.logMessage(f"Server error: {str(e)}", "QGIS MCP", Qgis.Critical)

    def _accept_client(self):
        """Accept a waiting connection, if any, and make it non-blocking."""
        try:
            self.client, address = self.socket.accept()
            self.client.setblocking(False)
            QgsMessageLog.logMessage(f"Connected to client: {address}", "QGIS MCP")
        except BlockingIOError:
            pass  # No connection waiting
        except Exception as e:
            QgsMessageLog.logMessage(f"Error accepting connection: {str(e)}", "QGIS MCP", Qgis.Warning)

    def _service_client(self):
        """Read pending bytes from the client and answer complete requests."""
        try:
            data = self.client.recv(self.RECV_CHUNK_SIZE)
        except BlockingIOError:
            return  # No data available on this tick
        except Exception as e:
            QgsMessageLog.logMessage(f"Error receiving data: {str(e)}", "QGIS MCP", Qgis.Warning)
            self._drop_client()
            return

        if not data:
            # An empty read means the peer closed the connection.
            QgsMessageLog.logMessage("Client disconnected", "QGIS MCP")
            self._drop_client()
            return

        self.buffer += data
        try:
            # The loop re-checks self.client because a handler may have
            # stopped the server while it was running.
            while self.client and self.buffer:
                complete, command = self._next_command()
                if not complete:
                    break
                self._send_response(self.execute_command(command))
        except Exception as e:
            QgsMessageLog.logMessage(f"Error with client: {str(e)}", "QGIS MCP", Qgis.Warning)
            self._drop_client()

    def _next_command(self):
        """Pop the first complete JSON document off the receive buffer.

        :return: ``(True, command)`` when a document was decoded and removed
            from the buffer, ``(False, None)`` when more bytes are needed.
        :raises UnicodeDecodeError: if the buffer holds bytes that are not
            valid UTF-8 anywhere but at its very end.
        """
        try:
            text = self.buffer.decode('utf-8')
        except UnicodeDecodeError as e:
            if e.end >= len(self.buffer):
                # A multi-byte character was split between two reads; the
                # rest of it arrives with the next chunk.
                return False, None
            raise

        text = text.lstrip(self._JSON_WHITESPACE)
        if not text:
            self.buffer = b''
            return False, None
        try:
            command, end = self._decoder.raw_decode(text)
        except json.JSONDecodeError:
            # Incomplete data, keep in buffer
            return False, None
        # Keep whatever follows the decoded document: it is the beginning of
        # the next request.
        self.buffer = text[end:].lstrip(self._JSON_WHITESPACE).encode('utf-8')
        return True, command

    def _send_response(self, response):
        """Serialise ``response`` and deliver all of it to the client."""
        if not self.client:
            return
        try:
            payload = json.dumps(response)
        except (TypeError, ValueError) as e:
            # A handler returned something JSON cannot represent; tell the
            # client instead of dropping the connection without an answer.
            QgsMessageLog.logMessage(f"Error serializing response: {str(e)}", "QGIS MCP", Qgis.Warning)
            payload = json.dumps({
                "status": "error",
                "message": f"Response is not JSON serializable: {str(e)}"
            })
        # sendall() on a non-blocking socket fails midway through a large
        # payload as soon as the kernel buffer is full, so send with a
        # bounded timeout and restore non-blocking mode afterwards.
        self.client.settimeout(self.SEND_TIMEOUT_S)
        try:
            self.client.sendall(payload.encode('utf-8'))
        finally:
            self.client.setblocking(False)

    def _drop_client(self):
        """Close the client connection, if any, and reset the buffer."""
        if self.client:
            self.client.close()
        self.client = None
        self.buffer = b''

    def execute_command(self, command):
        """Dispatch a decoded request to its handler.

        :param command: Decoded request; expected to be a dict with a
            ``type`` key and an optional ``params`` dict of keyword arguments.
        :return: ``{"status": "success", "result": ...}`` or
            ``{"status": "error", "message": ...}``. Never raises.
        """
        try:
            if not isinstance(command, dict):
                return {"status": "error", "message": "Command must be a JSON object"}
            cmd_type = command.get("type")
            # A missing or null "params" means "no arguments".
            params = command.get("params") or {}
            handlers = {
                "ping": self.ping,
                "get_qgis_info": self.get_qgis_info,
                "load_project": self.load_project,
                "get_project_info": self.get_project_info,
                "execute_code": self.execute_code,
                "add_vector_layer": self.add_vector_layer,
                "add_raster_layer": self.add_raster_layer,
                "get_layers": self.get_layers,
                "remove_layer": self.remove_layer,
                "zoom_to_layer": self.zoom_to_layer,
                "get_layer_features": self.get_layer_features,
                "execute_processing": self.execute_processing,
                "save_project": self.save_project,
                "render_map": self.render_map,
                "create_new_project": self.create_new_project,
            }
            handler = handlers.get(cmd_type)
            if handler is None:
                return {"status": "error", "message": f"Unknown command type: {cmd_type}"}
            try:
                QgsMessageLog.logMessage(f"Executing handler for {cmd_type}", "QGIS MCP")
                result = handler(**params)
                QgsMessageLog.logMessage("Handler execution complete", "QGIS MCP")
                return {"status": "success", "result": result}
            except Exception as e:
                QgsMessageLog.logMessage(f"Error in handler: {str(e)}", "QGIS MCP", Qgis.Critical)
                QgsMessageLog.logMessage(traceback.format_exc(), "QGIS MCP", Qgis.Critical)
                return {"status": "error", "message": str(e)}
        except Exception as e:
            QgsMessageLog.logMessage(f"Error executing command: {str(e)}", "QGIS MCP", Qgis.Critical)
            QgsMessageLog.logMessage(traceback.format_exc(), "QGIS MCP", Qgis.Critical)
            return {"status": "error", "message": str(e)}

    # Command handlers

    def ping(self, **kwargs):
        """Answer a liveness check with ``{"pong": True}``."""
        return {"pong": True}

    def get_qgis_info(self, **kwargs):
        """Return the QGIS version, settings directory and active plugin count."""
        return {
            "qgis_version": Qgis.version(),
            "profile_folder": QgsApplication.qgisSettingsDirPath(),
            "plugins_count": len(active_plugins)
        }

    def get_project_info(self, **kwargs):
        """Summarise the current project.

        :return: dict with the project file name, title, total layer count,
            CRS auth id and a ``layers`` list restricted to the first
            ``PROJECT_INFO_LAYER_LIMIT`` layers.
        """
        project = QgsProject.instance()
        all_layers = list(project.mapLayers().values())
        listed = all_layers[:self.PROJECT_INFO_LAYER_LIMIT]
        return {
            "filename": project.fileName(),
            "title": project.title(),
            "layer_count": len(all_layers),
            "crs": project.crs().authid(),
            "layers": [
                {
                    "id": layer.id(),
                    "name": layer.name(),
                    "type": self._get_layer_type(layer),
                    "visible": layer.isValid() and self._is_layer_visible(project, layer.id())
                }
                for layer in listed
            ]
        }

    def _is_layer_visible(self, project, layer_id):
        """Return the layer-tree visibility of a layer.

        A layer without a node in the layer tree is reported as not visible
        instead of raising ``AttributeError`` on the missing node.
        """
        node = project.layerTreeRoot().findLayer(layer_id)
        return node is not None and node.isVisible()

    def _get_layer_type(self, layer):
        """Helper to get layer type as string"""
        if layer.type() == QgsMapLayer.VectorLayer:
            return f"vector_{layer.geometryType()}"
        elif layer.type() == QgsMapLayer.RasterLayer:
            return "raster"
        else:
            return str(layer.type())

    def execute_code(self, code, **kwargs):
        """Execute PyQGIS code supplied by the client.

        SECURITY: ``code`` comes straight from the socket and is run with
        ``exec`` without any sandboxing, so whoever can connect to the server
        can run arbitrary Python inside QGIS.

        :param code: Python source to execute.
        :return: ``{"executed": True}`` when the code ran without raising.
        :raises Exception: wrapping whatever the executed code raised.
        """
        # Names made available to the executed code.
        namespace = {
            "qgis": Qgis,
            "QgsProject": QgsProject,
            "iface": self.iface,
            "QgsApplication": QgsApplication,
            "QgsVectorLayer": QgsVectorLayer,
            "QgsRasterLayer": QgsRasterLayer,
            "QgsCoordinateReferenceSystem": QgsCoordinateReferenceSystem
        }
        try:
            exec(code, namespace)
        except Exception as e:
            raise Exception(f"Code execution error: {str(e)}") from e
        return {"executed": True}

    def add_vector_layer(self, path, name=None, provider="ogr", **kwargs):
        """Load a vector layer and add it to the project.

        :param path: Data source passed to ``QgsVectorLayer``.
        :param name: Layer name; defaults to the base name of ``path``.
        :param provider: Data provider key.
        :return: dict with the new layer's id, name, type and feature count.
        :raises Exception: if the resulting layer is not valid.
        """
        if not name:
            name = os.path.basename(path)
        layer = QgsVectorLayer(path, name, provider)
        if not layer.isValid():
            raise Exception(f"Layer is not valid: {path}")
        QgsProject.instance().addMapLayer(layer)
        return {
            "id": layer.id(),
            "name": layer.name(),
            "type": self._get_layer_type(layer),
            "feature_count": layer.featureCount()
        }

    def add_raster_layer(self, path, name=None, provider="gdal", **kwargs):
        """Load a raster layer and add it to the project.

        :param path: Data source passed to ``QgsRasterLayer``.
        :param name: Layer name; defaults to the base name of ``path``.
        :param provider: Data provider key.
        :return: dict with the new layer's id, name, type, width and height.
        :raises Exception: if the resulting layer is not valid.
        """
        if not name:
            name = os.path.basename(path)
        layer = QgsRasterLayer(path, name, provider)
        if not layer.isValid():
            raise Exception(f"Layer is not valid: {path}")
        QgsProject.instance().addMapLayer(layer)
        return {
            "id": layer.id(),
            "name": layer.name(),
            "type": "raster",
            "width": layer.width(),
            "height": layer.height()
        }

    def get_layers(self, **kwargs):
        """Describe every layer of the project.

        :return: list with one dict per layer (id, name, type, visibility)
            extended with feature count and geometry type for vector layers
            or width and height for raster layers.
        """
        project = QgsProject.instance()
        layers = []
        for layer_id, layer in project.mapLayers().items():
            layer_info = {
                "id": layer_id,
                "name": layer.name(),
                "type": self._get_layer_type(layer),
                "visible": self._is_layer_visible(project, layer_id)
            }
            # Add type-specific information
            if layer.type() == QgsMapLayer.VectorLayer:
                layer_info.update({
                    "feature_count": layer.featureCount(),
                    "geometry_type": layer.geometryType()
                })
            elif layer.type() == QgsMapLayer.RasterLayer:
                layer_info.update({
                    "width": layer.width(),
                    "height": layer.height()
                })
            layers.append(layer_info)
        return layers

    def remove_layer(self, layer_id, **kwargs):
        """Remove a layer from the project.

        :raises Exception: if no layer with ``layer_id`` exists.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        project.removeMapLayer(layer_id)
        return {"removed": layer_id}

    def zoom_to_layer(self, layer_id, **kwargs):
        """Make a layer the active one and zoom the canvas to it.

        :raises Exception: if no layer with ``layer_id`` exists.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        self.iface.setActiveLayer(project.mapLayer(layer_id))
        self.iface.zoomToActiveLayer()
        return {"zoomed_to": layer_id}

    def get_layer_features(self, layer_id, limit=10, **kwargs):
        """Return the first features of a vector layer.

        :param layer_id: Id of the layer to read.
        :param limit: Maximum number of features to return.
        :return: dict with the layer id, total feature count, field names and
            a ``features`` list holding each feature's id, attributes and
            geometry (type and WKT, or None when it has no geometry).
        :raises Exception: if the layer does not exist or is not a vector
            layer.
        """
        project = QgsProject.instance()
        if layer_id not in project.mapLayers():
            raise Exception(f"Layer not found: {layer_id}")
        layer = project.mapLayer(layer_id)
        if layer.type() != QgsMapLayer.VectorLayer:
            raise Exception(f"Layer is not a vector layer: {layer_id}")

        field_names = [field.name() for field in layer.fields()]
        features = []
        for index, feature in enumerate(layer.getFeatures()):
            if index >= limit:
                break
            geom = None
            if feature.hasGeometry():
                geometry = feature.geometry()
                geom = {
                    "type": geometry.type(),
                    "wkt": geometry.asWkt(precision=4)
                }
            features.append({
                "id": feature.id(),
                "attributes": {name: feature.attribute(name) for name in field_names},
                "geometry": geom
            })
        return {
            "layer_id": layer_id,
            "feature_count": layer.featureCount(),
            "features": features,
            "fields": field_names
        }

    def execute_processing(self, algorithm, parameters, **kwargs):
        """Run a processing algorithm.

        :param algorithm: Algorithm id passed to ``processing.run``.
        :param parameters: Parameter dict passed to ``processing.run``.
        :return: dict with the algorithm id and its results, every value
            converted to ``str`` so that the reply can be sent as JSON.
        :raises Exception: wrapping any failure of the import or the run.
        """
        try:
            import processing
            result = processing.run(algorithm, parameters)
            return {
                "algorithm": algorithm,
                "result": {k: str(v) for k, v in result.items()}
            }
        except Exception as e:
            raise Exception(f"Processing error: {str(e)}") from e

    def save_project(self, path=None, **kwargs):
        """Save the current project.

        :param path: Destination file; defaults to the project's own file.
        :return: ``{"saved": <path>}``.
        :raises Exception: if there is no destination or writing fails.
        """
        project = QgsProject.instance()
        save_path = path if path else project.fileName()
        if not save_path:
            raise Exception("No project path specified and no current project path")
        if not project.write(save_path):
            raise Exception(f"Failed to save project to {save_path}")
        return {"saved": save_path}

    def load_project(self, path, **kwargs):
        """Load a project file and refresh the map canvas.

        :return: dict with the loaded path and the number of layers.
        :raises Exception: if the project cannot be read.
        """
        project = QgsProject.instance()
        if not project.read(path):
            raise Exception(f"Failed to load project from {path}")
        self.iface.mapCanvas().refresh()
        return {
            "loaded": path,
            "layer_count": len(project.mapLayers())
        }

    def create_new_project(self, path, **kwargs):
        """Create a project file at ``path`` and save it.

        The current project is cleared first only when it already has a file
        name; the content of a project that was never saved is kept and
        written to ``path``.

        :param path: Full path where the project will be saved
            (e.g., 'C:/path/to/project.qgz')
        :return: dict with a confirmation message and the layer count.
        :raises Exception: if the project cannot be written.
        """
        project = QgsProject.instance()
        if project.fileName():
            project.clear()
        project.setFileName(path)
        self.iface.mapCanvas().refresh()
        if not project.write():
            raise Exception(f"Failed to save project to {path}")
        return {
            "created": f"Project created and saved successfully at: {path}",
            "layer_count": len(project.mapLayers())
        }

    def render_map(self, path, width=800, height=600, **kwargs):
        """Render the canvas extent to an image file.

        All project layers are rendered over a white background at 96 dpi.

        :param path: Destination image file.
        :param width: Image width in pixels.
        :param height: Image height in pixels.
        :return: dict with ``rendered``, ``path``, ``width`` and ``height``.
        :raises Exception: wrapping any rendering or saving failure.
        """
        try:
            # JSON clients may send 800.0; QSize only accepts integers.
            width = int(width)
            height = int(height)
            canvas = self.iface.mapCanvas()

            ms = QgsMapSettings()
            ms.setLayers(list(QgsProject.instance().mapLayers().values()))
            # The extent below is expressed in the canvas CRS, so the
            # renderer has to use the same destination CRS to reproject the
            # layers consistently with what the canvas shows.
            ms.setDestinationCrs(canvas.mapSettings().destinationCrs())
            ms.setTransformContext(QgsProject.instance().transformContext())
            ms.setExtent(canvas.extent())
            ms.setOutputSize(QSize(width, height))
            ms.setBackgroundColor(QColor(255, 255, 255))
            ms.setOutputDpi(96)

            render = QgsMapRendererParallelJob(ms)
            render.start()
            render.waitForFinished()

            img = render.renderedImage()
            if not img.save(path):
                raise Exception(f"Failed to save rendered image to {path}")
            return {
                "rendered": True,
                "path": path,
                "width": width,
                "height": height
            }
        except Exception as e:
            raise Exception(f"Render error: {str(e)}") from e
class QgisMCPDockWidget(QDockWidget):
    """Dock widget that lets the user start and stop the QGIS MCP server."""

    # Emitted from closeEvent() after the server has been stopped.
    closed = pyqtSignal()

    def __init__(self, iface):
        """Create the dock widget and build its controls.

        :param iface: QGIS interface object handed on to the server.
        """
        super().__init__("QGIS MCP")
        self.iface = iface
        self.server = None
        self.setup_ui()

    def setup_ui(self):
        """Build the port selector, the start/stop buttons and the status label."""
        widget = QWidget()
        layout = QVBoxLayout()
        widget.setLayout(layout)

        # Port selection
        layout.addWidget(QLabel("Server Port:"))
        self.port_spin = QSpinBox()
        self.port_spin.setMinimum(1024)
        self.port_spin.setMaximum(65535)
        self.port_spin.setValue(9876)
        layout.addWidget(self.port_spin)

        # Server control buttons
        self.start_button = QPushButton("Start Server")
        self.start_button.clicked.connect(self.start_server)
        layout.addWidget(self.start_button)

        self.stop_button = QPushButton("Stop Server")
        self.stop_button.clicked.connect(self.stop_server)
        self.stop_button.setEnabled(False)
        layout.addWidget(self.stop_button)

        # Status label
        self.status_label = QLabel("Server: Stopped")
        layout.addWidget(self.status_label)

        self.setWidget(widget)

    def start_server(self):
        """Start a server on the selected port and update the controls.

        When the server cannot be started it is discarded, so that the next
        attempt creates a fresh one using the port currently shown in the
        spin box, and the failure is reported in the status label.
        """
        if not self.server:
            port = self.port_spin.value()
            self.server = QgisMCPServer(port=port, iface=self.iface)

        if self.server.start():
            self.status_label.setText(f"Server: Running on port {self.server.port}")
            self.start_button.setEnabled(False)
            self.stop_button.setEnabled(True)
            self.port_spin.setEnabled(False)
        else:
            failed_port = self.server.port
            # Keeping the failed server would make every retry reuse its
            # port, whatever the user selects afterwards.
            self.server = None
            self.status_label.setText(f"Server: Failed to start on port {failed_port}")

    def stop_server(self):
        """Stop the running server, if any, and re-enable the controls."""
        if self.server:
            self.server.stop()
            self.server = None
            self.status_label.setText("Server: Stopped")
            self.start_button.setEnabled(True)
            self.stop_button.setEnabled(False)
            self.port_spin.setEnabled(True)

    def closeEvent(self, event):
        """Stop the server and emit ``closed`` when the dock is closed."""
        self.stop_server()
        self.closed.emit()
        super().closeEvent(event)
class QgisMCPPlugin:
    """Plugin object that wires the QGIS MCP dock widget into the QGIS GUI."""

    # Text used both for the action and for the plugin menu entry.
    MENU_NAME = "QGIS MCP"

    def __init__(self, iface):
        """Remember the QGIS interface; GUI elements are created in initGui()."""
        self.iface = iface
        self.dock_widget = None
        self.action = None

    def initGui(self):
        """Create the checkable action and register it in the menu and toolbar."""
        self.action = QAction(self.MENU_NAME, self.iface.mainWindow())
        action = self.action
        action.setCheckable(True)
        action.triggered.connect(self.toggle_dock)
        self.iface.addPluginToMenu(self.MENU_NAME, action)
        self.iface.addToolBarIcon(action)

    def toggle_dock(self, checked):
        """Show the dock when the action is checked, hide it otherwise.

        The dock widget is created on first use and reused afterwards.
        """
        if not checked:
            if self.dock_widget:
                self.dock_widget.hide()
            return

        if self.dock_widget:
            # Already built: just bring it back.
            self.dock_widget.show()
            return

        self.dock_widget = QgisMCPDockWidget(self.iface)
        self.iface.addDockWidget(Qt.RightDockWidgetArea, self.dock_widget)
        # Uncheck the action when the user closes the dock.
        self.dock_widget.closed.connect(self.dock_closed)

    def dock_closed(self):
        """Uncheck the action after the dock widget has been closed."""
        self.action.setChecked(False)

    def unload(self):
        """Stop the server and remove the dock, menu entry and toolbar icon."""
        dock = self.dock_widget
        if dock:
            dock.stop_server()
            self.iface.removeDockWidget(dock)
            self.dock_widget = None
        self.iface.removePluginMenu(self.MENU_NAME, self.action)
        self.iface.removeToolBarIcon(self.action)
# Plugin entry point
def classFactory(iface):
    """Build the plugin object for the given QGIS interface."""
    plugin = QgisMCPPlugin(iface)
    return plugin