"""
Tmux Session Manager for Pentest MCP Server.
Handles persistent tmux sessions that survive SSH disconnections.
"""
import json
import logging
import re
import shlex
import time
from typing import Dict, List, Optional, Any

from .ssh_manager import SSHManager
logger = logging.getLogger(__name__)
class TmuxSessionError(Exception):
    """Raised when a tmux session operation cannot be carried out."""
class SessionInfo:
    """In-memory bookkeeping record for a single tmux session.

    Attributes are plain, mutable fields that the manager updates as it
    sends commands to, and captures output from, the session.

    Args:
        session_id: Name of the tmux session this record describes.
        created_at: Creation time as a Unix timestamp. When omitted
            (``None``) the current time is used. An explicit ``0.0`` is
            kept as given.
    """

    def __init__(self, session_id: str, created_at: Optional[float] = None):
        # Take one clock reading so a freshly created record has identical
        # created_at and last_activity values.
        now = time.time()
        self.session_id = session_id
        # Compare against None rather than relying on truthiness so that a
        # caller-supplied timestamp of 0 is not replaced by "now".
        self.created_at = now if created_at is None else created_at
        self.status = "active"
        self.last_command = ""
        # Every command sent to the session, as {"command", "timestamp"} dicts.
        self.command_history: List[Dict[str, Any]] = []
        # Most recently captured pane content.
        self.output_buffer = ""
        self.last_activity = now
        # Name of the tmux socket the session belongs to (set by the manager).
        self.socket_name: Optional[str] = None

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(session_id={self.session_id!r}, "
            f"status={self.status!r}, created_at={self.created_at!r})"
        )
class TmuxManager:
    """Manages persistent tmux sessions for pentest operations.

    Every tmux command is executed on the target through the injected SSH
    manager and addresses a dedicated server socket at
    ``/tmp/<socket_name>``. Commands sent through :meth:`execute_command`
    are additionally appended to a per-session JSON-lines log under
    ``~/.mcp_history`` on the target.

    All values interpolated into remote shell command lines (session names,
    shell, text, file names) are quoted with :func:`shlex.quote`.
    """

    # Directory on the target holding one JSON-lines command log per session.
    # Left unquoted in command lines so the remote shell expands the tilde.
    _HISTORY_DIR = "~/.mcp_history"

    # Sentinel echoed by the remote shell when a session log cannot be read.
    _NO_HISTORY_MARKER = "NO_SESSION_HISTORY"

    # Shell prompt shapes used to recognise "prompt + command" lines in
    # captured pane output. Tried in order; the first match wins.
    _PROMPT_PATTERNS = (
        re.compile(r'^[^\s]*\$\s+(.+)$'),    # bash/zsh: user$ command
        re.compile(r'^[^\s]*#\s+(.+)$'),     # root shell: root# command
        re.compile(r'^.*[\$#>]\s+(.+)$'),    # generic prompt ending with $, #, or >
    )

    # Second line of a two-line "fancy" prompt (┌... / └...$ command).
    _FANCY_PROMPT_TAIL = re.compile(r'└.*\$\s+(.+)$')

    def __init__(self, ssh_manager: "SSHManager"):
        self.ssh = ssh_manager
        self.sessions: Dict[str, "SessionInfo"] = {}
        self.socket_name = ssh_manager.config.TMUX_SOCKET_NAME

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _tmux(self, *args: Any) -> str:
        """Build a shell-safe tmux command line bound to this manager's socket."""
        words = ["tmux", "-S", f"/tmp/{self.socket_name}"]
        words.extend(str(arg) for arg in args)
        return " ".join(shlex.quote(word) for word in words)

    def _log_file_arg(self, session_id: str) -> str:
        """Return the session log path spelled safely for a shell command.

        Only the file name is quoted; the ``~/`` prefix stays unquoted so
        the remote shell still performs tilde expansion.
        """
        return f"{self._HISTORY_DIR}/" + shlex.quote(f"session_{session_id}.log")

    @staticmethod
    def _error(message: str, **extra: Any) -> Dict[str, Any]:
        """Build the standard error result dict, with optional extra keys."""
        return {"status": "error", "message": message, **extra}

    async def _list_session_names(self) -> List[str]:
        """Return the names of all sessions on this manager's tmux socket.

        A missing tmux server is not an error: stderr is discarded and the
        command is forced to succeed, yielding an empty list.
        """
        cmd = self._tmux("list-sessions", "-F", "#{session_name}") + " 2>/dev/null || true"
        result = await self.ssh.run_command(cmd, timeout=10)
        if result.exit_status != 0:
            return []
        stripped = (raw.strip() for raw in result.stdout.strip().split('\n'))
        return [name for name in stripped if name]

    async def _send_keys(self, session_id: str, text: str, press_enter: bool, timeout: int) -> Any:
        """Run ``tmux send-keys`` for *text*, optionally followed by Enter."""
        args = ["send-keys", "-t", session_id, text]
        if press_enter:
            args.append("Enter")
        return await self.ssh.run_command(self._tmux(*args), timeout=timeout)

    async def _summarize_history(self, session_id: str, tracked: Optional["SessionInfo"]):
        """Return ``(last_command, command_count)`` for :meth:`list_sessions`.

        Prefers the merged session log + live pane history and falls back to
        the in-memory record (or ``("", 0)`` for untracked sessions) when
        neither source yields anything or the lookup fails.
        """
        if tracked is not None:
            fallback = (tracked.last_command, len(tracked.command_history))
        else:
            fallback = ("", 0)
        try:
            session_log = await self._read_session_command_log(session_id)
            tmux_history = await self._parse_command_history_for_last_command(session_id)
            if not (session_log or tmux_history):
                return fallback
            merged = await self._merge_command_histories(session_log, tmux_history)
            last_command = merged[-1]["command"] if merged else fallback[0]
            return last_command, len(merged)
        except Exception as e:
            logger.warning(f"Failed to get session-specific history for {session_id}: {e}")
            return fallback

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def initialize(self) -> bool:
        """
        Initialize tmux environment and recover existing sessions.

        Returns:
            bool: True if initialization successful, False if tmux is
            missing or any step raised (the error is logged).
        """
        try:
            # Ensure tmux is available
            probe = await self.ssh.run_command("which tmux", timeout=10)
            if probe.exit_status != 0:
                raise TmuxSessionError("tmux is not installed on target system")
            # Get tmux version (informational only)
            version = await self.ssh.run_command("tmux -V", timeout=10)
            logger.info(f"Tmux version: {version.stdout.strip()}")
            # Recover existing sessions
            await self.recover_sessions()
            return True
        except Exception as e:
            logger.error(f"Failed to initialize tmux: {e}")
            return False

    async def create_session(self, session_id: str, shell: str = "bash") -> Dict[str, Any]:
        """
        Create a new persistent tmux session.

        Args:
            session_id: Unique identifier for the session
            shell: Shell command to run in the session (default: bash)

        Returns:
            dict: ``{"status": "created", ...}`` on success, otherwise
            ``{"status": "error", "message": ...}``
        """
        try:
            if await self._session_exists(session_id):
                return self._error(f"Session '{session_id}' already exists")

            # The shell is passed as a single quoted word; tmux hands a
            # single shell-command argument to the shell for interpretation.
            cmd = self._tmux("new-session", "-d", "-s", session_id, shell)
            result = await self.ssh.run_command(cmd, timeout=30)
            if result.exit_status != 0:
                return self._error(f"Failed to create session: {result.stderr}")

            # Store session info
            session_info = SessionInfo(session_id)
            session_info.socket_name = self.socket_name
            self.sessions[session_id] = session_info
            logger.info(f"Created tmux session: {session_id}")
            return {
                "status": "created",
                "session_id": session_id,
                "message": "Session created successfully"
            }
        except Exception as e:
            logger.error(f"Error creating session {session_id}: {e}")
            return self._error(f"Exception: {str(e)}")

    async def list_sessions(self) -> List[Dict[str, Any]]:
        """
        List all active tmux sessions.

        Returns:
            list: One dict per session reported by tmux, enriched with the
            last command, command count and captured-output length. Empty
            when tmux reports nothing or an error occurs.
        """
        try:
            cmd = self._tmux(
                "list-sessions", "-F",
                "#{session_name},#{session_created},#{session_activity}",
            )
            result = await self.ssh.run_command(cmd, timeout=10)
            if result.exit_status != 0 or not result.stdout.strip():
                return []

            sessions = []
            for line in result.stdout.strip().split('\n'):
                # Split from the right: the two trailing fields are numeric,
                # so any comma inside the session name stays in the name.
                parts = line.rsplit(',', 2)
                if len(parts) != 3:
                    continue
                session_id, created, activity = parts

                tracked = self.sessions.get(session_id)
                output_length = len(tracked.output_buffer) if tracked is not None else 0
                last_command, command_count = await self._summarize_history(session_id, tracked)

                sessions.append({
                    "session_id": session_id,
                    "created_at": created,
                    "status": "active",
                    "last_command": last_command,
                    "command_count": command_count,
                    "output_length": output_length,
                    "last_activity": activity,
                    "socket_name": self.socket_name,
                    "socket_path": f"/tmp/{self.socket_name}"
                })
            return sessions
        except Exception as e:
            logger.error(f"Error listing sessions: {e}")
            return []

    async def kill_session(self, session_id: str) -> Dict[str, Any]:
        """
        Terminate a tmux session and remove its tracking record and log file.

        Args:
            session_id: Session to terminate

        Returns:
            dict: Termination result
        """
        try:
            if not await self._session_exists(session_id):
                return self._error(f"Session '{session_id}' does not exist")

            result = await self.ssh.run_command(
                self._tmux("kill-session", "-t", session_id), timeout=10
            )
            if result.exit_status != 0:
                return self._error(f"Failed to kill session: {result.stderr}")

            # Remove from our tracking
            self.sessions.pop(session_id, None)
            # Delete session log file (best effort; failures are only logged)
            await self._delete_session_log_file(session_id)
            return {
                "status": "killed",
                "message": f"Session '{session_id}' terminated and log file cleaned up"
            }
        except Exception as e:
            logger.error(f"Error killing session {session_id}: {e}")
            return self._error(f"Exception: {str(e)}")

    async def execute_command(self, session_id: str, command: str) -> Dict[str, Any]:
        """
        Execute a command in a tmux session.

        The command is typed into the session followed by Enter, recorded in
        the in-memory session record (if tracked) and appended to the
        persistent session log.

        Args:
            session_id: Target session
            command: Command to execute

        Returns:
            dict: Execution result (``"status": "sent"`` on success)
        """
        try:
            if not await self._session_exists(session_id):
                return self._error(f"Session '{session_id}' does not exist")

            result = await self._send_keys(session_id, command, press_enter=True, timeout=30)
            if result.exit_status != 0:
                return self._error(f"Failed to send command: {result.stderr}")

            current_time = time.time()
            session_info = self.sessions.get(session_id)
            if session_info is not None:
                session_info.last_command = command
                session_info.command_history.append({
                    "command": command,
                    "timestamp": current_time
                })
                session_info.last_activity = current_time

            # Write command to persistent session log
            await self._write_command_to_session_log(session_id, command, current_time)
            return {
                "status": "sent",
                "message": "Command sent successfully",
                "command": command,
                "session_id": session_id
            }
        except Exception as e:
            logger.error(f"Error executing command in session {session_id}: {e}")
            return self._error(f"Exception: {str(e)}")

    async def capture_pane(self, session_id: str, lines: Optional[int] = None) -> Dict[str, Any]:
        """
        Capture output from a tmux session pane.

        Args:
            session_id: Target session
            lines: Number of history lines to include (None or 0 = visible
                pane only)

        Returns:
            dict: Captured output. ``has_more`` is True whenever any output
            was captured.
        """
        try:
            if not await self._session_exists(session_id):
                return self._error(f"Session '{session_id}' does not exist", output="")

            args = ["capture-pane", "-t", session_id, "-p"]
            if lines:
                args += ["-S", f"-{lines}"]
            result = await self.ssh.run_command(self._tmux(*args), timeout=30)
            if result.exit_status != 0:
                return self._error(f"Failed to capture pane: {result.stderr}", output="")

            output = result.stdout
            # Update session buffer
            session_info = self.sessions.get(session_id)
            if session_info is not None:
                session_info.output_buffer = output
                session_info.last_activity = time.time()
            return {
                "status": "success",
                "output": output,
                "timestamp": time.time(),
                "has_more": len(output) > 0
            }
        except Exception as e:
            logger.error(f"Error capturing pane from session {session_id}: {e}")
            return self._error(f"Exception: {str(e)}", output="")

    async def send_input(self, session_id: str, text: str, press_enter: bool = True) -> Dict[str, Any]:
        """
        Send input to an interactive session.

        Unlike :meth:`execute_command`, the text is not recorded in the
        command history or the session log.

        Args:
            session_id: Target session
            text: Text to send
            press_enter: Whether to press Enter after text

        Returns:
            dict: Send result
        """
        try:
            if not await self._session_exists(session_id):
                return self._error(f"Session '{session_id}' does not exist")

            result = await self._send_keys(session_id, text, press_enter=press_enter, timeout=10)
            if result.exit_status != 0:
                return self._error(f"Failed to send input: {result.stderr}")
            return {
                "status": "sent",
                "message": "Input sent successfully"
            }
        except Exception as e:
            logger.error(f"Error sending input to session {session_id}: {e}")
            return self._error(f"Exception: {str(e)}")

    async def recover_sessions(self) -> List[str]:
        """
        Recover orphaned tmux sessions after reconnection.

        Sessions that exist on the tmux socket but are not tracked in memory
        get a fresh tracking record.

        Returns:
            list: List of recovered session IDs
        """
        try:
            recovered = []
            for session_id in await self._list_session_names():
                if session_id in self.sessions:
                    continue
                session_info = SessionInfo(session_id)
                session_info.socket_name = self.socket_name
                self.sessions[session_id] = session_info
                recovered.append(session_id)
                logger.info(f"Recovered tmux session: {session_id} from socket: {self.socket_name}")
            return recovered
        except Exception as e:
            logger.error(f"Error recovering sessions: {e}")
            return []

    async def _session_exists(self, session_id: str) -> bool:
        """
        Check if a tmux session exists.

        Args:
            session_id: Session to check

        Returns:
            bool: True if session exists; False if it does not or the
            check itself failed
        """
        try:
            cmd = self._tmux("has-session", "-t", session_id) + " 2>/dev/null"
            result = await self.ssh.run_command(cmd, timeout=5)
            return result.exit_status == 0
        except Exception:
            return False

    def get_session_info(self, session_id: str) -> Optional["SessionInfo"]:
        """
        Get information about a specific session.

        Args:
            session_id: Session to get info for

        Returns:
            SessionInfo or None: Tracked session information
        """
        return self.sessions.get(session_id)

    def get_all_sessions(self) -> Dict[str, "SessionInfo"]:
        """
        Get all tracked sessions.

        Returns:
            dict: Shallow copy of the session_id -> SessionInfo mapping
        """
        return self.sessions.copy()

    async def get_session_history(self, session_id: str) -> Dict[str, Any]:
        """
        Get session-specific command history.

        Combines:
        1. The persistent session-specific log file
           (~/.mcp_history/session_ID.log)
        2. Commands parsed from the last 100 lines of the live tmux pane

        Args:
            session_id: Session to get history for

        Returns:
            dict: Merged history plus session metadata, or an error dict
        """
        try:
            # 1. Persistent session log
            session_log_history = await self._read_session_command_log(session_id)
            logger.info(f"Found {len(session_log_history)} commands in session log")

            # 2. Live tmux history (only if the session is still running)
            live_history = []
            if await self._session_exists(session_id):
                capture_cmd = self._tmux("capture-pane", "-t", session_id, "-p", "-S", "-100")
                result = await self.ssh.run_command(capture_cmd, timeout=10)
                if result.exit_status == 0:
                    live_history = await self._parse_command_history(result.stdout, session_id)
                    # Mark as live tmux history
                    for entry in live_history:
                        entry["source"] = "tmux_live_history"

            # 3. Merge session log + live tmux (remove duplicates)
            merged_history = await self._merge_command_histories(session_log_history, live_history)

            # 4. Session metadata; untracked sessions get a placeholder record
            session_info = self.sessions.get(session_id)
            if not session_info:
                session_info = SessionInfo(session_id)
                session_info.socket_name = self.socket_name

            return {
                "status": "success",
                "session_id": session_id,
                "socket_name": self.socket_name,
                "socket_path": f"/tmp/{self.socket_name}",
                "created_at": session_info.created_at,
                "last_activity": session_info.last_activity,
                "command_count": len(merged_history),
                "session_log_count": len(session_log_history),
                "live_history_count": len(live_history),
                "history": merged_history,
                "source": "session_log_plus_tmux_live",
                "log_file": f"~/.mcp_history/session_{session_id}.log"
            }
        except Exception as e:
            logger.error(f"Error getting session-specific history for {session_id}: {e}")
            return self._error(f"Exception: {str(e)}", history=[])

    async def _parse_command_history(self, pane_output: str, session_id: str) -> List[Dict[str, Any]]:
        """
        Parse command history from tmux pane output.

        A line counts as a command when it matches one of the known prompt
        shapes; the text after the prompt is the command. Repeated commands
        are reported once (first occurrence), keeping their original
        ``index`` values.

        Args:
            pane_output: Raw tmux pane output
            session_id: Session identifier

        Returns:
            list: Parsed command history
        """
        lines = pane_output.split('\n')
        # The pane carries no timing information, so every parsed command
        # gets the parse time as an approximate timestamp.
        parsed_at = time.time()
        commands = []

        i = 0
        while i < len(lines):
            line = lines[i].strip()
            if not line:
                i += 1
                continue

            command = None
            for pattern in self._PROMPT_PATTERNS:
                match = pattern.search(line)
                if match:
                    command = match.group(1).strip()
                    break

            # Two-line fancy prompt: the command sits on the following line.
            if '┌' in line and i + 1 < len(lines):
                match = self._FANCY_PROMPT_TAIL.search(lines[i + 1].strip())
                if match:
                    command = match.group(1).strip()
                    i += 1  # Skip the next line since we processed it

            if command:
                commands.append({
                    "command": command,
                    "index": len(commands) + 1,
                    "timestamp": parsed_at,
                    "session_id": session_id,
                    "source": "tmux_history_parse"
                })
            i += 1

        # Remove duplicates while preserving order
        seen = set()
        unique_commands = []
        for entry in commands:
            if entry["command"] not in seen:
                seen.add(entry["command"])
                unique_commands.append(entry)
        return unique_commands

    async def _detect_shell(self) -> str:
        """
        Detect the current shell type for history file selection.

        Returns:
            str: Shell name (bash, zsh, etc.); "bash" when detection fails
        """
        try:
            # Try to get shell from environment
            result = await self.ssh.run_command("echo $SHELL", timeout=5)
            shell_path = result.stdout.strip()
            if result.exit_status == 0 and shell_path:
                return shell_path.split('/')[-1]  # Extract shell name from path

            # Fallback: try to detect from process
            result = await self.ssh.run_command("ps -o comm= -p $$", timeout=5)
            process_name = result.stdout.strip()
            if result.exit_status == 0 and process_name:
                return process_name

            return "bash"
        except Exception as e:
            logger.warning(f"Failed to detect shell: {e}, defaulting to bash")
            return "bash"

    async def _read_session_command_log(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Read persistent session-specific command log.

        Args:
            session_id: Session identifier

        Returns:
            list: Parsed session command history (empty when the log is
            missing or unreadable)
        """
        try:
            # Create MCP history directory if it doesn't exist
            await self.ssh.run_command(f"mkdir -p {self._HISTORY_DIR}", timeout=5)

            log_file = f"~/.mcp_history/session_{session_id}.log"
            cmd = (
                f"cat {self._log_file_arg(session_id)} 2>/dev/null "
                f"|| echo '{self._NO_HISTORY_MARKER}'"
            )
            result = await self.ssh.run_command(cmd, timeout=10)

            # Exact match on the sentinel: a log whose commands merely
            # mention the marker text must still be parsed.
            if result.exit_status != 0 or result.stdout.strip() == self._NO_HISTORY_MARKER:
                logger.info(f"No session history found at {log_file}")
                return []
            return await self._parse_session_log_content(result.stdout, session_id)
        except Exception as e:
            logger.error(f"Error reading session command log: {e}")
            return []

    async def _parse_session_log_content(self, log_content: str, session_id: str) -> List[Dict[str, Any]]:
        """
        Parse session log content into structured format.

        Each non-empty line is expected to be one JSON object. Lines that
        are not valid JSON, or that decode to something other than an
        object, are skipped with a warning.

        Args:
            log_content: Raw session log content
            session_id: Session identifier

        Returns:
            list: Structured session command history
        """
        commands = []
        for line in log_content.strip().split('\n'):
            if not line.strip():
                continue
            try:
                cmd_data = json.loads(line)
            except json.JSONDecodeError:
                logger.warning(f"Skipping malformed log line: {line}")
                continue
            if not isinstance(cmd_data, dict):
                # Valid JSON but not a command record (e.g. a bare number).
                logger.warning(f"Skipping malformed log line: {line}")
                continue
            cmd_data['source'] = 'session_log'
            commands.append(cmd_data)
        return commands

    async def _write_command_to_session_log(self, session_id: str, command: str, timestamp: float) -> bool:
        """
        Write a command to the session-specific log file.

        Args:
            session_id: Session identifier
            command: Command that was executed
            timestamp: Command execution timestamp

        Returns:
            bool: True if successful
        """
        try:
            json_line = json.dumps({
                "command": command,
                "timestamp": timestamp,
                "session_id": session_id,
                "source": "session_log"
            })
            # printf with a quoted argument writes the JSON verbatim: it is
            # immune to quotes in the command and, unlike echo, never
            # interprets the backslash escapes JSON produces. The directory
            # is created first so logging works before any log was read.
            cmd = (
                f"mkdir -p {self._HISTORY_DIR} && "
                f"printf '%s\\n' {shlex.quote(json_line)} >> {self._log_file_arg(session_id)}"
            )
            result = await self.ssh.run_command(cmd, timeout=5)
            if result.exit_status == 0:
                logger.info(f"Logged command to session {session_id}: {command}")
                return True
            logger.error(f"Failed to write to session log: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error writing command to session log: {e}")
            return False

    async def _delete_session_log_file(self, session_id: str) -> bool:
        """
        Delete the session-specific log file when a session is terminated.

        Args:
            session_id: Session identifier

        Returns:
            bool: True if successful or file didn't exist
        """
        try:
            log_file = f"~/.mcp_history/session_{session_id}.log"
            cmd = f"rm -f {self._log_file_arg(session_id)} 2>/dev/null || true"
            result = await self.ssh.run_command(cmd, timeout=5)
            if result.exit_status == 0:
                logger.info(f"Deleted session log file: {log_file}")
                return True
            logger.warning(f"Failed to delete session log file {log_file}: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error deleting session log file for {session_id}: {e}")
            return False

    async def cleanup_orphaned_session_logs(self) -> Dict[str, Any]:
        """
        Clean up session log files for sessions that no longer exist.

        A log file is orphaned when its ``session_<ID>.log`` name does not
        correspond to a session currently listed on the tmux socket.

        Returns:
            dict: Cleanup results
        """
        try:
            # Get list of active sessions
            active_sessions = set(await self._list_session_names())

            # Get list of log files
            log_files = []
            listing = await self.ssh.run_command(
                f"ls {self._HISTORY_DIR}/session_*.log 2>/dev/null || true", timeout=10
            )
            if listing.exit_status == 0 and listing.stdout.strip():
                log_files = [
                    entry.strip()
                    for entry in listing.stdout.strip().split('\n')
                    if entry.strip()
                ]

            # Find orphaned log files
            orphaned_files = []
            for log_file in log_files:
                filename = log_file.rsplit('/', 1)[-1]
                if filename.startswith('session_') and filename.endswith('.log'):
                    session_id = filename[8:-4]  # Remove 'session_' and '.log'
                    if session_id not in active_sessions:
                        orphaned_files.append((session_id, log_file))

            # Delete orphaned files. Paths come back from ls already
            # expanded, so they can be quoted as a whole.
            deleted_count = 0
            for _, log_file in orphaned_files:
                cmd = f"rm -f {shlex.quote(log_file)} 2>/dev/null || true"
                result = await self.ssh.run_command(cmd, timeout=5)
                if result.exit_status == 0:
                    deleted_count += 1
                    logger.info(f"Cleaned up orphaned log file: {log_file}")

            return {
                "status": "success",
                "active_sessions": len(active_sessions),
                "total_log_files": len(log_files),
                "orphaned_files_found": len(orphaned_files),
                "orphaned_files_deleted": deleted_count,
                "cleaned_files": [log_file for _, log_file in orphaned_files]
            }
        except Exception as e:
            logger.error(f"Error during orphaned session log cleanup: {e}")
            return self._error(f"Cleanup failed: {str(e)}")

    # Removed shell history parsing methods - now using session-specific logs

    async def _merge_command_histories(self, shell_history: List[Dict[str, Any]], live_history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Merge logged history and live tmux history.

        Entries are ordered by timestamp (oldest first). When the same
        command text occurs more than once only the most recent occurrence
        is kept. Each surviving entry gets a 1-based ``merged_index`` key
        (the entry dicts are modified in place).

        Args:
            shell_history: Commands from the persistent session log
            live_history: Commands from current tmux session

        Returns:
            list: Merged and deduplicated command history sorted by
            timestamp; ``shell_history`` unchanged if merging fails
        """
        def timestamp_of(entry: Dict[str, Any]) -> float:
            # Tolerate missing or non-numeric timestamps instead of letting
            # one bad record abort the whole merge.
            value = entry.get('timestamp', 0)
            return value if isinstance(value, (int, float)) else 0

        try:
            # sorted() is stable, so entries with equal timestamps keep
            # their relative order (logged entries before live ones).
            ordered = sorted(shell_history + live_history, key=timestamp_of)

            seen_commands = set()
            merged = []
            # Walk newest-first so the first time a command text is seen is
            # its most recent occurrence.
            for cmd_obj in reversed(ordered):
                raw_text = cmd_obj.get('command', '')
                if not isinstance(raw_text, str):
                    continue
                command_text = raw_text.strip()
                if not command_text or command_text in seen_commands:
                    continue
                seen_commands.add(command_text)
                merged.append(cmd_obj)

            # Back to chronological order (oldest first)
            merged.reverse()

            # Re-index the merged history
            for position, cmd_obj in enumerate(merged, start=1):
                cmd_obj['merged_index'] = position

            logger.info(f"Merged {len(shell_history)} shell + {len(live_history)} live commands into {len(merged)} unique commands")
            return merged
        except Exception as e:
            logger.error(f"Error merging command histories: {e}")
            # Fallback: return shell history if merge fails
            return shell_history

    async def _parse_command_history_for_last_command(self, session_id: str) -> List[Dict[str, Any]]:
        """
        Efficiently get command history for list_sessions (lighter version).

        Args:
            session_id: Session identifier

        Returns:
            list: Parsed command history (last 50 lines only for
            efficiency); empty on any failure
        """
        try:
            cmd = self._tmux("capture-pane", "-t", session_id, "-p", "-S", "-50")
            # The SSH manager is stored as self.ssh (see __init__).
            result = await self.ssh.run_command(cmd, timeout=5)
            if result.exit_status == 0:
                return await self._parse_command_history(result.stdout, session_id)
            return []
        except Exception:
            return []