Skip to main content
Glama
amittell

firewalla-mcp-server

search_flows

Analyze network traffic by searching historical flows with advanced filters for protocols, locations, categories, and time ranges to identify patterns or investigate security events.

Instructions

Search network flows with advanced query filters. Use this for: historical analysis, specific time ranges, complex filtering, or when you need more than 50 flows. Supports pagination, time-based queries (e.g., "ts:>1h" for last hour), and all flow fields including geographic filtering. For quick "what's happening now" snapshots, use get_recent_flow_activity instead.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
queryNoSearch query using Firewalla syntax. Supported fields: protocol:tcp/udp, direction:inbound/outbound/local, blocked:true/false, bytes:>1MB, domain:*.example.com, region:US (country code), category:social/games/porn/etc, gid:box_id, device.ip:192.168.*, source_ip:*, destination_ip:*. Examples: "region:US AND protocol:tcp", "blocked:true AND bytes:>1MB", "category:social OR category:games"
groupByNoGroup flows by specified values (e.g., "domain,box")
sortByNoSort flows (default: "ts:desc")
limitNoMaximum results (optional, default: 200, API maximum: 500)
cursorNoPagination cursor from previous response

Implementation Reference

  • Core handler class for 'search_flows' tool. Implements execute() method with full logic: input validation, Firewalla search API integration, flow data processing/normalization, geographic enrichment, caching control, retry/timeout handling, and standardized ToolResponse output.
    export class SearchFlowsHandler extends BaseToolHandler {
      name = 'search_flows';
      description = `Advanced network flow searching with powerful query syntax and enhanced reliability. Data cached for 15 seconds, use force_refresh=true for real-time network analysis.
      
    Search through network traffic flows using complex queries with logical operators, wildcards, and field-specific filters. Features automatic boolean query translation for improved compatibility.
    
    REQUIRED PARAMETERS:
    - query: Search query string using flow field syntax
    
    OPTIONAL PARAMETERS:
    - limit: Maximum number of results to return (default: 200, max: 500)
    - force_refresh: Bypass cache for real-time data (default: false)
    - cursor: Pagination cursor from previous response
    - time_range: Time window for search (start/end timestamps)
    - sort_by: Field to sort results by
    - group_by: Field to group results by for aggregation
    - aggregate: Enable aggregation statistics
    
    QUERY EXAMPLES (with automatic boolean translation):
    - Boolean fields (both syntaxes supported): "blocked:true" OR "blocked=true", "allowed:false" OR "allowed=false" (automatically converted to backend format)
    - Basic field queries: "protocol:tcp", "source_ip:192.168.1.100", "destination_port:443"
    - Logical operators: "protocol:tcp AND blocked:false", "blocked=true OR allowed=false"
    - Wildcards: "source_ip:192.168.*", "destination_domain:*.facebook.com"
    - Ranges: "bytes:[1000 TO 50000]", "timestamp:>=2024-01-01"
    - Complex queries: "(protocol:tcp OR protocol:udp) AND source_ip:192.168.* NOT blocked=true"
    
    CACHE CONTROL:
    - Default: 15-second cache for optimal performance
    - Real-time: Use force_refresh=true for live network monitoring
    - Cache info included in responses for timing awareness
    
    PERFORMANCE TIPS:
    - Use specific time ranges for better performance: {"time_range": {"start": "2024-01-01T00:00:00Z", "end": "2024-01-02T00:00:00Z"}}
    - Limit results with reasonable values (100-1000) for faster responses
    - Use cursor for pagination with large datasets
    - Group by fields like "source_ip" or "protocol" for aggregated insights
    
    See the Query Syntax Guide for complete documentation: /docs/query-syntax-guide.md`;
      category = 'search' as const;
    
      constructor() {
        // Enable full standardization: geographic enrichment and field normalization for network flows
        super({
          enableGeoEnrichment: true, // Network flows have IP addresses that require geographic enrichment
          enableFieldNormalization: true, // Ensure consistent snake_case field naming across all responses
          additionalMeta: {
            data_source: 'flows',
            entity_type: 'network_flows',
            supports_geographic_enrichment: true,
            supports_field_normalization: true,
            standardization_version: '2.0.0',
          },
        });
      }
    
      async execute(
        args: ToolArgs,
        firewalla: FirewallaClient
      ): Promise<ToolResponse> {
        const searchArgs = args as SearchFlowsArgs;
        const startTime = Date.now();
    
        try {
          // Validate common search parameters
          const validation = validateCommonSearchParameters(
            searchArgs,
            this.name,
            'flows'
          );
    
          if (!validation.isValid) {
            return validation.response;
          }
    
          // Validate force_refresh parameter if provided
          const forceRefreshValidation = ParameterValidator.validateBoolean(
            searchArgs.force_refresh,
            'force_refresh',
            false
          );
    
          if (!forceRefreshValidation.isValid) {
            return createErrorResponse(
              this.name,
              'Force refresh parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              forceRefreshValidation.errors
            );
          }
    
          const finalQuery = searchArgs.query;
    
          // ------------------------------------------------------------
          // Validate geographic_filters if provided
          // ------------------------------------------------------------
          if (searchArgs.geographic_filters !== undefined) {
            // Validate it's an object
            if (
              typeof searchArgs.geographic_filters !== 'object' ||
              searchArgs.geographic_filters === null
            ) {
              return createErrorResponse(
                this.name,
                'Invalid geographic_filters parameter',
                ErrorType.VALIDATION_ERROR,
                {
                  provided_value: searchArgs.geographic_filters,
                  expected:
                    'object with optional fields: countries, continents, regions, cities, etc.',
                }
              );
            }
    
            // Validate country codes if provided
            if (
              searchArgs.geographic_filters.countries &&
              searchArgs.geographic_filters.countries.length > 0
            ) {
              const countryValidation = validateCountryCodes(
                searchArgs.geographic_filters.countries
              );
              if (!countryValidation.valid) {
                return createErrorResponse(
                  this.name,
                  `Country code validation failed: Invalid country codes: ${countryValidation.invalid.join(', ')}`,
                  ErrorType.VALIDATION_ERROR,
                  {
                    invalid_codes: countryValidation.invalid,
                    valid_codes: countryValidation.valid,
                    documentation:
                      'Country codes must be ISO 3166-1 alpha-2 format (e.g., US, CN, GB)',
                  }
                );
              }
            }
          }
    
          // ------------------------------------------------------------
          // Validate include_analytics parameter if provided
          // ------------------------------------------------------------
          const includeAnalyticsValidation = ParameterValidator.validateBoolean(
            searchArgs.include_analytics,
            'include_analytics',
            false
          );
    
          if (!includeAnalyticsValidation.isValid) {
            return createErrorResponse(
              this.name,
              'Include analytics parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              includeAnalyticsValidation.errors
            );
          }
    
          const searchTools = createSearchTools(firewalla);
          const searchParams: SearchParams = {
            query: finalQuery,
            limit: searchArgs.limit,
            offset: searchArgs.offset,
            cursor: searchArgs.cursor,
            sort_by: searchArgs.sort_by,
            sort_order: searchArgs.sort_order,
            group_by: searchArgs.group_by,
            aggregate: searchArgs.aggregate,
            time_range: searchArgs.time_range,
            force_refresh: forceRefreshValidation.sanitizedValue as boolean,
            geographic_filters: searchArgs.geographic_filters,
            include_analytics: includeAnalyticsValidation.sanitizedValue as boolean,
          };
    
          // Use retry logic for search operations as they can be prone to timeouts
          const result = await withRetryAndTimeout(
            async () => searchTools.search_flows(searchParams),
            this.name,
            {
              maxAttempts: 2, // Conservative retry for search operations
              initialDelayMs: 2000, // Wait 2 seconds before retry
              shouldRetry: (error, attempt) => {
                // Retry on timeouts and network errors, but not on validation errors
                if (error instanceof TimeoutError) {
                  return true;
                }
                return isRetryableError(error) && attempt === 1; // Only retry once for search
              },
            }
          );
          const executionTime = Date.now() - startTime;
    
          // Process flow data with enhanced standardization
          let processedFlows = SafeAccess.safeArrayMap(
            (result as any).results,
            (flow: Flow) => ({
              timestamp: unixToISOStringOrNow(flow.ts),
              source_ip: SafeAccess.getNestedValue(
                flow as any,
                'source.ip',
                'unknown'
              ),
              source_country: SafeAccess.getNestedValue(
                flow as any,
                'source.geo.country',
                'unknown'
              ),
              source_city: SafeAccess.getNestedValue(
                flow as any,
                'source.geo.city',
                'unknown'
              ),
              source_continent: SafeAccess.getNestedValue(
                flow as any,
                'source.geo.continent',
                'unknown'
              ),
              destination_ip: SafeAccess.getNestedValue(
                flow as any,
                'destination.ip',
                'unknown'
              ),
              destination_country: SafeAccess.getNestedValue(
                flow as any,
                'destination.geo.country',
                'unknown'
              ),
              destination_city: SafeAccess.getNestedValue(
                flow as any,
                'destination.geo.city',
                'unknown'
              ),
              destination_continent: SafeAccess.getNestedValue(
                flow as any,
                'destination.geo.continent',
                'unknown'
              ),
              protocol: SafeAccess.getNestedValue(
                flow as any,
                'protocol',
                'unknown'
              ),
              // bytes field is calculated as total traffic: download + upload
              bytes:
                (SafeAccess.getNestedValue(flow as any, 'download', 0) as number) +
                (SafeAccess.getNestedValue(flow as any, 'upload', 0) as number),
              blocked: SafeAccess.getNestedValue(flow as any, 'block', false),
              direction: SafeAccess.getNestedValue(
                flow as any,
                'direction',
                'unknown'
              ),
              device: SafeAccess.getNestedValue(flow as any, 'device', {}),
            })
          );
    
          // Apply geographic enrichment pipeline for IP addresses
          processedFlows = await this.enrichGeoIfNeeded(processedFlows, [
            'source_ip',
            'destination_ip',
          ]);
    
          // Create metadata for standardized response
          const metadata: SearchMetadata = {
            query: SafeAccess.getNestedValue(
              result as any,
              'query',
              searchArgs.query || ''
            ) as string,
            entityType: 'flows',
            executionTime: SafeAccess.getNestedValue(
              result as any,
              'execution_time_ms',
              executionTime
            ) as number,
            cached: false,
            cursor: (result as any).next_cursor,
            hasMore: !!(result as any).next_cursor,
            limit: searchArgs.limit,
            aggregations: SafeAccess.getNestedValue(
              result as any,
              'aggregations',
              null
            ) as Record<string, any> | undefined,
          };
    
          // Create unified response with standardized metadata
          const unifiedResponseData = {
            flows: processedFlows,
            metadata,
            query_info: {
              original_query: searchArgs.query,
              final_query: finalQuery,
              applied_filters: {
                geographic: !!searchArgs.geographic_filters,
                time_range: !!searchArgs.time_range,
                analytics: !!searchArgs.include_analytics,
              },
            },
          };
    
          // Return unified response
          return this.createUnifiedResponse(unifiedResponseData, {
            executionTimeMs: executionTime,
          });
        } catch (error: unknown) {
          if (error instanceof TimeoutError) {
            return createTimeoutErrorResponse(
              this.name,
              error.duration,
              10000 // Default timeout from timeout-manager
            );
          }
    
          // Handle retry failure errors with enhanced context
          if (error instanceof Error && error.name === 'RetryFailureError') {
            const { retryContext } = error as any;
            const { userGuidance } = error as any;
    
            return createErrorResponse(
              this.name,
              `Search flows operation failed after ${retryContext?.attempts || 'multiple'} attempts: ${error.message}`,
              ErrorType.SEARCH_ERROR,
              {
                retry_attempts: retryContext?.attempts,
                total_duration_ms: retryContext?.totalDurationMs,
                final_error:
                  retryContext?.originalError instanceof Error
                    ? retryContext.originalError.message
                    : 'Unknown error',
              },
              userGuidance || [
                'Multiple retry attempts failed',
                'Try reducing the scope of your search query',
                'Check network connectivity and try again later',
              ]
            );
          }
    
          const errorMessage =
            error instanceof Error ? error.message : 'Unknown error occurred';
          return createErrorResponse(
            this.name,
            `Failed to search flows: ${errorMessage}`,
            ErrorType.SEARCH_ERROR
          );
        }
      }
  • Tool registration in ToolRegistry constructor's registerHandlers() method.
    this.register(new SearchFlowsHandler());
  • Import statement for SearchFlowsHandler from handlers/search.ts
    import {
      SearchFlowsHandler,
      SearchAlarmsHandler,
      SearchRulesHandler,
      SearchDevicesHandler,
      SearchTargetListsHandler,
    } from './handlers/search.js';
  • TypeScript interfaces defining input schema (SearchFlowsArgs extending BaseSearchArgs) for tool parameters including query, pagination, sorting, time/geographic filters.
    export interface BaseSearchArgs extends ToolArgs {
      query: string;
      limit: number;
      offset?: number;
      cursor?: string;
      sort_by?: string;
      sort_order?: 'asc' | 'desc';
      group_by?: string;
      aggregate?: boolean;
      force_refresh?: boolean;
    }
    
    // Search argument interfaces for type safety
    export interface SearchFlowsArgs extends BaseSearchArgs {
      time_range?: {
        start?: string;
        end?: string;
      };
      geographic_filters?: {
        countries?: string[];
        continents?: string[];
        regions?: string[];
        cities?: string[];
        asns?: string[];
        hosting_providers?: string[];
        exclude_vpn?: boolean;
        exclude_cloud?: boolean;
        min_risk_score?: number;
      };
      include_analytics?: boolean;
    }
  • Shared validation helper function used by search_flows handler for common parameter validation (limit, query syntax/fields, cursor, group_by). Called at line 370 in execute().
    function validateCommonSearchParameters(
      args: BaseSearchArgs,
      toolName: string,
      entityType: 'flows' | 'alarms' | 'rules' | 'devices' | 'target_lists'
    ): CommonSearchValidationResult {
      // Validate optional limit parameter with default
      const limitValidation = ParameterValidator.validateNumber(
        args.limit,
        'limit',
        {
          required: false,
          defaultValue: 200,
          ...getLimitValidationConfig(toolName),
        }
      );
    
      if (!limitValidation.isValid) {
        return {
          isValid: false,
          response: createErrorResponse(
            toolName,
            'Parameter validation failed',
            ErrorType.VALIDATION_ERROR,
            undefined,
            limitValidation.errors
          ),
        };
      }
    
      // Validate required query parameter
      const queryValidation = ParameterValidator.validateRequiredString(
        args.query,
        'query'
      );
    
      if (!queryValidation.isValid) {
        return {
          isValid: false,
          response: createErrorResponse(
            toolName,
            'Query parameter validation failed',
            ErrorType.VALIDATION_ERROR,
            undefined,
            queryValidation.errors
          ),
        };
      }
    
      // Validate query syntax
      const querySyntaxValidation = validateFirewallaQuerySyntax(args.query);
    
      if (!querySyntaxValidation.isValid) {
        const examples = getExampleQueries(entityType);
        return {
          isValid: false,
          response: createErrorResponse(
            toolName,
            'Invalid query syntax',
            ErrorType.VALIDATION_ERROR,
            {
              query: args.query,
              syntax_errors: querySyntaxValidation.errors,
              examples: examples.slice(0, 3),
              hint: 'Use field:value syntax with logical operators (AND, OR, NOT)',
            },
            querySyntaxValidation.errors
          ),
        };
      }
    
      // Validate field names in the query
      const fieldValidation = QuerySanitizer.validateQueryFields(
        args.query,
        entityType
      );
    
      if (!fieldValidation.isValid) {
        return {
          isValid: false,
          response: createErrorResponse(
            toolName,
            'Query contains invalid field names',
            ErrorType.VALIDATION_ERROR,
            {
              query: args.query,
              documentation:
                entityType === 'alarms'
                  ? 'See /docs/error-handling-guide.md for troubleshooting'
                  : 'See /docs/query-syntax-guide.md for valid field names',
            },
            fieldValidation.errors
          ),
        };
      }
    
      // Validate cursor format if provided
      if (args.cursor !== undefined) {
        const cursorValidation = ParameterValidator.validateCursor(
          args.cursor,
          'cursor'
        );
        if (!cursorValidation.isValid) {
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Invalid cursor format',
              ErrorType.VALIDATION_ERROR,
              undefined,
              cursorValidation.errors
            ),
          };
        }
      }
    
      // Validate group_by parameter if provided
      if (args.group_by !== undefined) {
        const groupByValidation = ParameterValidator.validateEnum(
          args.group_by,
          'group_by',
          SEARCH_FIELDS[entityType],
          false
        );
    
        if (!groupByValidation.isValid) {
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Invalid group_by field',
              ErrorType.VALIDATION_ERROR,
              {
                group_by: args.group_by,
                valid_fields: SEARCH_FIELDS[entityType],
                documentation: 'See /docs/query-syntax-guide.md for valid fields',
              },
              groupByValidation.errors
            ),
          };
        }
      }
    
      return {
        isValid: true,
        limit: args.limit,
        query: args.query,
        cursor: args.cursor,
        groupBy: args.group_by,
      };
    }
Behavior4/5

Does the description disclose side effects, auth requirements, rate limits, or destructive behavior?

With no annotations provided, the description carries the full burden of behavioral disclosure. It effectively describes key behavioral traits: it supports pagination (implied by 'cursor' parameter), time-based queries, complex filtering across all flow fields, and geographic filtering. It also mentions the scale ('more than 50 flows'), which helps set expectations. However, it doesn't explicitly state whether this is a read-only operation or if it has any side effects, which would be valuable for a search tool.

Agents need to know what a tool does to the world before calling it. Descriptions should go beyond structured annotations to explain consequences.

Conciseness5/5

Is the description appropriately sized, front-loaded, and free of redundancy?

The description is well-structured and front-loaded with the core purpose, followed by usage guidelines and behavioral details. Every sentence adds value: the first states the purpose, the second lists use cases, the third describes capabilities, and the fourth provides a clear alternative. There is no wasted text or redundancy.

Shorter descriptions cost fewer tokens and are easier for agents to parse. Every sentence should earn its place.

Completeness4/5

Given the tool's complexity, does the description cover enough for an agent to succeed on first attempt?

Given the tool's complexity (5 parameters, no annotations, no output schema), the description does a good job of covering purpose, usage, and key behaviors. However, without annotations or an output schema, it could benefit from more explicit details on the response format (e.g., what data is returned, structure of results) and any limitations beyond the 50-flow mention. It's largely complete but has minor gaps in output expectations.

Complex tools with many parameters or behaviors need more documentation. Simple tools need less. This dimension scales expectations accordingly.

Parameters3/5

Does the description clarify parameter syntax, constraints, interactions, or defaults beyond what the schema provides?

The schema description coverage is 100%, so the schema already documents all 5 parameters thoroughly. The description adds minimal parameter-specific semantics beyond the schema—it mentions 'time-based queries' and 'geographic filtering' which relate to the 'query' parameter, but doesn't provide additional syntax or format details. This meets the baseline of 3 when the schema does the heavy lifting.

Input schemas describe structure but not intent. Descriptions should explain non-obvious parameter relationships and valid value ranges.

Purpose5/5

Does the description clearly state what the tool does and how it differs from similar tools?

The description clearly states the tool's purpose with specific verbs ('search network flows') and resources ('flows'), and explicitly distinguishes it from its sibling 'get_recent_flow_activity' by contrasting use cases. It goes beyond a simple restatement of the name by specifying the type of search ('advanced query filters').

Agents choose between tools based on descriptions. A clear purpose with a specific verb and resource helps agents select the right tool.

Usage Guidelines5/5

Does the description explain when to use this tool, when not to, or what alternatives exist?

The description provides explicit guidance on when to use this tool ('historical analysis, specific time ranges, complex filtering, or when you need more than 50 flows') and when to use an alternative ('For quick "what's happening now" snapshots, use get_recent_flow_activity instead'). This covers both inclusion and exclusion criteria with named alternatives.

Agents often have multiple tools that could apply. Explicit usage guidance like "use X instead of Y when Z" prevents misuse.

Install Server

Other Tools

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/amittell/firewalla-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server