Skip to main content
Glama

USPTO Final Petition Decisions MCP Server

by john-walkoe
secure_storage.py (17.8 kB)
""" Windows DPAPI Secure Storage for USPTO API Keys MIGRATION NOTE: This module is being phased out in favor of shared_secure_storage.py. Recommended Migration Path: 1. Use shared_secure_storage.UnifiedSecureStorage for all new code 2. Run storage migration utility to convert existing keys: from fpd_mcp.shared.storage_migration import migrate_legacy_storage migrate_legacy_storage() Benefits of Unified Storage: - Single-key-per-file architecture (more secure isolation) - Uses centralized dpapi_crypto module (no code duplication) - Uses centralized storage_paths module - Consistent with other USPTO MCPs (PFW, PTAB, Citations) This module will be maintained for backward compatibility but may be deprecated in a future version. --- This module provides secure storage and retrieval of USPTO API keys using Windows Data Protection API (DPAPI). DPAPI encrypts data per-user and per-machine, so only the same user on the same machine can decrypt it. No PowerShell execution policies or external dependencies required - uses only Python ctypes. """ import ctypes import ctypes.wintypes import json import os import secrets import sys from pathlib import Path from typing import Optional, Dict class DATA_BLOB(ctypes.Structure): """Windows DATA_BLOB structure for DPAPI operations.""" _fields_ = [ ('cbData', ctypes.wintypes.DWORD), ('pbData', ctypes.POINTER(ctypes.c_char)) ] def _get_data_from_blob(blob: DATA_BLOB) -> bytes: """Extract bytes from a DATA_BLOB structure.""" if not blob.cbData: return b'' cbData = int(blob.cbData) pbData = blob.pbData buffer = ctypes.create_string_buffer(cbData) ctypes.memmove(buffer, pbData, cbData) ctypes.windll.kernel32.LocalFree(pbData) return buffer.raw def encrypt_data(data: bytes, description: str = "USPTO FPD API Key") -> bytes: """ Encrypt data using Windows DPAPI with secure random entropy. 
Args: data: The data to encrypt (API key as bytes) description: Optional description for the encrypted data Returns: Encrypted data as bytes (entropy + encrypted_data) Raises: OSError: If encryption fails RuntimeError: If not running on Windows """ if sys.platform != "win32": raise RuntimeError("DPAPI is only available on Windows") # Generate cryptographically secure random entropy (32 bytes = 256 bits) entropy_data = secrets.token_bytes(32) # Prepare input data blob data_in = DATA_BLOB() data_in.pbData = ctypes.cast(ctypes.create_string_buffer(data), ctypes.POINTER(ctypes.c_char)) data_in.cbData = len(data) # Prepare output data blob data_out = DATA_BLOB() # Prepare entropy blob entropy = DATA_BLOB() entropy.pbData = ctypes.cast(ctypes.create_string_buffer(entropy_data), ctypes.POINTER(ctypes.c_char)) entropy.cbData = len(entropy_data) # Call CryptProtectData CRYPTPROTECT_UI_FORBIDDEN = 0x01 result = ctypes.windll.crypt32.CryptProtectData( ctypes.byref(data_in), # pDataIn description, # szDataDescr ctypes.byref(entropy), # pOptionalEntropy None, # pvReserved None, # pPromptStruct CRYPTPROTECT_UI_FORBIDDEN, # dwFlags ctypes.byref(data_out) # pDataOut ) if not result: error_code = ctypes.windll.kernel32.GetLastError() raise OSError(f"CryptProtectData failed with error code: {error_code}") # Extract encrypted data encrypted_data = _get_data_from_blob(data_out) # Return entropy + encrypted data (entropy needed for decryption) return entropy_data + encrypted_data def decrypt_data(encrypted_data: bytes) -> bytes: """ Decrypt data using Windows DPAPI. 
Args: encrypted_data: The encrypted data (entropy + encrypted_data) Returns: Decrypted data as bytes Raises: OSError: If decryption fails RuntimeError: If not running on Windows ValueError: If encrypted data format is invalid """ if sys.platform != "win32": raise RuntimeError("DPAPI is only available on Windows") # Validate minimum data length (32 bytes entropy + some encrypted data) if len(encrypted_data) < 32: raise ValueError("Invalid encrypted data format: too short") # Extract entropy (first 32 bytes) and actual encrypted data entropy_data = encrypted_data[:32] actual_encrypted_data = encrypted_data[32:] # Prepare input data blob with actual encrypted data data_in = DATA_BLOB() data_in.pbData = ctypes.cast(ctypes.create_string_buffer(actual_encrypted_data), ctypes.POINTER(ctypes.c_char)) data_in.cbData = len(actual_encrypted_data) # Prepare output data blob data_out = DATA_BLOB() # Prepare entropy blob with extracted entropy entropy = DATA_BLOB() entropy.pbData = ctypes.cast(ctypes.create_string_buffer(entropy_data), ctypes.POINTER(ctypes.c_char)) entropy.cbData = len(entropy_data) # Prepare description pointer description_ptr = ctypes.wintypes.LPWSTR() # Call CryptUnprotectData CRYPTPROTECT_UI_FORBIDDEN = 0x01 result = ctypes.windll.crypt32.CryptUnprotectData( ctypes.byref(data_in), # pDataIn ctypes.byref(description_ptr), # ppszDataDescr ctypes.byref(entropy), # pOptionalEntropy None, # pvReserved None, # pPromptStruct CRYPTPROTECT_UI_FORBIDDEN, # dwFlags ctypes.byref(data_out) # pDataOut ) if not result: error_code = ctypes.windll.kernel32.GetLastError() raise OSError(f"CryptUnprotectData failed with error code: {error_code}") # Clean up description if description_ptr.value: ctypes.windll.kernel32.LocalFree(description_ptr) # Extract decrypted data decrypted_data = _get_data_from_blob(data_out) return decrypted_data class SecureStorage: """Secure storage manager for USPTO API keys using Windows DPAPI.""" def __init__(self, storage_file: Optional[str] = 
None): """ Initialize secure storage with PFW priority. Args: storage_file: Path to storage file. If None, uses priority-based detection. """ if storage_file is None: # Priority 1: Check for PFW shared storage (if PFW is installed) pfw_storage = os.path.join(os.path.expanduser("~"), ".uspto_pfw_secure_keys") fpd_storage = os.path.join(os.path.expanduser("~"), ".uspto_fpd_secure_keys") if os.path.exists(pfw_storage): storage_file = pfw_storage self.using_shared_pfw_storage = True else: storage_file = fpd_storage self.using_shared_pfw_storage = False else: # Custom path provided self.using_shared_pfw_storage = storage_file.endswith(".uspto_pfw_secure_keys") self.storage_file = Path(storage_file) def store_api_key(self, api_key: str, key_name: str = "USPTO_API_KEY") -> bool: """ Store API key securely using Windows DPAPI. Args: api_key: The API key to store key_name: Name of the key (USPTO_API_KEY or MISTRAL_API_KEY) Returns: True if successful, False otherwise """ try: if sys.platform != "win32": # Fall back to environment variable on non-Windows return False # Basic validation - API keys should be reasonable length if not api_key or len(api_key) < 10: raise ValueError(f"Invalid {key_name} format") # Warn if storing to shared PFW storage if self.using_shared_pfw_storage: import logging logger = logging.getLogger(__name__) logger.info(f"Storing {key_name} to shared PFW storage: {self.storage_file}") # Load existing keys or create new structure keys_data = self._load_keys_data() # Add/update the key keys_data[key_name] = api_key # Encrypt the entire keys structure json_data = json.dumps(keys_data) encrypted_data = encrypt_data(json_data.encode('utf-8')) # Write to file self.storage_file.write_bytes(encrypted_data) # Set restrictive permissions (Windows) os.chmod(self.storage_file, 0o600) return True except Exception: return False def get_api_key(self, key_name: str = "USPTO_API_KEY") -> Optional[str]: """ Retrieve API key from secure storage. 
Args: key_name: Name of the key to retrieve (USPTO_API_KEY or MISTRAL_API_KEY) Returns: The decrypted API key, or None if not found/failed """ try: if sys.platform != "win32": # Fall back to environment variable on non-Windows return os.environ.get(key_name) if not self.storage_file.exists(): # Fall back to environment variable if no secure storage return os.environ.get(key_name) # Load and decrypt all keys keys_data = self._load_keys_data() # Return the requested key or fall back to environment variable if key_name in keys_data: return keys_data[key_name] else: return os.environ.get(key_name) except Exception: # Fall back to environment variable on any error return os.environ.get(key_name) def _load_keys_data(self) -> Dict[str, str]: """Load and decrypt the keys data structure with backward compatibility.""" try: if not self.storage_file.exists(): return {} # Read encrypted data encrypted_data = self.storage_file.read_bytes() try: # Try new format first (entropy + encrypted_data) decrypted_data = decrypt_data(encrypted_data) except (ValueError, OSError): # Fall back to old format with hardcoded entropy try: decrypted_data = self._decrypt_legacy_data(encrypted_data) # Re-encrypt with new secure format self._migrate_to_secure_format(decrypted_data) except Exception: return {} json_data = decrypted_data.decode('utf-8') # Parse JSON keys_data = json.loads(json_data) # Validate it's a dictionary if not isinstance(keys_data, dict): return {} return keys_data except Exception: return {} def _decrypt_legacy_data(self, encrypted_data: bytes) -> bytes: """Decrypt data using legacy hardcoded entropy for backward compatibility.""" if sys.platform != "win32": raise RuntimeError("DPAPI is only available on Windows") # Prepare input data blob data_in = DATA_BLOB() data_in.pbData = ctypes.cast(ctypes.create_string_buffer(encrypted_data), ctypes.POINTER(ctypes.c_char)) data_in.cbData = len(encrypted_data) # Prepare output data blob data_out = DATA_BLOB() # Legacy hardcoded entropy 
entropy_data = b"uspto_fpd_entropy_v1" entropy = DATA_BLOB() entropy.pbData = ctypes.cast(ctypes.create_string_buffer(entropy_data), ctypes.POINTER(ctypes.c_char)) entropy.cbData = len(entropy_data) # Prepare description pointer description_ptr = ctypes.wintypes.LPWSTR() # Call CryptUnprotectData CRYPTPROTECT_UI_FORBIDDEN = 0x01 result = ctypes.windll.crypt32.CryptUnprotectData( ctypes.byref(data_in), # pDataIn ctypes.byref(description_ptr), # ppszDataDescr ctypes.byref(entropy), # pOptionalEntropy None, # pvReserved None, # pPromptStruct CRYPTPROTECT_UI_FORBIDDEN, # dwFlags ctypes.byref(data_out) # pDataOut ) if not result: error_code = ctypes.windll.kernel32.GetLastError() raise OSError(f"Legacy decryption failed with error code: {error_code}") # Clean up description if description_ptr.value: ctypes.windll.kernel32.LocalFree(description_ptr) # Extract decrypted data return _get_data_from_blob(data_out) def _migrate_to_secure_format(self, decrypted_data: bytes): """Migrate old format to new secure format.""" try: # Re-encrypt with new secure random entropy new_encrypted_data = encrypt_data(decrypted_data) # Write back to file with new format self.storage_file.write_bytes(new_encrypted_data) # Set restrictive permissions os.chmod(self.storage_file, 0o600) except Exception: # Migration failed, but don't break existing functionality pass def has_secure_key(self, key_name: str = "USPTO_API_KEY") -> bool: """ Check if a secure key is stored. Args: key_name: Name of the key to check Returns: True if secure key exists and can be decrypted """ try: api_key = self.get_api_key(key_name) return api_key is not None and len(api_key) >= 10 except Exception: return False def remove_secure_key(self) -> bool: """ Remove the secure key file. 
Returns: True if successful or file doesn't exist """ try: if self.storage_file.exists(): self.storage_file.unlink() return True except Exception: return False def get_storage_stats(self) -> Dict: """ Get storage statistics for diagnostic purposes. Returns: Dictionary with storage file information """ keys_data = self._load_keys_data() return { 'storage_file_path': str(self.storage_file), 'storage_file_exists': self.storage_file.exists(), 'using_shared_pfw_storage': self.using_shared_pfw_storage, 'storage_type': 'Shared PFW Storage' if self.using_shared_pfw_storage else 'FPD Local Storage', 'platform': sys.platform, 'stored_keys_count': len(keys_data) } def list_stored_keys(self) -> list: """ List all keys stored in secure storage. Returns: List of key names """ keys_data = self._load_keys_data() return list(keys_data.keys()) def get_secure_api_key(key_name: str = "USPTO_API_KEY") -> Optional[str]: """ Convenience function to get API key from secure storage. Args: key_name: Name of the key to retrieve (USPTO_API_KEY or MISTRAL_API_KEY) Returns: The API key, or None if not available """ storage = SecureStorage() return storage.get_api_key(key_name) def store_secure_api_key(api_key: str, key_name: str = "USPTO_API_KEY") -> bool: """ Convenience function to store API key securely. 
Args: api_key: The API key to store key_name: Name of the key (USPTO_API_KEY or MISTRAL_API_KEY) Returns: True if successful """ storage = SecureStorage() return storage.store_api_key(api_key, key_name) if __name__ == "__main__": # Simple test/demo with proper logging import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) if len(sys.argv) > 1: if sys.argv[1] == "test": # Test encryption/decryption test_data = "test_uspto_key_123456789" logger.info("Testing DPAPI encryption/decryption...") try: encrypted = encrypt_data(test_data.encode('utf-8')) logger.info(f"Encrypted: {len(encrypted)} bytes") decrypted = decrypt_data(encrypted) decrypted_str = decrypted.decode('utf-8') logger.info(f"Decrypted: {decrypted_str}") if decrypted_str == test_data: logger.info("[SUCCESS] DPAPI test PASSED") else: logger.error("[FAILED] DPAPI test FAILED") except Exception as e: logger.error(f"[FAILED] DPAPI test FAILED: {e}") elif sys.argv[1] == "store": if len(sys.argv) > 2: success = store_secure_api_key(sys.argv[2]) if success: logger.info("[SUCCESS] API key stored securely") else: logger.error("[FAILED] Failed to store API key") else: logger.info("Usage: python secure_storage.py store <api_key>") elif sys.argv[1] == "get": api_key = get_secure_api_key() if api_key: logger.info(f"API key: {api_key[:10]}...") else: logger.warning("No API key found") else: logger.info("Usage:") logger.info(" python secure_storage.py test - Test DPAPI functionality") logger.info(" python secure_storage.py store <key> - Store API key") logger.info(" python secure_storage.py get - Retrieve API key")

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/john-walkoe/uspto_fpd_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.