"""
Job server that periodically fetches files from SFTP and processes them
"""
import time
import os
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from sqlalchemy.orm import Session
from database import SessionLocal, init_db
from sftp_client import SFTPClient
from xml_parser import XMLParser
from data_storage import DataStorage
from config import Config
import logging
# Configure root logging once at import time; the module-level logger follows
# the stdlib convention of being named after the module (__name__).
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class JobServer:
    """Job server that periodically fetches PM counter files from SFTP,
    parses them, and stores the results in the database.

    Public interface: start(), stop(), update_fetch_interval(), and
    fetch_and_process_files() (also invoked by the scheduler).
    """

    def __init__(self):
        self.scheduler = BackgroundScheduler()
        self.sftp_client = SFTPClient()
        self.fetch_interval_hours = Config.FETCH_INTERVAL_HOURS
        self.running = False
        # Make sure the schema exists before the first job run.
        init_db()

    def fetch_and_process_files(self):
        """Fetch new files from SFTP and process them.

        Connects to the SFTP server, determines which remote files have not
        yet been processed (first by filename, then by checksum), then
        downloads, parses, and stores each new file. A failure on one file
        is logged and does not abort the batch. The SFTP connection is
        always closed on exit.
        """
        logger.info("Starting file fetch and process job...")
        try:
            if not self.sftp_client.connect():
                logger.error("Failed to connect to SFTP server")
                return
            db = SessionLocal()
            try:
                # Local import, presumably to avoid a circular import at
                # module load time — TODO confirm against database module.
                from database import FileRecord

                processed_files = {fr.filename for fr in db.query(FileRecord).all()}
                all_files = self.sftp_client.list_files()
                new_files = [f for f in all_files if f not in processed_files]
                if not new_files:
                    logger.info("No new files to process")
                    return
                logger.info("Found %d new file(s): %s", len(new_files), new_files)
                for filename in new_files:
                    self._process_file(db, filename)
            finally:
                db.close()
        except Exception as e:
            # Top-level job boundary: log and swallow so the scheduler
            # keeps firing on subsequent intervals.
            logger.error("Error in fetch_and_process_files: %s", e)
        finally:
            self.sftp_client.disconnect()

    def _process_file(self, db: "Session", filename: str) -> None:
        """Download, parse, and store a single remote file.

        Any exception is logged and suppressed so one bad file does not stop
        the rest of the batch. The downloaded local copy is removed after
        processing (and when skipped as a duplicate).
        """
        try:
            local_path = self.sftp_client.download_file(filename)
            if not local_path:
                return
            parser = XMLParser(local_path)
            parsed_data = parser.parse()
            # Second-level duplicate check (by checksum) in case the same
            # content was already recorded under this name.
            storage = DataStorage(db)
            if storage.file_already_processed(filename):
                # BUGFIX: log messages previously lost the {filename}
                # placeholder and printed a literal "(unknown)".
                logger.info("File %s already processed, skipping", filename)
                os.remove(local_path)
                return
            storage.save_file_data(parsed_data)
            logger.info("Successfully processed %s", filename)
            os.remove(local_path)
        except Exception as e:
            logger.error("Error processing file %s: %s", filename, e)

    def start(self):
        """Start the job server: run one fetch immediately, then repeat on
        the configured interval."""
        if self.running:
            logger.warning("Job server is already running")
            return
        trigger = IntervalTrigger(hours=self.fetch_interval_hours)
        self.scheduler.add_job(
            self.fetch_and_process_files,
            trigger=trigger,
            id='fetch_pm_files',
            name='Fetch and process PM counter files',
            replace_existing=True,
        )
        # Run once immediately so a fresh start does not wait a full interval.
        self.fetch_and_process_files()
        self.scheduler.start()
        self.running = True
        logger.info("Job server started. Fetch interval: %s hours",
                    self.fetch_interval_hours)

    def stop(self):
        """Stop the scheduler if it is running."""
        if not self.running:
            return
        self.scheduler.shutdown()
        self.running = False
        logger.info("Job server stopped")

    def update_fetch_interval(self, hours: float):
        """Update the fetch interval (in hours) for the periodic job.

        When the server is running, the job is rescheduled in place.
        """
        self.fetch_interval_hours = hours
        if self.running:
            # BUGFIX: the previous stop()/start() cycle made every interval
            # change trigger an immediate, synchronous fetch (start() runs
            # the job once on startup). Rescheduling in place just updates
            # the trigger.
            self.scheduler.reschedule_job(
                'fetch_pm_files',
                trigger=IntervalTrigger(hours=hours),
            )
        logger.info("Fetch interval updated to %s hours", hours)
def _main() -> None:
    """Run the job server in the foreground until Ctrl-C."""
    server = JobServer()
    server.start()
    try:
        # Keep the main thread alive; the scheduler runs in the background.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Shutting down job server...")
        server.stop()


if __name__ == "__main__":
    _main()