Skip to main content
Glama
observer.py2.64 kB
""" Реализация паттерна Наблюдатель для MCP компонентов. """ from abc import ABC from typing import Any, Dict, List, Protocol class Observer(Protocol): """ Протокол для наблюдателей, которые получают уведомления от Observable. """ async def update(self, uri: str, data: Any) -> None: """ Получает уведомление об изменении данных. Args: uri: Идентификатор ресурса, который изменился data: Новые данные """ ... class Observable(ABC): """ Базовый класс для объектов, за которыми можно наблюдать. Позволяет регистрировать, удалять и уведомлять наблюдателей. """ def __init__(self) -> None: """Инициализирует словарь наблюдателей.""" self._observers: Dict[str, List[Observer]] = {} async def notify_observers(self, uri: str, data: Any) -> None: """ Уведомляет всех наблюдателей об изменении данных. Args: uri: Идентификатор ресурса, который изменился data: Новые данные """ for observer in self._observers.get(uri, []): await observer.update(uri, data) def add_observer(self, uri: str, observer: Observer) -> None: """ Добавляет наблюдателя для указанного URI. Args: uri: Идентификатор ресурса для наблюдения observer: Наблюдатель, который будет получать уведомления """ if uri not in self._observers: self._observers[uri] = [] if observer not in self._observers[uri]: self._observers[uri].append(observer) def remove_observer(self, uri: str, observer: Observer) -> None: """ Удаляет наблюдателя для указанного URI. Args: uri: Идентификатор ресурса observer: Наблюдатель для удаления """ if uri in self._observers and observer in self._observers[uri]: self._observers[uri].remove(observer) if not self._observers[uri]: del self._observers[uri]

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/eagurin/myaiserv'

If you have feedback or need assistance with the MCP directory API, please join our Discord server