Skip to main content
Glama
fast_mcp_service.py74.3 kB
import asyncio import logging import socket import threading import time from contextlib import contextmanager from fastmcp import FastMCP, Context from odoo import api, models, fields, _ from odoo.http import request from odoo.modules.registry import Registry from queue import Queue, Empty from typing import Dict, Any, Optional _logger = logging.getLogger(__name__) class SafeDatabaseManager: """安全数据库管理器 - 管理数据库连接池和线程安全的数据库操作""" def __init__(self, pool_size=10, connection_timeout=30): """初始化数据库管理器 Args: pool_size: 连接池大小 connection_timeout: 连接超时时间(秒) """ self.pool_size = pool_size self.connection_timeout = connection_timeout self.connection_pools = {} # 每个数据库的连接池 self.pool_locks = {} # 每个连接池的锁 self._main_lock = threading.Lock() _logger.info("SafeDatabaseManager初始化完成,连接池大小: %d, 超时: %d秒", pool_size, connection_timeout) def _get_db_name(self): """获取当前数据库名称""" try: # 尝试从请求中获取 if hasattr(request, 'db') and request.db: return request.db except Exception: pass try: # 从配置中获取 import odoo.tools.config as config db_name = config.get('db_name') if db_name: return db_name # 获取可用数据库列表并选择第一个 from odoo.service.db import list_dbs dbs = list_dbs(True) if dbs: return dbs[0] except Exception as e: _logger.error("获取数据库名称失败: %s", str(e)) return None def _get_connection_pool(self, db_name): """获取指定数据库的连接池""" with self._main_lock: if db_name not in self.connection_pools: # 创建新的连接池 self.connection_pools[db_name] = Queue(maxsize=self.pool_size) self.pool_locks[db_name] = threading.Lock() _logger.info("为数据库 '%s' 创建了新的连接池", db_name) return self.connection_pools[db_name], self.pool_locks[db_name] def _create_connection(self, db_name): """创建新的数据库连接""" try: registry = Registry(db_name) cursor = registry.cursor() _logger.debug("为数据库 '%s' 创建了新连接", db_name) return cursor except Exception as e: _logger.error("创建数据库连接失败: %s", str(e)) raise def _get_connection(self, db_name): """从连接池获取连接""" pool, pool_lock = self._get_connection_pool(db_name) try: # 尝试从池中获取连接 cursor = pool.get_nowait() # 检查连接健康状态 if self._check_connection_health(cursor): _logger.debug("从连接池获取了健康的连接") return cursor else: # 连接不健康,关闭并创建新连接 _logger.debug("连接不健康,创建新连接") try: cursor.close() except: pass return self._create_connection(db_name) except Empty: # 池中没有可用连接,创建新连接 _logger.debug("连接池为空,创建新连接") return self._create_connection(db_name) def _release_connection(self, db_name, cursor): """释放连接回连接池""" if not cursor: return pool, pool_lock = self._get_connection_pool(db_name) try: # 检查连接健康状态 if self._check_connection_health(cursor): # 尝试将连接放回池中 try: pool.put_nowait(cursor) _logger.debug("连接已释放回连接池") except: # 池已满,关闭连接 _logger.debug("连接池已满,关闭连接") cursor.close() else: # 连接不健康,直接关闭 _logger.debug("连接不健康,直接关闭") cursor.close() except Exception as e: _logger.error("释放连接时出错: %s", str(e)) try: cursor.close() except: pass def _check_connection_health(self, cursor): """检查数据库连接健康状态""" try: if not cursor or cursor.closed: return False # 执行简单查询检查连接 cursor.execute("SELECT 1") cursor.fetchone() return True except Exception as e: _logger.debug("连接健康检查失败: %s", str(e)) return False @contextmanager def get_env(self, db_name=None): """获取安全的Odoo环境上下文管理器""" if not db_name: db_name = self._get_db_name() if not db_name: raise Exception("无法获取数据库名称") cursor = None try: cursor = self._get_connection(db_name) env = api.Environment(cursor, 1, {}) # 使用SUPERUSER_ID _logger.debug("成功创建安全的Odoo环境") yield env # 提交事务 cursor.commit() except Exception as e: _logger.error("数据库操作失败: %s", str(e)) if cursor: try: cursor.rollback() except: pass raise finally: if cursor: self._release_connection(db_name, cursor) async def execute_with_env(self, operation, *args, **kwargs): """安全执行数据库操作的异步方法""" db_name = self._get_db_name() if not db_name: raise Exception("无法获取数据库名称") # 在线程池中执行数据库操作 loop = asyncio.get_event_loop() def sync_operation(): with self.get_env(db_name) as env: return operation(env, *args, **kwargs) try: result = await loop.run_in_executor(None, sync_operation) return result except Exception as e: _logger.error("异步数据库操作失败: %s", str(e)) raise def cleanup_connections(self, db_name=None): """清理连接池中的所有连接""" if db_name: databases = [db_name] else: databases = list(self.connection_pools.keys()) for db in databases: if db in self.connection_pools: pool = self.connection_pools[db] closed_count = 0 while True: try: cursor = pool.get_nowait() cursor.close() closed_count += 1 except Empty: break except Exception as e: _logger.error("关闭连接时出错: %s", str(e)) _logger.info("已清理数据库 '%s' 的 %d 个连接", db, closed_count) class FastMCPService: """FastMCP服务实现类,用于在Odoo中集成FastMCP功能""" _instance = None _lock = threading.Lock() @classmethod def get_instance(cls): """获取FastMCP服务的单例实例""" if cls._instance is None: with cls._lock: if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): """初始化FastMCP服务""" self.mcp_servers = {} # 存储MCP服务器实例 {server_id: {'instance': mcp_server, 'port': port, 'thread': thread}} self.event_loop = None self._event_loop_thread = None # 初始化安全数据库管理器 self.db_manager = SafeDatabaseManager(pool_size=20, connection_timeout=30) self.init_event_loop() def init_event_loop(self): """初始化异步事件循环并在后台线程中运行""" try: _logger.info("开始初始化FastMCP事件循环") self.event_loop = asyncio.new_event_loop() # 创建并启动事件循环线程 def run_event_loop(loop): _logger.info("在后台线程中运行FastMCP事件循环") asyncio.set_event_loop(loop) try: loop.run_forever() except Exception as e: _logger.error("事件循环运行异常: %s", str(e)) finally: _logger.info("事件循环结束") loop.close() # 创建并启动守护线程 self._event_loop_thread = threading.Thread( target=run_event_loop, args=(self.event_loop,), daemon=True, name="FastMCP-EventLoop" ) self._event_loop_thread.start() # 等待事件循环启动 time.sleep(0.5) # 等待循环启动 _logger.info("FastMCP事件循环已初始化并在后台运行,循环对象信息: %s, 运行状态: %s", self.event_loop, self.event_loop.is_running()) except Exception as e: _logger.error("初始化FastMCP事件循环失败: %s", str(e)) import traceback _logger.error("异常详情: %s", traceback.format_exc()) def get_or_create_mcp_server(self, server_record): """获取或创建MCP服务器实例""" server_id = server_record.id _logger.info("尝试获取或创建MCP服务器实例,ID: %s, 名称: %s", server_id, server_record.name) if server_id in self.mcp_servers: _logger.info("找到现有的MCP服务器实例: %s", server_id) return self.mcp_servers[server_id]['instance'] try: _logger.info("开始创建新的FastMCP服务器实例") # 获取服务器安全设置 server_security_settings = self._get_server_security_settings(server_record.id) # 创建认证提供者 auth_provider = self._create_auth_provider(server_security_settings) # 创建新的FastMCP服务器实例,包含认证设置 if auth_provider: _logger.info("创建带有认证的FastMCP服务器实例") # 使用FastMCP的auth_server_provider参数 try: mcp_server = FastMCP( name=server_record.name, auth_server_provider=auth_provider ) _logger.info("创建了带有OAuth授权的FastMCP服务器实例") except Exception as e: _logger.warning("无法使用OAuth授权提供者,回退到基本模式: %s", str(e)) mcp_server = FastMCP(name=server_record.name) # 将认证提供者存储到服务器实例中,供工具级别使用 setattr(mcp_server, '_auth_provider', auth_provider) else: _logger.info("创建无认证的FastMCP服务器实例") mcp_server = FastMCP(name=server_record.name) _logger.info("创建的FastMCP服务器实例: %s", mcp_server) # 注册默认工具和资源 _logger.info("开始注册默认工具") self._register_default_tools(mcp_server, server_record) _logger.info("开始注册默认资源") self._register_default_resources(mcp_server, server_record) # 存储服务器实例信息 self.mcp_servers[server_id] = { 'instance': mcp_server, 'port': server_record.server_port, 'thread': None, 'name': server_record.name } _logger.info("为服务器 %s 创建了新的FastMCP实例,端口: %d", server_record.name, server_record.server_port) return mcp_server except Exception as e: _logger.error("创建FastMCP服务器实例失败: %s", str(e)) import traceback _logger.error("异常详情: %s", traceback.format_exc()) return None async def _safe_execute_with_env(self, operation, *args, **kwargs): """安全执行数据库操作的统一入口方法 这个方法确保: 1. 使用独立的数据库连接 2. 自动资源清理和异常处理 3. 事务管理和回滚机制 4. 线程安全的操作 Args: operation: 要执行的操作函数,第一个参数必须是env *args: 传递给操作函数的位置参数 **kwargs: 传递给操作函数的关键字参数 Returns: 操作函数的返回值 """ try: _logger.debug("开始安全数据库操作执行") result = await self.db_manager.execute_with_env(operation, *args, **kwargs) _logger.debug("安全数据库操作执行成功") return result except Exception as e: _logger.error("安全数据库操作执行失败: %s", str(e)) # 重新抛出异常,让调用者处理 raise def _query_odoo_model_impl(self, env, model_name, domain=None, fields=None, limit=100, offset=0, order=None): """查询Odoo模型的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 获取模型对象 model_obj = env[model_name].sudo() # 处理domain参数 parsed_domain = [] if domain is not None and domain.strip(): import json parsed_domain = json.loads(domain) if not isinstance(parsed_domain, list): parsed_domain = [] # 处理fields参数 parsed_fields = [] if fields is not None and fields.strip(): import json parsed_fields = json.loads(fields) if not isinstance(parsed_fields, list): parsed_fields = [] # 如果没有指定字段,获取模型的基本字段 if not parsed_fields: exclude_types = ['binary', 'html'] model_fields = model_obj._fields parsed_fields = [f for f, field in model_fields.items() if field.type not in exclude_types][:15] # 限制为前15个字段 # 获取记录总数 total_count = model_obj.search_count(parsed_domain) # 执行查询 records = model_obj.search_read( domain=parsed_domain, fields=parsed_fields, limit=limit, offset=offset, order=order ) # 处理many2one字段的显示 for record in records: for field, value in record.items(): if isinstance(value, tuple) and len(value) == 2: # many2one字段显示为 [id, name] record[field] = {"id": value[0], "name": value[1]} return { "status": "success", "data": { "model": model_name, "fields": parsed_fields, "count": len(records), "total_count": total_count, "limit": limit, "offset": offset, "records": records } } def _get_odoo_record_impl(self, env, model_name, record_id): """获取Odoo记录的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 获取记录 record = env[model_name].sudo().browse(record_id) if not record.exists(): raise ValueError(f"在模型 '{model_name}' 中找不到ID为 {record_id} 的记录") # 获取所有字段 exclude_types = ['binary', 'html'] model_fields = record._fields fields = [f for f, field in model_fields.items() if field.type not in exclude_types] # 读取记录数据 data = record.read(fields)[0] # 处理many2one字段的显示 for field, value in data.items(): if isinstance(value, tuple) and len(value) == 2: # many2one字段显示为 {"id": id, "name": name} data[field] = {"id": value[0], "name": value[1]} return { "status": "success", "data": data } def _create_odoo_record_impl(self, env, model_name, values): """创建Odoo记录的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 创建记录 model_obj = env[model_name].sudo() # 处理many2one字段 processed_values = {} for field, value in values.items(): if isinstance(value, dict) and "id" in value: # 如果是形如 {"id": 1, "name": "xxx"} 的字典,则转换为 ID processed_values[field] = value["id"] else: processed_values[field] = value # 创建记录 new_record = model_obj.create(processed_values) # 返回新创建的记录 data = new_record.read()[0] # 处理many2one字段的显示 for field, value in data.items(): if isinstance(value, tuple) and len(value) == 2: data[field] = {"id": value[0], "name": value[1]} return { "status": "success", "message": f"成功创建记录,ID: {new_record.id}", "data": data } def _update_odoo_record_impl(self, env, model_name, record_id, values): """更新Odoo记录的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 获取记录 record = env[model_name].sudo().browse(record_id) if not record.exists(): raise ValueError(f"在模型 '{model_name}' 中找不到ID为 {record_id} 的记录") # 处理many2one字段 processed_values = {} for field, value in values.items(): if isinstance(value, dict) and "id" in value: # 如果是形如 {"id": 1, "name": "xxx"} 的字典,则转换为 ID processed_values[field] = value["id"] else: processed_values[field] = value # 更新记录 record.write(processed_values) # 返回更新后的记录 data = record.read()[0] # 处理many2one字段的显示 for field, value in data.items(): if isinstance(value, tuple) and len(value) == 2: data[field] = {"id": value[0], "name": value[1]} return { "status": "success", "message": f"成功更新记录,ID: {record_id}", "data": data } def _delete_odoo_record_impl(self, env, model_name, record_id): """删除Odoo记录的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 获取记录 record = env[model_name].sudo().browse(record_id) if not record.exists(): raise ValueError(f"在模型 '{model_name}' 中找不到ID为 {record_id} 的记录") # 保存记录名称供日志使用 record_name = record.display_name if hasattr(record, 'display_name') else f"ID: {record_id}" # 删除记录 record.unlink() return { "status": "success", "message": f"成功删除了模型 '{model_name}' 中ID为 {record_id} 的记录", "data": { "model": model_name, "record_id": record_id, "record_name": record_name } } def _get_odoo_model_metadata_impl(self, env, model_name): """获取Odoo模型元数据的实现函数""" # 验证模型是否存在 if model_name not in env: raise ValueError(f"模型 '{model_name}' 不存在于Odoo中") # 获取模型对象 model_obj = env[model_name].sudo() # 获取模型信息 ir_model = env['ir.model'].sudo().search([('model', '=', model_name)], limit=1) model_info = { "name": model_name, "description": ir_model.name if ir_model else model_name, "transient": ir_model.transient if ir_model else False } # 获取字段信息 fields_info = {} for field_name, field in model_obj._fields.items(): # 基本字段信息 field_data = { "type": field.type, "string": field.string, "required": field.required, "readonly": field.readonly, "store": field.store, "help": field.help or "" } # 添加关系字段信息 if field.type in ['many2one', 'one2many', 'many2many']: field_data["relation"] = field.comodel_name # 添加选择字段信息 if field.type == 'selection' and hasattr(field, 'selection') and field.selection: if callable(field.selection): # 如果selection是动态的,尝试调用它获取选项 try: selection = field.selection(model_obj) field_data["selection"] = selection except: field_data["selection"] = [] else: field_data["selection"] = field.selection fields_info[field_name] = field_data return { "status": "success", "data": { "model": model_info, "fields": fields_info } } def _list_resources_impl(self, env, server_id): """列出资源的实现函数""" resources = env['mcp.resource'].sudo().search([ ('server_id', '=', server_id), ('active', '=', True) ]) result = [] for resource in resources: result.append({ 'id': resource.id, 'name': resource.name, 'resource_uri': resource.resource_uri, 'resource_type': resource.resource_type, 'description': resource.description, 'last_fetch': resource.last_fetch.isoformat() if resource.last_fetch else None, }) # 更新服务器连接信息 server = env['mcp.server'].sudo().browse(server_id) if server.exists(): server.write({ 'last_connection': fields.Datetime.now(), 'connection_count': server.connection_count + 1 }) return {"status": "success", "data": result} def _get_resource_content_impl(self, env, server_id, resource_uri): """获取资源内容的实现函数""" resources = env['mcp.resource'].sudo().search([ ('server_id', '=', server_id), ('resource_uri', '=', resource_uri), ('active', '=', True) ], limit=1) if not resources: raise ValueError(f"资源不存在: {resource_uri}") resource = resources[0] # 更新最后获取时间 resource.write({ 'last_fetch': fields.Datetime.now() }) return { "status": "success", "data": { 'id': resource.id, 'name': resource.name, 'content': resource.content, 'resource_uri': resource.resource_uri, 'resource_type': resource.resource_type, 'description': resource.description, 'last_fetch': resource.last_fetch.isoformat() if resource.last_fetch else None, } } def _get_server_info_impl(self, env, server_id): """获取服务器信息的实现函数""" server = env['mcp.server'].sudo().browse(server_id) if not server.exists(): raise ValueError(f"服务器不存在: {server_id}") return { "id": server.id, "name": server.name, "server_url": server.server_url, "state": server.state, "last_connection": server.last_connection.isoformat() if server.last_connection else None, "connection_count": server.connection_count, } def _get_resource_by_id_impl(self, env, server_id, resource_id): """根据ID获取资源的实现函数""" resource = env['mcp.resource'].sudo().browse(resource_id) if not resource.exists() or resource.server_id.id != server_id: raise ValueError(f"资源不存在: {resource_id}") # 更新最后获取时间 resource.write({ 'last_fetch': fields.Datetime.now() }) return { "status": "success", "data": { 'id': resource.id, 'name': resource.name, 'content': resource.content, 'resource_uri': resource.resource_uri, 'resource_type': resource.resource_type, 'description': resource.description, } } def _create_auth_provider(self, server_security_settings): """创建符合FastMCP标准的授权提供者""" try: # 如果不需要授权,返回None if not server_security_settings.get('require_auth', True): _logger.debug("未启用认证,不创建授权提供者") return None # 获取认证方式,默认为api_key auth_method = server_security_settings.get('auth_method', 'api_key') if auth_method == 'jwt': # 使用JWT Bearer Token认证 try: # 导入FastMCP的BearerAuthProvider from fastmcp.server.auth import BearerAuthProvider # 获取JWT配置参数 jwt_public_key = server_security_settings.get('jwt_public_key') jwks_uri = server_security_settings.get('jwks_uri') jwt_issuer = server_security_settings.get('jwt_issuer') jwt_audience = server_security_settings.get('jwt_audience') jwt_algorithm = server_security_settings.get('jwt_algorithm', 'RS256') if not (jwt_public_key or jwks_uri): _logger.warning("JWT认证方式需要提供公钥或JWKS URI,但两者均未设置") return None # 创建FastMCP的BearerAuthProvider实例 auth_params = { 'algorithm': jwt_algorithm } if jwt_issuer: auth_params['issuer'] = jwt_issuer if jwt_audience: auth_params['audience'] = jwt_audience if jwks_uri: auth_params['jwks_uri'] = jwks_uri _logger.info(f"使用JWKS URI创建JWT认证提供者: {jwks_uri}") elif jwt_public_key: auth_params['public_key'] = jwt_public_key _logger.info("使用公钥创建JWT认证提供者") return BearerAuthProvider(**auth_params) except ImportError as e: _logger.error(f"导入FastMCP的BearerAuthProvider失败: {e}") _logger.error("请确保安装了正确版本的fastmcp库") return None except Exception as e: _logger.error(f"创建JWT认证提供者失败: {e}") return None else: # 使用API密钥认证 api_key = server_security_settings.get('api_key') if not api_key: _logger.warning("API密钥认证方式但未设置API密钥") return None allowed_ips = server_security_settings.get('allowed_ips', []) log_requests = server_security_settings.get('log_requests', True) # 创建符合FastMCP标准的OAuth授权服务器提供者 class OdooMCPAuthProvider: """Odoo MCP服务器的OAuth授权提供者""" def __init__(self, api_key, allowed_ips=None, log_requests=True): self.api_key = api_key self.allowed_ips = allowed_ips or [] self.log_requests = log_requests async def get_authorization_url(self, client_id, redirect_uri, state=None, scopes=None): """获取授权URL - 对于API密钥认证,这个方法不需要实现""" raise NotImplementedError("API密钥认证不需要授权URL") async def exchange_code_for_tokens(self, code, client_id, client_secret, redirect_uri): """交换授权码获取令牌 - 对于API密钥认证,这个方法不需要实现""" raise NotImplementedError("API密钥认证不需要授权码交换") async def verify_token(self, token, scopes=None): """验证访问令牌""" try: # 验证API密钥 if token != self.api_key: if self.log_requests: _logger.warning("API密钥验证失败") return None if self.log_requests: _logger.info("API密钥验证成功") # 返回用户信息 return { "sub": "odoo_mcp_user", "scopes": scopes or [], "client_id": "odoo_mcp_client" } except Exception as e: _logger.error(f"令牌验证过程中发生错误: {e}") return None async def refresh_token(self, refresh_token, client_id, client_secret): """刷新访问令牌 - 对于API密钥认证,这个方法不需要实现""" raise NotImplementedError("API密钥认证不需要令牌刷新") _logger.info("使用API密钥创建认证提供者") return OdooMCPAuthProvider(api_key, allowed_ips, log_requests) except Exception as e: _logger.error(f"创建授权提供者失败: {e}") return None async def _check_authentication(self, mcp_server, ctx): """检查认证""" try: # 获取安全设置 security_settings = getattr(mcp_server, '_security_settings', {}) # 如果不需要授权,直接通过 if not security_settings.get('require_auth', True): return None # 获取认证方式,默认为API密钥 auth_method = security_settings.get('auth_method', 'api_key') # 认证方式特定的验证准备 if auth_method == 'api_key': # 验证API密钥的必要存在性 api_key = security_settings.get('api_key') if not api_key: return {"status": "error", "message": "服务器未配置API密钥"} elif auth_method == 'jwt': # 检查JWT认证提供者 auth_provider = getattr(mcp_server, '_auth_provider', None) if not auth_provider: return {"status": "error", "message": "服务器JWT认证提供者未正确配置"} # 尝试从上下文获取HTTP请求 if ctx and hasattr(ctx, 'get_http_request'): try: request = ctx.get_http_request() if request: # 检查Authorization头 auth_header = request.headers.get('authorization', '') if auth_header.startswith('Bearer '): token = auth_header[7:] # 移除 'Bearer ' 前缀 # 根据不同认证方式验证token if auth_method == 'jwt': # 使用JWT认证提供者验证令牌 try: # 调用FastMCP的verify_token方法验证JWT令牌 user_info = await auth_provider.verify_token(token) if not user_info: if security_settings.get('log_requests', True): _logger.warning("JWT Bearer token验证失败") return {"status": "error", "message": "无效或过期的JWT Bearer token"} # 记录令牌验证成功的用户信息 if security_settings.get('log_requests', True): _logger.info( f"JWT Bearer token验证成功 - 用户: {user_info.get('client_id', 'unknown')}") # 检查IP白名单 allowed_ips = security_settings.get('allowed_ips', []) if allowed_ips: client_ip = getattr(request.client, 'host', 'unknown') if hasattr(request, 'client') else 'unknown' if client_ip != 'unknown' and client_ip not in allowed_ips: if security_settings.get('log_requests', True): _logger.warning(f"IP访问拒绝: {client_ip} - 不在白名单中") return {"status": "error", "message": "IP地址未授权访问"} # 验证通过,将用户信息保存到上下文 ctx.auth_user = user_info return None except Exception as e: _logger.error(f"JWT令牌验证过程中发生错误: {str(e)}") return {"status": "error", "message": "JWT令牌验证错误"} else: # API密钥认证 if token == security_settings.get('api_key'): if security_settings.get('log_requests', True): _logger.info("API密钥验证成功") # 检查IP白名单 allowed_ips = security_settings.get('allowed_ips', []) if allowed_ips: client_ip = getattr(request.client, 'host', 'unknown') if hasattr(request, 'client') else 'unknown' if client_ip != 'unknown' and client_ip not in allowed_ips: if security_settings.get('log_requests', True): _logger.warning(f"IP访问拒绝: {client_ip} - 不在白名单中") return {"status": "error", "message": "IP地址未授权访问"} # 设置基本的用户信息到上下文 ctx.auth_user = { "sub": "odoo_mcp_user", "client_id": "odoo_mcp_client" } return None # 验证通过 else: if security_settings.get('log_requests', True): _logger.warning("API密钥验证失败") return {"status": "error", "message": "无效的API密钥"} else: if security_settings.get('log_requests', True): _logger.warning("缺少Authorization头或格式不正确") return {"status": "error", "message": "需要Bearer token认证"} except Exception as e: _logger.debug("无法获取HTTP请求上下文: %s", str(e)) # 如果无法获取HTTP上下文,记录日志但允许通过(向后兼容) # 注意:这是一个向后兼容的处理,在生产环境中应考虑提高安全性 if security_settings.get('log_requests', True): _logger.info(f"收到工具调用请求(无HTTP上下文)- 认证方式: {auth_method}") return None # 通过验证 except Exception as e: _logger.error("认证检查失败: %s", str(e)) import traceback _logger.error(f"认证失败详细信息: {traceback.format_exc()}") return {"status": "error", "message": "认证验证失败"} def _register_default_tools(self, mcp_server, server_record): """注册默认工具到FastMCP服务器""" # 创建认证装饰器 - 不使用*args,因为FastMCP不支持 def require_auth(func): # 直接返回原始函数,不使用包装器 # FastMCP不支持*args,所以我们需要在每个工具函数内部进行认证检查 return func # 创建安全检查函数,在每个工具函数内部调用 async def check_auth(ctx): """检查认证,返回错误或None""" return await self._check_authentication(mcp_server, ctx) # ===== Odoo数据访问工具 ===== @mcp_server.tool() async def query_odoo_model( model_name: str, domain: Optional[str] = None, fields: Optional[str] = None, limit: int = 100, offset: int = 0, order: Optional[str] = None, ctx: Context = None ) -> Dict[str, Any]: """查询任意Odoo模型的数据 Args: model_name: Odoo模型名称 (例如 'res.partner', 'product.template') domain: 搜索域JSON字符串 (例如 '[["is_company", "=", true], ["customer_rank", ">", 0]]') fields: 要获取的字段JSON字符串 (例如 '["name", "email", "phone"]') limit: 最大返回记录数 offset: 记录偏移量 order: 排序字段和方向 (例如 'name ASC, create_date DESC') Returns: 包含查询结果的字典 """ try: # 检查认证 auth_result = await check_auth(ctx) if auth_result: return auth_result # 返回认证错误 if ctx: await ctx.info(f"开始查询模型 '{model_name}',参数: domain={domain}, fields={fields}") # 使用安全执行方法 result = await self._safe_execute_with_env( self._query_odoo_model_impl, model_name, domain, fields, limit, offset, order ) if ctx: data = result.get('data', {}) count = data.get('count', 0) total_count = data.get('total_count', 0) await ctx.info(f"从 '{model_name}' 查询到 {count} 条记录 (共 {total_count} 条)") return result except Exception as e: error_msg = f"查询Odoo模型失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} @mcp_server.tool() @require_auth async def get_odoo_record(model_name: str, record_id: int, ctx: Context = None) -> Dict[str, Any]: """获取Odoo中指定记录的详细信息 Args: model_name: Odoo模型名称 (例如 'res.partner') record_id: 记录ID Returns: 包含记录详细信息的字典 """ try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._get_odoo_record_impl, model_name, record_id ) if ctx: await ctx.info(f"成功获取模型 '{model_name}' 中ID为 {record_id} 的记录") return result except Exception as e: error_msg = f"获取Odoo记录失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} @mcp_server.tool() @require_auth async def create_odoo_record(model_name: str, values: Dict[str, Any], ctx: Context = None) -> Dict[str, Any]: """在Odoo中创建新记录 Args: model_name: Odoo模型名称 (例如 'res.partner') values: 要创建的记录的字段值 (例如 {"name": "新客户", "email": "customer@example.com"}) Returns: 包含新创建记录信息的字典 """ try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._create_odoo_record_impl, model_name, values ) if ctx: data = result.get('data', {}) record_id = data.get('id', 'unknown') await ctx.info(f"在模型 '{model_name}' 中成功创建了新记录,ID: {record_id}") return result except Exception as e: error_msg = f"创建Odoo记录失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} @mcp_server.tool() @require_auth async def update_odoo_record(model_name: str, record_id: int, values: Dict[str, Any], ctx: Context = None) -> Dict[str, Any]: """更新Odoo中的记录 Args: model_name: Odoo模型名称 (例如 'res.partner') record_id: 要更新的记录ID values: 要更新的字段值 (例如 {"name": "更新的名称", "email": "new@example.com"}) Returns: 包含更新后记录信息的字典 """ try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._update_odoo_record_impl, model_name, record_id, values ) if ctx: await ctx.info(f"成功更新了模型 '{model_name}' 中ID为 {record_id} 的记录") return result except Exception as e: error_msg = f"更新Odoo记录失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} @mcp_server.tool() async def delete_odoo_record(model_name: str, record_id: int, ctx: Context = None) -> Dict[str, Any]: """删除Odoo中的记录 Args: model_name: Odoo模型名称 (例如 'res.partner') record_id: 要删除的记录ID Returns: 删除操作的结果 """ try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._delete_odoo_record_impl, model_name, record_id ) if ctx: data = result.get('data', {}) record_name = data.get('record_name', f'ID: {record_id}') await ctx.info(f"成功删除了模型 '{model_name}' 中的记录: {record_name}") return result except Exception as e: error_msg = f"删除Odoo记录失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} @mcp_server.tool() async def get_odoo_model_metadata(model_name: str, ctx: Context = None) -> Dict[str, Any]: """获取Odoo模型的元数据信息,包括字段定义等 Args: model_name: Odoo模型名称 (例如 'res.partner') Returns: 包含模型元数据信息的字典 """ try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._get_odoo_model_metadata_impl, model_name ) if ctx: await ctx.info(f"成功获取模型 '{model_name}' 的元数据信息") return result except Exception as e: error_msg = f"获取Odoo模型元数据失败: {str(e)}" if ctx: await ctx.error(error_msg) _logger.error(error_msg) return {"status": "error", "message": error_msg} # ===== MCP资源管理工具 ===== @mcp_server.tool() async def list_resources(ctx: Context) -> Dict[str, Any]: """列出当前MCP服务器上的所有可用资源""" try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._list_resources_impl, server_record.id ) data = result.get('data', []) await ctx.info(f"获取到 {len(data)} 个资源") return result except Exception as e: await ctx.error(f"获取资源列表失败: {str(e)}") _logger.error("FastMCP工具list_resources失败: %s", str(e)) return {"status": "error", "message": str(e)} @mcp_server.tool() async def get_resource_content(resource_uri: str, ctx: Context) -> Dict[str, Any]: """获取指定URI的资源内容""" try: # 使用安全执行方法 result = await self._safe_execute_with_env( self._get_resource_content_impl, server_record.id, resource_uri ) data = result.get('data', {}) resource_name = data.get('name', resource_uri) await ctx.info(f"成功获取资源: {resource_name}") return result except Exception as e: await ctx.error(f"获取资源内容失败: {str(e)}") _logger.error("FastMCP工具get_resource_content失败: %s", str(e)) return {"status": "error", "message": str(e)} async def _check_authentication(self, mcp_server, ctx): """检查认证""" try: # 获取认证提供者 auth_provider = getattr(mcp_server, '_auth_provider', None) if not auth_provider: # 没有认证提供者,允许访问 return None # 尝试从上下文获取HTTP请求 request = None if ctx and hasattr(ctx, 'get_http_request'): try: request = ctx.get_http_request() except Exception as e: _logger.debug(f"无法获取HTTP请求: {e}") # 从请求中提取API密钥 api_key = None if request: # 尝试从Authorization头获取Bearer token auth_header = request.headers.get('authorization', '') if auth_header.startswith('Bearer '): api_key = auth_header[7:] # 移除 'Bearer ' 前缀 # 如果没有找到Bearer token,尝试从X-API-Key头获取 if not api_key: api_key = request.headers.get('x-api-key') or request.headers.get('X-API-Key') # 最后尝试从查询参数获取 if not api_key and hasattr(request, 'query_params'): api_key = request.query_params.get('api_key') # 如果没有提供API密钥 if not api_key: _logger.warning("请求缺少API密钥") return { "status": "error", "message": "需要API密钥认证。请在Authorization头中提供Bearer token或使用X-API-Key头。", "code": 401 } # 验证API密钥 is_valid = await auth_provider.verify_token(api_key, request) if not is_valid: return { "status": "error", "message": "无效的API密钥或访问被拒绝", "code": 403 } # 认证成功 return None except Exception as e: _logger.error(f"认证检查失败: {e}") return { "status": "error", "message": "认证过程中发生错误", "code": 500 } def _check_security(self, mcp_server, ctx): """检查安全验证""" try: # 获取安全设置 security_settings = getattr(mcp_server, '_security_settings', {}) # 如果不需要授权,直接通过 if not security_settings.get('require_auth', True): return None # 获取请求信息 request = ctx.get_http_request() if ctx and hasattr(ctx, 'get_http_request') else None if request: # 获取客户端IP client_ip = getattr(request.client, 'host', 'unknown') if hasattr(request, 'client') else 'unknown' # 记录请求 if security_settings.get('log_requests', True): _logger.info( f"收到请求: {request.url.path if hasattr(request, 'url') else 'unknown'} - 客户端IP: {client_ip}") # 检查IP白名单 allowed_ips = security_settings.get('allowed_ips', []) if allowed_ips and client_ip != "unknown" and client_ip not in allowed_ips: _logger.warning(f"IP访问拒绝: {client_ip} - 不在白名单中") return {"error": "IP地址未授权访问", "status": 403} # 检查API密钥 api_key = security_settings.get('api_key') auth_header = request.headers.get("Authorization") if hasattr(request, 'headers') else None if auth_header and auth_header.startswith("Bearer "): token = auth_header[7:] # 去掉"Bearer "前缀 if token != api_key: _logger.warning(f"API密钥验证失败: {client_ip}") return {"error": "无效的API密钥", "status": 401} else: # 尝试从查询参数获取API密钥 query_api_key = request.query_params.get("api_key") if hasattr(request, 'query_params') else None if not query_api_key or query_api_key != api_key: _logger.warning(f"API密钥验证失败: {client_ip}") return {"error": "无效的API密钥", "status": 401} return None # 通过验证 except Exception as e: _logger.error("安全检查失败: %s", str(e)) return {"status": "error", "message": "安全验证失败"} def _log_request(self, tool_name, mcp_server): """记录工具调用请求""" try: # 获取安全设置 security_settings = getattr(mcp_server, '_security_settings', {}) # 记录请求 if security_settings.get('log_requests', True): _logger.info(f"工具调用: {tool_name}") except Exception as e: _logger.error("记录请求失败: %s", str(e)) def _get_server_security_settings(self, server_id): """获取服务器安全设置""" try: def get_security_settings(env): server = env['mcp.server'].sudo().browse(server_id) if not server.exists(): return { 'require_auth': True, 'auth_method': 'api_key', # 默认使用API密钥认证 'api_key': None, 'allowed_ips': [], 'max_requests_per_minute': 60, 'log_requests': True } # 解析允许的IP地址 allowed_ips = [] if server.allowed_ips: allowed_ips = [ip.strip() for ip in server.allowed_ips.split('\n') if ip.strip()] # 基本安全设置 settings = { 'require_auth': server.require_auth, 'auth_method': server.auth_method or 'api_key', # 获取认证方式,默认为api_key 'api_key': server.api_key, 'allowed_ips': allowed_ips, 'max_requests_per_minute': server.max_requests_per_minute, 'log_requests': server.log_requests } # 如果是JWT认证方式,添加JWT相关配置 if settings['auth_method'] == 'jwt': settings.update({ 'jwt_public_key': server.jwt_public_key, 'jwks_uri': server.jwks_uri, 'jwt_issuer': server.jwt_issuer, 'jwt_audience': server.jwt_audience, 'jwt_algorithm': server.jwt_algorithm or 'RS256', }) _logger.debug(f"获取到JWT认证配置: 使用{'JWKS URI' if server.jwks_uri else '公钥'}验证") return settings # 使用同步方式获取安全设置 with self.db_manager.get_env() as env: return get_security_settings(env) except Exception as e: _logger.error("获取服务器安全设置失败: %s", str(e)) import traceback _logger.error(f"异常详情: {traceback.format_exc()}") # 返回默认安全设置 return { 'require_auth': True, 'auth_method': 'api_key', # 默认使用API密钥认证 'api_key': None, 'allowed_ips': [], 'max_requests_per_minute': 60, 'log_requests': True } def _register_default_resources(self, mcp_server, server_record): """注册默认资源到FastMCP服务器""" @mcp_server.resource("server://info") def get_server_info(): """获取服务器基本信息""" try: with self.db_manager.get_env() as env: return self._get_server_info_impl(env, server_record.id) except Exception as e: _logger.error("获取服务器信息失败: %s", str(e)) return {"status": "error", "message": str(e)} @mcp_server.resource("resources://list") def get_resources_list(): """获取所有资源列表""" try: with self.db_manager.get_env() as env: return self._list_resources_impl(env, server_record.id) except Exception as e: _logger.error("获取资源列表失败: %s", str(e)) return {"status": "error", "message": str(e)} @mcp_server.resource("resource://{resource_id}") def get_resource(resource_id: int): """获取指定ID的资源""" try: with self.db_manager.get_env() as env: return self._get_resource_by_id_impl(env, server_record.id, resource_id) except Exception as e: _logger.error("获取资源失败: %s", str(e)) return {"status": "error", "message": str(e)} def start_server(self, server_record): """启动MCP服务器""" _logger.info("开始启动MCP服务器: %s", server_record.name) mcp_server = self.get_or_create_mcp_server(server_record) if not mcp_server: _logger.error("无法获取MCP服务器实例,启动失败") return False try: _logger.info("准备异步启动MCP服务器,循环状态: %s", self.event_loop.is_running()) # 注意:这里需要在实际部署时根据需要修改host和port # FastMCP服务器通常以异步方式启动 future = asyncio.run_coroutine_threadsafe( self._start_server_async(mcp_server, server_record), self.event_loop ) _logger.info("异步启动请求已提交,future对象: %s", future) # 尝试获取后台任务的初始状态,设置较长超时确保得到结果 try: result = future.result(timeout=3.0) # 增加超时时间到3秒 _logger.info("服务器启动任务结果: %s", result) # 如果异步启动成功,在主线程中更新服务器状态 if result.get('success', False): _logger.info("在主线程中更新服务器状态") server_record.sudo().write({ 'state': 'active', 'last_connection': fields.Datetime.now() }) return True else: _logger.error("异步启动失败: %s", result.get('error', '未知错误')) return False except asyncio.TimeoutError: _logger.info("服务器启动任务已提交,在后台运行但超时未等到结果") # 假设启动成功,更新状态 server_record.sudo().write({ 'state': 'active', 'last_connection': fields.Datetime.now() }) return True except Exception as inner_e: _logger.warning("获取服务器启动状态时出现警告: %s", str(inner_e)) return False except Exception as e: _logger.error("启动FastMCP服务器失败: %s", str(e)) import traceback _logger.error("异常详情: %s", traceback.format_exc()) return False async def _start_server_async(self, mcp_server, server_record): """异步启动MCP服务器""" try: # 在异步函数开始时保存必要的信息,而不使用server_record对象 server_id = server_record.id server_name = server_record.name server_port = server_record.server_port _logger.info("开始异步启动MCP服务器: %s (端口: %d)", server_name, server_port) # 使用服务器记录中配置的端口 host = "0.0.0.0" # 监听所有网络接口 port = server_port # 使用服务器记录中的端口 _logger.info("服务器配置 - 主机: %s, 端口: %s, 模式: %s", host, port, "http") # 检查端口是否已经被占用 import socket sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex((host, port)) if result == 0: _logger.warning("端口 %s 似乎已经被占用", port) sock.close() # 使用简单的方法启动FastMCP _logger.info("尝试启动FastMCP服务器") # 创建事件标志和错误存储 import threading from multiprocessing import Event server_started = Event() server_error = [None] # 创建启动FastMCP的函数 def start_fastmcp_server(): try: _logger.info("在新线程中启动FastMCP服务器") # 使用正确的方式启动FastMCP服务器 # 使用streamable-http传输模式 _logger.info("开始启动FastMCP服务器,transport=streamable-http, host=%s, port=%s", host, port) # 配置服务器的初始化回调 original_on_connect = getattr(mcp_server, '_on_connect_callbacks', []) def on_server_ready(): """服务器完全准备好时的回调""" _logger.info("MCP服务器初始化完成,可以接受客户端连接") server_started.set() # 设置启动完成回调 if hasattr(mcp_server, 'add_on_connect_callback'): mcp_server.add_on_connect_callback(on_server_ready) # 记录安全状态 if hasattr(mcp_server, '_auth_provider'): _logger.info("FastMCP服务器已配置API密钥认证") else: _logger.info("FastMCP服务器使用内置OAuth认证或无认证") # 启动服务器 - 这个调用会阻塞直到服务器关闭 mcp_server.run( transport="streamable-http", # 使用streamable-http传输模式 host=host, port=port, ) # 如果run()返回,说明服务器正常启动了 _logger.info("FastMCP服务器已启动完成") if not server_started.is_set(): server_started.set() except Exception as e: _logger.error("启动FastMCP服务器失败: %s", str(e)) import traceback _logger.error("异常详情: %s", traceback.format_exc()) server_error[0] = str(e) # 确保事件被设置,避免阻塞 if not server_started.is_set(): server_started.set() # 创建并启动服务器线程 server_thread = threading.Thread( target=start_fastmcp_server, daemon=True, name=f"FastMCP-Server-{server_id}-{port}" ) server_thread.start() _logger.info("FastMCP服务器线程已启动,线程名: %s", server_thread.name) # 存储线程引用 if server_id in self.mcp_servers: self.mcp_servers[server_id]['thread'] = server_thread # 给服务器一些时间启动并绑定端口 await asyncio.sleep(3.0) # 增加到3秒让服务器完全初始化 # 检查端口是否被绑定,作为服务器启动的指示 startup_successful = False for attempt in range(15): # 增加到15次检查,总共7.5秒超时 try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) result = sock.connect_ex((host, port)) sock.close() if result == 0: _logger.info("FastMCP服务器已成功绑定端口,启动完成") startup_successful = True break except Exception as e: _logger.debug("端口检查异常: %s", str(e)) pass await asyncio.sleep(0.5) # 每次检查间隔0.5秒 # 额外等待,确保MCP协议初始化完成 if startup_successful: _logger.info("端口绑定成功,等待MCP协议初始化完成...") await asyncio.sleep(3.0) # 增加到3秒确保MCP协议完全准备好 # 添加预热机制 - 确保服务器真正准备好 _logger.info("开始MCP服务器预热检查...") for warmup_attempt in range(5): try: await asyncio.sleep(1.0) # 每次预热检查间隔1秒 _logger.info("预热检查 %d/5", warmup_attempt + 1) except Exception as e: _logger.debug("预热检查异常: %s", str(e)) _logger.info("MCP服务器预热完成,服务器已准备好接受连接") if not startup_successful: _logger.warning("无法确认FastMCP服务器是否成功启动(端口检测失败)") # 检查是否有启动错误 if server_error[0]: _logger.error("启动FastMCP服务器出错: %s", server_error[0]) return {'success': False, 'error': server_error[0]} _logger.info("FastMCP服务器已在 %s:%s 启动", host, port) # 使用安全数据库管理器更新服务器状态和URL def update_server_url(env): server = env['mcp.server'].sudo().browse(server_id) if server.exists(): server.write({ 'server_url': f'http://127.0.0.1:{port}', 'state': 'active', 'last_connection': fields.Datetime.now() }) return True return False await self._safe_execute_with_env(update_server_url) return {'success': True, 'server_id': server_id} except Exception as e: _logger.error("异步启动FastMCP服务器失败: %s", str(e)) import traceback _logger.error("异常详情: %s", traceback.format_exc()) # 使用安全数据库管理器更新服务器状态 def update_server_state(env): server = env['mcp.server'].sudo().browse(server_id) if server.exists(): server.write({'state': 'inactive'}) return True return False try: await self._safe_execute_with_env(update_server_state) except Exception as update_error: _logger.error("更新服务器状态失败: %s", str(update_error)) return {'success': False, 'error': str(e)} def stop_server(self, server_record): """停止MCP服务器""" server_id = server_record.id if server_id in self.mcp_servers: try: # 异步停止服务器 asyncio.run_coroutine_threadsafe( self._stop_server_async(server_id, server_record), self.event_loop ) return True except Exception as e: _logger.error("停止FastMCP服务器失败: %s", str(e)) return False else: _logger.warning("服务器 %s 未运行", server_record.name) return False async def _stop_server_async(self, server_id, server_record): """异步停止MCP服务器""" try: # 这里应该是FastMCP服务器的停止逻辑 # 由于FastMCP可能没有明确的stop方法,我们可能需要自己实现或者依赖 # 关闭事件循环或取消任务 # 获取服务器信息并清理 server_info = None if server_id in self.mcp_servers: server_info = self.mcp_servers[server_id] _logger.info("正在停止MCP服务器: %s (端口: %d)", server_info['name'], server_info['port']) # 清理服务器实例 del self.mcp_servers[server_id] # 使用安全数据库管理器更新服务器状态 def update_server_state(env): server = env['mcp.server'].sudo().browse(server_record.id) if server.exists(): server.write({'state': 'inactive'}) return True return False await self._safe_execute_with_env(update_server_state) _logger.info("FastMCP服务器 %s 已停止", server_record.name) except Exception as e: _logger.error("异步停止FastMCP服务器失败: %s", str(e)) raise def auto_start_active_servers(self): """自动启动所有活动状态的MCP服务器""" try: _logger.info("开始自动启动所有活动状态的MCP服务器") def get_active_servers(env): """获取所有活动状态的服务器""" return env['mcp.server'].sudo().search([('state', '=', 'active')]) # 使用同步方式获取活动服务器列表 with self.db_manager.get_env() as env: active_servers = get_active_servers(env) if not active_servers: _logger.info("没有找到活动状态的MCP服务器") return True _logger.info("找到 %d 个活动状态的MCP服务器", len(active_servers)) success_count = 0 for server in active_servers: try: _logger.info("正在自动启动MCP服务器: %s (ID: %d)", server.name, server.id) # 启动服务器 if self.start_server(server): success_count += 1 _logger.info("成功自动启动MCP服务器: %s", server.name) else: _logger.warning("自动启动MCP服务器失败: %s", server.name) except Exception as e: _logger.error("自动启动MCP服务器 %s 时发生错误: %s", server.name, str(e)) _logger.info("自动启动完成,成功启动 %d/%d 个MCP服务器", success_count, len(active_servers)) return success_count > 0 except Exception as e: _logger.error("自动启动活动MCP服务器时发生错误: %s", str(e)) import traceback _logger.error("错误详情: %s", traceback.format_exc()) return False @classmethod def auto_start_on_module_init(cls): """模块初始化时自动启动活动服务器的类方法""" try: _logger.info("模块初始化:开始自动启动MCP服务器") # 获取服务实例 service = cls.get_instance() # 给事件循环一些时间完全初始化 time.sleep(2.0) # 自动启动活动服务器 success = service.auto_start_active_servers() if success: _logger.info("模块初始化:MCP服务器自动启动完成") else: _logger.warning("模块初始化:MCP服务器自动启动未完全成功") return success except Exception as e: _logger.error("模块初始化时自动启动MCP服务器失败: %s", str(e)) import traceback _logger.error("错误详情: %s", traceback.format_exc()) return False

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/kaikongbj/odoo-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server