qgis_ollama_assistant.py (18.3 kB)
#!/usr/bin/env python3 """ QGIS Ollama助手 - 使用deepseek-r1:14b模型与QGIS交互 """ import socket import json import requests import sys import re import time from typing import List, Dict, Any, Optional, Tuple # from .ollama_client import OllamaClient from qgis_mcp.ollama_client import OllamaClient class QgisMCPSocketClient: """与QGIS MCP插件通信的Socket客户端""" def __init__(self, host='localhost', port=9876): self.host = host self.port = port self.socket = None def connect(self): """连接到QGIS MCP服务器""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.host, self.port)) print(f"已连接到QGIS MCP服务器 {self.host}:{self.port}") return True except Exception as e: print(f"连接QGIS MCP服务器失败: {str(e)}") self.socket = None return False def disconnect(self): """断开与QGIS MCP服务器的连接""" if self.socket: self.socket.close() self.socket = None print("已断开与QGIS MCP服务器的连接") def send_command(self, command_type, params=None): """发送命令到QGIS MCP服务器 Args: command_type: 命令类型 params: 命令参数 Returns: 服务器响应 """ if not self.socket: if not self.connect(): return {"status": "error", "message": "未连接到QGIS MCP服务器"} command = { "type": command_type, "params": params or {} } try: # 发送命令 command_json = json.dumps(command) self.socket.sendall(command_json.encode('utf-8')) # 接收响应 response_data = b'' while True: chunk = self.socket.recv(4096) if not chunk: break response_data += chunk try: # 尝试解析JSON,如果成功则表示接收完成 json.loads(response_data.decode('utf-8')) break except json.JSONDecodeError: # 数据不完整,继续接收 continue # 解析响应 response = json.loads(response_data.decode('utf-8')) return response except Exception as e: print(f"发送命令失败: {str(e)}") self.disconnect() return {"status": "error", "message": str(e)} class OllamaClient: """Ollama API客户端""" def __init__(self, base_url="http://localhost:11434", model="deepseek-r1:14b"): self.base_url = base_url self.model = model self.api_generate_url = f"{base_url}/api/generate" def generate(self, prompt, system_prompt=None): """向Ollama发送请求并获取生成的回复 Args: prompt: 用户提示 system_prompt: 系统提示(可选) Returns: 生成的文本响应 """ payload = { "model": self.model, "prompt": prompt, "stream": False } if system_prompt: payload["system"] = system_prompt try: response = requests.post(self.api_generate_url, json=payload) response.raise_for_status() return response.json().get("response", "") except Exception as e: print(f"调用Ollama API失败: {str(e)}") return f"错误: {str(e)}" class QgisOllamaAssistant: """QGIS deepseek助手 - 使用deepseek-r1:14b模型与QGIS交互""" def __init__(self, qgis_host='localhost', qgis_port=9876, ollama_url="http://localhost:11434", ollama_model="deepseek-r1:14b"): self.qgis_client = QgisMCPSocketClient(host=qgis_host, port=qgis_port) self.ollama_client = OllamaClient(base_url=ollama_url, model=ollama_model) self.context = { "last_command": None, "last_layer_id": None, "project_info": None, "layers": [] } self.system_prompt = """你是一个GIS专家助手,专门处理QGIS相关任务。 你需要将用户的自然语言指令转换为QGIS命令。 可用的QGIS命令包括: - add_vector_layer: 添加矢量图层,参数包括path(文件路径)和name(图层名称) - add_raster_layer: 添加栅格图层,参数包括path(文件路径)和name(图层名称) - zoom_to_layer: 缩放到图层,参数包括layer_id(图层ID) - get_layers: 获取所有图层 - remove_layer: 移除图层,参数包括layer_id(图层ID) - get_project_info: 获取项目信息 - execute_processing: 执行处理算法,参数包括algorithm(算法名称)和parameters(算法参数) 常用的处理算法包括: - native:randompointsinextent: 在范围内生成随机点,参数包括: * EXTENT: 范围,格式为"xmin,xmax,ymin,ymax" * POINTS_NUMBER: 点的数量 * OUTPUT: 输出文件路径 - native:buffer: 创建缓冲区,参数包括: * INPUT: 输入图层ID * DISTANCE: 缓冲距离 * OUTPUT: 输出文件路径 请分析用户指令并提供相应的QGIS命令序列,格式为JSON。例如: [ {"command": "add_vector_layer", "params": {"path": "D:/data/roads.shp", 
"name": "roads"}}, {"command": "zoom_to_layer", "params": {"layer_id": "roads_123456"}} ] 注意:确保使用正确的算法名称和参数格式。 """ def get_supported_commands(self) -> List[str]: """获取服务器支持的命令列表""" response = self.qgis_client.send_command("get_commands") if response.get("status") == "success": return response.get("result", []) return [] def connect(self) -> bool: """连接到QGIS服务器""" if self.qgis_client.connect(): # 尝试获取支持的命令 commands = self.get_supported_commands() if commands: print("服务器支持的命令:") for cmd in commands: print(f" - {cmd}") return True return False def disconnect(self) -> None: """断开与QGIS服务器的连接""" self.qgis_client.disconnect() def update_context(self) -> None: """更新上下文信息""" # 获取项目信息 project_info = self.qgis_client.send_command("get_project_info") if project_info.get("status") == "success": self.context["project_info"] = project_info.get("result") # 获取图层列表 layers = self.qgis_client.send_command("get_layers") if layers.get("status") == "success": self.context["layers"] = layers.get("result") def parse_ai_response(self, response: str) -> List[Dict[str, Any]]: """解析AI响应,提取QGIS命令 Args: response: AI生成的响应文本 Returns: 解析后的QGIS命令列表 """ commands = [] # 尝试从响应中提取JSON格式的命令 json_pattern = r'\[\s*{.*}\s*\]' json_matches = re.findall(json_pattern, response, re.DOTALL) if json_matches: try: commands = json.loads(json_matches[0]) return commands except json.JSONDecodeError: pass # 如果无法提取JSON,尝试通过关键词识别命令 if "add_vector_layer" in response or "添加矢量图层" in response: # 尝试提取文件路径 path_pattern = r'["\']([D|d]:\\[^"\']+\.shp)["\']' path_matches = re.findall(path_pattern, response) if path_matches: path = path_matches[0] name = path.split('\\')[-1].split('.')[0] commands.append({ "command": "add_vector_layer", "params": { "path": path, "name": name } }) if "zoom_to_layer" in response or "缩放到图层" in response: # 如果前面有添加图层的命令,则缩放到该图层 if commands and commands[-1]["command"] == "add_vector_layer": commands.append({ "command": "zoom_to_layer", "params": { "layer_id": "PLACEHOLDER_LAYER_ID" # 这将在执行时被替换 } }) return commands def execute_commands(self, commands: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """执行QGIS命令序列 Args: commands: 要执行的命令列表 Returns: 命令执行结果列表 """ results = [] for cmd in commands: command_type = cmd.get("command") params = cmd.get("params", {}) # 特殊处理:如果是zoom_to_layer命令且layer_id是占位符 if command_type == "zoom_to_layer" and params.get("layer_id") == "PLACEHOLDER_LAYER_ID": if self.context["last_layer_id"]: params["layer_id"] = self.context["last_layer_id"] else: print("警告: 没有可用的图层ID,跳过缩放命令") continue # 特殊处理:execute_processing命令 if command_type == "execute_processing": # 检查算法名称 algorithm = params.get("algorithm", "") # 修正算法名称 if algorithm == "qgis:generate_random_points": params["algorithm"] = "native:randompointsinextent" print(f"修正算法名称: 'qgis:generate_random_points' -> 'native:randompointsinextent'") # 检查参数格式 if "parameters" in params: # 对于随机点生成,确保参数正确 if "native:randompointsinextent" in params["algorithm"]: # 确保参数符合QGIS要求 if "OUTPUT_FILE" in params["parameters"]: # 将OUTPUT_FILE转换为OUTPUT output_file = params["parameters"].pop("OUTPUT_FILE") params["parameters"]["OUTPUT"] = output_file print(f"修正参数: 'OUTPUT_FILE' -> 'OUTPUT'") # 添加必要的EXTENT参数 if "EXTENT" not in params["parameters"]: # 获取当前地图范围 extent_response = self.qgis_client.send_command("get_map_extent") if extent_response.get("status") == "success": extent = extent_response.get("result", {}).get("extent") if extent: params["parameters"]["EXTENT"] = extent print(f"添加缺失参数: 'EXTENT' = {extent}") else: # 使用默认范围 params["parameters"]["EXTENT"] = "0,1,0,1" 
print("添加默认EXTENT参数: '0,1,0,1'") else: # 使用默认范围 params["parameters"]["EXTENT"] = "0,1,0,1" print("添加默认EXTENT参数: '0,1,0,1'") # 检查文件路径是否存在 if command_type == "add_vector_layer" or command_type == "add_raster_layer": import os path = params.get("path", "") if not os.path.exists(path): print(f"警告: 文件不存在: {path}") # 检查是否有前一个命令生成了该文件 if len(results) > 0 and results[-1]["command"] == "execute_processing": prev_result = results[-1]["response"] if prev_result.get("status") != "success": print(f"跳过添加图层命令,因为前一个处理命令失败") continue print(f"执行命令: {command_type}, 参数: {params}") response = self.qgis_client.send_command(command_type, params) # 保存结果 results.append({ "command": command_type, "params": params, "response": response }) # 如果命令执行失败,打印详细错误信息 if response.get("status") != "success": error_msg = response.get("message", "未知错误") print(f"命令执行失败: {error_msg}") # 更新上下文 if command_type == "add_vector_layer" or command_type == "add_raster_layer": if response.get("status") == "success": self.context["last_layer_id"] = response.get("result", {}).get("id") self.context["last_command"] = command_type # 执行完所有命令后更新上下文 self.update_context() return results def process_instruction(self, instruction: str) -> Dict[str, Any]: """处理用户指令 Args: instruction: 用户自然语言指令 Returns: 处理结果 """ # 更新上下文 self.update_context() # 构建提示 context_json = json.dumps(self.context, ensure_ascii=False, indent=2) prompt = f"""用户指令: {instruction} 当前QGIS上下文: {context_json} 请分析用户指令,并提供相应的QGIS命令序列。""" # 使用Ollama生成响应 print("正在使用deepseek分析指令...") ai_response = self.ollama_client.generate(prompt, self.system_prompt) print("\ndeepseek响应:") print(ai_response) # 解析AI响应 print("\n解析AI响应为QGIS命令...") commands = self.parse_ai_response(ai_response) if not commands: print("未能从AI响应中提取有效的QGIS命令") # 对于特定指令,提供默认命令 if "加载" in instruction and "tf.shp" in instruction and "缩放" in instruction: commands = [ { "command": "add_vector_layer", "params": { "path": "D:\\code\\testgdal\\tf.shp", "name": "tf" } }, { "command": "zoom_to_layer", "params": { "layer_id": "PLACEHOLDER_LAYER_ID" } } ] print("使用默认命令序列") # 执行命令 results = self.execute_commands(commands) return { "instruction": instruction, "ai_response": ai_response, "commands": commands, "results": results } def main(): """主函数""" print("QGIS deepseek助手 - 使用deepseek-r1:14b模型与QGIS交互") # 创建助手实例 assistant = QgisOllamaAssistant() # 连接到QGIS if not assistant.connect(): print("无法连接到QGIS服务器,请确保QGIS正在运行并且MCP插件已启动") return 1 try: # 示例:处理特定指令 instruction = "加载D:\\code\\testgdal\\tf.shp并缩放到该图层" # instruction = "新建工程,保存到D:\\code\\testgdal" print(f"\n处理用户指令: {instruction}") result = assistant.process_instruction(instruction) # 显示执行结果摘要 print("\n执行结果摘要:") for cmd_result in result["results"]: status = cmd_result["response"].get("status", "unknown") command = cmd_result["command"] status_emoji = "✅" if status == "success" else "❌" print(f"{status_emoji} {command}: {status}") # 交互式模式 print("\n进入交互模式,输入'exit'退出") while True: user_input = input("\n请输入QGIS指令: ") if user_input.lower() in ["exit", "quit", "退出"]: break result = assistant.process_instruction(user_input) # 显示执行结果摘要 print("\n执行结果摘要:") for cmd_result in result["results"]: status = cmd_result["response"].get("status", "unknown") command = cmd_result["command"] status_emoji = "✅" if status == "success" else "❌" print(f"{status_emoji} {command}: {status}") except Exception as e: print(f"执行过程中出错: {str(e)}") import traceback traceback.print_exc() return 1 finally: # 断开连接 assistant.disconnect() return 0 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kicker315/deepseek_qgis_mcp'
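The same lookup can be made from Python; a minimal sketch using requests, reusing the endpoint from the curl example above (the JSON response shape is not documented here, so it is only printed):

import requests

# Fetch this server's metadata from the Glama MCP directory API
resp = requests.get("https://glama.ai/api/mcp/v1/servers/kicker315/deepseek_qgis_mcp")
resp.raise_for_status()
print(resp.json())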

If you have feedback or need assistance with the MCP directory API, please join our Discord server.