main.py (20 kB)
#!/usr/bin/env python
import asyncio
import json
import re
import sys
import time
from datetime import datetime
from functools import wraps
from pathlib import Path
from typing import List, Dict, Any, Optional
from urllib.parse import urljoin

import aiohttp
from bs4 import BeautifulSoup
from mcp.server import Server
from mcp.server.models import InitializationOptions
from mcp.types import Tool, TextContent
import mcp.server.stdio as stdio

# Import advanced modules from the shared package
sys.path.append(str(Path(__file__).parent.parent.parent / "shared"))
from xbrl_parser import XBRLParser
from financial_analysis import DCFModel, FinancialMetrics
from research_report_generator import ResearchReportGenerator


def async_retry(max_attempts=3, delay=1):
    """Retry decorator for async functions"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        await asyncio.sleep(delay * (attempt + 1))
                        continue
                    raise last_exception
        return wrapper
    return decorator


class SECScraper:
    """Scraper for SEC EDGAR filings and financial data"""

    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_request_time = {}
        # Initialize advanced components
        self.analysis_enhanced = True
        self.xbrl_parser = XBRLParser()
        self.dcf_model = DCFModel()
        self.min_delay = 1.0  # Rate limiting
        self.sec_headers = {
            'User-Agent': 'FinancialMCP/1.0 (Personal Research Tool; Contact: research@example.com)',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
        }
        self.yahoo_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
        }

    async def rate_limit(self, url: str):
        """Implement rate limiting per domain"""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc
        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < self.min_delay:
                await asyncio.sleep(self.min_delay - elapsed)
        self.last_request_time[domain] = time.time()

    async def setup(self):
        """Setup aiohttp session"""
        if not self.session or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def cleanup(self):
        """Cleanup aiohttp session"""
        if self.session:
            await self.session.close()

    async def search_company_filings(self, ticker: str, filing_type: str = None) -> List[Dict[str, Any]]:
        """Search for company filings on SEC EDGAR"""
        # First, get CIK from ticker
        cik_url = f"https://www.sec.gov/cgi-bin/browse-edgar?CIK={ticker}&owner=exclude"

        try:
            async with self.session.get(cik_url, headers=self.sec_headers) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')

                # Extract CIK
                cik_elem = soup.find('span', {'class': re.compile('companyName')}) or soup.find(text=re.compile('CIK#:'))
                if not cik_elem:
                    return [{'error': f'Company not found for ticker {ticker}'}]

                cik_text = cik_elem.text
                cik_match = re.search(r'CIK#: (\d+)', cik_text)
                if not cik_match:
                    return [{'error': 'Could not extract CIK'}]

                cik = cik_match.group(1).zfill(10)  # Pad with zeros

                # Search for filings
                if filing_type:
                    search_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&type={filing_type}&count=10"
                else:
                    search_url = f"https://www.sec.gov/cgi-bin/browse-edgar?action=getcompany&CIK={cik}&count=10"

                async with self.session.get(search_url, headers=self.sec_headers) as filing_response:
                    filing_html = await filing_response.text()
                    filing_soup = BeautifulSoup(filing_html, 'html.parser')

                    filings = []
                    filing_table = filing_soup.find('table', {'class': re.compile('tableFile')}) or soup.find('table', {'summary': re.compile('Document')})

                    if filing_table:
                        rows = filing_table.find_all('tr')[1:]  # Skip header
                        for row in rows[:10]:  # Get latest 10 filings
                            cells = row.find_all('td')
                            if len(cells) >= 4:
                                filing_link = cells[1].find('a')
                                doc_link = cells[2].find('a', {'id': re.compile('documentsbutton')})

                                filing = {
                                    'filing_type': cells[0].text.strip(),
                                    'filing_date': cells[3].text.strip(),
                                    'description': cells[2].text.strip(),
                                }

                                if doc_link:
                                    filing['documents_url'] = urljoin('https://www.sec.gov', doc_link.get('href', ''))

                                filings.append(filing)

                    return filings

        except aiohttp.ClientError as e:
            return [{'error': f'Network error: {str(e)}', 'retry_possible': True}]
        except Exception as e:
            return [{'error': f'Failed to search filings: {type(e).__name__}: {str(e)}', 'retry_possible': True}]

    async def scrape_10k_financials(self, ticker: str) -> Dict[str, Any]:
        """Scrape financial statements from latest 10-K filing"""
        filings = await self.search_company_filings(ticker, '10-K')

        if not filings or 'error' in filings[0]:
            return {'error': 'No 10-K filings found'}

        latest_10k = filings[0]
        if 'documents_url' not in latest_10k:
            return {'error': 'No document URL found'}

        try:
            async with self.session.get(latest_10k['documents_url'], headers=self.sec_headers) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')

                # Find the main 10-K document
                doc_table = soup.find('table', {'class': 'tableFile'})
                if not doc_table:
                    return {'error': 'Document table not found'}

                main_doc_link = None
                for row in doc_table.find_all('tr')[1:]:
                    cells = row.find_all('td')
                    if cells and '10-K' in cells[3].text:
                        link = cells[2].find('a')
                        if link:
                            main_doc_link = urljoin('https://www.sec.gov', link.get('href', ''))
                            break

                if not main_doc_link:
                    return {'error': '10-K document link not found'}

                # Get the 10-K content
                async with self.session.get(main_doc_link, headers=self.sec_headers) as doc_response:
                    doc_html = await doc_response.text()
                    doc_soup = BeautifulSoup(doc_html, 'html.parser')

                    # Extract financial data (simplified - real implementation would parse XBRL)
                    financial_data = {
                        'ticker': ticker,
                        'filing_date': latest_10k['filing_date'],
                        'document_url': main_doc_link,
                        'sections': {}
                    }

                    # Look for key sections
                    text_content = doc_soup.get_text()

                    # Extract revenue mentions
                    revenue_matches = re.findall(r'(?:revenue|net sales).*?\$[\d,]+(?:\.\d+)?\s*(?:million|billion)', text_content, re.IGNORECASE)
                    if revenue_matches:
                        financial_data['sections']['revenue_mentions'] = revenue_matches[:5]

                    # Extract income mentions
                    income_matches = re.findall(r'net (?:income|loss).*?\$[\d,]+(?:\.\d+)?\s*(?:million|billion)', text_content, re.IGNORECASE)
                    if income_matches:
                        financial_data['sections']['income_mentions'] = income_matches[:5]

                    return financial_data

        except Exception as e:
            return {'error': f'Failed to scrape 10-K: {str(e)}'}

    async def scrape_10q_earnings(self, ticker: str) -> Dict[str, Any]:
        """Scrape quarterly earnings from latest 10-Q filing"""
        filings = await self.search_company_filings(ticker, '10-Q')

        if not filings or 'error' in filings[0]:
            return {'error': 'No 10-Q filings found'}

        latest_10q = filings[0]
        return {
            'ticker': ticker,
            'filing_type': '10-Q',
            'filing_date': latest_10q['filing_date'],
            'description': latest_10q.get('description', ''),
            'documents_url': latest_10q.get('documents_url', ''),
            'note': 'Full 10-Q parsing would extract detailed quarterly financials'
        }

    async def scrape_8k_events(self, ticker: str) -> List[Dict[str, Any]]:
        """Scrape recent 8-K material events"""
        filings = await self.search_company_filings(ticker, '8-K')

        if not filings or 'error' in filings[0]:
            return [{'error': 'No 8-K filings found'}]

        # Return the recent 8-K filings with their descriptions
        events = []
        for filing in filings[:5]:  # Get latest 5 events
            events.append({
                'ticker': ticker,
                'filing_type': '8-K',
                'filing_date': filing['filing_date'],
                'description': filing.get('description', ''),
                'documents_url': filing.get('documents_url', '')
            })

        return events

    async def parse_xbrl_data(self, ticker: str, filing_type: str = '10-K') -> Dict[str, Any]:
        """Parse XBRL structured data from filings using the XBRL parser"""
        try:
            # Get latest filing
            filings = await self.search_company_filings(ticker, filing_type)
            if not filings or 'error' in filings[0]:
                return {'error': f'No {filing_type} filings found'}

            latest_filing = filings[0]
            if 'documents_url' not in latest_filing:
                return {'error': 'No document URL found'}

            # Use XBRL parser to fetch and parse
            xbrl_content = await self.xbrl_parser.fetch_xbrl_from_filing(
                latest_filing['documents_url'],
                self.session
            )

            if xbrl_content:
                parsed_data = self.xbrl_parser.parse_xbrl_string(xbrl_content)
                key_metrics = self.xbrl_parser.extract_key_metrics(parsed_data)

                return {
                    'ticker': ticker,
                    'filing_type': filing_type,
                    'filing_date': latest_filing['filing_date'],
                    'parsed_data': parsed_data,
                    'key_metrics': key_metrics
                }
            else:
                return {
                    'ticker': ticker,
                    'filing_type': filing_type,
                    'note': 'XBRL data not available, using HTML extraction',
                    'filing_date': latest_filing['filing_date']
                }

        except Exception as e:
            return {'error': f'Failed to parse XBRL: {str(e)}'}

    async def get_current_price(self, ticker: str) -> Dict[str, Any]:
        """Get current stock price from Yahoo Finance"""
        url = f"https://finance.yahoo.com/quote/{ticker}"

        try:
            async with self.session.get(url, headers=self.yahoo_headers) as response:
                html = await response.text()
                soup = BeautifulSoup(html, 'html.parser')

                price_data = {
                    'ticker': ticker,
                    'source': 'yahoo_finance',
                    'timestamp': datetime.now().isoformat()
                }

                # Find price
                price_elem = soup.find('fin-streamer', {'data-field': 'regularMarketPrice'})
                if price_elem:
                    price_data['price'] = price_elem.get('data-value', price_elem.text)

                # Find change
                change_elem = soup.find('fin-streamer', {'data-field': 'regularMarketChange'})
                if change_elem:
                    price_data['change'] = change_elem.get('data-value', change_elem.text)

                # Find change percent
                change_pct_elem = soup.find('fin-streamer', {'data-field': 'regularMarketChangePercent'})
                if change_pct_elem:
                    price_data['change_percent'] = change_pct_elem.get('data-value', change_pct_elem.text)

                # Find volume
                volume_elem = soup.find('fin-streamer', {'data-field': 'regularMarketVolume'})
                if volume_elem:
                    price_data['volume'] = volume_elem.get('data-value', volume_elem.text)

                # Find market cap
                market_cap_elem = soup.find('td', {'data-test': 'MARKET_CAP-value'})
                if market_cap_elem:
                    price_data['market_cap'] = market_cap_elem.text

                return price_data

        except Exception as e:
            return {'error': f'Failed to get price: {str(e)}'}


# Initialize server
server = Server("sec-scraper")
scraper = SECScraper()


# Define tools
@server.list_tools()
async def list_tools() -> List[Tool]:
    return [
        Tool(
            name="scrape_10k_financials",
            description="Scrape financial statements from the latest 10-K annual report",
            inputSchema={
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol (e.g., AAPL, MSFT)"
                    }
                },
                "required": ["ticker"]
            }
        ),
        Tool(
            name="scrape_10q_earnings",
            description="Scrape quarterly earnings data from the latest 10-Q report",
            inputSchema={
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol"
                    }
                },
                "required": ["ticker"]
            }
        ),
        Tool(
            name="scrape_8k_events",
            description="Scrape recent 8-K material events and announcements",
            inputSchema={
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol"
                    }
                },
                "required": ["ticker"]
            }
        ),
        Tool(
            name="parse_xbrl_data",
            description="Parse XBRL structured financial data from SEC filings",
            inputSchema={
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol"
                    },
                    "filing_type": {
                        "type": "string",
                        "description": "Type of filing (10-K, 10-Q, etc.)",
                        "default": "10-K"
                    }
                },
                "required": ["ticker"]
            }
        ),
        Tool(
            name="get_current_price",
            description="Get current stock price and market data",
            inputSchema={
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Stock ticker symbol"
                    }
                },
                "required": ["ticker"]
            }
        )
    ]


@server.call_tool()
async def call_tool(name: str, arguments: dict) -> List[TextContent]:
    await scraper.setup()

    try:
        if name == "scrape_10k_financials":
            ticker = arguments["ticker"].upper()
            result = await scraper.scrape_10k_financials(ticker)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "scrape_10q_earnings":
            ticker = arguments["ticker"].upper()
            result = await scraper.scrape_10q_earnings(ticker)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "scrape_8k_events":
            ticker = arguments["ticker"].upper()
            result = await scraper.scrape_8k_events(ticker)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "parse_xbrl_data":
            ticker = arguments["ticker"].upper()
            filing_type = arguments.get("filing_type", "10-K")
            result = await scraper.parse_xbrl_data(ticker, filing_type)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        elif name == "get_current_price":
            ticker = arguments["ticker"].upper()
            result = await scraper.get_current_price(ticker)
            return [TextContent(type="text", text=json.dumps(result, indent=2))]

        else:
            return [TextContent(type="text", text=f"Unknown tool: {name}")]

    except Exception as e:
        return [TextContent(type="text", text=json.dumps({"error": str(e)}, indent=2))]


async def main():
    async with stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="sec-scraper",
                server_version="0.1.0",
                capabilities={}
            )
        )


if __name__ == "__main__":
    asyncio.run(main())
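
The server exposes its five tools over stdio, so any MCP-compatible client can launch it as a subprocess and call the tools directly. Below is a minimal client-side sketch, assuming the MCP Python SDK's stdio client; the interpreter command and the path to main.py are placeholders for your local setup.

import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Placeholder launch command; adjust the path to where main.py lives locally.
server_params = StdioServerParameters(command="python", args=["main.py"])


async def demo():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # List the tools the server advertises (the five defined above).
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])
            # Call one tool; the result is returned as TextContent with JSON inside.
            result = await session.call_tool("get_current_price", arguments={"ticker": "AAPL"})
            print(result.content)


if __name__ == "__main__":
    asyncio.run(demo())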

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LuisRincon23/SEC-MCP'
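
The same lookup can be done from Python. The sketch below uses aiohttp, matching the server's own HTTP library; it makes no assumption about the response schema beyond it being JSON.

import asyncio
import aiohttp

SERVER_INFO_URL = "https://glama.ai/api/mcp/v1/servers/LuisRincon23/SEC-MCP"


async def fetch_server_info() -> dict:
    # Fetch this server's directory entry from the Glama MCP API
    async with aiohttp.ClientSession() as session:
        async with session.get(SERVER_INFO_URL) as response:
            response.raise_for_status()
            return await response.json()


if __name__ == "__main__":
    print(asyncio.run(fetch_server_info()))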

If you have feedback or need assistance with the MCP directory API, please join our Discord server.