"""
URL utility functions for handling remote file downloads.
"""
import os
import tempfile
import requests
from typing import Tuple, Optional
from urllib.parse import urlparse, unquote
import hashlib
import time
def is_url(path: str) -> bool:
    """Determine whether *path* is a URL rather than a local file path.

    A string counts as a URL only when :func:`urlparse` finds both a
    scheme (http, https, s3, gs, ...) and a network location, so plain
    paths and ``file:///...`` URIs (empty netloc) report False.

    Args:
        path: Path or URL to check.

    Returns:
        True if *path* parses as a URL, False otherwise.
    """
    if not isinstance(path, str):
        return False
    try:
        parts = urlparse(path)
    except Exception:
        # Malformed input that urlparse refuses to handle.
        return False
    return bool(parts.scheme) and bool(parts.netloc)
def get_filename_from_url(url: str) -> str:
    """Extract a .docx filename from *url*, or synthesize a stable one.

    Args:
        url: The URL to extract the filename from.

    Returns:
        The basename of the URL path when it carries a ``.docx``
        extension (any letter case); otherwise a deterministic
        ``downloaded_<hash>.docx`` name derived from the URL.
    """
    try:
        parsed = urlparse(url)
        filename = os.path.basename(unquote(parsed.path))
        # Compare the extension case-insensitively so legitimate names
        # like REPORT.DOCX keep their original filename (the previous
        # exact-case check discarded them).
        if not filename or not filename.lower().endswith('.docx'):
            # Short MD5 prefix keeps the generated name stable for the
            # same URL across calls (used only as an identifier, not
            # for security).
            url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
            filename = f"downloaded_{url_hash}.docx"
        return filename
    except Exception:
        # Last-resort fallback: timestamp-based name.
        timestamp = int(time.time())
        return f"downloaded_{timestamp}.docx"
def download_file_from_url(url: str, timeout: int = 30) -> Tuple[bool, str, Optional[str]]:
    """Download a file from *url* into the word_mcp_downloads temp dir.

    Args:
        url: The URL to download from.
        timeout: Request timeout in seconds (default: 30).

    Returns:
        Tuple of (success, message, temp_file_path); ``temp_file_path``
        is None on any failure.
    """
    temp_path = None
    try:
        # Validate before touching the network.
        if not is_url(url):
            return False, f"Invalid URL: {url}", None
        # Dedicated subdirectory of the system temp dir so old downloads
        # can be swept by cleanup_old_temp_files.
        mcp_temp_dir = os.path.join(tempfile.gettempdir(), 'word_mcp_downloads')
        os.makedirs(mcp_temp_dir, exist_ok=True)
        filename = get_filename_from_url(url)
        temp_path = os.path.join(mcp_temp_dir, filename)
        # Stream in chunks; the with-block guarantees the connection is
        # released even if saving fails (stream=True otherwise leaks the
        # underlying connection because the body is never fully read).
        with requests.get(url, timeout=timeout, stream=True) as response:
            response.raise_for_status()
            with open(temp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    # Filter out keep-alive chunks.
                    if chunk:
                        f.write(chunk)
        return True, f"File downloaded successfully to {temp_path}", temp_path
    except requests.exceptions.Timeout:
        return False, f"Download timeout after {timeout} seconds", None
    except requests.exceptions.RequestException as e:
        return False, f"Failed to download file from URL: {str(e)}", None
    except IOError as e:
        # Remove any partially written file so callers never pick up a
        # truncated/corrupt document on a later run.
        cleanup_temp_file(temp_path)
        return False, f"Failed to save downloaded file: {str(e)}", None
    except Exception as e:
        return False, f"Unexpected error downloading file: {str(e)}", None
def resolve_file_path(path: str, timeout: int = 30) -> Tuple[bool, str, Optional[str], bool]:
    """Resolve *path* to a local file, downloading it first if it is a URL.

    This is the main entry point when input may be either a local file
    or a remote URL.

    Args:
        path: Local file path or URL.
        timeout: Timeout for URL downloads in seconds (default: 30).

    Returns:
        Tuple of (success, message/error, resolved_path, is_temp_file):
        - success: True if the file is accessible.
        - message: Success message or error description.
        - resolved_path: Local path (original or downloaded temp file).
        - is_temp_file: True when the file was downloaded and should be
          cleaned up by the caller afterwards.
    """
    # Remote case: delegate to the downloader and flag the result as
    # a temp file regardless of outcome.
    if is_url(path):
        success, message, downloaded_path = download_file_from_url(path, timeout)
        return success, message, downloaded_path, True
    # Local case: the path must exist and be a regular file.
    if os.path.exists(path):
        if os.path.isfile(path):
            return True, f"Using local file: {path}", path, False
        return False, f"Path is not a file: {path}", None, False
    return False, f"File not found: {path}", None, False
def cleanup_temp_file(file_path: Optional[str]) -> None:
    """Delete a temporary downloaded file, ignoring any failure.

    Args:
        file_path: Path to the temporary file to delete; None and
            already-missing paths are silently accepted.
    """
    # Nothing to do for None/empty paths or files that no longer exist.
    if not file_path:
        return
    if not os.path.exists(file_path):
        return
    try:
        os.remove(file_path)
    except Exception:
        # Best effort only — stale files are swept later by the
        # periodic temp-directory cleanup.
        pass
def cleanup_old_temp_files(max_age_hours: int = 24) -> int:
    """Delete old files from the word_mcp_downloads temp directory.

    Args:
        max_age_hours: Maximum age of files to keep, in hours
            (default: 24).

    Returns:
        Number of files deleted (0 if the directory does not exist or
        cannot be read).
    """
    mcp_temp_dir = os.path.join(tempfile.gettempdir(), 'word_mcp_downloads')
    if not os.path.isdir(mcp_temp_dir):
        return 0
    # Files modified before this instant are eligible for deletion.
    cutoff = time.time() - max_age_hours * 3600
    deleted_count = 0
    try:
        with os.scandir(mcp_temp_dir) as entries:
            for entry in entries:
                # Handle each entry independently: previously the stat
                # ran outside the per-file try, so a file vanishing
                # between listing and stat aborted the whole sweep and
                # discarded the count accumulated so far.
                try:
                    if not entry.is_file():
                        continue
                    if entry.stat().st_mtime < cutoff:
                        os.remove(entry.path)
                        deleted_count += 1
                except OSError:
                    # Skip entries that vanished or can't be deleted.
                    continue
    except OSError:
        # Directory became unreadable mid-scan; report progress so far.
        return deleted_count
    return deleted_count