minio.py•6.24 kB
"""MinIO storage implementation for YaraFlux MCP Server.
This module provides a storage client that uses MinIO (S3-compatible storage) for storing
YARA rules, samples, scan results, and other files.
"""
import logging
from typing import TYPE_CHECKING, Any, BinaryIO, Dict, List, Optional, Tuple, Union
try:
from minio import Minio
from minio.error import S3Error
except ImportError as e:
raise ImportError("MinIO support requires the MinIO client library. Install it with: pip install minio") from e
from yaraflux_mcp_server.storage.base import StorageClient, StorageError
# Handle conditional imports to avoid circular references
if TYPE_CHECKING:
from yaraflux_mcp_server.config import settings
else:
from yaraflux_mcp_server.config import settings
# Configure logging
logger = logging.getLogger(__name__)
class MinioStorageClient(StorageClient):
"""Storage client that uses MinIO (S3-compatible storage)."""
def __init__(self):
"""Initialize MinIO storage client."""
# Validate MinIO settings
if not all([settings.MINIO_ENDPOINT, settings.MINIO_ACCESS_KEY, settings.MINIO_SECRET_KEY]):
raise ValueError("MinIO storage requires MINIO_ENDPOINT, MINIO_ACCESS_KEY, and MINIO_SECRET_KEY settings")
# Initialize MinIO client
self.client = Minio(
endpoint=settings.MINIO_ENDPOINT,
access_key=settings.MINIO_ACCESS_KEY,
secret_key=settings.MINIO_SECRET_KEY,
secure=settings.MINIO_SECURE,
)
# Define bucket names
self.rules_bucket = settings.MINIO_BUCKET_RULES
self.samples_bucket = settings.MINIO_BUCKET_SAMPLES
self.results_bucket = settings.MINIO_BUCKET_RESULTS
self.files_bucket = "yaraflux-files"
self.files_meta_bucket = "yaraflux-files-meta"
# Ensure buckets exist
self._ensure_bucket_exists(self.rules_bucket)
self._ensure_bucket_exists(self.samples_bucket)
self._ensure_bucket_exists(self.results_bucket)
self._ensure_bucket_exists(self.files_bucket)
self._ensure_bucket_exists(self.files_meta_bucket)
logger.info(
f"Initialized MinIO storage: endpoint={settings.MINIO_ENDPOINT}, "
f"rules={self.rules_bucket}, samples={self.samples_bucket}, "
f"results={self.results_bucket}, files={self.files_bucket}"
)
def _ensure_bucket_exists(self, bucket_name: str) -> None:
"""Ensure a bucket exists, creating it if necessary.
Args:
bucket_name: Name of the bucket to check/create
Raises:
StorageError: If the bucket cannot be created
"""
try:
if not self.client.bucket_exists(bucket_name):
self.client.make_bucket(bucket_name)
logger.info(f"Created MinIO bucket: {bucket_name}")
except S3Error as e:
logger.error(f"Failed to create MinIO bucket {bucket_name}: {str(e)}")
raise StorageError(f"Failed to create MinIO bucket: {str(e)}") from e
# TODO: Implement the rest of the StorageClient interface for MinIO
# This would include implementations for all methods from the StorageClient abstract base class.
# For now, we're just providing a stub since the module is not critical for the current implementation.
# Rule management methods
def save_rule(self, rule_name: str, content: str, source: str = "custom") -> str:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_rule(self, rule_name: str, source: str = "custom") -> str:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def delete_rule(self, rule_name: str, source: str = "custom") -> bool:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def list_rules(self, source: Optional[str] = None) -> List[Dict[str, Any]]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
# Sample management methods
def save_sample(self, filename: str, content: Union[bytes, BinaryIO]) -> Tuple[str, str]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_sample(self, sample_id: str) -> bytes:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
# Result management methods
def save_result(self, result_id: str, content: Dict[str, Any]) -> str:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_result(self, result_id: str) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
# File management methods
def save_file(
self, filename: str, content: Union[bytes, BinaryIO], metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_file(self, file_id: str) -> bytes:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def list_files(
self, page: int = 1, page_size: int = 100, sort_by: str = "uploaded_at", sort_desc: bool = True
) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_file_info(self, file_id: str) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def delete_file(self, file_id: str) -> bool:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def extract_strings(
self,
file_id: str,
*,
min_length: int = 4,
include_unicode: bool = True,
include_ascii: bool = True,
limit: Optional[int] = None,
) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")
def get_hex_view(
self, file_id: str, *, offset: int = 0, length: Optional[int] = None, bytes_per_line: int = 16
) -> Dict[str, Any]:
raise NotImplementedError("MinIO storage client is not fully implemented yet")