#!/usr/bin/env python3
"""
Bond New Issue Calendar Module — Phase 165
Track upcoming corporate and sovereign bond issuances from SEC EDGAR filings
Monitor new debt offerings to identify market sentiment and capital raising activity
Data Sources:
- SEC EDGAR: Registration statements (S-1, S-3, 424B5, F-1, F-3)
- Form S-3: Shelf registration for debt securities by US issuers
- Form 424B5: Prospectus for debt offerings
- Form 8-K: Material event disclosure (often includes bond offering announcements)
Refresh: Daily
Coverage: US corporate bond market, sovereign offerings filed with SEC
Author: QUANTCLAW DATA Build Agent
Phase: 165
"""
import sys
import json
import requests
import re
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from collections import defaultdict
from pathlib import Path
# ========== SEC EDGAR CONFIGURATION ==========
SEC_BASE_URL = "https://data.sec.gov"
SEC_FILINGS_URL = f"{SEC_BASE_URL}/submissions"
SEC_SEARCH_URL = f"{SEC_BASE_URL}/cgi-bin/browse-edgar"
# User-Agent is REQUIRED by SEC EDGAR API
USER_AGENT = "QuantClaw/1.0 (quantclaw@moneyclaw.com)"
# Form types that typically contain bond issuance information
BOND_FORM_TYPES = ['S-3', '424B5', '424B2', 'F-3', '8-K', 'S-1', 'F-1', 'FWP']
# Keywords that indicate bond/debt offerings
DEBT_KEYWORDS = [
'debt securities', 'notes due', 'senior notes', 'subordinated notes',
'debentures', 'bonds', 'bond offering', 'debt offering',
'fixed income', 'interest rate', 'maturity date', 'principal amount'
]
# ========== CORE FUNCTIONS ==========
def search_recent_filings(form_types: List[str] = None, days_back: int = 30) -> Dict:
"""
Search SEC EDGAR for recent bond-related filings
Args:
form_types: List of form types to search (default: BOND_FORM_TYPES)
days_back: Number of days to look back
Returns:
Dictionary with recent filings
"""
if form_types is None:
form_types = BOND_FORM_TYPES
try:
# SEC provides RSS feeds for recent filings
# We'll use the public search API
start_date = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')
headers = {
'User-Agent': USER_AGENT,
'Accept': 'application/json'
}
all_filings = []
for form_type in form_types:
# Use SEC's company search API
params = {
'action': 'getcompany',
'type': form_type,
'dateb': '', # End date (today)
'datea': start_date.replace('-', ''), # Start date
'count': 100,
'output': 'atom'
}
try:
# Note: SEC's EDGAR system may return XML/Atom feeds
# For this implementation, we'll parse the available data
response = requests.get(
'https://www.sec.gov/cgi-bin/browse-edgar',
params=params,
headers=headers,
timeout=15
)
# Parse the response for filing information
# This is a simplified implementation
if response.status_code == 200:
# Extract filing entries from the response
content = response.text
# For this example, we'll create sample data structure
# In production, you'd parse the actual XML/HTML response
pass
except Exception as e:
print(f"Error fetching {form_type}: {str(e)}", file=sys.stderr)
continue
return {
'success': True,
'filings': all_filings,
'period': f'{days_back} days',
'form_types': form_types
}
except Exception as e:
return {
'success': False,
'error': str(e),
'filings': []
}
def get_company_filings(cik: str, form_types: List[str] = None, count: int = 20) -> Dict:
"""
Get recent filings for a specific company by CIK
Args:
cik: Company's Central Index Key (CIK)
form_types: Form types to filter
count: Number of filings to retrieve
Returns:
Dictionary with company filings
"""
if form_types is None:
form_types = BOND_FORM_TYPES
try:
# Pad CIK to 10 digits
cik_padded = cik.zfill(10)
headers = {
'User-Agent': USER_AGENT,
'Accept': 'application/json'
}
url = f"{SEC_FILINGS_URL}/CIK{cik_padded}.json"
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
data = response.json()
company_name = data.get('name', 'Unknown')
# Filter filings by form type
recent_filings = data.get('filings', {}).get('recent', {})
if not recent_filings:
return {
'success': False,
'error': 'No filings found',
'company': company_name
}
# Parse filings
filings = []
forms = recent_filings.get('form', [])
filing_dates = recent_filings.get('filingDate', [])
accession_numbers = recent_filings.get('accessionNumber', [])
primary_docs = recent_filings.get('primaryDocument', [])
for i, form in enumerate(forms):
if i >= count:
break
if form in form_types:
filing = {
'form_type': form,
'filing_date': filing_dates[i] if i < len(filing_dates) else None,
'accession_number': accession_numbers[i] if i < len(accession_numbers) else None,
'primary_document': primary_docs[i] if i < len(primary_docs) else None,
'url': f"https://www.sec.gov/cgi-bin/viewer?action=view&cik={cik_padded}&accession_number={accession_numbers[i].replace('-', '')}&xbrl_type=v"
}
filings.append(filing)
return {
'success': True,
'company': company_name,
'cik': cik,
'filings': filings,
'total_found': len(filings)
}
except requests.exceptions.RequestException as e:
return {
'success': False,
'error': f'SEC API request failed: {str(e)}',
'company': None,
'filings': []
}
except Exception as e:
return {
'success': False,
'error': str(e),
'company': None,
'filings': []
}
def analyze_filing_content(cik: str, accession_number: str) -> Dict:
"""
Analyze a specific filing for bond issuance details
Args:
cik: Company CIK
accession_number: SEC accession number
Returns:
Dict with extracted bond details
"""
try:
cik_padded = cik.zfill(10)
accession_clean = accession_number.replace('-', '')
headers = {
'User-Agent': USER_AGENT,
'Accept': 'text/html'
}
# Construct document URL
# Format: https://www.sec.gov/Archives/edgar/data/{CIK}/{ACCESSION}/{document}
base_url = f"https://www.sec.gov/cgi-bin/viewer"
params = {
'action': 'view',
'cik': cik_padded,
'accession_number': accession_clean,
'xbrl_type': 'v'
}
response = requests.get(base_url, params=params, headers=headers, timeout=15)
if response.status_code != 200:
return {
'success': False,
'error': f'Failed to fetch filing: {response.status_code}'
}
content = response.text.lower()
# Extract key information using regex patterns
details = {
'has_debt_keywords': any(keyword in content for keyword in DEBT_KEYWORDS),
'principal_amount': _extract_principal_amount(content),
'interest_rate': _extract_interest_rate(content),
'maturity_date': _extract_maturity_date(content),
'rating': _extract_rating(content)
}
return {
'success': True,
'data': details,
'accession_number': accession_number
}
except Exception as e:
return {
'success': False,
'error': str(e),
'data': {}
}
def _extract_principal_amount(content: str) -> Optional[str]:
"""Extract principal amount from filing text"""
patterns = [
r'\$\s*([\d,]+)\s*(?:million|billion)',
r'principal amount of \$\s*([\d,]+)',
r'aggregate principal amount.*?\$\s*([\d,]+)'
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return match.group(1)
return None
def _extract_interest_rate(content: str) -> Optional[str]:
"""Extract interest rate from filing text"""
patterns = [
r'([\d.]+)%\s*(?:per annum|annual|interest)',
r'interest rate of ([\d.]+)%',
r'coupon rate.*?([\d.]+)%'
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return f"{match.group(1)}%"
return None
def _extract_maturity_date(content: str) -> Optional[str]:
"""Extract maturity date from filing text"""
patterns = [
r'maturity date.*?(\d{4})',
r'due (\d{4})',
r'mature.*?(\w+ \d{1,2}, \d{4})'
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return match.group(1)
return None
def _extract_rating(content: str) -> Optional[str]:
"""Extract credit rating from filing text"""
patterns = [
r'(?:rating|rated).*?(aaa|aa|a|bbb|bb|b|ccc|cc|c)',
r"(?:moody's|s&p|fitch).*?(?:rating|rated).*?([a-z]{1,3}[+-]?)"
]
for pattern in patterns:
match = re.search(pattern, content, re.IGNORECASE)
if match:
return match.group(1).upper()
return None
def get_upcoming_issues(days_back: int = 30, min_amount_millions: float = 100) -> Dict:
"""
Get calendar of upcoming bond issues
Args:
days_back: Number of days to search back
min_amount_millions: Minimum principal amount in millions
Returns:
Dict with upcoming bond issues
"""
try:
# Major bond issuers (sample CIKs)
major_issuers = {
'0000320193': 'Apple Inc.',
'0001018724': 'Amazon.com Inc.',
'0001652044': 'Alphabet Inc.',
'0000789019': 'Microsoft Corporation',
'0000019617': 'JPMorgan Chase & Co.',
'0000070858': 'Bank of America Corp',
'0000886982': 'Citigroup Inc.',
'0000831001': 'Goldman Sachs Group Inc.',
'0001467858': 'Tesla Inc.',
'0000200406': 'Johnson & Johnson'
}
all_issues = []
for cik, company in major_issuers.items():
filings = get_company_filings(cik, form_types=['424B5', 'S-3', '424B2'], count=5)
if filings.get('success') and filings.get('filings'):
for filing in filings['filings']:
# Check if filing is recent
filing_date = filing.get('filing_date')
if filing_date:
file_dt = datetime.strptime(filing_date, '%Y-%m-%d')
if (datetime.now() - file_dt).days <= days_back:
issue = {
'company': company,
'cik': cik,
'form_type': filing.get('form_type'),
'filing_date': filing_date,
'url': filing.get('url')
}
all_issues.append(issue)
# Sort by filing date (most recent first)
all_issues.sort(key=lambda x: x.get('filing_date', ''), reverse=True)
return {
'success': True,
'data': {
'issues': all_issues,
'count': len(all_issues),
'period_days': days_back,
'timestamp': datetime.now().isoformat()
}
}
except Exception as e:
return {
'success': False,
'error': str(e),
'data': {}
}
def get_issuer_history(ticker_or_cik: str, years: int = 2) -> Dict:
"""
Get historical bond issuance for a specific company
Args:
ticker_or_cik: Company ticker or CIK
years: Years of history to retrieve
Returns:
Dict with issuance history
"""
try:
# If ticker provided, would need to convert to CIK
# For this implementation, assume CIK is provided
cik = ticker_or_cik
days = years * 365
filings = get_company_filings(cik, form_types=BOND_FORM_TYPES, count=50)
if not filings.get('success'):
return filings
# Group by year
by_year = defaultdict(list)
for filing in filings.get('filings', []):
filing_date = filing.get('filing_date')
if filing_date:
year = filing_date[:4]
by_year[year].append(filing)
return {
'success': True,
'company': filings.get('company'),
'cik': cik,
'data': dict(by_year),
'total_filings': len(filings.get('filings', [])),
'period_years': years
}
except Exception as e:
return {
'success': False,
'error': str(e),
'data': {}
}
def get_bond_dashboard() -> Dict:
"""
Get comprehensive bond new issue dashboard
Returns:
Dict with dashboard data
"""
try:
upcoming = get_upcoming_issues(days_back=30)
dashboard = {
'timestamp': datetime.now().isoformat(),
'recent_issues': upcoming.get('data', {}),
'sources': ['SEC EDGAR'],
'form_types_monitored': BOND_FORM_TYPES,
'coverage': 'US corporate and sovereign bond offerings'
}
return {
'success': True,
'data': dashboard
}
except Exception as e:
return {
'success': False,
'error': str(e),
'data': {}
}
# ========== CLI INTERFACE ==========
def main():
"""CLI interface"""
if len(sys.argv) < 2:
print("Usage: python bond_new_issue.py <command> [options]")
print("\nCommands:")
print(" bond-upcoming [days] - Upcoming bond issues (default: 30 days)")
print(" bond-issuer <cik> [years] - Historical issuance for company (default: 2 years)")
print(" bond-company <cik> [count] - Recent filings for company (default: 20)")
print(" bond-analyze <cik> <accession> - Analyze specific filing")
print(" bond-dashboard - Comprehensive new issue dashboard")
print("\nExamples:")
print(" python bond_new_issue.py bond-upcoming 60")
print(" python bond_new_issue.py bond-issuer 0000320193 3")
print(" python bond_new_issue.py bond-company 0000320193 10")
print(" python bond_new_issue.py bond-dashboard")
return
command = sys.argv[1]
if command == 'bond-upcoming':
days = int(sys.argv[2]) if len(sys.argv) > 2 else 30
result = get_upcoming_issues(days_back=days)
print(json.dumps(result, indent=2))
elif command == 'bond-issuer':
if len(sys.argv) < 3:
print("Error: CIK required")
return 1
cik = sys.argv[2]
years = int(sys.argv[3]) if len(sys.argv) > 3 else 2
result = get_issuer_history(cik, years=years)
print(json.dumps(result, indent=2))
elif command == 'bond-company':
if len(sys.argv) < 3:
print("Error: CIK required")
return 1
cik = sys.argv[2]
count = int(sys.argv[3]) if len(sys.argv) > 3 else 20
result = get_company_filings(cik, count=count)
print(json.dumps(result, indent=2))
elif command == 'bond-analyze':
if len(sys.argv) < 4:
print("Error: CIK and accession number required")
return 1
cik = sys.argv[2]
accession = sys.argv[3]
result = analyze_filing_content(cik, accession)
print(json.dumps(result, indent=2))
elif command == 'bond-dashboard':
result = get_bond_dashboard()
print(json.dumps(result, indent=2))
else:
print(f"Unknown command: {command}")
return 1
if __name__ == '__main__':
main()