Skip to main content
Glama
mcp_server.py31.8 kB
import importlib import logging import secrets import string import threading from odoo import api, fields, models, _ from odoo.exceptions import UserError, ValidationError _logger = logging.getLogger(__name__) # 全局标志,确保自动启动只执行一次 _auto_start_executed = False _auto_start_lock = threading.Lock() class MCPServer(models.Model): _name = 'mcp.server' _description = 'MCP 服务器' _inherit = ['mail.thread', 'mail.activity.mixin'] _order = 'name' # 移除SQL约束,改用Python验证 name = fields.Char('服务器名称', required=True, tracking=True) server_url = fields.Char('服务器URL', readonly=True, help="自动生成的服务器URL") server_port = fields.Integer('服务器端口', default=lambda self: self._get_next_available_port(), help="MCP服务器监听端口", tracking=True) api_key = fields.Char('API密钥', tracking=True) active = fields.Boolean('激活状态', default=True, tracking=True) state = fields.Selection([ ('draft', '草稿'), ('active', '活动'), ('inactive', '不活动'), ], string='状态', default='draft', tracking=True) # 安全设置 require_auth = fields.Boolean('需要授权', default=True, help="启用后,所有请求必须包含有效的认证凭证", tracking=True) auth_method = fields.Selection([ ('api_key', 'API密钥'), ('jwt', 'JWT Bearer Token') ], string='认证方式', default='api_key', required=True, tracking=True) allowed_ips = fields.Text('允许的IP地址', help="每行一个IP地址或CIDR格式。留空表示允许所有IP", tracking=True) log_requests = fields.Boolean('记录请求', default=True, help="启用后,记录所有请求到日志", tracking=True) max_requests_per_minute = fields.Integer('每分钟最大请求数', default=60, help="限制每个IP每分钟的最大请求数,0表示不限制", tracking=True) # API密钥认证相关设置 api_key = fields.Char('API密钥', tracking=True, help="用于API密钥认证方式的密钥") # JWT认证相关设置 jwt_public_key = fields.Text('JWT公钥', tracking=True, help="用于验证JWT令牌的RSA公钥(PEM格式)") jwks_uri = fields.Char('JWKS URI', tracking=True, help="JSON Web Key Set URI,用于动态获取验证密钥") jwt_issuer = fields.Char('令牌颁发者(Issuer)', tracking=True, help="JWT令牌颁发者标识,用于验证令牌来源") jwt_audience = fields.Char('令牌接收方(Audience)', tracking=True, help="JWT令牌接收方标识,用于验证令牌目标") jwt_algorithm = fields.Selection([ ('RS256', 'RS256'), ('RS384', 'RS384'), ('RS512', 'RS512'), ('ES256', 'ES256'), ('ES384', 'ES384'), ('ES512', 'ES512') ], string='签名算法', default='RS256', tracking=True) note = fields.Text('备注') last_connection = fields.Datetime('最后连接时间', readonly=True) connection_count = fields.Integer('连接次数', readonly=True, default=0) resource_ids = fields.One2many('mcp.resource', 'server_id', string='资源') resource_count = fields.Integer(compute='_compute_resource_count', string='资源数量') @api.depends('resource_ids') def _compute_resource_count(self): for record in self: record.resource_count = len(record.resource_ids) @api.model def _auto_start_servers_on_startup(self): """在系统启动时自动启动活动的MCP服务器""" global _auto_start_executed, _auto_start_lock with _auto_start_lock: if _auto_start_executed: return _auto_start_executed = True try: _logger.info("系统启动:开始自动启动活动的MCP服务器") # 延迟启动,确保系统完全初始化 def delayed_auto_start(): try: import time time.sleep(3.0) # 等待3秒确保系统完全启动 _logger.info("开始延迟自动启动MCP服务器") # 获取FastMCP服务并启动活动服务器 from ..services.fast_mcp_service import FastMCPService success = FastMCPService.auto_start_on_module_init() if success: _logger.info("系统启动:MCP服务器自动启动成功") else: _logger.warning("系统启动:MCP服务器自动启动未完全成功") except Exception as e: _logger.error("系统启动时自动启动MCP服务器失败: %s", str(e)) import traceback _logger.error("错误详情: %s", traceback.format_exc()) # 在后台线程中执行延迟启动 startup_thread = threading.Thread( target=delayed_auto_start, daemon=True, name="MCP-SystemAutoStart" ) startup_thread.start() _logger.info("系统启动:已启动MCP服务器自动启动线程") except Exception as e: _logger.error("设置MCP服务器自动启动时发生错误: %s", str(e)) @api.model def _register_hook(self): """在模型注册时调用,用于设置自动启动""" super()._register_hook() # 在模型注册完成后触发自动启动 self._auto_start_servers_on_startup() def _get_next_available_port(self): """获取下一个可用的端口号""" import socket import random # 端口范围配置 start_port = 10888 max_port = 11000 # 最大端口范围 # 获取已使用的端口 used_ports = set() existing_servers = self.search([('server_port', '!=', False)]) for server in existing_servers: if server.server_port: used_ports.add(server.server_port) # 首先尝试顺序查找 for port in range(start_port, max_port): if port not in used_ports: # 检查端口是否真的可用 try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', port)) sock.close() _logger.info("找到可用端口: %d", port) return port except OSError: _logger.debug("端口 %d 已被占用", port) continue # 如果顺序查找失败,尝试随机端口 _logger.warning("顺序查找端口失败,尝试随机端口") attempts = 0 while attempts < 50: # 最多尝试50次 attempts += 1 port = random.randint(20000, 65000) # 使用高端口范围 if port not in used_ports: try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.bind(('127.0.0.1', port)) sock.close() _logger.info("找到随机可用端口: %d", port) return port except OSError: continue # 如果仍然失败,使用默认端口并警告 _logger.error("无法找到可用端口,使用默认端口10888,可能会导致端口冲突") return 10888 return 10888 @api.constrains('server_port') def _check_server_port(self): """验证服务器端口唯一性""" for record in self: if not record.server_port: continue # 检查是否有其他记录使用相同的端口 same_port_records = self.search([ ('id', '!=', record.id), ('server_port', '=', record.server_port) ]) if same_port_records: raise ValidationError(_( '端口 %s 已被服务器 "%s" 使用,请选择其他端口。' ) % (record.server_port, same_port_records[0].name)) def action_generate_new_api_key(self): """生成新的API密钥""" self.ensure_one() alphabet = string.ascii_letters + string.digits new_api_key = ''.join(secrets.choice(alphabet) for i in range(32)) self.write({'api_key': new_api_key}) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('API密钥已更新'), 'message': _('已为服务器 "%s" 生成新的API密钥') % self.name, 'type': 'success', 'sticky': False, } } @api.model_create_multi def create(self, vals_list): # 处理单个字典的情况 if isinstance(vals_list, dict): vals_list = [vals_list] processed_vals_list = [] for vals in vals_list: vals = dict(vals) # 创建副本以避免修改原始数据 if 'api_key' not in vals or not vals['api_key']: alphabet = string.ascii_letters + string.digits vals['api_key'] = ''.join(secrets.choice(alphabet) for i in range(32)) # 确保每个服务器都有唯一的端口 if 'server_port' not in vals or not vals['server_port']: vals['server_port'] = self._get_next_available_port() else: # 检查端口是否已被使用 same_port_records = self.search([('server_port', '=', vals['server_port'])]) if same_port_records: raise ValidationError(_( '端口 %s 已被服务器 "%s" 使用,请选择其他端口。' ) % (vals['server_port'], same_port_records[0].name)) # 生成服务器URL if 'server_port' in vals: vals['server_url'] = f'http://127.0.0.1:{vals["server_port"]}' processed_vals_list.append(vals) return super(MCPServer, self).create(processed_vals_list) def write(self, vals): """重写write方法,确保端口唯一性""" if 'server_port' in vals: # 检查端口是否已被其他服务器使用 same_port_records = self.search([ ('id', 'not in', self.ids), ('server_port', '=', vals['server_port']) ]) if same_port_records: raise ValidationError(_( '端口 %s 已被服务器 "%s" 使用,请选择其他端口。' ) % (vals['server_port'], same_port_records[0].name)) # 更新服务器URL vals['server_url'] = f'http://127.0.0.1:{vals["server_port"]}' return super(MCPServer, self).write(vals) def action_activate(self): """启动MCP服务器""" for record in self: if record.state == 'active': return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('服务器已运行'), 'message': _('MCP服务器 "%s" 已经在运行中') % record.name, 'type': 'info', 'sticky': False, } } try: _logger.info('用户请求启动MCP服务器: %s (端口: %d)', record.name, record.server_port) # 启动FastMCP服务器 if self._start_fastmcp_server(record): record.write({ 'state': 'active', 'last_connection': fields.Datetime.now() }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('启动成功'), 'message': _('MCP服务器 "%s" 已成功启动在端口 %d') % (record.name, record.server_port), 'type': 'success', 'sticky': False, } } else: return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('启动失败'), 'message': _('MCP服务器 "%s" 启动失败,请查看日志了解详情') % record.name, 'type': 'danger', 'sticky': True, } } except Exception as e: _logger.error('启动MCP服务器失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('启动错误'), 'message': _('启动MCP服务器时发生错误: %s') % str(e), 'type': 'danger', 'sticky': True, } } def action_deactivate(self): """停止MCP服务器""" for record in self: if record.state == 'inactive': return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('服务器已停止'), 'message': _('MCP服务器 "%s" 已经处于停止状态') % record.name, 'type': 'info', 'sticky': False, } } try: _logger.info('用户请求停止MCP服务器: %s (端口: %d)', record.name, record.server_port) # 停止FastMCP服务器 if self._stop_fastmcp_server(record): record.write({ 'state': 'inactive' }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('停止成功'), 'message': _('MCP服务器 "%s" 已成功停止') % record.name, 'type': 'success', 'sticky': False, } } else: return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('停止失败'), 'message': _('MCP服务器 "%s" 停止失败,请查看日志了解详情') % record.name, 'type': 'warning', 'sticky': True, } } except Exception as e: _logger.error('停止MCP服务器失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('停止错误'), 'message': _('停止MCP服务器时发生错误: %s') % str(e), 'type': 'danger', 'sticky': True, } } def action_restart(self): """重启MCP服务器""" self.ensure_one() try: _logger.info('用户请求重启MCP服务器: %s (端口: %d)', self.name, self.server_port) # 如果服务器正在运行,先停止它 if self.state == 'active': if not self._stop_fastmcp_server(self): return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('重启失败'), 'message': _('无法停止MCP服务器 "%s",重启失败') % self.name, 'type': 'danger', 'sticky': True, } } # 等待一下确保服务器完全停止 import time time.sleep(2) # 启动服务器 if self._start_fastmcp_server(self): self.write({ 'state': 'active', 'last_connection': fields.Datetime.now() }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('重启成功'), 'message': _('MCP服务器 "%s" 已成功重启在端口 %d') % (self.name, self.server_port), 'type': 'success', 'sticky': False, } } else: self.write({'state': 'inactive'}) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('重启失败'), 'message': _('MCP服务器 "%s" 重启失败,请查看日志了解详情') % self.name, 'type': 'danger', 'sticky': True, } } except Exception as e: _logger.error('重启MCP服务器失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('重启错误'), 'message': _('重启MCP服务器时发生错误: %s') % str(e), 'type': 'danger', 'sticky': True, } } def action_check_port_availability(self): """检查端口是否可用""" self.ensure_one() import socket try: # 检查端口是否被占用 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex(('127.0.0.1', self.server_port)) sock.close() if result == 0: # 端口已被占用 return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('端口检查'), 'message': _('端口 %d 已被占用,请选择其他端口') % self.server_port, 'type': 'warning', 'sticky': True, } } else: # 端口可用 return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('端口检查'), 'message': _('端口 %d 可用') % self.server_port, 'type': 'success', 'sticky': False, } } except Exception as e: return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('端口检查错误'), 'message': _('检查端口时发生错误: %s') % str(e), 'type': 'danger', 'sticky': True, } } def action_get_server_status(self): """获取服务器详细状态""" self.ensure_one() try: fastmcp_service = self._get_fastmcp_service() if not fastmcp_service: status_info = "FastMCP服务不可用" is_running = False else: # 检查服务器是否在服务字典中 is_running = self.id in fastmcp_service.mcp_servers if is_running: server_info = fastmcp_service.mcp_servers[self.id] thread_alive = server_info['thread'].is_alive() if server_info['thread'] else False status_info = f"服务器运行中\n端口: {server_info['port']}\n线程状态: {'活动' if thread_alive else '已停止'}" else: status_info = "服务器未运行" # 检查端口是否被占用 import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) port_in_use = sock.connect_ex(('127.0.0.1', self.server_port)) == 0 sock.close() status_info += f"\n端口 {self.server_port}: {'占用' if port_in_use else '空闲'}" status_info += f"\n数据库状态: {self.state}" status_info += f"\n最后连接: {self.last_connection or '从未连接'}" return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('服务器状态: %s') % self.name, 'message': status_info, 'type': 'info', 'sticky': True, } } except Exception as e: _logger.error('获取服务器状态失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('状态检查错误'), 'message': _('获取服务器状态时发生错误: %s') % str(e), 'type': 'danger', 'sticky': True, } } def action_test_connection(self): self.ensure_one() try: # 使用FastMCP服务测试连接 fastmcp_service = self._get_fastmcp_service() if not fastmcp_service: raise UserError(_('无法获取FastMCP服务,请确保服务已正确安装和配置。')) # 获取或创建MCP服务器实例 mcp_server = fastmcp_service.get_or_create_mcp_server(self) if not mcp_server: raise UserError(_('无法创建FastMCP服务器实例。')) # 检查服务器是否已经运行 server_running = self.id in fastmcp_service.mcp_servers if not server_running: _logger.info('测试连接时发现服务器未运行,尝试启动: %s', self.name) if fastmcp_service.start_server(self): _logger.info('测试连接时成功启动MCP服务器: %s', self.name) self.write({'state': 'active'}) else: _logger.warning('在测试连接时启动服务器失败: %s', self.name) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('连接失败'), 'message': _('MCP服务器未运行且启动失败,请检查服务器配置'), 'type': 'danger', 'sticky': True, } } else: _logger.info('测试连接时发现服务器已在运行: %s', self.name) # 更新连接信息 self.write({ 'last_connection': fields.Datetime.now(), 'connection_count': self.connection_count + 1, }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('连接成功'), 'message': _('与MCP服务器的连接测试成功,服务器已自动启动'), 'type': 'success', 'sticky': False, } } except Exception as e: _logger.error('MCP服务器连接测试失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('连接失败'), 'message': str(e), 'type': 'danger', 'sticky': False, } } def action_batch_start(self): """批量启动选中的服务器""" success_count = 0 failed_servers = [] for record in self: if record.state != 'active': try: if self._start_fastmcp_server(record): record.write({ 'state': 'active', 'last_connection': fields.Datetime.now() }) success_count += 1 else: failed_servers.append(record.name) except Exception as e: _logger.error('批量启动服务器 %s 失败: %s', record.name, str(e)) failed_servers.append(record.name) message = f'成功启动 {success_count} 个服务器' if failed_servers: message += f',失败: {", ".join(failed_servers)}' return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('批量启动结果'), 'message': message, 'type': 'success' if not failed_servers else 'warning', 'sticky': False, } } def action_batch_stop(self): """批量停止选中的服务器""" success_count = 0 failed_servers = [] for record in self: if record.state == 'active': try: if self._stop_fastmcp_server(record): record.write({'state': 'inactive'}) success_count += 1 else: failed_servers.append(record.name) except Exception as e: _logger.error('批量停止服务器 %s 失败: %s', record.name, str(e)) failed_servers.append(record.name) message = f'成功停止 {success_count} 个服务器' if failed_servers: message += f',失败: {", ".join(failed_servers)}' return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('批量停止结果'), 'message': message, 'type': 'success' if not failed_servers else 'warning', 'sticky': False, } } def _get_fastmcp_service(self): """获取FastMCP服务实例""" try: # 动态导入FastMCP服务模块 module = importlib.import_module('odoo.addons.mcp_server.services.fast_mcp_service') # 获取FastMCPService类的单例实例 return module.FastMCPService.get_instance() except ImportError as e: _logger.error('导入FastMCP服务模块失败: %s', str(e)) return None except Exception as e: _logger.error('获取FastMCP服务实例失败: %s', str(e)) return None def _start_fastmcp_server(self, record): """启动FastMCP服务器""" try: fastmcp_service = self._get_fastmcp_service() if not fastmcp_service: _logger.error('无法获取FastMCP服务实例') return False # 启动服务器 result = fastmcp_service.start_server(record) if result: _logger.info('成功启动FastMCP服务器: %s', record.name) else: _logger.error('启动FastMCP服务器失败: %s', record.name) return result except Exception as e: _logger.error('启动FastMCP服务器时发生错误: %s', str(e)) return False def _stop_fastmcp_server(self, record): """停止FastMCP服务器""" try: fastmcp_service = self._get_fastmcp_service() if not fastmcp_service: _logger.error('无法获取FastMCP服务实例') return False # 停止服务器 result = fastmcp_service.stop_server(record) if result: _logger.info('成功停止FastMCP服务器: %s', record.name) else: _logger.error('停止FastMCP服务器失败: %s', record.name) return result except Exception as e: _logger.error('停止FastMCP服务器时发生错误: %s', str(e)) return False class MCPResource(models.Model): _name = 'mcp.resource' _description = 'MCP 资源' _inherit = ['mail.thread', 'mail.activity.mixin'] _order = 'name' name = fields.Char('资源名称', required=True, tracking=True) resource_uri = fields.Char('资源URI', required=True, tracking=True) server_id = fields.Many2one('mcp.server', string='服务器', required=True, ondelete='cascade') active = fields.Boolean('激活状态', default=True, tracking=True) resource_type = fields.Selection([ ('file', '文件'), ('service', '服务'), ('api', 'API'), ('other', '其他'), ], string='资源类型', default='file', tracking=True) description = fields.Text('描述') content = fields.Text('内容', help='资源内容或描述') last_fetch = fields.Datetime('最后获取时间', readonly=True) def action_fetch_resource(self): self.ensure_one() try: # 在这里实现资源获取逻辑 self.write({ 'last_fetch': fields.Datetime.now(), }) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('获取成功'), 'message': _('资源获取成功'), 'type': 'success', 'sticky': False, } } except Exception as e: _logger.error('MCP资源获取失败: %s', str(e)) return { 'type': 'ir.actions.client', 'tag': 'display_notification', 'params': { 'title': _('获取失败'), 'message': str(e), 'type': 'danger', 'sticky': False, } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kaikongbj/odoo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server