Skip to main content
Glama

Alibaba Cloud Observability MCP Server

Official
by aliyun
arms_toolkit.py (30.1 kB)
from typing import Any

from alibabacloud_arms20190808.client import Client as ArmsClient
from alibabacloud_arms20190808.models import (
    GetTraceAppRequest,
    GetTraceAppResponse,
    GetTraceAppResponseBodyTraceApp,
    SearchTraceAppByPageRequest,
    SearchTraceAppByPageResponse,
    SearchTraceAppByPageResponseBodyPageBean,
)
from alibabacloud_sls20201230.client import Client
from alibabacloud_sls20201230.models import CallAiToolsRequest, CallAiToolsResponse
from alibabacloud_tea_util import models as util_models
from mcp.server.fastmcp import Context, FastMCP
from pydantic import Field
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from mcp_server_aliyun_observability.logger import log_error
from mcp_server_aliyun_observability.utils import (
    get_arms_user_trace_log_store,
    text_to_sql,
)

# Every AI-tool call in this module is sent through the SLS client bound to
# this region; the caller-supplied region only travels inside the request.
_AI_TOOLS_REGION = "cn-shanghai"

# Value passed as both read_timeout and connect_timeout of RuntimeOptions.
# NOTE(review): presumably milliseconds -- confirm against the Tea util docs.
_AI_TOOLS_TIMEOUT = 60000

# When present in an AI-tool response body, only the text after this marker
# is handed back to the caller.
_ANSWER_MARKER = "------answer------\n"

_VALID_PROFILE_TYPES = ["cpu", "memory"]
_SUPPORTED_PROFILE_LANGUAGES = ["java", "go"]


def _get_trace_app(
    ctx: Context, pid: str, region_id: str
) -> GetTraceAppResponseBodyTraceApp | None:
    """Fetch the ARMS application record for ``pid``.

    Returns:
        The ``trace_app`` object of the GetTraceApp response, or ``None`` when
        the response carries no body or no application.
    """
    arms_client: ArmsClient = ctx.request_context.lifespan_context[
        "arms_client"
    ].with_region(region_id)
    request = GetTraceAppRequest(pid=pid, region_id=region_id)
    response: GetTraceAppResponse = arms_client.get_trace_app(request)
    if not response.body:
        return None
    return response.body.trace_app


def _normalize_profile_type(profile_type: str) -> str:
    """Lower-case ``profile_type`` and reject anything but 'cpu' / 'memory'.

    Raises:
        ValueError: If the lower-cased value is not a supported profile type.
    """
    normalized = profile_type.lower()
    if normalized not in _VALID_PROFILE_TYPES:
        raise ValueError(
            f"无效的profileType: {normalized}, 仅支持: {', '.join(_VALID_PROFILE_TYPES)}"
        )
    return normalized


def _resolve_profiled_app(ctx: Context, pid: str, region_id: str) -> tuple[str, str]:
    """Return ``(app_name, language)`` for an application that can be profiled.

    Raises:
        ValueError: If the application cannot be found, or its language is not
            one of the languages the flame analysis tools accept.
    """
    trace_app = _get_trace_app(ctx, pid, region_id)
    if not trace_app:
        raise ValueError("无法找到应用信息")
    language = trace_app.language
    if language not in _SUPPORTED_PROFILE_LANGUAGES:
        raise ValueError(
            f"暂不支持的语言类型: {language}. 当前仅支持 'java' 和 'go'"
        )
    return trace_app.app_name, language


def _call_ai_tool(
    ctx: Context, tool_name: str, region_id: str, params: dict[str, Any]
) -> dict[str, Any]:
    """Invoke an SLS AI tool and return its answer as ``{"data": ...}``.

    Args:
        ctx: MCP context whose lifespan context holds the ``sls_client``.
        tool_name: Name of the AI tool to call.
        region_id: Region placed in the request (the client itself is always
            bound to ``_AI_TOOLS_REGION``).
        params: Tool parameters, sent unchanged.

    Returns:
        A dict with a single ``data`` key: the response body, reduced to the
        part following the answer marker when that marker is present.
    """
    sls_client: Client = ctx.request_context.lifespan_context[
        "sls_client"
    ].with_region(_AI_TOOLS_REGION)
    ai_request = CallAiToolsRequest(tool_name=tool_name, region_id=region_id)
    ai_request.params = params
    runtime = util_models.RuntimeOptions(
        read_timeout=_AI_TOOLS_TIMEOUT, connect_timeout=_AI_TOOLS_TIMEOUT
    )
    tool_response: CallAiToolsResponse = sls_client.call_ai_tools_with_options(
        request=ai_request, headers={}, runtime=runtime
    )
    data = tool_response.body
    if _ANSWER_MARKER in data:
        data = data.split(_ANSWER_MARKER)[1]
    return {"data": data}


class ArmsToolkit:
    """Registers the ARMS (application monitoring) tools on a FastMCP server."""

    def __init__(self, server: FastMCP):
        """Store ``server`` and register every ARMS tool on it."""
        self.server = server
        self._register_tools()

    def _register_tools(self):
        """Register ARMS related tool functions on ``self.server``."""

        @self.server.tool()
        def arms_search_apps(
            ctx: Context,
            appNameQuery: str = Field(..., description="app name query"),
            regionId: str = Field(
                ...,
                description="region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
            pageSize: int = Field(20, description="page size,max is 100", ge=1, le=100),
            pageNumber: int = Field(1, description="page number,default is 1", ge=1),
        ) -> Any:
            """Search ARMS applications by name.

            ## Overview

            Searches ARMS applications by application name and returns basic
            information for each match: application name, PID, user ID and type.

            ## When to use

            - To find an application with a specific name
            - To obtain an application's PID for other ARMS operations
            - To check the list of applications owned by the user

            ## Search conditions

            - appNameQuery must be part of an application name, not natural language
            - Results are paginated; page number and page size can be specified

            ## Return structure

            A dict containing:
            - total: total number of matching applications
            - page_size: page size
            - page_number: current page number
            - trace_apps: list of applications, each with app_name, pid, user_id and type

            ## Example queries

            - "Look up the application XXX for me"
            - "Find the applications whose name contains 'service'"

            Args:
                ctx: MCP context, used to access the ARMS client
                appNameQuery: application name query string
                regionId: Alibaba Cloud region ID
                pageSize: page size, range 1-100, default 20
                pageNumber: page number, default 1

            Returns:
                A dict with the application information. When the response has
                no page bean, an empty result (total 0, no trace_apps) echoing
                the requested pageSize / pageNumber is returned.
            """
            arms_client: ArmsClient = ctx.request_context.lifespan_context[
                "arms_client"
            ].with_region(regionId)
            request = SearchTraceAppByPageRequest(
                trace_app_name=appNameQuery,
                region_id=regionId,
                page_size=pageSize,
                page_number=pageNumber,
            )
            response: SearchTraceAppByPageResponse = (
                arms_client.search_trace_app_by_page(request)
            )
            page_bean: SearchTraceAppByPageResponseBodyPageBean | None = (
                response.body.page_bean if response.body else None
            )
            # Guard before touching any attribute: the page bean can be absent.
            if not page_bean:
                return {
                    "total": 0,
                    "page_size": pageSize,
                    "page_number": pageNumber,
                    "trace_apps": [],
                }
            return {
                "total": page_bean.total_count,
                "page_size": page_bean.page_size,
                "page_number": page_bean.page_number,
                "trace_apps": [
                    {
                        "app_name": app.app_name,
                        "pid": app.pid,
                        "user_id": app.user_id,
                        "type": app.type,
                    }
                    for app in (page_bean.trace_apps or [])
                ],
            }

        # The retry wrapper sits inside the tool registration, so the registered
        # tool makes at most two attempts with a fixed wait of 1 in between and
        # re-raises the last exception.
        @self.server.tool()
        @retry(
            stop=stop_after_attempt(2),
            wait=wait_fixed(1),
            retry=retry_if_exception_type(Exception),
            reraise=True,
        )
        def arms_generate_trace_query(
            ctx: Context,
            user_id: int = Field(..., description="user aliyun account id"),
            pid: str = Field(..., description="pid,the pid of the app"),
            region_id: str = Field(
                ...,
                description="region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
            question: str = Field(
                ..., description="question,the question to query the trace"
            ),
        ) -> Any:
            """Generate a trace query statement for an ARMS application.

            ## Overview

            Converts a natural-language description into an ARMS trace query
            statement, to help analyse application performance and problems.

            ## When to use

            - To query an application's trace information
            - To analyse application performance problems
            - To follow the execution path of a specific request
            - To analyse call relationships between services

            ## Query processing

            The natural-language question is converted into an SLS query, and
            the tool returns:
            - the generated SLS query statement
            - the project that stores the trace data
            - the log store that stores the trace data

            ## Query context

            The query takes the following into account:
            - the application's PID
            - response time is stored in nanoseconds and must be converted to milliseconds
            - data is stored as span records, so durations must be summed over the matching spans
            - service-related information uses the serviceName field
            - if the user explicitly asks for trace information, say so in
              `question` so that trace information is returned

            ## Example queries

            - "Look up the trace information of XXX for me"
            - "Analyse the traces of the last hour whose response time exceeds 1 second"

            Args:
                ctx: MCP context, used to access the ARMS and SLS clients
                user_id: the user's Alibaba Cloud account ID
                pid: the application's PID
                region_id: Alibaba Cloud region ID
                question: natural-language question about the traces

            Returns:
                A dict with sls_query, requestId, project and log_store
            """
            data: dict[str, str] = get_arms_user_trace_log_store(user_id, region_id)
            instructions = [
                "1. pid为" + pid,
                "2. 响应时间字段为 duration,单位为纳秒,转换成毫秒",
                "3. 注意因为保存的是每个 span 记录,如果是耗时,需要对所有符合条件的span 耗时做求和",
                "4. 涉及到接口服务等字段,使用 serviceName字段",
                "5. 如果用户明确提出要查询 trace信息,则需要返回 trace_id",
            ]
            instructions_str = "\n".join(instructions)
            prompt = f"""
            问题:
            {question}
            补充信息:
            {instructions_str}
            请根据以上信息生成sls查询语句
            """
            sls_text_to_query = text_to_sql(
                ctx, prompt, data["project"], data["log_store"], region_id
            )
            return {
                "sls_query": sls_text_to_query["data"],
                "requestId": sls_text_to_query["requestId"],
                "project": data["project"],
                "log_store": data["log_store"],
            }

        @self.server.tool()
        def arms_profile_flame_analysis(
            ctx: Context,
            pid: str = Field(..., description="arms application id"),
            startMs: str = Field(..., description="profile start ms"),
            endMs: str = Field(..., description="profile end ms"),
            profileType: str = Field(
                default="cpu", description="profile type, like 'cpu' 'memory'"
            ),
            ip: str = Field(None, description="arms service host ip"),
            thread: str = Field(None, description="arms service thread id"),
            threadGroup: str = Field(None, description="arms service thread group"),
            regionId: str = Field(
                default=...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Analyse performance hotspots in an ARMS application's flame graph.

            ## Overview

            When an application has performance problems and continuous
            profiling is enabled, this tool analyses the hotspots of the ARMS
            application's flame graph and produces an analysis result containing
            the hotspot problems, optimisation suggestions and similar information.

            ## When to use

            - To analyse flame-graph performance problems of an ARMS application

            ## Example queries

            - "Analyse the flame-graph performance hotspots of ARMS application XXX for me"

            Args:
                ctx: MCP context, used to access the ARMS and SLS clients
                pid: PID of the application in ARMS application monitoring
                startMs: start of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                endMs: end of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                profileType: profile type selecting the metric to analyse; CPU
                    hotspots and memory hotspots are supported ('cpu', 'memory')
                ip: host address(es) of the ARMS application service, optional;
                    selects the service machines, several values separated by an
                    ASCII comma, e.g. '192.168.0.1,192.168.0.2'; when omitted all
                    IPs of the service are queried
                thread: service thread name(s), optional; several values
                    separated by an ASCII comma, e.g.
                    'C1 CompilerThre,C2 CompilerThre'; when omitted all threads
                    of the service are queried
                threadGroup: aggregated thread group name(s), optional; several
                    values separated by an ASCII comma, e.g.
                    'http-nio-*-exec-*,http-nio-*-ClientPoller-*'; when omitted
                    all aggregated thread groups are queried
                regionId: Alibaba Cloud region ID, e.g. 'cn-hangzhou', 'cn-shanghai'

            Returns:
                A dict whose ``data`` key holds the analysis text.

            Raises:
                ValueError: for an unsupported profileType, an unknown
                    application, or an application language other than java / go.
            """
            try:
                profile_type = _normalize_profile_type(profileType)
                service_name, language = _resolve_profiled_app(ctx, pid, regionId)
                params: dict[str, Any] = {
                    "serviceName": service_name,
                    "startMs": startMs,
                    "endMs": endMs,
                    "profileType": profile_type,
                    "ip": ip,
                    "language": language,
                    "thread": thread,
                    "threadGroup": threadGroup,
                    "sys.query": f"帮我分析下应用 {service_name} 的火焰图性能热点问题",
                }
                return _call_ai_tool(ctx, "profile_flame_analysis", regionId, params)
            except Exception as e:
                log_error(f"调用火焰图数据性能热点AI工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_diff_profile_flame_analysis(
            ctx: Context,
            pid: str = Field(..., description="arms application id"),
            currentStartMs: str = Field(..., description="current profile start ms"),
            currentEndMs: str = Field(..., description="current profile end ms"),
            referenceStartMs: str = Field(
                ..., description="reference profile start ms (for comparison)"
            ),
            referenceEndMs: str = Field(
                ..., description="reference profile end ms (for comparison)"
            ),
            profileType: str = Field(
                default="cpu", description="profile type, like 'cpu' 'memory'"
            ),
            ip: str = Field(None, description="arms service host ip"),
            thread: str = Field(None, description="arms service thread id"),
            threadGroup: str = Field(None, description="arms service thread group"),
            regionId: str = Field(
                default=...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Compare flame-graph performance between two time periods.

            ## Overview

            Analyses the application's performance in two different time
            periods and produces a differential flame graph. Typically used to
            compare performance before and after a release or an optimisation,
            helping to identify improvements or regressions.

            ## When to use

            - Comparing flame-graph performance of different periods, e.g.
              before/after a release or before/after a performance optimisation

            ## Example queries

            - "Analyse how the performance of application XXX changed across the release"

            Args:
                ctx: MCP context, used to access the ARMS and SLS clients
                pid: PID of the application in ARMS application monitoring
                currentStartMs: start timestamp of the current period; obtain a
                    millisecond timestamp through the get_current_time tool
                currentEndMs: end timestamp of the current period; obtain a
                    millisecond timestamp through the get_current_time tool
                referenceStartMs: start timestamp of the reference (comparison)
                    period; obtain a millisecond timestamp through the
                    get_current_time tool
                referenceEndMs: end timestamp of the reference (comparison)
                    period; obtain a millisecond timestamp through the
                    get_current_time tool
                profileType: profile type, e.g. 'cpu', 'memory'
                ip: host address(es) of the ARMS application service, optional;
                    several values separated by an ASCII comma, e.g.
                    '192.168.0.1,192.168.0.2'; when omitted all IPs of the
                    service are queried
                thread: service thread name(s), optional; several values
                    separated by an ASCII comma, e.g.
                    'C1 CompilerThre,C2 CompilerThre'; when omitted all threads
                    of the service are queried
                threadGroup: aggregated thread group name(s), optional; several
                    values separated by an ASCII comma, e.g.
                    'http-nio-*-exec-*,http-nio-*-ClientPoller-*'; when omitted
                    all aggregated thread groups are queried
                regionId: Alibaba Cloud region ID, e.g. 'cn-hangzhou', 'cn-shanghai'

            Returns:
                A dict whose ``data`` key holds the analysis text.

            Raises:
                ValueError: for an unsupported profileType, an unknown
                    application, or an application language other than java / go.
            """
            try:
                profile_type = _normalize_profile_type(profileType)
                service_name, language = _resolve_profiled_app(ctx, pid, regionId)
                # The current window is sent as startMs/endMs and the reference
                # window as baseStartMs/baseEndMs.
                params: dict[str, Any] = {
                    "serviceName": service_name,
                    "startMs": currentStartMs,
                    "endMs": currentEndMs,
                    "baseStartMs": referenceStartMs,
                    "baseEndMs": referenceEndMs,
                    "profileType": profile_type,
                    "ip": ip,
                    "language": language,
                    "thread": thread,
                    "threadGroup": threadGroup,
                    "sys.query": f"帮我分析应用 {service_name} 在两个时间段前后的性能变化情况",
                }
                return _call_ai_tool(
                    ctx, "diff_profile_flame_analysis", regionId, params
                )
            except Exception as e:
                log_error(f"调用差分火焰图性能变化分析工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_get_application_info(
            ctx: Context,
            pid: str = Field(..., description="pid,the pid of the app"),
            regionId: str = Field(
                ...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Get the information of one specific application by its PID.

            ## Overview

            1. Fetches the ARMS application information and returns the
               application's PID, AppName and development language (for example
               java, python, ...)

            ## When to use

            1. When the user explicitly asks for the information of an application
            2. When a scenario needs the application's development language

            Returns:
                A dict with pid, app_name and language, or a plain message
                string when no application information is available.
            """
            trace_app = _get_trace_app(ctx, pid, regionId)
            # Covers both a missing response body and a body without trace_app.
            if not trace_app:
                return "没有找到应用信息"
            return {
                "pid": trace_app.pid,
                "app_name": trace_app.app_name,
                "language": trace_app.language,
            }

        @self.server.tool()
        def arms_trace_quality_analysis(
            ctx: Context,
            traceId: str = Field(..., description="traceId"),
            startMs: int = Field(
                ...,
                description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            endMs: int = Field(
                ...,
                description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            regionId: str = Field(
                default=...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Trace quality check.

            ## Overview

            Identifies whether the trace with the given traceId has integrity
            problems (broken links) or performance problems (erroneous or slow
            calls).

            ## When to use

            - To check whether a trace has problems

            ## Example queries

            - "Analyse this trace for me"

            Args:
                ctx: MCP context, used to access the SLS client
                traceId: traceId of the trace to analyse, required
                startMs: start of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                endMs: end of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                regionId: Alibaba Cloud region ID, e.g. 'cn-hangzhou', 'cn-shanghai'

            Returns:
                A dict whose ``data`` key holds the analysis text.
            """
            try:
                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": "分析这个trace",
                }
                return _call_ai_tool(ctx, "trace_struct_analysis", regionId, params)
            except Exception as e:
                log_error(f"调用Trace质量检测工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_slow_trace_analysis(
            ctx: Context,
            traceId: str = Field(..., description="traceId"),
            startMs: int = Field(
                ...,
                description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            endMs: int = Field(
                ...,
                description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            regionId: str = Field(
                default=...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Analyse in depth the root cause of slow calls in a trace.

            ## Overview

            Diagnoses the slow calls in a trace and outputs a diagnostic report
            containing an overview, the root cause, the scope of impact and a
            solution.

            ## When to use

            - Locating and fixing performance problems

            ## Example queries

            - "Please analyse why the trace ${traceId} is slow"

            Args:
                ctx: MCP context, used to access the SLS client
                traceId: traceId of the trace to analyse, required
                startMs: start of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                endMs: end of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                regionId: Alibaba Cloud region ID, e.g. 'cn-hangzhou', 'cn-shanghai'

            Returns:
                A dict whose ``data`` key holds the diagnostic report text.
            """
            try:
                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": "深入分析慢调用根因",
                }
                return _call_ai_tool(ctx, "trace_slow_analysis", regionId, params)
            except Exception as e:
                log_error(f"调用Trace慢调用分析工具失败: {str(e)}")
                raise

        @self.server.tool()
        def arms_error_trace_analysis(
            ctx: Context,
            traceId: str = Field(..., description="traceId"),
            startMs: int = Field(
                ...,
                description="start time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            endMs: int = Field(
                ...,
                description="end time (ms) for trace query. unit is millisecond, should be unix timestamp, only number, no other characters",
            ),
            regionId: str = Field(
                default=...,
                description="aliyun region id,region id format like 'xx-xxx',like 'cn-hangzhou'",
            ),
        ) -> Any:
            """Analyse in depth the root cause of errors in a trace.

            ## Overview

            Diagnoses the erroneous calls in a trace in depth and outputs an
            error diagnostic report containing an overview, the root cause, the
            scope of impact and a solution.

            ## When to use

            - Locating and fixing performance problems

            ## Example queries

            - "Please analyse why the trace ${traceId} produced an error"

            Args:
                ctx: MCP context, used to access the SLS client
                traceId: traceId of the trace to analyse, required
                startMs: start of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                endMs: end of the analysis window; obtain a millisecond
                    timestamp through the get_current_time tool
                regionId: Alibaba Cloud region ID, e.g. 'cn-hangzhou', 'cn-shanghai'

            Returns:
                A dict whose ``data`` key holds the diagnostic report text.
            """
            try:
                params: dict[str, Any] = {
                    "startMs": startMs,
                    "endMs": endMs,
                    "traceId": traceId,
                    "sys.query": "深入分析错误根因",
                }
                return _call_ai_tool(ctx, "trace_error_analysis", regionId, params)
            except Exception as e:
                log_error(f"调用Trace错误分析工具失败: {str(e)}")
                raise

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aliyun/alibabacloud-observability-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.