"""
CAPTCHA Solver for PAF-IAST LMS
Handles text-based CAPTCHA challenges by trying, in order:
1. Google Gemini Vision API on an OpenCV-preprocessed image (most accurate)
2. Traditional OCR with Tesseract
3. Per-character segmentation, with each character read by Gemini Vision
"""
import cv2
import numpy as np
import pytesseract
from PIL import Image, ImageEnhance, ImageFilter
import logging
from typing import Optional, List
import re
import os
import asyncio
import base64
import requests
import json
import tempfile
logger = logging.getLogger(__name__)
class CaptchaSolver:
    """Solve CAPTCHA images using several fallback strategies.

    ``solve_text_captcha`` tries, in order: the Gemini Vision API (only when
    the ``GEMINI_API_KEY`` environment variable is set), Tesseract OCR, and
    finally per-character segmentation with Gemini Vision.
    """

    # Letters Tesseract commonly reports in place of the look-alike digit.
    _DIGIT_LOOKALIKES = str.maketrans(
        {"O": "0", "I": "1", "S": "5", "G": "6", "B": "8"}
    )
    # Matches expressions such as "2 + 3 = ?" or "2x3=".
    _MATH_PATTERN = re.compile(r"(\d+)\s*([+\-*/xX])\s*(\d+)\s*=?\s*\??")

    def __init__(self):
        """Read the Gemini API key from the environment and set availability."""
        # Tesseract must be discoverable on PATH; otherwise set
        # pytesseract.pytesseract.tesseract_cmd to the executable's location.
        self.gemini_api_key = os.getenv("GEMINI_API_KEY")
        self.gemini_base_url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent"
        self.vision_available = bool(self.gemini_api_key)
        if self.vision_available:
            logger.info("✅ Gemini Vision CAPTCHA solver initialized")
        else:
            logger.warning("⚠️ No Gemini API key found, Vision solver disabled")

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _derive_png_path(image_path: str, tag: str) -> str:
        """Return ``<stem><tag>.png`` next to ``image_path``.

        Unlike ``str.replace(".png", ...)`` this always yields a path that
        differs from the input, whatever the input's extension is.
        """
        stem, _ = os.path.splitext(image_path)
        return f"{stem}{tag}.png"

    @staticmethod
    def _normalize_vision_answer(result: Optional[str]) -> Optional[str]:
        """Return the upper-cased, space-free answer, or None if implausible."""
        if result and len(result) >= 4 and result.replace(" ", "").isalnum():
            return result.replace(" ", "").upper()
        return None

    @staticmethod
    def _extract_letters(content: str) -> str:
        """Return only the alphabetic characters of ``content``, upper-cased."""
        return "".join(c for c in content.strip().upper() if c.isalpha())

    @staticmethod
    def _build_vision_prompt(expected_length: int) -> str:
        """Build the Gemini instruction text for a CAPTCHA of the given length."""
        noun = "letter" if expected_length == 1 else "letters"
        example = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"[:expected_length]
        return (
            "You are an expert at reading CAPTCHAs. Please look at this CAPTCHA image and extract the text.\n"
            "IMPORTANT REQUIREMENTS:\n"
            f"- The CAPTCHA contains exactly {expected_length} {noun} (A-Z)\n"
            "- The letters are case-insensitive (treat them all as uppercase)\n"
            f"- Only return the {expected_length} {noun}, nothing else\n"
            "- If you're not completely sure about a letter, make your best guess\n"
            "- The text might be slightly distorted or have noise\n"
            f"Please return ONLY the {expected_length}-letter answer in uppercase, like: {example}"
        )

    @classmethod
    def _parse_math_expression(cls, text: str) -> Optional[tuple]:
        """Parse and evaluate a simple binary arithmetic expression.

        Returns:
            ``(num1, operator, num2, result)`` or None when no expression is
            found or the expression divides by zero.
        """
        match = cls._MATH_PATTERN.search(text)
        if not match:
            return None
        num1 = int(match.group(1))
        operator = match.group(2)
        num2 = int(match.group(3))
        if operator == "+":
            result = num1 + num2
        elif operator == "-":
            result = num1 - num2
        elif operator in ("*", "x", "X"):
            result = num1 * num2
        else:  # "/"
            if num2 == 0:
                return None
            result = num1 // num2  # Integer division
        return num1, operator, num2, result

    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete ``path``; log instead of raising if that fails."""
        try:
            os.unlink(path)
            logger.info(f"🧹 Cleaned up temporary file: {path}")
        except OSError as e:
            logger.warning(f"⚠️ Could not cleanup temporary file: {e}")

    # ------------------------------------------------------------------
    # Public solving entry points
    # ------------------------------------------------------------------
    async def solve_text_captcha(
        self, image_path: str, preprocess: bool = True
    ) -> Optional[str]:
        """
        Solve text-based CAPTCHA using the best available method

        Args:
            image_path: Path to CAPTCHA image
            preprocess: Whether to preprocess image for better OCR

        Returns:
            Extracted text (upper-cased) or None if every method failed
        """
        try:
            logger.info(f"🎯 Solving CAPTCHA: {image_path}")
            # Step 1: Preprocess the image if requested. The preprocessed
            # copy is written next to the original and left on disk.
            if preprocess:
                processed_image_path = self._preprocess_captcha_image(image_path)
            else:
                processed_image_path = image_path

            # Method 1: Gemini Vision (most accurate)
            if self.vision_available:
                logger.info("🤖 Using Gemini Vision API with preprocessed image...")
                answer = self._normalize_vision_answer(
                    await self._solve_with_gemini_vision(processed_image_path)
                )
                if answer:
                    logger.info(f"✅ Gemini Vision solver success: {answer}")
                    return answer
                logger.warning(
                    "❌ Gemini Vision solver failed or returned invalid result"
                )
                # Retry on the untouched image in case preprocessing hurt.
                if processed_image_path != image_path:
                    logger.info("🔄 Retrying Gemini Vision with original image...")
                    answer = self._normalize_vision_answer(
                        await self._solve_with_gemini_vision(image_path)
                    )
                    if answer:
                        logger.info(
                            f"✅ Gemini Vision (original) success: {answer}"
                        )
                        return answer

            # Method 2: Fallback to traditional OCR
            logger.info("🔧 Falling back to traditional OCR...")
            result = await self._solve_with_ocr(processed_image_path, preprocess)
            if result:
                logger.info(f"✅ OCR fallback success: {result}")
                return result.upper()

            # Method 3: Last resort - advanced segmentation
            logger.info("🔬 Trying advanced character segmentation...")
            result = await self._solve_with_segmentation(image_path)
            if result:
                logger.info(f"✅ Segmentation success: {result}")
                return result.upper()

            logger.error("❌ All CAPTCHA solving methods failed")
            return None
        except Exception as e:
            logger.error(f"❌ CAPTCHA solving error: {str(e)}")
            return None

    async def _solve_with_ocr(
        self, image_path: str, preprocess: bool = True
    ) -> Optional[str]:
        """
        Fallback CAPTCHA solving using traditional OCR (Tesseract)

        Args:
            image_path: Path to CAPTCHA image
            preprocess: Whether to preprocess image for better OCR

        Returns:
            Cleaned text (possibly empty) or None if OCR raised
        """
        try:
            # copy() detaches the pixels from the file so it can be closed.
            with Image.open(image_path) as source:
                image = source.copy()
            if preprocess:
                image = self._preprocess_image(image)
            # Single word (--psm 8), upper-case alphanumerics only.
            custom_config = r"--oem 3 --psm 8 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
            text = pytesseract.image_to_string(image, config=custom_config)
            cleaned_text = self._clean_text(text)
            logger.info(f"OCR CAPTCHA result: {cleaned_text}")
            return cleaned_text
        except Exception as e:
            logger.error(f"OCR CAPTCHA solving error: {str(e)}")
            return None

    def _preprocess_image(self, image: "Image.Image") -> "Image.Image":
        """
        Preprocess CAPTCHA image for better OCR accuracy

        Upscales 3x, boosts contrast, binarises at 127, then applies
        morphological close/open and a median blur.

        Args:
            image: PIL Image object

        Returns:
            Preprocessed grayscale PIL Image; on error, the image as far as
            it had been processed
        """
        try:
            if image.mode != "RGB":
                image = image.convert("RGB")
            # Larger glyphs OCR better.
            width, height = image.size
            image = image.resize((width * 3, height * 3), Image.LANCZOS)
            image = image.convert("L")
            image = ImageEnhance.Contrast(image).enhance(2.0)
            # Binarise, then return to 8-bit grayscale for OpenCV.
            threshold = 127
            image = image.point(lambda x: 0 if x < threshold else 255, "1")
            image = image.convert("L")
            # The array is single-channel here, so it is passed to OpenCV
            # as-is; a colour conversion would reject a 2-D array.
            gray = np.array(image)
            kernel = np.ones((2, 2), np.uint8)
            gray = cv2.morphologyEx(gray, cv2.MORPH_CLOSE, kernel)
            gray = cv2.morphologyEx(gray, cv2.MORPH_OPEN, kernel)
            gray = cv2.medianBlur(gray, 3)
            return Image.fromarray(gray)
        except Exception as e:
            logger.error(f"Image preprocessing error: {str(e)}")
            return image

    def _clean_text(self, text: str) -> str:
        """
        Clean extracted CAPTCHA text

        Strips everything except ASCII letters and digits and upper-cases the
        result. Look-alike letters (O, I, S, G, B) are mapped to digits only
        when the text has at least 4 characters and most of them are already
        digits, so letter-only CAPTCHAs are left intact.

        Args:
            text: Raw OCR output

        Returns:
            Cleaned text
        """
        if not text:
            return ""
        text = re.sub(r"[^A-Za-z0-9]", "", text).upper()
        digit_count = sum(ch.isdigit() for ch in text)
        if len(text) >= 4 and digit_count * 2 > len(text):
            text = text.translate(self._DIGIT_LOOKALIKES)
        return text

    async def solve_math_captcha(self, image_path: str) -> Optional[str]:
        """
        Solve mathematical CAPTCHA (e.g., "2 + 3 = ?")

        Supports ``+``, ``-``, ``*``/``x`` and integer ``/``.

        Args:
            image_path: Path to CAPTCHA image

        Returns:
            Calculated result as string, or None if no expression was
            recognised, the divisor is zero, or OCR failed
        """
        try:
            with Image.open(image_path) as source:
                image = self._preprocess_image(source.copy())
            # More permissive whitelist for math expressions
            custom_config = (
                r"--oem 3 --psm 8 -c tessedit_char_whitelist=0123456789+-*/=?x "
            )
            text = pytesseract.image_to_string(image, config=custom_config)
            text = text.strip()
            logger.info(f"Math CAPTCHA text: {text}")
            parsed = self._parse_math_expression(text)
            if parsed is None:
                return None
            num1, operator, num2, result = parsed
            logger.info(f"Math CAPTCHA solved: {num1} {operator} {num2} = {result}")
            return str(result)
        except Exception as e:
            logger.error(f"Math CAPTCHA solving error: {str(e)}")
            return None

    async def solve_with_multiple_attempts(
        self, image_path: str, max_attempts: int = 3
    ) -> Optional[str]:
        """
        Try solving CAPTCHA with multiple preprocessing approaches

        The strategies are, in order: text with preprocessing, text without
        preprocessing, math CAPTCHA. Only the first ``max_attempts`` of them
        are run.

        Args:
            image_path: Path to CAPTCHA image
            max_attempts: Maximum number of strategies to run

        Returns:
            The most frequent non-empty answer (first seen wins ties), or
            None if no strategy produced one
        """
        from collections import Counter

        strategies = [
            lambda: self.solve_text_captcha(image_path, preprocess=True),
            lambda: self.solve_text_captcha(image_path, preprocess=False),
            lambda: self.solve_math_captcha(image_path),
        ]
        attempts = []
        try:
            for run_strategy in strategies[: max(0, max_attempts)]:
                result = await run_strategy()
                if result:
                    attempts.append(result)
            if not attempts:
                return None
            return Counter(attempts).most_common(1)[0][0]
        except Exception as e:
            logger.error(f"Multiple attempts CAPTCHA solving error: {str(e)}")
            return None

    def validate_captcha_text(
        self, text: str, expected_length: Optional[int] = None
    ) -> bool:
        """
        Validate extracted CAPTCHA text

        Args:
            text: Extracted text
            expected_length: Expected length of CAPTCHA; a falsy value
                (None or 0) disables the exact-length check

        Returns:
            True if text is alphanumeric, 3-10 characters long and matches
            ``expected_length`` when given
        """
        if not text or not text.isalnum():
            return False
        if expected_length and len(text) != expected_length:
            return False
        return 3 <= len(text) <= 10

    def _encode_image(self, image_path: str) -> str:
        """
        Encode image to base64 string for Gemini API

        Args:
            image_path: Path to the image file

        Returns:
            Base64 encoded image string
        """
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode("utf-8")

    # ------------------------------------------------------------------
    # Gemini Vision
    # ------------------------------------------------------------------
    def _preprocess_image_for_vision(self, image_path: str) -> str:
        """
        Preprocess CAPTCHA image for Gemini Vision API

        Upscales small images, binarises with an adaptive threshold, cleans
        noise and writes the result to a temporary PNG.

        Args:
            image_path: Path to the original image

        Returns:
            Path to the temporary preprocessed PNG (the caller deletes it),
            or ``image_path`` itself if preprocessing failed
        """
        temp_path = None
        try:
            logger.info(f"🔧 Preprocessing image for Vision API: {image_path}")
            # convert() also normalises GIF/palette/alpha inputs.
            with Image.open(image_path) as source:
                image = source.convert("RGB")

            # Resize if too small (improve quality for small CAPTCHAs)
            width, height = image.size
            if width < 200 or height < 80:
                scale_factor = max(200 / width, 80 / height)
                new_width = int(width * scale_factor)
                new_height = int(height * scale_factor)
                image = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
                logger.info(
                    f"📏 Resized image from {width}x{height} to {new_width}x{new_height}"
                )

            gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
            blurred = cv2.GaussianBlur(gray, (3, 3), 0)
            binary = cv2.adaptiveThreshold(
                blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
            )
            kernel = np.ones((2, 2), np.uint8)
            cleaned = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
            cleaned = cv2.morphologyEx(cleaned, cv2.MORPH_OPEN, kernel)

            processed_image = Image.fromarray(cleaned)
            processed_image = ImageEnhance.Contrast(processed_image).enhance(1.5)
            processed_image = ImageEnhance.Sharpness(processed_image).enhance(2.0)

            # Reserve a name, close the handle, then let PIL write to it.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as temp_file:
                temp_path = temp_file.name
            processed_image.save(temp_path, "PNG")
            logger.info(f"✅ Preprocessed image saved to: {temp_path}")
            return temp_path
        except Exception as e:
            logger.error(f"❌ Error preprocessing image: {str(e)}")
            # Do not leave a half-written temp file behind.
            if temp_path and os.path.exists(temp_path):
                self._remove_quietly(temp_path)
            logger.info("🔄 Falling back to original image")
            return image_path

    async def _solve_with_gemini_vision(
        self, image_path: str, expected_length: int = 4
    ) -> Optional[str]:
        """
        Solve CAPTCHA using Google Gemini Vision API

        Args:
            image_path: Path to the CAPTCHA image
            expected_length: Number of letters the image is expected to
                contain (4 for a full CAPTCHA, 1 for a segmented character)

        Returns:
            Exactly ``expected_length`` upper-case letters (truncated, or
            padded with "A" when the model returned fewer), or None if the
            request failed or no letters were returned
        """
        preprocessed_path = None
        try:
            logger.info(f"🤖 Using Google Gemini Vision to solve CAPTCHA: {image_path}")
            if not os.path.exists(image_path):
                logger.error(f"❌ Image file not found: {image_path}")
                return None

            preprocessed_path = self._preprocess_image_for_vision(image_path)
            base64_image = self._encode_image(preprocessed_path)

            payload = {
                "contents": [
                    {
                        "parts": [
                            {"text": self._build_vision_prompt(expected_length)},
                            {
                                "inline_data": {
                                    "mime_type": "image/png",
                                    "data": base64_image,
                                }
                            },
                        ]
                    }
                ],
                "generationConfig": {
                    "temperature": 0.1,  # Low temperature for consistent results
                    "maxOutputTokens": 50,  # Short response
                    "topP": 0.8,
                    "topK": 10,
                },
            }
            # The key travels in a header rather than the query string so it
            # cannot end up in logged URLs or exception messages.
            headers = {
                "Content-Type": "application/json",
                "x-goog-api-key": self.gemini_api_key,
            }
            # requests is blocking; run it in the default executor so the
            # event loop stays responsive while waiting on the network.
            loop = asyncio.get_running_loop()
            response = await loop.run_in_executor(
                None,
                lambda: requests.post(
                    self.gemini_base_url,
                    headers=headers,
                    json=payload,
                    timeout=30,
                ),
            )
            if response.status_code != 200:
                logger.error(
                    f"❌ Gemini API request failed: {response.status_code} - {response.text}"
                )
                return None

            result = response.json()
            candidates = result.get("candidates") or []
            parts = (
                candidates[0].get("content", {}).get("parts") if candidates else None
            )
            if not parts:
                logger.warning("❌ Gemini Vision response format unexpected")
                return None

            letters_only = self._extract_letters(parts[0].get("text", ""))
            if not letters_only:
                logger.warning("❌ Gemini Vision could not extract valid letters")
                return None
            if len(letters_only) >= expected_length:
                captcha_solution = letters_only[:expected_length]
                logger.info(f"🎉 Gemini Vision solution: {captcha_solution}")
                return captcha_solution
            # Too short: pad so the caller still gets a full-length guess.
            letters_only = letters_only.ljust(expected_length, "A")
            logger.info(f"🎉 Gemini Vision solution (padded): {letters_only}")
            return letters_only
        except Exception as e:
            logger.error(f"❌ Error solving CAPTCHA with Gemini Vision: {str(e)}")
            return None
        finally:
            # Clean up temporary preprocessed image file
            if preprocessed_path and preprocessed_path != image_path:
                self._remove_quietly(preprocessed_path)

    # ------------------------------------------------------------------
    # OpenCV preprocessing
    # ------------------------------------------------------------------
    def _preprocess_captcha_image(self, image_path: str) -> str:
        """
        Preprocess CAPTCHA image using OpenCV for better recognition

        Tries colour segmentation first and falls back to the grayscale
        method. The result is written to ``<stem>_preprocessed.png`` beside
        the original, which is never overwritten.

        Args:
            image_path: Path to the original CAPTCHA image

        Returns:
            Path to the preprocessed image, or ``image_path`` on failure
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                logger.error(f"Could not load image: {image_path}")
                return image_path
            logger.info("🔧 Starting image preprocessing...")
            processed_image = self._preprocess_color_segmentation(image)
            if processed_image is None:
                processed_image = self._preprocess_grayscale_method(image)
            preprocessed_path = self._derive_png_path(image_path, "_preprocessed")
            # imwrite reports failure through its return value, not an exception.
            if not cv2.imwrite(preprocessed_path, processed_image):
                logger.error(f"❌ Preprocessing failed: could not write {preprocessed_path}")
                return image_path
            logger.info(f"✅ Preprocessing complete: {preprocessed_path}")
            return preprocessed_path
        except Exception as e:
            logger.error(f"❌ Preprocessing failed: {str(e)}")
            return image_path

    def _preprocess_color_segmentation(self, image) -> Optional[np.ndarray]:
        """
        Method 1: Colour segmentation - keep only dark (low HSV value) pixels

        Args:
            image: BGR image as loaded by ``cv2.imread``

        Returns:
            3-channel mask image (text white on black), or None when less
            than 10% of the pixels were selected or an error occurred
        """
        try:
            hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
            # Any hue/saturation, value <= 80: tune for the CAPTCHA's colours.
            lower_dark = np.array([0, 0, 0])
            upper_dark = np.array([180, 255, 80])
            mask = cv2.inRange(hsv, lower_dark, upper_dark)
            mask = cv2.medianBlur(mask, 3)
            kernel = np.ones((2, 2), np.uint8)
            mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
            mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)

            # Check if we got meaningful content (not mostly empty)
            white_pixels = np.sum(mask == 255)
            total_pixels = mask.shape[0] * mask.shape[1]
            if white_pixels / total_pixels > 0.1:  # At least 10% content
                logger.info("✅ Color segmentation successful")
                return cv2.cvtColor(mask, cv2.COLOR_GRAY2BGR)
            logger.info("⚠️ Color segmentation didn't find enough content")
            return None
        except Exception as e:
            logger.error(f"❌ Color segmentation failed: {str(e)}")
            return None

    def _preprocess_grayscale_method(self, image) -> np.ndarray:
        """
        Method 2: Grayscale preprocessing with adaptive or Otsu thresholding

        The adaptive result is used when its foreground pixel count is within
        0.5x-2x of the Otsu result; otherwise Otsu is used.

        Args:
            image: BGR image

        Returns:
            3-channel binarised image, or the input image on error
        """
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            gray = cv2.medianBlur(gray, 3)
            binary_adaptive = cv2.adaptiveThreshold(
                gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY_INV, 11, 2
            )
            _, binary_otsu = cv2.threshold(
                gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
            )
            content_adaptive = np.sum(binary_adaptive == 255)
            content_otsu = np.sum(binary_otsu == 255)
            if content_otsu * 0.5 < content_adaptive < content_otsu * 2:
                binary = binary_adaptive
                logger.info("✅ Using adaptive threshold")
            else:
                binary = binary_otsu
                logger.info("✅ Using Otsu threshold")

            kernel = np.ones((2, 2), np.uint8)
            binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
            binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
            return cv2.cvtColor(binary, cv2.COLOR_GRAY2BGR)
        except Exception as e:
            logger.error(f"❌ Grayscale preprocessing failed: {str(e)}")
            # Return original image as fallback
            return image

    def _preprocess_advanced_segmentation(self, image) -> List[np.ndarray]:
        """
        Method 3: Advanced segmentation - separate individual characters

        Args:
            image: BGR image

        Returns:
            Crops of ``image`` (5 px padding), one per contour whose area is
            between 100 and 2000, ordered left to right; ``[image]`` on error
        """
        try:
            processed = self._preprocess_grayscale_method(image)
            gray = cv2.cvtColor(processed, cv2.COLOR_BGR2GRAY)
            # [-2] selects the contour list under both the 2-value (OpenCV 4)
            # and 3-value (OpenCV 3) return conventions.
            contours = cv2.findContours(
                gray, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
            )[-2]

            min_area = 100  # Minimum area for a character
            max_area = 2000  # Maximum area for a character
            boxes = sorted(
                cv2.boundingRect(c)
                for c in contours
                if min_area < cv2.contourArea(c) < max_area
            )

            padding = 5
            img_h, img_w = image.shape[:2]
            characters = []
            for x, y, w, h in boxes:
                left = max(0, x - padding)
                top = max(0, y - padding)
                right = min(img_w, x + w + padding)
                bottom = min(img_h, y + h + padding)
                characters.append(image[top:bottom, left:right])
            logger.info(f"✅ Segmented {len(characters)} characters")
            return characters
        except Exception as e:
            logger.error(f"❌ Advanced segmentation failed: {str(e)}")
            return [image]  # Return original as fallback

    async def _solve_with_segmentation(self, image_path: str) -> Optional[str]:
        """
        Solve CAPTCHA using advanced character segmentation

        Each segmented character is written to ``<stem>_char_<i>.png``, read
        by Gemini Vision as a single letter, and the file removed again.

        Args:
            image_path: Path to the CAPTCHA image

        Returns:
            The first four recognised letters, or None if fewer than four
            were recognised or Gemini Vision is unavailable
        """
        try:
            image = cv2.imread(image_path)
            if image is None:
                return None
            characters = self._preprocess_advanced_segmentation(image)
            if not characters or not self.vision_available:
                return None

            logger.info(
                f"🔬 Processing {len(characters)} segmented characters with Gemini..."
            )
            result_chars = []
            for i, char_img in enumerate(characters):
                char_path = self._derive_png_path(image_path, f"_char_{i}")
                try:
                    if not cv2.imwrite(char_path, char_img):
                        continue
                    char_result = await self._solve_with_gemini_vision(
                        char_path, expected_length=1
                    )
                finally:
                    # Clean up temp file even if the request raised
                    if os.path.exists(char_path):
                        os.remove(char_path)
                if char_result and len(char_result) == 1 and char_result.isalpha():
                    result_chars.append(char_result.upper())

            if len(result_chars) >= 4:
                final_result = "".join(result_chars[:4])
                logger.info(f"✅ Segmentation result: {final_result}")
                return final_result
            return None
        except Exception as e:
            logger.error(f"❌ Segmentation method failed: {str(e)}")
            return None