"""
Joern Server Manager for spawning and managing individual Joern server instances per CPG
"""
import logging
import time
import os
from typing import Dict, Optional
import docker
from docker.errors import DockerException, NotFound, APIError
from .port_manager import PortManager
logger = logging.getLogger(__name__)
class JoernServerManager:
"""Manages individual Joern server instances running in Docker container using Docker Python API"""
def __init__(self, joern_binary_path: str = "joern", container_name: str = "codebadger-joern-server", config=None):
self.joern_binary = joern_binary_path
self.container_name = container_name
self.config = config
# Initialize PortManager with config values
if config:
self.port_manager = PortManager(port_min=config.joern.port_min, port_max=config.joern.port_max)
else:
self.port_manager = PortManager()
self.docker_client = docker.from_env()
# _exec_ids will store the exec instance IDs for running joern servers
self._exec_ids: Dict[str, str] = {} # codebase_hash -> exec_id or container_id
self._ports: Dict[str, int] = {} # codebase_hash -> port
# _clients stores reusable JoernServerClient instances for connection pooling
self._clients: Dict[str, 'JoernServerClient'] = {} # codebase_hash -> JoernServerClient
def spawn_server(self, codebase_hash: str) -> int:
"""
Spawn a new Joern server instance INSIDE the existing Docker container for the given codebase
Args:
codebase_hash: The codebase identifier
Returns:
Port number where the server is running (on host, maps to container)
"""
try:
# Check if server already exists for THIS codebase
if codebase_hash in self._ports:
port = self._ports[codebase_hash]
if self.is_server_running(codebase_hash):
logger.info(f"Joern server for {codebase_hash} already running on port {port}")
return port
else:
logger.warning(f"Server for {codebase_hash} registered but not running, cleaning up")
self._cleanup_server(codebase_hash)
# Allocate a port (on host side - maps to container)
port = self.port_manager.allocate_port(codebase_hash)
# Get the existing container
try:
container = self.docker_client.containers.get(self.container_name)
except NotFound:
logger.error(f"Container {self.container_name} not found")
self.port_manager.release_port(codebase_hash)
raise RuntimeError(f"Container {self.container_name} not found")
# Start Joern server inside the existing container using exec
# Use nohup and background to keep it running
# IMPORTANT: Run in unique directory to isolate Joern workspace
# Use parameterized commands to prevent command injection
work_dir = f"/tmp/joern-server-{codebase_hash}"
log_file = f"/tmp/joern-{codebase_hash}.log"
# Build command as array to prevent injection
joern_cmd = [
"bash", "-c",
f"mkdir -p '{work_dir}' && cd '{work_dir}' && nohup /opt/joern/joern-cli/joern --server --server-host 0.0.0.0 --server-port {port} > '{log_file}' 2>&1 &"
]
logger.info(f"Starting Joern server for {codebase_hash} on port {port} inside container {self.container_name}")
logger.debug(f"Command: {joern_cmd}")
# Execute the command in the container
exec_result = container.exec_run(
cmd=joern_cmd,
detach=True, # Run in background
stream=False
)
# Store exec info
self._exec_ids[codebase_hash] = f"exec-{codebase_hash}"
self._ports[codebase_hash] = port
logger.info(f"Joern server command executed, waiting for server to be ready on port {port}...")
# Wait for server to start
if self._wait_for_server(port, timeout=60):
logger.info(f"Joern server for {codebase_hash} started successfully on port {port}")
return port
else:
# Cleanup on failure - check logs
logger.error(f"Joern server for {codebase_hash} failed to become ready on port {port}")
try:
# Use parameterized command to prevent injection
log_result = container.exec_run(
cmd=["cat", log_file],
stream=False
)
if log_result.exit_code == 0:
logger.error(f"Joern server log:\n{log_result.output.decode('utf-8')}")
except Exception as log_error:
logger.warning(f"Could not read log file: {log_error}")
self._cleanup_server(codebase_hash)
raise RuntimeError(f"Joern server for {codebase_hash} failed to start on port {port}")
except DockerException as e:
logger.error(f"Docker error while spawning Joern server for {codebase_hash}: {e}", exc_info=True)
self._cleanup_server(codebase_hash)
raise
except Exception as e:
logger.error(f"Failed to spawn Joern server for {codebase_hash}: {e}", exc_info=True)
self._cleanup_server(codebase_hash)
raise
def get_or_create_client(self, codebase_hash: str) -> 'JoernServerClient':
"""
Get or create a JoernServerClient for the given codebase with connection pooling
Args:
codebase_hash: The codebase identifier
Returns:
JoernServerClient instance with reusable connection pool
Raises:
RuntimeError if no server is running for the codebase
"""
# Return cached client if available
if codebase_hash in self._clients:
return self._clients[codebase_hash]
# Verify server is running
if codebase_hash not in self._ports:
raise RuntimeError(f"No Joern server running for codebase {codebase_hash}")
port = self._ports[codebase_hash]
# Create new client with connection pooling
from .joern_client import JoernServerClient
# Prepare config dict for HTTP pooling
http_config = {}
if self.config:
joern_cfg = self.config.joern
http_config = {
"http_pool_connections": joern_cfg.http_pool_connections,
"http_pool_maxsize": joern_cfg.http_pool_maxsize,
"http_max_retries": joern_cfg.http_max_retries,
"http_backoff_factor": joern_cfg.http_backoff_factor,
}
client = JoernServerClient(
host="localhost",
port=port,
username=self.config.joern.server_auth_username if self.config else None,
password=self.config.joern.server_auth_password if self.config else None,
config=http_config
)
# Cache the client
self._clients[codebase_hash] = client
logger.debug(f"Created and cached JoernServerClient for {codebase_hash} on port {port}")
return client
def load_cpg(self, codebase_hash: str, cpg_path: str, timeout: int = 600) -> bool:
"""
Load a CPG into the Joern server for the given codebase
Args:
codebase_hash: The codebase identifier
cpg_path: Path to the CPG file
timeout: Timeout for loading operation (default 600s for large codebases)
Returns:
True if CPG was loaded successfully
"""
try:
if codebase_hash not in self._ports:
raise RuntimeError(f"No Joern server running for codebase {codebase_hash}")
port = self._ports[codebase_hash]
# Get or create cached client with connection pooling
client = self.get_or_create_client(codebase_hash)
# Convert host path to container path for Joern running in Docker
# Host path like /home/aleks/.../playground/cpgs/hash/cpg.bin -> /playground/cpgs/hash/cpg.bin
container_cpg_path = cpg_path
if "/playground/" in cpg_path:
parts = cpg_path.split("/playground/")
if len(parts) >= 2:
container_cpg_path = f"/playground/{parts[-1]}"
logger.info(f"Loading CPG {cpg_path} (container: {container_cpg_path}) into Joern server for {codebase_hash} (port {port})")
# Retry loading with exponential backoff
max_retries = 3
for attempt in range(max_retries):
try:
# Pass the container path to the client with explicit project name
success = client.load_cpg(container_cpg_path, project_name=codebase_hash, timeout=timeout)
if success:
logger.info(f"CPG loaded successfully for {codebase_hash}")
return True
else:
logger.warning(f"CPG load attempt {attempt + 1}/{max_retries} failed for {codebase_hash}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff: 1s, 2s, 4s
logger.info(f"Waiting {wait_time}s before retry...")
time.sleep(wait_time)
except Exception as e:
logger.warning(f"CPG load attempt {attempt + 1}/{max_retries} error: {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt
logger.info(f"Waiting {wait_time}s before retry...")
time.sleep(wait_time)
else:
raise
logger.error(f"Failed to load CPG for {codebase_hash} after {max_retries} attempts")
return False
except Exception as e:
logger.error(f"Error loading CPG for {codebase_hash}: {e}")
return False
def get_server_port(self, codebase_hash: str) -> Optional[int]:
"""
Get the port for the Joern server of the given codebase
Args:
codebase_hash: The codebase identifier
Returns:
Port number or None if no server is running
"""
return self._ports.get(codebase_hash)
def is_server_running(self, codebase_hash: str) -> bool:
"""
Check if the Joern server for the given codebase is running by checking port connectivity
Args:
codebase_hash: The codebase identifier
Returns:
True if server is running
"""
if codebase_hash not in self._ports:
return False
port = self._ports[codebase_hash]
# Check if we can connect to the port
import socket
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('localhost', port))
sock.close()
return result == 0
except Exception as e:
logger.debug(f"Failed to check server status for {codebase_hash} on port {port}: {e}")
return False
def terminate_server(self, codebase_hash: str) -> bool:
"""
Terminate the Joern server for the given codebase
Args:
codebase_hash: The codebase identifier
Returns:
True if server was terminated successfully
"""
try:
if codebase_hash not in self._exec_ids:
logger.warning(f"No server found for codebase {codebase_hash}")
return False
port = self._ports.get(codebase_hash)
logger.info(f"Terminating Joern server for {codebase_hash} on port {port}")
# Kill the Joern process inside the container
try:
container = self.docker_client.containers.get(self.container_name)
# Find and kill the joern process on this port
# Use parameterized command to prevent injection
kill_cmd = ["bash", "-c", f"pkill -f 'joern.*--server-port {port}' || true"]
container.exec_run(cmd=kill_cmd)
logger.info(f"Killed Joern server process for {codebase_hash}")
except Exception as e:
logger.warning(f"Error killing Joern process: {e}")
# Cleanup internal state and release port
self._cleanup_server(codebase_hash)
return True
except Exception as e:
logger.error(f"Error terminating Joern server for {codebase_hash}: {e}")
return False
def terminate_all_servers(self) -> None:
"""Terminate all running Joern servers"""
logger.info("Terminating all Joern servers")
codebases = list(self._exec_ids.keys())
for codebase_hash in codebases:
self.terminate_server(codebase_hash)
logger.info("All Joern servers terminated")
def get_running_servers(self) -> Dict[str, int]:
"""Get information about all running servers"""
return {
codebase_hash: port
for codebase_hash, port in self._ports.items()
if self.is_server_running(codebase_hash)
}
def _wait_for_server(self, port: int, timeout: int = 30) -> bool:
"""Wait for Joern server to be ready on the given port"""
import socket
start_time = time.time()
server_responding = False
while time.time() - start_time < timeout:
try:
# Try to connect to the port
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(1)
result = sock.connect_ex(('localhost', port))
sock.close()
if result == 0:
# Server port is open, now check HTTP
try:
import requests
response = requests.get(f"http://localhost:{port}", timeout=2)
# Server responds (even 404 is OK - means it's up)
if response.status_code in [200, 404]:
server_responding = True
# Wait a bit more for Joern to fully initialize
logger.debug(f"Server responding on port {port}, waiting for full initialization...")
sleep_time = self.config.joern.server_init_sleep_time if self.config else 3.0
time.sleep(sleep_time) # Give Joern more time to initialize
return True
except Exception as e:
logger.debug(f"HTTP check failed: {e}")
except Exception as e:
logger.debug(f"Connection check failed: {e}")
time.sleep(1)
return server_responding
def _cleanup_server(self, codebase_hash: str) -> None:
"""Clean up server resources"""
if codebase_hash in self._exec_ids:
del self._exec_ids[codebase_hash]
if codebase_hash in self._ports:
port = self._ports[codebase_hash]
self.port_manager.release_port(codebase_hash)
del self._ports[codebase_hash]
logger.debug(f"Cleaned up resources for {codebase_hash} (port {port})")
# Close and cleanup client session for connection pooling
if codebase_hash in self._clients:
client = self._clients[codebase_hash]
try:
client.close()
logger.debug(f"Closed HTTP session for {codebase_hash}")
except Exception as e:
logger.warning(f"Error closing HTTP session for {codebase_hash}: {e}")
del self._clients[codebase_hash]