Skip to main content
Glama
marc-shade

Threat Intelligence MCP Server

by marc-shade
dashboard.py (27.9 kB)
#!/usr/bin/env python3
"""
Threat Intelligence Dashboard

Real-time web dashboard for threat monitoring.

Serves a single-page HTML dashboard plus a small JSON REST API that the page
polls. All data shown comes from the in-process ``threat_cache``, the feed
configuration, and JSON files written by other components (cache files under
``CACHE_DIR`` and the network-scanner data files).
"""

import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from flask import Flask, render_template_string, jsonify, request
from flask_cors import CORS

from .config import (
    THREAT_FEEDS,
    threat_cache,
    validate_ip,
    validate_hash,
    get_enabled_feeds,
    get_timestamp,
    DATA_DIR,
    CACHE_DIR,
)

logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)

# Dashboard HTML template.
# NOTE: this is rendered through Jinja (render_template_string), so the markup
# must not contain Jinja delimiters ("{{", "{%", "{#"). It is also a regular
# (non-raw) Python string, so JavaScript backslash escapes must be doubled.
DASHBOARD_HTML = """
<!DOCTYPE html>
<html lang="en" data-theme="dark">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Threat Intelligence Dashboard - Pixel's Eye</title>
    <style>
        :root {
            --bg-primary: #0a0e17;
            --bg-secondary: #111827;
            --bg-card: #1f2937;
            --text-primary: #f3f4f6;
            --text-secondary: #9ca3af;
            --accent-green: #10b981;
            --accent-red: #ef4444;
            --accent-yellow: #f59e0b;
            --accent-blue: #3b82f6;
            --accent-purple: #8b5cf6;
            --border: #374151;
        }
        * {
            box-sizing: border-box;
            margin: 0;
            padding: 0;
        }
        body {
            font-family: 'SF Mono', 'Monaco', 'Consolas', monospace;
            background: var(--bg-primary);
            color: var(--text-primary);
            min-height: 100vh;
        }
        .header {
            background: linear-gradient(135deg, var(--bg-secondary), var(--bg-card));
            padding: 1.5rem 2rem;
            border-bottom: 1px solid var(--border);
            display: flex;
            justify-content: space-between;
            align-items: center;
        }
        .header h1 {
            font-size: 1.5rem;
            display: flex;
            align-items: center;
            gap: 0.75rem;
        }
        .header h1 .pixel {
            font-size: 2rem;
        }
        .status-badge {
            background: var(--accent-green);
            color: black;
            padding: 0.25rem 0.75rem;
            border-radius: 9999px;
            font-size: 0.75rem;
            font-weight: bold;
            animation: pulse 2s infinite;
        }
        @keyframes pulse {
            0%, 100% { opacity: 1; }
            50% { opacity: 0.7; }
        }
        .container {
            max-width: 1600px;
            margin: 0 auto;
            padding: 1.5rem;
        }
        .grid {
            display: grid;
            grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
            gap: 1.5rem;
        }
        .card {
            background: var(--bg-card);
            border-radius: 0.75rem;
            border: 1px solid var(--border);
            overflow: hidden;
        }
        .card-header {
            padding: 1rem 1.25rem;
            border-bottom: 1px solid var(--border);
            display: flex;
            justify-content: space-between;
            align-items: center;
        }
        .card-header h2 {
            font-size: 0.875rem;
            text-transform: uppercase;
            letter-spacing: 0.05em;
            color: var(--text-secondary);
        }
        .card-body {
            padding: 1.25rem;
        }
        .stat-grid {
            display: grid;
            grid-template-columns: repeat(2, 1fr);
            gap: 1rem;
        }
        .stat {
            text-align: center;
            padding: 1rem;
            background: var(--bg-secondary);
            border-radius: 0.5rem;
        }
        .stat-value {
            font-size: 2rem;
            font-weight: bold;
            color: var(--accent-blue);
        }
        .stat-label {
            font-size: 0.75rem;
            color: var(--text-secondary);
            margin-top: 0.25rem;
        }
        .stat.danger .stat-value { color: var(--accent-red); }
        .stat.warning .stat-value { color: var(--accent-yellow); }
        .stat.success .stat-value { color: var(--accent-green); }
        .feed-list {
            list-style: none;
        }
        .feed-item {
            display: flex;
            justify-content: space-between;
            align-items: center;
            padding: 0.75rem;
            border-bottom: 1px solid var(--border);
        }
        .feed-item:last-child {
            border-bottom: none;
        }
        .feed-name {
            font-weight: 500;
        }
        .feed-count {
            background: var(--bg-secondary);
            padding: 0.25rem 0.75rem;
            border-radius: 9999px;
            font-size: 0.875rem;
            color: var(--accent-blue);
        }
        .alert-list {
            list-style: none;
        }
        .alert-item {
            padding: 0.75rem;
            margin-bottom: 0.5rem;
            border-radius: 0.5rem;
            border-left: 3px solid;
        }
        .alert-item.critical {
            background: rgba(239, 68, 68, 0.1);
            border-color: var(--accent-red);
        }
        .alert-item.high {
            background: rgba(245, 158, 11, 0.1);
            border-color: var(--accent-yellow);
        }
        .alert-item.medium {
            background: rgba(59, 130, 246, 0.1);
            border-color: var(--accent-blue);
        }
        .kev-list {
            max-height: 400px;
            overflow-y: auto;
        }
        .kev-item {
            padding: 0.75rem;
            border-bottom: 1px solid var(--border);
        }
        .kev-cve {
            font-weight: bold;
            color: var(--accent-red);
        }
        .kev-vendor {
            color: var(--accent-purple);
            font-size: 0.875rem;
        }
        .kev-name {
            font-size: 0.875rem;
            color: var(--text-secondary);
            margin-top: 0.25rem;
        }
        .refresh-btn {
            background: var(--accent-blue);
            color: white;
            border: none;
            padding: 0.5rem 1rem;
            border-radius: 0.5rem;
            cursor: pointer;
            font-family: inherit;
        }
        .refresh-btn:hover {
            background: #2563eb;
        }
        .last-update {
            font-size: 0.75rem;
            color: var(--text-secondary);
        }
        .wide-card {
            grid-column: span 2;
        }
        @media (max-width: 768px) {
            .wide-card { grid-column: span 1; }
        }
        .loading {
            text-align: center;
            padding: 2rem;
            color: var(--text-secondary);
        }
        .loading::after {
            content: '...';
            animation: dots 1.5s infinite;
        }
        @keyframes dots {
            0%, 20% { content: '.'; }
            40% { content: '..'; }
            60%, 100% { content: '...'; }
        }
        .search-box {
            display: flex;
            gap: 0.5rem;
            margin-bottom: 1rem;
        }
        .search-box input {
            flex: 1;
            padding: 0.5rem;
            background: var(--bg-secondary);
            border: 1px solid var(--border);
            border-radius: 0.5rem;
            color: var(--text-primary);
            font-family: inherit;
        }
        .search-box button {
            padding: 0.5rem 1rem;
            background: var(--accent-purple);
            border: none;
            border-radius: 0.5rem;
            color: white;
            cursor: pointer;
        }
        .result-box {
            padding: 1rem;
            background: var(--bg-secondary);
            border-radius: 0.5rem;
            font-size: 0.875rem;
            white-space: pre-wrap;
            max-height: 200px;
            overflow-y: auto;
        }
        .threat-clean { color: var(--accent-green); }
        .threat-low { color: var(--accent-blue); }
        .threat-medium { color: var(--accent-yellow); }
        .threat-high { color: var(--accent-red); }
        .threat-critical { color: #ff0000; font-weight: bold; }
    </style>
</head>
<body>
    <header class="header">
        <h1>
            <span class="pixel">🐕</span>
            Threat Intelligence Dashboard
        </h1>
        <div style="display: flex; align-items: center; gap: 1rem;">
            <span class="last-update" id="lastUpdate">Loading...</span>
            <span class="status-badge">LIVE</span>
            <button class="refresh-btn" onclick="refreshAll()">Refresh</button>
        </div>
    </header>

    <div class="container">
        <div class="grid">
            <!-- IP Check Card -->
            <div class="card">
                <div class="card-header">
                    <h2>IP Reputation Check</h2>
                </div>
                <div class="card-body">
                    <div class="search-box">
                        <input type="text" id="ipInput" placeholder="Enter IP address...">
                        <button onclick="checkIP()">Check</button>
                    </div>
                    <div class="result-box" id="ipResult">Enter an IP to check reputation</div>
                </div>
            </div>

            <!-- Hash Check Card -->
            <div class="card">
                <div class="card-header">
                    <h2>Hash Reputation Check</h2>
                </div>
                <div class="card-body">
                    <div class="search-box">
                        <input type="text" id="hashInput" placeholder="Enter MD5/SHA1/SHA256...">
                        <button onclick="checkHash()">Check</button>
                    </div>
                    <div class="result-box" id="hashResult">Enter a hash to check reputation</div>
                </div>
            </div>

            <!-- Summary Stats -->
            <div class="card">
                <div class="card-header">
                    <h2>Threat Summary</h2>
                </div>
                <div class="card-body">
                    <div class="stat-grid">
                        <div class="stat danger">
                            <div class="stat-value" id="maliciousIps">-</div>
                            <div class="stat-label">Malicious IPs</div>
                        </div>
                        <div class="stat warning">
                            <div class="stat-value" id="maliciousUrls">-</div>
                            <div class="stat-label">Malware URLs</div>
                        </div>
                        <div class="stat">
                            <div class="stat-value" id="recentCves">-</div>
                            <div class="stat-label">Recent KEVs (7d)</div>
                        </div>
                        <div class="stat success">
                            <div class="stat-value" id="feedsActive">-</div>
                            <div class="stat-label">Active Feeds</div>
                        </div>
                    </div>
                </div>
            </div>

            <!-- Feed Status -->
            <div class="card">
                <div class="card-header">
                    <h2>Threat Feeds</h2>
                </div>
                <div class="card-body">
                    <ul class="feed-list" id="feedList">
                        <li class="loading">Loading feeds</li>
                    </ul>
                </div>
            </div>

            <!-- Cache Stats -->
            <div class="card">
                <div class="card-header">
                    <h2>Cache Statistics</h2>
                </div>
                <div class="card-body">
                    <div class="stat-grid">
                        <div class="stat">
                            <div class="stat-value" id="cacheSize">-</div>
                            <div class="stat-label">Cached Items</div>
                        </div>
                        <div class="stat">
                            <div class="stat-value" id="cacheMaxSize">-</div>
                            <div class="stat-label">Max Size</div>
                        </div>
                    </div>
                    <div style="margin-top: 1rem; text-align: center;">
                        <button class="refresh-btn" style="background: var(--accent-red);" onclick="clearCache()">Clear Cache</button>
                    </div>
                </div>
            </div>

            <!-- Network Status -->
            <div class="card">
                <div class="card-header">
                    <h2>Network Devices</h2>
                </div>
                <div class="card-body">
                    <div class="stat-grid">
                        <div class="stat success">
                            <div class="stat-value" id="devicesOnline">-</div>
                            <div class="stat-label">Online</div>
                        </div>
                        <div class="stat">
                            <div class="stat-value" id="clusterNodes">-</div>
                            <div class="stat-label">Cluster Nodes</div>
                        </div>
                        <div class="stat warning">
                            <div class="stat-value" id="unknownDevices">-</div>
                            <div class="stat-label">Unknown</div>
                        </div>
                        <div class="stat danger">
                            <div class="stat-value" id="threatMatches">0</div>
                            <div class="stat-label">Threats Detected</div>
                        </div>
                    </div>
                </div>
            </div>

            <!-- CISA KEV -->
            <div class="card wide-card">
                <div class="card-header">
                    <h2>CISA Known Exploited Vulnerabilities (Recent)</h2>
                </div>
                <div class="card-body">
                    <div class="kev-list" id="kevList">
                        <div class="loading">Loading KEV data</div>
                    </div>
                </div>
            </div>

            <!-- Alerts -->
            <div class="card">
                <div class="card-header">
                    <h2>Active Alerts</h2>
                </div>
                <div class="card-body">
                    <ul class="alert-list" id="alertList">
                        <li class="alert-item medium">Dashboard initialized</li>
                    </ul>
                </div>
            </div>
        </div>
    </div>

    <script>
        const API_BASE = window.location.origin;

        // Escape a value before interpolating it into innerHTML markup.
        // Feed and API data is untrusted and must never be parsed as HTML.
        function escapeHtml(value) {
            return String(value ?? '')
                .replace(/&/g, '&amp;')
                .replace(/</g, '&lt;')
                .replace(/>/g, '&gt;')
                .replace(/"/g, '&quot;')
                .replace(/'/g, '&#39;');
        }

        async function fetchData(endpoint) {
            try {
                const response = await fetch(`${API_BASE}${endpoint}`);
                return await response.json();
            } catch (error) {
                console.error(`Error fetching ${endpoint}:`, error);
                return null;
            }
        }

        async function checkIP() {
            const ip = document.getElementById('ipInput').value.trim();
            const resultBox = document.getElementById('ipResult');

            if (!ip) {
                resultBox.textContent = 'Please enter an IP address';
                return;
            }

            resultBox.textContent = 'Checking...';
            const data = await fetchData(`/api/check-ip/${encodeURIComponent(ip)}`);

            if (data) {
                if (data.success === false) {
                    resultBox.textContent = `Error: ${data.error}`;
                } else {
                    const level = String(data.threat_level || 'unknown');
                    const lines = [
                        `<span class="threat-${escapeHtml(level)}">Threat Level: ${escapeHtml(level.toUpperCase())}</span>`,
                        `IP: ${escapeHtml(data.ip)}`,
                        `Sources Checked: ${escapeHtml(data.sources_checked || 0)}`,
                        `Threats Found: ${escapeHtml(data.threats_found || 0)}`
                    ];
                    if (data.details) {
                        lines.push(`Details: ${escapeHtml(JSON.stringify(data.details, null, 2))}`);
                    }
                    // Double backslash: this script lives inside a Python string literal.
                    resultBox.innerHTML = lines.join('\\n');
                }
            } else {
                resultBox.textContent = 'Error checking IP';
            }
        }

        async function checkHash() {
            const hash = document.getElementById('hashInput').value.trim();
            const resultBox = document.getElementById('hashResult');

            if (!hash) {
                resultBox.textContent = 'Please enter a hash';
                return;
            }

            resultBox.textContent = 'Checking...';
            const data = await fetchData(`/api/check-hash/${encodeURIComponent(hash)}`);

            if (data) {
                if (data.success === false) {
                    resultBox.textContent = `Error: ${data.error}`;
                } else {
                    const lines = [
                        `Hash Type: ${escapeHtml(data.hash_type || 'unknown')}`,
                        `Hash: ${escapeHtml(data.hash)}`,
                        `Threats Found: ${escapeHtml(data.threats_found || 0)}`
                    ];
                    if (data.malware_names?.length) {
                        lines.push(`Malware: ${escapeHtml(data.malware_names.join(', '))}`);
                    }
                    // Double backslash: this script lives inside a Python string literal.
                    resultBox.innerHTML = lines.join('\\n');
                }
            } else {
                resultBox.textContent = 'Error checking hash';
            }
        }

        async function clearCache() {
            const data = await fetchData('/api/cache/clear');
            if (data && data.success) {
                alert('Cache cleared successfully');
                updateCacheStats();
            }
        }

        async function updateSummary() {
            const data = await fetchData('/api/summary');
            if (!data) return;

            document.getElementById('maliciousIps').textContent = data.totals?.malicious_ips?.toLocaleString() || '0';
            document.getElementById('maliciousUrls').textContent = data.totals?.malicious_urls?.toLocaleString() || '0';
            document.getElementById('recentCves').textContent = data.totals?.recent_cves || '0';
            document.getElementById('feedsActive').textContent = data.feeds_count || Object.keys(data.feeds || {}).length;

            // Update feed list
            const feedList = document.getElementById('feedList');
            if (data.feeds) {
                feedList.innerHTML = Object.entries(data.feeds).map(([name, info]) => `
                    <li class="feed-item">
                        <span class="feed-name">${escapeHtml(name.replace(/_/g, ' '))}</span>
                        <span class="feed-count">${info.enabled ? 'Active' : 'Disabled'}</span>
                    </li>
                `).join('');
            }

            // Update alerts
            if (data.alerts && data.alerts.length > 0) {
                const alertList = document.getElementById('alertList');
                alertList.innerHTML = data.alerts.map(alert => `
                    <li class="alert-item ${escapeHtml(alert.severity)}">
                        ${escapeHtml(alert.message)}
                    </li>
                `).join('');
            }
        }

        async function updateKEV() {
            const data = await fetchData('/api/kev');
            if (!data || !data.vulnerabilities) return;

            const kevList = document.getElementById('kevList');
            kevList.innerHTML = data.vulnerabilities.slice(0, 20).map(kev => `
                <div class="kev-item">
                    <span class="kev-cve">${escapeHtml(kev.cve_id)}</span>
                    <span class="kev-vendor">${escapeHtml(kev.vendor)} - ${escapeHtml(kev.product)}</span>
                    <div class="kev-name">${escapeHtml(kev.name)}</div>
                </div>
            `).join('') || '<div class="loading">No recent KEVs</div>';
        }

        async function updateNetworkStatus() {
            const data = await fetchData('/api/network');
            if (!data) return;

            document.getElementById('devicesOnline').textContent = data.total_devices || '-';
            document.getElementById('clusterNodes').textContent = data.cluster_nodes_online || '-';
            document.getElementById('unknownDevices').textContent = data.unknown_devices || '-';
            document.getElementById('threatMatches').textContent = data.threat_matches || '0';
        }

        async function updateCacheStats() {
            const data = await fetchData('/api/cache/stats');
            if (!data) return;

            document.getElementById('cacheSize').textContent = data.size || 0;
            document.getElementById('cacheMaxSize').textContent = data.max_size || 0;
        }

        function updateTimestamp() {
            document.getElementById('lastUpdate').textContent = `Last updated: ${new Date().toLocaleTimeString()}`;
        }

        async function refreshAll() {
            await Promise.all([
                updateSummary(),
                updateKEV(),
                updateNetworkStatus(),
                updateCacheStats()
            ]);
            updateTimestamp();
        }

        // Initial load
        refreshAll();

        // Auto-refresh every 60 seconds
        setInterval(refreshAll, 60000);
    </script>
</body>
</html>
"""

# How long a summary cache file is considered fresh.
_SUMMARY_CACHE_MAX_AGE = timedelta(minutes=30)

# How far back /api/kev looks when filtering vulnerabilities by date_added.
_KEV_MAX_AGE = timedelta(days=30)


def _read_json(path: Path) -> Any:
    """Return the parsed JSON content of *path*, or ``None`` if unavailable.

    A missing file is an expected condition and is not logged. Any other read
    or parse failure is logged as a warning so that corrupt cache files do not
    go unnoticed, but never raises: the dashboard treats these files as
    best-effort data sources.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return None
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read JSON file %s", path, exc_info=True)
        return None


def _is_within(date_text: Any, max_age: timedelta) -> bool:
    """Return True if ISO-8601 *date_text* is no older than *max_age*.

    Missing, non-string, or unparseable values count as "not recent".
    """
    if not date_text:
        return False
    try:
        moment = datetime.fromisoformat(date_text)
    except (TypeError, ValueError):
        return False
    # Build "now" with the same awareness as the parsed value; comparing a
    # naive datetime with an aware one raises TypeError.
    return moment >= datetime.now(tz=moment.tzinfo) - max_age


@app.route('/')
def index():
    """Serve the main dashboard page."""
    return render_template_string(DASHBOARD_HTML)


@app.route('/api/summary')
def api_summary() -> Any:
    """Get threat summary data.

    Returns a JSON object with feed configuration, cache statistics and
    threat totals. Totals default to zero and are replaced by the values in
    ``summary_cache.json`` when that file exists and is under 30 minutes old.
    """
    enabled_feeds = get_enabled_feeds()

    summary = {
        "timestamp": get_timestamp(),
        "totals": {
            "malicious_ips": 0,
            "malicious_urls": 0,
            "recent_cves": 0
        },
        "feeds": {
            name: {"enabled": feed.enabled, "type": feed.feed_type.value}
            for name, feed in THREAT_FEEDS.items()
        },
        "feeds_count": len(enabled_feeds),
        "alerts": [],
        "cache_stats": threat_cache.stats()
    }

    # Check cache for feed data
    cached = _read_json(CACHE_DIR / "summary_cache.json")
    if isinstance(cached, dict) and _is_within(cached.get("timestamp"), _SUMMARY_CACHE_MAX_AGE):
        # Merge cached totals
        summary["totals"] = cached.get("totals", summary["totals"])

    return jsonify(summary)


@app.route('/api/kev')
def api_kev() -> Any:
    """Get CISA KEV data.

    Returns ``{"vulnerabilities": [...]}`` containing the entries from
    ``kev_cache.json`` whose ``date_added`` falls within the last 30 days.
    The list is empty when the cache file is missing or malformed.
    """
    data = _read_json(CACHE_DIR / "kev_cache.json")
    vulnerabilities = data.get("vulnerabilities", []) if isinstance(data, dict) else []
    if not isinstance(vulnerabilities, list):
        vulnerabilities = []

    # Filter to recent CVEs (last 30 days); skip entries of unexpected shape.
    recent_vulns = [
        vuln for vuln in vulnerabilities
        if isinstance(vuln, dict) and _is_within(vuln.get("date_added"), _KEV_MAX_AGE)
    ]
    return jsonify({"vulnerabilities": recent_vulns})


@app.route('/api/network')
def api_network() -> Any:
    """Get network status.

    Counts are derived from the network scanner's ``device_history.json`` and
    ``known_devices.json`` under ``$AGENTIC_SYSTEM_PATH``. Any count whose
    source file is missing or malformed stays at zero.
    """
    network_data = {
        "total_devices": 0,
        "cluster_nodes_online": 0,
        "unknown_devices": 0,
        "threat_matches": 0
    }

    # Read from network scanner data. The fallback must be a real path:
    # Python does not perform shell-style "${VAR:-default}" expansion.
    agentic_path = os.environ.get("AGENTIC_SYSTEM_PATH") or "/opt/agentic"
    scanner_data_dir = Path(agentic_path) / "mcp-servers/network-scanner-mcp/data"
    history_file = scanner_data_dir / "device_history.json"
    known_file = scanner_data_dir / "known_devices.json"

    history = _read_json(history_file)
    if isinstance(history, dict):
        seen_devices = history.get("devices", {})
        if isinstance(seen_devices, (dict, list)):
            network_data["total_devices"] = len(seen_devices)

    known = _read_json(known_file)
    if isinstance(known, dict):
        known_devices = known.get("devices", {})
        if isinstance(known_devices, dict):
            # Count cluster nodes (infrastructure type)
            network_data["cluster_nodes_online"] = sum(
                1 for device in known_devices.values()
                if isinstance(device, dict) and device.get("type") == "infrastructure"
            )
            # Count unknown (total - known)
            network_data["unknown_devices"] = max(
                0, network_data["total_devices"] - len(known_devices)
            )

    return jsonify(network_data)


@app.route('/api/check-ip/<ip>')
def api_check_ip(ip: str) -> Any:
    """Check IP reputation via REST API.

    Returns the cached result for *ip* when one exists; otherwise a
    placeholder result with ``threat_level`` "unknown". Invalid input yields
    ``{"success": false, "error": ...}``.
    """
    is_valid, error = validate_ip(ip)
    if not is_valid:
        return jsonify({
            "success": False,
            "error": error,
            "ip": ip
        })

    # Check cache first
    cached = threat_cache.get(f"ip:{ip}")
    if cached:
        return jsonify(cached)

    # Return basic info - full check requires async MCP tools
    result = {
        "success": True,
        "ip": ip,
        "threat_level": "unknown",
        "sources_checked": 0,
        "threats_found": 0,
        "message": "For full reputation check, use the MCP tools directly",
        "timestamp": get_timestamp()
    }
    return jsonify(result)


@app.route('/api/check-hash/<file_hash>')
def api_check_hash(file_hash: str) -> Any:
    """Check hash reputation via REST API.

    Returns the cached result for the lower-cased hash when one exists;
    otherwise a placeholder result with zero threats. Invalid input yields
    ``{"success": false, "error": ...}``.
    """
    is_valid, hash_type, error = validate_hash(file_hash)
    if not is_valid:
        return jsonify({
            "success": False,
            "error": error,
            "hash": file_hash
        })

    normalized_hash = file_hash.lower()

    # Check cache first
    cached = threat_cache.get(f"hash:{normalized_hash}")
    if cached:
        return jsonify(cached)

    # Return basic info
    result = {
        "success": True,
        "hash": normalized_hash,
        "hash_type": hash_type,
        "threats_found": 0,
        "malware_names": [],
        "message": "For full reputation check, use the MCP tools directly",
        "timestamp": get_timestamp()
    }
    return jsonify(result)


@app.route('/api/cache/stats')
def api_cache_stats() -> Any:
    """Get cache statistics."""
    return jsonify(threat_cache.stats())


@app.route('/api/cache/clear', methods=['GET', 'POST'])
def api_cache_clear() -> Any:
    """Clear the threat cache.

    NOTE(review): GET is accepted for a state-changing action and CORS is
    open, so any web page can trigger this. GET is kept because the dashboard
    page and possibly other callers rely on it; consider restricting to POST.
    """
    threat_cache.clear()
    return jsonify({
        "success": True,
        "message": "Cache cleared",
        "timestamp": get_timestamp()
    })


@app.route('/api/feeds')
def api_feeds() -> Any:
    """List all configured threat feeds."""
    feeds = {
        name: {
            "name": feed.name,
            "description": feed.description,
            "url": feed.url,
            "feed_type": feed.feed_type.value,
            "enabled": feed.enabled,
            "requires_api_key": feed.requires_api_key
        }
        for name, feed in THREAT_FEEDS.items()
    }

    return jsonify({
        "success": True,
        "feeds": feeds,
        "total_feeds": len(feeds),
        "enabled_feeds": len(get_enabled_feeds())
    })


@app.route('/api/health')
def api_health() -> Any:
    """Health check endpoint."""
    return jsonify({
        "status": "healthy",
        "timestamp": get_timestamp(),
        "cache_size": threat_cache.stats()["size"],
        "feeds_configured": len(THREAT_FEEDS)
    })


def run_dashboard(host: str = '0.0.0.0', port: int = 8889) -> None:
    """Run the Flask dashboard.

    Creates ``DATA_DIR`` and ``CACHE_DIR`` if needed, then starts Flask's
    built-in threaded server and blocks until it exits.

    NOTE(review): the default host binds to every network interface and the
    API has no authentication; pass ``host='127.0.0.1'`` for local-only use.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    CACHE_DIR.mkdir(parents=True, exist_ok=True)

    print(f"Starting Threat Intelligence Dashboard on http://{host}:{port}")
    app.run(host=host, port=port, debug=False, threaded=True)


def main():
    """Entry point for the dashboard."""
    run_dashboard()


if __name__ == "__main__":
    main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/marc-shade/threat-intel-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.