Skip to main content
Glama
ip.rs4.15 kB
/// /// This module contains tools for analyzing IP addresses. /// use crate::cortex; use rmcp::{ handler::server::tool::Parameters, model::*, schemars, ErrorData, }; use std::net::IpAddr; #[derive(Debug, serde::Deserialize, schemars::JsonSchema)] pub struct AnalyzeIpParams { #[schemars(description = "The IP address to analyze.")] pub ip: String, #[schemars( description = "Optional: The name of the AbuseIPDB analyzer instance in Cortex. Defaults to 'AbuseIPDB_1_0'." )] pub analyzer_name: Option<String>, #[schemars(description = "Optional: Maximum number of retries to wait for the analyzer job to complete. Defaults to 5.")] pub max_retries: Option<usize>, } pub async fn analyze_ip_with_abuseipdb( server: &crate::CortexToolsServer, Parameters(params): Parameters<AnalyzeIpParams>, ) -> Result<CallToolResult, ErrorData> { let ip_to_analyze = params.ip; // Validate IP address if let Err(e) = validate_ip(&ip_to_analyze) { return Ok(CallToolResult::error(vec![Content::text(format!("Invalid IP: {}", e))])); } let analyzer_name_to_run = params .analyzer_name .unwrap_or_else(|| "AbuseIPDB_1_0".to_string()); let job_create_request = cortex_client::models::JobCreateRequest { data: Some(ip_to_analyze.clone()), data_type: Some("ip".to_string()), tlp: Some(2), pap: Some(2), message: Some(Some(format!( "MCP Cortex Server: Analyzing IP {} with {}", ip_to_analyze, analyzer_name_to_run ))), parameters: None, label: Some(Some(format!("mcp_ip_analysis_{}", ip_to_analyze))), force: Some(false), attributes: None, }; cortex::run_analyzer_and_get_report( &server.cortex_config, &analyzer_name_to_run, job_create_request, &ip_to_analyze, params.max_retries.unwrap_or(5), ) .await } // --- Utility Functions --- /// Validates an IP address to ensure it's properly formatted fn validate_ip(ip: &str) -> Result<(), String> { if ip.trim().is_empty() { return Err("IP address cannot be empty".to_string()); } // Parse IP address to validate format ip.parse::<IpAddr>().map_err(|_| "Invalid IP address format".to_string())?; // Check for private/loopback addresses that shouldn't be analyzed let parsed_ip = ip.parse::<IpAddr>().unwrap(); match parsed_ip { IpAddr::V4(ipv4) => { if ipv4.is_loopback() || ipv4.is_private() || ipv4.is_link_local() { return Err("Private, loopback, or link-local IP addresses cannot be analyzed".to_string()); } } IpAddr::V6(ipv6) => { if ipv6.is_loopback() || ipv6.is_multicast() { return Err("Loopback or multicast IPv6 addresses cannot be analyzed".to_string()); } } } Ok(()) } #[cfg(test)] mod tests { use super::*; #[test] fn test_validate_ip_valid() { assert!(validate_ip("8.8.8.8").is_ok()); assert!(validate_ip("1.1.1.1").is_ok()); assert!(validate_ip("2001:4860:4860::8888").is_ok()); } #[test] fn test_validate_ip_empty() { assert!(validate_ip("").is_err()); assert!(validate_ip(" ").is_err()); } #[test] fn test_validate_ip_invalid_format() { assert!(validate_ip("256.256.256.256").is_err()); assert!(validate_ip("192.168.1").is_err()); assert!(validate_ip("not.an.ip.address").is_err()); assert!(validate_ip("192.168.1.1.1").is_err()); } #[test] fn test_validate_ip_private_addresses() { assert!(validate_ip("192.168.1.1").is_err()); assert!(validate_ip("10.0.0.1").is_err()); assert!(validate_ip("172.16.0.1").is_err()); assert!(validate_ip("127.0.0.1").is_err()); assert!(validate_ip("169.254.1.1").is_err()); } #[test] fn test_validate_ip_ipv6_special() { assert!(validate_ip("::1").is_err()); // loopback assert!(validate_ip("ff02::1").is_err()); // multicast } }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/gbrigandi/mcp-server-cortex'

If you have feedback or need assistance with the MCP directory API, please join our Discord server