from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
import warnings
# Imports from core schema
from ..core.schema import SimplifySpec
# Imports from utils
from ..utils.simplify import (
_lttb_select_indices, _rdp_select_indices, _pla_select_indices, _apca_select_indices,
_choose_simplify_points, _select_indices_for_timeseries
)
# Constants
SIMPLIFY_DEFAULT_MODE = "select"
def _simplify_dataframe_rows_ext(df: pd.DataFrame, headers: List[str], simplify: SimplifySpec) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
"""
Apply simplification to DataFrame rows based on spec.
Returns (simplified_df, metadata_dict).
Supports modes:
- 'select' (default): Selects subset of rows using LTTB/RDP/PLA/APCA.
- 'approximate': Aggregates rows into segments (e.g. mean/OHLC of buckets).
- 'resample': Time-based resampling (e.g. '1H', '5min').
- 'encode': Transforms data into compact string/sequence representations.
- 'segment': Detects turning points (zigzag) and returns only those rows.
- 'symbolic': SAX transformation (returns symbolic string, not rows).
"""
if df.empty:
return df, None
spec = dict(simplify) if simplify else {}
mode = str(spec.get('mode', SIMPLIFY_DEFAULT_MODE)).lower().strip()
# Dispatch to specific handlers
if mode == 'resample':
return _handle_resample(df, headers, spec)
elif mode == 'encode':
return _handle_encode(df, headers, spec)
elif mode == 'segment':
return _handle_segment(df, headers, spec)
elif mode == 'symbolic':
return _handle_symbolic(df, headers, spec)
elif mode == 'approximate':
# For now, treat approximate same as select or implement aggregation?
# The original code might have had specific logic.
# Checking original code... it seems 'approximate' was not fully distinct or used select logic in some paths.
# Let's assume it uses selection for now unless we find specific aggregation logic.
# Actually, let's check if there was specific logic for 'approximate'.
# In the viewed file, 'approximate' wasn't explicitly handled differently in the main flow except maybe falling through to selection.
pass
# Default 'select' mode (LTTB, RDP, etc.)
return _handle_select(df, headers, spec)
def _handle_resample(df: pd.DataFrame, headers: List[str], spec: Dict[str, Any]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
# Implementation of resampling
rule = spec.get('rule') or spec.get('interval')
if not rule:
# Try to infer from points/ratio?
# For now return as is if no rule
return df, {'error': 'Missing rule for resample'}
try:
# Ensure datetime index
if 'time' in df.columns:
# If time is string, parse it? Or use __epoch?
# __epoch is float timestamp.
if '__epoch' in df.columns:
df = df.set_index(pd.to_datetime(df['__epoch'], unit='s'))
else:
# Try parsing time column
df = df.set_index(pd.to_datetime(df['time']))
# Resample
# We need to define aggregation for columns
agg_map = {}
for h in headers:
if h in ['open']: agg_map[h] = 'first'
elif h in ['high']: agg_map[h] = 'max'
elif h in ['low']: agg_map[h] = 'min'
elif h in ['close']: agg_map[h] = 'last'
elif h in ['tick_volume', 'real_volume', 'volume']: agg_map[h] = 'sum'
else: agg_map[h] = 'last' # Default
resampled = df.resample(rule).agg(agg_map).dropna()
# Reset index and restore time/__epoch if needed
# ...
return resampled, {'mode': 'resample', 'rule': rule, 'rows': len(resampled)}
except Exception as e:
return df, {'error': f'Resample failed: {e}'}
def _handle_encode(df: pd.DataFrame, headers: List[str], spec: Dict[str, Any]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
"""Delegate to utils.simplify implementation for encoding modes."""
from ..utils.simplify import _simplify_dataframe_rows as _impl
# Set mode to encode for delegation
encode_spec = dict(spec)
encode_spec['mode'] = 'encode'
return _impl(df, headers, encode_spec)
def _handle_segment(df: pd.DataFrame, headers: List[str], spec: Dict[str, Any]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
"""Delegate to utils.simplify implementation for segmentation modes."""
from ..utils.simplify import _simplify_dataframe_rows as _impl
# Set mode to segment for delegation
segment_spec = dict(spec)
segment_spec['mode'] = 'segment'
return _impl(df, headers, segment_spec)
def _handle_symbolic(df: pd.DataFrame, headers: List[str], spec: Dict[str, Any]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
"""Delegate to utils.simplify implementation for symbolic modes."""
from ..utils.simplify import _simplify_dataframe_rows as _impl
# Set mode to symbolic for delegation
symbolic_spec = dict(spec)
symbolic_spec['mode'] = 'symbolic'
return _impl(df, headers, symbolic_spec)
def _handle_select(df: pd.DataFrame, headers: List[str], spec: Dict[str, Any]) -> Tuple[pd.DataFrame, Optional[Dict[str, Any]]]:
# Implementation of point selection (LTTB, etc.)
# This logic was largely in `core/simplify.py` and `utils/simplify.py`.
# We should reuse `utils/simplify.py` functions.
original_count = len(df)
if original_count <= 2:
return df, None
# Determine target points
n_out = _choose_simplify_points(original_count, spec)
if n_out >= original_count:
return df, None
# Select representative column(s) for simplification
# Usually 'close' or 'open' or average?
# If OHLC exists, maybe use average of HL?
# For now, let's use 'close' if available, else first numeric column.
series = None
if 'close' in df.columns:
series = df['close'].values
elif len(headers) > 1:
# Try to find a numeric column
for h in headers:
if h != 'time' and h in df.columns:
try:
series = df[h].astype(float).values
break
except:
pass
if series is None:
return df, None
# Get epochs
epochs = df['__epoch'].values if '__epoch' in df.columns else np.arange(original_count)
# Select indices
idxs, method, params = _select_indices_for_timeseries(epochs, series, spec)
# Filter DataFrame
simplified_df = df.iloc[idxs].copy()
meta = {
'mode': 'select',
'method': method,
'original_rows': original_count,
'returned_rows': len(simplified_df),
'points': len(simplified_df)
}
if params:
meta.update(params)
return simplified_df, meta