Skip to main content
Glama

EPICS MCP Server

by eunsang284
server_by_stdio.py (25.1 kB)
"""EPICS MCP server over stdio.

Exposes three groups of tools to an MCP client:

* Channel Access operations (``caget`` / ``caput`` / ``cainfo``) on EPICS PVs,
  with a small table of human-friendly aliases for well-known PVs.
* PV discovery through a ChannelFinder REST service.
* Read-only status queries against an "IOC Monitor" HTTP API.
"""

import asyncio
import json
import logging
import os
import re
from enum import Enum
from typing import Any, Dict, List, Sequence
from urllib.parse import quote

import mcp.server.stdio
import requests
from dotenv import load_dotenv
from epics import caget, cainfo, caput
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions
from mcp.types import EmbeddedResource, ImageContent, TextContent, Tool

# Load environment configuration
load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)

# Service endpoints. The defaults are the addresses this module has always
# used; each can be overridden through the environment (or a .env file).
CHANNEL_FINDER_URL = os.getenv(
    "EPICS_MCP_CHANNEL_FINDER_URL",
    "http://192.168.70.235:8085/ChannelFinder/resources/channels",
)
IOC_MONITOR_URL = os.getenv("EPICS_MCP_IOC_MONITOR_URL", "http://192.168.70.235:5001")

# Upper bound (seconds) on any single ChannelFinder HTTP request, so that an
# unreachable service cannot block the server forever.
CHANNEL_FINDER_TIMEOUT = 10


class EpicsTools(str, Enum):
    """Names of the MCP tools known to this server."""

    GET_PV_VALUE = "get_pv_value"
    SET_PV_VALUE = "set_pv_value"
    GET_PV_INFO = "get_pv_info"
    SUGGEST_PVS = "suggest_pvs"
    GET_ALL_PVS = "get_all_pvs"
    # IOC Monitor tools
    # NOTE: GET_IOC_STATUS is declared but is neither listed nor dispatched
    # by the handlers in this module.
    GET_IOC_STATUS = "get_ioc_status"
    GET_IOC_LIST = "get_ioc_list"
    GET_IOC_DETAILS = "get_ioc_details"
    GET_FAULTED_IOCS = "get_faulted_iocs"
    GET_SYSTEM_STATUS = "get_system_status"
    GET_IOC_COUNT = "get_ioc_count"
    GET_IOC_LOGS = "get_ioc_logs"
    GET_CONTROL_STATES = "get_control_states"


# English keywords that request the full PV list. They are matched as whole
# ASCII words so that queries such as "hall" or "install" do not trigger a
# full dump merely because they contain the letters "all".
_ALL_PVS_WORDS = frozenset({"all", "list"})
# Korean keywords that request the full PV list (substring match, because
# Korean particles attach directly to the word).
_ALL_PVS_SUBSTRINGS = ("전체", "모든")
# Keywords that request the search help text (substring match).
_HELP_KEYWORDS = ("help", "도움말", "사용법", "example", "예시")

# (trigger substrings, extra search keywords). Only the FIRST group with a
# matching trigger is applied to a query.
_KEYWORD_GROUPS = (
    (("vacuum", "vac"), ("vacuum", "vac", "pump")),
    (("motor",), ("motor", "drive", "position")),
    (("valve",), ("valve", "gate", "shutter")),
    (("temp", "temperature"), ("temp", "temperature", "heater")),
    (("pressure",), ("pressure", "gauge", "vac")),
    (("beam",), ("beam", "intensity", "flux")),
)

_HELP_MESSAGE = """ChannelFinder 검색 도움말:

**사용 가능한 검색 키워드:**
- 진공 관련: "vacuum", "vac", "pump"
- 모터 관련: "motor", "drive", "position"
- 밸브 관련: "valve", "gate", "shutter"
- 센서 관련: "sensor", "detector", "monitor"
- 온도 관련: "temp", "temperature", "heater"
- 압력 관련: "pressure", "vac", "gauge"
- 전류/전압 관련: "current", "voltage", "power"
- 빔 관련: "beam", "intensity", "flux"

**검색 예시:**
- "vacuum" → 진공 관련 PV 검색
- "motor" → 모터 관련 PV 검색
- "temp" → 온도 관련 PV 검색
- "beam" → 빔 관련 PV 검색

**전체 PV 목록:**
- "list" 또는 "all" → 모든 PV 반환"""


def get_all_pvs_from_channelfinder() -> dict:
    """Fetch the names of every PV known to ChannelFinder.

    Returns:
        On success, ``{"status": "success", "matches": [...], "message": ...}``
        with the PV names sorted. On failure, ``{"status": "error",
        "message": ...}``.
    """
    try:
        # An empty query asks ChannelFinder for every channel.
        response = requests.get(
            CHANNEL_FINDER_URL, params={}, timeout=CHANNEL_FINDER_TIMEOUT
        )
        if response.status_code != 200:
            return {
                "status": "error",
                "message": f"ChannelFinder API returned status code: {response.status_code}",
            }
        channels = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers undecodable JSON on requests versions whose
        # decode error is not a RequestException.
        return {
            "status": "error",
            "message": f"Failed to query ChannelFinder: {str(e)}",
        }

    all_pvs = sorted(ch["name"] for ch in channels if "name" in ch)
    return {
        "status": "success",
        "matches": all_pvs,
        "message": f"Found {len(all_pvs)} total PVs in the system",
    }


def _wants_all_pvs(query_lower: str) -> bool:
    """Return True if the lower-cased query asks for the complete PV list."""
    ascii_words = set(re.findall(r"[a-z0-9_]+", query_lower))
    if ascii_words & _ALL_PVS_WORDS:
        return True
    return any(keyword in query_lower for keyword in _ALL_PVS_SUBSTRINGS)


def _expand_keywords(query_lower: str) -> set:
    """Return the query itself plus the synonyms of the first matching group."""
    keywords = {query_lower}
    for triggers, synonyms in _KEYWORD_GROUPS:
        if any(trigger in query_lower for trigger in triggers):
            keywords.update(synonyms)
            break
    return keywords


def suggest_pvs_from_channelfinder(query: str) -> dict:
    """Search ChannelFinder for PVs whose names relate to ``query``.

    The query is lower-cased and then handled in this order:

    1. If it asks for everything ("all", "list", "전체", "모든"), the full PV
       list is returned.
    2. If it asks for help, a static help text is returned with no matches.
    3. Otherwise the query (plus synonyms for a few known topics) is sent to
       ChannelFinder as ``*keyword*`` name patterns and the results merged.

    Args:
        query: Keyword or free-text input from the user.

    Returns:
        ``{"status": "success", "matches": [...], "message": ...}`` or
        ``{"status": "error", "message": ...}``.
    """
    if not query or not isinstance(query, str):
        return {"status": "error", "message": "Query cannot be empty and must be a string."}

    query_lower = query.lower()

    if _wants_all_pvs(query_lower):
        return get_all_pvs_from_channelfinder()

    if any(keyword in query_lower for keyword in _HELP_KEYWORDS):
        return {
            "status": "success",
            "matches": [],
            "message": _HELP_MESSAGE,
        }

    found = set()
    ok_responses = 0
    last_bad_status = None
    try:
        for kw in sorted(_expand_keywords(query_lower)):
            response = requests.get(
                CHANNEL_FINDER_URL,
                params={"~name": f"*{kw}*"},
                timeout=CHANNEL_FINDER_TIMEOUT,
            )
            if response.status_code != 200:
                last_bad_status = response.status_code
                logging.warning(
                    "ChannelFinder returned status %s for keyword %r",
                    response.status_code,
                    kw,
                )
                continue
            ok_responses += 1
            found.update(ch["name"] for ch in response.json() if "name" in ch)
    except (requests.RequestException, ValueError) as e:
        return {
            "status": "error",
            "message": f"Failed to query ChannelFinder: {str(e)}",
        }

    if ok_responses == 0 and last_bad_status is not None:
        # Every request was rejected: that is a service failure, not an
        # empty search result.
        return {
            "status": "error",
            "message": f"ChannelFinder API returned status code: {last_bad_status}",
        }

    matches = sorted(found)
    if not matches:
        return {
            "status": "success",
            "matches": [],
            "message": f"No PVs found for query: '{query}'. Try using 'help' to see available search keywords.",
        }

    return {
        "status": "success",
        "matches": matches,
        "message": f"Found {len(matches)} PVs matching query: '{query}'",
    }


class EpicsServer:
    """Channel Access operations on PVs, with alias resolution."""

    def __init__(self):
        # Lower-cased alias -> PV name. Lookups go through _resolve_pv_name.
        self.pv_alias_map = {
            # Frequency-related aliases
            "main frequency": "tTEST-CTRL:TS-EVG001:MAIN",
            "main": "tTEST-CTRL:TS-EVG001:MAIN",
            "sub frequency": "tTEST-CTRL:TS-EVG001:SUB",
            "sub": "tTEST-CTRL:TS-EVG001:SUB",
            "chopper": "tTEST-CTRL:TS-EVR001:CHOPPER",
            "rfq": "tTEST-CTRL:TS-EVR003:RFQ",
            "rfq frequency": "tTEST-CTRL:TS-EVR003:RFQ",
            # Gate valve
            "gv": "tTEST-CTRL:TS-GV001:STATUS",
            "gate valve": "tTEST-CTRL:TS-GV001:STATUS",
            "게이트밸브": "tTEST-CTRL:TS-GV001:STATUS",
            # Warm-section magnet
            "wsm": "tTEST-CTRL:TS-WSM001:CURRENT",
            "warm section magnet": "tTEST-CTRL:TS-WSM001:CURRENT",
            "웜섹션 마그넷": "tTEST-CTRL:TS-WSM001:CURRENT",
            "웜섹션": "tTEST-CTRL:TS-WSM001:CURRENT",
            # Quadrupole magnet
            "qm": "tTEST-CTRL:TS-QM001:CURRENT",
            "quadrupole magnet": "tTEST-CTRL:TS-QM001:CURRENT",
            "쿼드로플 마그넷": "tTEST-CTRL:TS-QM001:CURRENT",
            "쿼드로플": "tTEST-CTRL:TS-QM001:CURRENT",
            # Steering magnet
            "km": "tTEST-CTRL:TS-KM001:CURRENT",
            "steering magnet": "tTEST-CTRL:TS-KM001:CURRENT",
            "스티어링 마그넷": "tTEST-CTRL:TS-KM001:CURRENT",
            "스티어링": "tTEST-CTRL:TS-KM001:CURRENT",
            # ESQ
            "esq": "tTEST-CTRL:TS-ESQ001:VOLTAGE",
            "electrostatic quadrupole": "tTEST-CTRL:TS-ESQ001:VOLTAGE",
            "전기사중극자": "tTEST-CTRL:TS-ESQ001:VOLTAGE",
            # Other common abbreviations
            "bpm": "tTEST-CTRL:TS-BPM001:POSITION",
            "beam position monitor": "tTEST-CTRL:TS-BPM001:POSITION",
            "빔 위치 모니터": "tTEST-CTRL:TS-BPM001:POSITION",
            "fc": "tTEST-CTRL:TS-FC001:CURRENT",
            "faraday cup": "tTEST-CTRL:TS-FC001:CURRENT",
            "패러데이 컵": "tTEST-CTRL:TS-FC001:CURRENT",
            "slit": "tTEST-CTRL:TS-SLIT001:POSITION",
            "슬릿": "tTEST-CTRL:TS-SLIT001:POSITION",
        }

    def _resolve_pv_name(self, pv_name: str) -> str:
        """Map an alias to its PV name; unknown names are returned unchanged.

        The alias lookup is case-insensitive and ignores surrounding
        whitespace. An empty/None name resolves to "".
        """
        if not pv_name:
            return ""
        return self.pv_alias_map.get(pv_name.strip().lower(), pv_name)

    def get_pv_value(self, pv_name: str) -> Dict[str, Any]:
        """Read the current value of a PV (or alias) with a 5 s timeout.

        Returns:
            ``{"status": "success", "value": ..., "message": ...}`` or
            ``{"status": "error", "message": ...}``.
        """
        if not pv_name or not isinstance(pv_name, str):
            return {"status": "error", "message": "PV name cannot be empty and must be a string."}

        resolved_name = self._resolve_pv_name(pv_name)
        logging.info(f"Resolved PV name '{pv_name}' → '{resolved_name}'")

        try:
            value = caget(resolved_name, timeout=5)
            if value is None:
                # A None result is treated as a failed connection.
                raise TimeoutError("Channel connect timed out")

            if resolved_name != pv_name:
                msg = f'"{pv_name}"는 {resolved_name} PV로부터 추정되며, 현재 값은 {value}입니다.'
            else:
                msg = f'{resolved_name} PV의 현재 값은 {value}입니다.'

            return {"status": "success", "value": value, "message": msg}
        except TimeoutError:
            return {"status": "error", "message": f"Timeout while getting PV '{resolved_name}' value. Please check the network connection."}
        except Exception as e:
            return {"status": "error", "message": f"An unknown error occurred: {str(e)}"}

    def get_pv_info(self, pv_name: str) -> Dict[str, Any]:
        """Fetch the ``cainfo`` description of a PV (or alias), 5 s timeout.

        Returns:
            ``{"status": "success", "info": ...}`` or
            ``{"status": "error", "message": ...}``.
        """
        if not pv_name or not isinstance(pv_name, str):
            return {"status": "error", "message": "PV name cannot be empty and must be a string."}

        resolved_name = self._resolve_pv_name(pv_name)
        logging.info(f"Resolved PV name '{pv_name}' → '{resolved_name}'")

        try:
            info = cainfo(resolved_name, print_out=False, timeout=5)
            if info is None:
                raise TimeoutError("Channel connect timed out")
            return {"status": "success", "info": info}
        except TimeoutError:
            return {"status": "error", "message": f"Timeout while getting PV '{resolved_name}' value. Please check the network connection."}
        except Exception as e:
            return {"status": "error", "message": f"An unknown error occurred: {str(e)}"}

    def set_pv_value(self, pv_name: str, pv_value: str) -> Dict[str, Any]:
        """Write ``pv_value`` to a PV (or alias) with a 5 s timeout.

        Both arguments must be non-empty strings.

        Returns:
            ``{"status": "success", "message": ...}`` or
            ``{"status": "error", "message": ...}``.
        """
        if not pv_name or not isinstance(pv_name, str):
            return {"status": "error", "message": "PV name cannot be empty and must be a string."}
        if not pv_value or not isinstance(pv_value, str):
            return {"status": "error", "message": "PV value cannot be empty and must be a string."}

        resolved_name = self._resolve_pv_name(pv_name)
        logging.info(f"Resolved PV name '{pv_name}' → '{resolved_name}'")

        try:
            success = caput(resolved_name, pv_value, timeout=5)
            if not success:
                raise ValueError("Set operation failed")
            return {"status": "success", "message": f"Successfully set PV '{resolved_name}' value to: {pv_value}"}
        except TimeoutError:
            return {"status": "error", "message": f"Timeout while setting PV '{resolved_name}' value. Please check the network connection."}
        except Exception as e:
            return {"status": "error", "message": f"An unknown error occurred: {str(e)}"}

    def suggest_pvs(self, query: str) -> Dict[str, Any]:
        """Delegate to :func:`suggest_pvs_from_channelfinder`."""
        return suggest_pvs_from_channelfinder(query)


class IOCMonitorClient:
    """Client for the IOC Monitor HTTP API."""

    def __init__(self, base_url: str = "http://192.168.70.235:5001", timeout: int = 10):
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()

    @staticmethod
    def _path_segment(value: Any) -> str:
        """Percent-encode a caller-supplied value for use as one URL path segment.

        Colons are left as-is (they are legal in a path segment and common in
        PV names); slashes, spaces, '?' and '#' are encoded so the value
        cannot change which endpoint is addressed.
        """
        return quote(str(value), safe=":")

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Perform an HTTP request and return the decoded JSON body.

        Transport errors, HTTP error statuses and undecodable bodies are not
        raised; they are reported as ``{"error": ..., "success": False}``.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.request(
                method, url, timeout=self.timeout, **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            return {"error": f"Request failed: {str(e)}", "success": False}
        except ValueError:
            # json.JSONDecodeError is a ValueError; catching the base class
            # also covers alternative JSON backends.
            return {"error": "Invalid JSON response", "success": False}

    def get_system_status(self) -> Dict[str, Any]:
        """Query the system status."""
        return self._make_request("GET", "/api/status")

    def get_ioc_count(self) -> Dict[str, Any]:
        """Query the number of IOCs."""
        return self._make_request("GET", "/api/ioc_count")

    def get_ioc_list(self) -> Dict[str, Any]:
        """Query the list of IOCs."""
        return self._make_request("GET", "/api/alive/ioc_list")

    def get_ioc_details(self) -> Dict[str, Any]:
        """Query detailed information for all IOCs."""
        return self._make_request("GET", "/api/alive/ioc_details")

    def get_ioc_detail(self, ioc_name: str) -> Dict[str, Any]:
        """Query detailed information for one IOC."""
        return self._make_request("GET", f"/api/alive/ioc/{self._path_segment(ioc_name)}")

    def get_ioc_status_summary(self) -> Dict[str, Any]:
        """Query the IOC status summary."""
        return self._make_request("GET", "/api/alive/status")

    def get_faulted_iocs(self) -> Dict[str, Any]:
        """Query faulted IOCs."""
        return self._make_request("GET", "/api/alive/faulted")

    def get_all_ioc_data(self) -> Dict[str, Any]:
        """Query all IOC data (including mask state)."""
        return self._make_request("GET", "/api/data")

    def get_ip_list(self) -> Dict[str, Any]:
        """Query the list of IOC IP addresses."""
        return self._make_request("GET", "/api/ip_list")

    def get_faulted_iocs_detailed(self) -> Dict[str, Any]:
        """Query detailed information on faulted IOCs (masked ones excluded)."""
        return self._make_request("GET", "/api/faulted_iocs")

    def get_control_states(self) -> Dict[str, Any]:
        """Query control states and monitoring data."""
        return self._make_request("GET", "/api/control_states")

    def get_ioc_logs(self, ioc_name: str) -> Dict[str, Any]:
        """Query the event log of one IOC."""
        return self._make_request("GET", f"/api/ioc_logs/{self._path_segment(ioc_name)}")

    def get_all_events(self) -> Dict[str, Any]:
        """Query the cache of all events."""
        return self._make_request("GET", "/api/events")

    def read_pv(self, pv_name: str) -> Dict[str, Any]:
        """Read a PV value through the IOC Monitor API."""
        return self._make_request("GET", f"/api/pv/caget/{self._path_segment(pv_name)}")

    def write_pv(self, pv_name: str, value: Any) -> Dict[str, Any]:
        """Set a PV value through the IOC Monitor API."""
        data = {"value": value}
        return self._make_request("POST", f"/api/pv/caput/{self._path_segment(pv_name)}", json=data)

    def get_api_list(self) -> Dict[str, Any]:
        """Query the list of all available API endpoints."""
        return self._make_request("GET", "/api/list")


server: Server = Server("epics_ioc_monitor_tools")
epics_server = EpicsServer()
ioc_monitor_client = IOCMonitorClient(IOC_MONITOR_URL)

# Tools whose result carries a "matches" list of PV names.
_PV_LIST_TOOLS = frozenset({EpicsTools.SUGGEST_PVS.value, EpicsTools.GET_ALL_PVS.value})
# Tools whose result may carry an "iocs" or "data" list of IOCs.
_IOC_LIST_TOOLS = frozenset({
    EpicsTools.GET_IOC_LIST.value,
    EpicsTools.GET_IOC_DETAILS.value,
    EpicsTools.GET_FAULTED_IOCS.value,
})


def _object_schema(**properties: str) -> Dict[str, Any]:
    """Build a JSON schema for an object whose properties are all required strings.

    Each keyword is a property name and its value the property description.
    Called with no keywords, it yields the schema of a tool without arguments.
    A fresh dict is built on every call so tools never share schema objects.
    """
    return {
        "type": "object",
        "properties": {
            prop: {"type": "string", "description": description}
            for prop, description in properties.items()
        },
        "required": list(properties),
    }


@server.list_tools()
async def handle_list_tools() -> List[Tool]:
    """Describe every tool this server offers to the MCP client."""
    pv_name_doc = "The name of the PV variable provided by the user."
    return [
        Tool(
            name=EpicsTools.GET_PV_VALUE.value,
            description="Get the value of a specific PV.",
            inputSchema=_object_schema(pv_name=pv_name_doc),
        ),
        Tool(
            name=EpicsTools.SET_PV_VALUE.value,
            description="Set the value of a specific PV.",
            inputSchema=_object_schema(
                pv_name=pv_name_doc,
                pv_value="The new PV value provided by the user.",
            ),
        ),
        Tool(
            name=EpicsTools.GET_PV_INFO.value,
            description="Get information about a specific PV.",
            inputSchema=_object_schema(pv_name=pv_name_doc),
        ),
        Tool(
            name=EpicsTools.SUGGEST_PVS.value,
            description="Suggest PVs related to the user's input keyword or get all PVs if 'list' or 'all' is mentioned.",
            inputSchema=_object_schema(
                query="Keyword or natural language input by user. Use 'list', 'all', 'PV list' to get all PVs.",
            ),
        ),
        Tool(
            name=EpicsTools.GET_ALL_PVS.value,
            description="Get all available PVs from the system.",
            inputSchema=_object_schema(),
        ),
        # IOC Monitor tools
        Tool(
            name=EpicsTools.GET_SYSTEM_STATUS.value,
            description="Get the overall system status of IOC Monitor including IOC Monitor, SSH Server, Cache Server, etc.",
            inputSchema=_object_schema(),
        ),
        Tool(
            name=EpicsTools.GET_IOC_COUNT.value,
            description="Get the total count of IOCs in the system.",
            inputSchema=_object_schema(),
        ),
        Tool(
            name=EpicsTools.GET_IOC_LIST.value,
            description="Get a list of all IOC names from the Alive server.",
            inputSchema=_object_schema(),
        ),
        Tool(
            name=EpicsTools.GET_IOC_DETAILS.value,
            description="Get detailed information about all IOCs including status, IP, uptime, etc.",
            inputSchema=_object_schema(),
        ),
        Tool(
            name=EpicsTools.GET_FAULTED_IOCS.value,
            description="Get information about currently faulted/offline IOCs for monitoring and alerting.",
            inputSchema=_object_schema(),
        ),
        Tool(
            name=EpicsTools.GET_IOC_LOGS.value,
            description="Get event logs for a specific IOC for analysis and debugging.",
            inputSchema=_object_schema(ioc_name="The name of the IOC to get logs for."),
        ),
        Tool(
            name=EpicsTools.GET_CONTROL_STATES.value,
            description="Get control states and monitoring data for IOC monitoring status.",
            inputSchema=_object_schema(),
        ),
    ]


def _run_tool(name: str, arguments: Dict[str, Any]) -> Any:
    """Execute the tool called ``name`` and return its raw result.

    Raises:
        ValueError: if the tool is unknown or a required argument is missing.
    """

    def arg(key: str) -> Any:
        if key not in arguments:
            raise ValueError(f"Missing required argument: {key}")
        return arguments[key]

    # Lambdas defer both the argument lookup and the call until the tool has
    # actually been selected.
    handlers = {
        EpicsTools.GET_PV_VALUE.value: lambda: epics_server.get_pv_value(arg("pv_name")),
        EpicsTools.SET_PV_VALUE.value: lambda: epics_server.set_pv_value(arg("pv_name"), arg("pv_value")),
        EpicsTools.GET_PV_INFO.value: lambda: epics_server.get_pv_info(arg("pv_name")),
        EpicsTools.SUGGEST_PVS.value: lambda: epics_server.suggest_pvs(arg("query")),
        EpicsTools.GET_ALL_PVS.value: get_all_pvs_from_channelfinder,
        # IOC Monitor tools
        EpicsTools.GET_SYSTEM_STATUS.value: ioc_monitor_client.get_system_status,
        EpicsTools.GET_IOC_COUNT.value: ioc_monitor_client.get_ioc_count,
        EpicsTools.GET_IOC_LIST.value: ioc_monitor_client.get_ioc_list,
        EpicsTools.GET_IOC_DETAILS.value: ioc_monitor_client.get_ioc_details,
        EpicsTools.GET_FAULTED_IOCS.value: ioc_monitor_client.get_faulted_iocs,
        EpicsTools.GET_IOC_LOGS.value: lambda: ioc_monitor_client.get_ioc_logs(arg("ioc_name")),
        EpicsTools.GET_CONTROL_STATES.value: ioc_monitor_client.get_control_states,
    }
    handler = handlers.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    return handler()


def _format_ioc_names(result: Dict[str, Any]) -> str:
    """Render a result whose "iocs" entry is a list of IOC names."""
    iocs = result.get("iocs", [])
    if isinstance(iocs, list) and iocs:
        ioc_list = "\n".join(f"- {ioc}" for ioc in iocs)
        message = result.get("message", f"Found {len(iocs)} IOCs")
        return f"{message}\n\nIOC List:\n{ioc_list}"
    return str(result.get("message", "No IOCs found"))


def _format_ioc_details(result: Dict[str, Any]) -> str:
    """Render a result whose "data" entry is a list of per-IOC records."""
    iocs = result.get("data", [])
    if isinstance(iocs, list) and iocs:
        lines = []
        for ioc in iocs:
            if not isinstance(ioc, dict):
                # Unexpected record shape: show it verbatim rather than fail.
                lines.append(f"- {ioc}")
                continue
            ioc_name = ioc.get("name", "Unknown")
            ioc_status = ioc.get("status", "Unknown")
            ip = ioc.get("ip_address", "N/A")
            lines.append(f"- {ioc_name}: {ioc_status} ({ip})")
        ioc_list = "\n".join(lines)
        message = result.get("message", f"Found {len(iocs)} IOCs")
        return f"{message}\n\nIOC Details:\n{ioc_list}"
    return str(result.get("message", "No IOC details found"))


def _format_tool_result(name: str, result: Any) -> str:
    """Turn a raw tool result into the text sent back to the MCP client."""
    if not isinstance(result, dict):
        # e.g. an API endpoint that answers with a bare JSON list.
        return json.dumps(result, indent=2, ensure_ascii=False)

    # Failure payload produced by IOCMonitorClient._make_request.
    if result.get("success") is False and "error" in result:
        return f"Error: {result['error']}"

    # Special formatting for SUGGEST_PVS and GET_ALL_PVS
    if name in _PV_LIST_TOOLS:
        matches = result.get("matches", [])
        message = result.get("message", "")
        if matches:
            match_list = "\n".join(f"- {pv}" for pv in matches)
            return f"{message}\n\nMatching PVs:\n{match_list}"
        return str(message)

    # Special formatting for IOC Monitor tools
    if name in _IOC_LIST_TOOLS:
        if "iocs" in result:
            return _format_ioc_names(result)
        if "data" in result:
            return _format_ioc_details(result)

    # Special formatting for system status
    if name == EpicsTools.GET_SYSTEM_STATUS.value:
        status_list = "\n".join(
            f"- {service}: {status}" for service, status in result.items()
        )
        return f"System Status:\n{status_list}"

    message = result.get("message")
    if message:
        return str(message)
    return json.dumps(result, indent=2, ensure_ascii=False)


@server.call_tool()
async def handle_call_tool(
    name: str, arguments: dict
) -> Sequence[TextContent | ImageContent | EmbeddedResource]:
    """Run the requested tool and return its result as a single text item.

    Any exception (unknown tool, missing argument, unexpected result shape)
    is reported to the client as text rather than propagated.
    """
    try:
        result = _run_tool(name, arguments or {})
        return [TextContent(type="text", text=_format_tool_result(name, result))]
    except Exception as e:
        logging.exception("Tool %r failed", name)
        return [TextContent(type="text", text=f"Error processing MCP-EPICS query: {str(e)}")]


async def main() -> None:
    """Serve MCP requests over stdin/stdout until the streams close."""
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="mcp_epics_ioc_monitor_server",
                server_version="1.0.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


if __name__ == "__main__":
    asyncio.run(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eunsang284/EPICS-MCP-Server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server