Skip to main content
Glama

Office Word MCP Server

by franlealp1
test_core_download_logic.pyโ€ข10.3 kB
#!/usr/bin/env python3 """ Test the core download link generation logic without external dependencies. """ import os import sys import uuid import sqlite3 import json from datetime import datetime, timedelta from pathlib import Path def test_download_link_generation(): """Test the core logic of download link generation.""" print("๐Ÿงช Testing Core Download Link Generation Logic") print("=" * 60) try: # Step 1: Setup (simulate what create_document_with_download_link does) TEMP_FILES_DIR = Path("/tmp/mcp_files") DB_FILE = TEMP_FILES_DIR / "file_registry.db" def init_temp_storage(): TEMP_FILES_DIR.mkdir(exist_ok=True) conn = sqlite3.connect(DB_FILE) conn.execute(""" CREATE TABLE IF NOT EXISTS temp_files ( file_id TEXT PRIMARY KEY, original_filename TEXT NOT NULL, user_filename TEXT NOT NULL, file_path TEXT NOT NULL, created_at DATETIME NOT NULL, expires_at DATETIME NOT NULL, download_count INTEGER DEFAULT 0 ) """) # Handle schema migration cursor = conn.execute("PRAGMA table_info(temp_files)") columns = [row[1] for row in cursor.fetchall()] if 'user_filename' not in columns: conn.execute("ALTER TABLE temp_files ADD COLUMN user_filename TEXT") conn.execute("UPDATE temp_files SET user_filename = original_filename WHERE user_filename IS NULL") conn.execute("CREATE INDEX IF NOT EXISTS idx_user_filename ON temp_files(user_filename)") conn.commit() conn.close() def register_temp_file(file_path: str, original_filename: str, user_filename: str, cleanup_hours: int = 24) -> str: file_id = str(uuid.uuid4()) created_at = datetime.now() expires_at = created_at + timedelta(hours=cleanup_hours) conn = sqlite3.connect(DB_FILE) conn.execute(""" INSERT INTO temp_files (file_id, original_filename, user_filename, file_path, created_at, expires_at) VALUES (?, ?, ?, ?, ?, ?) """, (file_id, original_filename, user_filename, file_path, created_at.isoformat(), expires_at.isoformat())) conn.commit() conn.close() return file_id def get_transport_config(): # This simulates the function in main.py return { 'transport': 'streamable-http', 'host': '0.0.0.0', 'port': 8000, 'path': '/mcp', 'sse_path': '/sse' } print("๐Ÿ”ง Step 1: Initializing temp storage...") init_temp_storage() print("โœ“ Temp storage initialized") # Step 2: Simulate file creation print("\n๐Ÿ”ง Step 2: Simulating document creation...") filename = "test_products.docx" cleanup_hours = 24 # Create a dummy file (simulate document creation) unique_filename = f"{uuid.uuid4()}_{filename}" temp_file_path = TEMP_FILES_DIR / unique_filename temp_file_path.write_text("Dummy document content for testing") print(f"โœ“ Created dummy file: {temp_file_path}") print(f"โœ“ File size: {temp_file_path.stat().st_size} bytes") # Step 3: Register file and generate download link print("\n๐Ÿ”ง Step 3: Registering file and generating download link...") file_id = register_temp_file(str(temp_file_path), filename, filename, cleanup_hours) print(f"โœ“ File registered with ID: {file_id}") # Generate download URL (this is the key part we're testing) config = get_transport_config() base_url = f"http://{config['host']}:{config['port']}" download_url = f"{base_url}/files/{file_id}" expires_at = datetime.now() + timedelta(hours=cleanup_hours) # Create the result structure that the tool would return tool_result = { "success": True, "message": f"Document {filename} created successfully", "download_url": download_url, "file_id": file_id, "original_filename": filename, "expires_at": expires_at.isoformat(), "cleanup_hours": cleanup_hours } print("โœ“ Download URL generated") print(f"โœ“ Result structure created") # Step 4: Validate the result print("\n๐Ÿ” Step 4: Validating result...") print("Tool Result JSON:") print(json.dumps(tool_result, indent=2)) # Check required fields required_fields = ["success", "download_url", "file_id", "original_filename", "expires_at", "cleanup_hours"] missing_fields = [] for field in required_fields: if field not in tool_result: missing_fields.append(field) else: print(f"โœ“ Field '{field}' present: {tool_result[field]}") if missing_fields: print(f"โŒ Missing required fields: {missing_fields}") return False # Validate URL format expected_url_pattern = f"http://{config['host']}:{config['port']}/files/" if tool_result["download_url"].startswith(expected_url_pattern): print(f"โœ“ Download URL format correct: {tool_result['download_url']}") else: print(f"โŒ Download URL format incorrect. Expected to start with: {expected_url_pattern}") return False # Step 5: Test file retrieval (simulate what HTTP endpoint would do) print("\n๐Ÿ”ง Step 5: Testing file retrieval simulation...") def get_temp_file_info(file_id: str): conn = sqlite3.connect(DB_FILE) cursor = conn.execute(""" SELECT file_id, original_filename, user_filename, file_path, created_at, expires_at, download_count FROM temp_files WHERE file_id = ? """, (file_id,)) row = cursor.fetchone() conn.close() if not row: return None return { "file_id": row[0], "original_filename": row[1], "user_filename": row[2], "file_path": row[3], "created_at": row[4], "expires_at": row[5], "download_count": row[6] } # Test retrieving the file by ID (simulate /files/{file_id} endpoint) file_info = get_temp_file_info(file_id) if file_info: print("โœ“ File retrievable by ID from database") print(f" Database record: {file_info['file_path']}") # Check if file exists if os.path.exists(file_info["file_path"]): print("โœ“ File exists on filesystem") # Check if not expired expires_at_dt = datetime.fromisoformat(file_info["expires_at"]) if datetime.now() <= expires_at_dt: print("โœ“ File not expired") else: print("โŒ File has expired") return False else: print("โŒ File missing from filesystem") return False else: print("โŒ File not found in database") return False # Step 6: Test the actual download URL components print("\n๐Ÿ”— Step 6: Analyzing download URL components...") url_parts = tool_result["download_url"].split("/") print(f"URL parts: {url_parts}") if len(url_parts) >= 4 and url_parts[-2] == "files": extracted_file_id = url_parts[-1] if extracted_file_id == file_id: print(f"โœ“ File ID correctly embedded in URL: {extracted_file_id}") else: print(f"โŒ File ID mismatch. Expected: {file_id}, Got: {extracted_file_id}") return False else: print("โŒ URL structure incorrect") return False # Clean up temp_file_path.unlink() print("โœ“ Test file cleaned up") # Step 7: Summary print("\n" + "=" * 60) print("๐ŸŽ‰ DOWNLOAD LINK GENERATION TEST SUCCESSFUL!") print("\nโœ… What the tool can generate:") print(f" ๐Ÿ“ค Download URL: {tool_result['download_url']}") print(f" ๐Ÿ†” File ID: {tool_result['file_id']}") print(f" ๐Ÿ“… Expires: {tool_result['expires_at']}") print(f" โœ… Success flag: {tool_result['success']}") print("\nโœ… Verification passed:") print(" 1. โœ“ Temp storage initialization") print(" 2. โœ“ File creation and registration") print(" 3. โœ“ Download URL generation") print(" 4. โœ“ Database storage and retrieval") print(" 5. โœ“ File existence verification") print(" 6. โœ“ URL structure validation") print(" 7. โœ“ JSON result format") print("\n๐Ÿ“‹ CONCLUSION:") print(" The create_document_with_download_link tool SHOULD work correctly.") print(" It generates proper download URLs and stores files appropriately.") return True except Exception as e: print(f"โŒ TEST FAILED: {str(e)}") import traceback traceback.print_exc() return False def main(): """Run the core download logic test.""" success = test_download_link_generation() print(f"\n{'SUCCESS' if success else 'FAILURE'}") return 0 if success else 1 if __name__ == "__main__": sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/franlealp1/mcp-word'

If you have feedback or need assistance with the MCP directory API, please join our Discord server