backend_api.py•8.52 kB
# 내용은 filesystem_backend_api.py와 동일하게 복사
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
import os
import json
from datetime import datetime
from pathlib import Path
app = FastAPI(title="File System Backend API", version="1.0.0")
# ---- Data Models ---------------------------------------------------------
class FileInfo(BaseModel):
name: str
type: str # "file" or "directory"
size: Optional[int] = None
modified: str
created: str
permissions: str
class DirectoryListing(BaseModel):
path: str
items: List[FileInfo]
total_count: int
class FileContent(BaseModel):
file_path: str
content: str
size: int
encoding: str
class FileWriteRequest(BaseModel):
content: str
encoding: str = "utf-8"
class DirectoryCreateRequest(BaseModel):
dir_path: str
class FileCopyRequest(BaseModel):
source_path: str
destination_path: str
overwrite: bool = False
# ---- API Endpoints -------------------------------------------------------
@app.get("/health")
async def health():
"""Health check endpoint"""
return {"status": "ok", "service": "File System Backend API"}
@app.get("/files/list")
async def list_directory_default() -> DirectoryListing:
"""List files and directories in the current directory (default path)"""
return await list_directory_with_path(".")
@app.get("/files/list/{path:path}")
async def list_directory(path: str) -> DirectoryListing:
"""List files and directories in the specified path"""
return await list_directory_with_path(path)
async def list_directory_with_path(path: str) -> DirectoryListing:
"""Internal function to list directory contents"""
try:
# Handle empty path or trailing slash
if not path or path == "" or path == ".":
path = "."
path_obj = Path(path)
if not path_obj.exists():
raise HTTPException(status_code=404, detail=f"Directory not found: {path}")
if not path_obj.is_dir():
raise HTTPException(
status_code=400, detail=f"Path is not a directory: {path}"
)
items = []
for item in path_obj.iterdir():
stat_info = item.stat()
item_info = FileInfo(
name=item.name,
type="directory" if item.is_dir() else "file",
size=stat_info.st_size if item.is_file() else None,
modified=datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
created=datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
permissions=oct(stat_info.st_mode)[-3:],
)
items.append(item_info)
return DirectoryListing(
path=str(path_obj.absolute()), items=items, total_count=len(items)
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/read/{file_path:path}")
async def read_file(file_path: str, encoding: str = "utf-8") -> FileContent:
"""Read the contents of a file"""
try:
path_obj = Path(file_path)
if not path_obj.exists():
raise HTTPException(status_code=404, detail=f"File not found: {file_path}")
if not path_obj.is_file():
raise HTTPException(
status_code=400, detail=f"Path is a directory, not a file: {file_path}"
)
with open(path_obj, "r", encoding=encoding) as f:
content = f.read()
return FileContent(
file_path=str(path_obj.absolute()),
content=content,
size=len(content),
encoding=encoding,
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/files/write/{file_path:path}")
async def write_file(file_path: str, request: FileWriteRequest) -> Dict[str, Any]:
"""Write content to a file"""
try:
path_obj = Path(file_path)
# Create parent directories if they don't exist
path_obj.parent.mkdir(parents=True, exist_ok=True)
with open(path_obj, "w", encoding=request.encoding) as f:
f.write(request.content)
return {
"file_path": str(path_obj.absolute()),
"bytes_written": len(request.content.encode(request.encoding)),
"encoding": request.encoding,
"message": "File written successfully",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/files/info/{path:path}")
async def get_file_info(path: str) -> Dict[str, Any]:
"""Get detailed information about a file or directory"""
try:
path_obj = Path(path)
if not path_obj.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {path}")
stat_info = path_obj.stat()
info = {
"path": str(path_obj.absolute()),
"name": path_obj.name,
"type": "directory" if path_obj.is_dir() else "file",
"size": stat_info.st_size,
"created": datetime.fromtimestamp(stat_info.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
"accessed": datetime.fromtimestamp(stat_info.st_atime).isoformat(),
"permissions": oct(stat_info.st_mode)[-3:],
"owner": stat_info.st_uid,
"group": stat_info.st_gid,
}
if path_obj.is_file():
info["extension"] = path_obj.suffix
info["parent_directory"] = str(path_obj.parent)
return info
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/directories/create")
async def create_directory(request: DirectoryCreateRequest) -> Dict[str, Any]:
"""Create a new directory"""
try:
path_obj = Path(request.dir_path)
if path_obj.exists():
raise HTTPException(
status_code=409, detail=f"Directory already exists: {request.dir_path}"
)
path_obj.mkdir(parents=True, exist_ok=False)
return {
"dir_path": str(path_obj.absolute()),
"message": "Directory created successfully",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.delete("/files/{path:path}")
async def delete_file(path: str) -> Dict[str, Any]:
"""Delete a file or directory"""
try:
path_obj = Path(path)
if not path_obj.exists():
raise HTTPException(status_code=404, detail=f"Path not found: {path}")
if path_obj.is_dir():
path_obj.rmdir() # Only removes empty directories
message = "Directory deleted successfully"
else:
path_obj.unlink()
message = "File deleted successfully"
return {"path": str(path_obj.absolute()), "message": message}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/files/copy")
async def copy_file(request: FileCopyRequest) -> Dict[str, Any]:
"""Copy a file from source to destination"""
try:
source_path = Path(request.source_path)
destination_path = Path(request.destination_path)
# Check if source file exists
if not source_path.exists():
raise HTTPException(
status_code=404, detail=f"Source file not found: {request.source_path}"
)
if not source_path.is_file():
raise HTTPException(
status_code=400,
detail=f"Source path is not a file: {request.source_path}",
)
# Check if destination already exists
if destination_path.exists() and not request.overwrite:
raise HTTPException(
status_code=409,
detail=f"Destination file already exists: {request.destination_path}",
)
# Copy file
with open(source_path, "rb") as src, open(destination_path, "wb") as dst:
data = src.read()
dst.write(data)
return {
"source_path": str(source_path.absolute()),
"destination_path": str(destination_path.absolute()),
"bytes_copied": len(data),
"message": "File copied successfully",
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))