"""
Taildrop File Sharing Module
Provides comprehensive Taildrop functionality for secure file sharing
across Tailscale networks with advanced features and monitoring.
"""
import asyncio
import hashlib
import time
from pathlib import Path
from typing import Any
import structlog
from pydantic import BaseModel, Field
from .exceptions import TailscaleMCPError
from .utils.tailscale_cli import TailscaleCLI
logger = structlog.get_logger(__name__)
class TaildropFile(BaseModel):
"""Taildrop file metadata model."""
filename: str = Field(..., description="File name")
size: int = Field(..., description="File size in bytes")
checksum: str = Field(..., description="File checksum")
sender: str = Field(..., description="Sender device ID")
recipient: str = Field(..., description="Recipient device ID")
status: str = Field(..., description="Transfer status")
created_at: float = Field(..., description="Creation timestamp")
completed_at: float | None = Field(None, description="Completion timestamp")
expires_at: float | None = Field(None, description="Expiration timestamp")
class TaildropTransfer(BaseModel):
"""Taildrop transfer information model."""
transfer_id: str = Field(..., description="Unique transfer ID")
sender_device: str = Field(..., description="Sender device name")
recipient_device: str = Field(..., description="Recipient device name")
files: list[TaildropFile] = Field(..., description="Files in transfer")
status: str = Field(..., description="Transfer status")
progress: float = Field(..., description="Transfer progress percentage")
created_at: float = Field(..., description="Transfer creation time")
estimated_completion: float | None = Field(
None, description="Estimated completion time"
)
class TaildropManager:
"""Comprehensive Taildrop file sharing manager."""
def __init__(
self,
taildrop_dir: str | None = None,
max_file_size: int = 100 * 1024 * 1024,
use_cli: bool = True,
tailscale_binary: str | None = None,
):
"""Initialize Taildrop manager.
Args:
taildrop_dir: Directory for Taildrop files (default: system temp)
max_file_size: Maximum file size in bytes (default: 100MB)
use_cli: Use real Tailscale CLI for transfers (default: True)
tailscale_binary: Path to tailscale binary (default: auto-detect)
"""
import tempfile
self.taildrop_dir = (
Path(taildrop_dir)
if taildrop_dir
else Path(tempfile.gettempdir()) / "taildrop"
)
self.max_file_size = max_file_size
self.transfers: dict[str, TaildropTransfer] = {}
self.active_transfers: dict[str, asyncio.Task] = {}
self.use_cli = use_cli
# Initialize CLI if enabled
if self.use_cli:
try:
self.cli = TailscaleCLI(tailscale_binary=tailscale_binary)
logger.info("Taildrop manager using Tailscale CLI", cli_available=True)
except Exception as e:
logger.warning(
"Tailscale CLI not available, falling back to simulated transfers",
error=str(e),
)
self.use_cli = False
self.cli = None
else:
self.cli = None
# Create taildrop directory if it doesn't exist
self.taildrop_dir.mkdir(parents=True, exist_ok=True)
logger.info(
"Taildrop manager initialized",
taildrop_dir=str(self.taildrop_dir),
max_file_size=max_file_size,
use_cli=self.use_cli,
)
async def send_file(
self,
file_path: str,
recipient_device: str,
sender_device: str | None = None,
expire_hours: int = 24,
) -> dict[str, Any]:
"""Send a file via Taildrop.
Args:
file_path: Path to the file to send
recipient_device: Target device ID or name
sender_device: Sender device ID or name (optional, auto-detected if using CLI)
expire_hours: File expiration time in hours
Returns:
Transfer information and status
"""
try:
file_path_obj = Path(file_path)
if not file_path_obj.exists():
raise FileNotFoundError(f"File not found: {file_path}")
file_size = file_path_obj.stat().st_size
if file_size > self.max_file_size:
raise ValueError(
f"File too large: {file_size} bytes (max: {self.max_file_size})"
)
# Generate transfer ID
transfer_id = hashlib.sha256(
f"{file_path}_{recipient_device}_{time.time()}".encode()
).hexdigest()
# Calculate file checksum
checksum = await self._calculate_checksum(file_path_obj)
# Use real CLI if available
if self.use_cli and self.cli:
try:
logger.info(
"Sending file via Tailscale CLI",
file_path=str(file_path_obj),
recipient=recipient_device,
)
# Use CLI to send file
cli_result = await self.cli.file_send(
str(file_path_obj.absolute()), recipient_device, wait=True
)
if cli_result.get("success"):
# Create transfer record for successful transfer
transfer = TaildropTransfer(
transfer_id=transfer_id,
sender_device=sender_device or "local",
recipient_device=recipient_device,
files=[
TaildropFile(
filename=file_path_obj.name,
size=file_size,
checksum=checksum,
sender=sender_device or "local",
recipient=recipient_device,
status="completed",
created_at=time.time(),
completed_at=time.time(),
expires_at=time.time() + (expire_hours * 3600),
)
],
status="completed",
progress=100.0,
created_at=time.time(),
estimated_completion=time.time(),
)
self.transfers[transfer_id] = transfer
logger.info(
"Taildrop transfer completed via CLI",
transfer_id=transfer_id,
filename=file_path_obj.name,
recipient=recipient_device,
)
return {
"transfer_id": transfer_id,
"status": "completed",
"filename": file_path_obj.name,
"size": file_size,
"recipient": recipient_device,
"expires_at": transfer.files[0].expires_at,
"message": f"File sent successfully to {recipient_device}",
"cli_output": cli_result.get("output"),
}
else:
# CLI failed, fall through to simulated transfer
logger.warning(
"CLI transfer failed, falling back to simulated",
error=cli_result.get("error"),
)
raise TailscaleMCPError(
f"CLI transfer failed: {cli_result.get('error')}"
)
except Exception as cli_error:
logger.warning(
"CLI transfer error, falling back to simulated",
error=str(cli_error),
)
# Fall through to simulated transfer if CLI fails
if not isinstance(cli_error, TailscaleMCPError):
raise
# Fallback to simulated transfer
# Create transfer record
transfer = TaildropTransfer(
transfer_id=transfer_id,
sender_device=sender_device or "local",
recipient_device=recipient_device,
files=[
TaildropFile(
filename=file_path_obj.name,
size=file_size,
checksum=checksum,
sender=sender_device or "local",
recipient=recipient_device,
status="pending",
created_at=time.time(),
expires_at=time.time() + (expire_hours * 3600),
)
],
status="pending",
progress=0.0,
created_at=time.time(),
)
# Store transfer
self.transfers[transfer_id] = transfer
# Start transfer process
transfer_task = asyncio.create_task(self._process_transfer(transfer_id))
self.active_transfers[transfer_id] = transfer_task
logger.info(
"Taildrop transfer initiated (simulated)",
transfer_id=transfer_id,
filename=file_path_obj.name,
recipient=recipient_device,
size=file_size,
)
return {
"transfer_id": transfer_id,
"status": "initiated",
"filename": file_path_obj.name,
"size": file_size,
"recipient": recipient_device,
"expires_at": transfer.files[0].expires_at,
"message": f"File transfer initiated to {recipient_device}",
"note": "Using simulated transfer (CLI not available)",
}
except Exception as e:
logger.error("Error sending file via Taildrop", error=str(e))
raise TailscaleMCPError(f"Failed to send file: {e}") from e
async def receive_file(
self,
transfer_id: str | None = None,
save_path: str | None = None,
accept_all: bool = False,
) -> dict[str, Any]:
"""Receive files via Taildrop.
Args:
transfer_id: Optional transfer ID to receive (if None, receives all pending)
save_path: Optional custom save path or directory
accept_all: Accept all pending files automatically
Returns:
Reception status and file information
"""
try:
# Use real CLI if available
if self.use_cli and self.cli:
try:
logger.info(
"Receiving files via Tailscale CLI", save_path=save_path
)
# Use CLI to receive files
cli_result = await self.cli.file_receive(
save_path=save_path, accept_all=accept_all
)
if cli_result.get("success"):
logger.info(
"Files received via Tailscale CLI",
output=cli_result.get("output"),
)
return {
"status": "received",
"save_path": save_path or str(self.taildrop_dir),
"output": cli_result.get("output"),
"message": "Files received successfully via Tailscale CLI",
}
else:
raise TailscaleMCPError(
f"CLI receive failed: {cli_result.get('error')}"
)
except Exception as cli_error:
logger.warning(
"CLI receive error, falling back to simulated",
error=str(cli_error),
)
if not isinstance(cli_error, TailscaleMCPError):
raise
# Fallback to simulated receive (requires transfer_id)
if not transfer_id:
raise ValueError(
"transfer_id required when not using CLI. Use CLI mode or provide transfer_id."
)
if transfer_id not in self.transfers:
raise ValueError(f"Transfer not found: {transfer_id}")
transfer = self.transfers[transfer_id]
if transfer.status != "completed":
raise ValueError(f"Transfer not ready for reception: {transfer.status}")
# Determine save path
if not save_path:
save_path = (
self.taildrop_dir
/ f"received_{transfer_id}_{transfer.files[0].filename}"
)
else:
save_path = Path(save_path)
# Create received file record
received_file = TaildropFile(
filename=save_path.name,
size=transfer.files[0].size,
checksum=transfer.files[0].checksum,
sender=transfer.sender_device,
recipient=transfer.recipient_device,
status="received",
created_at=time.time(),
completed_at=time.time(),
)
logger.info(
"File received via Taildrop (simulated)",
transfer_id=transfer_id,
filename=save_path.name,
size=received_file.size,
)
return {
"transfer_id": transfer_id,
"status": "received",
"filename": save_path.name,
"size": received_file.size,
"save_path": str(save_path),
"checksum": received_file.checksum,
"message": "File received successfully",
"note": "Using simulated receive (CLI not available)",
}
except Exception as e:
logger.error("Error receiving file via Taildrop", error=str(e))
raise TailscaleMCPError(f"Failed to receive file: {e}") from e
async def list_transfers(
self, status_filter: str | None = None
) -> list[dict[str, Any]]:
"""List Taildrop transfers.
Args:
status_filter: Optional status filter (pending, completed, failed, expired)
Returns:
List of transfers
"""
try:
transfers = []
current_time = time.time()
for transfer_id, transfer in self.transfers.items():
# Check for expired transfers
if (
transfer.files
and transfer.files[0].expires_at
and current_time > transfer.files[0].expires_at
):
transfer.status = "expired"
if status_filter and transfer.status != status_filter:
continue
transfers.append(
{
"transfer_id": transfer_id,
"sender_device": transfer.sender_device,
"recipient_device": transfer.recipient_device,
"filename": transfer.files[0].filename
if transfer.files
else "unknown",
"size": transfer.files[0].size if transfer.files else 0,
"status": transfer.status,
"progress": transfer.progress,
"created_at": transfer.created_at,
"expires_at": transfer.files[0].expires_at
if transfer.files
else None,
}
)
logger.info(
"Taildrop transfers listed",
total_transfers=len(transfers),
status_filter=status_filter,
)
return transfers
except Exception as e:
logger.error("Error listing Taildrop transfers", error=str(e))
raise TailscaleMCPError(f"Failed to list transfers: {e}") from e
async def cancel_transfer(self, transfer_id: str) -> dict[str, Any]:
"""Cancel a Taildrop transfer.
Args:
transfer_id: Transfer ID to cancel
Returns:
Cancellation status
"""
try:
if transfer_id not in self.transfers:
raise ValueError(f"Transfer not found: {transfer_id}")
transfer = self.transfers[transfer_id]
if transfer.status in ["completed", "cancelled"]:
raise ValueError(
f"Cannot cancel transfer with status: {transfer.status}"
)
# Cancel asyncio task if active
if transfer_id in self.active_transfers:
self.active_transfers[transfer_id].cancel()
del self.active_transfers[transfer_id]
# Update transfer status
transfer.status = "cancelled"
logger.info("Taildrop transfer cancelled", transfer_id=transfer_id)
return {
"transfer_id": transfer_id,
"status": "cancelled",
"message": "Transfer cancelled successfully",
}
except Exception as e:
logger.error("Error cancelling Taildrop transfer", error=str(e))
raise TailscaleMCPError(f"Failed to cancel transfer: {e}") from e
async def get_transfer_status(self, transfer_id: str) -> dict[str, Any]:
"""Get detailed transfer status.
Args:
transfer_id: Transfer ID to check
Returns:
Detailed transfer status
"""
try:
if transfer_id not in self.transfers:
raise ValueError(f"Transfer not found: {transfer_id}")
transfer = self.transfers[transfer_id]
current_time = time.time()
# Check for expiration
if (
transfer.files
and transfer.files[0].expires_at
and current_time > transfer.files[0].expires_at
):
transfer.status = "expired"
return {
"transfer_id": transfer_id,
"sender_device": transfer.sender_device,
"recipient_device": transfer.recipient_device,
"filename": transfer.files[0].filename if transfer.files else "unknown",
"size": transfer.files[0].size if transfer.files else 0,
"status": transfer.status,
"progress": transfer.progress,
"created_at": transfer.created_at,
"expires_at": transfer.files[0].expires_at if transfer.files else None,
"is_expired": transfer.status == "expired",
"estimated_completion": transfer.estimated_completion,
}
except Exception as e:
logger.error("Error getting transfer status", error=str(e))
raise TailscaleMCPError(f"Failed to get transfer status: {e}") from e
async def cleanup_expired_transfers(self) -> dict[str, Any]:
"""Clean up expired transfers.
Returns:
Cleanup summary
"""
try:
current_time = time.time()
expired_count = 0
cleaned_files = []
for transfer_id, transfer in list(self.transfers.items()):
if (
transfer.files
and transfer.files[0].expires_at
and current_time > transfer.files[0].expires_at
):
if transfer.status != "expired":
transfer.status = "expired"
expired_count += 1
# Clean up associated files
file_path = (
self.taildrop_dir
/ f"{transfer_id}_{transfer.files[0].filename}"
)
if file_path.exists():
file_path.unlink()
cleaned_files.append(str(file_path))
logger.info(
"Expired transfers cleaned up",
expired_count=expired_count,
cleaned_files=len(cleaned_files),
)
return {
"status": "completed",
"expired_transfers": expired_count,
"cleaned_files": len(cleaned_files),
"cleaned_file_paths": cleaned_files,
"message": f"Cleaned up {expired_count} expired transfers",
}
except Exception as e:
logger.error("Error cleaning up expired transfers", error=str(e))
raise TailscaleMCPError(f"Failed to cleanup expired transfers: {e}") from e
async def get_taildrop_statistics(self) -> dict[str, Any]:
"""Get Taildrop usage statistics.
Returns:
Statistics summary
"""
try:
time.time()
total_transfers = len(self.transfers)
status_counts = {}
total_size = 0
active_transfers = 0
for transfer in self.transfers.values():
status = transfer.status
status_counts[status] = status_counts.get(status, 0) + 1
if transfer.files:
total_size += transfer.files[0].size
if status in ["pending", "in_progress"]:
active_transfers += 1
# Calculate average transfer time
completed_transfers = [
t
for t in self.transfers.values()
if t.status == "completed" and t.files and t.files[0].completed_at
]
avg_transfer_time = 0
if completed_transfers:
total_time = sum(
t.files[0].completed_at - t.created_at for t in completed_transfers
)
avg_transfer_time = total_time / len(completed_transfers)
return {
"total_transfers": total_transfers,
"active_transfers": active_transfers,
"status_breakdown": status_counts,
"total_data_transferred": total_size,
"average_transfer_time": avg_transfer_time,
"expired_transfers": status_counts.get("expired", 0),
"success_rate": (
status_counts.get("completed", 0) / total_transfers * 100
)
if total_transfers > 0
else 0,
}
except Exception as e:
logger.error("Error getting Taildrop statistics", error=str(e))
raise TailscaleMCPError(f"Failed to get statistics: {e}") from e
async def _calculate_checksum(self, file_path: Path) -> str:
"""Calculate file checksum."""
hash_sha256 = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_sha256.update(chunk)
return hash_sha256.hexdigest()
async def _process_transfer(self, transfer_id: str) -> None:
"""Process a file transfer (simulated)."""
try:
transfer = self.transfers[transfer_id]
# Simulate transfer progress
for progress in [25, 50, 75, 100]:
await asyncio.sleep(1) # Simulate transfer time
transfer.progress = progress
if progress == 100:
transfer.status = "completed"
if transfer.files:
transfer.files[0].status = "completed"
transfer.files[0].completed_at = time.time()
logger.info(
"Taildrop transfer completed",
transfer_id=transfer_id,
filename=transfer.files[0].filename
if transfer.files
else "unknown",
)
break
# Clean up active transfer
if transfer_id in self.active_transfers:
del self.active_transfers[transfer_id]
except asyncio.CancelledError:
transfer.status = "cancelled"
logger.info("Taildrop transfer cancelled", transfer_id=transfer_id)
except Exception as e:
transfer.status = "failed"
logger.error(
"Taildrop transfer failed", transfer_id=transfer_id, error=str(e)
)