"""
Main job server - Scheduled SFTP file fetcher and processor
"""
import os
import sys
import time
import logging
import hashlib
from pathlib import Path
from dotenv import load_dotenv
import schedule
from sftp_client import SFTPClient
from xml_parser import XMLParser
from db_writer import DBWriter
# Load environment variables from a local .env file (no-op if the file is absent)
load_dotenv()
# Configure process-wide logging once, at import time, before any component logs
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger shared by JobServer and the __main__ entry point
logger = logging.getLogger(__name__)
class JobServer:
    """Main job server for fetching and processing PM XML files.

    Reads SFTP, PostgreSQL, and local-storage settings from the environment,
    then runs a scheduled loop: download new/changed XML files over SFTP
    (change detection via MD5 checksums already stored in the database),
    parse them with XMLParser, and persist the results through DBWriter.
    The fetch interval lives in the database and is re-read once a minute,
    so it can be changed at runtime without restarting the server.
    """

    def __init__(self):
        # SFTP configuration — auth may be password- or key-based, so all of
        # these may legitimately be None except host/username.
        self.sftp_host = os.getenv("SFTP_HOST")
        self.sftp_username = os.getenv("SFTP_USERNAME")
        self.sftp_password = os.getenv("SFTP_PASSWORD")
        self.sftp_key_path = os.getenv("SFTP_KEY_PATH")
        self.sftp_port = int(os.getenv("SFTP_PORT", "22"))
        self.sftp_remote_path = os.getenv("SFTP_REMOTE_PATH", "/")

        # Database configuration (defaults suit a docker-compose setup)
        self.db_host = os.getenv("POSTGRES_HOST", "postgres")
        self.db_database = os.getenv("POSTGRES_DB", "pm_data")
        self.db_user = os.getenv("POSTGRES_USER", "postgres")
        self.db_password = os.getenv("POSTGRES_PASSWORD", "postgres")
        self.db_port = int(os.getenv("POSTGRES_PORT", "5432"))

        # Local storage configuration
        self.local_data_dir = Path(os.getenv("LOCAL_DATA_DIR", "data"))
        self.keep_local_files = os.getenv("KEEP_LOCAL_FILES", "false").lower() == "true"
        # Ensure the local download directory exists before any transfer
        self.local_data_dir.mkdir(parents=True, exist_ok=True)

        # Lazily-initialized runtime state
        self.db_writer = None         # DBWriter instance, created by connect_db()
        self.current_interval = None  # active schedule interval, in minutes

    def connect_db(self) -> bool:
        """Create a DBWriter and open the database connection.

        Returns:
            True on success; False on any failure (error is logged).
        """
        try:
            self.db_writer = DBWriter(
                host=self.db_host,
                database=self.db_database,
                user=self.db_user,
                password=self.db_password,
                port=self.db_port
            )
            return self.db_writer.connect()
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            return False

    def get_file_checksum_local(self, file_path: Path) -> str:
        """Calculate the MD5 checksum of a local file.

        Reads in 64 KiB chunks so arbitrarily large files never have to
        fit in memory at once.
        """
        md5 = hashlib.md5()
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(65536), b''):
                md5.update(chunk)
        return md5.hexdigest()

    def process_file(self, filename: str, checksum: str) -> bool:
        """Parse one downloaded XML file and write its contents to the database.

        Args:
            filename: Name of the file inside self.local_data_dir.
            checksum: Remote MD5 checksum, stored so the file is skipped on
                future runs unless it changes.

        Returns:
            True if the file was parsed and inserted successfully.
        """
        local_file_path = self.local_data_dir / filename
        try:
            # Parse XML file
            parser = XMLParser(str(local_file_path))
            if not parser.parse():
                logger.error(f"Failed to parse {filename}")
                return False

            # Extract every section the DB writer expects
            file_header = parser.get_file_header()
            network_element = parser.get_network_element()
            intervals = parser.get_measurement_intervals()
            alerts = parser.get_threshold_alerts()
            quality_indicators = parser.get_data_quality()

            # Insert into database; a falsy file_id signals failure
            file_id = self.db_writer.insert_file_data(
                filename=filename,
                checksum=checksum,
                file_header=file_header,
                network_element=network_element,
                intervals=intervals,
                alerts=alerts,
                quality_indicators=quality_indicators
            )

            if file_id:
                # Delete the local copy unless configured to keep it
                if not self.keep_local_files:
                    local_file_path.unlink()
                    logger.info(f"Deleted local file {filename}")
                return True
            else:
                logger.error(f"Failed to insert {filename} into database")
                return False
        except Exception as e:
            logger.error(f"Error processing file {filename}: {e}")
            return False

    def fetch_and_process(self):
        """Main job: fetch new files from SFTP and process each one."""
        logger.info("Starting fetch and process job")

        # Connect to the database lazily if the first connection failed/was lost
        if not self.db_writer:
            if not self.connect_db():
                logger.error("Cannot proceed without database connection")
                return

        # Checksums already in the DB let the SFTP client skip unchanged files
        processed_checksums = self.db_writer.get_processed_checksums()
        logger.info(f"Found {len(processed_checksums)} already processed files")

        try:
            with SFTPClient(
                host=self.sftp_host,
                username=self.sftp_username,
                password=self.sftp_password,
                key_path=self.sftp_key_path,
                port=self.sftp_port,
                remote_path=self.sftp_remote_path
            ) as sftp_client:
                if not sftp_client.connect():
                    logger.error("Failed to connect to SFTP server")
                    return

                # Files whose checksum is not yet recorded in the database
                new_files = sftp_client.get_new_files(processed_checksums)
                logger.info(f"Found {len(new_files)} new/changed files")

                for filename, checksum in new_files:
                    logger.info(f"Processing {filename}")

                    if not sftp_client.download_file(filename, str(self.local_data_dir)):
                        logger.error(f"Failed to download {filename}")
                        continue

                    # Verify the download matches the advertised checksum
                    local_path = self.local_data_dir / filename
                    local_checksum = self.get_file_checksum_local(local_path)
                    if local_checksum != checksum:
                        logger.warning(f"Checksum mismatch for {filename}")
                        # Remove the corrupt download so it doesn't linger on
                        # disk; best-effort, the next run will re-download it.
                        try:
                            local_path.unlink()
                        except OSError:
                            pass
                        continue

                    if self.process_file(filename, checksum):
                        logger.info(f"Successfully processed {filename}")
                    else:
                        logger.error(f"Failed to process {filename}")
        except Exception as e:
            logger.error(f"Error in fetch_and_process: {e}")

    def update_schedule(self):
        """Re-read the fetch interval from the DB and reschedule if it changed."""
        if not self.db_writer:
            if not self.connect_db():
                return
        try:
            new_interval = self.db_writer.get_fetch_interval()
            if new_interval != self.current_interval:
                logger.info(f"Updating schedule interval to {new_interval} minutes")
                # Clear first so we replace the job instead of stacking a second one
                schedule.clear()
                schedule.every(new_interval).minutes.do(self.fetch_and_process)
                self.current_interval = new_interval
        except Exception as e:
            logger.error(f"Failed to update schedule: {e}")

    def run(self):
        """Run the job server until interrupted (Ctrl+C)."""
        logger.info("Starting Job Server")

        # An initial database connection is mandatory — bail out otherwise
        if not self.connect_db():
            logger.error("Failed to connect to database. Exiting.")
            sys.exit(1)

        # Schedule the recurring job at the interval stored in the DB
        self.current_interval = self.db_writer.get_fetch_interval()
        logger.info(f"Initial fetch interval: {self.current_interval} minutes")
        schedule.every(self.current_interval).minutes.do(self.fetch_and_process)

        # Run one fetch immediately instead of waiting a full interval
        self.fetch_and_process()

        # Main loop
        logger.info("Job server running. Press Ctrl+C to stop.")
        try:
            while True:
                schedule.run_pending()
                # Poll for interval changes roughly once a minute
                time.sleep(60)
                self.update_schedule()
        except KeyboardInterrupt:
            logger.info("Shutting down job server")
        finally:
            # Always release the DB connection, even on unexpected errors
            if self.db_writer:
                self.db_writer.disconnect()
if __name__ == "__main__":
    # Construct and start the server; run() blocks until Ctrl+C
    JobServer().run()