"""
SFTP Client for fetching XML files from remote SFTP server
"""
import fnmatch
import hashlib
import logging
import os
import stat
from pathlib import Path
from typing import List, Tuple, Optional

import paramiko
from paramiko import SSHClient
from paramiko.sftp_client import SFTPClient as ParamikoSFTPClient
logger = logging.getLogger(__name__)
class SFTPClient:
"""SFTP client for fetching files from remote server"""
def __init__(self, host: str, username: str, password: Optional[str] = None,
key_path: Optional[str] = None, port: int = 22, remote_path: str = "/"):
self.host = host
self.username = username
self.password = password
self.key_path = key_path
self.port = port
self.remote_path = remote_path.rstrip('/') + '/'
self.client: Optional[SSHClient] = None
self.sftp: Optional[ParamikoSFTPClient] = None
def connect(self) -> bool:
"""Establish SFTP connection"""
try:
self.client = SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# Use key-based auth if key_path is provided, otherwise use password
if self.key_path and os.path.exists(self.key_path):
private_key = paramiko.RSAKey.from_private_key_file(self.key_path)
self.client.connect(
hostname=self.host,
port=self.port,
username=self.username,
pkey=private_key,
timeout=30
)
elif self.password:
self.client.connect(
hostname=self.host,
port=self.port,
username=self.username,
password=self.password,
timeout=30
)
else:
raise ValueError("Either password or key_path must be provided")
self.sftp = self.client.open_sftp()
logger.info(f"Connected to SFTP server {self.host}:{self.port}")
return True
except Exception as e:
logger.error(f"Failed to connect to SFTP server: {e}")
return False
def disconnect(self):
"""Close SFTP connection"""
if self.sftp:
self.sftp.close()
if self.client:
self.client.close()
logger.info("Disconnected from SFTP server")
def list_files(self, pattern: str = "*.xml") -> List[str]:
"""List XML files in remote directory"""
if not self.sftp:
raise ConnectionError("Not connected to SFTP server")
try:
files = []
for item in self.sftp.listdir_attr(self.remote_path):
if item.filename.endswith('.xml') or pattern == "*":
files.append(item.filename)
logger.info(f"Found {len(files)} files in {self.remote_path}")
return files
except Exception as e:
logger.error(f"Failed to list files: {e}")
return []
def get_file_checksum(self, filename: str) -> Optional[str]:
"""Get MD5 checksum of remote file"""
if not self.sftp:
raise ConnectionError("Not connected to SFTP server")
try:
remote_file_path = self.remote_path + filename
# Read file content and calculate checksum
with self.sftp.open(remote_file_path, 'rb') as remote_file:
file_content = remote_file.read()
checksum = hashlib.md5(file_content).hexdigest()
return checksum
except Exception as e:
logger.error(f"Failed to get checksum for {filename}: {e}")
return None
def download_file(self, filename: str, local_path: str) -> bool:
"""Download file from SFTP server to local path"""
if not self.sftp:
raise ConnectionError("Not connected to SFTP server")
try:
remote_file_path = self.remote_path + filename
local_file_path = Path(local_path) / filename
# Create local directory if it doesn't exist
local_file_path.parent.mkdir(parents=True, exist_ok=True)
# Download file
self.sftp.get(remote_file_path, str(local_file_path))
logger.info(f"Downloaded {filename} to {local_file_path}")
return True
except Exception as e:
logger.error(f"Failed to download {filename}: {e}")
return False
def get_new_files(self, processed_checksums: set) -> List[Tuple[str, str]]:
"""
Get list of new or changed files
Returns list of (filename, checksum) tuples
"""
if not self.sftp:
raise ConnectionError("Not connected to SFTP server")
new_files = []
files = self.list_files()
for filename in files:
checksum = self.get_file_checksum(filename)
if checksum and checksum not in processed_checksums:
new_files.append((filename, checksum))
logger.info(f"New/changed file detected: {filename} (checksum: {checksum})")
return new_files
def __enter__(self):
"""Context manager entry"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit"""
self.disconnect()