fix_critical_issues.py (11 kB)
#!/usr/bin/env python3
"""
Fix critical data extraction issues in Financial MCPs
- Updates User-Agents
- Implements data validation
- Adds error handling
- Updates outdated selectors
"""

import os
import re
from pathlib import Path
from typing import List, Tuple, Optional


def fix_user_agents(content: str, filename: str) -> Tuple[str, List[str]]:
    """Fix User-Agent issues"""
    changes_made = []

    # Fix SEC placeholder User-Agent
    if 'Your Company Name your.email@company.com' in content:
        content = content.replace(
            "'User-Agent': 'Your Company Name your.email@company.com'",
            "'User-Agent': 'FinancialMCP/1.0 (Personal Research Tool; Contact: research@example.com)'"
        )
        changes_made.append("Fixed SEC placeholder User-Agent")

    # Fix incomplete browser User-Agents
    incomplete_ua = "'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'"
    complete_ua = "'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'"
    if incomplete_ua in content:
        content = content.replace(incomplete_ua, complete_ua)
        changes_made.append("Updated incomplete browser User-Agent")

    return content, changes_made


def add_data_validation(content: str, filename: str) -> Tuple[str, List[str]]:
    """Add data validation to scrapers"""
    changes_made = []

    # Methods injected into each scraper class (narrow except clauses
    # instead of bare except, so KeyboardInterrupt/SystemExit pass through)
    validation_helper = '''
    def validate_price(self, price_str: str) -> Optional[float]:
        """Validate and parse price data"""
        try:
            # Remove common formatting
            clean_price = re.sub(r'[$,]', '', str(price_str))
            price = float(clean_price)
            # Sanity checks
            if price <= 0 or price > 1000000:
                return None
            return price
        except (ValueError, TypeError):
            return None

    def validate_date(self, date_str: str) -> Optional[str]:
        """Validate date format"""
        try:
            # Try to parse various date formats
            from datetime import datetime
            for fmt in ['%Y-%m-%d', '%m/%d/%Y', '%B %d, %Y', '%d-%b-%Y']:
                try:
                    datetime.strptime(date_str.strip(), fmt)
                    return date_str.strip()
                except ValueError:
                    continue
            return None
        except (AttributeError, TypeError):
            return None

    def validate_percentage(self, pct_str: str) -> Optional[float]:
        """Validate percentage data"""
        try:
            # Remove % and convert
            clean_pct = re.sub(r'[%()]', '', str(pct_str))
            pct = float(clean_pct)
            # Sanity check
            if abs(pct) > 1000:  # More than a 1000% change is suspicious
                return None
            return pct
        except (ValueError, TypeError):
            return None
'''

    # Add validation methods after the class's __init__ if not present
    if 'class' in content and 'validate_price' not in content:
        init_match = re.search(r'(def __init__.*?\n(?:.*?\n)*?)\n    def', content, re.MULTILINE)
        if init_match:
            insert_pos = init_match.end(1)
            content = content[:insert_pos] + validation_helper + content[insert_pos:]
            changes_made.append("Added data validation methods")

    return content, changes_made


def update_selectors(content: str, filename: str) -> Tuple[str, List[str]]:
    """Update outdated HTML selectors with more robust alternatives"""
    changes_made = []

    selector_updates = {
        # Finviz updates
        "'fullview-news-outer'": "'news-table'",
        "'fullview-ratings-outer'": "'ratings-outer'",
        "'snapshot-table2'": "'snapshot-table'",
        # Generic updates for robustness
        "soup.find('table', {'class': 'tableFile2'})":
            "soup.find('table', {'class': re.compile('tableFile')}) or soup.find('table', {'summary': re.compile('Document')})",
        "soup.find('span', {'class': 'companyName'})":
            "soup.find('span', {'class': re.compile('companyName')}) or soup.find(text=re.compile('CIK#:'))",
        # Add fallback selectors
        "fin-streamer[data-field=\"regularMarketPrice\"]":
            "fin-streamer[data-field=\"regularMarketPrice\"], [data-symbol][data-field=\"regularMarketPrice\"], .Fw\\(b\\).Fz\\(36px\\)"
    }

    for old, new in selector_updates.items():
        if old in content:
            content = content.replace(old, new)
            changes_made.append(f"Updated selector: {old[:30]}...")

    return content, changes_made


def add_error_handling(content: str, filename: str) -> Tuple[str, List[str]]:
    """Improve error handling with retries and better messages"""
    changes_made = []

    # Retry decorator injected at module level of the target file
    retry_decorator = '''import asyncio
from functools import wraps


def async_retry(max_attempts=3, delay=1):
    """Retry decorator for async functions"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_attempts - 1:
                        # Linear backoff between attempts
                        await asyncio.sleep(delay * (attempt + 1))
                        continue
            raise last_exception
        return wrapper
    return decorator
'''

    # Add retry decorator after the last import if not already present
    if 'async_retry' not in content and 'import asyncio' in content:
        import_match = list(re.finditer(r'^import.*\n|^from.*import.*\n', content, re.MULTILINE))
        if import_match:
            insert_pos = import_match[-1].end()
            content = content[:insert_pos] + "\n" + retry_decorator + content[insert_pos:]
            changes_made.append("Added retry decorator")

    # Improve error messages
    # Note: the first replacement assumes an `operation` variable is in
    # scope at the rewritten call site in the target file.
    generic_errors = [
        (r"return \[{'error': f'Failed to.*?: {str\(e\)}'}\]",
         "return [{'error': f'Failed to {operation}: {type(e).__name__}: {str(e)}', 'retry_possible': True}]"),
        (r"except Exception as e:\s*return \[{'error':",
         "except aiohttp.ClientError as e:\n        return [{'error': f'Network error: {str(e)}', 'retry_possible': True}]\n    except Exception as e:\n        return [{'error':")
    ]

    for pattern, replacement in generic_errors:
        if re.search(pattern, content):
            content = re.sub(pattern, replacement, content)
            changes_made.append("Improved error handling")

    return content, changes_made


def add_rate_limiting(content: str, filename: str) -> Tuple[str, List[str]]:
    """Add rate limiting to prevent bans"""
    changes_made = []

    # Reference shape of the patched __init__ (documentation only; the
    # actual change is applied via the regex surgery below)
    rate_limiter = '''
    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.last_request_time = {}
        self.min_delay = 1.0  # Minimum 1 second between requests to same domain
'''

    rate_limit_method = '''
    async def rate_limit(self, url: str):
        """Implement rate limiting per domain"""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc

        if domain in self.last_request_time:
            elapsed = time.time() - self.last_request_time[domain]
            if elapsed < self.min_delay:
                await asyncio.sleep(self.min_delay - elapsed)

        self.last_request_time[domain] = time.time()
'''

    # Add rate limiting if not present
    if 'rate_limit' not in content and '__init__' in content:
        # Update __init__ to include rate-limiting state
        init_pattern = r'(def __init__\(self\):.*?self\.session.*?\n)'
        match = re.search(init_pattern, content, re.DOTALL)
        if match:
            old_init = match.group(1)
            new_init = old_init.rstrip() + '\n        self.last_request_time = {}\n        self.min_delay = 1.0  # Rate limiting\n'
            content = content.replace(old_init, new_init)

            # Add the rate_limit method right after __init__
            content = content.replace(new_init, new_init + rate_limit_method)

            # Add time import if needed
            if 'import time' not in content:
                content = 'import time\n' + content

            changes_made.append("Added rate limiting")

    return content, changes_made


def process_file(filepath: Path) -> List[str]:
    """Process a single Python file"""
    print(f"\nProcessing {filepath.name}...")

    with open(filepath, 'r') as f:
        content = f.read()

    original_content = content
    all_changes = []

    # Apply fixes
    fixes = [
        fix_user_agents,
        add_data_validation,
        update_selectors,
        add_error_handling,
        add_rate_limiting
    ]
    for fix_func in fixes:
        content, changes = fix_func(content, filepath.name)
        all_changes.extend(changes)

    # Write back if changes were made
    if content != original_content:
        with open(filepath, 'w') as f:
            f.write(content)
        print(f"✅ Applied {len(all_changes)} fixes:")
        for change in all_changes:
            print(f"   - {change}")
    else:
        print("✓ No changes needed")

    return all_changes


def main():
    """Fix critical issues in all MCP scrapers"""
    base_dir = Path("/Users/LuisRincon/SEC-MCP/FinancialMCPs")

    print("🔧 Fixing Critical Data Extraction Issues")
    print("=" * 50)

    total_changes = 0

    # Process each MCP's main.py
    mcps = [
        "SEC_SCRAPER_MCP",
        "NEWS_SENTIMENT_SCRAPER",
        "ANALYST_RATINGS_SCRAPER",
        "INSTITUTIONAL_SCRAPER",
        "ALTERNATIVE_DATA_SCRAPER",
        "INDUSTRY_ASSUMPTIONS_ENGINE",
        "ECONOMIC_DATA_COLLECTOR",
        "RESEARCH_ADMINISTRATOR"
    ]

    for mcp in mcps:
        main_file = base_dir / mcp / "src" / "main.py"
        if main_file.exists():
            changes = process_file(main_file)
            total_changes += len(changes)
        else:
            print(f"\n❌ {mcp}/src/main.py not found")

    print("\n" + "=" * 50)
    print(f"✅ Total fixes applied: {total_changes}")
    print("\n⚠️  IMPORTANT: The MCPs still need:")
    print("1. Proper API keys for services that require them")
    print("2. More sophisticated sentiment analysis (consider using NLP libraries)")
    print("3. Fallback data sources when primary sources fail")
    print("4. Comprehensive testing with real data")
    print("\nRestart Claude Desktop to apply these changes!")


if __name__ == "__main__":
    main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/LuisRincon23/SEC-MCP'
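
The same lookup can be scripted; a minimal sketch using Python's requests library (that the endpoint returns a JSON body is an assumption, not documented here):

import requests

# Fetch this server's directory entry from the Glama MCP API
resp = requests.get("https://glama.ai/api/mcp/v1/servers/LuisRincon23/SEC-MCP")
resp.raise_for_status()
print(resp.json())  # assumes a JSON response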

If you have feedback or need assistance with the MCP directory API, please join our Discord server.