"""
LMS Automation module for PAF-IAST LMS system
Handles login, session management, and data extraction using Playwright
"""
import asyncio
import logging
import os
import time
from typing import Dict, List, Optional, Any
from datetime import datetime, timedelta
import json
import requests
from bs4 import BeautifulSoup
from playwright.async_api import (
async_playwright,
Browser,
BrowserContext,
Page,
TimeoutError as PlaywrightTimeoutError,
)
from captcha_solver import CaptchaSolver
from session_manager import SessionManager
from config import Config
logger = logging.getLogger(__name__)
class LMSAutomation:
"""Main LMS automation class for PAF-IAST using Playwright"""
def __init__(self) -> None:
    """Create the automation helper with its collaborators and default state.

    No browser is launched here: the Playwright handles start as ``None``
    and the login flag starts as ``False``.
    """
    # Project collaborators: configuration, session persistence, CAPTCHA solving.
    self.config = Config()
    self.session_manager = SessionManager()
    self.captcha_solver = CaptchaSolver()

    # Playwright handles stay unset until a browser is actually needed.
    self.playwright = self.browser = self.context = self.page = None

    # Plain HTTP session kept alongside the browser.
    self.session = requests.Session()
    self.is_logged_in = False

    # Fixed PAF-IAST endpoints.
    self.base_url = "https://lms.paf-iast.edu.pk"
    self.login_url = self.base_url + "/StudentAccount/Account/Login"

    # Headless mode is taken from the BROWSER_HEADLESS configuration value.
    self.headless_mode = self.config.BROWSER_HEADLESS
    logger.info(
        f"🚀 LMS Automation initialized with headless mode: {self.headless_mode}"
    )
def set_headless_mode(self, headless: bool = True) -> None:
    """Store the headless preference on the instance and log the new value.

    Args:
        headless: True to run without a visible browser window, False to
            show the browser.
    """
    self.headless_mode = headless
    logger.info(f"Browser headless mode set to: {headless}")
async def _setup_browser(self, headless: bool = True) -> Page:
    """Create whichever of playwright/browser/context/page is still missing.

    Each handle is only created when the corresponding attribute is ``None``,
    so an already-launched browser is reused as-is.

    Args:
        headless: Passed to the Chromium launch; also enables the extra
            launch flags and context options below.

    Returns:
        The page stored on ``self.page``.

    Raises:
        Exception: Any setup error is logged and re-raised unchanged.
    """
    try:
        if self.playwright is None:
            self.playwright = await async_playwright().start()

        if self.browser is None:
            launch_flags = [
                "--no-sandbox",
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--disable-blink-features=AutomationControlled",
            ]
            if headless:
                # Extra flags used only for headless runs. Images are
                # deliberately NOT disabled: the CAPTCHA image is required.
                launch_flags = launch_flags + [
                    "--disable-extensions",
                    "--disable-javascript-harmony-shipping",
                    "--disable-background-timer-throttling",
                    "--disable-renderer-backgrounding",
                    "--disable-backgrounding-occluded-windows",
                    "--disable-ipc-flooding-protection",
                    "--disable-hang-monitor",
                    "--disable-features=TranslateUI",
                    "--disable-default-apps",
                    "--disable-sync",
                    "--disable-background-networking",
                    "--disable-plugins",
                    "--disable-plugins-discovery",
                    "--disable-notifications",
                    # NOTE(review): confirm this flag is really required.
                    "--disable-web-security",
                    "--no-first-run",
                    "--no-default-browser-check",
                    "--disable-component-extensions-with-background-pages",
                    "--disable-client-side-phishing-detection",
                    "--disable-popup-blocking",
                ]

            logger.info(f"🌐 Launching Chromium browser (headless: {headless})...")
            self.browser = await self.playwright.chromium.launch(
                headless=headless,
                args=launch_flags,
            )

        if self.context is None:
            # Desktop-sized viewport and a regular Chrome user agent.
            context_settings = {
                "viewport": {"width": 1920, "height": 1080},
                "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            }
            if headless:
                context_settings["java_script_enabled"] = True
                context_settings["ignore_https_errors"] = True

            self.context = await self.browser.new_context(**context_settings)

            # Config values are multiplied by 1000 to get milliseconds.
            browser_timeout_ms = self.config.BROWSER_TIMEOUT * 1000
            page_load_timeout_ms = self.config.PAGE_LOAD_TIMEOUT * 1000
            self.context.set_default_timeout(browser_timeout_ms)
            self.context.set_default_navigation_timeout(page_load_timeout_ms)
            logger.info(
                f"⏱️ Browser timeouts set - Default: {browser_timeout_ms}ms, Navigation: {page_load_timeout_ms}ms"
            )

        if self.page is None:
            self.page = await self.context.new_page()

        return self.page
    except Exception as e:
        logger.error(f"Browser setup error: {str(e)}")
        raise
async def _ensure_page(self, headless: bool = True) -> Page:
    """Return a usable page, rebuilding the browser when the current one is dead.

    Args:
        headless: Forwarded to ``_setup_browser`` when a page must be created.

    Returns:
        The existing page if it still answers a trivial script evaluation,
        otherwise the page produced by ``_setup_browser``.

    Raises:
        Exception: Any error is logged and re-raised unchanged.
    """
    try:
        if self.page is not None:
            try:
                # Cheap liveness probe: any failure means the page is unusable.
                await self.page.evaluate("document.readyState")
            except Exception:
                await self._cleanup_browser()
            else:
                return self.page

        # Either there was no page or the old one was just cleaned up.
        return await self._setup_browser(headless)
    except Exception as e:
        logger.error(f"Page setup error: {str(e)}")
        raise
async def _wait_for_page_load(self, timeout: int = 30000) -> bool:
    """Wait until the current page looks fully loaded.

    Waits for the ``networkidle`` load state, then for
    ``document.readyState`` to be ``complete``, then sleeps two more seconds
    for late dynamic content.

    Args:
        timeout: Value passed as ``timeout`` to each of the two Playwright
            waits.

    Returns:
        True when every wait finished; False when there is no page, a wait
        timed out, or any other error occurred (errors are logged, never
        raised).
    """
    try:
        if self.page is None:
            return False

        await self.page.wait_for_load_state("networkidle", timeout=timeout)
        await self.page.wait_for_function(
            "document.readyState === 'complete'", timeout=timeout
        )
        # Grace period for content rendered after the load events.
        await asyncio.sleep(2)
    except PlaywrightTimeoutError:
        logger.warning("Page load timeout - continuing anyway")
        return False
    except Exception as e:
        logger.error(f"Page load check error: {str(e)}")
        return False
    else:
        return True
async def login(
    self,
    username: str,
    password: str,
    lms_url: Optional[str] = None,
    save_session: bool = True,
) -> Dict[str, Any]:
    """
    Login to PAF-IAST LMS using Playwright

    A saved session is reused when it loads and verifies. Otherwise the
    login form is filled and submitted up to five times, solving a fresh
    CAPTCHA on every attempt.

    Args:
        username: Student registration number
        password: Student password
        lms_url: Accepted for interface compatibility; currently not used,
            navigation always targets ``self.login_url``
        save_session: Whether to try a saved session first and to save the
            session after a successful login

    Returns:
        Dict with ``success``, ``message`` and ``timestamp`` keys; a
        successful fresh login also carries ``redirect_url``. Errors are
        reported through this dict rather than raised.
    """
    max_login_attempts = 5
    dialog_messages: List[str] = []
    # Pages the dialog handler was attached to, so it can be detached again.
    dialog_pages = []

    async def handle_dialog(dialog):
        """Record and accept JavaScript alerts (e.g. "invalid captcha")."""
        message = dialog.message
        dialog_messages.append(message)
        logger.info(f"🔔 JavaScript alert detected: '{message}'")
        await dialog.accept()  # Accept the alert so the page can continue

    def attach_dialog_handler(target_page):
        target_page.on("dialog", handle_dialog)
        dialog_pages.append(target_page)

    try:
        # Reuse a saved session when one exists and still verifies.
        if save_session and self.session_manager.load_session():
            if await self._verify_session():
                self.is_logged_in = True
                return self._login_result(True, "Logged in using saved session")

        logger.info(
            f"Starting fresh login process (headless mode: {self.headless_mode})"
        )
        page = await self._ensure_page(headless=self.headless_mode)
        attach_dialog_handler(page)

        logger.info(f"Navigating to: {self.login_url}")
        await page.goto(self.login_url)
        logger.info("Waiting for page to load completely...")
        await self._wait_for_page_load(timeout=60000)
        # Additional wait for any slow-loading elements
        await asyncio.sleep(3)

        logger.info("Looking for login form elements...")
        try:
            await page.wait_for_selector("form", timeout=60000)
            logger.info("Form detected, waiting for login fields...")
        except PlaywrightTimeoutError:
            logger.warning("No form detected, but continuing...")

        username_field = await self._login_find_username_with_retry(page)

        logger.info("Looking for password field...")
        password_field = await self._login_find_field(
            page,
            [
                'input[name="Password"]',
                "input#Password",
                'input[type="password"]',
            ],
            "Found password field with selector",
        )
        if password_field is None:
            raise RuntimeError("Could not find password field")

        logger.info("Starting CAPTCHA solving with retry logic...")
        for login_attempt in range(1, max_login_attempts + 1):
            is_last_attempt = login_attempt >= max_login_attempts
            logger.info(f"🔄 Login attempt {login_attempt}/{max_login_attempts}")
            # Alerts from a previous attempt must not affect this one.
            dialog_messages.clear()

            try:
                logger.info("Solving CAPTCHA...")
                captcha_text = await self._solve_captcha()
                if not captcha_text:
                    logger.warning(
                        "CAPTCHA solving failed, waiting for auto-refresh..."
                    )
                    if is_last_attempt:
                        return self._login_result(
                            False,
                            "Failed to solve CAPTCHA after multiple attempts",
                        )
                    # The site refreshes the CAPTCHA by itself; just wait.
                    await asyncio.sleep(3)
                    logger.info(
                        "⏳ Continuing with next attempt (CAPTCHA may have auto-refreshed)..."
                    )
                    continue

                logger.info(f"✅ CAPTCHA solved: '{captcha_text}'")
                logger.info(
                    "Re-filling all form fields (username, password, and CAPTCHA)..."
                )
                (
                    username_field,
                    password_field,
                ) = await self._login_refresh_stale_fields(
                    page, username_field, password_field
                )

                relocated = await self._login_fill_form(
                    page, username, password, captcha_text
                )
                if relocated is not None:
                    username_field, password_field = relocated

                logger.info("Submitting login form...")
                await self._login_submit_form(page)

                logger.info("Waiting for login response...")
                await asyncio.sleep(5)  # Wait for server response

                (
                    error_source,
                    captcha_error,
                    other_error,
                ) = await self._login_collect_errors(page, dialog_messages)

                if error_source:
                    logger.warning(
                        f"🔄 CAPTCHA error detected via {error_source}: {captcha_error}"
                    )
                    if is_last_attempt:
                        return self._login_result(
                            False,
                            f"CAPTCHA validation failed after {max_login_attempts} attempts: {captcha_error}",
                        )
                    logger.info(
                        "CAPTCHA auto-refreshed by server, solving new CAPTCHA..."
                    )
                    await asyncio.sleep(2)  # Let the auto-refresh finish
                    continue

                if other_error:
                    # Non-CAPTCHA validation error: retrying cannot help.
                    return self._login_result(
                        False, f"Login validation error: {other_error}"
                    )

                await asyncio.sleep(3)  # Additional wait for redirect
                current_url = page.url
                page_content = await page.content()
                on_login_url = "/Login" in current_url

                looks_successful = (
                    "Dashboard" in page_content
                    or "/MyDashboard/" in current_url
                    or "Student Dashboard" in page_content
                    or "logout" in page_content.lower()
                    or "My Profile" in page_content
                )
                looks_failed = (
                    ("Student Login" in page_content and on_login_url)
                    or "Enter Registration No." in page_content
                    or "Invalid credentials" in page_content
                    or ("CAPTCHA" in page_content and on_login_url)
                )

                if looks_successful:
                    result = await self._login_finish(
                        save_session, login_attempt, current_url
                    )
                    logger.info("🎉 Login successful!")
                    return result

                if looks_failed:
                    logger.warning(
                        f"Still on login page after attempt {login_attempt}"
                    )
                    if not is_last_attempt:
                        logger.info("Refreshing page and trying again...")
                        await page.reload()
                        await self._wait_for_page_load()
                        await asyncio.sleep(3)
                    continue

                # Neither set of indicators matched: decide by URL alone.
                if "Login" not in current_url:
                    return await self._login_finish(
                        save_session, login_attempt, current_url
                    )
                if not is_last_attempt:
                    logger.warning("Login status unclear, retrying...")
                    await page.reload()
                    await self._wait_for_page_load()
                    await asyncio.sleep(3)

            except Exception as attempt_error:
                logger.error(
                    f"Error in login attempt {login_attempt}: {str(attempt_error)}"
                )
                if is_last_attempt:
                    raise
                logger.info("Refreshing page and retrying due to error...")
                try:
                    await page.reload()
                    await self._wait_for_page_load()
                    await asyncio.sleep(3)
                except Exception:
                    # Reload failed: get a working page and start over on it.
                    fresh_page = await self._ensure_page(
                        headless=self.headless_mode
                    )
                    if fresh_page is not page:
                        page = fresh_page
                        # A new page has no listeners yet.
                        attach_dialog_handler(page)
                    await page.goto(self.login_url)
                    await self._wait_for_page_load()
                    await asyncio.sleep(3)

        # If we reach here, all attempts failed
        return self._login_result(
            False,
            f"Login failed after {max_login_attempts} attempts. Please check credentials or try again later.",
        )

    except PlaywrightTimeoutError:
        return self._login_result(False, "Login page took too long to load")
    except Exception as e:
        logger.error(f"Login error: {str(e)}")
        return self._login_result(False, f"Login failed: {str(e)}")
    finally:
        # Detach the handler so repeated login() calls do not stack listeners
        # on a reused page.
        for attached_page in dialog_pages:
            try:
                attached_page.remove_listener("dialog", handle_dialog)
            except Exception as detach_error:
                logger.debug(f"Could not detach dialog handler: {detach_error}")

@staticmethod
def _login_result(success: bool, message: str, **extra: Any) -> Dict[str, Any]:
    """Build the status dict returned by ``login`` (timestamp added here)."""
    result = {
        "success": success,
        "message": message,
        "timestamp": datetime.now().isoformat(),
    }
    result.update(extra)
    return result

@staticmethod
def _login_is_captcha_error(text: Optional[str]) -> bool:
    """Return True when text mentions the CAPTCHA together with a rejection phrase."""
    if not text:
        return False
    lowered = text.lower()
    if "captcha" not in lowered:
        return False
    return any(
        phrase in lowered
        for phrase in ("invalid", "incorrect", "wrong", "try again")
    )

async def _login_find_field(self, page, selectors, found_log_prefix):
    """Return the first element matched by any selector (10s wait each), else None."""
    for selector in selectors:
        try:
            element = await page.wait_for_selector(selector, timeout=10000)
        except PlaywrightTimeoutError:
            continue
        if element:
            logger.info(f"{found_log_prefix}: {selector}")
            return element
    return None

async def _login_find_username_with_retry(self, page):
    """Locate the username input in up to three rounds, reloading before the last."""
    selectors = [
        'input[name="RegNo"]',
        'input[name="Username"]',
        "input#RegNo",
        'input[id="RegNo"]',
    ]
    username_field = None
    for attempt in range(3):
        try:
            logger.info(f"Attempt {attempt + 1}: Looking for username field...")
            username_field = await self._login_find_field(
                page, selectors, "Found username field with selector"
            )
            if username_field:
                break
            if attempt < 2:
                logger.warning(
                    f"Username field not found, retrying in 5 seconds..."
                )
                await asyncio.sleep(5)
                if attempt == 1:
                    logger.info("Refreshing page...")
                    await page.reload()
                    await self._wait_for_page_load()
        except Exception as e:
            logger.error(f"Error finding username field: {str(e)}")
            if attempt < 2:
                await asyncio.sleep(5)

    if username_field is None:
        raise RuntimeError("Could not find username field after multiple attempts")
    return username_field

async def _login_refresh_stale_fields(self, page, username_field, password_field):
    """Return (username_field, password_field), re-locating both if a handle is unusable."""
    try:
        # Any exception from the probe means the handles must be re-located.
        if username_field:
            await username_field.is_visible()
        if password_field:
            await password_field.is_visible()
    except Exception:
        logger.info("Form fields need to be re-located due to page refresh...")
    else:
        logger.info("✅ Existing form fields are still valid")
        return username_field, password_field

    logger.info("Re-locating form fields...")
    username_field = await self._login_find_field(
        page,
        ['input[name="RegNo"]', 'input[name="Username"]', "input#RegNo"],
        "Found username field",
    )
    if not username_field:
        raise RuntimeError("Could not find username field")

    password_field = await self._login_find_field(
        page,
        ['input[name="Password"]', "input#Password", 'input[type="password"]'],
        "Found password field",
    )
    if not password_field:
        raise RuntimeError("Could not find password field")
    return username_field, password_field

async def _login_fill_form(self, page, username, password, captcha_text):
    """Fill username, password and CAPTCHA; return re-located handles if the fallback ran, else None."""
    username_selector = 'input[name="RegNo"], input[name="Username"], input#RegNo'
    password_selector = (
        'input[name="Password"], input#Password, input[type="password"]'
    )
    captcha_selector = 'input[name="CaptchaInputText"]'

    logger.info("Filling all form fields (username, password, CAPTCHA)...")
    try:
        # Page-level fills; every field is cleared first and always re-filled.
        await page.fill(username_selector, "")
        await page.fill(username_selector, username)
        logger.info(f"✅ Username re-filled: {username}")

        await page.fill(password_selector, "")
        await page.fill(password_selector, password)
        logger.info("✅ Password re-filled")

        await page.fill(captcha_selector, "")
        await page.fill(captcha_selector, captcha_text)
        logger.info(f"✅ CAPTCHA filled: {captcha_text}")
        return None
    except Exception as fill_error:
        logger.error(f"Error filling form fields: {fill_error}")

    # Fallback: resolve element handles and fill through them.
    logger.info("Fallback to element-based form filling...")
    username_field = await page.wait_for_selector(username_selector, timeout=10000)
    password_field = await page.wait_for_selector(password_selector, timeout=10000)
    captcha_field = await page.wait_for_selector(captcha_selector, timeout=10000)
    if not (username_field and password_field and captcha_field):
        raise RuntimeError("Could not fill form fields with either method")

    await username_field.fill("")
    await username_field.fill(username)
    logger.info(f"✅ Username re-filled (fallback): {username}")

    await password_field.fill("")
    await password_field.fill(password)
    logger.info("✅ Password re-filled (fallback)")

    await captcha_field.fill("")
    await captcha_field.fill(captcha_text)
    logger.info(f"✅ CAPTCHA filled (fallback): {captcha_text}")
    return username_field, password_field

async def _login_submit_form(self, page):
    """Click the login button, first by selector, then via element handles."""
    button_selectors = [
        "button#loading-btn",
        'button[type="submit"]',
        'input[type="submit"][value*="Login"]',
        'button:has-text("Login")',
    ]

    for selector in button_selectors:
        try:
            await page.click(selector, timeout=5000)
        except Exception:
            # Best effort: try the next selector.
            continue
        logger.info(f"✅ Login form submitted using selector: {selector}")
        return

    logger.info("Fallback to element-based button click...")
    for selector in button_selectors:
        try:
            login_button = await page.wait_for_selector(selector, timeout=3000)
            if login_button:
                await login_button.click()
                logger.info(f"✅ Login button clicked: {selector}")
                return
        except Exception:
            continue

    click_error = RuntimeError("Could not click login button with any method")
    logger.error(f"Error clicking login button: {click_error}")
    raise click_error

async def _login_collect_errors(self, page, dialog_messages):
    """Return (error_source, captcha_error, other_error) gathered after a submit.

    ``error_source`` is "JavaScript alert" or "DOM validation" when a CAPTCHA
    rejection was seen (alerts take precedence) and None otherwise;
    ``other_error`` holds non-CAPTCHA validation text, or "".
    """
    alert_error = ""
    for message in dialog_messages:
        if self._login_is_captcha_error(message):
            alert_error = message
            logger.warning(
                f"🔔 CAPTCHA error detected in JavaScript alert: '{message}'"
            )
            break

    dom_error = ""
    other_error = ""
    try:
        error_element = await page.wait_for_selector(
            ".text-danger, .validation-summary-errors, .alert-danger",
            timeout=8000,
        )
        if error_element:
            error_text = (await error_element.text_content() or "").strip()
            logger.warning(f"⚠️ Validation error detected: '{error_text}'")
            if self._login_is_captcha_error(error_text):
                dom_error = error_text
                logger.warning(f"🔄 CAPTCHA validation failed: {error_text}")
            else:
                other_error = error_text
    except PlaywrightTimeoutError:
        # No validation element appeared within the wait.
        pass

    if alert_error:
        return "JavaScript alert", alert_error, other_error
    if dom_error:
        return "DOM validation", dom_error, other_error
    return None, "", other_error

async def _login_finish(self, save_session, login_attempt, current_url):
    """Mark the instance logged in, optionally save the session, build the success dict."""
    self.is_logged_in = True
    if save_session:
        await self._save_session()
    return self._login_result(
        True,
        f"Successfully logged into PAF-IAST LMS (attempt {login_attempt})",
        redirect_url=current_url,
    )
async def _solve_captcha(self) -> Optional[str]:
    """Screenshot the CAPTCHA image on the current page and return its text.

    The image element is screenshotted to a temporary PNG in the working
    directory, passed to ``self.captcha_solver.solve_text_captcha`` and the
    file is removed again even when the screenshot or the solver fails.

    Returns:
        The solver output stripped, upper-cased and with spaces, dots and
        commas removed, provided it is alphanumeric and 3-8 characters long.
        None when there is no page, no CAPTCHA image is found, the solver
        returns nothing, validation fails, or any error occurs (errors are
        logged, never raised).
    """
    try:
        if self.page is None:
            return None

        logger.info("🎯 Starting CAPTCHA solving with proven method...")

        captcha_img = None
        captcha_selectors = [
            "img#CaptchaImage",
            'img[src*="DefaultCaptcha/Generate"]',
            'img[src*="Captcha"]',
        ]
        for selector in captcha_selectors:
            try:
                captcha_img = await self.page.wait_for_selector(
                    selector, timeout=10000
                )
            except PlaywrightTimeoutError:
                continue
            if captcha_img:
                logger.info(f"✅ Found CAPTCHA image with selector: {selector}")
                break

        if captcha_img is None:
            logger.error("❌ Could not find CAPTCHA image")
            return None

        captcha_path = f"temp_captcha_{int(time.time())}.png"
        try:
            # Element screenshot instead of re-downloading the image URL.
            await captcha_img.screenshot(path=captcha_path)
            logger.info(f"📸 CAPTCHA image saved to: {captcha_path}")

            captcha_text = await self.captcha_solver.solve_text_captcha(
                captcha_path, preprocess=True
            )
        finally:
            # Always remove the temp file, including on screenshot/solver
            # failure; a cleanup problem must not discard a solved CAPTCHA.
            try:
                os.remove(captcha_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                logger.warning(
                    f"Could not remove temp CAPTCHA file {captcha_path}: {cleanup_error}"
                )

        if not captcha_text:
            logger.warning("❌ CAPTCHA solver returned empty result")
            return None

        # Normalise case and strip common OCR artifacts.
        captcha_text = captcha_text.strip().upper()
        captcha_text = (
            captcha_text.replace(" ", "").replace(".", "").replace(",", "")
        )
        logger.info(f"✅ Cleaned CAPTCHA text: '{captcha_text}'")

        # Accept only plausible results: 3-8 alphanumeric characters.
        if 3 <= len(captcha_text) <= 8 and captcha_text.isalnum():
            logger.info(f"🎉 CAPTCHA validation passed: '{captcha_text}'")
            return captcha_text

        logger.warning(f"⚠️ CAPTCHA text failed validation: '{captcha_text}'")
        return None

    except Exception as e:
        logger.error(f"❌ CAPTCHA solving error: {str(e)}")
        return None
async def get_attendance(
    self, subject_code: Optional[str] = None, semester: Optional[str] = None
) -> Dict[str, Any]:
    """Get attendance records using Playwright

    Opens the "AllAttendance" page and reads every HTML table on it. For
    each table the first row is skipped as the header; every other row with
    at least four cells contributes one record built from its first four
    cells.

    Args:
        subject_code: Accepted for interface compatibility; currently not
            applied as a filter.
        semester: Accepted for interface compatibility; currently not
            applied as a filter.

    Returns:
        When not logged in: ``{"success": False, "message": "Not logged in"}``.
        On success: a dict with ``success``, ``message``, ``timestamp`` and
        ``data``, a list of dicts with ``subject``, ``present``, ``absent``
        and ``percentage`` strings. On error: ``success`` False with the
        error text in ``message`` (errors are logged, never raised).
    """
    if not self.is_logged_in:
        return {"success": False, "message": "Not logged in"}

    try:
        # Go through _ensure_page (as get_marks does) so a missing or
        # unresponsive page is recreated instead of failing on None.
        page = await self._ensure_page(headless=self.headless_mode)

        attendance_url = (
            f"{self.base_url}/Attendance/StudentAttendance/AllAttendance"
        )
        await page.goto(attendance_url)
        await self._wait_for_page_load()

        soup = BeautifulSoup(await page.content(), "html.parser")

        attendance_data = {
            "success": True,
            "message": "Attendance data retrieved",
            "data": [],
            "timestamp": datetime.now().isoformat(),
        }

        for table in soup.find_all("table"):
            for row in table.find_all("tr")[1:]:  # Skip header
                cells = row.find_all(["td", "th"])
                if len(cells) < 4:
                    continue
                subject, present, absent, percentage = (
                    cell.get_text(strip=True) for cell in cells[:4]
                )
                attendance_data["data"].append(
                    {
                        "subject": subject,
                        "present": present,
                        "absent": absent,
                        "percentage": percentage,
                    }
                )

        return attendance_data

    except Exception as e:
        logger.error(f"Attendance retrieval error: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get attendance: {str(e)}",
            "timestamp": datetime.now().isoformat(),
        }
async def get_marks(
self,
subject_code: Optional[str] = None,
exam_type: str = "all",
semester: Optional[str] = None,
) -> Dict[str, Any]:
"""Get marks and grades using Playwright - navigates through Courses dashboard"""
if not self.is_logged_in:
return {
"success": False,
"message": "Not logged in to LMS. Please login first.",
}
try:
page = await self._ensure_page(headless=self.headless_mode)
# First navigate to the Courses dashboard
courses_url = f"{self.base_url}/Courses/MyCourses/AllCourses"
logger.info(f"Navigating to courses page: {courses_url}")
await page.goto(courses_url, wait_until="domcontentloaded")
await self._wait_for_page_load()
marks_data = {
"success": True,
"message": "Marks data retrieved",
"data": [],
"courses": [],
"timestamp": datetime.now().isoformat(),
}
# Get list of available courses first
courses = []
try:
# Enhanced dynamic content loading
await page.wait_for_timeout(3000)
# Force dynamic content loading by triggering JavaScript events
await page.evaluate(
"""
// Dispatch DOMContentLoaded and load events
document.dispatchEvent(new Event('DOMContentLoaded'));
window.dispatchEvent(new Event('load'));
// Trigger jQuery ready if available
if (typeof $ !== 'undefined') {
$(document).ready();
$(document).trigger('ready');
}
// Scroll to trigger lazy loading
window.scrollTo(0, document.body.scrollHeight);
"""
)
await page.wait_for_timeout(2000)
# Scroll back and wait
await page.evaluate("window.scrollTo(0, 0)")
await page.wait_for_timeout(2000)
# Try to find and execute common initialization functions
await page.evaluate(
"""
const initFunctions = ['initializePage', 'loadCourseActions', 'setupActionDropdowns', 'initCourseTable'];
initFunctions.forEach(funcName => {
if (typeof window[funcName] === 'function') {
try { window[funcName](); } catch(e) {}
}
});
"""
)
await page.wait_for_timeout(1000)
# Look for course tables - based on debug output, we know there are tables with course data
course_tables = await page.query_selector_all("table")
logger.info(f"Found {len(course_tables)} tables on courses page")
for table_idx, table in enumerate(course_tables):
try:
# Get table rows (skip header)
rows = await table.query_selector_all("tbody tr")
if not rows:
rows = await table.query_selector_all("tr")
rows = rows[1:] # Skip header
logger.info(
f"Table {table_idx + 1}: Found {len(rows)} course rows"
)
for row_idx, row in enumerate(rows):
try:
cells = await row.query_selector_all("td")
if (
len(cells) >= 3
): # Minimum: S#, Course Code, Course Title
# Extract course information
course_code_cell = (
cells[1] if len(cells) > 1 else cells[0]
)
course_title_cell = (
cells[2] if len(cells) > 2 else cells[1]
)
course_code = await course_code_cell.text_content()
course_title = (
await course_title_cell.text_content()
)
if (
course_code
and course_code.strip()
and course_title
and course_title.strip()
):
course_code = course_code.strip()
course_title = course_title.strip()
# Look for assessment forms in this row
assessment_forms = await row.query_selector_all(
"form[action*='Assessment']"
)
consolidated_forms = await row.query_selector_all(
"form[action*='GetCourseAssessmentConsolidated']"
)
# Extract form details for direct submission
form_details = []
for form in (
assessment_forms + consolidated_forms
):
try:
action = await form.get_attribute(
"action"
)
method = (
await form.get_attribute("method")
or "POST"
)
# Extract form inputs
inputs = await form.query_selector_all(
"input"
)
form_data = {}
for input_elem in inputs:
name = (
await input_elem.get_attribute(
"name"
)
)
value = (
await input_elem.get_attribute(
"value"
)
)
if name and value:
form_data[name] = value
form_details.append(
{
"action": action,
"method": method,
"data": form_data,
}
)
except Exception as e:
logger.debug(
f"Error extracting form details: {str(e)}"
)
course_info = {
"name": f"{course_code} - {course_title}",
"code": course_code,
"title": course_title,
"table_index": table_idx,
"row_index": row_idx,
"element": row,
"assessment_forms": len(assessment_forms),
"consolidated_forms": len(
consolidated_forms
),
"form_details": form_details,
"has_dropdown": False, # Will update this below
"has_marks_link": len(assessment_forms) > 0
or len(consolidated_forms) > 0,
}
# Look for action dropdowns or buttons in this row
dropdowns = await row.query_selector_all(
"select, .dropdown-toggle"
)
action_buttons = await row.query_selector_all(
"button, a[href*='Assessment']"
)
if dropdowns or action_buttons:
course_info["has_dropdown"] = True
course_info["dropdown_elements"] = len(
dropdowns
)
course_info["action_buttons"] = len(
action_buttons
)
courses.append(course_info)
logger.info(
f"Found course: {course_code} - {course_title} (Forms: {len(assessment_forms)}, Dropdowns: {len(dropdowns)})"
)
except Exception as e:
logger.debug(
f"Error processing row {row_idx} in table {table_idx}: {str(e)}"
)
continue
except Exception as e:
logger.debug(f"Error processing table {table_idx}: {str(e)}")
continue
marks_data["courses"] = [
{"name": c["name"], "code": c.get("code", "Unknown")}
for c in courses
]
logger.info(f"Found {len(courses)} courses")
except Exception as e:
logger.warning(f"Could not extract course list: {str(e)}")
# If specific subject_code is provided, filter courses
target_courses = courses
if subject_code:
target_courses = [
c
for c in courses
if subject_code.upper() in c.get("code", "").upper()
or subject_code.upper() in c["name"].upper()
]
if not target_courses:
return {
"success": False,
"message": f"Course with code '{subject_code}' not found",
"available_courses": marks_data["courses"],
"timestamp": datetime.now().isoformat(),
}
# Navigate to marks for each target course
for course in target_courses:
try:
logger.info(f"Getting marks for course: {course['name']}")
# Try different methods to access marks
marks_accessed = False
# Method 1: Direct form submission (most reliable)
if course.get("form_details"):
try:
for form_detail in course["form_details"]:
action_url = form_detail["action"]
method = form_detail["method"]
form_data = form_detail["data"].copy()
# Make action URL absolute if needed
if action_url.startswith("/"):
action_url = f"{self.base_url}{action_url}"
elif not action_url.startswith("http"):
action_url = f"{self.base_url}/{action_url}"
logger.info(
f"Attempting direct form submission to: {action_url}"
)
success = await self._navigate_with_retry(
page, action_url, method, form_data
)
if success:
# Check if we actually got assessment data
page_content = await page.content()
if any(
keyword in page_content.lower()
for keyword in [
"assessment",
"marks",
"grade",
"quiz",
"assignment",
]
):
marks_accessed = True
logger.info(
f"Successfully accessed marks via direct form submission"
)
break
else:
logger.warning(
"Form submission succeeded but no assessment data found"
)
except Exception as e:
logger.warning(f"Error in Method 1 (direct form): {str(e)}")
# Method 2: Try action dropdown with anti-forgery handling
if not marks_accessed and course.get("has_dropdown"):
try:
dropdown = await course["element"].query_selector(
"select.action-dropdown, .dropdown-toggle"
)
if dropdown:
# Get verification token before interacting with dropdown
verification_token = await self._get_verification_token(
page
)
# Click dropdown and look for marks/assessment option
await dropdown.click()
await page.wait_for_timeout(1000)
# Look for marks/assessment option
marks_option = await page.query_selector(
"option[value*='Assessment'], option[value*='marks'], a[href*='Assessment']"
)
if marks_option:
# If it's a form submission, handle it securely
option_value = await marks_option.get_attribute(
"value"
)
if option_value:
# This might be a form submission, handle with token
form = await course["element"].query_selector(
"form"
)
if form:
# Add verification token to form if present
if verification_token:
token_input = await form.query_selector(
"input[name='__RequestVerificationToken']"
)
if not token_input:
# Create hidden input for token
await page.evaluate(
f"""
const form = arguments[0];
const tokenInput = document.createElement('input');
tokenInput.type = 'hidden';
tokenInput.name = '__RequestVerificationToken';
tokenInput.value = '{verification_token}';
form.appendChild(tokenInput);
""",
form,
)
await marks_option.click()
await self._wait_for_page_load()
marks_accessed = True
except Exception as e:
logger.warning(f"Error in Method 1 (dropdown): {str(e)}")
# Method 3: Try direct marks link with anti-forgery handling
if not marks_accessed and course.get("has_marks_link"):
try:
marks_link = await course["element"].query_selector(
"a[href*='Assessment'], a[href*='marks']"
)
if marks_link:
# Get verification token before clicking link
verification_token = await self._get_verification_token(
page
)
# Check if this link requires a form submission
href = await marks_link.get_attribute("href")
if href and "javascript:" in href.lower():
# This is likely a JavaScript form submission, handle carefully
if verification_token:
# Inject token into page if needed
await page.evaluate(
f"""
window.__RequestVerificationToken = '{verification_token}';
"""
)
await marks_link.click()
await self._wait_for_page_load()
marks_accessed = True
except Exception as e:
logger.warning(f"Error in Method 2 (direct link): {str(e)}")
# Method 4: Try direct URL navigation if we can extract course ID
if not marks_accessed:
try:
# Look for course ID in the element
course_id = None
onclick_attr = await course["element"].get_attribute(
"onclick"
)
data_id = await course["element"].get_attribute(
"data-course-id"
)
if data_id:
course_id = data_id
elif onclick_attr and "courseId" in onclick_attr:
import re
match = re.search(r"courseId[=:](\d+)", onclick_attr)
if match:
course_id = match.group(1)
if course_id:
# Navigate directly to assessment page with retry logic
assessment_url = f"{self.base_url}/Courses/Assessment/GetCourseAssessmentConsolidated"
# Try with course ID parameter first
form_data = {"courseId": course_id}
success = await self._navigate_with_retry(
page, assessment_url, "POST", form_data
)
if success:
marks_accessed = True
else:
# Fallback: try with GET method and query parameter
assessment_url_with_id = (
f"{assessment_url}?courseId={course_id}"
)
success = await self._navigate_with_retry(
page, assessment_url_with_id, "GET"
)
if success:
marks_accessed = True
except Exception as e:
logger.warning(f"Error in Method 3 (direct URL): {str(e)}")
# Method 5: Generic assessment URL (if no specific course ID found)
if not marks_accessed:
try:
assessment_url = f"{self.base_url}/Courses/Assessment/GetCourseAssessmentConsolidated"
success = await self._navigate_with_retry(
page, assessment_url, "GET"
)
if success:
marks_accessed = True
except Exception:
pass
if marks_accessed:
# Extract marks data from the assessment page
course_marks = await self._extract_marks_from_page(
page, course["name"], exam_type
)
marks_data["data"].extend(course_marks)
# Navigate back to courses page for next iteration
if len(target_courses) > 1:
await page.goto(courses_url, wait_until="domcontentloaded")
await self._wait_for_page_load()
else:
logger.warning(
f"Could not access marks for course: {course['name']}"
)
marks_data["data"].append(
{
"course": course["name"],
"status": "Could not access marks",
"error": "No accessible marks link found",
}
)
except Exception as e:
logger.error(
f"Error getting marks for course {course['name']}: {str(e)}"
)
marks_data["data"].append(
{"course": course["name"], "status": "Error", "error": str(e)}
)
if not marks_data["data"]:
marks_data["message"] = "No marks data found or accessible"
marks_data["success"] = True # Still successful, just no data
return marks_data
except PlaywrightTimeoutError:
logger.error("Timeout while accessing courses/marks section")
return {
"success": False,
"message": "Timeout while accessing courses. The LMS may be slow or unavailable.",
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Marks retrieval error: {str(e)}")
return {
"success": False,
"message": f"Failed to get marks: {str(e)}",
"timestamp": datetime.now().isoformat(),
}
async def _extract_marks_from_page(
    self, page: Page, course_name: str, exam_type: str = "all"
) -> List[Dict[str, Any]]:
    """Scrape assessment rows for one course from the page currently loaded.

    Candidate selectors are tried in order and scanning stops at the first
    selector that yields at least one entry. Only rows with three or more
    ``td`` cells are used; the cells are read positionally as assessment
    type, marks obtained, total marks and (when a fourth cell exists) grade.

    Args:
        page: Page already showing the assessment view.
        course_name: Copied into the ``"course"`` key of every entry.
        exam_type: ``"all"`` keeps every row; any other value keeps only rows
            whose assessment type contains it (case-insensitive).

    Returns:
        A non-empty list of dicts. When nothing could be extracted the list
        holds one placeholder entry, and when the scrape itself fails an
        entry carrying the error text is appended.
    """

    def _clean(raw, fallback):
        # Cells may come back empty/None; substitute the fallback then.
        return raw.strip() if raw else fallback

    row_selectors = (
        "table.marks-table tbody tr",
        "table.assessment-table tbody tr",
        "table tbody tr",
        ".assessment-item",
        ".marks-row",
        "[data-assessment-type]",
    )
    marks_list = []
    try:
        # Fixed pause so the page has a moment to finish rendering.
        await page.wait_for_timeout(2000)

        for selector in row_selectors:
            try:
                rows = await page.query_selector_all(selector)
                if not rows:
                    continue

                for row in rows:
                    try:
                        row_text = await row.text_content()
                        if not row_text or not row_text.strip():
                            continue

                        cells = await row.query_selector_all("td")
                        if len(cells) < 3:
                            continue

                        # Positional layout: type | obtained | total | grade
                        assessment_type = _clean(
                            await cells[0].text_content(), "Unknown"
                        )
                        marks_obtained = _clean(
                            await cells[1].text_content(), "N/A"
                        )
                        total_marks = _clean(await cells[2].text_content(), "N/A")
                        raw_grade = (
                            await cells[3].text_content()
                            if len(cells) > 3
                            else "N/A"
                        )
                        grade = _clean(raw_grade, "N/A")

                        wrong_exam = (
                            exam_type != "all"
                            and exam_type.lower() not in assessment_type.lower()
                        )
                        if wrong_exam:
                            continue

                        marks_list.append(
                            {
                                "course": course_name,
                                "assessment_type": assessment_type,
                                "marks_obtained": marks_obtained,
                                "total_marks": total_marks,
                                "grade": grade,
                                "percentage": self._calculate_percentage(
                                    marks_obtained, total_marks
                                ),
                            }
                        )
                    except Exception as e:
                        logger.warning(
                            f"Error extracting marks from element: {str(e)}"
                        )
                        continue

                # The first selector that produced data wins.
                if marks_list:
                    break
            except Exception:
                continue

        if not marks_list:
            # Page was reachable but nothing parseable was on it.
            marks_list.append(
                {
                    "course": course_name,
                    "assessment_type": "No assessments found",
                    "marks_obtained": "N/A",
                    "total_marks": "N/A",
                    "grade": "N/A",
                    "percentage": "N/A",
                    "note": "Assessment page accessed but no marks data could be extracted",
                }
            )
    except Exception as e:
        logger.error(f"Error extracting marks from page: {str(e)}")
        marks_list.append(
            {
                "course": course_name,
                "assessment_type": "Error",
                "marks_obtained": "N/A",
                "total_marks": "N/A",
                "grade": "N/A",
                "percentage": "N/A",
                "error": str(e),
            }
        )
    return marks_list
def _calculate_percentage(self, obtained: str, total: str) -> str:
"""Calculate percentage from obtained and total marks"""
try:
if obtained == "N/A" or total == "N/A":
return "N/A"
obtained_num = float(obtained)
total_num = float(total)
if total_num == 0:
return "N/A"
percentage = (obtained_num / total_num) * 100
return f"{percentage:.1f}%"
except (ValueError, TypeError):
return "N/A"
async def get_timetable(
    self, date: Optional[str] = None, week: bool = False
) -> Dict[str, Any]:
    """Collect timetable rows from the tables on the dashboard page.

    Every ``<table>`` on ``/MyDashboard/Index`` is scanned; the first row of
    each table is skipped as a header and rows with at least four cells are
    read positionally as time, subject, room and instructor.

    Args:
        date: Accepted for API compatibility; not used by this method.
        week: Accepted for API compatibility; not used by this method.

    Returns:
        On success a dict with ``success``, ``message``, ``data`` (list of
        row dicts) and ``timestamp``. When not logged in, a dict with only
        ``success`` and ``message``. On any error, a failure dict with the
        error text and a timestamp.
    """
    if not self.is_logged_in:
        return {"success": False, "message": "Not logged in"}

    column_names = ("time", "subject", "room", "instructor")
    try:
        # The timetable is looked for on the dashboard itself.
        dashboard_url = f"{self.base_url}/MyDashboard/Index"
        await self.page.goto(dashboard_url)
        await self._wait_for_page_load()

        html = await self.page.content()
        soup = BeautifulSoup(html, "html.parser")

        entries = []
        timetable_data = {
            "success": True,
            "message": "Timetable data retrieved",
            "data": entries,
            "timestamp": datetime.now().isoformat(),
        }

        for table in soup.find_all("table"):
            body_rows = table.find_all("tr")[1:]  # drop the header row
            for row in body_rows:
                cells = row.find_all(["td", "th"])
                if len(cells) < 4:
                    continue
                texts = [cell.get_text(strip=True) for cell in cells[:4]]
                entries.append(dict(zip(column_names, texts)))

        return timetable_data
    except Exception as e:
        logger.error(f"Timetable retrieval error: {str(e)}")
        return {
            "success": False,
            "message": f"Failed to get timetable: {str(e)}",
            "timestamp": datetime.now().isoformat(),
        }
async def solve_captcha(
    self, captcha_image_path: str, preprocess: bool = True
) -> Dict[str, Any]:
    """Run the configured CAPTCHA solver on an image file.

    Args:
        captcha_image_path: Path handed to the solver unchanged.
        preprocess: Forwarded to the solver as its second argument.

    Returns:
        A dict with ``success``, ``message`` and ``timestamp``; on success it
        also carries the solver output under ``captcha_text``. Solver
        exceptions are reported in the dict rather than raised.
    """
    try:
        solved_text = await self.captcha_solver.solve_text_captcha(
            captcha_image_path, preprocess
        )
        outcome = {
            "success": True,
            "captcha_text": solved_text,
            "message": "CAPTCHA solved successfully",
        }
    except Exception as e:
        outcome = {
            "success": False,
            "message": f"CAPTCHA solving failed: {str(e)}",
        }
    outcome["timestamp"] = datetime.now().isoformat()
    return outcome
async def logout(self) -> Dict[str, Any]:
    """Log out of the LMS and release browser and session state.

    If a page is open, a "Logout" link is looked up (5 s timeout) and
    clicked. Local teardown - closing the browser, clearing the stored
    session and resetting ``is_logged_in`` - runs in a ``finally`` block, so
    it still happens when the remote logout step fails for a reason other
    than the link simply being absent.

    Returns:
        A dict with ``success``, ``message`` and ``timestamp``. ``success``
        is False when any step raised; the error text is in ``message``.
    """
    try:
        try:
            if self.page:
                try:
                    logout_link = await self.page.wait_for_selector(
                        'a:has-text("Logout")', timeout=5000
                    )
                    if logout_link:
                        await logout_link.click()
                except PlaywrightTimeoutError:
                    # No logout link showed up in time; fall through to the
                    # local teardown.
                    pass
        finally:
            # Always release local resources. Previously an unexpected
            # error above left the browser open and is_logged_in set.
            await self._cleanup_browser()
            self.session_manager.clear_session()
            self.is_logged_in = False

        return {
            "success": True,
            "message": "Successfully logged out",
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as e:
        return {
            "success": False,
            "message": f"Logout error: {str(e)}",
            "timestamp": datetime.now().isoformat(),
        }
async def _cleanup_browser(self):
"""Clean up browser resources"""
try:
if self.page:
await self.page.close()
self.page = None
if self.context:
await self.context.close()
self.context = None
if self.browser:
await self.browser.close()
self.browser = None
if self.playwright:
await self.playwright.stop()
self.playwright = None
except Exception as e:
logger.error(f"Browser cleanup error: {str(e)}")
async def _verify_session(self) -> bool:
"""Verify if current session is still valid"""
try:
if self.page is None:
page = await self._setup_browser(headless=self.headless_mode)
else:
page = self.page
# Try to access a protected page
await page.goto(f"{self.base_url}/MyDashboard/Index")
await self._wait_for_page_load()
# Check if we're redirected to login
if "login" in page.url.lower():
return False
return True
except Exception:
return False
async def view_application_status(
    self,
    application_type: Optional[str] = None,
    semester: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Report the status of submitted applications listed on MyApplications.

    Rows are located with a list of candidate selectors; scanning stops at
    the first selector that yields at least one record. Rows with three or
    more cells are read positionally as name, id, status and (optionally)
    date; single-cell rows are kept with their text as the status. When no
    rows are found the page text is searched instead.

    Args:
        application_type: When given and not "all", only records whose
            classified type equals this value are returned.
        semester: Echoed back under "filter_applied"; it is not used to
            filter the records here.

    Returns:
        Dict with "success", "message" and "data". On success "data" holds
        "applications", "total_count", "filter_applied" and "page_url"; on
        failure "data" is None.
    """
    if not self.is_logged_in:
        return {
            "success": False,
            "message": "Not logged in to LMS. Please login first.",
            "data": None,
        }

    def _text_or(raw, fallback):
        # Cell text may be None or empty.
        return raw.strip() if raw else fallback

    row_selectors = (
        "table.table tbody tr",
        ".application-list .application-item",
        ".status-table tbody tr",
        ".applications tbody tr",
        "tbody tr",
        ".list-group-item",
    )
    mention_keywords = ("application", "request", "submission")

    try:
        page = await self._ensure_page(headless=self.headless_mode)

        applications_url = (
            f"{self.base_url}/StudentAccount/Applications/MyApplications"
        )
        logger.info(f"Navigating to applications status page: {applications_url}")
        await page.goto(applications_url, wait_until="domcontentloaded")
        await self._wait_for_page_load()
        # Extra settle time after the load state is reached.
        await asyncio.sleep(2)

        applications_status = []
        found_applications = False

        for selector in row_selectors:
            try:
                rows = await page.query_selector_all(selector)
                logger.info(f"Found {len(rows)} elements with selector: {selector}")

                for row in rows:
                    try:
                        row_text = await row.text_content()
                        if not row_text or row_text.strip() == "":
                            continue

                        cells = await row.query_selector_all("td")
                        cell_count = len(cells)

                        if cell_count >= 3:
                            # Positional layout: name | id | status | date
                            app_name = await cells[0].text_content()
                            app_id = await cells[1].text_content()
                            status = await cells[2].text_content()
                            date = (
                                await cells[3].text_content()
                                if cell_count > 3
                                else "N/A"
                            )
                            applications_status.append(
                                {
                                    "application_name": _text_or(
                                        app_name, "Unknown"
                                    ),
                                    "application_id": _text_or(app_id, "N/A"),
                                    "status": _text_or(status, "Unknown"),
                                    "date": _text_or(date, "N/A"),
                                    "type": (
                                        self._classify_application_type(app_name)
                                        if app_name
                                        else "unknown"
                                    ),
                                    "raw_text": row_text.strip(),
                                }
                            )
                            found_applications = True
                        elif cell_count == 1:
                            # Single-cell row: keep its text as the status.
                            content = await cells[0].text_content()
                            if content and content.strip():
                                stripped = content.strip()
                                applications_status.append(
                                    {
                                        "application_name": "Unknown",
                                        "application_id": "N/A",
                                        "status": stripped,
                                        "date": "N/A",
                                        "type": "unknown",
                                        "raw_text": stripped,
                                    }
                                )
                                found_applications = True
                    except Exception as e:
                        logger.debug(f"Error processing application row: {str(e)}")
                        continue

                if found_applications:
                    break
            except Exception as e:
                logger.debug(f"Error with selector {selector}: {str(e)}")
                continue

        if not found_applications:
            # No structured rows: fall back to scanning the visible text.
            try:
                page_content = await page.inner_text("body")
                lowered = page_content.lower()
                if "no applications" in lowered or "no records" in lowered:
                    return {
                        "success": True,
                        "message": "No applications found",
                        "data": {
                            "applications": [],
                            "total_count": 0,
                            "filter_applied": {
                                "type": application_type,
                                "semester": semester,
                            },
                            "page_url": applications_url,
                        },
                    }

                for line in page_content.split("\n"):
                    lowered_line = line.lower()
                    if any(keyword in lowered_line for keyword in mention_keywords):
                        applications_status.append(
                            {
                                "application_name": "Unknown",
                                "application_id": "N/A",
                                "status": "Found mention",
                                "date": "N/A",
                                "type": "unknown",
                                "raw_text": line.strip(),
                            }
                        )
            except Exception as e:
                logger.error(f"Error parsing page content: {str(e)}")

        if application_type and application_type != "all":
            applications_status = [
                record
                for record in applications_status
                if record.get("type") == application_type
            ]

        total = len(applications_status)
        return {
            "success": True,
            "message": f"Retrieved status for {total} applications",
            "data": {
                "applications": applications_status,
                "total_count": total,
                "filter_applied": {"type": application_type, "semester": semester},
                "page_url": applications_url,
            },
        }
    except PlaywrightTimeoutError:
        logger.error("Timeout while accessing applications status page")
        return {
            "success": False,
            "message": "Timeout while accessing applications status. The LMS may be slow or unavailable.",
            "data": None,
        }
    except Exception as e:
        logger.error(f"Error viewing application status: {str(e)}")
        return {
            "success": False,
            "message": f"Error viewing application status: {str(e)}",
            "data": None,
        }
async def create_application(
    self,
    application_type: str,
    subject: Optional[str] = None,
    message: Optional[str] = None,
    additional_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Create and submit a new application using the CreateApplication page.

    The form is located heuristically: a list of candidate selectors is
    tried for the type dropdown, the subject input, the message textarea and
    the submit control, and the first element found for each is used.

    Args:
        application_type: Matched case-insensitively as a substring of each
            dropdown option's text or value. If nothing matches, the first
            option after the leading one that has a non-empty value is used.
        subject: Subject/title of the application (skipped when falsy).
        message: Detailed message/content of the application (skipped when
            falsy).
        additional_data: Extra fields, keyed by the form control's ``name``
            or ``id``; values are converted with ``str()``.

    Returns:
        Dict with "success", "message" and "data". Once a submit control has
        been clicked "success" is True regardless of what the page then
        shows; the detected outcome ("success", "error" or "unknown") is
        reported under data["submission_result"]["submission_status"].
    """
    if not self.is_logged_in:
        return {
            "success": False,
            "message": "Not logged in to LMS. Please login first.",
            "data": None,
        }
    try:
        page = await self._ensure_page(headless=self.headless_mode)
        # Navigate to the specific CreateApplication URL for PAF-IAST
        create_url = (
            f"{self.base_url}/StudentAccount/Applications/CreateApplication"
        )
        logger.info(f"Navigating to create application page: {create_url}")
        await page.goto(create_url, wait_until="domcontentloaded")
        await self._wait_for_page_load()
        # Wait for the page to fully load
        await asyncio.sleep(2)

        filled_fields = []
        submission_result = {}

        # --- Application type dropdown -----------------------------------
        type_selectors = [
            "select[name*='type']",
            "select[name*='Type']",
            "select[id*='type']",
            "select[id*='Type']",
            "#ApplicationType",
            "#applicationType",
            "select.form-control",
            "select",
        ]
        type_filled = False
        for selector in type_selectors:
            try:
                type_dropdown = await page.query_selector(selector)
                if type_dropdown:
                    # Record every option so callers can see what was offered
                    options = await type_dropdown.query_selector_all("option")
                    option_texts = []
                    for option in options:
                        text = await option.text_content()
                        value = await option.get_attribute("value")
                        option_texts.append(
                            {"text": text.strip() if text else "", "value": value}
                        )
                    # NOTE(review): options are chosen by clicking the
                    # <option> handle; Playwright documents select_option()
                    # on the <select> for this - confirm clicking works on
                    # the target page.
                    for option in options:
                        option_text = await option.text_content()
                        option_value = await option.get_attribute("value")
                        if (
                            option_text
                            and application_type.lower() in option_text.lower()
                        ) or (
                            option_value
                            and application_type.lower() in option_value.lower()
                        ):
                            await option.click()
                            filled_fields.append(f"Application type: {option_text}")
                            type_filled = True
                            break
                    if not type_filled and options:
                        # No match: fall back to the first option with a
                        # value, skipping the leading (usually empty) one.
                        for option in options[1:]:
                            option_value = await option.get_attribute("value")
                            if option_value and option_value.strip():
                                await option.click()
                                option_text = await option.text_content()
                                filled_fields.append(
                                    f"Application type (default): {option_text}"
                                )
                                type_filled = True
                                break
                    submission_result["available_types"] = option_texts
                    break
            except Exception as e:
                logger.debug(f"Error with type selector {selector}: {str(e)}")
                continue
        if not type_filled:
            return {
                "success": False,
                "message": "Could not find or select application type dropdown",
                "data": {"error": "Application type dropdown not found"},
            }

        # --- Subject -------------------------------------------------------
        if subject:
            subject_selectors = [
                "input[name*='subject']",
                "input[name*='Subject']",
                "input[id*='subject']",
                "input[id*='Subject']",
                "input[placeholder*='subject']",
                "input[type='text']",
            ]
            for selector in subject_selectors:
                try:
                    subject_field = await page.query_selector(selector)
                    if subject_field:
                        # fill() replaces any existing value. The element
                        # handles returned by query_selector() have no
                        # clear() method, and calling it raised an error
                        # that was swallowed, so the field was never filled.
                        await subject_field.fill(subject)
                        filled_fields.append(f"Subject: {subject}")
                        break
                except Exception as e:
                    logger.debug(
                        f"Error with subject selector {selector}: {str(e)}"
                    )
                    continue

        # --- Message -------------------------------------------------------
        if message:
            message_selectors = [
                "textarea[name*='message']",
                "textarea[name*='Message']",
                "textarea[name*='content']",
                "textarea[name*='Content']",
                "textarea[id*='message']",
                "textarea[id*='Message']",
                "textarea[placeholder*='message']",
                "textarea",
            ]
            for selector in message_selectors:
                try:
                    message_field = await page.query_selector(selector)
                    if message_field:
                        # fill() alone is enough; see the subject field above.
                        await message_field.fill(message)
                        # Only a 50-character preview is recorded.
                        filled_fields.append(
                            f"Message: {message[:50]}..."
                            if len(message) > 50
                            else f"Message: {message}"
                        )
                        break
                except Exception as e:
                    logger.debug(
                        f"Error with message selector {selector}: {str(e)}"
                    )
                    continue

        # --- Additional fields ---------------------------------------------
        if additional_data:
            for field_name, field_value in additional_data.items():
                try:
                    # Look the control up by name or id, for each tag type.
                    field_selectors = [
                        f"input[name='{field_name}']",
                        f"input[id='{field_name}']",
                        f"select[name='{field_name}']",
                        f"select[id='{field_name}']",
                        f"textarea[name='{field_name}']",
                        f"textarea[id='{field_name}']",
                    ]
                    field_filled = False
                    for selector in field_selectors:
                        try:
                            field_element = await page.query_selector(selector)
                            if field_element:
                                tag_name = await field_element.evaluate(
                                    "el => el.tagName.toLowerCase()"
                                )
                                if tag_name == "select":
                                    # Handle dropdown
                                    await field_element.select_option(
                                        str(field_value)
                                    )
                                else:
                                    # Handle input/textarea; fill() replaces
                                    # the current value.
                                    await field_element.fill(str(field_value))
                                filled_fields.append(f"{field_name}: {field_value}")
                                field_filled = True
                                break
                        except Exception:
                            continue
                    if not field_filled:
                        logger.warning(f"Could not fill field: {field_name}")
                except Exception as e:
                    logger.error(
                        f"Error filling additional field {field_name}: {str(e)}"
                    )

        # --- Submit --------------------------------------------------------
        submit_selectors = [
            "button[type='submit']",
            "input[type='submit']",
            "button:has-text('Submit')",
            "button:has-text('Create')",
            "button:has-text('Send')",
            ".btn-primary",
            ".submit-btn",
        ]
        submitted = False
        for selector in submit_selectors:
            try:
                submit_button = await page.query_selector(selector)
                if submit_button:
                    # Disabled controls are skipped; later selectors may
                    # still find an enabled one.
                    is_disabled = await submit_button.is_disabled()
                    if not is_disabled:
                        await submit_button.click()
                        submitted = True
                        # Wait for submission response
                        await asyncio.sleep(3)
                        break
            except Exception as e:
                logger.debug(f"Error with submit selector {selector}: {str(e)}")
                continue
        if not submitted:
            return {
                "success": False,
                "message": "Could not find or click submit button",
                "data": {
                    "filled_fields": filled_fields,
                    "error": "Submit button not found or disabled",
                },
            }

        # --- Outcome detection (keyword heuristic on the page text) --------
        try:
            await asyncio.sleep(2)
            page_content = await page.inner_text("body")
            success_indicators = ["success", "submitted", "created", "sent"]
            error_indicators = ["error", "failed", "invalid", "required"]
            submission_status = "unknown"
            status_message = ""
            # Success keywords are checked first and win over error keywords.
            if any(
                indicator in page_content.lower()
                for indicator in success_indicators
            ):
                submission_status = "success"
                # Try to extract success message
                success_selectors = [
                    ".alert-success",
                    ".success-message",
                    ".alert.alert-success",
                ]
                for selector in success_selectors:
                    try:
                        message_elem = await page.query_selector(selector)
                        if message_elem:
                            status_message = await message_elem.text_content()
                            break
                    except Exception:
                        continue
            elif any(
                indicator in page_content.lower() for indicator in error_indicators
            ):
                submission_status = "error"
                # Try to extract error message
                error_selectors = [
                    ".alert-danger",
                    ".error-message",
                    ".alert.alert-danger",
                ]
                for selector in error_selectors:
                    try:
                        message_elem = await page.query_selector(selector)
                        if message_elem:
                            status_message = await message_elem.text_content()
                            break
                    except Exception:
                        continue
            submission_result.update(
                {
                    "submission_status": submission_status,
                    "status_message": (
                        status_message.strip() if status_message else ""
                    ),
                    "page_content_snippet": (
                        page_content[:500] if page_content else ""
                    ),
                }
            )
        except Exception as e:
            logger.error(f"Error checking submission result: {str(e)}")

        return {
            "success": submitted,
            "message": f"Application {'submitted successfully' if submitted else 'submission failed'}",
            "data": {
                "filled_fields": filled_fields,
                "submission_result": submission_result,
                "page_url": create_url,
                "application_type": application_type,
                "subject": subject,
                "message_length": len(message) if message else 0,
            },
        }
    except PlaywrightTimeoutError:
        logger.error("Timeout while accessing create application page")
        return {
            "success": False,
            "message": "Timeout while accessing create application page. The LMS may be slow or unavailable.",
            "data": None,
        }
    except Exception as e:
        logger.error(f"Error creating application: {str(e)}")
        return {
            "success": False,
            "message": f"Error creating application: {str(e)}",
            "data": None,
        }
async def _save_session(self):
"""Save current browser session"""
try:
if self.context:
# Get cookies from the browser context
cookies = await self.context.cookies()
self.session_manager.save_session(
{"cookies": cookies, "timestamp": datetime.now().isoformat()}
)
except Exception as e:
logger.error(f"Session save error: {str(e)}")
async def _navigate_with_retry(
self,
page: Page,
url: str,
method: str = "GET",
form_data: Optional[dict] = None,
max_retries: int = 2,
) -> bool:
"""Navigate with retry logic for verification token errors.

The request itself is delegated to ``_make_secure_request``. After a
request that reports success, the page body text is inspected; if it
mentions "requestverificationtoken" or "anti-forgery" the courses listing
page is reloaded and the request is attempted again.

Args:
page: Page used for the navigation.
url: Target URL, passed through to ``_make_secure_request``.
method: HTTP method name, passed through unchanged.
form_data: Optional form fields, passed through unchanged.
max_retries: Number of retries after the first attempt, so up to
``max_retries + 1`` attempts are made.

Returns:
True when a request succeeded and no token error text was seen (or the
body text could not be read); False once all attempts are used up.
"""
# One initial attempt plus up to ``max_retries`` retries.
for attempt in range(max_retries + 1):
try:
logger.info(
f"Navigation attempt {attempt + 1}/{max_retries + 1} to {url}"
)
success = await self._make_secure_request(page, url, method, form_data)
if success:
# Check for verification token error in page content
try:
page_text = await page.inner_text("body")
if (
"requestverificationtoken" in page_text.lower()
or "anti-forgery" in page_text.lower()
):
logger.info(
"Detected verification token error, refreshing and retrying..."
)
# Go back to courses page to get fresh token
courses_url = (
f"{self.base_url}/Courses/MyCourses/AllCourses"
)
await page.goto(courses_url, wait_until="domcontentloaded")
await self._wait_for_page_load()
# Move on to the next attempt
continue
else:
return True
except Exception as e:
logger.warning(f"Error checking page content: {str(e)}")
return True # Assume success if we can't check
except Exception as e:
logger.error(f"Navigation attempt {attempt + 1} error: {str(e)}")
# Pause for two seconds before trying again; skipped on the final attempt.
if attempt < max_retries:
await page.wait_for_timeout(2000)
# Every attempt was used without a confirmed success.
return False
async def _get_verification_token(self, page: Page) -> str:
    """Find the anti-forgery verification token on the current page.

    Three strategies are tried in order, returning on the first hit:
    known input/meta selectors, regular expressions over the page source,
    and a script evaluated in the page that checks a few global variables
    and hidden inputs.

    Args:
        page: Page to inspect.

    Returns:
        The token, or an empty string when none was found or an error
        occurred. Tokens found via a selector are stripped of surrounding
        whitespace.
    """
    import re

    token_selectors = (
        "input[name='__RequestVerificationToken']",
        "input[name='RequestVerificationToken']",
        "input[name='_token']",
        "meta[name='_token']",
        "meta[name='csrf-token']",
        "[name*='RequestVerificationToken']",
        "input[type='hidden'][name*='Token']",
        "input[type='hidden'][name*='Verification']",
    )
    source_patterns = (
        r'<input[^>]*name="__RequestVerificationToken"[^>]*value="([^"]+)"[^>]*>',
        r'<input[^>]*value="([^"]+)"[^>]*name="__RequestVerificationToken"[^>]*>',
        r'"__RequestVerificationToken"\s*:\s*"([^"]+)"',
        r'RequestVerificationToken["\']?\s*[:=]\s*["\']([^"\']+)["\']',
        r'getAntiForgeryToken\(\)\s*{\s*return\s*["\']([^"\']+)["\']',
        r'antiForgeryToken["\']?\s*[:=]\s*["\']([^"\']+)["\']',
    )

    try:
        # Strategy 1: well-known form inputs and meta tags.
        for selector in token_selectors:
            try:
                element = await page.query_selector(selector)
                if not element:
                    continue
                tag_name = await element.evaluate("el => el.tagName.toLowerCase()")
                # Inputs carry the token in "value"; anything else in "content".
                attribute = "value" if tag_name == "input" else "content"
                token = await element.get_attribute(attribute)
                if token and token.strip():
                    logger.info(
                        f"Found verification token using selector: {selector}"
                    )
                    return token.strip()
            except Exception as e:
                logger.debug(f"Error with selector {selector}: {str(e)}")
                continue

        try:
            # Strategy 2: regular expressions over the raw page source.
            page_content = await page.content()
            for pattern in source_patterns:
                match = re.search(pattern, page_content, re.IGNORECASE)
                if match:
                    logger.info("Found verification token using regex pattern")
                    return match.group(1)

            # Strategy 3: look inside the page's JavaScript context.
            js_token = await page.evaluate(
                """
                () => {
                // Try common JavaScript variable names for tokens
                const tokenVars = [
                'window.__RequestVerificationToken',
                'window.requestVerificationToken',
                'window.antiForgeryToken',
                'window.csrfToken',
                '__RequestVerificationToken'
                ];
                for (let varName of tokenVars) {
                try {
                let value = eval(varName);
                if (value && typeof value === 'string' && value.length > 10) {
                return value;
                }
                } catch (e) {
                // Variable doesn't exist, continue
                }
                }
                // Try to find token in hidden inputs
                const hiddenInputs = document.querySelectorAll('input[type="hidden"]');
                for (let input of hiddenInputs) {
                if (input.name && input.name.toLowerCase().includes('token') && input.value) {
                return input.value;
                }
                }
                return null;
                }
                """
            )
            if js_token:
                logger.info("Found verification token using JavaScript evaluation")
                return js_token
        except Exception as e:
            logger.debug(f"Error in fallback token extraction: {str(e)}")

        logger.warning("No verification token found on page")
        return ""
    except Exception as e:
        logger.error(f"Error extracting verification token: {str(e)}")
        return ""
async def _make_secure_request(
    self, page: Page, url: str, method: str = "GET", form_data: dict = None
) -> bool:
    """Make a request through the browser page, adding the anti-forgery token.

    GET simply navigates to ``url``. POST reads the verification token from
    the current page, adds it to the form data, and then tries to fill and
    submit the first ``<form>`` on the current page. If there is no form or
    no submit control, it falls back to navigating to ``url`` with the data
    as a URL-encoded query string.

    Args:
        page: Page to drive.
        url: Target URL (used for GET and for the POST fallback).
        method: "GET" or "POST", case-insensitive.
        form_data: Optional field name/value mapping. It is copied, so the
            caller's dict is never modified.

    Returns:
        True when the navigation or submission completed; False when an
        error occurred or ``method`` is not supported.
    """
    from urllib.parse import urlencode

    try:
        verb = method.upper()

        if verb == "GET":
            await page.goto(url, wait_until="domcontentloaded")
            await self._wait_for_page_load()
            return True

        if verb != "POST":
            # Previously fell through and returned None implicitly.
            logger.warning(f"Unsupported request method: {method}")
            return False

        # The token must be read before the page is navigated away from.
        verification_token = await self._get_verification_token(page)

        # Work on a copy so the token is not written into the caller's dict.
        payload = dict(form_data) if form_data else {}
        if verification_token:
            payload["__RequestVerificationToken"] = verification_token

        if payload:
            form = await page.query_selector("form")
            if form:
                # NOTE(review): fill() waits for the element to be
                # actionable, so hidden inputs (such as the token field)
                # are likely to fail here only after a timeout - confirm.
                for field_name, field_value in payload.items():
                    try:
                        field_selector = f"input[name='{field_name}'], select[name='{field_name}'], textarea[name='{field_name}']"
                        field_element = await page.query_selector(field_selector)
                        if field_element:
                            await field_element.fill(str(field_value))
                    except Exception as e:
                        # Best effort: a field that cannot be filled is skipped.
                        logger.debug(
                            f"Could not fill form field {field_name}: {str(e)}"
                        )

                submit_button = await page.query_selector(
                    "input[type='submit'], button[type='submit']"
                )
                if submit_button:
                    await submit_button.click()
                    await self._wait_for_page_load()
                    return True

        # Fallback: navigate with query parameters. Values are URL-encoded
        # because tokens commonly contain '+', '/' and '='.
        if payload:
            separator = "&" if "?" in url else "?"
            full_url = f"{url}{separator}{urlencode(payload)}"
        else:
            full_url = url
        await page.goto(full_url, wait_until="domcontentloaded")
        await self._wait_for_page_load()
        return True
    except Exception as e:
        logger.error(f"Error making secure request: {str(e)}")
        return False