"""
RustFS客户端模块
使用boto3 SDK与RustFS S3兼容服务进行交互。
"""
import os
import time
import mimetypes
import uuid
from typing import Dict, Any, Optional
import boto3
from botocore.client import Config
from botocore.exceptions import ClientError, NoCredentialsError
from .config import config
class RustFSClient:
"""RustFS客户端 - 使用boto3 SDK"""
def __init__(self):
self.endpoint_url = config.fs_url.rstrip('/')
self.access_key = config.fs_ak
self.secret_key = config.fs_sk
self.timeout = config.timeout
self.region_name = 'us-east-1' # S3兼容服务的默认区域
# 初始化boto3 S3客户端
self.s3_client = boto3.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=Config(signature_version='s3v4'),
region_name=self.region_name
)
def upload_file(self, file_path: str, bucket: str = None) -> Dict[str, Any]:
"""
使用boto3 SDK上传文件到RustFS
Args:
file_path: 本地文件绝对路径
bucket: 存储桶名称,如果为None则使用配置的默认bucket
Returns:
包含上传结果的字典,包括访问路径等信息
Raises:
FileNotFoundError: 文件不存在
ValueError: 参数无效
Exception: 上传失败
"""
# 验证文件存在性
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件不存在: {file_path}")
if not os.path.isfile(file_path):
raise ValueError(f"路径不是文件: {file_path}")
# 获取文件信息
filename = os.path.basename(file_path)
file_size = os.path.getsize(file_path)
content_type = mimetypes.guess_type(filename)[0] or 'application/octet-stream'
# 使用配置的默认bucket
if bucket is None:
bucket = config.default_bucket
# 生成唯一文件名(避免冲突)
file_extension = os.path.splitext(filename)[1]
unique_filename = f"{int(time.time())}_{uuid.uuid4().hex[:8]}{file_extension}"
try:
# 使用boto3上传文件
extra_args = {
'ContentType': content_type,
'Metadata': {
'original_filename': filename,
'upload_time': str(int(time.time()))
}
}
# 执行上传
self.s3_client.upload_file(
file_path,
bucket,
unique_filename,
ExtraArgs=extra_args
)
# 构造访问URL
access_url = f"{self.endpoint_url}/{bucket}/{unique_filename}"
# 构造返回结果
return {
"success": True,
"filename": unique_filename,
"original_filename": filename,
"size": file_size,
"content_type": content_type,
"access_url": access_url,
"file_id": unique_filename,
"bucket": bucket,
"upload_method": "boto3_s3"
}
except ClientError as e:
error_code = e.response['Error']['Code']
error_msg = e.response['Error']['Message']
raise Exception(f"上传失败: {error_code} - {error_msg}") from e
except NoCredentialsError as e:
raise Exception("认证失败:请检查Access Key和Secret Key") from e
except Exception as e:
raise Exception(f"上传过程中发生错误: {str(e)}") from e
def get_file_info(self, file_id: str, bucket: str = None) -> Optional[Dict[str, Any]]:
"""
获取文件信息(使用boto3 SDK)
Args:
file_id: 文件ID(文件名)
bucket: 存储桶名称,如果为None则使用配置的默认bucket
Returns:
文件信息字典,如果文件不存在返回None
"""
try:
# 使用配置的默认bucket
if bucket is None:
bucket = config.default_bucket
# 使用boto3获取对象元数据
response = self.s3_client.head_object(Bucket=bucket, Key=file_id)
# 构造访问URL
access_url = f"{self.endpoint_url}/{bucket}/{file_id}"
return {
"file_id": file_id,
"content_type": response.get("ContentType", ""),
"content_length": response.get("ContentLength", 0),
"last_modified": response.get("LastModified", "").isoformat() if response.get("LastModified") else "",
"etag": response.get("ETag", "").strip('"') if response.get("ETag") else "",
"access_url": access_url,
"metadata": response.get("Metadata", {}),
"bucket": bucket
}
except ClientError as e:
if e.response['Error']['Code'] == '404' or 'NoSuchKey' in str(e):
return None
else:
# 其他错误,可以选择记录日志或返回None
return None
except Exception:
return None
def download_file(self, bucket: str, file_key: str, local_path: str) -> Dict[str, Any]:
"""
使用boto3 SDK从RustFS下载文件
Args:
bucket: 存储桶名称
file_key: 文件在S3中的键名
local_path: 本地保存路径
Returns:
包含下载结果的字典,包括:
- success: 下载是否成功
- bucket: 存储桶名称
- file_key: 原始文件键名
- local_path: 本地文件路径
- filename: 本地文件名
- size: 文件大小(字节)
- content_type: 文件MIME类型
- download_method: 下载方法标识
Raises:
FileNotFoundError: 文件不存在
Exception: 下载失败
"""
try:
# 确保本地目录存在
import os
local_dir = os.path.dirname(local_path)
if local_dir and not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
# 使用boto3下载文件
self.s3_client.download_file(bucket, file_key, local_path)
# 获取文件信息
file_info = self.get_file_info(file_key, bucket)
local_file_size = os.path.getsize(local_path)
return {
"success": True,
"bucket": bucket,
"file_key": file_key,
"local_path": local_path,
"filename": os.path.basename(local_path),
"size": local_file_size,
"content_type": file_info.get("content_type", "application/octet-stream") if file_info else "application/octet-stream",
"download_method": "boto3_s3"
}
except ClientError as e:
error_code = e.response['Error']['Code']
error_msg = e.response['Error']['Message']
if error_code in ['404', 'NoSuchKey']:
raise FileNotFoundError(f"文件不存在: {bucket}/{file_key}")
raise Exception(f"下载失败: {error_code} - {error_msg}") from e
except NoCredentialsError as e:
raise Exception("认证失败:请检查Access Key和Secret Key") from e
except Exception as e:
raise Exception(f"下载过程中发生错误: {str(e)}") from e
def parse_rustfs_url(self, url: str) -> tuple[str, str] | None:
"""
解析RustFS URL,提取bucket和文件key
Args:
url: RustFS文件URL
Returns:
(bucket, file_key) 元组,如果不是RustFS URL则返回None
"""
try:
from urllib.parse import urlparse
parsed_url = urlparse(url)
# 检查是否是RustFS URL
if not parsed_url.netloc or self.endpoint_url not in f"{parsed_url.scheme}://{parsed_url.netloc}":
return None
# 提取路径并去除开头的斜杠
path = parsed_url.path.lstrip('/')
# 分割bucket和文件key
if '/' not in path:
return None
bucket, file_key = path.split('/', 1)
if not bucket or not file_key:
return None
return (bucket, file_key)
except Exception:
return None