Skip to main content
Glama

EPICS MCP Server

by eunsang284
test_ioc_integration.py6.83 kB
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ IOC Monitor Integration Test Script EPICS-MCP-Server의 IOC Monitor 통합 기능을 테스트하는 스크립트 """ import asyncio import json from typing import Dict, Any class MockMCPClient: """테스트용 Mock MCP 클라이언트""" def __init__(self): self.tools = { "get_system_status": self._get_system_status, "get_ioc_count": self._get_ioc_count, "get_ioc_list": self._get_ioc_list, "get_ioc_details": self._get_ioc_details, "get_faulted_iocs": self._get_faulted_iocs, "get_ioc_logs": self._get_ioc_logs, "get_control_states": self._get_control_states, "get_pv_value": self._get_pv_value, "set_pv_value": self._set_pv_value, } async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: """도구 호출 시뮬레이션""" if name in self.tools: return await self.tools[name](arguments) else: return {"error": f"Unknown tool: {name}"} async def _get_system_status(self, args: Dict[str, Any]) -> Dict[str, Any]: """시스템 상태 조회 시뮬레이션""" return { "IOC Monitor Control IOC": "🟢 RUNNING", "SSH Server": "🟢 RUNNING", "IOC Info Cache Server": "🟢 RUNNING", "IOC Monitor Web Server": "🟢 RUNNING", "Alive Server": "🟢 RUNNING" } async def _get_ioc_count(self, args: Dict[str, Any]) -> Dict[str, Any]: """IOC 개수 조회 시뮬레이션""" return {"ioc_count": 15} async def _get_ioc_list(self, args: Dict[str, Any]) -> Dict[str, Any]: """IOC 목록 조회 시뮬레이션""" return { "iocs": [ "IOC_TEST_001", "IOC_TEST_002", "IOC_TEST_003", "IOC_VACUUM_001", "IOC_MOTOR_001", "IOC_BEAM_001" ] } async def _get_ioc_details(self, args: Dict[str, Any]) -> Dict[str, Any]: """IOC 상세 정보 조회 시뮬레이션""" return { "data": [ {"name": "IOC_TEST_001", "status": "ONLINE", "ip_address": "192.168.1.100"}, {"name": "IOC_TEST_002", "status": "ONLINE", "ip_address": "192.168.1.101"}, {"name": "IOC_TEST_003", "status": "OFFLINE", "ip_address": "192.168.1.102"}, {"name": "IOC_VACUUM_001", "status": "ONLINE", "ip_address": "192.168.1.103"}, {"name": "IOC_MOTOR_001", "status": "ONLINE", "ip_address": "192.168.1.104"}, {"name": "IOC_BEAM_001", "status": "ONLINE", "ip_address": "192.168.1.105"} ] } async def _get_faulted_iocs(self, args: Dict[str, Any]) -> Dict[str, Any]: """장애 IOC 조회 시뮬레이션""" return { "data": [ {"name": "IOC_TEST_003", "status": "OFFLINE", "ip_address": "192.168.1.102"} ] } async def _get_ioc_logs(self, args: Dict[str, Any]) -> Dict[str, Any]: """IOC 로그 조회 시뮬레이션""" ioc_name = args.get("ioc_name", "Unknown") return { "logs": [ f"2024-01-01 10:00:00 - {ioc_name} started", f"2024-01-01 10:05:00 - {ioc_name} connected to database", f"2024-01-01 10:10:00 - {ioc_name} all PVs loaded" ] } async def _get_control_states(self, args: Dict[str, Any]) -> Dict[str, Any]: """제어 상태 조회 시뮬레이션""" return { "monitoring_data": { "Online IOCs": 5, "Offline IOCs": 1, "Total IOCs": 6 } } async def _get_pv_value(self, args: Dict[str, Any]) -> Dict[str, Any]: """PV 값 읽기 시뮬레이션""" pv_name = args.get("pv_name", "Unknown") return { "status": "success", "value": 123.45, "message": f"{pv_name} PV의 현재 값은 123.45입니다." } async def _set_pv_value(self, args: Dict[str, Any]) -> Dict[str, Any]: """PV 값 설정 시뮬레이션""" pv_name = args.get("pv_name", "Unknown") pv_value = args.get("pv_value", "Unknown") return { "status": "success", "message": f"Successfully set PV '{pv_name}' value to: {pv_value}" } async def test_ioc_integration(): """IOC Monitor 통합 기능 테스트""" client = MockMCPClient() print("=== IOC Monitor Integration Test ===\n") # 1. 시스템 상태 확인 print("1. 시스템 상태 확인:") result = await client.call_tool("get_system_status", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 2. IOC 개수 확인 print("2. IOC 개수 확인:") result = await client.call_tool("get_ioc_count", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 3. IOC 목록 조회 print("3. IOC 목록 조회:") result = await client.call_tool("get_ioc_list", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 4. IOC 상세 정보 조회 print("4. IOC 상세 정보 조회:") result = await client.call_tool("get_ioc_details", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 5. 장애 IOC 확인 print("5. 장애 IOC 확인:") result = await client.call_tool("get_faulted_iocs", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 6. 특정 IOC 로그 조회 print("6. 특정 IOC 로그 조회:") result = await client.call_tool("get_ioc_logs", {"ioc_name": "IOC_TEST_001"}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 7. 제어 상태 조회 print("7. 제어 상태 조회:") result = await client.call_tool("get_control_states", {}) print(json.dumps(result, indent=2, ensure_ascii=False)) print() # 8. EPICS PV와 IOC 모니터링 동시 사용 print("8. EPICS PV와 IOC 모니터링 동시 사용:") # PV 값 읽기 pv_result = await client.call_tool("get_pv_value", {"pv_name": "TEST:PV:001"}) print("PV Value Result:") print(json.dumps(pv_result, indent=2, ensure_ascii=False)) # IOC 상태 확인 ioc_result = await client.call_tool("get_system_status", {}) print("\nIOC Status Result:") print(json.dumps(ioc_result, indent=2, ensure_ascii=False)) print() print("=== 테스트 완료 ===") def main(): """메인 함수""" asyncio.run(test_ioc_integration()) if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eunsang284/EPICS-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server