test_core_download_logic.pyโข10.3 kB
#!/usr/bin/env python3
"""
Test the core download link generation logic without external dependencies.
"""
import os
import sys
import uuid
import sqlite3
import json
from datetime import datetime, timedelta
from pathlib import Path
def test_download_link_generation():
"""Test the core logic of download link generation."""
print("๐งช Testing Core Download Link Generation Logic")
print("=" * 60)
try:
# Step 1: Setup (simulate what create_document_with_download_link does)
TEMP_FILES_DIR = Path("/tmp/mcp_files")
DB_FILE = TEMP_FILES_DIR / "file_registry.db"
def init_temp_storage():
TEMP_FILES_DIR.mkdir(exist_ok=True)
conn = sqlite3.connect(DB_FILE)
conn.execute("""
CREATE TABLE IF NOT EXISTS temp_files (
file_id TEXT PRIMARY KEY,
original_filename TEXT NOT NULL,
user_filename TEXT NOT NULL,
file_path TEXT NOT NULL,
created_at DATETIME NOT NULL,
expires_at DATETIME NOT NULL,
download_count INTEGER DEFAULT 0
)
""")
# Handle schema migration
cursor = conn.execute("PRAGMA table_info(temp_files)")
columns = [row[1] for row in cursor.fetchall()]
if 'user_filename' not in columns:
conn.execute("ALTER TABLE temp_files ADD COLUMN user_filename TEXT")
conn.execute("UPDATE temp_files SET user_filename = original_filename WHERE user_filename IS NULL")
conn.execute("CREATE INDEX IF NOT EXISTS idx_user_filename ON temp_files(user_filename)")
conn.commit()
conn.close()
def register_temp_file(file_path: str, original_filename: str, user_filename: str, cleanup_hours: int = 24) -> str:
file_id = str(uuid.uuid4())
created_at = datetime.now()
expires_at = created_at + timedelta(hours=cleanup_hours)
conn = sqlite3.connect(DB_FILE)
conn.execute("""
INSERT INTO temp_files (file_id, original_filename, user_filename, file_path, created_at, expires_at)
VALUES (?, ?, ?, ?, ?, ?)
""", (file_id, original_filename, user_filename, file_path, created_at.isoformat(), expires_at.isoformat()))
conn.commit()
conn.close()
return file_id
def get_transport_config():
# This simulates the function in main.py
return {
'transport': 'streamable-http',
'host': '0.0.0.0',
'port': 8000,
'path': '/mcp',
'sse_path': '/sse'
}
print("๐ง Step 1: Initializing temp storage...")
init_temp_storage()
print("โ Temp storage initialized")
# Step 2: Simulate file creation
print("\n๐ง Step 2: Simulating document creation...")
filename = "test_products.docx"
cleanup_hours = 24
# Create a dummy file (simulate document creation)
unique_filename = f"{uuid.uuid4()}_{filename}"
temp_file_path = TEMP_FILES_DIR / unique_filename
temp_file_path.write_text("Dummy document content for testing")
print(f"โ Created dummy file: {temp_file_path}")
print(f"โ File size: {temp_file_path.stat().st_size} bytes")
# Step 3: Register file and generate download link
print("\n๐ง Step 3: Registering file and generating download link...")
file_id = register_temp_file(str(temp_file_path), filename, filename, cleanup_hours)
print(f"โ File registered with ID: {file_id}")
# Generate download URL (this is the key part we're testing)
config = get_transport_config()
base_url = f"http://{config['host']}:{config['port']}"
download_url = f"{base_url}/files/{file_id}"
expires_at = datetime.now() + timedelta(hours=cleanup_hours)
# Create the result structure that the tool would return
tool_result = {
"success": True,
"message": f"Document {filename} created successfully",
"download_url": download_url,
"file_id": file_id,
"original_filename": filename,
"expires_at": expires_at.isoformat(),
"cleanup_hours": cleanup_hours
}
print("โ Download URL generated")
print(f"โ Result structure created")
# Step 4: Validate the result
print("\n๐ Step 4: Validating result...")
print("Tool Result JSON:")
print(json.dumps(tool_result, indent=2))
# Check required fields
required_fields = ["success", "download_url", "file_id", "original_filename", "expires_at", "cleanup_hours"]
missing_fields = []
for field in required_fields:
if field not in tool_result:
missing_fields.append(field)
else:
print(f"โ Field '{field}' present: {tool_result[field]}")
if missing_fields:
print(f"โ Missing required fields: {missing_fields}")
return False
# Validate URL format
expected_url_pattern = f"http://{config['host']}:{config['port']}/files/"
if tool_result["download_url"].startswith(expected_url_pattern):
print(f"โ Download URL format correct: {tool_result['download_url']}")
else:
print(f"โ Download URL format incorrect. Expected to start with: {expected_url_pattern}")
return False
# Step 5: Test file retrieval (simulate what HTTP endpoint would do)
print("\n๐ง Step 5: Testing file retrieval simulation...")
def get_temp_file_info(file_id: str):
conn = sqlite3.connect(DB_FILE)
cursor = conn.execute("""
SELECT file_id, original_filename, user_filename, file_path, created_at, expires_at, download_count
FROM temp_files WHERE file_id = ?
""", (file_id,))
row = cursor.fetchone()
conn.close()
if not row:
return None
return {
"file_id": row[0],
"original_filename": row[1],
"user_filename": row[2],
"file_path": row[3],
"created_at": row[4],
"expires_at": row[5],
"download_count": row[6]
}
# Test retrieving the file by ID (simulate /files/{file_id} endpoint)
file_info = get_temp_file_info(file_id)
if file_info:
print("โ File retrievable by ID from database")
print(f" Database record: {file_info['file_path']}")
# Check if file exists
if os.path.exists(file_info["file_path"]):
print("โ File exists on filesystem")
# Check if not expired
expires_at_dt = datetime.fromisoformat(file_info["expires_at"])
if datetime.now() <= expires_at_dt:
print("โ File not expired")
else:
print("โ File has expired")
return False
else:
print("โ File missing from filesystem")
return False
else:
print("โ File not found in database")
return False
# Step 6: Test the actual download URL components
print("\n๐ Step 6: Analyzing download URL components...")
url_parts = tool_result["download_url"].split("/")
print(f"URL parts: {url_parts}")
if len(url_parts) >= 4 and url_parts[-2] == "files":
extracted_file_id = url_parts[-1]
if extracted_file_id == file_id:
print(f"โ File ID correctly embedded in URL: {extracted_file_id}")
else:
print(f"โ File ID mismatch. Expected: {file_id}, Got: {extracted_file_id}")
return False
else:
print("โ URL structure incorrect")
return False
# Clean up
temp_file_path.unlink()
print("โ Test file cleaned up")
# Step 7: Summary
print("\n" + "=" * 60)
print("๐ DOWNLOAD LINK GENERATION TEST SUCCESSFUL!")
print("\nโ
What the tool can generate:")
print(f" ๐ค Download URL: {tool_result['download_url']}")
print(f" ๐ File ID: {tool_result['file_id']}")
print(f" ๐
Expires: {tool_result['expires_at']}")
print(f" โ
Success flag: {tool_result['success']}")
print("\nโ
Verification passed:")
print(" 1. โ Temp storage initialization")
print(" 2. โ File creation and registration")
print(" 3. โ Download URL generation")
print(" 4. โ Database storage and retrieval")
print(" 5. โ File existence verification")
print(" 6. โ URL structure validation")
print(" 7. โ JSON result format")
print("\n๐ CONCLUSION:")
print(" The create_document_with_download_link tool SHOULD work correctly.")
print(" It generates proper download URLs and stores files appropriately.")
return True
except Exception as e:
print(f"โ TEST FAILED: {str(e)}")
import traceback
traceback.print_exc()
return False
def main():
"""Run the core download logic test."""
success = test_download_link_generation()
print(f"\n{'SUCCESS' if success else 'FAILURE'}")
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())