# __init__.py (4.21 kB)
from __future__ import annotations
import importlib
import inspect
from types import ModuleType
from typing import Any, Callable
import os
from importlib.metadata import entry_points
import time
from utils import cache_call, check_permission, write_audit
# Names exported by ``from <package> import *``. ``register_adapter`` is
# defined in this module but is not listed here.
__all__ = ["Adapter", "load_adapters"]
class Adapter:
"""Base adapter that wraps an SDK module and exposes its public callables."""
def __init__(self, name: str, module: ModuleType):
self.name = name
self.module = module
self.methods: dict[str, Callable[..., Any]] = {}
self._discover_methods()
# ---------------------------------------------------------------------
# Discovery
# ---------------------------------------------------------------------
def _discover_methods(self) -> None:
"""Populate ``self.methods`` with callables we can safely expose."""
for attr_name in dir(self.module):
if attr_name.startswith("_"):
continue
attr = getattr(self.module, attr_name)
if callable(attr):
self.methods[f"{self.name}.{attr_name}"] = attr
elif inspect.isclass(attr):
# Expose classmethods & staticmethods
for name, member in inspect.getmembers(attr):
if name.startswith("_"):
continue
if inspect.isfunction(member) or inspect.ismethod(member):
fq = f"{self.name}.{attr.__name__}.{name}"
self.methods[fq] = member
# ------------------------------------------------------------------
# Execution helpers
# ------------------------------------------------------------------
def call(self, fq_name: str, *args: Any, ttl: int | None = None, **kwargs: Any) -> Any: # noqa: D401
"""call the fully-qualified function or method by name with rbac, caching, audit."""
if not check_permission(fq_name):
raise PermissionError(f"access to {fq_name} denied by policy")
fn = self.methods[fq_name]
key = (fq_name, args, tuple(sorted(kwargs.items())))
def _exec():
start = time.time()
try:
result = fn(*args, **kwargs)
write_audit({"fq_name": fq_name, "args": args, "kwargs": kwargs, "success": True, "latency": time.time() - start})
return result
except Exception as exc: # noqa: broad-except
write_audit({"fq_name": fq_name, "args": args, "kwargs": kwargs, "success": False, "error": str(exc)})
raise
return cache_call(key, _exec, ttl)
# -------------------------------------------------------------------------
# Registry & Loader
# -------------------------------------------------------------------------
# Module-level registry mapping adapter name -> Adapter instance. It is
# written by register_adapter() and cleared, refilled and returned by
# load_adapters().
_ADAPTER_REGISTRY: dict[str, Adapter] = {}
def register_adapter(name: str, module_name: str) -> Adapter:
    """Import ``module_name``, wrap it in an ``Adapter`` and register it.

    Args:
        name: Registry key and adapter name; an existing entry under the same
            key is replaced.
        module_name: Dotted module path handed to ``importlib.import_module``.

    Returns:
        The newly created adapter.

    Raises:
        Whatever ``importlib.import_module`` or the ``Adapter`` constructor
        raises; in that case the registry is left untouched.
    """
    wrapped = Adapter(name, importlib.import_module(module_name))
    _ADAPTER_REGISTRY.update({name: wrapped})
    return wrapped
def load_adapters() -> dict[str, Adapter]:  # pragma: no cover
    """Rebuild the adapter registry from built-ins, entry points and env config.

    The registry is cleared first, then filled from three sources in order;
    a later source replaces an earlier adapter registered under the same name:

    1. the built-in SDK table below (modules that are not installed are
       skipped);
    2. entry points in the ``generalized_mcp_adapter`` group, registered as
       ``ep.name`` -> ``ep.value``;
    3. the comma-separated ``EXTRA_SDKS`` environment variable, each module
       registered under the first component of its dotted path.

    Loading is best-effort: a source that cannot be loaded is logged and
    skipped rather than aborting the whole load.

    Returns:
        The module-level registry dict itself (not a copy).
    """
    import logging  # local import keeps this function self-contained

    log = logging.getLogger(__name__)

    _ADAPTER_REGISTRY.clear()
    built_ins = {
        "kubernetes": "kubernetes.client",
        "github": "github",
        "azure_identity": "azure.identity",
    }
    for name, mod in built_ins.items():
        try:
            register_adapter(name, mod)
        except ModuleNotFoundError:
            # Built-in SDKs are optional, so a missing one is expected.
            log.debug("built-in adapter %r skipped: module %r not installed", name, mod)

    # Entry points allow external packages to add adapters without code changes.
    for ep in entry_points().select(group="generalized_mcp_adapter"):
        try:
            register_adapter(ep.name, ep.value)
        except Exception:  # pragma: no cover
            # A broken third-party plugin must not take the loader down, but
            # the failure should be visible instead of silently dropped.
            log.warning(
                "failed to load adapter entry point %r (%r)",
                ep.name,
                ep.value,
                exc_info=True,
            )

    # Optional comma-separated list of extra top-level modules
    extra_sdks = os.getenv("EXTRA_SDKS", "")
    for mod in (item.strip() for item in extra_sdks.split(",")):
        if not mod:
            continue
        # NOTE(review): two dotted entries that share a top-level package
        # (e.g. "a.b,a.c") map to the same name, so the later one wins.
        name = mod.split(".")[0]
        try:
            register_adapter(name, mod)
        except ModuleNotFoundError:
            # The module was explicitly requested, so report that it is missing.
            log.warning("EXTRA_SDKS module %r not found; adapter %r skipped", mod, name)
    return _ADAPTER_REGISTRY