We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/songyi-noh/keyvault-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import sys
import os
from typing import List, Dict, Optional
from azure.keyvault.secrets import SecretClient
from azure.keyvault.certificates import CertificateClient, CertificatePolicy
from azure.core.exceptions import ResourceNotFoundError
# SSL 인증서 검증 제어 (프록시 환경 대응)
if os.environ.get("AZURE_KEYVAULT_DISABLE_SSL_VERIFY", "").lower() in ("1", "true", "yes"):
import ssl
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
os.environ['PYTHONHTTPSVERIFY'] = '0'
os.environ['CURL_CA_BUNDLE'] = ''
os.environ['REQUESTS_CA_BUNDLE'] = ''
class KeyVaultManager:
"""Key Vault Secret 및 Certificate 관리"""
def __init__(self, vault_url: str, credential):
self.vault_url = vault_url
self.credential = credential
# ⭐ SSL 검증 완전히 비활성화 (KT 프록시 환경 대응)
print("🔧 SSL 검증 비활성화 모드로 Key Vault 연결 중...", file=sys.stderr)
import ssl
import urllib3
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
from azure.core.pipeline.transport import RequestsTransport
# SSL 경고 비활성화
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# 전역적으로 SSL 검증 비활성화 (requests 라이브러리)
requests.packages.urllib3.disable_warnings()
# 커스텀 SSL 컨텍스트 생성 (검증 완전히 비활성화)
class NoVerifyHTTPAdapter(HTTPAdapter):
def init_poolmanager(self, *args, **kwargs):
ctx = create_urllib3_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
kwargs['ssl_context'] = ctx
return super().init_poolmanager(*args, **kwargs)
# SSL 검증 비활성화된 세션 생성
session = requests.Session()
session.verify = False
session.mount('https://', NoVerifyHTTPAdapter())
# RequestsTransport를 상속받아 session을 강제로 설정
class CustomRequestsTransport(RequestsTransport):
def __init__(self, session=None, *args, **kwargs):
# session을 먼저 설정하지 않고 부모 초기화
super().__init__(*args, **kwargs)
if session:
# 내부 session 속성 강제 설정
self.session = session
if hasattr(self, '_session'):
self._session = session
# _client도 설정 (일부 버전에서 필요)
if hasattr(self, '_client'):
self._client.session = session
def send(self, request, **kwargs):
# Azure SDK는 connection_verify를 사용하므로 이를 False로 설정
kwargs['connection_verify'] = False
# verify도 False로 설정 (혹시 모를 경우 대비)
if 'verify' in kwargs:
kwargs.pop('verify')
# 부모 클래스의 send 메서드 호출
return super().send(request, **kwargs)
transport = CustomRequestsTransport(session=session)
# 클라이언트 생성
self.secret_client = SecretClient(
vault_url=vault_url,
credential=credential,
transport=transport
)
self.cert_client = CertificateClient(
vault_url=vault_url,
credential=credential,
transport=transport
)
print("✅ SSL 검증 비활성화 완료", file=sys.stderr)
self._test_connection()
def _test_connection(self):
"""Key Vault 접근 테스트"""
try:
list(self.secret_client.list_properties_of_secrets(max_page_size=1))
print(f"✅ Key Vault 연결 성공: {self.vault_url}", file=sys.stderr)
except Exception as e:
error_msg = str(e)
# SSL 오류 감지
is_ssl_error = (
"CERTIFICATE_VERIFY_FAILED" in error_msg or
"certificate verify failed" in error_msg.lower() or
"ssl" in error_msg.lower()
)
if is_ssl_error:
print(f"❌ SSL 인증서 검증 오류 (여전히 발생):", file=sys.stderr)
print(f" {error_msg}", file=sys.stderr)
print(f"", file=sys.stderr)
print(f"💡 해결 방법:", file=sys.stderr)
print(f" 1. MCP 설정에서 환경 변수 추가:", file=sys.stderr)
print(f" AZURE_KEYVAULT_DISABLE_SSL_VERIFY=1", file=sys.stderr)
print(f" 2. Claude Desktop 재시작", file=sys.stderr)
raise ConnectionError(f"SSL 인증서 검증 실패: {error_msg}")
# 권한 오류
print(f"❌ Key Vault 접근 실패: {e}", file=sys.stderr)
vault_name = self.vault_url.split("//")[1].split(".")[0]
print(f"💡 권한 부여가 필요할 수 있습니다:", file=sys.stderr)
print(f"az role assignment create \\", file=sys.stderr)
print(f" --role 'Key Vault Secrets Officer' \\", file=sys.stderr)
print(f" --assignee $(az ad signed-in-user show --query id -o tsv) \\", file=sys.stderr)
print(f" --scope $(az keyvault show --name {vault_name} --query id -o tsv)", file=sys.stderr)
raise ConnectionError(f"Key Vault 접근 실패: {e}")
# ===== SECRET 관리 =====
def set_secret(self, name: str, value: str) -> Dict:
"""Secret 생성/업데이트"""
try:
secret = self.secret_client.set_secret(name, value)
return {
"success": True,
"name": secret.name,
"version": secret.properties.version,
"created": str(secret.properties.created_on)
}
except Exception as e:
return {"success": False, "error": str(e)}
def get_secret(self, name: str) -> Dict:
"""Secret 조회"""
try:
secret = self.secret_client.get_secret(name)
return {
"success": True,
"name": secret.name,
"value": secret.value,
"version": secret.properties.version,
"updated": str(secret.properties.updated_on)
}
except ResourceNotFoundError:
return {"success": False, "error": f"Secret '{name}'을 찾을 수 없습니다."}
except Exception as e:
return {"success": False, "error": str(e)}
def list_secrets(self) -> List[Dict]:
"""모든 Secret 목록 조회"""
try:
secrets = []
for secret_props in self.secret_client.list_properties_of_secrets():
secrets.append({
"name": secret_props.name,
"enabled": secret_props.enabled,
"created": str(secret_props.created_on),
"updated": str(secret_props.updated_on)
})
return secrets
except Exception as e:
print(f"❌ Secret 목록 조회 실패: {e}", file=sys.stderr)
return []
def delete_secret(self, name: str) -> Dict:
"""Secret 삭제 (soft delete)"""
try:
poller = self.secret_client.begin_delete_secret(name)
deleted_secret = poller.result()
return {
"success": True,
"name": deleted_secret.name,
"deleted_on": str(deleted_secret.deleted_date)
}
except Exception as e:
return {"success": False, "error": str(e)}
# ===== CERTIFICATE 관리 =====
def import_certificate(self, name: str, pfx_bytes: bytes, password: Optional[str] = None) -> Dict:
"""PFX 인증서 import"""
try:
has_existing = False
try:
self.cert_client.get_certificate(name)
has_existing = True
except ResourceNotFoundError:
has_existing = False
import_kwargs = {
"certificate_name": name,
"certificate_bytes": pfx_bytes,
"password": password
}
# 기존 인증서가 없을 때만 정책 설정
# Azure Key Vault import 시 content_type을 명시적으로 설정
if not has_existing:
# PFX 형식 인증서를 위한 정책 생성
policy = CertificatePolicy(
content_type="application/x-pkcs12"
)
import_kwargs["policy"] = policy
cert = self.cert_client.import_certificate(**import_kwargs)
return {
"success": True,
"name": cert.name,
"id": cert.id,
"thumbprint": cert.properties.x509_thumbprint.hex() if cert.properties.x509_thumbprint else None,
"is_new": not has_existing # 신규 추가인지 갱신인지 구분
}
except Exception as e:
return {"success": False, "error": str(e)}
def get_certificate(self, name: str) -> Dict:
"""인증서 조회"""
try:
cert = self.cert_client.get_certificate(name)
return {
"success": True,
"name": cert.name,
"id": cert.id,
"enabled": cert.properties.enabled,
"created": str(cert.properties.created_on),
"expires": str(cert.properties.expires_on),
"thumbprint": cert.properties.x509_thumbprint.hex() if cert.properties.x509_thumbprint else None
}
except ResourceNotFoundError:
return {"success": False, "error": f"Certificate '{name}'을 찾을 수 없습니다."}
except Exception as e:
return {"success": False, "error": str(e)}
def list_certificates(self) -> List[Dict]:
"""모든 인증서 목록 조회"""
try:
certs = []
for cert_props in self.cert_client.list_properties_of_certificates():
certs.append({
"name": cert_props.name,
"enabled": cert_props.enabled,
"created": str(cert_props.created_on),
"expires": str(cert_props.expires_on) if cert_props.expires_on else None,
"thumbprint": cert_props.x509_thumbprint.hex() if cert_props.x509_thumbprint else None
})
return certs
except Exception as e:
print(f"❌ 인증서 목록 조회 실패: {e}", file=sys.stderr)
return []
def delete_certificate(self, name: str) -> Dict:
"""인증서 삭제"""
try:
poller = self.cert_client.begin_delete_certificate(name)
deleted_cert = poller.result()
# deleted_date 속성 확인 (속성 이름이 다를 수 있음)
deleted_on = None
if hasattr(deleted_cert, 'deleted_date') and deleted_cert.deleted_date:
deleted_on = str(deleted_cert.deleted_date)
elif hasattr(deleted_cert, 'deleted_on') and deleted_cert.deleted_on:
deleted_on = str(deleted_cert.deleted_on)
elif hasattr(deleted_cert.properties, 'deleted_on') and deleted_cert.properties.deleted_on:
deleted_on = str(deleted_cert.properties.deleted_on)
return {
"success": True,
"name": deleted_cert.name,
"deleted_on": deleted_on
}
except Exception as e:
return {"success": False, "error": str(e)}