Skip to main content
Glama
amittell

firewalla-mcp-server

get_flow_data

Query network traffic flows from Firewalla firewall to monitor security, analyze network activity, and track bandwidth usage with filtering, grouping, and sorting options.

Instructions

Query network traffic flows from Firewalla firewall

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
queryNoSearch query for flows. Supports region:US for geographic filtering, protocol:tcp, blocked:true, domain:*, category:social, etc.
groupByNoGroup flows by specified values (e.g., "domain,box")
sortByNoSort flows (default: "ts:desc")
limitNoMaximum results (optional, default: 200, API maximum: 500)
cursorNoPagination cursor from previous response

Implementation Reference

  • The GetFlowDataHandler class provides the core implementation of the 'get_flow_data' tool. It handles parameter sanitization and validation, supports streaming and pagination, calls the Firewalla API (firewalla.getFlowData), processes and enriches the flow data (timestamps, IPs, geo, bytes, etc.), and returns standardized paginated responses.
    export class GetFlowDataHandler extends BaseToolHandler {
      name = 'get_flow_data';
      description =
        'Query network traffic flows with pagination. Data is cached for 15 seconds for performance. Use force_refresh=true to bypass cache for real-time data.';
      category = 'network' as const;
    
      constructor() {
        super({
          enableGeoEnrichment: true,
          enableFieldNormalization: true,
          additionalMeta: {
            data_source: 'flows',
            entity_type: 'network_flows',
            supports_geographic_enrichment: true,
            supports_field_normalization: true,
            supports_streaming: true,
            supports_pagination: true,
            standardization_version: '2.0.0',
          },
        });
      }
    
      async execute(
        rawArgs: unknown,
        firewalla: FirewallaClient
      ): Promise<ToolResponse> {
        // Early parameter sanitization to prevent null/undefined errors
        const sanitizationResult = this.sanitizeParameters(rawArgs);
    
        if ('errorResponse' in sanitizationResult) {
          return sanitizationResult.errorResponse;
        }
    
        const args = sanitizationResult.sanitizedArgs;
        const startTime = Date.now();
    
        try {
          // Parameter validation
          const limitValidation = ParameterValidator.validateNumber(
            args?.limit,
            'limit',
            {
              required: false,
              defaultValue: 200,
              ...getLimitValidationConfig(this.name),
            }
          );
    
          if (!limitValidation.isValid) {
            return this.createErrorResponse(
              'Parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              limitValidation.errors
            );
          }
    
          const query = args?.query;
          const groupBy = args?.groupBy;
          const sortBy = args?.sortBy;
          const limit = limitValidation.sanitizedValue! as number;
          const cursor = args?.cursor;
    
          // Check if streaming is requested or should be automatically enabled
          const enableStreaming =
            Boolean(args?.stream) || shouldUseStreaming(this.name, limit);
          const streamingSessionId = args?.streaming_session_id as
            | string
            | undefined;
    
          // Validate individual date parameters before building query
          const startTimeArg = args?.start_time as string | undefined;
          const endTime = args?.end_time as string | undefined;
          let finalQuery = query;
    
          // Validate start_time if provided
          if (startTimeArg !== undefined) {
            const startTimeValidation = ParameterValidator.validateDateFormat(
              startTimeArg,
              'start_time',
              false
            );
            if (!startTimeValidation.isValid) {
              return this.createErrorResponse(
                'Invalid start_time format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: startTimeArg,
                  documentation:
                    'See /docs/query-syntax-guide.md for time range examples',
                },
                startTimeValidation.errors
              );
            }
          }
    
          // Validate end_time if provided
          if (endTime !== undefined) {
            const endTimeValidation = ParameterValidator.validateDateFormat(
              endTime,
              'end_time',
              false
            );
            if (!endTimeValidation.isValid) {
              return this.createErrorResponse(
                'Invalid end_time format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: endTime,
                  documentation:
                    'See /docs/query-syntax-guide.md for time range examples',
                },
                endTimeValidation.errors
              );
            }
          }
    
          // Validate cursor format if provided
          if (cursor !== undefined) {
            const cursorValidation = ParameterValidator.validateCursor(
              cursor,
              'cursor'
            );
            if (!cursorValidation.isValid) {
              return this.createErrorResponse(
                'Invalid cursor format',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: cursor,
                  documentation:
                    'Cursors should be obtained from previous response next_cursor field',
                },
                cursorValidation.errors
              );
            }
          }
    
          // Build time range query if both dates are provided and valid
          if (startTimeArg && endTime) {
            const startDate = new Date(startTimeArg);
            const endDate = new Date(endTime);
    
            // Validate time range order (dates are already validated for format above)
            if (startDate >= endDate) {
              return this.createErrorResponse(
                'Invalid time range order',
                ErrorType.VALIDATION_ERROR,
                {
                  details: 'Start time must be before end time',
                  received: {
                    start_time: startTimeArg,
                    end_time: endTime,
                    parsed_start: startDate.toISOString(),
                    parsed_end: endDate.toISOString(),
                  },
                  time_difference: `Start is ${Math.abs(startDate.getTime() - endDate.getTime()) / 1000} seconds after end`,
                },
                [
                  'Ensure start_time is chronologically before end_time',
                  'Check timezone handling - times may be in different zones',
                  'Verify date format includes correct year/month/day values',
                  'For recent data, try: start_time: "2024-01-01T00:00:00Z", end_time: "2024-01-02T00:00:00Z"',
                ]
              );
            }
    
            const startTs = Math.floor(startDate.getTime() / 1000);
            const endTs = Math.floor(endDate.getTime() / 1000);
            const timeQuery = `ts:${startTs}-${endTs}`;
            finalQuery = query ? `(${query}) AND ${timeQuery}` : timeQuery;
          }
    
          // Handle streaming mode if enabled
          if (enableStreaming) {
            const streamingManager = StreamingManager.forTool(this.name);
    
            // Define the streaming operation
            const streamingOperation: StreamingOperation = async params => {
              const response = await withToolTimeout(
                async () =>
                  firewalla.getFlowData(
                    finalQuery,
                    groupBy,
                    sortBy,
                    params.limit || 100,
                    params.cursor
                  ),
                this.name
              );
    
              // Process flows for this chunk
              const processedFlows = SafeAccess.safeArrayMap(
                response.results,
                (flow: any) => ({
                  timestamp: unixToISOStringOrNow(flow.ts),
                  source_ip: SafeAccess.getNestedValue(
                    flow,
                    'source.ip',
                    SafeAccess.getNestedValue(flow, 'device.ip', 'unknown')
                  ),
                  destination_ip: SafeAccess.getNestedValue(
                    flow,
                    'destination.ip',
                    'unknown'
                  ),
                  protocol: SafeAccess.getNestedValue(flow, 'protocol', 'unknown'),
                  bytes:
                    (SafeAccess.getNestedValue(flow, 'download', 0) as number) +
                    (SafeAccess.getNestedValue(flow, 'upload', 0) as number),
                  download: SafeAccess.getNestedValue(flow, 'download', 0),
                  upload: SafeAccess.getNestedValue(flow, 'upload', 0),
                  packets: SafeAccess.getNestedValue(flow, 'count', 0),
                  duration: SafeAccess.getNestedValue(flow, 'duration', 0),
                  direction: SafeAccess.getNestedValue(
                    flow,
                    'direction',
                    'unknown'
                  ),
                  blocked: SafeAccess.getNestedValue(flow, 'block', false),
                  block_type: SafeAccess.getNestedValue(flow, 'blockType', null),
                  device: SafeAccess.getNestedValue(flow, 'device', {}),
                  source: SafeAccess.getNestedValue(flow, 'source', {}),
                  destination: SafeAccess.getNestedValue(flow, 'destination', {}),
                  region: SafeAccess.getNestedValue(flow, 'region', null),
                  category: SafeAccess.getNestedValue(flow, 'category', null),
                })
              );
    
              return {
                data: processedFlows,
                hasMore: !!response.next_cursor,
                nextCursor: response.next_cursor,
                total: (response as any).total_count,
              };
            };
    
            if (streamingSessionId) {
              // Continue existing streaming session
              const chunk = await streamingManager.continueStreaming(
                streamingSessionId,
                streamingOperation
              );
    
              if (!chunk) {
                return this.createErrorResponse(
                  'Failed to continue streaming session',
                  ErrorType.API_ERROR
                );
              }
    
              return createStreamingResponse(chunk);
            }
            // Start new streaming session
            const { firstChunk } = await streamingManager.startStreaming(
              this.name,
              streamingOperation,
              {
                query: finalQuery,
                groupBy,
                sortBy,
                limit,
                start_time: startTimeArg,
                end_time: endTime,
              }
            );
    
            return createStreamingResponse(firstChunk);
          }
    
          const response = await withToolTimeout(
            async () =>
              firewalla.getFlowData(finalQuery, groupBy, sortBy, limit, cursor),
            this.name
          );
          const executionTime = Date.now() - startTime;
    
          // Process flow data
          let processedFlows = SafeAccess.safeArrayMap(
            response.results,
            (flow: any) => ({
              timestamp: unixToISOStringOrNow(flow.ts),
              source_ip: SafeAccess.getNestedValue(
                flow,
                'source.ip',
                SafeAccess.getNestedValue(flow, 'device.ip', 'unknown')
              ),
              destination_ip: SafeAccess.getNestedValue(
                flow,
                'destination.ip',
                'unknown'
              ),
              protocol: SafeAccess.getNestedValue(flow, 'protocol', 'unknown'),
              bytes:
                (SafeAccess.getNestedValue(flow, 'download', 0) as number) +
                (SafeAccess.getNestedValue(flow, 'upload', 0) as number),
              download: SafeAccess.getNestedValue(flow, 'download', 0),
              upload: SafeAccess.getNestedValue(flow, 'upload', 0),
              packets: SafeAccess.getNestedValue(flow, 'count', 0),
              duration: SafeAccess.getNestedValue(flow, 'duration', 0),
              direction: SafeAccess.getNestedValue(flow, 'direction', 'unknown'),
              blocked: SafeAccess.getNestedValue(flow, 'block', false),
              block_type: SafeAccess.getNestedValue(flow, 'blockType', null),
              device: SafeAccess.getNestedValue(flow, 'device', {}),
              source: SafeAccess.getNestedValue(flow, 'source', {}),
              destination: SafeAccess.getNestedValue(flow, 'destination', {}),
              region: SafeAccess.getNestedValue(flow, 'region', null),
              category: SafeAccess.getNestedValue(flow, 'category', null),
            })
          );
    
          // Apply geographic enrichment for IP addresses
          processedFlows = await this.enrichGeoIfNeeded(processedFlows, [
            'source_ip',
            'destination_ip',
          ]);
    
          // Create metadata for standardized response
          const metadata: PaginationMetadata = {
            cursor: response.next_cursor,
            hasMore: !!response.next_cursor,
            limit,
            executionTime,
            cached: false,
            source: 'firewalla_api',
            queryParams: {
              query: finalQuery,
              groupBy,
              sortBy,
              limit,
              cursor,
              start_time: startTimeArg,
              end_time: endTime,
            },
            totalCount: (response as any).total_count,
          };
    
          // Create standardized response
          const standardResponse = ResponseStandardizer.toPaginatedResponse(
            processedFlows,
            metadata
          );
    
          return this.createUnifiedResponse(standardResponse, {
            executionTimeMs: executionTime,
          });
        } catch (error: unknown) {
          // Handle timeout errors specifically
          if (error instanceof TimeoutError) {
            return createTimeoutErrorResponse(
              this.name,
              error.duration,
              10000 // Default timeout from timeout-manager
            );
          }
    
          const errorMessage =
            error instanceof Error ? error.message : 'Unknown error occurred';
          return this.createErrorResponse(
            `Failed to get flow data: ${errorMessage}`,
            ErrorType.API_ERROR,
            { originalError: errorMessage }
          );
        }
      }
    }
  • MCP protocol schema definition for the 'get_flow_data' tool, specifying input parameters (query, groupBy, sortBy, limit, cursor), their types, descriptions, constraints, and defaults as exposed to the MCP client.
    name: 'get_flow_data',
    description: 'Query network traffic flows from Firewalla firewall',
    inputSchema: {
      type: 'object',
      properties: {
        query: {
          type: 'string',
          description:
            'Search query for flows. Supports region:US for geographic filtering, protocol:tcp, blocked:true, domain:*, category:social, etc.',
        },
        groupBy: {
          type: 'string',
          description:
            'Group flows by specified values (e.g., "domain,box")',
        },
        sortBy: {
          type: 'string',
          description: 'Sort flows (default: "ts:desc")',
        },
        limit: {
          type: 'number',
          description:
            'Maximum results (optional, default: 200, API maximum: 500)',
          minimum: 1,
          maximum: 500,
          default: 200,
        },
        cursor: {
          type: 'string',
          description: 'Pagination cursor from previous response',
        },
      },
      required: [],
    },
  • Registration of the GetFlowDataHandler instance in the central ToolRegistry during initialization of registerHandlers() method.
    // Network tools (1 handler - get_flow_data)
    this.register(new GetFlowDataHandler());
  • Limit configuration for 'get_flow_data' tool, setting maximum results to STANDARD_LIMITS.BASIC_QUERY (1000), used in parameter validation.
    get_flow_data: STANDARD_LIMITS.BASIC_QUERY,
Behavior2/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries full burden for behavioral disclosure. It mentions querying but doesn't describe what kind of data is returned, whether this is a real-time or historical query, rate limits, authentication requirements, or potential side effects. The description is too minimal for a tool with 5 parameters and no output schema.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is a single, efficient sentence that gets straight to the point with zero wasted words. It's appropriately sized for a query tool and front-loads the essential information about what the tool does.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness2/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

For a query tool with 5 parameters, no annotations, and no output schema, the description is insufficient. It doesn't explain what data structure is returned, what time range is covered, whether results are paginated beyond the cursor parameter, or how this differs from similar sibling tools. The description leaves too many contextual gaps.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

Schema description coverage is 100%, so the schema already documents all 5 parameters thoroughly with examples and constraints. The description adds no additional parameter information beyond what's in the schema, which meets the baseline expectation when schema coverage is high.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose4/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the verb ('Query') and resource ('network traffic flows from Firewalla firewall'), providing a specific purpose. However, it doesn't distinguish this tool from sibling tools like 'search_flows' or 'get_recent_flow_activity', which appear to serve similar querying functions.

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines2/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides no guidance on when to use this tool versus alternatives like 'search_flows' or 'get_recent_flow_activity'. There's no mention of prerequisites, typical use cases, or exclusions that would help an agent choose between these similar-sounding tools.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/amittell/firewalla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server