app.py•9.69 kB
#!/usr/bin/env python3
"""
Web interface for Wireshark MCP
A simple Flask application that provides a web interface for using Wireshark MCP
to analyze network captures and generate AI-friendly contexts.
"""
import os
import tempfile
import logging
import json
from typing import Dict, Any, List, Optional
from datetime import datetime
from flask import Flask, request, render_template, jsonify, redirect, url_for, flash
from werkzeug.utils import secure_filename
from wireshark_mcp import WiresharkMCP, Protocol, Filter
from wireshark_mcp.formatters import ClaudeFormatter
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Initialize Flask
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', 'dev-key-for-testing-only')
# Configure upload settings
UPLOAD_FOLDER = os.path.join(tempfile.gettempdir(), 'wireshark_mcp_uploads')
ALLOWED_EXTENSIONS = {'pcap', 'pcapng', 'cap'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB limit
# Create upload folder if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
def allowed_file(filename: str) -> bool:
    """Check if the file extension is allowed."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/')
def index():
    """Render the home page."""
    protocols = [p.value for p in Protocol]
    return render_template('index.html', protocols=protocols)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file upload and initial processing."""
    if 'file' not in request.files:
        flash('No file part', 'error')
        return redirect(request.url)
    
    file = request.files['file']
    if file.filename == '':
        flash('No selected file', 'error')
        return redirect(request.url)
    
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
        unique_filename = f"{timestamp}_{filename}"
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        
        file.save(filepath)
        logger.info(f"Saved uploaded file to {filepath}")
        
        # Redirect to analysis page
        return redirect(url_for('analyze', filename=unique_filename))
    
    flash('Invalid file type', 'error')
    return redirect(request.url)
@app.route('/analyze/<filename>')
def analyze(filename: str):
    """Analyze a packet capture file."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(filepath):
        flash('File not found', 'error')
        return redirect(url_for('index'))
    
    # Generate basic packet summary for display
    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100, include_statistics=True)
        
        # Extract summary data
        summary = context.get('summary', {})
        statistics = context.get('statistics', {})
        
        # Get available protocols in the capture
        available_protocols = []
        for proto, count in summary.get('protocols', {}).items():
            if count > 0:
                # Check if we have an analyzer for this protocol
                try:
                    p = next((p for p in Protocol if p.value.upper() == proto.upper()), None)
                    if p:
                        available_protocols.append({
                            'name': p.value,
                            'count': count,
                            'has_analyzer': p in [Protocol.HTTP, Protocol.DNS, Protocol.SMTP, Protocol.TLS]
                        })
                except:
                    # If protocol isn't in our enum, skip it
                    pass
        
        return render_template(
            'analyze.html', 
            filename=filename, 
            summary=summary,
            statistics=statistics,
            available_protocols=available_protocols
        )
    
    except Exception as e:
        logger.error(f"Error analyzing file: {e}", exc_info=True)
        flash(f"Error analyzing file: {str(e)}", 'error')
        return redirect(url_for('index'))
@app.route('/protocol/<filename>/<protocol>')
def analyze_protocol(filename: str, protocol: str):
    """Analyze a specific protocol in the packet capture."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(filepath):
        flash('File not found', 'error')
        return redirect(url_for('index'))
    
    try:
        # Get protocol enum
        proto = next((p for p in Protocol if p.value.upper() == protocol.upper()), None)
        
        if not proto:
            flash(f"Unknown protocol: {protocol}", 'error')
            return redirect(url_for('analyze', filename=filename))
        
        # Extract protocol data
        mcp = WiresharkMCP(filepath)
        proto_context = mcp.extract_protocol(
            protocol=proto,
            include_headers=True,
            include_body=False,
            max_conversations=10
        )
        
        # Extract insights if available
        try:
            proto_insights = mcp.protocol_insights(
                protocol=proto,
                extract_queries=True,
                analyze_response_codes=True,
                detect_tunneling=True
            )
        except:
            proto_insights = {}
        
        # Format for Claude
        formatter = ClaudeFormatter()
        claude_prompt = formatter.format_context(
            proto_context,
            query=f"Analyze this {protocol} traffic and identify any security concerns or unusual patterns."
        )
        
        return render_template(
            'protocol.html',
            filename=filename,
            protocol=protocol,
            context=proto_context,
            insights=proto_insights,
            claude_prompt=claude_prompt
        )
    
    except Exception as e:
        logger.error(f"Error analyzing protocol: {e}", exc_info=True)
        flash(f"Error analyzing protocol: {str(e)}", 'error')
        return redirect(url_for('analyze', filename=filename))
@app.route('/api/protocols/<filename>')
def api_get_protocols(filename: str):
    """API endpoint to get available protocols in a capture file."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(filepath):
        return jsonify({'error': 'File not found'}), 404
    
    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100)
        protocols = context.get('summary', {}).get('protocols', {})
        return jsonify({'protocols': protocols})
    
    except Exception as e:
        logger.error(f"API error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/api/context/<filename>')
def api_get_context(filename: str):
    """API endpoint to get basic context for a capture file."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(filepath):
        return jsonify({'error': 'File not found'}), 404
    
    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100, include_statistics=True)
        return jsonify(context)
    
    except Exception as e:
        logger.error(f"API error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/api/protocol/<filename>/<protocol>')
def api_get_protocol(filename: str, protocol: str):
    """API endpoint to get protocol-specific data."""
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    
    if not os.path.exists(filepath):
        return jsonify({'error': 'File not found'}), 404
    
    try:
        # Get protocol enum
        proto = next((p for p in Protocol if p.value.upper() == protocol.upper()), None)
        
        if not proto:
            return jsonify({'error': f'Unknown protocol: {protocol}'}), 400
        
        # Extract protocol data
        mcp = WiresharkMCP(filepath)
        proto_context = mcp.extract_protocol(
            protocol=proto,
            include_headers=True,
            include_body=False,
            max_conversations=10
        )
        
        return jsonify(proto_context)
    
    except Exception as e:
        logger.error(f"API error: {e}", exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/cleanup', methods=['POST'])
def cleanup():
    """Clean up temporary files to free disk space."""
    try:
        count = 0
        for filename in os.listdir(app.config['UPLOAD_FOLDER']):
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            # Remove files older than 1 day
            if os.path.isfile(filepath) and (datetime.now().timestamp() - os.path.getmtime(filepath)) > 86400:
                os.remove(filepath)
                count += 1
        
        flash(f"Removed {count} old files", 'success')
    except Exception as e:
        logger.error(f"Error cleaning up: {e}", exc_info=True)
        flash(f"Error cleaning up: {str(e)}", 'error')
    
    return redirect(url_for('index'))
@app.errorhandler(404)
def page_not_found(e):
    """Handle 404 errors."""
    return render_template('404.html'), 404
@app.errorhandler(500)
def server_error(e):
    """Handle 500 errors."""
    return render_template('500.html'), 500
if __name__ == '__main__':
    # In production, use a proper WSGI server
    app.run(debug=True, host='0.0.0.0', port=5000)