import sys
import os
import json
import re

import numpy as np
from PIL import Image, ImageOps

import Quartz
import Vision
from Cocoa import NSURL
def get_image_url(image_path):
return NSURL.fileURLWithPath_(image_path)
def normalize_bbox(bbox):
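    """Convert a Vision CGRect (normalized to [0, 1], origin at the bottom-left) to a plain dict."""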
return {
"x": bbox.origin.x,
"y": bbox.origin.y,
"w": bbox.size.width,
"h": bbox.size.height,
}
def analyze_style_for_blocks(image_path, blocks):
"""
Analyze color style for text blocks to distinguish 'handwriting' (colored) from 'print' (black).
"""
try:
        # Pillow cannot open PDF files directly, so style analysis is skipped for PDFs;
        # their blocks keep the default "print" type.
        if image_path.lower().endswith('.pdf'):
            return
img = Image.open(image_path)
img = ImageOps.exif_transpose(img).convert('RGB')
width, height = img.size
for block in blocks:
            # Convert the block bbox from Vision's normalized, bottom-left-origin
            # coordinates to Pillow pixel coordinates (top-left origin):
            #   x1 = x * width                  x2 = (x + w) * width
            #   y1 = (1 - (y + h)) * height     y2 = (1 - y) * height
            b_x, b_y, b_w, b_h = block['bbox']['x'], block['bbox']['y'], block['bbox']['w'], block['bbox']['h']
x1 = int(b_x * width)
y1 = int(height * (1 - (b_y + b_h)))
x2 = int((b_x + b_w) * width)
y2 = int(height * (1 - b_y))
# Clamp
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(width, x2), min(height, y2)
if x2 <= x1 or y2 <= y1:
block["type"] = "print"
continue
# Crop
patch = img.crop((x1, y1, x2, y2))
            # Heuristic: colored pixels (e.g. red pen) indicate handwriting/annotation,
            # near-grayscale dark pixels indicate black print. Instead of a full RGB->HSV
            # conversion, use the per-pixel colorfulness |R-G| + |R-B| + |G-B|
            # (black text ~= 0, pure red text ~= 510), measured only on dark "text" pixels.
            arr = np.array(patch)
            # Ignore the light background: keep pixels whose mean channel value is below 220.
            mask = np.mean(arr, axis=2) < 220
# Check if empty (no dark pixels)
if not np.any(mask):
block["type"] = "print"
continue
            # Cast to a signed dtype: the image array is uint8, and uint8 subtraction
            # would wrap around instead of going negative.
            text_pixels = arr[mask].astype(np.int16)
            # Mean deviation from grayscale (for pure gray, R = G = B and the deviation is 0).
            r, g, b = text_pixels[:, 0], text_pixels[:, 1], text_pixels[:, 2]
            diff = np.mean(np.abs(r - g) + np.abs(r - b) + np.abs(g - b))
# Threshold: Scanned/Compressed black text is usually < 10
if diff > 10:
block["type"] = "emphasized"
else:
block["type"] = "print"
    except Exception as e:
        # Style analysis is best-effort; on failure, blocks keep the default "print" type.
        print(f"Warning: Style analysis failed: {e}")
def create_request_handler(image_url):
return Vision.VNImageRequestHandler.alloc().initWithURL_options_(image_url, None)
def create_cgimage_request_handler(cg_image):
return Vision.VNImageRequestHandler.alloc().initWithCGImage_options_(cg_image, None)
def perform_ocr_on_handler(handler, fast=False):
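    """Run a VNRecognizeTextRequest on the handler and return the recognized-text observations."""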
request = Vision.VNRecognizeTextRequest.alloc().init()
if fast:
request.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelFast)
else:
request.setRecognitionLevel_(Vision.VNRequestTextRecognitionLevelAccurate)
request.setUsesLanguageCorrection_(True)
request.setRecognitionLanguages_(["zh-Hans", "zh-Hant", "en-US"])
success, error = handler.performRequests_error_([request], None)
if not success:
raise RuntimeError(f"OCR failed: {error}")
return request.results()
def get_pdf_page_images(pdf_path):
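    """Render each page of a PDF to a CGImage and yield (page_index, cg_image), 1-based."""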
url = NSURL.fileURLWithPath_(pdf_path)
pdf_doc = Quartz.CGPDFDocumentCreateWithURL(url)
if not pdf_doc:
raise ValueError(f"Could not open PDF file: {pdf_path}")
page_count = Quartz.CGPDFDocumentGetNumberOfPages(pdf_doc)
for page_index in range(1, page_count + 1):
page = Quartz.CGPDFDocumentGetPage(pdf_doc, page_index)
if not page:
continue
rect = Quartz.CGPDFPageGetBoxRect(page, Quartz.kCGPDFMediaBox)
# Handle zero-size pages defensively
width = max(1, int(rect.size.width))
height = max(1, int(rect.size.height))
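        # Note: PDF user space is 72 points per inch, so rendering at the media-box size
        # yields roughly a 72 DPI bitmap; scaling width/height up here would trade speed
        # for OCR accuracy on small print.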
color_space = Quartz.CGColorSpaceCreateDeviceRGB()
context = Quartz.CGBitmapContextCreate(
None,
width,
height,
8,
width * 4,
color_space,
Quartz.kCGImageAlphaPremultipliedLast
)
if not context:
continue
Quartz.CGContextSetRGBFillColor(context, 1.0, 1.0, 1.0, 1.0)
Quartz.CGContextFillRect(context, rect)
Quartz.CGContextDrawPDFPage(context, page)
cg_image = Quartz.CGBitmapContextCreateImage(context)
yield page_index, cg_image
def recognize(image_path, fast=False):
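    """Run OCR on an image or PDF and return a list of (page_index, observations) tuples."""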
lower_path = image_path.lower()
results_list = []
if lower_path.endswith('.pdf'):
for page_index, cg_image in get_pdf_page_images(image_path):
handler = create_cgimage_request_handler(cg_image)
results = perform_ocr_on_handler(handler, fast)
results_list.append((page_index, results))
else:
image_url = get_image_url(image_path)
if not image_url:
raise ValueError(f"Invalid image path: {image_path}")
handler = create_request_handler(image_url)
results = perform_ocr_on_handler(handler, fast)
results_list.append((1, results))
return results_list
# ==========================================
# Advanced Layout Analysis: Block Aggregation
# ==========================================
def get_item_vertical_range(item):
    # Stay in Vision coordinates (y=0 at the bottom, y=1 at the top): the bottom edge
    # is y and the top edge is y + h, so a larger y means higher on the page.
bottom = item['bbox']['y']
top = item['bbox']['y'] + item['bbox']['h']
return bottom, top
def is_vertically_connected(item_top, item_bottom, other):
    # Check whether 'other' sits immediately below 'item' (item is the upper one).
    other_bottom, other_top = get_item_vertical_range(other)
    # Vertical gap: item's bottom edge minus other's top edge (>= 0 when they don't overlap).
    gap = item_bottom - other_top
    # The average line height sets the scale for the allowed gap.
    avg_h = (item_top - item_bottom + other_top - other_bottom) / 2
    # 2.5 * avg_h allows looser line spacing (titles/headers) while still respecting clear
    # visual breaks; the -0.8 * avg_h lower bound tolerates slightly overlapping lines.
if -0.8 * avg_h < gap < 2.5 * avg_h:
return True
return False
def is_horizontally_aligned(item, other):
# Check X overlap
l1, r1 = item['bbox']['x'], item['bbox']['x'] + item['bbox']['w']
l2, r2 = other['bbox']['x'], other['bbox']['x'] + other['bbox']['w']
overlap_w = max(0, min(r1, r2) - max(l1, l2))
min_w = min(item['bbox']['w'], other['bbox']['w'])
# 1. Significant Overlap
if overlap_w > 0.3 * min_w:
return True
# 2. Left Alignment (common for columns)
if abs(l1 - l2) < 0.02: # 2% alignment tolerance
return True
# 3. Center Alignment (common for table headers)
c1 = l1 + item['bbox']['w'] / 2
c2 = l2 + other['bbox']['w'] / 2
if abs(c1 - c2) < 0.02:
return True
return False
def cluster_into_blocks(layout_items):
"""
Cluster items into blocks (connected components).
"""
n = len(layout_items)
parent = list(range(n))
def find(i):
if parent[i] != i:
parent[i] = find(parent[i])
return parent[i]
def union(i, j):
root_i = find(i)
root_j = find(j)
if root_i != root_j:
parent[root_i] = root_j
# Build graph
# Optimizable: sort by Y first to reduce N^2 checks
# But N is usually small (<1000) for OCR, N^2 is fine.
items_with_vert = []
for idx, item in enumerate(layout_items):
b, t = get_item_vertical_range(item)
items_with_vert.append((idx, item, b, t))
for i in range(n):
for j in range(i + 1, n):
idx1, item1, b1, t1 = items_with_vert[i]
idx2, item2, b2, t2 = items_with_vert[j]
# Determine relative position: who is above?
# t1 > t2 usually means 1 is above 2 (if not overlapping excessively)
if t1 > t2:
upper, lower = (idx1, item1, b1, t1), (idx2, item2, b2, t2)
else:
upper, lower = (idx2, item2, b2, t2), (idx1, item1, b1, t1)
if is_vertically_connected(upper[3], upper[2], lower[1]) and \
is_horizontally_aligned(upper[1], lower[1]):
union(idx1, idx2)
# Group by root
clusters = {}
for i in range(n):
root = find(i)
if root not in clusters:
clusters[root] = []
clusters[root].append(layout_items[i])
return list(clusters.values())
def get_block_bbox(block):
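    """Return the union bbox of a block's lines as (min_x, min_y, max_x, max_y) in Vision coordinates."""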
min_x = min(i['bbox']['x'] for i in block)
max_x = max(i['bbox']['x'] + i['bbox']['w'] for i in block)
min_y = min(i['bbox']['y'] for i in block) # Bottom
max_y = max(i['bbox']['y'] + i['bbox']['h'] for i in block) # Top
return min_x, min_y, max_x, max_y
def sort_blocks(blocks):
"""
Sort blocks in reading order (Top-Down, then Left-Right).
Strategies: XY-Cut recursive, or simple sorting with bands.
Simple sort: Sort by Top Y (descending).
If Top Y diff is small, sort by Left X.
Top Y is max_y in Vision coords.
"""
    # Vision's origin is bottom-left, so a higher Y means closer to the top of the page;
    # sorting by -max_y puts the topmost blocks first.
    def sort_key(b):
        _, _, _, max_y = get_block_bbox(b)
        return -max_y  # Higher Y (top of page) comes first
# Initial sort by Y
blocks.sort(key=sort_key)
# Banding logic
sorted_blocks = []
if not blocks:
return []
current_band = [blocks[0]]
_, _, _, first_top = get_block_bbox(blocks[0])
for i in range(1, len(blocks)):
blk = blocks[i]
_, _, _, top = get_block_bbox(blk)
        # Blocks whose top edges lie within 5% of the page height form one band (row)
        if abs(first_top - top) < 0.05:
current_band.append(blk)
else:
# Sort band by X
current_band.sort(key=lambda b: get_block_bbox(b)[0])
sorted_blocks.extend(current_band)
# New band
current_band = [blk]
first_top = top
if current_band:
current_band.sort(key=lambda b: get_block_bbox(b)[0])
sorted_blocks.extend(current_band)
return sorted_blocks
def smart_merge_text(text):
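    """Remove line-break whitespace between two adjacent CJK characters, since Chinese
    text is written without spaces between words; Latin text keeps its newlines."""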
pattern = r'([\u4e00-\u9fa5])[\n\s]+([\u4e00-\u9fa5])'
return re.sub(pattern, r'\1\2', text)
def construct_block_text(block):
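    """Join a block's lines from top to bottom, then merge CJK line breaks into continuous text."""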
# Sort lines within block: Top to Bottom
block.sort(key=lambda i: i['bbox']['y'] + i['bbox']['h'], reverse=True)
lines = [i['text'] for i in block]
raw_text = "\n".join(lines)
return smart_merge_text(raw_text)
def recognize_text(image_path):
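    """OCR an image or PDF and return plain text, with blocks separated by blank lines."""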
all_pages_results = recognize(image_path)
final_text_parts = []
for page_index, results in all_pages_results:
layout_items = []
for observation in results:
candidate = observation.topCandidates_(1)[0]
bbox = observation.boundingBox()
layout_items.append({
"text": candidate.string(),
"bbox": normalize_bbox(bbox)
})
        # 1. Cluster items into blocks
blocks = cluster_into_blocks(layout_items)
# 2. Sort Blocks
sorted_blocks = sort_blocks(blocks)
# 3. Construct text
block_texts = [construct_block_text(b) for b in sorted_blocks]
page_text = "\n\n".join(block_texts)
final_text_parts.append(page_text)
return "\n\n".join(final_text_parts)
def recognize_text_with_layout(image_path):
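    """OCR an image or PDF and return a list of block dicts (id, page, text, bbox, type, lines)."""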
all_pages_results = recognize(image_path)
final_layout_data = []
for page_index, results in all_pages_results:
# 1. Extract raw items
layout_items = []
for observation in results:
candidate = observation.topCandidates_(1)[0]
bbox = observation.boundingBox()
layout_items.append({
"text": candidate.string(),
"confidence": candidate.confidence(),
"bbox": normalize_bbox(bbox)
})
        # 2. Cluster items into blocks
blocks = cluster_into_blocks(layout_items)
# 3. Sort Blocks
sorted_blocks = sort_blocks(blocks)
# 4. Construct Structured Block Data
page_blocks_data = []
for idx, block in enumerate(sorted_blocks):
block_text = construct_block_text(block)
min_x, min_y, max_x, max_y = get_block_bbox(block)
block_data = {
"id": idx + 1,
"page": page_index,
"text": block_text,
"bbox": {
"x": min_x,
"y": min_y,
"w": max_x - min_x,
"h": max_y - min_y
},
"type": "print", # Default
"lines": block # Include original lines details
}
page_blocks_data.append(block_data)
# 5. Analyze Style (Distinguish handwriting/annotation by color)
analyze_style_for_blocks(image_path, page_blocks_data)
final_layout_data.extend(page_blocks_data)
return final_layout_data
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Usage: python ocr.py <image_or_pdf_path>")
sys.exit(1)
path = sys.argv[1]
if not os.path.exists(path):
print(f"File not found: {path}")
sys.exit(1)
try:
# print("--- Text ---")
# print(recognize_text(path))
print("\n--- Layout (Values for LLM) ---")
print(json.dumps(recognize_text_with_layout(path), indent=2, ensure_ascii=False))
except Exception as e:
print(f"Error: {e}")