"""
Lark Base Client for Data Management
This module provides a comprehensive client for managing Lark Base (Bitable) data,
including tables, records, fields, and dashboard integration.
"""
import logging
import time
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass
import lark_oapi as lark
from lark_oapi.api.bitable.v1 import (
ListAppTableRequest,
CreateAppTableRequest,
CreateAppTableRecordRequest,
BatchCreateAppTableRecordRequest,
BatchUpdateAppTableRecordRequest,
UpdateAppTableRecordRequest,
DeleteAppTableRecordRequest,
ListAppTableRecordRequest,
ListAppTableFieldRequest,
CreateAppTableFieldRequest,
)
# Module-level logger named after this module; RateLimiter and LarkBaseClient
# below emit their debug/info/error messages through it.
logger = logging.getLogger(__name__)
@dataclass
class LarkConfig:
    """Settings consumed when building a Lark API client.

    Attributes:
        app_id: Application ID handed to the Lark client builder.
        app_secret: Application secret handed to the Lark client builder.
        log_level: Name of a ``logging`` level constant (case-insensitive),
            e.g. ``"INFO"`` or ``"debug"``.
    """

    app_id: str
    app_secret: str
    log_level: str = "INFO"

    def __post_init__(self):
        """Configure logging via ``logging.basicConfig`` once fields are set.

        Raises:
            AttributeError: If ``log_level`` (upper-cased) is not an attribute
                of the ``logging`` module.
        """
        level_name = self.log_level.upper()
        resolved_level = getattr(logging, level_name)
        message_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(level=resolved_level, format=message_format)
class RateLimiter:
    """Simple rate limiter to avoid API throttling.

    Enforces a minimum spacing of ``1 / max_requests_per_second`` seconds
    between consecutive :meth:`wait_if_needed` calls by sleeping the caller.
    """

    def __init__(self, max_requests_per_second: int = 10):
        """
        Initialize rate limiter.

        Args:
            max_requests_per_second: Maximum API requests per second.
                Must be greater than zero.

        Raises:
            ValueError: If ``max_requests_per_second`` is zero or negative.
        """
        # Reject non-positive rates up front: zero would otherwise surface as
        # an opaque ZeroDivisionError and a negative rate would silently
        # disable limiting altogether.
        if max_requests_per_second <= 0:
            raise ValueError(
                "max_requests_per_second must be greater than 0, "
                f"got {max_requests_per_second!r}"
            )
        self.max_requests = max_requests_per_second
        self.min_interval = 1.0 / max_requests_per_second
        # -inf guarantees the very first call never sleeps, whatever the
        # current reading of the monotonic clock happens to be.
        self.last_request_time = float("-inf")

    def wait_if_needed(self) -> None:
        """Sleep just long enough to keep ``min_interval`` between requests."""
        # A monotonic clock is used so that wall-clock adjustments (NTP, DST,
        # manual changes) can neither skip the wait nor cause a huge sleep.
        elapsed = time.monotonic() - self.last_request_time
        if elapsed < self.min_interval:
            sleep_time = self.min_interval - elapsed
            logger.debug("Rate limiting: sleeping for %.3fs", sleep_time)
            time.sleep(sleep_time)
        self.last_request_time = time.monotonic()
class LarkAPIError(Exception):
    """Raised when a Lark API call reports failure.

    Subclasses ``Exception`` so existing ``except Exception`` handlers keep
    working, while exposing the details of the failed call.

    Attributes:
        code: Error code reported by the API response.
        msg: Error message reported by the API response.
        request_id: Request/log identifier of the failed call, if available.
    """

    def __init__(self, message, code=None, msg=None, request_id=None):
        super().__init__(message)
        self.code = code
        self.msg = msg
        self.request_id = request_id


class LarkBaseClient:
    """
    Client for managing Lark Base (Bitable) data.

    This client provides CRUD operations for tables, records, and fields,
    with built-in error handling, logging, and rate limiting. Every API call
    first waits on the rate limiter and has its response checked by
    :meth:`_handle_response`, which raises :class:`LarkAPIError` on failure.
    """

    def __init__(self, config: "LarkConfig", rate_limit: int = 10):
        """
        Initialize Lark Base client.

        Args:
            config: Lark API configuration
            rate_limit: Maximum requests per second (default: 10)
        """
        self.config = config
        self.client = lark.Client.builder() \
            .app_id(config.app_id) \
            .app_secret(config.app_secret) \
            .log_level(getattr(lark.LogLevel, config.log_level.upper())) \
            .build()
        self.rate_limiter = RateLimiter(rate_limit)
        # Only a prefix of the app ID is logged.
        logger.info("Initialized LarkBaseClient with app_id: %s...", config.app_id[:8])

    def _handle_response(self, response: Any, operation: str) -> Any:
        """
        Handle API response with error checking.

        Args:
            response: API response object
            operation: Description of the operation for logging

        Returns:
            Response data if successful

        Raises:
            LarkAPIError: If API returns an error (a subclass of ``Exception``)
        """
        if not response.success():
            error_msg = f"{operation} failed: {response.code} - {response.msg}"
            # Look the request ID up defensively so that a response object
            # without a ``request_id`` attribute cannot mask the real API
            # error with an AttributeError; fall back to ``get_log_id()``
            # when the response offers it.
            request_id = getattr(response, 'request_id', None)
            if request_id is None:
                get_log_id = getattr(response, 'get_log_id', None)
                if callable(get_log_id):
                    request_id = get_log_id()
            logger.error(error_msg)
            logger.error("Request ID: %s", request_id)
            raise LarkAPIError(
                error_msg,
                code=response.code,
                msg=response.msg,
                request_id=request_id,
            )
        logger.info("%s succeeded", operation)
        return response.data

    # Table Operations

    def list_tables(self, app_token: str, page_size: int = 100) -> List[Dict[str, Any]]:
        """
        List all tables in a Lark Base.

        Follows pagination until the API reports no further pages.

        Args:
            app_token: Base (app) token
            page_size: Number of tables per page (max: 100)

        Returns:
            List of table information dictionaries with the keys
            'table_id', 'name' and 'revision'
        """
        tables = []
        page_token = None
        while True:
            self.rate_limiter.wait_if_needed()
            builder = ListAppTableRequest.builder() \
                .app_token(app_token) \
                .page_size(page_size)
            if page_token:
                builder = builder.page_token(page_token)
            response = self.client.bitable.v1.app_table.list(builder.build())
            data = self._handle_response(response, f"List tables in {app_token}")
            # ``items`` may be missing or None when the page is empty.
            for item in getattr(data, 'items', None) or []:
                tables.append({
                    'table_id': item.table_id,
                    'name': item.name,
                    'revision': getattr(item, 'revision', None),
                })
            page_token = getattr(data, 'page_token', None)
            # Stop when there are no more pages, or when no token was
            # returned (continuing would just re-request the first page).
            if not getattr(data, 'has_more', False) or not page_token:
                break
        logger.info("Found %d tables", len(tables))
        return tables

    def create_table(
        self,
        app_token: str,
        table_name: str,
        default_view_name: Optional[str] = None
    ) -> str:
        """
        Create a new table in a Lark Base.

        Args:
            app_token: Base (app) token
            table_name: Name of the new table
            default_view_name: Name for the default view (optional)

        Returns:
            Table ID of the created table
        """
        self.rate_limiter.wait_if_needed()
        # Build table configuration
        table_builder = lark.bitable.v1.ReqTable.builder() \
            .name(table_name)
        if default_view_name:
            table_builder = table_builder.default_view_name(default_view_name)
        # The create-table endpoint expects the table definition nested under
        # the ``table`` key of the request body, so wrap it accordingly.
        body = lark.bitable.v1.CreateAppTableRequestBody.builder() \
            .table(table_builder.build()) \
            .build()
        request = CreateAppTableRequest.builder() \
            .app_token(app_token) \
            .request_body(body) \
            .build()
        response = self.client.bitable.v1.app_table.create(request)
        data = self._handle_response(response, f"Create table '{table_name}'")
        table_id = data.table_id
        logger.info("Created table '%s' with ID: %s", table_name, table_id)
        return table_id

    # Field Operations

    def list_fields(
        self,
        app_token: str,
        table_id: str,
        page_size: int = 100
    ) -> List[Dict[str, Any]]:
        """
        List all fields in a table.

        Follows pagination until the API reports no further pages.

        Args:
            app_token: Base (app) token
            table_id: Table ID
            page_size: Number of fields per page (max: 100)

        Returns:
            List of field information dictionaries with the keys
            'field_id', 'field_name', 'type' and 'property'
        """
        fields = []
        page_token = None
        while True:
            self.rate_limiter.wait_if_needed()
            builder = ListAppTableFieldRequest.builder() \
                .app_token(app_token) \
                .table_id(table_id) \
                .page_size(page_size)
            if page_token:
                builder = builder.page_token(page_token)
            response = self.client.bitable.v1.app_table_field.list(builder.build())
            data = self._handle_response(response, f"List fields in table {table_id}")
            # ``items`` may be missing or None when the page is empty.
            for item in getattr(data, 'items', None) or []:
                fields.append({
                    'field_id': item.field_id,
                    'field_name': item.field_name,
                    'type': item.type,
                    'property': getattr(item, 'property', None),
                })
            page_token = getattr(data, 'page_token', None)
            if not getattr(data, 'has_more', False) or not page_token:
                break
        logger.info("Found %d fields", len(fields))
        return fields

    def create_field(
        self,
        app_token: str,
        table_id: str,
        field_name: str,
        field_type: int,
        property_dict: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Create a new field in a table.

        Args:
            app_token: Base (app) token
            table_id: Table ID
            field_name: Name of the new field
            field_type: Field type (1=Text, 2=Number, 3=SingleSelect, 4=MultiSelect,
                5=DateTime, 7=Checkbox, etc.)
            property_dict: Optional field properties

        Returns:
            Field ID of the created field
        """
        self.rate_limiter.wait_if_needed()
        field_builder = lark.bitable.v1.AppTableField.builder() \
            .field_name(field_name) \
            .type(field_type)
        if property_dict:
            field_builder = field_builder.property(property_dict)
        request = CreateAppTableFieldRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .request_body(field_builder.build()) \
            .build()
        response = self.client.bitable.v1.app_table_field.create(request)
        data = self._handle_response(response, f"Create field '{field_name}'")
        # The created field is returned nested under ``field`` (mirroring
        # ``record`` in create_record); the flat attribute is kept as a
        # fallback in case a response exposes ``field_id`` directly.
        created = getattr(data, 'field', None)
        if created is not None:
            field_id = created.field_id
        else:
            field_id = getattr(data, 'field_id', None)
        logger.info("Created field '%s' with ID: %s", field_name, field_id)
        return field_id

    # Record Operations

    def list_records(
        self,
        app_token: str,
        table_id: str,
        page_size: int = 100,
        page_token: Optional[str] = None,
        field_names: Optional[List[str]] = None,
        filter_expression: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        List records in a table (a single page).

        Args:
            app_token: Base (app) token
            table_id: Table ID
            page_size: Number of records per page (max: 500)
            page_token: Pagination token for next page
            field_names: List of field names to return (returns all if None)
            filter_expression: Filter expression to apply

        Returns:
            Dictionary with 'items' (records), 'page_token' for pagination
            and 'has_more'
        """
        import json  # local import: only needed to encode ``field_names``

        self.rate_limiter.wait_if_needed()
        builder = ListAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .page_size(page_size)
        if page_token:
            builder = builder.page_token(page_token)
        if field_names:
            # The endpoint documents ``field_names`` as a JSON-array string
            # (e.g. '["Name","Status"]'), so encode lists; a string is
            # assumed to be pre-encoded and passed through untouched.
            if isinstance(field_names, str):
                encoded_names = field_names
            else:
                encoded_names = json.dumps(list(field_names), ensure_ascii=False)
            builder = builder.field_names(encoded_names)
        if filter_expression:
            builder = builder.filter(filter_expression)
        request = builder.build()
        response = self.client.bitable.v1.app_table_record.list(request)
        data = self._handle_response(response, f"List records in table {table_id}")
        # ``items`` may be missing or None when the table/page has no records.
        records = [
            {
                'record_id': item.record_id,
                'fields': item.fields if hasattr(item, 'fields') else {},
            }
            for item in getattr(data, 'items', None) or []
        ]
        result = {
            'items': records,
            'page_token': getattr(data, 'page_token', None),
            'has_more': getattr(data, 'has_more', False),
        }
        logger.info("Retrieved %d records", len(records))
        return result

    def create_record(
        self,
        app_token: str,
        table_id: str,
        fields: Dict[str, Any]
    ) -> str:
        """
        Create a single record in a table.

        Args:
            app_token: Base (app) token
            table_id: Table ID
            fields: Dictionary of field names to values

        Returns:
            Record ID of the created record
        """
        self.rate_limiter.wait_if_needed()
        record = lark.bitable.v1.AppTableRecord.builder() \
            .fields(fields) \
            .build()
        request = CreateAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .request_body(record) \
            .build()
        response = self.client.bitable.v1.app_table_record.create(request)
        data = self._handle_response(response, "Create record")
        record_id = data.record.record_id
        logger.info("Created record with ID: %s", record_id)
        return record_id

    def batch_create_records(
        self,
        app_token: str,
        table_id: str,
        records: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Create multiple records in a table (batch operation).

        Args:
            app_token: Base (app) token
            table_id: Table ID
            records: List of record dictionaries (field names to values)

        Returns:
            List of created record IDs (empty if ``records`` is empty)
        """
        # Nothing to create: skip the API round-trip entirely.
        if not records:
            return []
        self.rate_limiter.wait_if_needed()
        record_objects = [
            lark.bitable.v1.AppTableRecord.builder().fields(record_data).build()
            for record_data in records
        ]
        request = BatchCreateAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .request_body(
                lark.bitable.v1.BatchCreateAppTableRecordRequestBody.builder()
                .records(record_objects)
                .build()
            ) \
            .build()
        response = self.client.bitable.v1.app_table_record.batch_create(request)
        data = self._handle_response(response, f"Batch create {len(records)} records")
        record_ids = [r.record_id for r in getattr(data, 'records', None) or []]
        logger.info("Created %d records", len(record_ids))
        return record_ids

    def update_record(
        self,
        app_token: str,
        table_id: str,
        record_id: str,
        fields: Dict[str, Any]
    ) -> None:
        """
        Update a single record in a table.

        Args:
            app_token: Base (app) token
            table_id: Table ID
            record_id: Record ID to update
            fields: Dictionary of field names to new values
        """
        self.rate_limiter.wait_if_needed()
        record = lark.bitable.v1.AppTableRecord.builder() \
            .fields(fields) \
            .build()
        request = UpdateAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .record_id(record_id) \
            .request_body(record) \
            .build()
        response = self.client.bitable.v1.app_table_record.update(request)
        self._handle_response(response, f"Update record {record_id}")

    def batch_update_records(
        self,
        app_token: str,
        table_id: str,
        records: List[Dict[str, Any]]
    ) -> List[str]:
        """
        Update multiple records in a table (batch operation).

        Args:
            app_token: Base (app) token
            table_id: Table ID
            records: List of dictionaries with 'record_id' and 'fields' keys

        Returns:
            List of updated record IDs (empty if ``records`` is empty)

        Raises:
            KeyError: If an entry lacks the 'record_id' or 'fields' key
        """
        # Nothing to update: skip the API round-trip entirely.
        if not records:
            return []
        self.rate_limiter.wait_if_needed()
        record_objects = [
            lark.bitable.v1.AppTableRecord.builder()
            .record_id(record_data['record_id'])
            .fields(record_data['fields'])
            .build()
            for record_data in records
        ]
        request = BatchUpdateAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .request_body(
                lark.bitable.v1.BatchUpdateAppTableRecordRequestBody.builder()
                .records(record_objects)
                .build()
            ) \
            .build()
        response = self.client.bitable.v1.app_table_record.batch_update(request)
        data = self._handle_response(response, f"Batch update {len(records)} records")
        record_ids = [r.record_id for r in getattr(data, 'records', None) or []]
        logger.info("Updated %d records", len(record_ids))
        return record_ids

    def delete_record(
        self,
        app_token: str,
        table_id: str,
        record_id: str
    ) -> None:
        """
        Delete a single record from a table.

        Args:
            app_token: Base (app) token
            table_id: Table ID
            record_id: Record ID to delete
        """
        self.rate_limiter.wait_if_needed()
        request = DeleteAppTableRecordRequest.builder() \
            .app_token(app_token) \
            .table_id(table_id) \
            .record_id(record_id) \
            .build()
        response = self.client.bitable.v1.app_table_record.delete(request)
        self._handle_response(response, f"Delete record {record_id}")

    # Utility Methods

    def get_all_records(
        self,
        app_token: str,
        table_id: str,
        field_names: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Get all records from a table (handles pagination automatically).

        Args:
            app_token: Base (app) token
            table_id: Table ID
            field_names: List of field names to return (returns all if None)

        Returns:
            List of all records
        """
        all_records = []
        page_token = None
        while True:
            result = self.list_records(
                app_token=app_token,
                table_id=table_id,
                page_size=500,  # Max page size
                page_token=page_token,
                field_names=field_names
            )
            all_records.extend(result['items'])
            page_token = result['page_token']
            # Also stop when no token came back: looping on with
            # page_token=None would re-fetch the first page forever.
            if not result['has_more'] or not page_token:
                break
        logger.info("Retrieved all %d records from table", len(all_records))
        return all_records