#!/usr/bin/env python3
"""
Scan bug bounty platforms for new/latest programs.
Supports: Intigriti, Bugcrowd, HackerOne
"""
import requests
import json
import sys
import yaml
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Any, Optional
import time
from dotenv import load_dotenv
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
# Load environment variables
load_dotenv(Path(__file__).parent.parent / ".env")
def load_api_tokens() -> Dict[str, Any]:
"""Load API tokens from .env file or config file."""
tokens = {}
# Try to load from .env first
hackerone_token = os.getenv('HACKERONE_API_TOKEN')
bugcrowd_token = os.getenv('BUGCROWD_API_TOKEN')
intigriti_token = os.getenv('INTIGRITI_API_TOKEN')
intigriti_client_id = os.getenv('INTIGRITI_CLIENT_ID')
intigriti_client_secret = os.getenv('INTIGRITI_CLIENT_SECRET')
if hackerone_token:
tokens['hackerone'] = {'api_token': hackerone_token}
if bugcrowd_token:
tokens['bugcrowd'] = {'api_token': bugcrowd_token}
if intigriti_token:
tokens['intigriti'] = {'api_token': intigriti_token}
elif intigriti_client_id and intigriti_client_secret:
tokens['intigriti'] = {
'client_id': intigriti_client_id,
'client_secret': intigriti_client_secret
}
# If no .env tokens found, try config file
if not tokens:
config_dir = Path(__file__).parent.parent / "config"
token_file = config_dir / "api-tokens.yaml"
if token_file.exists():
try:
with open(token_file, 'r') as f:
data = yaml.safe_load(f)
return data.get('platforms', {})
except Exception as e:
print(f"ā ļø Warning: Could not load API tokens from config: {e}")
return tokens
class PlatformScanner:
"""Base class for platform scanners."""
def __init__(self, api_config: Optional[Dict[str, Any]] = None):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36'
})
self.api_config = api_config or {}
def scan_programs(self) -> List[Dict[str, Any]]:
"""Scan for programs. Override in subclasses."""
raise NotImplementedError
class IntigritiScanner(PlatformScanner):
"""Scanner for Intigriti platform."""
BASE_URL = "https://app.intigriti.com"
def scan_programs(self) -> List[Dict[str, Any]]:
"""Scan Intigriti for programs."""
print("š Scanning Intigriti...")
# Check for API credentials
auth_headers = {}
# Check for simple API token first
if self.api_config.get('api_token'):
print(" ā¹ļø Using API token")
auth_headers['Authorization'] = f"Bearer {self.api_config['api_token']}"
# Otherwise check for OAuth credentials
elif self.api_config.get('client_id') and self.api_config.get('client_secret'):
print(" ā¹ļø Using OAuth credentials")
try:
# Get OAuth token
token_response = self.session.post(
"https://login.intigriti.com/connect/token",
data={
'client_id': self.api_config['client_id'],
'client_secret': self.api_config['client_secret'],
'grant_type': 'client_credentials',
'scope': 'researcher'
}
)
if token_response.status_code == 200:
token_data = token_response.json()
auth_headers['Authorization'] = f"Bearer {token_data['access_token']}"
else:
print(f" ā ļø OAuth failed: {token_response.status_code}")
except Exception as e:
print(f" ā ļø OAuth error: {e}")
try:
# Try multiple endpoints
endpoints = [
"https://api.intigriti.com/core/researcher/programs",
"https://api.intigriti.com/core/program",
"https://app.intigriti.com/api/core/program",
]
headers = {**self.session.headers, **auth_headers, 'Accept': 'application/json'}
for endpoint in endpoints:
try:
response = self.session.get(endpoint, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
programs = []
# Parse programs
records = data.get('records', []) if isinstance(data, dict) else data
if isinstance(records, dict):
records = records.get('data', [])
for program in records:
if isinstance(program, dict):
# Extract program details
program_info = {
'platform': 'intigriti',
'name': program.get('companyHandle') or program.get('name', 'Unknown'),
'handle': program.get('handle', ''),
'url': f"https://app.intigriti.com/programs/{program.get('companyHandle', program.get('handle', ''))}/detail",
'status': program.get('status', 'unknown'),
'confidentiality': program.get('confidentialityLevel', {}).get('value', 'public') if isinstance(program.get('confidentialityLevel'), dict) else 'public',
'min_bounty': program.get('minBounty', {}).get('value', 0) if isinstance(program.get('minBounty'), dict) else 0,
'max_bounty': program.get('maxBounty', {}).get('value', 0) if isinstance(program.get('maxBounty'), dict) else 0,
'created_at': program.get('createdAt', ''),
}
programs.append(program_info)
if programs:
print(f" ā Found {len(programs)} programs")
return programs
except:
continue
# If all endpoints fail
if not auth_headers:
print(" ā ļø API endpoints require authentication")
print(" ā¹ļø Add your Intigriti API credentials to config/api-tokens.yaml")
else:
print(" ā ļø API endpoints unavailable")
print(" ā¹ļø Manual check recommended at https://app.intigriti.com/researcher/programs")
return []
except Exception as e:
print(f" ā Error: {e}")
return []
class BugcrowdScanner(PlatformScanner):
"""Scanner for Bugcrowd platform."""
BASE_URL = "https://bugcrowd.com"
def scan_programs(self) -> List[Dict[str, Any]]:
"""Scan Bugcrowd for programs."""
print("š Scanning Bugcrowd...")
# Check for API token
auth_headers = {}
if self.api_config.get('api_token'):
print(" ā¹ļø Using API token")
auth_headers['Authorization'] = f"Token {self.api_config['api_token']}"
try:
# Try multiple endpoints
endpoints = [
"https://bugcrowd.com/programs.json",
"https://bugcrowd.com/programs/engagements.json",
"https://api.bugcrowd.com/engagements",
]
headers = {**self.session.headers, **auth_headers, 'Accept': 'application/json'}
for endpoint in endpoints:
try:
response = self.session.get(endpoint, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
programs = []
# Parse programs - try different data structures
program_list = data.get('programs', []) or data.get('engagements', []) or []
if isinstance(data, list):
program_list = data
for program in program_list:
if isinstance(program, dict):
program_info = {
'platform': 'bugcrowd',
'name': program.get('name', 'Unknown'),
'code': program.get('code', '') or program.get('slug', ''),
'url': f"https://bugcrowd.com{program.get('program_url', '')}",
'status': 'public' if program.get('public', False) else 'private',
'accepts_submissions': program.get('accepts_submissions', False),
'min_bounty': program.get('min_rewards', {}).get('amount', 0) if isinstance(program.get('min_rewards'), dict) else 0,
'max_bounty': program.get('max_rewards', {}).get('amount', 0) if isinstance(program.get('max_rewards'), dict) else 0,
}
programs.append(program_info)
if programs:
print(f" ā Found {len(programs)} programs")
return programs
except:
continue
# If all endpoints fail
if not auth_headers:
print(" ā ļø API endpoints require authentication")
print(" ā¹ļø Add your Bugcrowd API token to config/api-tokens.yaml")
else:
print(" ā ļø API endpoints unavailable")
print(" ā¹ļø Manual check recommended at https://bugcrowd.com/programs")
return []
except Exception as e:
print(f" ā Error: {e}")
return []
class HackerOneScanner(PlatformScanner):
"""Scanner for HackerOne platform."""
BASE_URL = "https://hackerone.com"
API_URL = "https://hackerone.com/graphql"
def scan_programs(self) -> List[Dict[str, Any]]:
"""Scan HackerOne for programs."""
print("š Scanning HackerOne...")
try:
# GraphQL query for programs
query = """
query DirectoryQuery {
teams(first: 100, secure_order_by: {launched_at: {_direction: DESC}}) {
edges {
node {
handle
name
currency
offers_bounties
state
submission_state
started_accepting_at
base_bounty
url
}
}
}
}
"""
response = self.session.post(
self.API_URL,
json={'query': query},
headers={
'Accept': 'application/json',
'Content-Type': 'application/json',
}
)
if response.status_code == 200:
data = response.json()
programs = []
# Parse programs
edges = data.get('data', {}).get('teams', {}).get('edges', [])
for edge in edges:
node = edge.get('node', {})
program_info = {
'platform': 'hackerone',
'name': node.get('name', 'Unknown'),
'handle': node.get('handle', ''),
'url': f"https://hackerone.com/{node.get('handle', '')}",
'status': node.get('state', 'unknown'),
'submission_state': node.get('submission_state', 'unknown'),
'offers_bounties': node.get('offers_bounties', False),
'base_bounty': node.get('base_bounty', 0),
'currency': node.get('currency', 'USD'),
'started_at': node.get('started_accepting_at', ''),
}
programs.append(program_info)
print(f" ā Found {len(programs)} programs")
return programs
else:
print(f" ā Failed: HTTP {response.status_code}")
return []
except Exception as e:
print(f" ā Error: {e}")
return []
def save_results(programs: List[Dict[str, Any]], output_file: str):
"""Save scan results to file."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
results = {
'timestamp': timestamp,
'total_programs': len(programs),
'platforms': {
'intigriti': len([p for p in programs if p['platform'] == 'intigriti']),
'bugcrowd': len([p for p in programs if p['platform'] == 'bugcrowd']),
'hackerone': len([p for p in programs if p['platform'] == 'hackerone']),
},
'programs': programs
}
# Save JSON
with open(output_file, 'w') as f:
json.dump(results, f, indent=2)
print(f"\nš¾ Results saved to: {output_file}")
def print_summary(programs: List[Dict[str, Any]]):
"""Print summary of findings."""
print("\n" + "="*80)
print("š SCAN SUMMARY")
print("="*80)
by_platform = {}
for program in programs:
platform = program['platform']
if platform not in by_platform:
by_platform[platform] = []
by_platform[platform].append(program)
for platform, progs in by_platform.items():
print(f"\nšÆ {platform.upper()}: {len(progs)} programs")
# Show first 10 programs
for i, prog in enumerate(progs[:10], 1):
name = prog.get('name', 'Unknown')[:50]
url = prog.get('url', '')
print(f" {i}. {name}")
print(f" ā {url}")
if len(progs) > 10:
print(f" ... and {len(progs) - 10} more")
print("\n" + "="*80)
def print_new_programs(programs: List[Dict[str, Any]], days: int = 30):
"""Print programs that appear to be new (based on available date fields)."""
print("\n" + "="*80)
print(f"š PROGRAMS FROM LAST {days} DAYS")
print("="*80)
cutoff_date = datetime.now() - timedelta(days=days)
new_programs = []
for program in programs:
# Check various date fields
date_str = program.get('created_at') or program.get('started_at', '')
if date_str:
try:
# Parse ISO format date
prog_date = datetime.fromisoformat(date_str.replace('Z', '+00:00'))
if prog_date.replace(tzinfo=None) > cutoff_date:
new_programs.append(program)
except:
pass
if new_programs:
by_platform = {}
for program in new_programs:
platform = program['platform']
if platform not in by_platform:
by_platform[platform] = []
by_platform[platform].append(program)
for platform, progs in by_platform.items():
print(f"\nšÆ {platform.upper()}: {len(progs)} new programs")
for i, prog in enumerate(progs, 1):
name = prog.get('name', 'Unknown')
url = prog.get('url', '')
date = prog.get('created_at') or prog.get('started_at', 'Unknown date')
print(f" {i}. {name} ({date})")
print(f" ā {url}")
else:
print(f"\nā ļø No programs with date information found in last {days} days")
print(" (Note: Not all platforms provide creation dates via public API)")
print("\n" + "="*80)
def main():
"""Main function."""
print("="*80)
print("šÆ BUG BOUNTY PLATFORM SCANNER")
print("="*80)
print(f"Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
# Load API tokens
api_tokens = load_api_tokens()
if api_tokens:
print("š API tokens loaded")
if api_tokens.get('intigriti', {}).get('client_id'):
print(" ā Intigriti credentials found")
if api_tokens.get('bugcrowd', {}).get('api_token'):
print(" ā Bugcrowd token found")
if api_tokens.get('hackerone', {}).get('api_token'):
print(" ā HackerOne token found")
print()
else:
print("ā¹ļø No API tokens found (using public access)")
print(" Create config/api-tokens.yaml for authenticated access\n")
# Initialize scanners with API config
scanners = [
IntigritiScanner(api_tokens.get('intigriti', {})),
BugcrowdScanner(api_tokens.get('bugcrowd', {})),
HackerOneScanner(api_tokens.get('hackerone', {})),
]
all_programs = []
# Scan each platform
for scanner in scanners:
programs = scanner.scan_programs()
all_programs.extend(programs)
time.sleep(1) # Be respectful with requests
if all_programs:
# Print summary
print_summary(all_programs)
# Print new programs
print_new_programs(all_programs, days=30)
# Save results
output_dir = Path(__file__).parent.parent / "reports"
output_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = output_dir / f"platform-scan_{timestamp}.json"
save_results(all_programs, str(output_file))
# Also save latest
latest_file = output_dir / "platform-scan_latest.json"
save_results(all_programs, str(latest_file))
print(f"\nā
Scan complete! Found {len(all_programs)} total programs")
else:
print("\nā No programs found. Check your internet connection or try again later.")
print("š” Tip: Add API tokens to config/api-tokens.yaml for better access")
if __name__ == "__main__":
main()