"""
Batch Update Example
This example demonstrates efficient bulk operations for updating large datasets
in Lark Base, including validation, transformation, and error handling.
"""
import os
from datetime import datetime, timezone
from typing import List, Dict, Any
from dotenv import load_dotenv
from lark_client import LarkBaseClient, LarkConfig
from data_manager import DataManager
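# The module docstring mentions error handling; the helper below is a minimal retry
# sketch for wrapping individual client calls, e.g.
#     table_id = _run_with_retries(lambda: client.create_table(...))
# The broad `except Exception` is an assumption, since this example does not show
# which exception classes LarkBaseClient raises; narrow it in real code.
def _run_with_retries(operation, attempts: int = 3):
    """Call `operation` up to `attempts` times, re-raising the last error."""
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as exc:  # assumption: replace with the client's real error type
            last_error = exc
            print(f"Attempt {attempt}/{attempts} failed: {exc}")
    raise last_error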
def generate_sample_data(count: int = 100) -> List[Dict[str, Any]]:
"""
Generate sample product data for testing.
Args:
count: Number of sample records to generate
Returns:
List of product records
"""
categories = ['Electronics', 'Clothing', 'Home & Garden', 'Sports', 'Books']
statuses = ['In Stock', 'Low Stock', 'Out of Stock']
products = []
for i in range(1, count + 1):
product = {
"Product ID": f"PRD{i:04d}",
"Product Name": f"Sample Product {i}",
"Category": categories[i % len(categories)],
"Price": round(19.99 + (i * 5.5), 2),
"Stock Quantity": max(0, 100 - (i * 2)),
"Status": statuses[(i * 3) % len(statuses)],
"Rating": round(3.0 + (i % 20) / 10, 1),
"Sales Count": i * 10,
}
products.append(product)
return products
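# For reference, generate_sample_data(1) returns one record such as:
#   {"Product ID": "PRD0001", "Product Name": "Sample Product 1", "Category": "Clothing",
#    "Price": 25.49, "Stock Quantity": 98, "Rating": 3.1, "Sales Count": 10, ...}
# with "Status" cycling through the three status options.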
def apply_price_discount(
records: List[Dict[str, Any]],
category: str,
discount_percent: float
) -> List[Dict[str, Any]]:
"""
Apply a price discount to products in a specific category.
Args:
records: List of product records
category: Category to apply discount to
discount_percent: Discount percentage (e.g., 20 for 20% off)
Returns:
List of updated records
"""
updated_records = []
for record in records:
if record.get('Category') == category:
original_price = record['Price']
discounted_price = original_price * (1 - discount_percent / 100)
record['Price'] = round(discounted_price, 2)
record['Discount Applied'] = f"{discount_percent}%"
updated_records.append(record)
return updated_records
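# Note: apply_price_discount mutates matching records in place and returns only the
# records it changed. Pass copies if the originals must stay untouched, e.g.:
#   discounted = apply_price_discount([dict(r) for r in records], "Books", 10.0)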
def update_stock_status(record: Dict[str, Any]) -> Dict[str, Any]:
"""
Update stock status based on quantity.
Args:
record: Product record
Returns:
Updated record with correct status
"""
quantity = record.get('Stock Quantity', 0)
if quantity == 0:
record['Status'] = 'Out of Stock'
elif quantity < 20:
record['Status'] = 'Low Stock'
else:
record['Status'] = 'In Stock'
return record
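# Example: update_stock_status({"Stock Quantity": 5}) sets "Status" to "Low Stock";
# a quantity of 0 maps to "Out of Stock" and 20 or more maps to "In Stock".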
def validate_product(value: Any) -> bool:
"""Validate product data."""
if isinstance(value, (int, float)):
return value >= 0
return True
def main():
"""Run batch update example."""
# Load environment variables
load_dotenv()
# Initialize client and data manager
config = LarkConfig(
app_id=os.getenv('LARK_APP_ID'),
app_secret=os.getenv('LARK_APP_SECRET'),
log_level='INFO'
)
client = LarkBaseClient(config, rate_limit=10)
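    # rate_limit is assumed here to throttle client-side requests per second so that
    # bulk operations stay within the Lark Open API quota; tune it for your tenant.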
data_manager = DataManager(client)
app_token = os.getenv('LARK_APP_TOKEN')
print("=== Batch Update Example ===\n")
# Step 1: Create Products table
print("Step 1: Creating Products table...")
table_id = client.create_table(
app_token=app_token,
table_name="Products Inventory",
default_view_name="All Products"
)
print(f"Created table: {table_id}\n")
# Step 2: Add fields
print("Step 2: Adding fields...")
    # Lark Base field type codes used below: 1 = Text, 2 = Number, 3 = Single Select
    fields = [
        ("Product ID", 1),
        ("Product Name", 1),
        ("Category", 3),
        ("Price", 2),
        ("Stock Quantity", 2),
        ("Status", 3),
        ("Rating", 2),
        ("Sales Count", 2),
        ("Discount Applied", 1),
    ]
    # Give each single-select field only its own options
    select_options = {
        "Category": ["Electronics", "Clothing", "Home & Garden", "Sports", "Books"],
        "Status": ["In Stock", "Low Stock", "Out of Stock"],
    }
    for field_name, field_type in fields:
        if field_type == 3:  # Single Select
            property_dict = {
                "options": [{"name": name} for name in select_options[field_name]]
            }
        else:
            property_dict = None
client.create_field(
app_token=app_token,
table_id=table_id,
field_name=field_name,
field_type=field_type,
property_dict=property_dict
)
print("Fields created\n")
# Step 3: Generate and load initial data
print("Step 3: Generating and loading initial data...")
initial_data = generate_sample_data(100)
# Validate before loading
validation_result = data_manager.validate_records(
records=initial_data,
required_fields=["Product ID", "Product Name", "Price"],
validators={
"Price": validate_product,
"Stock Quantity": validate_product
}
)
print(f"Validation: {validation_result['valid_count']} valid, "
f"{validation_result['invalid_count']} invalid\n")
# Load valid records
result = data_manager.batch_upsert_records(
app_token=app_token,
table_id=table_id,
records=validation_result['valid_records'],
key_field="Product ID",
batch_size=500
)
print(f"Initial load: Created {result['created']} records\n")
# Step 4: Batch update - Apply discounts
print("Step 4: Applying 20% discount to Electronics category...")
# Get all records
all_records = data_manager.export_to_dict(app_token, table_id)
# Apply discount
discounted = apply_price_discount(all_records, "Electronics", 20.0)
print(f"Applied discount to {len(discounted)} products\n")
# Update records
update_result = data_manager.batch_upsert_records(
app_token=app_token,
table_id=table_id,
records=discounted,
key_field="Product ID"
)
print(f"Updated {update_result['updated']} records with new prices\n")
# Step 5: Batch update - Update stock status
print("Step 5: Updating stock status based on quantity...")
result = data_manager.transform_and_load(
app_token=app_token,
table_id=table_id,
source_records=all_records,
transformer=update_stock_status,
key_field="Product ID",
parallel=True,
max_workers=4
)
print(f"Updated {result['updated']} records with correct stock status\n")
# Step 6: Aggregate data for reporting
print("Step 6: Generating category-level aggregates...")
category_stats = data_manager.aggregate_data(
records=all_records,
group_by="Category",
aggregations={
"Sales Count": sum,
"Price": lambda x: round(sum(x) / len(x), 2), # Average price
"Stock Quantity": sum
}
)
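    # Minimal local cross-check of the aggregation above, assuming aggregate_data
    # groups records by their "Category" value and feeds each aggregation function
    # the list of values for its column (an assumption about DataManager's behaviour).
    electronics_rows = [r for r in all_records if r.get("Category") == "Electronics"]
    if electronics_rows:
        local_avg = round(sum(r["Price"] for r in electronics_rows) / len(electronics_rows), 2)
        print(f"Local check - Electronics avg price: ${local_avg:.2f}")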
print("\nCategory Statistics:")
print("-" * 60)
for stat in category_stats:
print(f" {stat['Category']}")
print(f" Total Sales: {stat['Sales Count']:,}")
print(f" Avg Price: ${stat['Price']:.2f}")
print(f" Total Stock: {stat['Stock Quantity']:,}")
print()
# Step 7: Verify final state
print("Step 7: Verifying final data...")
final_records = client.get_all_records(app_token, table_id)
print(f"Total records in table: {len(final_records)}\n")
# Show sample of updated records
print("Sample of updated products (first 5 Electronics):")
print("-" * 80)
electronics = [r for r in final_records if r['fields'].get('Category') == 'Electronics'][:5]
for record in electronics:
fields = record['fields']
print(f" {fields['Product Name']}")
print(f" Price: ${fields['Price']:.2f} | Discount: {fields.get('Discount Applied', 'None')}")
print(f" Stock: {fields['Stock Quantity']} | Status: {fields['Status']}")
print(f" Rating: {fields['Rating']} | Sales: {fields['Sales Count']:,}")
print()
# Step 8: Demonstrate field mapping
print("Step 8: Field mapping example...")
# Simulate external data with different field names
external_data = [
{
"sku": "PRD0001",
"title": "Updated Product 1",
"cost": 99.99,
"inventory": 150
}
]
field_mapping = {
"sku": "Product ID",
"title": "Product Name",
"cost": "Price",
"inventory": "Stock Quantity"
}
mapped_data = [
data_manager.map_fields(record, field_mapping)
for record in external_data
]
print(f"Mapped {len(mapped_data)} external records to Lark Base schema")
print(f"Example: {mapped_data[0]}\n")
# Update with mapped data
mapped_result = data_manager.batch_upsert_records(
app_token=app_token,
table_id=table_id,
records=mapped_data,
key_field="Product ID"
)
print(f"Updated {mapped_result['updated']} records from external source\n")
print("=== Batch operations completed successfully! ===")
print(f"\nTable ID: {table_id}")
print(f"Total records: {len(final_records)}")
print("\nKey takeaways:")
print("1. Batch operations can handle 100+ records efficiently")
print("2. Validation ensures data quality before loading")
print("3. Field mapping allows integration with external systems")
print("4. Parallel processing speeds up transformations")
print("5. Aggregations provide quick insights into your data")
if __name__ == '__main__':
main()