demo_kubernetes.py•1.18 kB
"""demo: kubernetes sdk via mcp."""
import json
import sys
def send_request(method: str, params: dict, req_id: int = 1):
"""send mcp request to stdin."""
request = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
"params": params
}
print(json.dumps(request), file=sys.stdout, flush=True)
def main():
"""demo kubernetes mcp tools."""
print("=== kubernetes mcp demo ===\n")
# 1. list all available kubernetes tools
print("1. listing kubernetes tools...")
send_request("tools/list", {})
# 2. list namespaces
print("\n2. calling kubernetes.CoreV1Api.list_namespace...")
send_request("tools/call", {
"name": "kubernetes.CoreV1Api.list_namespace",
"arguments": {}
}, req_id=2)
# 3. list pods
print("\n3. calling kubernetes.CoreV1Api.list_pod_for_all_namespaces...")
send_request("tools/call", {
"name": "kubernetes.CoreV1Api.list_pod_for_all_namespaces",
"arguments": {}
}, req_id=3)
print("\n=== demo complete ===")
print("send these requests to: python -m src.mcp_server")
if __name__ == "__main__":
main()