We provide all the information about MCP servers via our MCP API:
curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server.
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from src.server.core.dependencies import Container
from src.server.domain.services.filings_service import FilingsService
# All endpoints in this module are mounted under the /filings prefix.
router = APIRouter(prefix="/filings", tags=["filings"])
# --- Request Models ---
class ProcessDocumentRequest(BaseModel):
    """Request payload for POST /filings/process."""

    # Identifier for the document within the processing pipeline.
    doc_id: str
    # Source URL the document is downloaded from.
    url: str
    # Document kind — semantics defined by FilingsService.process_document.
    doc_type: str
    # Optional stock ticker associated with the document.
    ticker: Optional[str] = None
# --- Endpoints ---
@router.get("/sec/periodic")
async def get_periodic_sec_filings(
    ticker: str = Query(..., description="US stock ticker (e.g., AAPL)"),
    year: Optional[int] = Query(None, description="Fiscal year"),
    quarter: Optional[int] = Query(None, description="Fiscal quarter (1-4)"),
    forms: Optional[List[str]] = Query(None, description="Filing forms (e.g., 10-K, 10-Q)"),
    limit: int = Query(10, description="Max results when year is omitted"),
):
    """Fetch SEC periodic filings (10-K/10-Q).

    Delegates to FilingsService; any service failure surfaces as an
    HTTP 500 whose detail is the exception text.
    """
    svc: FilingsService = Container.filings_service()
    try:
        filings = await svc.fetch_periodic_sec_filings(
            ticker=ticker,
            year=year,
            quarter=quarter,
            forms=forms,
            limit=limit,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return filings
@router.get("/sec/event")
async def get_event_sec_filings(
    ticker: str = Query(..., description="US stock ticker (e.g., AAPL)"),
    start_date: Optional[str] = Query(None, description="Start date (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date (YYYY-MM-DD)"),
    forms: Optional[List[str]] = Query(None, description="Filing forms (e.g., 8-K)"),
    limit: int = Query(10, description="Max results"),
):
    """Fetch SEC event-driven filings (8-K, etc.).

    Delegates to FilingsService; any service failure surfaces as an
    HTTP 500 whose detail is the exception text.
    """
    svc: FilingsService = Container.filings_service()
    try:
        filings = await svc.fetch_event_sec_filings(
            ticker=ticker,
            start_date=start_date,
            end_date=end_date,
            forms=forms,
            limit=limit,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return filings
@router.get("/ashare")
async def get_ashare_filings(
    symbol: str = Query(..., description="A-share symbol (e.g., 600519)"),
    filing_types: Optional[List[str]] = Query(None, description="Report types (annual, quarterly)"),
    start_date: Optional[str] = Query(None, description="Start date (YYYY-MM-DD)"),
    end_date: Optional[str] = Query(None, description="End date (YYYY-MM-DD)"),
    limit: int = Query(10, description="Max results"),
):
    """Fetch A-share announcements.

    Delegates to FilingsService; any service failure surfaces as an
    HTTP 500 whose detail is the exception text.
    """
    svc: FilingsService = Container.filings_service()
    try:
        announcements = await svc.fetch_ashare_filings(
            symbol=symbol,
            filing_types=filing_types,
            start_date=start_date,
            end_date=end_date,
            limit=limit,
        )
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return announcements
@router.post("/process")
async def process_document(request: ProcessDocumentRequest):
    """Process a document (download, clean, upload to MinIO).

    Returns the service result on success. A result whose status is
    "failed" or "error" is converted to HTTP 500 carrying the service's
    error message; unexpected exceptions also map to HTTP 500.
    """
    service: FilingsService = Container.filings_service()
    try:
        result = await service.process_document(
            doc_id=request.doc_id,
            url=request.url,
            doc_type=request.doc_type,
            ticker=request.ticker
        )
        if result.get("status") in ("failed", "error"):
            raise HTTPException(status_code=500, detail=result.get("error", "Unknown error"))
        return result
    except HTTPException:
        # BUG FIX: re-raise as-is. Previously the broad handler below caught
        # the HTTPException raised above and re-wrapped it, mangling the
        # detail into "500: <message>". Matches the /markdown endpoint.
        raise
    except Exception as e:
        # If the service raised an exception directly
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/markdown")
async def get_filing_markdown(
    ticker: str = Query(..., description="Stock ticker (e.g., AAPL or NASDAQ:AAPL)"),
    doc_id: str = Query(..., description="SEC Accession Number (e.g., 0000320193-25-000079)"),
    stream: bool = Query(False, description="If true, return plain text Markdown; otherwise return JSON"),
):
    """Serve an SEC filing as Markdown, backed by a MinIO cache.

    The service first consults the MinIO cache; on a hit it returns
    immediately, otherwise it fetches from SEC, converts to Markdown,
    caches the result, and returns it.

    Use `stream=true` for a plain-text response (suitable for direct
    file download); the default is JSON with metadata.
    """
    from fastapi.responses import PlainTextResponse
    service: FilingsService = Container.filings_service()
    try:
        result = await service.get_filing_markdown(ticker=ticker, doc_id=doc_id)
        if result.get("status") == "error":
            message = result.get("error", "Unknown error")
            # A missing filing maps to 404; anything else is a server fault.
            code = 404 if "not found" in result.get("error", "").lower() else 500
            raise HTTPException(status_code=code, detail=message)
        if not stream:
            # Default path: JSON payload including cache metadata.
            return result
        # stream=true: raw Markdown body with cache info in headers.
        return PlainTextResponse(
            content=result.get("content", ""),
            media_type="text/markdown",
            headers={
                "X-Cached": str(result.get("cached", False)),
                "X-Doc-Id": doc_id,
                "X-Ticker": ticker,
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def _chunk_to_text(chunk_obj) -> str:
"""Convert edgartools chunk to text string.
edgartools ChunkedDocument.chunks_for_item() returns list[TextBlock].
TextBlock has a .text property to get the actual content.
"""
if isinstance(chunk_obj, str):
return chunk_obj
if isinstance(chunk_obj, list):
# Join all TextBlocks in the list
parts = []
for elem in chunk_obj:
# TextBlock has .text property
if hasattr(elem, 'text'):
text = elem.text.strip() if elem.text else ""
else:
text = str(elem).strip()
if text:
parts.append(text)
return "\n".join(parts)
# Single TextBlock
if hasattr(chunk_obj, 'text'):
return chunk_obj.text.strip() if chunk_obj.text else ""
# Fallback: just convert to string
return str(chunk_obj)
@router.get("/chunks")
async def get_document_chunks_stream(
    ticker: str = Query(..., description="Stock ticker (e.g., AAPL or NASDAQ:AAPL)"),
    doc_id: str = Query(..., description="SEC Accession Number (e.g., 0000320193-25-000079)"),
    items: Optional[List[str]] = Query(None, description="Optional list of items to extract (e.g., Item 1A, Item 7)"),
):
    """Get SEC filing semantic chunks with item labels (Streaming NDJSON).

    Uses the Strategy Pattern via ChunkingOrchestrator to handle different
    filing types:
    - 10-K/10-Q/20-F: Structured item-based chunking
    - 6-K: Attachment extraction (EX-99.1)
    - 8-K: Event-driven section chunking

    Returns NDJSON (Newline Delimited JSON) format for streaming:
    - First line: {"type": "header", "doc_id": "...", "ticker": "...", "form": "...", "filing_date": "..."}
    - Chunk lines: {"type": "chunk", "text": "...", "metadata": {...}}
    - Last line: {"type": "footer", "chunks_count": N, "status": "success"}

    Args:
        ticker: Stock ticker, optionally exchange-prefixed (e.g. NASDAQ:AAPL).
        doc_id: SEC Accession Number, optionally prefixed with "SEC:".
        items: Optional list of items to extract. Defaults to important sections.
    """
    from fastapi.responses import StreamingResponse
    from src.server.utils.logger import logger
    from src.server.domain.chunking import ChunkingOrchestrator
    import json

    async def generate_chunks():
        """Generator for streaming NDJSON chunks using ChunkingOrchestrator."""
        try:
            # Strip an exchange prefix ("NASDAQ:AAPL" -> "AAPL") and the
            # optional "SEC:" prefix on the accession number.
            pure_symbol = ticker.split(":")[-1] if ":" in ticker else ticker
            accession_number = doc_id.replace("SEC:", "")
            # BUG FIX: log messages previously contained mojibake emoji.
            logger.info(f"get_document_chunks (stream): ticker={pure_symbol}, doc_id={accession_number}")
            # Get the filing using edgartools via sec_utils (avoids ticker.txt download)
            from src.server.utils.sec_utils import get_company
            company = get_company(pure_symbol)
            # NOTE(review): only the latest 100 filings are scanned — an older
            # accession number will be reported as "not found".
            filings = company.get_filings().latest(100)
            target_filing = None
            if filings:
                for filing in filings:
                    if filing.accession_no == accession_number:
                        target_filing = filing
                        break
            if not target_filing:
                yield json.dumps({
                    "type": "error",
                    "error": f"Filing not found: {accession_number} for {pure_symbol}"
                }) + "\n"
                return
            logger.info(f"Found filing: {target_filing.form} dated {target_filing.filing_date}")
            # Normalize items parameter: treat empty/blank lists as "use defaults".
            items_to_extract = None
            if items and len(items) > 0 and items[0]:
                items_to_extract = items
            # Use ChunkingOrchestrator with Strategy Pattern.
            # This delegates to the appropriate strategy based on form type:
            # - TenKStrategy for 10-K/10-Q/20-F
            # - SixKStrategy for 6-K (attachment extraction)
            # - EightKStrategy for 8-K
            chunk_count = 0
            for item in ChunkingOrchestrator.process_with_header_footer(
                filing=target_filing,
                ticker=pure_symbol,
                items=items_to_extract,
            ):
                if item["type"] == "chunk":
                    chunk_count += 1
                yield json.dumps(item) + "\n"
            # BUG FIX: this f-string was previously split across two physical
            # lines (a syntax error) and carried a garbled emoji prefix.
            logger.info(f"Streamed {chunk_count} chunks via ChunkingOrchestrator")
        except Exception as e:
            logger.error(f"Streaming chunks failed: {e}")
            import traceback
            logger.error(traceback.format_exc())
            yield json.dumps({
                "type": "error",
                "error": str(e)
            }) + "\n"

    return StreamingResponse(
        generate_chunks(),
        media_type="application/x-ndjson",
        headers={
            "X-Content-Type-Options": "nosniff",
            "Cache-Control": "no-cache",
        }
    )
async def _api_fallback_chunking(filing, ticker: str, doc_id: str) -> dict:
"""Fallback chunking using markdown."""
from src.server.utils.logger import logger
try:
markdown_content = filing.markdown()
if not markdown_content:
return {
"status": "error",
"error": "Empty markdown content",
}
paragraphs = markdown_content.split("\n\n")
chunks = []
TARGET_SIZE = 4000
current_chunk = ""
chunk_index = 0
for para in paragraphs:
if len(current_chunk) + len(para) > TARGET_SIZE and current_chunk:
chunks.append({
"text": current_chunk.strip(),
"metadata": {
"ticker": ticker,
"doc_id": doc_id,
"form": filing.form,
"item": "fallback",
"item_name": "Fallback Chunking",
"filing_date": str(filing.filing_date),
"chunk_index": chunk_index,
}
})
chunk_index += 1
current_chunk = para
else:
current_chunk += "\n\n" + para if current_chunk else para
if current_chunk.strip():
chunks.append({
"text": current_chunk.strip(),
"metadata": {
"ticker": ticker,
"doc_id": doc_id,
"form": filing.form,
"item": "fallback",
"item_name": "Fallback Chunking",
"filing_date": str(filing.filing_date),
"chunk_index": chunk_index,
}
})
return {
"status": "success",
"doc_id": doc_id,
"ticker": ticker,
"form": filing.form,
"filing_date": str(filing.filing_date),
"chunks_count": len(chunks),
"chunks": chunks,
"fallback": True,
}
except Exception as e:
return {
"status": "error",
"error": str(e),
}