Skip to main content
Glama
filings_service.py21.3 kB
# src/server/domain/services/filings_service.py """FilingsService – fetches regulatory filings and announcements. Uses AdapterManager to retrieve data from appropriate sources (Finnhub for SEC, Akshare for A-share). Returns structured data (JSON). """ from datetime import datetime from typing import Any, Dict, List, Optional from src.server.utils.logger import logger class FilingsService: """Service for retrieving regulatory filings and announcements.""" def __init__(self, adapter_manager, minio_client=None): self.adapter_manager = adapter_manager self.minio_client = minio_client self.logger = logger # Initialize edgartools identity and cache via sec_utils # This ensures consistent identity and avoids ticker.txt downloads from src.server.utils.sec_utils import get_company, get_cik_or_symbol # No need to call set_identity here, sec_utils handles it on import async def fetch_periodic_sec_filings( self, ticker: str, forms: Optional[List[str]] = None, year: Optional[int] = None, quarter: Optional[int] = None, limit: int = 10, ) -> List[Dict[str, Any]]: """Fetch SEC periodic filings (10-K/10-Q) with year/quarter. Args: ticker: US stock ticker forms: Filing forms (default: ["10-Q"]) year: Fiscal year (e.g., 2024) quarter: Fiscal quarter (1-4) limit: Max results when year is omitted Returns: List of filing dictionaries """ filing_types = forms or ["10-K", "10-Q", "20-F", "6-K"] # Convert year/quarter to date range for adapter start_date = None end_date = None if year: # Use year as date range start_date = f"{year}-01-01" end_date = f"{year}-12-31" return await self._fetch_filings( ticker, filing_types, start_date, end_date, limit ) async def fetch_event_sec_filings( self, ticker: str, forms: Optional[List[str]] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 10, ) -> List[Dict[str, Any]]: """Fetch SEC event-driven filings (8-K, 3/4/5) with date range. Args: ticker: US stock ticker forms: Filing forms (default: ["8-K"]) start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) limit: Max results Returns: List of filing dictionaries """ filing_types = forms or ["8-K", "6-K"] return await self._fetch_filings( ticker, filing_types, start_date, end_date, limit ) async def fetch_ashare_filings( self, symbol: str, filing_types: Optional[List[str]] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, limit: int = 10, ) -> List[Dict[str, Any]]: """Fetch A-share announcements. Note: Ticker normalization is handled by AkshareAdapter. """ # Delegate to unified method - adapter will handle format return await self._fetch_filings( symbol, filing_types, start_date, end_date, limit ) async def _fetch_filings( self, ticker: str, filing_types: Optional[List[str]], start_date_str: Optional[str], end_date_str: Optional[str], limit: int, ) -> List[Dict[str, Any]]: """Internal unified method to fetch filings via AdapterManager.""" try: start = ( datetime.strptime(start_date_str, "%Y-%m-%d") if start_date_str else None ) end = datetime.strptime(end_date_str, "%Y-%m-%d") if end_date_str else None filings = await self.adapter_manager.get_filings( ticker, start, end, limit, filing_types ) return filings except Exception as e: self.logger.error(f"Failed to fetch filings for {ticker}: {e}") return [{"error": str(e)}] async def process_document( self, doc_id: str, url: str, doc_type: str, ticker: str = None, ) -> Dict[str, Any]: """Process a single document: download, extract text, and upload to MinIO. Uses `edgartools` for SEC filings to ensure correct parsing of index pages and tables. Args: doc_id: Unique document identifier (Accession Number for SEC) url: Document URL doc_type: Document type (e.g., "10-K", "8-K") ticker: Stock ticker (Required for SEC filings to use edgartools) Returns: Dict with storage_path and status """ if not self.minio_client: return {"error": "MinIO client not configured"} try: # Check if it is an SEC filing (based on doc_type or doc_id format) is_sec = doc_type in ["10-K", "10-Q", "8-K", "20-F", "6-K"] or "SEC" in doc_id or "edgar" in url if is_sec: if not ticker: return { "doc_id": doc_id, "status": "failed", "error": "Ticker is required for processing SEC filings with edgartools." } # Extract pure symbol from EXCHANGE:SYMBOL format (e.g., 'NASDAQ:AAPL' -> 'AAPL') pure_symbol = self._extract_symbol(ticker) self.logger.info(f"Processing SEC filing for {ticker} (pure symbol: {pure_symbol})") # 1. Initialize Company using sec_utils (avoids ticker.txt download) from src.server.utils.sec_utils import get_company self.logger.info(f"Initializing Company for: {pure_symbol}") company = get_company(pure_symbol) # 2. Find the specific filing # edgartools doesn't support get_by_accession directly, so we fetch recent filings and filter # doc_id for SEC is usually the Accession Number (e.g., 0000320193-25-000079) accession_number = doc_id.replace("SEC:", "") # Handle potential prefix # Fetch a batch of filings (e.g., latest 50) to find the match # This is a trade-off: we assume the filing is relatively recent. # If not found, we might need a different strategy or larger limit. filings = company.get_filings().latest(50) target_filing = None if filings: for filing in filings: if filing.accession_no == accession_number: target_filing = filing break if not target_filing: # Try fetching by form type if accession match failed (fallback) filings_by_form = company.get_filings(form=doc_type).latest(20) if filings_by_form: for filing in filings_by_form: if filing.accession_no == accession_number: target_filing = filing break if not target_filing: return { "doc_id": doc_id, "status": "failed", "error": f"Could not find filing with accession number {accession_number} for {ticker} using edgartools." } # 3. Convert to Markdown using edgartools # This handles the index page resolution and table conversion automatically self.logger.info(f"Converting SEC filing {accession_number} to Markdown using edgartools...") # Start with main document full_markdown = f"# {doc_type} Filing: {ticker} ({target_filing.filing_date})\n\n" try: main_content = target_filing.markdown() if main_content: full_markdown += "## Main Document\n\n" + main_content + "\n\n" except Exception as e: self.logger.warning(f"Failed to convert main document: {e}") # Process Attachments (Crucial for 8-K/6-K) if target_filing.attachments: self.logger.info(f"Processing attachments for {accession_number}...") has_attachments = False for attachment in target_filing.attachments: # Filter criteria for relevant attachments: # 1. Document Type starts with EX-99 (Press Releases, Earnings) # 2. Description contains keywords like "PRESS RELEASE", "EARNINGS", "ANNOUNCEMENT" # 3. Avoid XML/XBRL/Graphics doc_type_upper = (attachment.document_type or "").upper() desc_upper = (attachment.description or "").upper() is_relevant = ( doc_type_upper.startswith("EX-99") or "PRESS RELEASE" in desc_upper or "EARNINGS" in desc_upper or "ANNOUNCEMENT" in desc_upper or "RESULTS" in desc_upper ) if is_relevant: try: att_content = attachment.markdown() if att_content: if not has_attachments: full_markdown += "---\n\n# Attachments\n\n" has_attachments = True full_markdown += f"## Attachment: {attachment.document_type} - {attachment.description or ''}\n\n" full_markdown += att_content + "\n\n" self.logger.info(f"Included attachment: {attachment.document_type}") except Exception as e: self.logger.warning(f"Failed to convert attachment {attachment.document_type}: {e}") if not full_markdown.strip(): return { "doc_id": doc_id, "status": "failed", "error": "edgartools returned empty content (main + attachments)." } # 4. Upload to MinIO object_name = f"processed/{doc_type}/{doc_id}.md" storage_path = await self.minio_client.upload_bytes( object_name, full_markdown.encode('utf-8'), "text/markdown" ) return { "doc_id": doc_id, "type": "text", "url": url, "status": "success", "storage_path": storage_path, "message": "SEC filing (including attachments) processed via edgartools and uploaded." } else: # Non-SEC logic (e.g. PDF direct download for A-shares) import aiohttp async with aiohttp.ClientSession() as session: headers = { "User-Agent": "ValueCell/1.0 (contact@valuecell.ai)", "Accept": "*/*", } async with session.get(url, headers=headers) as response: if response.status != 200: return { "doc_id": doc_id, "status": "failed", "error": f"Download failed with status {response.status}" } content_type = response.headers.get("Content-Type", "").lower() content_bytes = await response.read() is_pdf = "pdf" in content_type or url.lower().endswith(".pdf") if is_pdf: object_name = f"raw/{doc_type}/{doc_id}.pdf" storage_path = await self.minio_client.upload_bytes( object_name, content_bytes, "application/pdf" ) return { "doc_id": doc_id, "type": "pdf", "url": url, "status": "success", "storage_path": storage_path, "message": "PDF uploaded to MinIO." } else: # Fallback for non-SEC HTML (unlikely path given current scope) return { "doc_id": doc_id, "status": "failed", "error": "Unsupported document type for non-SEC source." } except Exception as e: self.logger.error(f"Process document failed for {doc_id}: {e}") import traceback self.logger.error(traceback.format_exc()) return { "doc_id": doc_id, "status": "error", "error": str(e) } async def get_filing_markdown( self, ticker: str, doc_id: str, ) -> Dict[str, Any]: """Get SEC filing content as Markdown with MinIO caching. This method: 1. Checks if the Markdown is already cached in MinIO 2. If cached, returns the cached content 3. If not cached, fetches from SEC using edgartools, caches, and returns Args: ticker: Stock ticker (e.g., 'AAPL' or 'NASDAQ:AAPL') doc_id: SEC Accession Number (e.g., '0000320193-25-000079') Returns: Dict with 'content' (Markdown string), 'cached' (bool), and 'status' """ # Normalize doc_id (remove potential prefix) accession_number = doc_id.replace("SEC:", "") # Build cache key path in MinIO pure_symbol = self._extract_symbol(ticker) cache_object_name = f"cache/markdown/{pure_symbol}/{accession_number}.md" try: # 1. Check MinIO cache first if self.minio_client: exists = await self.minio_client.object_exists(cache_object_name) if exists: self.logger.info(f"✅ Cache HIT for {ticker}/{doc_id}") cached_bytes = await self.minio_client.download_bytes(cache_object_name) if cached_bytes: return { "status": "success", "cached": True, "content": cached_bytes.decode('utf-8'), "doc_id": doc_id, "ticker": ticker, } self.logger.info(f"📥 Cache MISS for {ticker}/{doc_id}, fetching from SEC...") # 2. Fetch from SEC using edgartools via sec_utils from src.server.utils.sec_utils import get_company company = get_company(pure_symbol) # Search for the filing by accession number filings = company.get_filings().latest(50) target_filing = None if filings: for filing in filings: if filing.accession_no == accession_number: target_filing = filing break if not target_filing: # Try broader search filings_all = company.get_filings().latest(100) if filings_all: for filing in filings_all: if filing.accession_no == accession_number: target_filing = filing break if not target_filing: return { "status": "error", "cached": False, "error": f"Filing not found: {doc_id} for {ticker}", "doc_id": doc_id, "ticker": ticker, } # 3. Convert to Markdown self.logger.info(f"🔄 Converting {accession_number} to Markdown...") doc_type = target_filing.form or "SEC" full_markdown = f"# {doc_type} Filing: {ticker} ({target_filing.filing_date})\n\n" # Main document try: main_content = target_filing.markdown() if main_content: full_markdown += "## Main Document\n\n" + main_content + "\n\n" except Exception as e: self.logger.warning(f"Failed to convert main document: {e}") # Process Attachments (important for 8-K/6-K) if target_filing.attachments: self.logger.info(f"Processing attachments for {accession_number}...") has_attachments = False for attachment in target_filing.attachments: doc_type_upper = (attachment.document_type or "").upper() desc_upper = (attachment.description or "").upper() is_relevant = ( doc_type_upper.startswith("EX-99") or "PRESS RELEASE" in desc_upper or "EARNINGS" in desc_upper or "ANNOUNCEMENT" in desc_upper or "RESULTS" in desc_upper ) if is_relevant: try: att_content = attachment.markdown() if att_content: if not has_attachments: full_markdown += "---\n\n# Attachments\n\n" has_attachments = True full_markdown += f"## Attachment: {attachment.document_type} - {attachment.description or ''}\n\n" full_markdown += att_content + "\n\n" self.logger.info(f"Included attachment: {attachment.document_type}") except Exception as e: self.logger.warning(f"Failed to convert attachment {attachment.document_type}: {e}") if not full_markdown.strip(): return { "status": "error", "cached": False, "error": "edgartools returned empty content", "doc_id": doc_id, "ticker": ticker, } # 4. Cache to MinIO if self.minio_client: try: await self.minio_client.upload_bytes( cache_object_name, full_markdown.encode('utf-8'), "text/markdown" ) self.logger.info(f"💾 Cached Markdown to MinIO: {cache_object_name}") except Exception as e: self.logger.warning(f"Failed to cache to MinIO: {e}") return { "status": "success", "cached": False, "content": full_markdown, "doc_id": doc_id, "ticker": ticker, "form_type": target_filing.form, "filing_date": str(target_filing.filing_date), } except Exception as e: self.logger.error(f"get_filing_markdown failed for {ticker}/{doc_id}: {e}") import traceback self.logger.error(traceback.format_exc()) return { "status": "error", "cached": False, "error": str(e), "doc_id": doc_id, "ticker": ticker, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server