"""Plugin loader for dynamic plugin loading."""
import importlib.util
import inspect
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Type
from .base import BasePlugin, PluginManager
from ..utils.logging import get_logger
logger = get_logger(__name__)
class PluginLoader:
"""Dynamic plugin loader."""
def __init__(self, plugin_manager: PluginManager) -> None:
"""Initialize plugin loader.
Args:
plugin_manager: Plugin manager instance
"""
self.plugin_manager = plugin_manager
self.plugin_paths: List[Path] = []
self._logger = get_logger(__name__)
def add_plugin_path(self, path: str | Path) -> None:
"""Add a path to search for plugins.
Args:
path: Path to add
"""
plugin_path = Path(path)
if plugin_path.exists() and plugin_path.is_dir():
self.plugin_paths.append(plugin_path)
self._logger.info(f"Added plugin path: {plugin_path}")
else:
self._logger.warning(f"Plugin path does not exist: {plugin_path}")
def discover_plugins(self) -> List[str]:
"""Discover available plugins in registered paths.
Returns:
List of discovered plugin file paths
"""
discovered = []
for plugin_path in self.plugin_paths:
try:
for file_path in plugin_path.glob("**/*.py"):
if file_path.stem.startswith("plugin_") or file_path.parent.name == "plugins":
discovered.append(str(file_path))
self._logger.debug(f"Discovered plugin file: {file_path}")
except Exception as e:
self._logger.error(f"Error discovering plugins in {plugin_path}: {e}")
return discovered
async def load_plugin_file(self, file_path: str | Path) -> List[BasePlugin]:
"""Load plugins from a Python file.
Args:
file_path: Path to plugin file
Returns:
List of loaded plugin instances
"""
file_path = Path(file_path)
if not file_path.exists():
raise FileNotFoundError(f"Plugin file not found: {file_path}")
# Create module name from file path
module_name = f"mcp_plugin_{file_path.stem}_{id(file_path)}"
try:
# Load module from file
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec is None or spec.loader is None:
raise ImportError(f"Could not load spec for {file_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
# Find plugin classes in module
plugin_instances = []
for name, obj in inspect.getmembers(module):
if inspect.isclass(obj) and issubclass(obj, BasePlugin) and obj is not BasePlugin:
try:
# Instantiate plugin
plugin = obj()
plugin_instances.append(plugin)
self._logger.info(f"Loaded plugin class: {name} from {file_path}")
except Exception as e:
self._logger.error(f"Failed to instantiate plugin {name}: {e}")
# Look for plugin factory functions
for name, obj in inspect.getmembers(module):
if callable(obj) and name.startswith("create_plugin") and not inspect.isclass(obj):
try:
plugin = obj()
if isinstance(plugin, BasePlugin):
plugin_instances.append(plugin)
self._logger.info(f"Created plugin from factory: {name}")
except Exception as e:
self._logger.error(f"Failed to create plugin from {name}: {e}")
return plugin_instances
except Exception as e:
self._logger.error(f"Failed to load plugin file {file_path}: {e}")
# Clean up module if it was added
if module_name in sys.modules:
del sys.modules[module_name]
raise
async def load_plugin_directory(self, directory_path: str | Path) -> List[BasePlugin]:
"""Load all plugins from a directory.
Args:
directory_path: Path to plugin directory
Returns:
List of loaded plugin instances
"""
directory_path = Path(directory_path)
if not directory_path.exists() or not directory_path.is_dir():
raise NotADirectoryError(f"Plugin directory not found: {directory_path}")
all_plugins = []
# Look for Python files
for file_path in directory_path.glob("*.py"):
if not file_path.name.startswith("__"):
try:
plugins = await self.load_plugin_file(file_path)
all_plugins.extend(plugins)
except Exception as e:
self._logger.error(f"Failed to load plugins from {file_path}: {e}")
# Look for plugin packages (directories with __init__.py)
for subdir in directory_path.iterdir():
if subdir.is_dir() and (subdir / "__init__.py").exists():
try:
plugins = await self.load_plugin_file(subdir / "__init__.py")
all_plugins.extend(plugins)
except Exception as e:
self._logger.error(f"Failed to load plugin package {subdir}: {e}")
return all_plugins
async def load_and_register_plugin(self, file_path: str | Path) -> int:
"""Load and register plugins from a file.
Args:
file_path: Path to plugin file
Returns:
Number of plugins successfully loaded and registered
"""
try:
plugins = await self.load_plugin_file(file_path)
registered_count = 0
for plugin in plugins:
try:
await self.plugin_manager.register_plugin(plugin)
registered_count += 1
except Exception as e:
self._logger.error(f"Failed to register plugin {plugin.name}: {e}")
self._logger.info(f"Loaded and registered {registered_count} plugins from {file_path}")
return registered_count
except Exception as e:
self._logger.error(f"Failed to load plugins from {file_path}: {e}")
return 0
async def auto_discover_and_load(self) -> int:
"""Automatically discover and load all plugins.
Returns:
Total number of plugins loaded
"""
discovered_files = self.discover_plugins()
total_loaded = 0
for file_path in discovered_files:
try:
loaded = await self.load_and_register_plugin(file_path)
total_loaded += loaded
except Exception as e:
self._logger.error(f"Error loading plugin file {file_path}: {e}")
self._logger.info(f"Auto-discovery complete: {total_loaded} plugins loaded")
return total_loaded
def validate_plugin(self, plugin: BasePlugin) -> List[str]:
"""Validate plugin implementation.
Args:
plugin: Plugin to validate
Returns:
List of validation issues (empty if valid)
"""
issues = []
# Check required attributes
if not hasattr(plugin, "name") or not plugin.name:
issues.append("Plugin must have a non-empty name")
if not hasattr(plugin, "version") or not plugin.version:
issues.append("Plugin must have a version")
# Check required methods are implemented
required_methods = ["initialize", "cleanup", "get_info"]
for method_name in required_methods:
if not hasattr(plugin, method_name):
issues.append(f"Plugin must implement {method_name} method")
elif not callable(getattr(plugin, method_name)):
issues.append(f"Plugin {method_name} must be callable")
# Validate plugin info
try:
info = plugin.get_info()
if not info.name:
issues.append("Plugin info must have a name")
if not info.version:
issues.append("Plugin info must have a version")
except Exception as e:
issues.append(f"Plugin get_info() failed: {e}")
return issues