"""
SFTP client for fetching files from remote location
"""
import fnmatch
import logging
import os
from datetime import datetime
from typing import List, Optional

import paramiko

from config import Config
# Configure root logging at INFO level as soon as this module is imported.
# NOTE(review): this is a process-wide, import-time side effect — confirm that
# this module (rather than the application entry point) should own logging setup.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by SFTPClient below.
logger = logging.getLogger(__name__)
class SFTPClient:
    """SFTP client for listing and downloading files from a remote directory.

    Connection settings and the remote/local directories are read from
    ``Config`` when the instance is created. Call :meth:`connect` before any
    file operation and :meth:`disconnect` when finished.

    The file-operation methods do not raise on ordinary failures: they log
    the error and return an empty list, ``None`` or ``False`` instead.
    """

    def __init__(self):
        """Read settings from ``Config`` and ensure the local directory exists."""
        self.host = Config.SFTP_HOST
        self.port = Config.SFTP_PORT
        self.username = Config.SFTP_USERNAME
        self.password = Config.SFTP_PASSWORD
        self.remote_path = Config.SFTP_REMOTE_PATH
        self.local_path = Config.SFTP_LOCAL_PATH
        # Both handles stay None until connect() succeeds.
        self.client = None
        self.sftp = None
        # Create local directory if it doesn't exist
        os.makedirs(self.local_path, exist_ok=True)

    def _remote_file_path(self, filename: str) -> str:
        """Return the path of *filename* inside the configured remote directory."""
        return f"{self.remote_path}/{filename}"

    def _close_handles(self):
        """Close and forget the SFTP channel and SSH client.

        Each handle is closed independently so a failure closing one does not
        leave the other open. Close errors are logged, not raised.
        """
        handles = (self.sftp, self.client)
        # Reset first so the instance never keeps a reference to a closed handle.
        self.sftp = None
        self.client = None
        for handle in handles:
            if handle is None:
                continue
            try:
                handle.close()
            except Exception as e:
                logger.warning(f"Error while closing SFTP connection: {e}")

    def connect(self):
        """Establish the SSH session and open an SFTP channel on it.

        Any connection left over from a previous call is closed first.

        Returns:
            True on success. False if the SSH connection or the SFTP channel
            could not be established; the error is logged and any partially
            opened connection is closed.
        """
        # Avoid leaking an earlier connection when connect() is called again.
        self._close_handles()
        try:
            self.client = paramiko.SSHClient()
            # NOTE(security): AutoAddPolicy trusts any unknown host key, which
            # leaves the connection open to man-in-the-middle attacks. Kept
            # for backward compatibility; consider loading known host keys
            # and using a stricter policy.
            self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.client.connect(
                hostname=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                timeout=30,
            )
            self.sftp = self.client.open_sftp()
        except Exception as e:
            logger.error(f"Failed to connect to SFTP server: {e}")
            # Do not leave a half-open SSH session behind.
            self._close_handles()
            return False
        logger.info(f"Connected to SFTP server {self.host}:{self.port}")
        return True

    def disconnect(self):
        """Close the SFTP channel and the SSH session, if they are open.

        Safe to call when not connected and safe to call repeatedly.
        """
        self._close_handles()
        logger.info("Disconnected from SFTP server")

    def list_files(self, pattern: str = "*.xml") -> List[str]:
        """List names in the remote directory that match *pattern*.

        Args:
            pattern: Shell-style wildcard (``*``, ``?``, ``[seq]``) matched
                case-sensitively against each entry name.

        Returns:
            The matching names in the order the server returned them, or an
            empty list if the listing failed (the error is logged).
        """
        try:
            entries = self.sftp.listdir_attr(self.remote_path)
            return [
                entry.filename
                for entry in entries
                if fnmatch.fnmatchcase(entry.filename, pattern)
            ]
        except Exception as e:
            logger.error(f"Failed to list files: {e}")
            return []

    def get_file_info(self, filename: str) -> Optional[dict]:
        """Get size and modification time of a remote file.

        Args:
            filename: Name of the file inside the remote directory.

        Returns:
            A dict with keys ``'filename'``, ``'size'`` and ``'mtime'`` (a
            naive local-time ``datetime``), or ``None`` if the file could not
            be inspected (the error is logged).
        """
        try:
            stat = self.sftp.stat(self._remote_file_path(filename))
            return {
                'filename': filename,
                'size': stat.st_size,
                'mtime': datetime.fromtimestamp(stat.st_mtime),
            }
        except Exception as e:
            logger.error(f"Failed to get file info for {filename}: {e}")
            return None

    def download_file(self, remote_filename: str, local_filename: Optional[str] = None) -> Optional[str]:
        """Download a file from the remote directory into the local directory.

        The data is first written to ``<destination>.part`` and moved into
        place only after the transfer succeeds, so a failed download never
        leaves a truncated file (or clobbers an earlier good copy) at the
        destination path.

        Args:
            remote_filename: Name of the file inside the remote directory.
            local_filename: Name to store it under locally; defaults to
                *remote_filename*.

        Returns:
            The local path of the downloaded file, or ``None`` if the
            download failed (the error is logged).
        """
        partial_file_path = None
        try:
            if local_filename is None:
                local_filename = remote_filename
            remote_file_path = self._remote_file_path(remote_filename)
            local_file_path = os.path.join(self.local_path, local_filename)
            partial_file_path = f"{local_file_path}.part"
            self.sftp.get(remote_file_path, partial_file_path)
            os.replace(partial_file_path, local_file_path)
            logger.info(f"Downloaded {remote_filename} to {local_file_path}")
            return local_file_path
        except Exception as e:
            logger.error(f"Failed to download {remote_filename}: {e}")
            if partial_file_path is not None:
                try:
                    os.remove(partial_file_path)
                except OSError:
                    # Nothing was written, or it cannot be removed; the
                    # download failure has already been reported above.
                    pass
            return None

    def file_exists(self, filename: str) -> bool:
        """Check whether a file exists in the remote directory.

        Args:
            filename: Name of the file inside the remote directory.

        Returns:
            True if the server could stat the file. False if it does not
            exist, or if the check itself failed (e.g. not connected or
            permission denied), in which case a warning is logged.
        """
        try:
            self.sftp.stat(self._remote_file_path(filename))
        except FileNotFoundError:
            return False
        except Exception as e:
            # Keep the historical "never raises" contract, but make the
            # failure visible instead of silently reporting "missing".
            logger.warning(f"Could not check whether {filename} exists: {e}")
            return False
        return True

    def get_new_files(self, existing_files: List[str]) -> List[str]:
        """Return remote files (default pattern) that are not in *existing_files*.

        Args:
            existing_files: Names that are already known; ``None`` is treated
                as empty.

        Returns:
            Remote file names, in listing order, that are not already known.
        """
        # A set makes each membership test O(1) instead of scanning the list.
        known = set(existing_files or ())
        return [name for name in self.list_files() if name not in known]