ocr.py
import sys
import os
import json
import re

import Quartz
import Vision
from Cocoa import NSURL
from PIL import Image, ImageOps
import numpy as np


def get_image_url(image_path):
    return NSURL.fileURLWithPath_(image_path)


def normalize_bbox(bbox):
    return {
        "x": bbox.origin.x,
        "y": bbox.origin.y,
        "w": bbox.size.width,
        "h": bbox.size.height,
    }


def analyze_style_for_blocks(image_path, blocks):
    """
    Analyze the color of each text block to distinguish colored text
    (handwriting/annotations, typed "emphasized") from black print
    (typed "print"). Mutates the block dicts in place.
    """
    try:
        # Pillow cannot rasterize PDFs directly, so skip style analysis.
        if image_path.lower().endswith('.pdf'):
            return
        img = Image.open(image_path)
        img = ImageOps.exif_transpose(img).convert('RGB')
        width, height = img.size

        for block in blocks:
            # Vision bboxes are normalized to [0, 1] with a bottom-left
            # origin; Pillow uses pixel coordinates with a top-left origin:
            #   top_px    = height * (1 - (y + h))
            #   bottom_px = height * (1 - y)
            b_x, b_y = block['bbox']['x'], block['bbox']['y']
            b_w, b_h = block['bbox']['w'], block['bbox']['h']

            x1 = int(b_x * width)
            y1 = int(height * (1 - (b_y + b_h)))
            x2 = int((b_x + b_w) * width)
            y2 = int(height * (1 - b_y))

            # Clamp to the image bounds.
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(width, x2), min(height, y2)
            if x2 <= x1 or y2 <= y1:
                block["type"] = "print"
                continue

            patch = img.crop((x1, y1, x2, y2))

            # Heuristic: saturated dark pixels indicate handwriting or
            # annotations; near-grayscale dark pixels indicate black print.
            # Cast to int16 so channel differences don't wrap around uint8.
            arr = np.array(patch).astype(np.int16)
            # Only consider "dark" (text) pixels; ignore the light background.
            mask = np.mean(arr, axis=2) < 220
            if not np.any(mask):
                block["type"] = "print"
                continue

            text_pixels = arr[mask]
            # Per-pixel deviation from grayscale: for gray pixels R = G = B,
            # so |R-G| + |R-B| + |G-B| = 0; pure red would score 510.
            r, g, b = text_pixels[:, 0], text_pixels[:, 1], text_pixels[:, 2]
            diff = np.mean(np.abs(r - g) + np.abs(r - b) + np.abs(g - b))

            # Scanned/compressed black text usually stays below 10.
            block["type"] = "emphasized" if diff > 10 else "print"
    except Exception as e:
        print(f"Warning: Style analysis failed: {e}")
        # Blocks keep their default "print" type.
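# Worked example of the Vision -> Pillow conversion above, with
# illustrative numbers (not from the source): for a 1000x800 px image and
# a Vision bbox of x=0.1, y=0.7, w=0.2, h=0.05:
#   x1 = 0.1 * 1000               = 100
#   y1 = 800 * (1 - (0.7 + 0.05)) = 200   (top edge in Pillow coords)
#   x2 = (0.1 + 0.2) * 1000       = 300
#   y2 = 800 * (1 - 0.7)          = 240   (bottom edge in Pillow coords)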
def create_request_handler(image_url):
    return Vision.VNImageRequestHandler.alloc().initWithURL_options_(image_url, None)


def create_cgimage_request_handler(cg_image):
    return Vision.VNImageRequestHandler.alloc().initWithCGImage_options_(cg_image, None)


def perform_ocr_on_handler(handler, fast=False):
    request = Vision.VNRecognizeTextRequest.alloc().init()
    if fast:
        request.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelFast)
    else:
        request.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelAccurate)
    request.setUsesLanguageCorrection_(True)
    request.setRecognitionLanguages_(["zh-Hans", "zh-Hant", "en-US"])
    success, error = handler.performRequests_error_([request], None)
    if not success:
        raise RuntimeError(f"OCR failed: {error}")
    return request.results()


def get_pdf_page_images(pdf_path):
    url = NSURL.fileURLWithPath_(pdf_path)
    pdf_doc = Quartz.CGPDFDocumentCreateWithURL(url)
    if not pdf_doc:
        raise ValueError(f"Could not open PDF file: {pdf_path}")
    page_count = Quartz.CGPDFDocumentGetNumberOfPages(pdf_doc)
    for page_index in range(1, page_count + 1):
        page = Quartz.CGPDFDocumentGetPage(pdf_doc, page_index)
        if not page:
            continue
        rect = Quartz.CGPDFPageGetBoxRect(page, Quartz.kCGPDFMediaBox)
        # Handle zero-size pages defensively.
        width = max(1, int(rect.size.width))
        height = max(1, int(rect.size.height))
        color_space = Quartz.CGColorSpaceCreateDeviceRGB()
        context = Quartz.CGBitmapContextCreate(
            None, width, height, 8, width * 4, color_space,
            Quartz.kCGImageAlphaPremultipliedLast
        )
        if not context:
            continue
        # Paint a white background, then render the page into the bitmap.
        Quartz.CGContextSetRGBFillColor(context, 1.0, 1.0, 1.0, 1.0)
        Quartz.CGContextFillRect(context, rect)
        Quartz.CGContextDrawPDFPage(context, page)
        cg_image = Quartz.CGBitmapContextCreateImage(context)
        yield page_index, cg_image


def recognize(image_path, fast=False):
    lower_path = image_path.lower()
    results_list = []
    if lower_path.endswith('.pdf'):
        for page_index, cg_image in get_pdf_page_images(image_path):
            handler = create_cgimage_request_handler(cg_image)
            results = perform_ocr_on_handler(handler, fast)
            results_list.append((page_index, results))
    else:
        image_url = get_image_url(image_path)
        if not image_url:
            raise ValueError(f"Invalid image path: {image_path}")
        handler = create_request_handler(image_url)
        results = perform_ocr_on_handler(handler, fast)
        results_list.append((1, results))
    return results_list
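# For orientation, the shape of recognize()'s return value (the example
# paths here are hypothetical): a list of (page_index, results) pairs,
# one per page, where results is the raw Vision observation list:
#   recognize("doc.pdf")              -> [(1, [...]), (2, [...]), ...]
#   recognize("photo.png", fast=True) -> [(1, [...])]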
# ==========================================
# Advanced Layout Analysis: Block Aggregation
# ==========================================

def get_item_vertical_range(item):
    # Stay in Vision coordinates throughout: y=0 is the bottom of the
    # page, so the top edge (y + h) is the larger value.
    bottom = item['bbox']['y']
    top = item['bbox']['y'] + item['bbox']['h']
    return bottom, top


def is_vertically_connected(item_top, item_bottom, other):
    # Check whether 'other' sits immediately below the item: since the
    # item is above, its bottom edge should be close to (at or just above)
    # the other's top edge.
    other_bottom, other_top = get_item_vertical_range(other)
    gap = item_bottom - other_top
    # Average line height, used to scale the gap tolerance.
    avg_h = (item_top - item_bottom + other_top - other_bottom) / 2
    # 2.5 * avg_h allows looser line spacing (titles/headers) while still
    # respecting clear visual breaks; the negative bound tolerates overlap.
    return -0.8 * avg_h < gap < 2.5 * avg_h


def is_horizontally_aligned(item, other):
    l1, r1 = item['bbox']['x'], item['bbox']['x'] + item['bbox']['w']
    l2, r2 = other['bbox']['x'], other['bbox']['x'] + other['bbox']['w']
    overlap_w = max(0, min(r1, r2) - max(l1, l2))
    min_w = min(item['bbox']['w'], other['bbox']['w'])
    # 1. Significant horizontal overlap.
    if overlap_w > 0.3 * min_w:
        return True
    # 2. Left alignment (common for columns), 2% tolerance.
    if abs(l1 - l2) < 0.02:
        return True
    # 3. Center alignment (common for table headers).
    c1 = l1 + item['bbox']['w'] / 2
    c2 = l2 + other['bbox']['w'] / 2
    if abs(c1 - c2) < 0.02:
        return True
    return False


def cluster_into_blocks(layout_items):
    """
    Cluster items into blocks (connected components) using union-find.
    """
    n = len(layout_items)
    parent = list(range(n))

    def find(i):
        if parent[i] != i:
            parent[i] = find(parent[i])
        return parent[i]

    def union(i, j):
        root_i = find(i)
        root_j = find(j)
        if root_i != root_j:
            parent[root_i] = root_j

    # N is usually small (<1000) for a page of OCR, so the O(N^2) pairwise
    # check is fine; sorting by Y first would cut the work if needed.
    items_with_vert = []
    for idx, item in enumerate(layout_items):
        b, t = get_item_vertical_range(item)
        items_with_vert.append((idx, item, b, t))

    for i in range(n):
        for j in range(i + 1, n):
            idx1, item1, b1, t1 = items_with_vert[i]
            idx2, item2, b2, t2 = items_with_vert[j]
            # The item whose top edge is higher is treated as the upper one.
            if t1 > t2:
                upper, lower = (idx1, item1, b1, t1), (idx2, item2, b2, t2)
            else:
                upper, lower = (idx2, item2, b2, t2), (idx1, item1, b1, t1)
            if is_vertically_connected(upper[3], upper[2], lower[1]) and \
                    is_horizontally_aligned(upper[1], lower[1]):
                union(idx1, idx2)

    # Group items by their union-find root.
    clusters = {}
    for i in range(n):
        root = find(i)
        clusters.setdefault(root, []).append(layout_items[i])
    return list(clusters.values())


def get_block_bbox(block):
    min_x = min(i['bbox']['x'] for i in block)
    max_x = max(i['bbox']['x'] + i['bbox']['w'] for i in block)
    min_y = min(i['bbox']['y'] for i in block)  # bottom edge
    max_y = max(i['bbox']['y'] + i['bbox']['h'] for i in block)  # top edge
    return min_x, min_y, max_x, max_y
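# Worked example of the pairwise merge test above, with illustrative
# numbers (not from the source): two left-aligned lines with
#   item1 bbox = (x=0.10, y=0.80, w=0.5, h=0.03) -> top 0.83, bottom 0.80
#   item2 bbox = (x=0.10, y=0.75, w=0.4, h=0.03) -> top 0.78, bottom 0.75
# give gap = 0.80 - 0.78 = 0.02 and avg_h = 0.03, so
# -0.8 * avg_h < gap < 2.5 * avg_h holds; |l1 - l2| = 0 < 0.02 as well,
# so the two lines are unioned into one block.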
""" # Helper to clean sort # Vision Y is bottom-left, so Higher Y is Top. # Convert to Top-Let Origin for easier mental model: top = 1 - max_y def sort_key(b): _, _, _, max_y = get_block_bbox(b) return -max_y # Higher Y (Top) comes first # Initial sort by Y blocks.sort(key=sort_key) # Banding logic sorted_blocks = [] if not blocks: return [] current_band = [blocks[0]] _, _, _, first_top = get_block_bbox(blocks[0]) for i in range(1, len(blocks)): blk = blocks[i] _, _, _, top = get_block_bbox(blk) # If within 2% height tolerance -> same band (row) if abs(first_top - top) < 0.05: # 5% screen height band? or dynamic? current_band.append(blk) else: # Sort band by X current_band.sort(key=lambda b: get_block_bbox(b)[0]) sorted_blocks.extend(current_band) # New band current_band = [blk] first_top = top if current_band: current_band.sort(key=lambda b: get_block_bbox(b)[0]) sorted_blocks.extend(current_band) return sorted_blocks def smart_merge_text(text): pattern = r'([\u4e00-\u9fa5])[\n\s]+([\u4e00-\u9fa5])' return re.sub(pattern, r'\1\2', text) def construct_block_text(block): # Sort lines within block: Top to Bottom block.sort(key=lambda i: i['bbox']['y'] + i['bbox']['h'], reverse=True) lines = [i['text'] for i in block] raw_text = "\n".join(lines) return smart_merge_text(raw_text) def recognize_text(image_path): all_pages_results = recognize(image_path) final_text_parts = [] for page_index, results in all_pages_results: layout_items = [] for observation in results: candidate = observation.topCandidates_(1)[0] bbox = observation.boundingBox() layout_items.append({ "text": candidate.string(), "bbox": normalize_bbox(bbox) }) # 1. Cluster Loop blocks = cluster_into_blocks(layout_items) # 2. Sort Blocks sorted_blocks = sort_blocks(blocks) # 3. Construct text block_texts = [construct_block_text(b) for b in sorted_blocks] page_text = "\n\n".join(block_texts) final_text_parts.append(page_text) return "\n\n".join(final_text_parts) def recognize_text_with_layout(image_path): all_pages_results = recognize(image_path) final_layout_data = [] for page_index, results in all_pages_results: # 1. Extract raw items layout_items = [] for observation in results: candidate = observation.topCandidates_(1)[0] bbox = observation.boundingBox() layout_items.append({ "text": candidate.string(), "confidence": candidate.confidence(), "bbox": normalize_bbox(bbox) }) # 2. Cluster Loop blocks = cluster_into_blocks(layout_items) # 3. Sort Blocks sorted_blocks = sort_blocks(blocks) # 4. Construct Structured Block Data page_blocks_data = [] for idx, block in enumerate(sorted_blocks): block_text = construct_block_text(block) min_x, min_y, max_x, max_y = get_block_bbox(block) block_data = { "id": idx + 1, "page": page_index, "text": block_text, "bbox": { "x": min_x, "y": min_y, "w": max_x - min_x, "h": max_y - min_y }, "type": "print", # Default "lines": block # Include original lines details } page_blocks_data.append(block_data) # 5. 
def construct_block_text(block):
    # Sort lines within the block top to bottom (descending top edge).
    block.sort(key=lambda i: i['bbox']['y'] + i['bbox']['h'], reverse=True)
    lines = [i['text'] for i in block]
    raw_text = "\n".join(lines)
    return smart_merge_text(raw_text)


def recognize_text(image_path):
    all_pages_results = recognize(image_path)
    final_text_parts = []
    for page_index, results in all_pages_results:
        layout_items = []
        for observation in results:
            candidate = observation.topCandidates_(1)[0]
            bbox = observation.boundingBox()
            layout_items.append({
                "text": candidate.string(),
                "bbox": normalize_bbox(bbox)
            })
        # 1. Cluster
        blocks = cluster_into_blocks(layout_items)
        # 2. Sort blocks
        sorted_blocks = sort_blocks(blocks)
        # 3. Construct text
        block_texts = [construct_block_text(b) for b in sorted_blocks]
        page_text = "\n\n".join(block_texts)
        final_text_parts.append(page_text)
    return "\n\n".join(final_text_parts)


def recognize_text_with_layout(image_path):
    all_pages_results = recognize(image_path)
    final_layout_data = []
    for page_index, results in all_pages_results:
        # 1. Extract raw items
        layout_items = []
        for observation in results:
            candidate = observation.topCandidates_(1)[0]
            bbox = observation.boundingBox()
            layout_items.append({
                "text": candidate.string(),
                "confidence": candidate.confidence(),
                "bbox": normalize_bbox(bbox)
            })
        # 2. Cluster
        blocks = cluster_into_blocks(layout_items)
        # 3. Sort blocks
        sorted_blocks = sort_blocks(blocks)
        # 4. Construct structured block data
        page_blocks_data = []
        for idx, block in enumerate(sorted_blocks):
            block_text = construct_block_text(block)
            min_x, min_y, max_x, max_y = get_block_bbox(block)
            block_data = {
                "id": idx + 1,
                "page": page_index,
                "text": block_text,
                "bbox": {
                    "x": min_x,
                    "y": min_y,
                    "w": max_x - min_x,
                    "h": max_y - min_y
                },
                "type": "print",  # default; overwritten by style analysis
                "lines": block  # original per-line details
            }
            page_blocks_data.append(block_data)
        # 5. Analyze style (distinguish handwriting/annotation by color)
        analyze_style_for_blocks(image_path, page_blocks_data)
        final_layout_data.extend(page_blocks_data)
    return final_layout_data


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python ocr.py <image_or_pdf_path>")
        sys.exit(1)
    path = sys.argv[1]
    if not os.path.exists(path):
        print(f"File not found: {path}")
        sys.exit(1)
    try:
        # print("--- Text ---")
        # print(recognize_text(path))
        print("\n--- Layout (Values for LLM) ---")
        print(json.dumps(recognize_text_with_layout(path), indent=2, ensure_ascii=False))
    except Exception as e:
        print(f"Error: {e}")
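For reference, a minimal usage sketch of the module's two entry points from another script; the file names are hypothetical examples, and the output shape follows the block dicts constructed above:

from ocr import recognize_text, recognize_text_with_layout

# Plain text in reading order; blocks are separated by blank lines.
text = recognize_text("scan.png")

# Structured blocks: dicts with "id", "page", "text", "bbox", "type"
# ("print" or "emphasized") and the per-line "lines" details.
for block in recognize_text_with_layout("notes.pdf"):
    if block["type"] == "emphasized":
        print(block["page"], block["text"])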
