"""
Data Manager for Lark Base
High-level data operations including batch processing, data synchronization,
and transformation utilities.
"""
import logging
from typing import Any, Dict, List, Optional, Callable
from datetime import datetime, timezone
from concurrent.futures import ThreadPoolExecutor, as_completed
from lark_client import LarkBaseClient, LarkConfig
logger = logging.getLogger(__name__)
class DataManager:
"""
High-level data manager for Lark Base operations.
Provides utilities for batch operations, data synchronization,
    and data transformation.
"""
def __init__(self, client: LarkBaseClient):
"""
Initialize data manager.
Args:
client: LarkBaseClient instance
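        Example (illustrative sketch):
            # LarkConfig's exact fields are an assumption about lark_client's
            # API, not confirmed here.
            config = LarkConfig(app_id="<app_id>", app_secret="<app_secret>")
            manager = DataManager(LarkBaseClient(config))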
"""
self.client = client
logger.info("Initialized DataManager")
# Batch Operations
def batch_upsert_records(
self,
app_token: str,
table_id: str,
records: List[Dict[str, Any]],
key_field: str,
batch_size: int = 500
) -> Dict[str, int]:
"""
Upsert (update or insert) records based on a key field.
Args:
app_token: Base (app) token
table_id: Table ID
records: List of record dictionaries
key_field: Field name to use as unique identifier
batch_size: Number of records per batch
Returns:
Dictionary with counts of created and updated records
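        Example (illustrative sketch; tokens and field names are placeholders):
            # Assumes `manager` is a DataManager bound to an authenticated
            # LarkBaseClient and the table has a unique "Email" text field.
            stats = manager.batch_upsert_records(
                app_token="<app_token>",
                table_id="<table_id>",
                records=[{"Email": "a@example.com", "Name": "Alice"}],
                key_field="Email",
            )
            # stats -> {"created": ..., "updated": ..., "total": ...}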
"""
logger.info(f"Starting upsert of {len(records)} records")
# Get existing records
existing_records = self.client.get_all_records(
app_token=app_token,
table_id=table_id,
field_names=[key_field]
)
# Build lookup map
existing_map = {}
for record in existing_records:
key_value = record['fields'].get(key_field)
if key_value:
existing_map[key_value] = record['record_id']
# Separate into creates and updates
to_create = []
to_update = []
for record in records:
key_value = record.get(key_field)
if not key_value:
logger.warning(f"Record missing key field '{key_field}', skipping")
continue
if key_value in existing_map:
to_update.append({
'record_id': existing_map[key_value],
'fields': record
})
else:
to_create.append(record)
# Process creates in batches
created_count = 0
for i in range(0, len(to_create), batch_size):
batch = to_create[i:i + batch_size]
self.client.batch_create_records(app_token, table_id, batch)
created_count += len(batch)
logger.info(f"Created batch: {created_count}/{len(to_create)}")
# Process updates in batches
updated_count = 0
for i in range(0, len(to_update), batch_size):
batch = to_update[i:i + batch_size]
self.client.batch_update_records(app_token, table_id, batch)
updated_count += len(batch)
logger.info(f"Updated batch: {updated_count}/{len(to_update)}")
result = {
'created': created_count,
'updated': updated_count,
'total': created_count + updated_count
}
logger.info(f"Upsert completed: {result}")
return result
def sync_data_incremental(
self,
app_token: str,
table_id: str,
data_source: Callable[[], List[Dict[str, Any]]],
key_field: str,
timestamp_field: str = 'updated_at',
last_sync_time: Optional[datetime] = None
) -> Dict[str, Any]:
"""
Incrementally sync data from an external source.
Args:
app_token: Base (app) token
table_id: Table ID
data_source: Callable that returns list of records
key_field: Field name to use as unique identifier
timestamp_field: Field name containing last update timestamp
last_sync_time: Only sync records updated after this time
Returns:
Sync statistics
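        Example (illustrative sketch; the source callable, tokens, and field
        names are placeholders):
            # Assumes `manager` is a DataManager bound to an authenticated
            # LarkBaseClient; the source must return dicts keyed by field name.
            def fetch_orders():
                return [{"order_id": "A-1", "updated_at": datetime.now(timezone.utc)}]
            stats = manager.sync_data_incremental(
                app_token="<app_token>",
                table_id="<table_id>",
                data_source=fetch_orders,
                key_field="order_id",
                last_sync_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
            )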
"""
logger.info(f"Starting incremental sync from {last_sync_time}")
# Fetch data from source
source_records = data_source()
logger.info(f"Fetched {len(source_records)} records from source")
# Filter by timestamp if provided
if last_sync_time:
filtered_records = []
for record in source_records:
                record_time = record.get(timestamp_field)
                # Keep only records with a datetime newer than the last sync;
                # records lacking a usable datetime are skipped.
                if isinstance(record_time, datetime) and record_time > last_sync_time:
                    filtered_records.append(record)
source_records = filtered_records
logger.info(f"Filtered to {len(source_records)} updated records")
# Upsert records
result = self.batch_upsert_records(
app_token=app_token,
table_id=table_id,
records=source_records,
key_field=key_field
)
result['sync_time'] = datetime.now(timezone.utc)
result['last_sync_time'] = last_sync_time
return result
# Data Transformation
def transform_and_load(
self,
app_token: str,
table_id: str,
source_records: List[Dict[str, Any]],
transformer: Callable[[Dict[str, Any]], Dict[str, Any]],
key_field: str,
parallel: bool = False,
max_workers: int = 4
) -> Dict[str, int]:
"""
Transform and load data with custom transformation function.
Args:
app_token: Base (app) token
table_id: Table ID
source_records: List of source records
transformer: Function to transform each record
key_field: Field name to use as unique identifier
parallel: Whether to use parallel processing
max_workers: Number of parallel workers
Returns:
Load statistics
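        Example (illustrative sketch; tokens and field names are placeholders):
            # Assumes `manager` is a DataManager bound to an authenticated
            # LarkBaseClient. A transformer may return None to drop a record.
            def to_row(src):
                return {"Email": src["email"].lower(), "Name": src.get("name", "")}
            stats = manager.transform_and_load(
                app_token="<app_token>",
                table_id="<table_id>",
                source_records=[{"email": "A@Example.com", "name": "Alice"}],
                transformer=to_row,
                key_field="Email",
                parallel=True,
            )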
"""
logger.info(f"Transforming {len(source_records)} records")
if parallel:
transformed_records = self._transform_parallel(
source_records,
transformer,
max_workers
)
else:
transformed_records = [transformer(r) for r in source_records]
# Filter out None results (failed transformations)
transformed_records = [r for r in transformed_records if r is not None]
logger.info(f"Successfully transformed {len(transformed_records)} records")
# Load into Lark Base
return self.batch_upsert_records(
app_token=app_token,
table_id=table_id,
records=transformed_records,
key_field=key_field
)
def _transform_parallel(
self,
records: List[Dict[str, Any]],
transformer: Callable[[Dict[str, Any]], Dict[str, Any]],
max_workers: int
) -> List[Dict[str, Any]]:
"""
Transform records in parallel.
Args:
records: List of records to transform
transformer: Transformation function
max_workers: Number of parallel workers
Returns:
List of transformed records
"""
transformed = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_record = {
executor.submit(transformer, record): record
for record in records
}
            # Results are collected in completion order, not input order
            for future in as_completed(future_to_record):
                source_record = future_to_record[future]
                try:
                    result = future.result()
                    if result is not None:
                        transformed.append(result)
                except Exception as e:
                    logger.error(
                        f"Transformation failed for record {source_record}: {e}"
                    )
return transformed
# Field Mapping
def map_fields(
self,
source_record: Dict[str, Any],
field_mapping: Dict[str, str],
default_values: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Map fields from source to target schema.
Args:
source_record: Source record dictionary
field_mapping: Dictionary mapping source field names to target field names
default_values: Optional default values for missing fields
Returns:
Mapped record dictionary
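        Example (illustrative; field names below are placeholders):
            # Pure dict-to-dict mapping; no API calls are made here.
            mapped = manager.map_fields(
                source_record={"full_name": "Alice", "mail": "a@example.com"},
                field_mapping={"full_name": "Name", "mail": "Email"},
                default_values={"Status": "Active"},
            )
            # -> {"Name": "Alice", "Email": "a@example.com", "Status": "Active"}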
"""
mapped_record = {}
# Apply field mapping
for source_field, target_field in field_mapping.items():
if source_field in source_record:
mapped_record[target_field] = source_record[source_field]
# Apply default values for missing fields
if default_values:
for field, value in default_values.items():
if field not in mapped_record:
mapped_record[field] = value
return mapped_record
# Data Validation
def validate_records(
self,
records: List[Dict[str, Any]],
required_fields: List[str],
validators: Optional[Dict[str, Callable[[Any], bool]]] = None
) -> Dict[str, Any]:
"""
Validate records before loading.
Args:
records: List of records to validate
required_fields: List of required field names
validators: Optional dictionary of field names to validation functions
Returns:
Validation results with valid and invalid records
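        Example (illustrative; field names and the validator are placeholders):
            # Local-only check; nothing is sent to Lark here.
            report = manager.validate_records(
                records=[{"Email": "a@example.com"}, {"Name": "Bob"}],
                required_fields=["Email"],
                validators={"Email": lambda v: "@" in str(v)},
            )
            # report["valid_count"] == 1, report["invalid_count"] == 1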
"""
valid_records = []
invalid_records = []
for record in records:
is_valid = True
errors = []
# Check required fields
for field in required_fields:
if field not in record or record[field] is None:
is_valid = False
errors.append(f"Missing required field: {field}")
# Run custom validators
if validators:
for field, validator in validators.items():
if field in record:
try:
if not validator(record[field]):
is_valid = False
errors.append(f"Validation failed for field: {field}")
except Exception as e:
is_valid = False
errors.append(f"Validator error for {field}: {e}")
if is_valid:
valid_records.append(record)
else:
invalid_records.append({
'record': record,
'errors': errors
})
result = {
'valid_count': len(valid_records),
'invalid_count': len(invalid_records),
'valid_records': valid_records,
'invalid_records': invalid_records
}
logger.info(f"Validation: {result['valid_count']} valid, "
f"{result['invalid_count']} invalid")
return result
# Data Aggregation
def aggregate_data(
self,
records: List[Dict[str, Any]],
group_by: str,
aggregations: Dict[str, Callable[[List[Any]], Any]]
) -> List[Dict[str, Any]]:
"""
Aggregate data by grouping and applying aggregation functions.
Args:
records: List of records to aggregate
group_by: Field name to group by
aggregations: Dictionary of field names to aggregation functions
Returns:
List of aggregated records
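        Example (illustrative; field names below are placeholders):
            # Groups in memory; no API calls are made here.
            summary = manager.aggregate_data(
                records=[
                    {"Region": "EU", "Amount": 10},
                    {"Region": "EU", "Amount": 5},
                    {"Region": "US", "Amount": 7},
                ],
                group_by="Region",
                aggregations={"Amount": sum},
            )
            # -> [{"Region": "EU", "Amount": 15}, {"Region": "US", "Amount": 7}]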
"""
# Group records
groups = {}
for record in records:
key = record.get(group_by)
            # Records with an empty group value are skipped
            if key:
                groups.setdefault(key, []).append(record)
# Apply aggregations
aggregated = []
for key, group_records in groups.items():
agg_record = {group_by: key}
for field, agg_func in aggregations.items():
values = [r.get(field) for r in group_records if field in r]
if values:
agg_record[field] = agg_func(values)
aggregated.append(agg_record)
logger.info(f"Aggregated {len(records)} records into {len(aggregated)} groups")
return aggregated
# Data Export
def export_to_dict(
self,
app_token: str,
table_id: str,
field_names: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""
Export table data to list of dictionaries.
Args:
app_token: Base (app) token
table_id: Table ID
field_names: Optional list of field names to export
Returns:
List of record dictionaries
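        Example (illustrative sketch; tokens and field names are placeholders):
            # Assumes `manager` is a DataManager bound to an authenticated
            # LarkBaseClient.
            rows = manager.export_to_dict(
                app_token="<app_token>",
                table_id="<table_id>",
                field_names=["Name", "Email"],
            )
            # each row: {"record_id": "...", "Name": ..., "Email": ...}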
"""
records = self.client.get_all_records(
app_token=app_token,
table_id=table_id,
field_names=field_names
)
export_data = []
for record in records:
export_record = {
'record_id': record['record_id'],
**record['fields']
}
export_data.append(export_record)
logger.info(f"Exported {len(export_data)} records")
return export_data
# Utility Functions
@staticmethod
def add_timestamp_fields(record: Dict[str, Any]) -> Dict[str, Any]:
"""
Add timestamp fields to a record.
Args:
record: Record dictionary
Returns:
            The same record, modified in place, with timestamp fields added
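        Example (illustrative):
            record = DataManager.add_timestamp_fields({"Name": "Alice"})
            # record now also holds "synced_at" and "created_at" as
            # millisecond UTC timestamps; the input dict is mutated.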
"""
now = datetime.now(timezone.utc)
timestamp_ms = int(now.timestamp() * 1000)
record['synced_at'] = timestamp_ms
if 'created_at' not in record:
record['created_at'] = timestamp_ms
return record
@staticmethod
def convert_to_lark_datetime(dt: datetime) -> int:
"""
Convert Python datetime to Lark timestamp (milliseconds).
Args:
dt: Python datetime object
Returns:
Timestamp in milliseconds
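        Example (illustrative):
            DataManager.convert_to_lark_datetime(
                datetime(2024, 1, 1, tzinfo=timezone.utc)
            )  # -> 1704067200000
            # Naive datetimes are interpreted as local time by timestamp().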
"""
return int(dt.timestamp() * 1000)
@staticmethod
def convert_from_lark_datetime(timestamp_ms: int) -> datetime:
"""
Convert Lark timestamp to Python datetime.
Args:
timestamp_ms: Timestamp in milliseconds
Returns:
Python datetime object
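        Example (illustrative):
            DataManager.convert_from_lark_datetime(1704067200000)
            # -> datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)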
"""
return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)