Skip to main content
Glama
orchestrator.py6.93 kB
"""Chunking Orchestrator - Central coordinator for SEC filing chunking. The orchestrator is responsible for: 1. Selecting the appropriate strategy based on form type 2. Invoking the strategy's process method 3. Providing a unified interface for the API/MCP layers Usage: from src.server.domain.chunking import ChunkingOrchestrator # Get all chunks for a filing for chunk in ChunkingOrchestrator.process(filing, ticker="AAPL"): print(chunk.text) # Get specific items only for chunk in ChunkingOrchestrator.process( filing, ticker="AAPL", items=["Item 1A", "Item 7"] ): print(chunk.metadata["item"]) """ from typing import Dict, Generator, List, Optional, Type from src.server.domain.chunking.base import Chunk, ChunkingStrategy from src.server.domain.chunking.strategies import ( TenKStrategy, SixKStrategy, EightKStrategy, TwentyFStrategy, ) from src.server.utils.logger import logger class ChunkingOrchestrator: """Central coordinator for SEC filing chunking. Uses the Strategy Pattern to route filings to appropriate handlers based on their form type. Strategy mapping: - 10-K, 10-Q → TenKStrategy - 20-F → TwentyFStrategy - 6-K → SixKStrategy - 8-K → EightKStrategy - Unknown → TenKStrategy (default fallback) """ # Singleton strategy instances _strategy_instances: Dict[str, ChunkingStrategy] = {} # Form type to strategy class mapping _strategy_mapping: Dict[str, Type[ChunkingStrategy]] = { "10-K": TenKStrategy, "10-Q": TenKStrategy, "20-F": TwentyFStrategy, "6-K": SixKStrategy, "8-K": EightKStrategy, } # Default strategy for unknown form types _default_strategy_class: Type[ChunkingStrategy] = TenKStrategy @classmethod def _get_strategy_instance(cls, form_type: str) -> ChunkingStrategy: """Get or create a strategy instance for the given form type. Strategies are cached as singletons for efficiency. """ # Normalize form type: "10-K/A" -> "10-K" base_form = form_type.upper().split('/')[0].strip() if form_type else "" # Get strategy class strategy_class = cls._strategy_mapping.get(base_form, cls._default_strategy_class) # Get or create instance class_name = strategy_class.__name__ if class_name not in cls._strategy_instances: cls._strategy_instances[class_name] = strategy_class() logger.debug(f"Created new strategy instance: {class_name}") return cls._strategy_instances[class_name] @classmethod def get_strategy(cls, form_type: str) -> ChunkingStrategy: """Get the appropriate strategy for a form type. Args: form_type: SEC form type (e.g., "10-K", "6-K", "8-K/A") Returns: ChunkingStrategy instance """ return cls._get_strategy_instance(form_type) @classmethod def process( cls, filing, ticker: str, items: Optional[List[str]] = None, ) -> Generator[Chunk, None, None]: """Main entry point: process a filing and yield chunks. This method: 1. Determines the filing's form type 2. Selects the appropriate chunking strategy 3. Delegates to the strategy's process method Args: filing: An edgartools Filing object ticker: Stock ticker symbol items: Optional list of items to filter (e.g., ["Item 1A", "Item 7"]) Yields: Chunk objects with text and metadata Example: from edgar import Company company = Company("AAPL") filing = company.get_filings(form="10-K").latest(1)[0] for chunk in ChunkingOrchestrator.process(filing, ticker="AAPL"): print(f"[{chunk.metadata['item']}] {chunk.text[:100]}...") """ form_type = getattr(filing, 'form', '') or '' logger.info(f"🎯 ChunkingOrchestrator: Processing {form_type} for {ticker}") strategy = cls.get_strategy(form_type) logger.info(f"📋 Selected strategy: {strategy.__class__.__name__}") chunk_count = 0 for chunk in strategy.process(filing, ticker, items): chunk_count += 1 yield chunk logger.info(f"✅ ChunkingOrchestrator: Yielded {chunk_count} chunks") @classmethod def process_with_header_footer( cls, filing, ticker: str, items: Optional[List[str]] = None, ) -> Generator[dict, None, None]: """Process filing and yield NDJSON-compatible dictionaries. This is a convenience method for streaming responses that includes header and footer objects. Yields: - First: {"type": "header", "doc_id": "...", "ticker": "...", ...} - Middle: {"type": "chunk", "text": "...", "metadata": {...}} - Last: {"type": "footer", "chunks_count": N, "status": "success"} """ doc_id = getattr(filing, 'accession_no', '') form_type = getattr(filing, 'form', '') filing_date = str(getattr(filing, 'filing_date', '')) # Yield header yield { "type": "header", "doc_id": doc_id, "ticker": ticker, "form": form_type, "filing_date": filing_date, } # Yield chunks chunk_count = 0 for chunk in cls.process(filing, ticker, items): chunk_count += 1 yield chunk.to_dict() # Yield footer yield { "type": "footer", "chunks_count": chunk_count, "status": "success", } @classmethod def register_strategy(cls, form_type: str, strategy_class: Type[ChunkingStrategy]): """Register a custom strategy for a form type. This allows extensions without modifying core code. Args: form_type: Form type to register (e.g., "DEF 14A") strategy_class: ChunkingStrategy subclass Example: class ProxyStrategy(ChunkingStrategy): ... ChunkingOrchestrator.register_strategy("DEF 14A", ProxyStrategy) """ cls._strategy_mapping[form_type.upper()] = strategy_class logger.info(f"Registered strategy {strategy_class.__name__} for {form_type}") @classmethod def list_supported_forms(cls) -> List[str]: """Return list of all supported form types.""" return list(cls._strategy_mapping.keys())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server