Skip to main content
Glama

Wireshark MCP

app.py (9.69 kB)
#!/usr/bin/env python3
"""
Web interface for Wireshark MCP

A simple Flask application that provides a web interface for using
Wireshark MCP to analyze network captures and generate AI-friendly contexts.
"""

import os
import tempfile
import logging
import json
from typing import Dict, Any, List, Optional
from datetime import datetime

from flask import Flask, request, render_template, jsonify, redirect, url_for, flash
from werkzeug.utils import secure_filename

from wireshark_mcp import WiresharkMCP, Protocol, Filter
from wireshark_mcp.formatters import ClaudeFormatter

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Initialize Flask
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', 'dev-key-for-testing-only')
if 'SECRET_KEY' not in os.environ:
    # The fallback key is public knowledge; sessions/flash cookies signed with
    # it can be forged, so make the misconfiguration visible.
    logger.warning("SECRET_KEY is not set; using the insecure development key")

# Configure upload settings
UPLOAD_FOLDER = os.path.join(tempfile.gettempdir(), 'wireshark_mcp_uploads')
ALLOWED_EXTENSIONS = {'pcap', 'pcapng', 'cap'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 50 * 1024 * 1024  # 50MB limit

# Create upload folder if it doesn't exist
os.makedirs(UPLOAD_FOLDER, exist_ok=True)


def allowed_file(filename: str) -> bool:
    """Check if the file extension is allowed."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def _resolve_upload(filename: str) -> Optional[str]:
    """Return the path of an uploaded capture, or None if it is not usable.

    The name comes straight from the URL, so the resolved path must be a
    regular file located inside the configured upload folder. Names such as
    ``..`` (which resolve to a directory outside the folder) yield None.
    """
    upload_root = os.path.realpath(app.config['UPLOAD_FOLDER'])
    candidate = os.path.realpath(os.path.join(upload_root, filename))
    try:
        inside_root = os.path.commonpath([upload_root, candidate]) == upload_root
    except ValueError:
        # Raised e.g. when the two paths live on different drives (Windows).
        return None
    if not inside_root or not os.path.isfile(candidate):
        return None
    return candidate


def _find_protocol(name: str) -> Optional[Protocol]:
    """Return the Protocol member whose value matches *name* case-insensitively."""
    wanted = name.upper()
    return next((p for p in Protocol if p.value.upper() == wanted), None)


@app.route('/')
def index():
    """Render the home page."""
    protocols = [p.value for p in Protocol]
    return render_template('index.html', protocols=protocols)


@app.route('/upload', methods=['POST'])
def upload_file():
    """Handle file upload and initial processing.

    On success the capture is stored in the upload folder and the client is
    redirected to the analysis page. On any validation failure a message is
    flashed and the client is sent back to the home page; redirecting to
    ``request.url`` would issue a GET against this POST-only route (405).
    """
    if 'file' not in request.files:
        flash('No file part', 'error')
        return redirect(url_for('index'))

    file = request.files['file']

    if file.filename == '':
        flash('No selected file', 'error')
        return redirect(url_for('index'))

    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        if filename:
            # Microseconds keep two same-named uploads within one second from
            # overwriting each other.
            timestamp = datetime.now().strftime('%Y%m%d%H%M%S%f')
            unique_filename = f"{timestamp}_{filename}"
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
            file.save(filepath)
            logger.info("Saved uploaded file to %s", filepath)

            # Redirect to analysis page
            return redirect(url_for('analyze', filename=unique_filename))

    flash('Invalid file type', 'error')
    return redirect(url_for('index'))


@app.route('/analyze/<filename>')
def analyze(filename: str):
    """Analyze a packet capture file.

    Renders a summary of the capture together with the protocols found in it
    that are also members of the Protocol enum. Redirects to the home page
    with a flashed error if the file is missing or analysis fails.
    """
    filepath = _resolve_upload(filename)

    if filepath is None:
        flash('File not found', 'error')
        return redirect(url_for('index'))

    # Generate basic packet summary for display
    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100, include_statistics=True)

        # Extract summary data
        summary = context.get('summary', {})
        statistics = context.get('statistics', {})

        # Get available protocols in the capture
        available_protocols = []
        for proto, count in summary.get('protocols', {}).items():
            if count > 0:
                # Check if we have an analyzer for this protocol
                try:
                    p = _find_protocol(proto)
                    if p:
                        available_protocols.append({
                            'name': p.value,
                            'count': count,
                            'has_analyzer': p in [Protocol.HTTP, Protocol.DNS,
                                                  Protocol.SMTP, Protocol.TLS]
                        })
                except (AttributeError, TypeError):
                    # Best effort: a protocol key or enum value that is not a
                    # string cannot be matched, so skip it.
                    logger.debug("Skipping unrecognised protocol entry %r", proto)

        return render_template(
            'analyze.html',
            filename=filename,
            summary=summary,
            statistics=statistics,
            available_protocols=available_protocols
        )

    except Exception as e:
        logger.error("Error analyzing file: %s", e, exc_info=True)
        flash(f"Error analyzing file: {str(e)}", 'error')
        return redirect(url_for('index'))


@app.route('/protocol/<filename>/<protocol>')
def analyze_protocol(filename: str, protocol: str):
    """Analyze a specific protocol in the packet capture.

    Extracts the protocol's conversations, optionally enriches them with
    insights, and renders them alongside a prompt formatted for Claude.
    """
    filepath = _resolve_upload(filename)

    if filepath is None:
        flash('File not found', 'error')
        return redirect(url_for('index'))

    try:
        # Get protocol enum
        proto = _find_protocol(protocol)
        if not proto:
            flash(f"Unknown protocol: {protocol}", 'error')
            return redirect(url_for('analyze', filename=filename))

        # Extract protocol data
        mcp = WiresharkMCP(filepath)
        proto_context = mcp.extract_protocol(
            protocol=proto,
            include_headers=True,
            include_body=False,
            max_conversations=10
        )

        # Extract insights if available
        try:
            proto_insights = mcp.protocol_insights(
                protocol=proto,
                extract_queries=True,
                analyze_response_codes=True,
                detect_tunneling=True
            )
        except Exception as e:
            # Insights are optional; the page still renders without them.
            logger.warning("Protocol insights unavailable for %s: %s", protocol, e)
            proto_insights = {}

        # Format for Claude
        formatter = ClaudeFormatter()
        claude_prompt = formatter.format_context(
            proto_context,
            query=f"Analyze this {protocol} traffic and identify any security concerns or unusual patterns."
        )

        return render_template(
            'protocol.html',
            filename=filename,
            protocol=protocol,
            context=proto_context,
            insights=proto_insights,
            claude_prompt=claude_prompt
        )

    except Exception as e:
        logger.error("Error analyzing protocol: %s", e, exc_info=True)
        flash(f"Error analyzing protocol: {str(e)}", 'error')
        return redirect(url_for('analyze', filename=filename))


@app.route('/api/protocols/<filename>')
def api_get_protocols(filename: str):
    """API endpoint to get available protocols in a capture file."""
    filepath = _resolve_upload(filename)

    if filepath is None:
        return jsonify({'error': 'File not found'}), 404

    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100)
        protocols = context.get('summary', {}).get('protocols', {})

        return jsonify({'protocols': protocols})

    except Exception as e:
        logger.error("API error: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/api/context/<filename>')
def api_get_context(filename: str):
    """API endpoint to get basic context for a capture file."""
    filepath = _resolve_upload(filename)

    if filepath is None:
        return jsonify({'error': 'File not found'}), 404

    try:
        mcp = WiresharkMCP(filepath)
        context = mcp.generate_context(max_packets=100, include_statistics=True)

        return jsonify(context)

    except Exception as e:
        logger.error("API error: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/api/protocol/<filename>/<protocol>')
def api_get_protocol(filename: str, protocol: str):
    """API endpoint to get protocol-specific data."""
    filepath = _resolve_upload(filename)

    if filepath is None:
        return jsonify({'error': 'File not found'}), 404

    try:
        # Get protocol enum
        proto = _find_protocol(protocol)
        if not proto:
            return jsonify({'error': f'Unknown protocol: {protocol}'}), 400

        # Extract protocol data
        mcp = WiresharkMCP(filepath)
        proto_context = mcp.extract_protocol(
            protocol=proto,
            include_headers=True,
            include_body=False,
            max_conversations=10
        )

        return jsonify(proto_context)

    except Exception as e:
        logger.error("API error: %s", e, exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/cleanup', methods=['POST'])
def cleanup():
    """Clean up temporary files to free disk space.

    Deletes regular files in the upload folder whose modification time is
    more than one day old, then redirects to the home page.
    """
    try:
        count = 0
        for filename in os.listdir(app.config['UPLOAD_FOLDER']):
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            # Remove files older than 1 day
            if os.path.isfile(filepath) and (datetime.now().timestamp() - os.path.getmtime(filepath)) > 86400:
                os.remove(filepath)
                count += 1

        flash(f"Removed {count} old files", 'success')
    except Exception as e:
        logger.error("Error cleaning up: %s", e, exc_info=True)
        flash(f"Error cleaning up: {str(e)}", 'error')

    return redirect(url_for('index'))


@app.errorhandler(404)
def page_not_found(e):
    """Handle 404 errors."""
    return render_template('404.html'), 404


@app.errorhandler(500)
def server_error(e):
    """Handle 500 errors."""
    return render_template('500.html'), 500


if __name__ == '__main__':
    # In production, use a proper WSGI server.
    # The Werkzeug debugger allows arbitrary code execution, so it must never
    # be on by default while listening on all interfaces; opt in explicitly
    # with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(debug=debug_enabled, host='0.0.0.0', port=5000)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/sarthaksiddha/Wireshark-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.