"""Configuration management for the bug bounty MCP server."""
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
from dotenv import load_dotenv
from pydantic import BaseModel, Field

from .models import BugBountyProgram, Platform
class ToolConfig(BaseModel):
    """Settings for one external tool.

    Every field carries a default, so an empty mapping produces a valid
    instance. This file only stores the values; how a tool runner
    interprets them is not visible here.
    """

    # Master switch for the tool.
    enabled: bool = True

    # Explicit location of the tool; None leaves it unspecified.
    path: Optional[str] = None

    # NOTE(review): unit is not stated in this file - presumably seconds.
    timeout: int = 300

    # Optional rate cap; None means no cap is configured.
    # NOTE(review): unit (requests per second?) is not visible here - confirm.
    rate_limit: Optional[int] = None

    # Additional arguments for the tool; a fresh list per instance.
    extra_args: List[str] = Field(default_factory=list)

    # Free-form, tool-specific options; a fresh dict per instance.
    config: Dict[str, Any] = Field(default_factory=dict)
class ServerConfig(BaseModel):
    """Server-wide settings with built-in defaults.

    Each field can be supplied explicitly; any field left out falls back to
    the default declared below.
    """

    # --- Storage and logging ---
    database_path: str = "./data/bugbounty.db"
    log_level: str = "INFO"
    log_file: str = "./logs/bugbounty.log"

    # --- Rate limiting ---
    # NOTE(review): presumably "rate_limit_requests per rate_limit_period
    # seconds"; the unit is not stated in this file - confirm.
    rate_limit_enabled: bool = True
    rate_limit_requests: int = 10
    rate_limit_period: int = 1

    # --- Safety switches ---
    require_scope_validation: bool = True
    allow_destructive_tests: bool = False
    audit_log_enabled: bool = True

    # --- Caching ---
    # NOTE(review): cache_ttl is presumably in seconds - confirm.
    cache_enabled: bool = True
    cache_ttl: int = 3600

    # --- Outbound proxy ---
    proxy_url: Optional[str] = None
    proxy_enabled: bool = False
class ConfigManager:
    """Manages configuration for the bug bounty MCP server.

    On construction the manager loads ``.env`` values via ``load_dotenv()``,
    builds the server settings from environment variables, and reads
    ``programs.yaml`` and ``tools.yaml`` from the configuration directory.
    When either YAML file is missing, an example file is written in its
    place and nothing is loaded from it.

    Attributes:
        config_dir: Directory holding ``programs.yaml`` and ``tools.yaml``.
        server_config: Settings built from environment variables.
        programs: Loaded programs keyed by ``program_id``.
        tools: Loaded tool configurations keyed by tool name.
    """

    # Environment-variable spellings (compared case-insensitively, after
    # stripping whitespace) that switch a boolean setting on.
    _TRUE_VALUES = frozenset({"1", "true", "yes", "on"})

    # Diagnostics go through logging instead of print() so they stay off
    # stdout. NOTE(review): if this server speaks MCP over stdio, stray
    # stdout output would corrupt the protocol stream - confirm transport.
    _logger = logging.getLogger(__name__)

    def __init__(self, config_dir: Optional[Path] = None):
        """Initialize the configuration manager.

        Args:
            config_dir: Directory containing configuration files. A plain
                string is accepted as well as a ``Path``. Defaults to
                ``<current working directory>/config``. The directory is
                created if it does not exist.
        """
        self.config_dir = Path(config_dir) if config_dir else Path.cwd() / "config"
        self.config_dir.mkdir(parents=True, exist_ok=True)

        # Populate os.environ from a .env file before any setting is read.
        load_dotenv()

        self.server_config = self._load_server_config()
        self.programs: Dict[str, BugBountyProgram] = {}
        self.tools: Dict[str, ToolConfig] = {}

        self._load_programs()
        self._load_tools()

    # ------------------------------------------------------------------
    # Environment helpers
    # ------------------------------------------------------------------

    @classmethod
    def _env_flag(cls, name: str, default: bool) -> bool:
        """Read a boolean environment variable.

        Returns ``default`` when the variable is unset; otherwise True only
        for one of the accepted "on" spellings (1/true/yes/on).
        """
        raw = os.getenv(name)
        if raw is None:
            return default
        return raw.strip().lower() in cls._TRUE_VALUES

    @staticmethod
    def _env_int(name: str, default: int) -> int:
        """Read an integer environment variable.

        Returns ``default`` when the variable is unset or blank.

        Raises:
            ValueError: If the value is set but is not a valid integer; the
                message names the offending variable.
        """
        raw = os.getenv(name)
        if raw is None or not raw.strip():
            return default
        try:
            return int(raw)
        except ValueError as exc:
            raise ValueError(
                f"Environment variable {name} must be an integer, got {raw!r}"
            ) from exc

    def _load_server_config(self) -> "ServerConfig":
        """Build the server configuration from environment variables.

        Each setting is read from the upper-cased field name (for example
        ``LOG_LEVEL``); unset variables fall back to the same defaults that
        ``ServerConfig`` declares.

        Raises:
            ValueError: If an integer setting holds a non-integer value.
        """
        return ServerConfig(
            database_path=os.getenv("DATABASE_PATH", "./data/bugbounty.db"),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
            log_file=os.getenv("LOG_FILE", "./logs/bugbounty.log"),
            rate_limit_enabled=self._env_flag("RATE_LIMIT_ENABLED", True),
            rate_limit_requests=self._env_int("RATE_LIMIT_REQUESTS", 10),
            rate_limit_period=self._env_int("RATE_LIMIT_PERIOD", 1),
            require_scope_validation=self._env_flag("REQUIRE_SCOPE_VALIDATION", True),
            allow_destructive_tests=self._env_flag("ALLOW_DESTRUCTIVE_TESTS", False),
            audit_log_enabled=self._env_flag("AUDIT_LOG_ENABLED", True),
            cache_enabled=self._env_flag("CACHE_ENABLED", True),
            cache_ttl=self._env_int("CACHE_TTL", 3600),
            proxy_url=os.getenv("PROXY_URL"),
            proxy_enabled=self._env_flag("PROXY_ENABLED", False),
        )

    # ------------------------------------------------------------------
    # YAML helpers
    # ------------------------------------------------------------------

    def _read_yaml(self, path: Path, label: str) -> Any:
        """Parse ``path`` with ``yaml.safe_load``.

        Read and parse failures are logged and reported as ``None`` so that
        a broken file never prevents the manager from being constructed.
        """
        try:
            with open(path, "r", encoding="utf-8") as handle:
                return yaml.safe_load(handle)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
            self._logger.error("Error loading %s from %s: %s", label, path, exc)
            return None

    @staticmethod
    def _write_yaml(path: Path, data: Dict[str, Any]) -> None:
        """Write ``data`` to ``path`` as block-style YAML, keeping key order.

        ``safe_dump`` is used so the output only ever contains plain YAML
        that ``yaml.safe_load`` can read back.
        """
        with open(path, "w", encoding="utf-8") as handle:
            yaml.safe_dump(data, handle, default_flow_style=False, sort_keys=False)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def _load_programs(self) -> None:
        """Load bug bounty programs from ``programs.yaml``.

        If the file does not exist, an example file is created and nothing
        is loaded. For each entry whose ``platform`` is a non-empty string,
        an ``api_token`` is injected from the ``<PLATFORM>_API_TOKEN``
        environment variable when that variable is set. Invalid entries are
        logged and skipped individually; the remaining entries still load.
        """
        programs_file = self.config_dir / "programs.yaml"
        if not programs_file.exists():
            self._create_example_programs_file(programs_file)
            return

        data = self._read_yaml(programs_file, "programs")
        entries = data.get("programs") if isinstance(data, dict) else None
        if not entries:
            return
        if not isinstance(entries, list):
            self._logger.error(
                "Error loading programs: 'programs' must be a list, got %s",
                type(entries).__name__,
            )
            return

        for index, entry in enumerate(entries):
            try:
                # Copy so the parsed YAML structure is never mutated.
                prog_data = dict(entry)
                platform = prog_data.get("platform")
                if isinstance(platform, str) and platform:
                    token = os.environ.get(f"{platform.upper()}_API_TOKEN")
                    if token is not None:
                        prog_data["api_token"] = token
                program = BugBountyProgram(**prog_data)
            except Exception as exc:  # best-effort: one bad entry must not drop the rest
                self._logger.error("Skipping invalid program entry #%d: %s", index, exc)
                continue
            self.programs[program.program_id] = program

    def _load_tools(self) -> None:
        """Load tool configurations from ``tools.yaml``.

        If the file does not exist, an example file is created and nothing
        is loaded. A tool key with no body (``nmap:``) is treated as "all
        defaults". Invalid entries are logged and skipped individually.
        """
        tools_file = self.config_dir / "tools.yaml"
        if not tools_file.exists():
            self._create_example_tools_file(tools_file)
            return

        data = self._read_yaml(tools_file, "tools")
        entries = data.get("tools") if isinstance(data, dict) else None
        if not entries:
            return
        if not isinstance(entries, dict):
            self._logger.error(
                "Error loading tools: 'tools' must be a mapping, got %s",
                type(entries).__name__,
            )
            return

        for tool_name, tool_data in entries.items():
            try:
                # A key without a body parses as None; fall back to defaults.
                self.tools[tool_name] = ToolConfig(**(tool_data or {}))
            except Exception as exc:  # best-effort: one bad entry must not drop the rest
                self._logger.error("Skipping invalid tool %r: %s", tool_name, exc)

    # ------------------------------------------------------------------
    # Example files
    # ------------------------------------------------------------------

    def _create_example_programs_file(self, path: Path) -> None:
        """Write an example ``programs.yaml`` containing one sample program."""
        example = {
            'programs': [
                {
                    'program_id': 'example-program',
                    'platform': 'hackerone',
                    'name': 'Example Program',
                    'url': 'https://hackerone.com/example',
                    'enrolled': False,
                    'in_scope': [
                        {
                            'type': 'domain',
                            'target': '*.example.com',
                            'notes': 'All subdomains of example.com',
                            'wildcard': True,
                        },
                    ],
                    'out_of_scope': [
                        {
                            'type': 'domain',
                            'target': 'blog.example.com',
                            'notes': 'Blog is out of scope',
                        },
                    ],
                    'max_severity': 'critical',
                },
            ],
        }
        self._write_yaml(path, example)

    def _create_example_tools_file(self, path: Path) -> None:
        """Write an example ``tools.yaml`` with a handful of sample tools."""
        example = {
            'tools': {
                'nuclei': {
                    'enabled': True,
                    'timeout': 600,
                    'config': {
                        'severity_threshold': 'medium',
                        'update_templates': True,
                    },
                },
                'subfinder': {
                    'enabled': True,
                    'timeout': 300,
                },
                'nmap': {
                    'enabled': True,
                    'timeout': 600,
                    'extra_args': ['-Pn'],
                },
                'sqlmap': {
                    'enabled': True,
                    'timeout': 900,
                    'config': {
                        'risk_level': 1,
                        'level': 3,
                        'threads': 5,
                    },
                },
                'ffuf': {
                    'enabled': True,
                    'timeout': 300,
                    'rate_limit': 100,
                },
            },
        }
        self._write_yaml(path, example)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_program(self, program_id: str) -> Optional["BugBountyProgram"]:
        """Return the program registered under ``program_id``, or None."""
        return self.programs.get(program_id)

    def get_tool_config(self, tool_name: str) -> Optional["ToolConfig"]:
        """Return the configuration for ``tool_name``, or None if unknown."""
        return self.tools.get(tool_name)

    def save_program(self, program: "BugBountyProgram") -> None:
        """Register ``program`` (replacing any entry with the same ID) and
        rewrite ``programs.yaml`` with all known programs."""
        self.programs[program.program_id] = program
        self._save_programs()

    def _save_programs(self) -> None:
        """Write all programs to ``programs.yaml``, omitting ``api_token``.

        Each program is serialized through JSON first so that only plain
        scalars, lists and dicts reach the YAML dumper. Dumping the raw
        ``dict()`` output with ``yaml.dump`` would emit Python-specific tags
        for values such as Enum members, and ``yaml.safe_load`` (used when
        loading) refuses to parse those tags.
        NOTE(review): assumes the program model holds such values (e.g. an
        Enum-typed ``platform``) - the model is defined outside this file.
        """
        programs_file = self.config_dir / "programs.yaml"
        programs_data = {
            'programs': [
                json.loads(program.json(exclude={'api_token'}))
                for program in self.programs.values()
            ]
        }
        self._write_yaml(programs_file, programs_data)

    def list_programs(
        self,
        platform: Optional["Platform"] = None,
        enrolled_only: bool = False,
    ) -> List["BugBountyProgram"]:
        """List programs with optional filtering.

        Args:
            platform: When given (truthy), keep only programs whose
                ``platform`` equals this value.
            enrolled_only: When True, keep only programs whose ``enrolled``
                attribute is truthy.

        Returns:
            Matching programs in registration order.
        """
        return [
            program
            for program in self.programs.values()
            if (not platform or program.platform == platform)
            and (not enrolled_only or program.enrolled)
        ]