minio_client.py (3.87 kB)
import asyncio
import io
import os
from typing import Optional, BinaryIO

from minio import Minio
from minio.error import S3Error

from src.server.utils.logger import logger


class MinioClient:
    """Async wrapper for MinIO client."""

    def __init__(self):
        self.endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9300")
        self.access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
        self.secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
        self.bucket_name = os.getenv("MINIO_BUCKET", "knowledge-base")
        self.secure = os.getenv("MINIO_SECURE", "False").lower() == "true"

        self.client = Minio(
            self.endpoint,
            access_key=self.access_key,
            secret_key=self.secret_key,
            secure=self.secure
        )

        # Ensure bucket exists
        self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Ensure the target bucket exists."""
        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
                logger.info(f"Created MinIO bucket: {self.bucket_name}")
        except Exception as e:
            logger.error(f"Failed to check/create MinIO bucket: {e}")

    async def upload_bytes(self, object_name: str, data: bytes, content_type: str = "application/octet-stream") -> str:
        """Upload bytes to MinIO asynchronously."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None, self._upload_sync, object_name, data, content_type
        )

    def _upload_sync(self, object_name: str, data: bytes, content_type: str) -> str:
        """Synchronous upload logic."""
        try:
            stream = io.BytesIO(data)
            self.client.put_object(
                self.bucket_name,
                object_name,
                stream,
                length=len(data),
                content_type=content_type
            )
            # Return the s3 path or http url?
            # Returning s3-like path is better for internal usage: s3://bucket/key
            return f"s3://{self.bucket_name}/{object_name}"
        except S3Error as e:
            logger.error(f"MinIO upload failed: {e}")
            raise e

    def get_presigned_url(self, object_name: str) -> str:
        """Get a presigned URL for downloading (optional usage)."""
        return self.client.get_presigned_url("GET", self.bucket_name, object_name)

    async def object_exists(self, object_name: str) -> bool:
        """Check if an object exists in MinIO."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._object_exists_sync, object_name)

    def _object_exists_sync(self, object_name: str) -> bool:
        """Synchronous object existence check."""
        try:
            self.client.stat_object(self.bucket_name, object_name)
            return True
        except S3Error as e:
            if e.code == "NoSuchKey":
                return False
            logger.error(f"MinIO stat_object failed: {e}")
            return False

    async def download_bytes(self, object_name: str) -> Optional[bytes]:
        """Download object content as bytes."""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self._download_sync, object_name)

    def _download_sync(self, object_name: str) -> Optional[bytes]:
        """Synchronous download logic."""
        try:
            response = self.client.get_object(self.bucket_name, object_name)
            data = response.read()
            response.close()
            response.release_conn()
            return data
        except S3Error as e:
            logger.error(f"MinIO download failed: {e}")
            return None
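
A minimal usage sketch, not part of the original file: it assumes the minio package is installed, a MinIO server is reachable at the endpoint configured via MINIO_ENDPOINT / MINIO_ACCESS_KEY / MINIO_SECRET_KEY, and that MinioClient can be imported from this module (the import path below is a placeholder). The demo() coroutine and object names are illustrative only.

import asyncio

from minio_client import MinioClient  # placeholder: adjust to the real module path


async def demo() -> None:
    client = MinioClient()

    # Upload raw bytes; upload_bytes returns an internal s3://bucket/key reference.
    path = await client.upload_bytes(
        "examples/hello.txt",
        b"hello minio",
        content_type="text/plain",
    )
    print("stored at:", path)

    # Existence check followed by a round-trip download.
    if await client.object_exists("examples/hello.txt"):
        data = await client.download_bytes("examples/hello.txt")
        print("downloaded:", data)

    # Presigned download URL (synchronous helper on the wrapper).
    print("share link:", client.get_presigned_url("examples/hello.txt"))


if __name__ == "__main__":
    asyncio.run(demo())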
