Glama
amittell

firewalla-mcp-server

get_flow_data

Query network traffic flows from Firewalla firewall to monitor security, analyze network activity, and track bandwidth usage with filtering, grouping, and sorting options.

Instructions

Query network traffic flows from Firewalla firewall

Input Schema

| Name | Required | Description | Default |
| --- | --- | --- | --- |
| query | No | Search query for flows. Supports region:US for geographic filtering, protocol:tcp, blocked:true, domain:*, category:social, etc. | |
| groupBy | No | Group flows by specified values (e.g., "domain,box") | |
| sortBy | No | Sort order for flows | "ts:desc" |
| limit | No | Maximum number of results (API maximum: 500) | 200 |
| cursor | No | Pagination cursor from previous response | |
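
As a rough illustration of the schema above, the sketch below models the five optional parameters as a TypeScript type and applies the documented limit rules (default 200, minimum 1, maximum 500). The names `GetFlowDataArgs` and `normalizeFlowArgs` are hypothetical and not part of the server. The integer check is an added assumption, since the schema only declares `limit` as a number.

```typescript
// Hypothetical type mirroring the input schema; every field is optional.
interface GetFlowDataArgs {
  query?: string;   // e.g. "region:US", "protocol:tcp", "blocked:true"
  groupBy?: string; // e.g. "domain,box"
  sortBy?: string;  // documented default: "ts:desc"
  limit?: number;   // documented default: 200, maximum: 500
  cursor?: string;  // taken from a previous response
}

const DEFAULT_LIMIT = 200;
const MIN_LIMIT = 1;
const MAX_LIMIT = 500;

// Hypothetical helper: fills in the documented default limit and rejects
// values outside the documented 1..500 range before a request is sent.
function normalizeFlowArgs(
  args: GetFlowDataArgs = {}
): GetFlowDataArgs & { limit: number } {
  const limit = args.limit ?? DEFAULT_LIMIT;
  // Assumption: only whole numbers are treated as valid limits.
  if (!Number.isInteger(limit) || limit < MIN_LIMIT || limit > MAX_LIMIT) {
    throw new RangeError(
      `limit must be an integer between ${MIN_LIMIT} and ${MAX_LIMIT}, got ${limit}`
    );
  }
  return { ...args, limit };
}

// Arguments for a first page of blocked TCP flows, newest first.
const firstPage = normalizeFlowArgs({
  query: 'protocol:tcp AND blocked:true',
  sortBy: 'ts:desc',
});
```

To request the next page, the same arguments would be sent again with `cursor` set to the cursor value returned by the previous response.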

Implementation Reference

  • The GetFlowDataHandler class provides the core implementation of the 'get_flow_data' tool. It handles parameter sanitization and validation, supports streaming and pagination, calls the Firewalla API (firewalla.getFlowData), processes and enriches the flow data (timestamps, IPs, geo, bytes, etc.), and returns standardized paginated responses.
    export class GetFlowDataHandler extends BaseToolHandler {
      name = 'get_flow_data';
      description =
        'Query network traffic flows with pagination. Data is cached for 15 seconds for performance. Use force_refresh=true to bypass cache for real-time data.';
      category = 'network' as const;

      constructor() {
        super({
          enableGeoEnrichment: true,
          enableFieldNormalization: true,
          additionalMeta: {
            data_source: 'flows',
            entity_type: 'network_flows',
            supports_geographic_enrichment: true,
            supports_field_normalization: true,
            supports_streaming: true,
            supports_pagination: true,
            standardization_version: '2.0.0',
          },
        });
      }

      async execute(
        rawArgs: unknown,
        firewalla: FirewallaClient
      ): Promise<ToolResponse> {
        // Early parameter sanitization to prevent null/undefined errors
        const sanitizationResult = this.sanitizeParameters(rawArgs);
        if ('errorResponse' in sanitizationResult) {
          return sanitizationResult.errorResponse;
        }
        const args = sanitizationResult.sanitizedArgs;
        const startTime = Date.now();

        try {
          // Parameter validation
          const limitValidation = ParameterValidator.validateNumber(
            args?.limit,
            'limit',
            {
              required: false,
              defaultValue: 200,
              ...getLimitValidationConfig(this.name),
            }
          );

          if (!limitValidation.isValid) {
            return this.createErrorResponse(
              'Parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              limitValidation.errors
            );
          }

          const query = args?.query;
          const groupBy = args?.groupBy;
          const sortBy = args?.sortBy;
          const limit = limitValidation.sanitizedValue! as number;
          const cursor = args?.cursor;

          // Check if streaming is requested or should be automatically enabled
          const enableStreaming =
            Boolean(args?.stream) || shouldUseStreaming(this.name, limit);
          const streamingSessionId = args?.streaming_session_id as
            | string
            | undefined;

          // Validate individual date parameters before building query
          const startTimeArg = args?.start_time as string | undefined;
          const endTime = args?.end_time as string | undefined;
          let finalQuery = query;

          // Validate start_time if provided
          if (startTimeArg !== undefined) {
            const startTimeValidation = ParameterValidator.validateDateFormat(
              startTimeArg,
              'start_time',
              false
            );
            if (!startTimeValidation.isValid) {
              return this.createErrorResponse(
                'Invalid start_time format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: startTimeArg,
                  documentation:
                    'See /docs/query-syntax-guide.md for time range examples',
                },
                startTimeValidation.errors
              );
            }
          }

          // Validate end_time if provided
          if (endTime !== undefined) {
            const endTimeValidation = ParameterValidator.validateDateFormat(
              endTime,
              'end_time',
              false
            );
            if (!endTimeValidation.isValid) {
              return this.createErrorResponse(
                'Invalid end_time format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: endTime,
                  documentation:
                    'See /docs/query-syntax-guide.md for time range examples',
                },
                endTimeValidation.errors
              );
            }
          }

          // Validate cursor format if provided
          if (cursor !== undefined) {
            const cursorValidation = ParameterValidator.validateCursor(
              cursor,
              'cursor'
            );
            if (!cursorValidation.isValid) {
              return this.createErrorResponse(
                'Invalid cursor format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: cursor,
                  documentation:
                    'Cursors should be obtained from previous response next_cursor field',
                },
                cursorValidation.errors
              );
            }
          }

          // Build time range query if both dates are provided and valid
          if (startTimeArg && endTime) {
            const startDate = new Date(startTimeArg);
            const endDate = new Date(endTime);

            // Validate time range order (dates are already validated for format above)
            if (startDate >= endDate) {
              return this.createErrorResponse(
                'Invalid time range order',
                ErrorType.VALIDATION_ERROR,
                {
                  details: 'Start time must be before end time',
                  received: {
                    start_time: startTimeArg,
                    end_time: endTime,
                    parsed_start: startDate.toISOString(),
                    parsed_end: endDate.toISOString(),
                  },
                  time_difference: `Start is ${Math.abs(startDate.getTime() - endDate.getTime()) / 1000} seconds after end`,
                },
                [
                  'Ensure start_time is chronologically before end_time',
                  'Check timezone handling - times may be in different zones',
                  'Verify date format includes correct year/month/day values',
                  'For recent data, try: start_time: "2024-01-01T00:00:00Z", end_time: "2024-01-02T00:00:00Z"',
                ]
              );
            }

            const startTs = Math.floor(startDate.getTime() / 1000);
            const endTs = Math.floor(endDate.getTime() / 1000);
            const timeQuery = `ts:${startTs}-${endTs}`;
            finalQuery = query ? `(${query}) AND ${timeQuery}` : timeQuery;
          }

          // Handle streaming mode if enabled
          if (enableStreaming) {
            const streamingManager = StreamingManager.forTool(this.name);

            // Define the streaming operation
            const streamingOperation: StreamingOperation = async params => {
              const response = await withToolTimeout(
                async () =>
                  firewalla.getFlowData(
                    finalQuery,
                    groupBy,
                    sortBy,
                    params.limit || 100,
                    params.cursor
                  ),
                this.name
              );

              // Process flows for this chunk
              const processedFlows = SafeAccess.safeArrayMap(
                response.results,
                (flow: any) => ({
                  timestamp: unixToISOStringOrNow(flow.ts),
                  source_ip: SafeAccess.getNestedValue(
                    flow,
                    'source.ip',
                    SafeAccess.getNestedValue(flow, 'device.ip', 'unknown')
                  ),
                  destination_ip: SafeAccess.getNestedValue(
                    flow,
                    'destination.ip',
                    'unknown'
                  ),
                  protocol: SafeAccess.getNestedValue(flow, 'protocol', 'unknown'),
                  bytes:
                    (SafeAccess.getNestedValue(flow, 'download', 0) as number) +
                    (SafeAccess.getNestedValue(flow, 'upload', 0) as number),
                  download: SafeAccess.getNestedValue(flow, 'download', 0),
                  upload: SafeAccess.getNestedValue(flow, 'upload', 0),
                  packets: SafeAccess.getNestedValue(flow, 'count', 0),
                  duration: SafeAccess.getNestedValue(flow, 'duration', 0),
                  direction: SafeAccess.getNestedValue(
                    flow,
                    'direction',
                    'unknown'
                  ),
                  blocked: SafeAccess.getNestedValue(flow, 'block', false),
                  block_type: SafeAccess.getNestedValue(flow, 'blockType', null),
                  device: SafeAccess.getNestedValue(flow, 'device', {}),
                  source: SafeAccess.getNestedValue(flow, 'source', {}),
                  destination: SafeAccess.getNestedValue(flow, 'destination', {}),
                  region: SafeAccess.getNestedValue(flow, 'region', null),
                  category: SafeAccess.getNestedValue(flow, 'category', null),
                })
              );

              return {
                data: processedFlows,
                hasMore: !!response.next_cursor,
                nextCursor: response.next_cursor,
                total: (response as any).total_count,
              };
            };

            if (streamingSessionId) {
              // Continue existing streaming session
              const chunk = await streamingManager.continueStreaming(
                streamingSessionId,
                streamingOperation
              );
              if (!chunk) {
                return this.createErrorResponse(
                  'Failed to continue streaming session',
                  ErrorType.API_ERROR
                );
              }
              return createStreamingResponse(chunk);
            }

            // Start new streaming session
            const { firstChunk } = await streamingManager.startStreaming(
              this.name,
              streamingOperation,
              {
                query: finalQuery,
                groupBy,
                sortBy,
                limit,
                start_time: startTimeArg,
                end_time: endTime,
              }
            );
            return createStreamingResponse(firstChunk);
          }

          const response = await withToolTimeout(
            async () =>
              firewalla.getFlowData(finalQuery, groupBy, sortBy, limit, cursor),
            this.name
          );
          const executionTime = Date.now() - startTime;

          // Process flow data
          let processedFlows = SafeAccess.safeArrayMap(
            response.results,
            (flow: any) => ({
              timestamp: unixToISOStringOrNow(flow.ts),
              source_ip: SafeAccess.getNestedValue(
                flow,
                'source.ip',
                SafeAccess.getNestedValue(flow, 'device.ip', 'unknown')
              ),
              destination_ip: SafeAccess.getNestedValue(
                flow,
                'destination.ip',
                'unknown'
              ),
              protocol: SafeAccess.getNestedValue(flow, 'protocol', 'unknown'),
              bytes:
                (SafeAccess.getNestedValue(flow, 'download', 0) as number) +
                (SafeAccess.getNestedValue(flow, 'upload', 0) as number),
              download: SafeAccess.getNestedValue(flow, 'download', 0),
              upload: SafeAccess.getNestedValue(flow, 'upload', 0),
              packets: SafeAccess.getNestedValue(flow, 'count', 0),
              duration: SafeAccess.getNestedValue(flow, 'duration', 0),
              direction: SafeAccess.getNestedValue(flow, 'direction', 'unknown'),
              blocked: SafeAccess.getNestedValue(flow, 'block', false),
              block_type: SafeAccess.getNestedValue(flow, 'blockType', null),
              device: SafeAccess.getNestedValue(flow, 'device', {}),
              source: SafeAccess.getNestedValue(flow, 'source', {}),
              destination: SafeAccess.getNestedValue(flow, 'destination', {}),
              region: SafeAccess.getNestedValue(flow, 'region', null),
              category: SafeAccess.getNestedValue(flow, 'category', null),
            })
          );

          // Apply geographic enrichment for IP addresses
          processedFlows = await this.enrichGeoIfNeeded(processedFlows, [
            'source_ip',
            'destination_ip',
          ]);

          // Create metadata for standardized response
          const metadata: PaginationMetadata = {
            cursor: response.next_cursor,
            hasMore: !!response.next_cursor,
            limit,
            executionTime,
            cached: false,
            source: 'firewalla_api',
            queryParams: {
              query: finalQuery,
              groupBy,
              sortBy,
              limit,
              cursor,
              start_time: startTimeArg,
              end_time: endTime,
            },
            totalCount: (response as any).total_count,
          };

          // Create standardized response
          const standardResponse = ResponseStandardizer.toPaginatedResponse(
            processedFlows,
            metadata
          );

          return this.createUnifiedResponse(standardResponse, {
            executionTimeMs: executionTime,
          });
        } catch (error: unknown) {
          // Handle timeout errors specifically
          if (error instanceof TimeoutError) {
            return createTimeoutErrorResponse(
              this.name,
              error.duration,
              10000 // Default timeout from timeout-manager
            );
          }

          const errorMessage =
            error instanceof Error ? error.message : 'Unknown error occurred';
          return this.createErrorResponse(
            `Failed to get flow data: ${errorMessage}`,
            ErrorType.API_ERROR,
            { originalError: errorMessage }
          );
        }
      }
    }
  • MCP protocol schema definition for the 'get_flow_data' tool, specifying input parameters (query, groupBy, sortBy, limit, cursor), their types, descriptions, constraints, and defaults as exposed to the MCP client.
    name: 'get_flow_data',
    description: 'Query network traffic flows from Firewalla firewall',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description:
            'Search query for flows. Supports region:US for geographic filtering, protocol:tcp, blocked:true, domain:*, category:social, etc.',
        },
        groupBy: {
          type: 'string',
          description: 'Group flows by specified values (e.g., "domain,box")',
        },
        sortBy: {
          type: 'string',
          description: 'Sort flows (default: "ts:desc")',
        },
        limit: {
          type: 'number',
          description:
            'Maximum results (optional, default: 200, API maximum: 500)',
          minimum: 1,
          maximum: 500,
          default: 200,
        },
        cursor: {
          type: 'string',
          description: 'Pagination cursor from previous response',
        },
      },
      required: [],
    },
  • Registration of a GetFlowDataHandler instance with the central ToolRegistry, performed in the registerHandlers() method during initialization.
    // Network tools (1 handler - get_flow_data)
    this.register(new GetFlowDataHandler());
  • Limit configuration for 'get_flow_data' tool, setting maximum results to STANDARD_LIMITS.BASIC_QUERY (1000), used in parameter validation.
    get_flow_data: STANDARD_LIMITS.BASIC_QUERY,
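
The handler's time-range step can be isolated into a small standalone sketch. It converts both timestamps to Unix seconds, forms a `ts:start-end` clause, and combines it with any user query using `AND`, as the handler code does. The function name `buildTimeRangeQuery` and its plain `Error` handling are illustrative assumptions. The real handler validates date formats through `ParameterValidator` and returns structured error responses instead of throwing.

```typescript
// Hypothetical standalone version of the handler's time-range query step.
function buildTimeRangeQuery(
  query: string | undefined,
  startTime: string,
  endTime: string
): string {
  const startMs = new Date(startTime).getTime();
  const endMs = new Date(endTime).getTime();

  // Assumption: unparseable dates are rejected with a plain Error here.
  if (Number.isNaN(startMs) || Number.isNaN(endMs)) {
    throw new Error('start_time and end_time must be parseable dates');
  }
  // Same ordering rule as the handler: start must be strictly before end.
  if (startMs >= endMs) {
    throw new Error('Start time must be before end time');
  }

  // The flow query syntax takes Unix timestamps in seconds.
  const startTs = Math.floor(startMs / 1000);
  const endTs = Math.floor(endMs / 1000);
  const timeQuery = `ts:${startTs}-${endTs}`;

  // Parenthesize the user query so the AND binds to the whole expression.
  return query ? `(${query}) AND ${timeQuery}` : timeQuery;
}

const example = buildTimeRangeQuery(
  'protocol:tcp',
  '2024-01-01T00:00:00Z',
  '2024-01-02T00:00:00Z'
);
// example === '(protocol:tcp) AND ts:1704067200-1704153600'
```

Wrapping the user query in parentheses keeps an existing `OR` expression from swallowing the time filter. Without a user query, the time clause is used on its own.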

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/amittell/firewalla-mcp-server'