Skip to main content
Glama
minio.py6.24 kB
"""MinIO storage implementation for YaraFlux MCP Server. This module provides a storage client that uses MinIO (S3-compatible storage) for storing YARA rules, samples, scan results, and other files. """ import logging from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Optional, Tuple, Union try: from minio import Minio from minio.error import S3Error except ImportError as e: raise ImportError("MinIO support requires the MinIO client library. Install it with: pip install minio") from e from yaraflux_mcp_server.storage.base import StorageClient, StorageError # Handle conditional imports to avoid circular references if TYPE_CHECKING: from yaraflux_mcp_server.config import settings else: from yaraflux_mcp_server.config import settings # Configure logging logger = logging.getLogger(__name__) class MinioStorageClient(StorageClient): """Storage client that uses MinIO (S3-compatible storage).""" def __init__(self): """Initialize MinIO storage client.""" # Validate MinIO settings if not all([settings.MINIO_ENDPOINT, settings.MINIO_ACCESS_KEY, settings.MINIO_SECRET_KEY]): raise ValueError("MinIO storage requires MINIO_ENDPOINT, MINIO_ACCESS_KEY, and MINIO_SECRET_KEY settings") # Initialize MinIO client self.client = Minio( endpoint=settings.MINIO_ENDPOINT, access_key=settings.MINIO_ACCESS_KEY, secret_key=settings.MINIO_SECRET_KEY, secure=settings.MINIO_SECURE, ) # Define bucket names self.rules_bucket = settings.MINIO_BUCKET_RULES self.samples_bucket = settings.MINIO_BUCKET_SAMPLES self.results_bucket = settings.MINIO_BUCKET_RESULTS self.files_bucket = "yaraflux-files" self.files_meta_bucket = "yaraflux-files-meta" # Ensure buckets exist self._ensure_bucket_exists(self.rules_bucket) self._ensure_bucket_exists(self.samples_bucket) self._ensure_bucket_exists(self.results_bucket) self._ensure_bucket_exists(self.files_bucket) self._ensure_bucket_exists(self.files_meta_bucket) logger.info( f"Initialized MinIO storage: endpoint={settings.MINIO_ENDPOINT}, " f"rules={self.rules_bucket}, samples={self.samples_bucket}, " f"results={self.results_bucket}, files={self.files_bucket}" ) def _ensure_bucket_exists(self, bucket_name: str) -> None: """Ensure a bucket exists, creating it if necessary. Args: bucket_name: Name of the bucket to check/create Raises: StorageError: If the bucket cannot be created """ try: if not self.client.bucket_exists(bucket_name): self.client.make_bucket(bucket_name) logger.info(f"Created MinIO bucket: {bucket_name}") except S3Error as e: logger.error(f"Failed to create MinIO bucket {bucket_name}: {str(e)}") raise StorageError(f"Failed to create MinIO bucket: {str(e)}") from e # TODO: Implement the rest of the StorageClient interface for MinIO # This would include implementations for all methods from the StorageClient abstract base class. # For now, we're just providing a stub since the module is not critical for the current implementation. # Rule management methods def save_rule(self, rule_name: str, content: str, source: str = "custom") -> str: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_rule(self, rule_name: str, source: str = "custom") -> str: raise NotImplementedError("MinIO storage client is not fully implemented yet") def delete_rule(self, rule_name: str, source: str = "custom") -> bool: raise NotImplementedError("MinIO storage client is not fully implemented yet") def list_rules(self, source: Optional[str] = None) -> List[Dict[str, Any]]: raise NotImplementedError("MinIO storage client is not fully implemented yet") # Sample management methods def save_sample(self, filename: str, content: Union[bytes, BinaryIO]) -> Tuple[str, str]: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_sample(self, sample_id: str) -> bytes: raise NotImplementedError("MinIO storage client is not fully implemented yet") # Result management methods def save_result(self, result_id: str, content: Dict[str, Any]) -> str: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_result(self, result_id: str) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet") # File management methods def save_file( self, filename: str, content: Union[bytes, BinaryIO], metadata: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_file(self, file_id: str) -> bytes: raise NotImplementedError("MinIO storage client is not fully implemented yet") def list_files( self, page: int = 1, page_size: int = 100, sort_by: str = "uploaded_at", sort_desc: bool = True ) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_file_info(self, file_id: str) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet") def delete_file(self, file_id: str) -> bool: raise NotImplementedError("MinIO storage client is not fully implemented yet") def extract_strings( self, file_id: str, *, min_length: int = 4, include_unicode: bool = True, include_ascii: bool = True, limit: Optional[int] = None, ) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet") def get_hex_view( self, file_id: str, *, offset: int = 0, length: Optional[int] = None, bytes_per_line: int = 16 ) -> Dict[str, Any]: raise NotImplementedError("MinIO storage client is not fully implemented yet")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ThreatFlux/YaraFlux'

If you have feedback or need assistance with the MCP directory API, please join our Discord server