#!/usr/bin/env python3
"""
Weather Data Scheduler
This script handles two cron jobs:
1. Update weather forecast data every 15 minutes from api.data.gov.my
2. Delete old weather forecast records daily at 12:05 AM
All times are in Malaysia Time (GMT+8).
"""
import html
import logging
import os
import sys
import time
from datetime import date, datetime
from typing import Any, Dict, List, Optional

import mysql.connector
import pytz
import requests
import schedule
from dotenv import load_dotenv
from mysql.connector import Error as MySQLError
# Pull variables from a .env file (if one is found) into the process environment.
load_dotenv()

# Telegram alerting is optional: it is switched on only when BOTH the bot
# token and the destination chat id are configured.
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN', '')
TELEGRAM_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '')
TELEGRAM_ENABLED = all((TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID))
# Configure logging: every record goes both to a local log file and to stdout.
# The file handler is opened as UTF-8 explicitly because this script logs
# non-ASCII text (emoji such as "✅", "✉️", "✓"). With the platform's default
# locale encoding (e.g. cp1252 on Windows), FileHandler cannot encode those
# characters and the log lines are lost with a "Logging error" traceback.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('weather_scheduler.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)
# Constants
# Upstream forecast endpoint (Malaysian government open-data API).
API_ENDPOINT = "https://api.data.gov.my/weather/forecast"
# Timezone used for every "today" / timestamp calculation in this script.
MALAYSIA_TZ = pytz.timezone("Asia/Kuala_Lumpur")
# Database connection settings, read from DB_* environment variables, with
# local-development fallbacks.
# On Railway, the DB_* variables are expected to be defined as references to
# the auto-generated MYSQL* service variables. This code reads only DB_* and
# performs no mapping itself. (Follows the pattern from the grammar-im project.)
DB_CONFIG = {
    'host': os.getenv('DB_HOST', 'localhost'),
    # `or` rather than a getenv() default: a variable that is set but empty
    # (e.g. an unresolved service reference) must also fall back to 3306
    # instead of crashing the whole import with int('').
    'port': int(os.getenv('DB_PORT') or '3306'),
    'user': os.getenv('DB_USER', 'root'),
    'password': os.getenv('DB_PASSWORD', ''),
    'database': os.getenv('DB_NAME', 'weather_by_met'),
    'charset': 'utf8mb4',
    'collation': 'utf8mb4_unicode_ci',
}
# ============================================================================
# Database Utilities
# ============================================================================
def get_db_connection():
    """Open and return a new MySQL connection built from ``DB_CONFIG``.

    A connection failure is logged and then re-raised unchanged, so callers
    decide how to react to it.
    """
    try:
        return mysql.connector.connect(**DB_CONFIG)
    except MySQLError as err:
        logger.error(f"Failed to connect to database: {err}")
        raise
def execute_query(query: str, params: Optional[tuple] = None, fetch: bool = True) -> Any:
    """Run one SQL statement on a fresh connection, then close it.

    Args:
        query: SQL text using ``%s`` placeholders.
        params: Values bound to the placeholders; ``None`` means no parameters.
        fetch: If True, return every result row as a dict (the cursor is
            created with ``dictionary=True``). If False, commit the statement
            and return the cursor's ``rowcount``.

    Returns:
        The fetched rows, or the affected row count when ``fetch`` is False.

    Raises:
        MySQLError: If connecting or executing fails. The error is logged and
            a best-effort rollback is attempted before it is re-raised.
    """
    connection = None
    cursor = None
    try:
        connection = get_db_connection()
        cursor = connection.cursor(dictionary=True)
        cursor.execute(query, params or ())
        if fetch:
            return cursor.fetchall()
        connection.commit()
        return cursor.rowcount
    except MySQLError as e:
        logger.error(f"Query execution failed: {e}")
        if connection is not None:
            # Best-effort rollback. If the connection itself is broken, the
            # rollback raises as well, and that secondary error must not
            # replace the original exception callers and logs care about.
            try:
                connection.rollback()
            except MySQLError as rollback_error:
                logger.warning(f"Rollback after failed query also failed: {rollback_error}")
        raise
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
# ============================================================================
# Telegram Notification Utilities
# ============================================================================
def send_telegram_message(message: str) -> bool:
    """
    Send a message to the configured Telegram chat using the Bot API.

    The message is posted with ``parse_mode="HTML"``. Any untrusted text
    embedded in it must already be HTML-escaped by the caller.

    Args:
        message: Message text (Telegram HTML) to send

    Returns:
        True if the API accepted the message; False if notifications are
        disabled or the request failed. Failures are logged, never raised.
    """
    if not TELEGRAM_ENABLED:
        logger.debug("Telegram notifications disabled - no credentials in .env")
        return False

    def _redact(value) -> str:
        # The request URL embeds the bot token, and requests copies that URL
        # into many error messages (e.g. HTTPError "... for url: <url>",
        # connection errors "... with url: <path>"). Never log the token.
        text = str(value)
        return text.replace(TELEGRAM_BOT_TOKEN, "<redacted>") if TELEGRAM_BOT_TOKEN else text

    try:
        url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
        payload = {
            "chat_id": TELEGRAM_CHAT_ID,
            "text": message,
            "parse_mode": "HTML",
        }
        response = requests.post(url, json=payload, timeout=10)
        response.raise_for_status()
        logger.info("✉️ Telegram notification sent successfully")
        return True
    except requests.RequestException as e:
        detail = _redact(e)
        # Telegram explains rejections (e.g. HTML that cannot be parsed) in the
        # response body; include a bounded excerpt to make failures diagnosable.
        if getattr(e, "response", None) is not None:
            detail += f" | response: {_redact(e.response.text[:500])}"
        logger.error(f"Failed to send Telegram message: {detail}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error sending Telegram message: {_redact(e)}")
        return False
def send_telegram_error(job_name: str, error: str) -> bool:
    """
    Send an error alert to the Telegram channel.

    The alert uses Telegram's HTML parse mode, so ``job_name`` and ``error``
    are HTML-escaped here. Exception text can contain '<', '>' or '&', and
    Telegram rejects the entire message when those are not escaped, which
    would lose the alert exactly when it matters. The error text is also
    truncated so the message stays under Telegram's 4096-character limit.

    Args:
        job_name: Name of the job that failed
        error: Error message/details (plain text)

    Returns:
        True if successful, False otherwise
    """
    # Headroom for the header lines; Telegram counts characters after entity
    # parsing, so truncating the raw (unescaped) text is the right measure.
    max_error_chars = 3500
    error_text = str(error)
    if len(error_text) > max_error_chars:
        error_text = error_text[:max_error_chars] + "... (truncated)"

    current_time = datetime.now(MALAYSIA_TZ).strftime('%Y-%m-%d %H:%M:%S %Z')
    message = (
        f"<b>⚠️ Weather Scheduler Error</b>\n\n"
        f"<b>Job:</b> {html.escape(str(job_name), quote=False)}\n"
        f"<b>Time:</b> {current_time}\n"
        f"<b>Error:</b> <code>{html.escape(error_text, quote=False)}</code>"
    )
    return send_telegram_message(message)
def send_telegram_success(job_name: str, details: str) -> bool:
    """
    Send a success notification to the Telegram channel.

    ``job_name`` and ``details`` are treated as plain text and HTML-escaped,
    because the message is sent in Telegram's HTML parse mode. An unescaped
    '<', '>' or '&' (e.g. "forecast_date < 2024-01-01") makes Telegram reject
    the whole message. Overlong details are truncated to stay under
    Telegram's 4096-character limit.

    Args:
        job_name: Name of the job that completed
        details: Details about the successful operation (plain text)

    Returns:
        True if successful, False otherwise
    """
    max_detail_chars = 3500  # headroom for the header lines
    details_text = str(details)
    if len(details_text) > max_detail_chars:
        details_text = details_text[:max_detail_chars] + "... (truncated)"

    current_time = datetime.now(MALAYSIA_TZ).strftime('%Y-%m-%d %H:%M:%S %Z')
    message = (
        f"<b>✅ Weather Scheduler Success</b>\n\n"
        f"<b>Job:</b> {html.escape(str(job_name), quote=False)}\n"
        f"<b>Time:</b> {current_time}\n"
        f"<b>Details:</b> {html.escape(details_text, quote=False)}"
    )
    return send_telegram_message(message)
# ============================================================================
# Job 1: Update Weather Forecast Data
# ============================================================================
def fetch_weather_data_from_api() -> List[Dict[str, Any]]:
    """
    Download the current weather forecast list from the data.gov.my API.

    Returns:
        The forecast records. A top-level JSON list is returned as-is. A JSON
        object that has a 'data' key yields that member. Any other payload
        shape is logged and produces an empty list.

    Raises:
        requests.RequestException: If the HTTP request fails or returns an
            error status (logged, then re-raised).
        Exception: Any other failure is also logged and re-raised.
    """
    try:
        logger.info(f"Fetching weather data from {API_ENDPOINT}")
        # A single request with no query parameters.
        # NOTE(review): assumes the endpoint returns every record when no
        # `limit` is given; confirm against the API documentation.
        resp = requests.get(API_ENDPOINT, timeout=30)
        resp.raise_for_status()
        payload = resp.json()

        # Either a bare list, or a {'data': [...]} envelope (as with meta=true).
        is_list = isinstance(payload, list)
        is_envelope = isinstance(payload, dict) and 'data' in payload
        if not (is_list or is_envelope):
            logger.error(f"Unexpected API response format: {type(payload)}")
            return []
        forecasts = payload if is_list else payload['data']

        logger.info(f"Successfully fetched {len(forecasts)} weather records from API")
        return forecasts
    except requests.RequestException as exc:
        logger.error(f"Failed to fetch weather data from API: {exc}")
        raise
    except Exception as exc:
        logger.error(f"Unexpected error while fetching weather data: {exc}")
        raise
def filter_old_records(
    records: List[Dict[str, Any]],
    today: Optional[date] = None,
) -> List[Dict[str, Any]]:
    """
    Filter out weather records whose forecast date is before today (Malaysia Time).

    The API returns a rolling forecast that includes past dates. Filtering them
    here prevents re-inserting records the daily cleanup job has deleted.

    Per-record rules:
      * 'date' parses and is >= today  -> kept
      * 'date' parses and is <  today  -> dropped (logged as a count)
      * 'date' missing or empty        -> dropped (logged as a count)
      * 'date' unparseable, or the record is not a dict -> kept (fail-safe),
        with a warning

    Args:
        records: List of weather forecast records from the API
        today: Reference date. Defaults to the current date in Malaysia Time;
            pass it explicitly for tests or back-fills.

    Returns:
        The kept records, in their original order (an empty/falsy input is
        returned unchanged).
    """
    if not records:
        return records
    if today is None:
        today = datetime.now(MALAYSIA_TZ).date()

    kept: List[Dict[str, Any]] = []
    old_count = 0
    missing_count = 0
    for record in records:
        # Bound before the try so the warning below never reads an unbound
        # name or a value left over from the previous record.
        forecast_date_str = None
        try:
            forecast_date_str = record.get('date', '')
            if not forecast_date_str:
                missing_count += 1
                continue
            # Accept both 'YYYY-MM-DD' and 'YYYY-MM-DDTHH:MM:SS'.
            forecast_date = datetime.fromisoformat(forecast_date_str.split('T')[0]).date()
        except (ValueError, AttributeError, TypeError) as e:
            logger.warning(f"Failed to parse forecast_date '{forecast_date_str}': {e}")
            # Include the record if its date cannot be parsed (fail-safe).
            kept.append(record)
            continue
        if forecast_date >= today:
            kept.append(record)
        else:
            old_count += 1

    if old_count > 0:
        logger.info(f"Filtered out {old_count} old weather records (forecast_date < {today})")
    if missing_count > 0:
        logger.warning(f"Dropped {missing_count} weather records with no forecast date")
    return kept
def _weather_record_params(record: Any) -> Optional[tuple]:
    """Build the upsert bind parameters for one API record, or None if unusable.

    A record is unusable when it is not a dict, or has no location id or no
    forecast date. Those two columns form the table's unique key, so writing
    empty strings for them would only create bogus rows that overwrite each
    other.
    """
    if not isinstance(record, dict):
        return None
    # Location data is nested; `or {}` also covers an explicit JSON null.
    location = record.get('location') or {}
    if not isinstance(location, dict):
        return None
    location_id = location.get('location_id', '')
    forecast_date = record.get('date', '')
    if not location_id or not forecast_date:
        return None
    return (
        location_id,
        location.get('location_name', ''),
        forecast_date,
        record.get('morning_forecast', ''),
        record.get('afternoon_forecast', ''),
        record.get('night_forecast', ''),
        record.get('summary_forecast', ''),
        record.get('summary_when', ''),
        record.get('min_temp'),
        record.get('max_temp'),
    )


def upsert_weather_records(records: List[Dict[str, Any]]) -> int:
    """
    Insert or update weather records in the database.

    Uses INSERT ... ON DUPLICATE KEY UPDATE on (location_id, forecast_date).
    All records go through one connection and are committed once at the end.

    Per-record handling:
      * records that are not dicts, or lack a location_id or date, are skipped;
      * a record whose statement fails is logged and skipped, unless the error
        shows the connection itself is unusable (OperationalError /
        InterfaceError). In that case the batch is rolled back and the error
        re-raised, instead of failing every remaining record one by one.

    Args:
        records: List of weather forecast records from API

    Returns:
        Number of records successfully written (inserted or updated). This is
        not the sum of ``cursor.rowcount``. For ON DUPLICATE KEY UPDATE, MySQL
        reports 1 per inserted row and 2 per updated row, which overstates the
        number of records.

    Raises:
        MySQLError: On connection, connection-level, or commit failures.
    """
    if not records:
        logger.warning("No records to upsert")
        return 0

    query = """
        INSERT INTO weather_forecasts (
            location_id, location_name, forecast_date,
            morning_forecast, afternoon_forecast, night_forecast,
            summary_forecast, summary_when, min_temp, max_temp
        ) VALUES (
            %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        )
        ON DUPLICATE KEY UPDATE
            location_name = VALUES(location_name),
            morning_forecast = VALUES(morning_forecast),
            afternoon_forecast = VALUES(afternoon_forecast),
            night_forecast = VALUES(night_forecast),
            summary_forecast = VALUES(summary_forecast),
            summary_when = VALUES(summary_when),
            min_temp = VALUES(min_temp),
            max_temp = VALUES(max_temp),
            updated_at = CURRENT_TIMESTAMP
    """
    # Errors meaning the connection is unusable: abort the whole batch.
    fatal_errors = (
        mysql.connector.errors.OperationalError,
        mysql.connector.errors.InterfaceError,
    )

    connection = None
    cursor = None
    written = 0
    skipped = 0
    failed = 0
    row_changes = 0
    try:
        connection = get_db_connection()
        cursor = connection.cursor()

        for record in records:
            try:
                params = _weather_record_params(record)
                if params is None:
                    skipped += 1
                    continue
                cursor.execute(query, params)
            except fatal_errors:
                raise
            except Exception as e:
                failed += 1
                # Safe label lookup: must not raise again inside the handler.
                location = record.get('location') if isinstance(record, dict) else None
                label = location.get('location_id', 'unknown') if isinstance(location, dict) else 'unknown'
                logger.error(f"Failed to upsert record {label}: {e}")
                continue
            written += 1
            row_changes += cursor.rowcount

        # Commit all changes
        connection.commit()
        logger.info(
            f"Successfully upserted {written} weather records "
            f"({row_changes} row changes reported by MySQL)"
        )
        if skipped:
            logger.warning(f"Skipped {skipped} weather records missing a location_id or forecast date")
        if failed:
            logger.warning(f"{failed} weather records failed to upsert (see errors above)")
        return written
    except MySQLError as e:
        logger.error(f"Database error during upsert: {e}")
        if connection is not None:
            # Best-effort: a rollback on a dead connection must not mask `e`.
            try:
                connection.rollback()
            except MySQLError as rollback_error:
                logger.warning(f"Rollback after failed upsert also failed: {rollback_error}")
        raise
    finally:
        if cursor is not None:
            cursor.close()
        if connection is not None:
            connection.close()
def update_weather_forecast_job():
    """
    Scheduled job: refresh the weather forecast table (registered every 15 minutes).

    Pipeline: fetch from the API, drop records dated before today (Malaysia
    Time), then upsert the rest. The job returns early with a warning when a
    stage leaves nothing to process. Any exception is logged and reported to
    Telegram rather than propagated to the caller.
    """
    banner = "=" * 80
    try:
        logger.info(banner)
        logger.info("Starting weather forecast update job")

        api_records = fetch_weather_data_from_api()
        if not api_records:
            logger.warning("No weather data received from API")
            return

        current_records = filter_old_records(api_records)
        if not current_records:
            logger.warning("No valid weather data after filtering old records")
            return

        changed = upsert_weather_records(current_records)
        logger.info(f"Weather forecast update completed: {changed} records affected")
        logger.info(banner)
    except Exception as exc:
        logger.error(f"Weather forecast update job failed: {exc}", exc_info=True)
        send_telegram_error("Update Weather Forecast", str(exc))
# ============================================================================
# Job 2: Delete Old Weather Records
# ============================================================================
def delete_old_weather_records():
    """
    Scheduled job: purge forecast rows dated before today (Malaysia Time).

    Intended to run once a day shortly after midnight Malaysia Time (12:05 AM).
    Failures are logged and reported to Telegram rather than raised.
    """
    separator = "=" * 80
    try:
        logger.info(separator)
        logger.info("Starting old weather records cleanup job")

        cutoff = datetime.now(MALAYSIA_TZ).date()
        removed = execute_query(
            "DELETE FROM weather_forecasts WHERE forecast_date < %s",
            (cutoff,),
            fetch=False,
        )

        logger.info(f"Cleanup completed: {removed} old weather records deleted")
        logger.info(separator)
    except Exception as exc:
        logger.error(f"Old weather records cleanup job failed: {exc}", exc_info=True)
        send_telegram_error("Delete Old Records", str(exc))
# ============================================================================
# Database Schema Initialization
# ============================================================================
def init_database_schema():
    """
    Create the ``weather_forecasts`` table if it does not exist yet.

    Layout: one row per (location_id, forecast_date), enforced by the
    ``unique_location_date`` key. Each row holds the forecast text columns,
    integer min/max temperatures and created/updated timestamps. There are
    secondary indexes on forecast_date and location_id.

    Returns:
        True when the statement ran and was committed; False on any error.
        Errors are logged, never raised.
    """
    db = None
    cur = None
    try:
        db = get_db_connection()
        cur = db.cursor()
        cur.execute(
            """
            CREATE TABLE IF NOT EXISTS weather_forecasts (
                id INT AUTO_INCREMENT PRIMARY KEY,
                location_id VARCHAR(10) NOT NULL,
                location_name VARCHAR(100) NOT NULL,
                forecast_date DATE NOT NULL,
                morning_forecast VARCHAR(100),
                afternoon_forecast VARCHAR(100),
                night_forecast VARCHAR(100),
                summary_forecast VARCHAR(100),
                summary_when VARCHAR(50),
                min_temp INT,
                max_temp INT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
                UNIQUE KEY unique_location_date (location_id, forecast_date),
                KEY idx_forecast_date (forecast_date),
                KEY idx_location_id (location_id)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
            """
        )
        db.commit()
        logger.info("✅ Database schema initialized successfully")
        return True
    except MySQLError as err:
        logger.error(f"Failed to initialize database schema: {err}")
        return False
    except Exception as err:
        logger.error(f"Unexpected error during schema initialization: {err}")
        return False
    finally:
        if cur:
            cur.close()
        if db:
            db.close()
# ============================================================================
# Scheduler Setup
# ============================================================================
def setup_scheduler():
    """
    Register the two recurring jobs, then run one forecast update immediately.

    1. Update weather forecast every 15 minutes.
    2. Delete old records daily at 12:05 AM Malaysia Time.

    The ``schedule`` library interprets ``.at("HH:MM")`` in the host's local
    timezone. A hard-coded UTC equivalent ("16:05") is only correct when the
    host clock is UTC (as on Railway). Instead, the host-local wall-clock time
    matching 00:05 Malaysia Time is computed at start-up. On a UTC host this
    still yields "16:05".
    """
    logger.info("Setting up weather data scheduler")

    # Job 1: Update weather forecast every 15 minutes
    schedule.every(15).minutes.do(update_weather_forecast_job)
    logger.info("✓ Scheduled Job 1: Update weather forecast every 15 minutes")

    # Job 2: Delete old records daily at 12:05 AM (Malaysia Time).
    # Malaysia Time is a fixed GMT+8 offset. The conversion is done once, so if
    # the HOST timezone observes DST, restart the service after a transition.
    cleanup_local_time = (
        datetime.now(MALAYSIA_TZ)
        .replace(hour=0, minute=5, second=0, microsecond=0)
        .astimezone()  # no argument: convert to the host's local timezone
        .strftime("%H:%M")
    )
    schedule.every().day.at(cleanup_local_time).do(delete_old_weather_records)
    logger.info(
        "✓ Scheduled Job 2: Delete old records daily at 12:05 AM (Malaysia Time) "
        f"[{cleanup_local_time} host local time]"
    )

    # Run the update job immediately on startup
    logger.info("Running initial weather forecast update...")
    update_weather_forecast_job()
def run_scheduler():
    """
    Entry point of the long-running service.

    Logs a start-up banner, creates the table if needed, and registers the
    jobs (which also triggers one immediate forecast update). It then checks
    for due jobs every 60 seconds, forever. Ctrl+C (KeyboardInterrupt) stops
    the loop with an info message; any other exception is logged with its
    traceback and re-raised.
    """
    divider = "=" * 80
    started_at = datetime.now(MALAYSIA_TZ).strftime('%Y-%m-%d %H:%M:%S %Z')
    for banner_line in (
        divider,
        "Weather Data Scheduler Started",
        f"Current time: {started_at}",
        divider,
    ):
        logger.info(banner_line)

    # The boolean result is not checked here; schema failures are only logged.
    logger.info("Initializing database schema...")
    init_database_schema()
    setup_scheduler()

    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # polling granularity: one minute
    except KeyboardInterrupt:
        logger.info("Scheduler stopped by user")
    except Exception as exc:
        logger.error(f"Scheduler error: {exc}", exc_info=True)
        raise
# ============================================================================
# Main Entry Point
# ============================================================================
if __name__ == "__main__":
    # Script entry: any exception escaping the scheduler is logged with its
    # traceback and converted into a non-zero exit status.
    try:
        run_scheduler()
    except Exception as exc:
        logger.exception(f"Fatal error: {exc}")
        sys.exit(1)