PM Counter Monitoring MCP Server

job_server.py (5.05 kB)
""" Job server that periodically fetches files from SFTP and processes them """ import time import os from datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy.orm import Session from database import SessionLocal, init_db from sftp_client import SFTPClient from xml_parser import XMLParser from data_storage import DataStorage from config import Config import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class JobServer: """Job server for periodic file fetching and processing""" def __init__(self): self.scheduler = BackgroundScheduler() self.sftp_client = SFTPClient() self.fetch_interval_hours = Config.FETCH_INTERVAL_HOURS self.running = False # Initialize database init_db() def fetch_and_process_files(self): """Fetch new files from SFTP and process them""" logger.info("Starting file fetch and process job...") try: # Connect to SFTP if not self.sftp_client.connect(): logger.error("Failed to connect to SFTP server") return # Get list of already processed files from database db = SessionLocal() try: from database import FileRecord processed_files = {fr.filename for fr in db.query(FileRecord).all()} # Get new files all_files = self.sftp_client.list_files() new_files = [f for f in all_files if f not in processed_files] if not new_files: logger.info("No new files to process") return logger.info(f"Found {len(new_files)} new file(s): {new_files}") # Process each new file for filename in new_files: try: # Download file local_path = self.sftp_client.download_file(filename) if not local_path: continue # Parse XML parser = XMLParser(local_path) parsed_data = parser.parse() # Check if already processed (by checksum) storage = DataStorage(db) if storage.file_already_processed(filename): logger.info(f"File {filename} already processed, skipping") os.remove(local_path) continue # Save to database storage.save_file_data(parsed_data) logger.info(f"Successfully processed {filename}") # Clean up local file os.remove(local_path) except Exception as e: logger.error(f"Error processing file {filename}: {e}") continue finally: db.close() except Exception as e: logger.error(f"Error in fetch_and_process_files: {e}") finally: self.sftp_client.disconnect() def start(self): """Start the job server""" if self.running: logger.warning("Job server is already running") return # Schedule the job trigger = IntervalTrigger(hours=self.fetch_interval_hours) self.scheduler.add_job( self.fetch_and_process_files, trigger=trigger, id='fetch_pm_files', name='Fetch and process PM counter files', replace_existing=True ) # Run immediately on start self.fetch_and_process_files() # Start scheduler self.scheduler.start() self.running = True logger.info(f"Job server started. Fetch interval: {self.fetch_interval_hours} hours") def stop(self): """Stop the job server""" if not self.running: return self.scheduler.shutdown() self.running = False logger.info("Job server stopped") def update_fetch_interval(self, hours: float): """Update the fetch interval""" self.fetch_interval_hours = hours if self.running: self.stop() self.start() logger.info(f"Fetch interval updated to {hours} hours") if __name__ == "__main__": server = JobServer() server.start() try: while True: time.sleep(1) except KeyboardInterrupt: logger.info("Shutting down job server...") server.stop()


MCP directory API

We provide all the information about MCP servers via our MCP directory API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ApoorvBrooklyn/Networking-MCP'
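
The same lookup from Python, as a minimal sketch (assuming the requests package is installed and the endpoint returns a JSON body):

# Fetch this server's entry from the Glama MCP directory API
import requests

url = "https://glama.ai/api/mcp/v1/servers/ApoorvBrooklyn/Networking-MCP"
response = requests.get(url, timeout=10)
response.raise_for_status()  # fail loudly on HTTP errors
print(response.json())       # assumes a JSON response body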

If you have feedback or need assistance with the MCP directory API, please join our Discord server.