Skip to main content
Glama
query_service.py 7.07 kB
"""Query business service.

Handles query-related business logic, including API detail lookup and
endpoint search.
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from magicapi_tools.logging_config import get_logger
from magicapi_tools.utils import create_operation_error
from magicapi_tools.domain.dtos.query_dtos import QueryRequest, QueryResponse, EndpointFilter

from .base_service import BaseService

if TYPE_CHECKING:
    from magicapi_mcp.tool_registry import ToolContext

logger = get_logger('services.query')


def _split_endpoint_display(display: str) -> Tuple[str, str, str]:
    """Split an endpoint display string into ``(method, path, name)``.

    The expected shape is ``"METHOD /path [name]"`` where the ``" [name]"``
    suffix is optional. Parsing never raises:

    * ``"GET /a [users]"``  -> ``("GET", "/a", "users")``
    * ``"GET /a"``          -> ``("GET", "/a", "")``
    * ``"GET /a[0]"``       -> ``("GET", "/a[0]", "")`` (brackets in the path)
    * ``"GET /a [n[1]]"``   -> ``("GET", "/a", "n[1]")`` (brackets in the name)
    * ``"GET"``             -> ``("GET", "", "")``
    """
    method_path, separator, remainder = display.partition(" [")
    if separator and remainder.endswith("]"):
        # Drop exactly one closing bracket; rstrip("]") would also eat
        # brackets that belong to the name itself.
        name = remainder[:-1]
    else:
        # No well-formed " [name]" suffix: treat the whole string as
        # "METHOD path".
        method_path, name = display, ""

    # partition() tolerates a missing space, unlike an unpacked split().
    method, _, path_value = method_path.partition(" ")
    return method, path_value, name


class QueryService(BaseService):
    """Service class for query operations.

    Public methods delegate to ``execute_operation`` (inherited from
    ``BaseService``, not visible in this file), passing an operation name and
    the private ``*_impl`` method that does the work.
    """

    def get_api_details_by_path(self, path: str, fuzzy: bool = True) -> Dict[str, Any]:
        """Look up API details by path.

        Args:
            path: API path to look up.
            fuzzy: Forwarded to the implementation; flag for fuzzy matching.

        Returns:
            Whatever ``execute_operation`` returns for the wrapped call.
        """
        return self.execute_operation(
            "根据路径查询API详情",
            self._get_api_details_by_path_impl,
            path=path,
            fuzzy=fuzzy
        )

    def _get_api_details_by_path_impl(self, path: str, fuzzy: bool = True) -> Dict[str, Any]:
        """Implementation of the path-based API detail lookup.

        Currently a placeholder: it echoes the arguments and returns an empty
        ``results`` list.
        """
        # TODO: this should contain the path lookup logic from query.py.
        return {"success": True, "path": path, "fuzzy": fuzzy, "results": []}

    def get_api_details_by_id(self, file_id: str) -> Dict[str, Any]:
        """Look up API details by file ID.

        Args:
            file_id: Identifier of the API file.

        Returns:
            Whatever ``execute_operation`` returns for the wrapped call.
        """
        return self.execute_operation(
            "根据ID查询API详情",
            self._get_api_details_by_id_impl,
            file_id=file_id
        )

    def _get_api_details_by_id_impl(self, file_id: str) -> Dict[str, Any]:
        """Implementation of the ID-based API detail lookup.

        Args:
            file_id: Identifier of the API file.

        Returns:
            The API response body with an added ``full_path`` key on success,
            otherwise the value produced by ``create_operation_error`` or by
            ``check_api_response_success``.
        """
        ok, payload = self.http_client.api_detail(file_id)
        if not ok:
            return create_operation_error("查询API详情", "api_detail_failed", "无法获取API详情", payload)

        if not payload:
            return create_operation_error("查询API详情", "api_not_found", f"API不存在: {file_id}")

        # Use the "body" entry of the payload as the actual API response data,
        # falling back to the payload itself.
        api_response_body = payload.get("body", payload) if isinstance(payload, dict) else payload

        # Check the business-level response code of the API.
        # Imported locally, as in the original code.
        from magicapi_tools.utils.tool_helpers import check_api_response_success
        api_error = check_api_response_success(api_response_body, self.settings, "查询API详情")
        if api_error:
            return api_error

        # Everything below needs a mapping; report a structured error instead
        # of failing with AttributeError/TypeError on an unexpected body.
        if not isinstance(api_response_body, dict):
            return create_operation_error("查询API详情", "api_detail_failed", "无法获取API详情", api_response_body)

        # Resolve the full path.
        from magicapi_tools.tools.query import _get_full_path_by_api_details
        # `or ""` also covers keys that are present but null, which the plain
        # .get(key, "") default does not.
        method = (api_response_body.get("method") or "").upper()
        path = api_response_body.get("path") or ""
        name = api_response_body.get("name") or ""

        full_path = _get_full_path_by_api_details(self.http_client, file_id, method, path, name)

        return {**api_response_body, "full_path": full_path}

    def search_api_endpoints(self, request: QueryRequest) -> QueryResponse:
        """Search API endpoints.

        Args:
            request: Query request carrying the query type and optional filters.

        Returns:
            A failed ``QueryResponse`` listing the validation errors when the
            request is invalid; otherwise the result of running the search
            through ``execute_operation``.
        """
        # Validate the request before doing any work.
        if not request.validate():
            errors = request.get_validation_errors()
            return QueryResponse(
                success=False,
                query_type=request.query_type,
                summary={"error": "; ".join(errors)}
            )

        return self.execute_operation(
            "搜索API端点",
            self._search_api_endpoints_impl,
            request=request
        )

    # Backward-compatible method.
    def search_api_endpoints_legacy(
        self,
        method_filter: Optional[str] = None,
        path_filter: Optional[str] = None,
        name_filter: Optional[str] = None,
        query_filter: Optional[str] = None
    ) -> Dict[str, Any]:
        """Search API endpoints (backward-compatible variant).

        Wraps the individual filter arguments in an ``EndpointFilter`` and a
        ``QueryRequest`` of type ``"endpoints"``, runs
        ``search_api_endpoints`` and returns ``response.to_dict()``.
        """
        filters = EndpointFilter(
            method_filter=method_filter,
            path_filter=path_filter,
            name_filter=name_filter,
            query_filter=query_filter
        )

        request = QueryRequest(
            query_type="endpoints",
            filters=filters
        )

        response = self.search_api_endpoints(request)
        return response.to_dict()

    def _search_api_endpoints_impl(self, request: QueryRequest) -> QueryResponse:
        """Implementation of the endpoint search.

        Loads the resource tree, extracts and filters the endpoint display
        strings, and maps each one to its API ID.

        Args:
            request: Query request; ``request.filters`` may be falsy, in which
                case no filter is applied.

        Returns:
            A successful ``QueryResponse`` with one result dict per matching
            endpoint (``method``, ``path``, ``name``, ``id``, ``display``), or
            a failed ``QueryResponse`` whose summary carries the error text if
            any exception is raised.
        """
        try:
            from magicapi_tools.utils.extractor import extract_api_endpoints, load_resource_tree
            from magicapi_tools.utils.extractor import filter_endpoints, _collect_all_endpoints

            # Load the resource tree.
            tree = load_resource_tree(client=self.http_client)
            endpoints = extract_api_endpoints(tree)

            # Apply the filter conditions (all None when no filters are given).
            filters = request.filters
            filtered_endpoints = filter_endpoints(
                endpoints,
                method_filter=filters.method_filter if filters else None,
                path_filter=filters.path_filter if filters else None,
                name_filter=filters.name_filter if filters else None,
                query_filter=filters.query_filter if filters else None,
            )

            # Collect details (including ID and display string) for every
            # endpoint in the tree.
            all_endpoint_details = []
            for child in tree.api_nodes:
                _collect_all_endpoints(child, "", all_endpoint_details)

            # Map each display string to its API ID.
            endpoint_to_id_map = {}
            for detail in all_endpoint_details:
                display = detail.get("display")
                api_id = detail.get("id")
                if display and api_id:
                    endpoint_to_id_map[display] = api_id

            # Build the results. Parsing is tolerant, so one oddly formatted
            # display string no longer fails the whole search.
            results = []
            for endpoint in filtered_endpoints:
                method, path_value, name = _split_endpoint_display(endpoint)
                results.append({
                    "method": method,
                    "path": path_value,
                    "name": name,
                    "id": endpoint_to_id_map.get(endpoint),
                    "display": endpoint,
                })

            return QueryResponse(
                success=True,
                query_type=request.query_type,
                total_count=len(endpoints),
                filtered_count=len(filtered_endpoints),
                returned_count=len(results),
                filters_applied=request.filters,
                results=results,
                summary={
                    "filters_applied": bool(filters) and not filters.is_empty()
                }
            )

        except Exception as e:
            return QueryResponse(
                success=False,
                query_type=request.query_type,
                summary={"error": str(e)}
            )

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Dwsy/magic-api-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.