import base64
import logging
import os
from typing import List, Optional
from urllib.parse import quote, unquote_to_bytes, urlparse

import aiofiles

from app.core.config import config
from app.core.errors import ResourceNotFoundError, InvalidParamsError
from app.protocol.models import Resource, ResourceContent
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ResourceService:
    """Service for handling resource operations.

    Exposes local directories and files as ``file://`` resources: a directory
    can be listed and a single file can be read.
    """

    # Extension -> MIME type table consulted by ``_get_mime_type``.
    _MIME_TYPES = {
        '.txt': 'text/plain',
        '.md': 'text/markdown',
        '.py': 'text/x-python',
        '.js': 'text/javascript',
        '.html': 'text/html',
        '.css': 'text/css',
        '.json': 'application/json',
        '.xml': 'application/xml',
        '.yaml': 'application/x-yaml',
        '.yml': 'application/x-yaml',
        '.csv': 'text/csv',
        '.sql': 'application/sql',
        '.log': 'text/plain',
    }

    def __init__(self):
        """Read the allowed URI schemes and the file size limit from config."""
        self.allowed_schemes = config.ALLOWED_RESOURCE_SCHEMES
        self.max_file_size = config.MAX_FILE_SIZE

    @staticmethod
    def _path_to_uri(path: str) -> str:
        """Return a percent-encoded ``file://`` URI for an absolute path.

        Encoding keeps characters such as ``#``, ``?``, ``%`` and spaces from
        being misread as URI syntax when the URI is parsed again.
        """
        return f"file://{quote(os.fsencode(path))}"

    @staticmethod
    def _uri_path_to_fs_path(uri_path: str) -> str:
        """Decode the percent-encoded path component of a ``file://`` URI."""
        return os.fsdecode(unquote_to_bytes(uri_path))

    def _build_resource(self, base_path: str, entry: str) -> "Resource":
        """Build the Resource describing one directory entry.

        Raises:
            OSError: If the entry's size or contents cannot be read
                (for example a dangling symlink or an unreadable directory).
        """
        entry_path = os.path.join(base_path, entry)
        if os.path.isdir(entry_path):
            description = f"Directory containing {len(os.listdir(entry_path))} items"
            mime_type = "inode/directory"
        else:
            description = f"File ({os.path.getsize(entry_path)} bytes)"
            mime_type = self._get_mime_type(entry)
        return Resource(
            uri=self._path_to_uri(os.path.abspath(entry_path)),
            name=entry,
            description=description,
            mimeType=mime_type,
        )

    async def list_resources(self, base_path: str = ".", cursor: Optional[str] = None) -> "List[Resource]":
        """List available resources in a directory.

        Args:
            base_path: Directory to list. An empty value or ``"."`` means the
                current working directory.
            cursor: Accepted for interface compatibility; currently ignored
                (the full listing is always returned).

        Returns:
            One Resource per non-hidden entry, sorted by name. Entries whose
            metadata cannot be read are skipped with a warning.

        Raises:
            ResourceNotFoundError: If ``base_path`` does not exist.
            InvalidParamsError: If ``base_path`` is not a directory or cannot
                be listed due to permissions.
        """
        try:
            if not base_path or base_path == ".":
                base_path = os.getcwd()
            if not os.path.exists(base_path):
                raise ResourceNotFoundError(f"Base path does not exist: {base_path}")
            if not os.path.isdir(base_path):
                raise InvalidParamsError(f"Base path is not a directory: {base_path}")

            try:
                # Sorted for consistent ordering across calls.
                entries = sorted(os.listdir(base_path))
            except PermissionError as e:
                raise InvalidParamsError(f"Permission denied accessing: {base_path}") from e

            resources = []
            for entry in entries:
                # Skip hidden files and directories.
                if entry.startswith('.'):
                    continue
                try:
                    resources.append(self._build_resource(base_path, entry))
                except OSError as e:
                    # One unreadable entry must not fail the whole listing.
                    logger.warning(
                        "Skipping unreadable entry %s: %s",
                        os.path.join(base_path, entry),
                        e,
                    )

            logger.info("Listed %d resources from %s", len(resources), base_path)
            return resources
        except Exception as e:
            logger.error("Error listing resources: %s", e)
            raise

    async def read_resource(self, uri: str) -> "ResourceContent":
        """Read content of a resource identified by URI.

        Args:
            uri: Resource URI. Only the ``file`` scheme has a handler here.

        Raises:
            InvalidParamsError: If the scheme is not allowed or has no handler.
            ResourceNotFoundError: If the file is missing or cannot be read.
        """
        try:
            parsed = urlparse(uri)
            if parsed.scheme not in self.allowed_schemes:
                raise InvalidParamsError(f"Unsupported URI scheme: {parsed.scheme}")
            if parsed.scheme == "file":
                # Percent-decode so URIs produced by list_resources round-trip.
                return await self._read_file_resource(
                    self._uri_path_to_fs_path(parsed.path)
                )
            raise InvalidParamsError(f"Handler not implemented for scheme: {parsed.scheme}")
        except Exception as e:
            logger.error("Error reading resource %s: %s", uri, e)
            raise

    async def _read_file_resource(self, file_path: str) -> "ResourceContent":
        """Read a file resource.

        The file is read as UTF-8 text. If it does not decode, it is re-read
        as bytes and returned as a base64 ``data:`` URI in ``text``.

        Raises:
            ResourceNotFoundError: If the file does not exist or reading fails.
            InvalidParamsError: If the path is not a regular file or the file
                exceeds ``max_file_size``.
        """
        # NOTE(review): any path readable by this process can be requested;
        # no root-directory restriction is applied here - confirm that callers
        # constrain the accepted paths.
        file_path = os.path.abspath(file_path)
        if not os.path.exists(file_path):
            raise ResourceNotFoundError(f"File not found: {file_path}")
        if not os.path.isfile(file_path):
            raise InvalidParamsError(f"Path is not a file: {file_path}")

        file_size = os.path.getsize(file_path)
        if file_size > self.max_file_size:
            raise InvalidParamsError(f"File too large: {file_size} bytes (max: {self.max_file_size})")

        filename = os.path.basename(file_path)
        try:
            try:
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    text = await f.read()
                mime_type = self._get_mime_type(filename)
            except UnicodeDecodeError:
                # Not valid UTF-8: fall back to raw bytes, base64-encoded.
                async with aiofiles.open(file_path, 'rb') as f:
                    raw = await f.read()
                # Unknown extensions are reported as generic binary here
                # rather than as text/plain.
                mime_type = self._get_mime_type(filename, default="application/octet-stream")
                encoded_content = base64.b64encode(raw).decode('ascii')
                text = f"data:{mime_type};base64,{encoded_content}"
        except OSError as e:
            # Covers failures of both the text read and the binary re-read.
            raise ResourceNotFoundError(f"Failed to read file {file_path}: {e}") from e

        return ResourceContent(
            uri=self._path_to_uri(file_path),
            mimeType=mime_type,
            text=text,
        )

    def _get_mime_type(self, filename: str, default: str = "text/plain") -> str:
        """Get MIME type based on file extension.

        Args:
            filename: File name; only its extension is used, case-insensitively.
            default: Value returned when the extension is not in the table.
        """
        ext = os.path.splitext(filename)[1].lower()
        return self._MIME_TYPES.get(ext, default)
# Global instance, created when this module is imported; constructing it
# reads ALLOWED_RESOURCE_SCHEMES and MAX_FILE_SIZE from config.
resource_service = ResourceService()