Skip to main content
Glama
aliyun

Alibaba Cloud Observability MCP Server

Official
by aliyun
client.py26.2 kB
# -*- coding: utf-8 -*- # This file is auto-generated, don't edit it. Thanks. from __future__ import annotations from typing import Dict from alibabacloud_tea_openapi import utils_models as open_api_util_models from alibabacloud_tea_openapi.client import Client as OpenApiClient from alibabacloud_tea_openapi.utils import Utils from darabonba.core import DaraCore as DaraCore from darabonba.runtime import RuntimeOptions from alibabacloud_sts20150401 import models as main_models """ """ class Client(OpenApiClient): def __init__( self, config: open_api_util_models.Config, ): super().__init__(config) self._signature_algorithm = "v2" self._endpoint_rule = "regional" self._endpoint_map = { "ap-northeast-2-pop": "sts.aliyuncs.com", "cn-beijing-finance-1": "sts.aliyuncs.com", "cn-beijing-finance-pop": "sts.aliyuncs.com", "cn-beijing-gov-1": "sts.aliyuncs.com", "cn-beijing-nu16-b01": "sts.aliyuncs.com", "cn-edge-1": "sts.aliyuncs.com", "cn-fujian": "sts.aliyuncs.com", "cn-haidian-cm12-c01": "sts.aliyuncs.com", "cn-hangzhou-bj-b01": "sts.aliyuncs.com", "cn-hangzhou-finance": "sts.aliyuncs.com", "cn-hangzhou-internal-prod-1": "sts.aliyuncs.com", "cn-hangzhou-internal-test-1": "sts.aliyuncs.com", "cn-hangzhou-internal-test-2": "sts.aliyuncs.com", "cn-hangzhou-internal-test-3": "sts.aliyuncs.com", "cn-hangzhou-test-306": "sts.aliyuncs.com", "cn-hongkong-finance-pop": "sts.aliyuncs.com", "cn-huhehaote-nebula-1": "sts.aliyuncs.com", "cn-north-2-gov-1": "sts-vpc.cn-north-2-gov-1.aliyuncs.com", "cn-qingdao-nebula": "sts.aliyuncs.com", "cn-shanghai-et15-b01": "sts.aliyuncs.com", "cn-shanghai-et2-b01": "sts.aliyuncs.com", "cn-shanghai-inner": "sts.aliyuncs.com", "cn-shanghai-internal-test-1": "sts.aliyuncs.com", "cn-shenzhen-finance-1": "sts-vpc.cn-shenzhen-finance-1.aliyuncs.com", "cn-shenzhen-inner": "sts.aliyuncs.com", "cn-shenzhen-st4-d01": "sts.aliyuncs.com", "cn-shenzhen-su18-b01": "sts.aliyuncs.com", "cn-wuhan": "sts.aliyuncs.com", "cn-yushanfang": "sts.aliyuncs.com", "cn-zhangbei": "sts.aliyuncs.com", "cn-zhangbei-na61-b01": "sts.aliyuncs.com", "cn-zhangjiakou-na62-a01": "sts.aliyuncs.com", "cn-zhengzhou-nebula-1": "sts.aliyuncs.com", "eu-west-1-oxs": "sts.aliyuncs.com", "rus-west-1-pop": "sts.aliyuncs.com", } self.check_config(config) self._endpoint = self.get_endpoint( "sts", self._region_id, self._endpoint_rule, self._network, self._suffix, self._endpoint_map, self._endpoint ) def get_endpoint( self, product_id: str, region_id: str, endpoint_rule: str, network: str, suffix: str, endpoint_map: Dict[str, str], endpoint: str, ) -> str: if not DaraCore.is_null(endpoint): return endpoint if not DaraCore.is_null(endpoint_map) and not DaraCore.is_null(endpoint_map.get(region_id)): return endpoint_map.get(region_id) return Utils.get_endpoint_rules(product_id, region_id, endpoint_rule, network, suffix) def assume_role_with_options( self, request: main_models.AssumeRoleRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.external_id): query["ExternalId"] = request.external_id if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRole", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.AssumeRoleResponse(), self.call_api(params, req, runtime)) async def assume_role_with_options_async( self, request: main_models.AssumeRoleRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.external_id): query["ExternalId"] = request.external_id if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRole", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.AssumeRoleResponse(), await self.call_api_async(params, req, runtime)) def assume_role( self, request: main_models.AssumeRoleRequest, ) -> main_models.AssumeRoleResponse: runtime = RuntimeOptions() return self.assume_role_with_options(request, runtime) async def assume_role_async( self, request: main_models.AssumeRoleRequest, ) -> main_models.AssumeRoleResponse: runtime = RuntimeOptions() return await self.assume_role_with_options_async(request, runtime) def assume_role_with_oidcwith_options( self, request: main_models.AssumeRoleWithOIDCRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithOIDCResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.oidcprovider_arn): query["OIDCProviderArn"] = request.oidcprovider_arn if not DaraCore.is_null(request.oidctoken): query["OIDCToken"] = request.oidctoken if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithOIDC", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="Anonymous", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.AssumeRoleWithOIDCResponse(), self.call_api(params, req, runtime)) async def assume_role_with_oidcwith_options_async( self, request: main_models.AssumeRoleWithOIDCRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithOIDCResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.oidcprovider_arn): query["OIDCProviderArn"] = request.oidcprovider_arn if not DaraCore.is_null(request.oidctoken): query["OIDCToken"] = request.oidctoken if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithOIDC", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="Anonymous", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.AssumeRoleWithOIDCResponse(), await self.call_api_async(params, req, runtime) ) def assume_role_with_oidc( self, request: main_models.AssumeRoleWithOIDCRequest, ) -> main_models.AssumeRoleWithOIDCResponse: runtime = RuntimeOptions() return self.assume_role_with_oidcwith_options(request, runtime) async def assume_role_with_oidc_async( self, request: main_models.AssumeRoleWithOIDCRequest, ) -> main_models.AssumeRoleWithOIDCResponse: runtime = RuntimeOptions() return await self.assume_role_with_oidcwith_options_async(request, runtime) def assume_role_with_samlwith_options( self, request: main_models.AssumeRoleWithSAMLRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithSAMLResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.samlassertion): query["SAMLAssertion"] = request.samlassertion if not DaraCore.is_null(request.samlprovider_arn): query["SAMLProviderArn"] = request.samlprovider_arn req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithSAML", version="2015-04-01", protocol="HTTP", pathname="/", method="POST", auth_type="Anonymous", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.AssumeRoleWithSAMLResponse(), self.call_api(params, req, runtime)) async def assume_role_with_samlwith_options_async( self, request: main_models.AssumeRoleWithSAMLRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithSAMLResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.samlassertion): query["SAMLAssertion"] = request.samlassertion if not DaraCore.is_null(request.samlprovider_arn): query["SAMLProviderArn"] = request.samlprovider_arn req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithSAML", version="2015-04-01", protocol="HTTP", pathname="/", method="POST", auth_type="Anonymous", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.AssumeRoleWithSAMLResponse(), await self.call_api_async(params, req, runtime) ) def assume_role_with_saml( self, request: main_models.AssumeRoleWithSAMLRequest, ) -> main_models.AssumeRoleWithSAMLResponse: runtime = RuntimeOptions() return self.assume_role_with_samlwith_options(request, runtime) async def assume_role_with_saml_async( self, request: main_models.AssumeRoleWithSAMLRequest, ) -> main_models.AssumeRoleWithSAMLResponse: runtime = RuntimeOptions() return await self.assume_role_with_samlwith_options_async(request, runtime) def assume_role_with_service_identity_with_options( self, request: main_models.AssumeRoleWithServiceIdentityRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithServiceIdentityResponse: request.validate() query = {} if not DaraCore.is_null(request.assume_role_for): query["AssumeRoleFor"] = request.assume_role_for if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithServiceIdentity", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.AssumeRoleWithServiceIdentityResponse(), self.call_api(params, req, runtime) ) async def assume_role_with_service_identity_with_options_async( self, request: main_models.AssumeRoleWithServiceIdentityRequest, runtime: RuntimeOptions, ) -> main_models.AssumeRoleWithServiceIdentityResponse: request.validate() query = {} if not DaraCore.is_null(request.assume_role_for): query["AssumeRoleFor"] = request.assume_role_for if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.policy): query["Policy"] = request.policy if not DaraCore.is_null(request.role_arn): query["RoleArn"] = request.role_arn if not DaraCore.is_null(request.role_session_name): query["RoleSessionName"] = request.role_session_name req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="AssumeRoleWithServiceIdentity", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.AssumeRoleWithServiceIdentityResponse(), await self.call_api_async(params, req, runtime) ) def assume_role_with_service_identity( self, request: main_models.AssumeRoleWithServiceIdentityRequest, ) -> main_models.AssumeRoleWithServiceIdentityResponse: runtime = RuntimeOptions() return self.assume_role_with_service_identity_with_options(request, runtime) async def assume_role_with_service_identity_async( self, request: main_models.AssumeRoleWithServiceIdentityRequest, ) -> main_models.AssumeRoleWithServiceIdentityResponse: runtime = RuntimeOptions() return await self.assume_role_with_service_identity_with_options_async(request, runtime) def generate_session_access_key_with_options( self, request: main_models.GenerateSessionAccessKeyRequest, runtime: RuntimeOptions, ) -> main_models.GenerateSessionAccessKeyResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GenerateSessionAccessKey", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.GenerateSessionAccessKeyResponse(), self.call_api(params, req, runtime)) async def generate_session_access_key_with_options_async( self, request: main_models.GenerateSessionAccessKeyRequest, runtime: RuntimeOptions, ) -> main_models.GenerateSessionAccessKeyResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GenerateSessionAccessKey", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.GenerateSessionAccessKeyResponse(), await self.call_api_async(params, req, runtime) ) def generate_session_access_key( self, request: main_models.GenerateSessionAccessKeyRequest, ) -> main_models.GenerateSessionAccessKeyResponse: runtime = RuntimeOptions() return self.generate_session_access_key_with_options(request, runtime) async def generate_session_access_key_async( self, request: main_models.GenerateSessionAccessKeyRequest, ) -> main_models.GenerateSessionAccessKeyResponse: runtime = RuntimeOptions() return await self.generate_session_access_key_with_options_async(request, runtime) def generate_token_by_ticket_with_options( self, request: main_models.GenerateTokenByTicketRequest, runtime: RuntimeOptions, ) -> main_models.GenerateTokenByTicketResponse: request.validate() query = {} if not DaraCore.is_null(request.ticket): query["Ticket"] = request.ticket if not DaraCore.is_null(request.ticket_type): query["TicketType"] = request.ticket_type req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GenerateTokenByTicket", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.GenerateTokenByTicketResponse(), self.call_api(params, req, runtime)) async def generate_token_by_ticket_with_options_async( self, request: main_models.GenerateTokenByTicketRequest, runtime: RuntimeOptions, ) -> main_models.GenerateTokenByTicketResponse: request.validate() query = {} if not DaraCore.is_null(request.ticket): query["Ticket"] = request.ticket if not DaraCore.is_null(request.ticket_type): query["TicketType"] = request.ticket_type req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GenerateTokenByTicket", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.GenerateTokenByTicketResponse(), await self.call_api_async(params, req, runtime) ) def generate_token_by_ticket( self, request: main_models.GenerateTokenByTicketRequest, ) -> main_models.GenerateTokenByTicketResponse: runtime = RuntimeOptions() return self.generate_token_by_ticket_with_options(request, runtime) async def generate_token_by_ticket_async( self, request: main_models.GenerateTokenByTicketRequest, ) -> main_models.GenerateTokenByTicketResponse: runtime = RuntimeOptions() return await self.generate_token_by_ticket_with_options_async(request, runtime) def get_caller_identity_with_options( self, runtime: RuntimeOptions, ) -> main_models.GetCallerIdentityResponse: req = open_api_util_models.OpenApiRequest() params = open_api_util_models.Params( action="GetCallerIdentity", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.GetCallerIdentityResponse(), self.call_api(params, req, runtime)) async def get_caller_identity_with_options_async( self, runtime: RuntimeOptions, ) -> main_models.GetCallerIdentityResponse: req = open_api_util_models.OpenApiRequest() params = open_api_util_models.Params( action="GetCallerIdentity", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.GetCallerIdentityResponse(), await self.call_api_async(params, req, runtime) ) def get_caller_identity(self) -> main_models.GetCallerIdentityResponse: runtime = RuntimeOptions() return self.get_caller_identity_with_options(runtime) async def get_caller_identity_async(self) -> main_models.GetCallerIdentityResponse: runtime = RuntimeOptions() return await self.get_caller_identity_with_options_async(runtime) def get_federation_token_with_options( self, request: main_models.GetFederationTokenRequest, runtime: RuntimeOptions, ) -> main_models.GetFederationTokenResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.name): query["Name"] = request.name if not DaraCore.is_null(request.policy): query["Policy"] = request.policy req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GetFederationToken", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map(main_models.GetFederationTokenResponse(), self.call_api(params, req, runtime)) async def get_federation_token_with_options_async( self, request: main_models.GetFederationTokenRequest, runtime: RuntimeOptions, ) -> main_models.GetFederationTokenResponse: request.validate() query = {} if not DaraCore.is_null(request.duration_seconds): query["DurationSeconds"] = request.duration_seconds if not DaraCore.is_null(request.name): query["Name"] = request.name if not DaraCore.is_null(request.policy): query["Policy"] = request.policy req = open_api_util_models.OpenApiRequest(query=Utils.query(query)) params = open_api_util_models.Params( action="GetFederationToken", version="2015-04-01", protocol="HTTPS", pathname="/", method="POST", auth_type="AK", style="RPC", req_body_type="formData", body_type="json", ) return DaraCore.from_map( main_models.GetFederationTokenResponse(), await self.call_api_async(params, req, runtime) ) def get_federation_token( self, request: main_models.GetFederationTokenRequest, ) -> main_models.GetFederationTokenResponse: runtime = RuntimeOptions() return self.get_federation_token_with_options(request, runtime) async def get_federation_token_async( self, request: main_models.GetFederationTokenRequest, ) -> main_models.GetFederationTokenResponse: runtime = RuntimeOptions() return await self.get_federation_token_with_options_async(request, runtime)

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/aliyun/alibabacloud-observability-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server