We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
import asyncio
import io
import os
from typing import Optional, BinaryIO
from minio import Minio
from minio.error import S3Error
from src.server.utils.logger import logger
class MinioClient:
"""Async wrapper for MinIO client."""
def __init__(self):
self.endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9300")
self.access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
self.secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
self.bucket_name = os.getenv("MINIO_BUCKET", "knowledge-base")
self.secure = os.getenv("MINIO_SECURE", "False").lower() == "true"
self.client = Minio(
self.endpoint,
access_key=self.access_key,
secret_key=self.secret_key,
secure=self.secure
)
# Ensure bucket exists
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Ensure the target bucket exists."""
try:
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
logger.info(f"Created MinIO bucket: {self.bucket_name}")
except Exception as e:
logger.error(f"Failed to check/create MinIO bucket: {e}")
async def upload_bytes(self, object_name: str, data: bytes, content_type: str = "application/octet-stream") -> str:
"""Upload bytes to MinIO asynchronously."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
self._upload_sync,
object_name,
data,
content_type
)
def _upload_sync(self, object_name: str, data: bytes, content_type: str) -> str:
"""Synchronous upload logic."""
try:
stream = io.BytesIO(data)
self.client.put_object(
self.bucket_name,
object_name,
stream,
length=len(data),
content_type=content_type
)
# Return the s3 path or http url?
# Returning s3-like path is better for internal usage: s3://bucket/key
return f"s3://{self.bucket_name}/{object_name}"
except S3Error as e:
logger.error(f"MinIO upload failed: {e}")
raise e
def get_presigned_url(self, object_name: str) -> str:
"""Get a presigned URL for downloading (optional usage)."""
return self.client.get_presigned_url("GET", self.bucket_name, object_name)
async def object_exists(self, object_name: str) -> bool:
"""Check if an object exists in MinIO."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._object_exists_sync, object_name)
def _object_exists_sync(self, object_name: str) -> bool:
"""Synchronous object existence check."""
try:
self.client.stat_object(self.bucket_name, object_name)
return True
except S3Error as e:
if e.code == "NoSuchKey":
return False
logger.error(f"MinIO stat_object failed: {e}")
return False
async def download_bytes(self, object_name: str) -> Optional[bytes]:
"""Download object content as bytes."""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, self._download_sync, object_name)
def _download_sync(self, object_name: str) -> Optional[bytes]:
"""Synchronous download logic."""
try:
response = self.client.get_object(self.bucket_name, object_name)
data = response.read()
response.close()
response.release_conn()
return data
except S3Error as e:
logger.error(f"MinIO download failed: {e}")
return None