search_flows
Analyze network traffic by searching historical flows with advanced filters for protocols, locations, categories, and time ranges to identify patterns or investigate security events.
Instructions
Search network flows with advanced query filters. Use this for: historical analysis, specific time ranges, complex filtering, or when you need more than 50 flows. Supports pagination, time-based queries (e.g., "ts:>1h" for last hour), and all flow fields including geographic filtering. For quick "what's happening now" snapshots, use get_recent_flow_activity instead.
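The pagination mentioned above can be sketched as a loop that feeds each response's cursor back into the next call. This is a minimal sketch, not the tool's own client code: `fetchPage`, `FlowPage`, and the `nextCursor` field name are hypothetical stand-ins for whatever MCP client call and response shape you actually use.

```typescript
// Hypothetical page shape: the real response layout depends on your MCP client.
interface FlowPage<T> {
  flows: T[];
  nextCursor?: string; // absent when there are no more pages
}

// Hypothetical function that performs one search_flows call.
type FetchPage<T> = (args: {
  query: string;
  limit: number;
  cursor?: string;
}) => Promise<FlowPage<T>>;

// Collect flows page by page until the cursor runs out or maxPages is reached.
async function collectFlows<T>(
  fetchPage: FetchPage<T>,
  query: string,
  limit = 200,
  maxPages = 10
): Promise<T[]> {
  const all: T[] = [];
  let cursor: string | undefined;
  for (let page = 0; page < maxPages; page++) {
    const result = await fetchPage({ query, limit, cursor });
    all.push(...result.flows);
    if (!result.nextCursor) break; // no further pages
    cursor = result.nextCursor;
  }
  return all;
}
```

The `maxPages` cap is a design choice for this sketch: it keeps a broad query from looping indefinitely over a very large history.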
Input Schema
| Name | Required | Description | Default |
|---|---|---|---|
| query | No | Search query using Firewalla syntax. Supported fields: protocol:tcp/udp, direction:inbound/outbound/local, blocked:true/false, bytes:>1MB, domain:*.example.com, region:US (country code), category:social/games/porn/etc, gid:box_id, device.ip:192.168.*, source_ip:*, destination_ip:*. Examples: "region:US AND protocol:tcp", "blocked:true AND bytes:>1MB", "category:social OR category:games" | |
| groupBy | No | Group flows by specified values (e.g., "domain,box") | |
| sortBy | No | Sort flows (default: "ts:desc") | |
| limit | No | Maximum results (optional, default: 200, API maximum: 500) | |
| cursor | No | Pagination cursor from previous response | |
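To illustrate how the `field:value` filters in the table combine, here is a minimal sketch that joins a few of them with `AND` and places the result in an arguments object. `FlowFilters` and `buildFlowQuery` are hypothetical names invented for this example; they are not part of the tool.

```typescript
// Hypothetical filter shape covering a few of the fields listed in the table.
interface FlowFilters {
  protocol?: 'tcp' | 'udp';
  direction?: 'inbound' | 'outbound' | 'local';
  blocked?: boolean;
  region?: string; // country code, e.g. "US"
  minBytes?: string; // e.g. "1MB"
}

// Join the provided filters into a single "field:value AND field:value" string.
function buildFlowQuery(filters: FlowFilters): string {
  const parts: string[] = [];
  if (filters.protocol) parts.push(`protocol:${filters.protocol}`);
  if (filters.direction) parts.push(`direction:${filters.direction}`);
  if (filters.blocked !== undefined) parts.push(`blocked:${filters.blocked}`);
  if (filters.region) parts.push(`region:${filters.region}`);
  if (filters.minBytes) parts.push(`bytes:>${filters.minBytes}`);
  return parts.join(' AND ');
}

// Arguments object using the parameter names from the table.
const args = {
  query: buildFlowQuery({ blocked: true, minBytes: '1MB' }),
  sortBy: 'ts:desc',
  limit: 200,
};
console.log(args.query); // blocked:true AND bytes:>1MB
```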
Implementation Reference
- src/tools/handlers/search.ts:306-652 (handler) Core handler class for 'search_flows' tool. Implements execute() method with full logic: input validation, Firewalla search API integration, flow data processing/normalization, geographic enrichment, caching control, retry/timeout handling, and standardized ToolResponse output.

      export class SearchFlowsHandler extends BaseToolHandler {
        name = 'search_flows';
        description = `Advanced network flow searching with powerful query syntax and enhanced reliability.

      Data cached for 15 seconds, use force_refresh=true for real-time network analysis.

      Search through network traffic flows using complex queries with logical operators, wildcards, and field-specific filters. Features automatic boolean query translation for improved compatibility.

      REQUIRED PARAMETERS:
      - query: Search query string using flow field syntax

      OPTIONAL PARAMETERS:
      - limit: Maximum number of results to return (default: 200, max: 500)
      - force_refresh: Bypass cache for real-time data (default: false)
      - cursor: Pagination cursor from previous response
      - time_range: Time window for search (start/end timestamps)
      - sort_by: Field to sort results by
      - group_by: Field to group results by for aggregation
      - aggregate: Enable aggregation statistics

      QUERY EXAMPLES (with automatic boolean translation):
      - Boolean fields (both syntaxes supported): "blocked:true" OR "blocked=true", "allowed:false" OR "allowed=false" (automatically converted to backend format)
      - Basic field queries: "protocol:tcp", "source_ip:192.168.1.100", "destination_port:443"
      - Logical operators: "protocol:tcp AND blocked:false", "blocked=true OR allowed=false"
      - Wildcards: "source_ip:192.168.*", "destination_domain:*.facebook.com"
      - Ranges: "bytes:[1000 TO 50000]", "timestamp:>=2024-01-01"
      - Complex queries: "(protocol:tcp OR protocol:udp) AND source_ip:192.168.* NOT blocked=true"

      CACHE CONTROL:
      - Default: 15-second cache for optimal performance
      - Real-time: Use force_refresh=true for live network monitoring
      - Cache info included in responses for timing awareness

      PERFORMANCE TIPS:
      - Use specific time ranges for better performance: {"time_range": {"start": "2024-01-01T00:00:00Z", "end": "2024-01-02T00:00:00Z"}}
      - Limit results with reasonable values (100-1000) for faster responses
      - Use cursor for pagination with large datasets
      - Group by fields like "source_ip" or "protocol" for aggregated insights

      See the Query Syntax Guide for complete documentation: /docs/query-syntax-guide.md`;
        category = 'search' as const;

        constructor() {
          // Enable full standardization: geographic enrichment and field normalization for network flows
          super({
            enableGeoEnrichment: true, // Network flows have IP addresses that require geographic enrichment
            enableFieldNormalization: true, // Ensure consistent snake_case field naming across all responses
            additionalMeta: {
              data_source: 'flows',
              entity_type: 'network_flows',
              supports_geographic_enrichment: true,
              supports_field_normalization: true,
              standardization_version: '2.0.0',
            },
          });
        }

        async execute(
          args: ToolArgs,
          firewalla: FirewallaClient
        ): Promise<ToolResponse> {
          const searchArgs = args as SearchFlowsArgs;
          const startTime = Date.now();

          try {
            // Validate common search parameters
            const validation = validateCommonSearchParameters(
              searchArgs,
              this.name,
              'flows'
            );
            if (!validation.isValid) {
              return validation.response;
            }

            // Validate force_refresh parameter if provided
            const forceRefreshValidation = ParameterValidator.validateBoolean(
              searchArgs.force_refresh,
              'force_refresh',
              false
            );
            if (!forceRefreshValidation.isValid) {
              return createErrorResponse(
                this.name,
                'Force refresh parameter validation failed',
                ErrorType.VALIDATION_ERROR,
                undefined,
                forceRefreshValidation.errors
              );
            }

            const finalQuery = searchArgs.query;

            // ------------------------------------------------------------
            // Validate geographic_filters if provided
            // ------------------------------------------------------------
            if (searchArgs.geographic_filters !== undefined) {
              // Validate it's an object
              if (
                typeof searchArgs.geographic_filters !== 'object' ||
                searchArgs.geographic_filters === null
              ) {
                return createErrorResponse(
                  this.name,
                  'Invalid geographic_filters parameter',
                  ErrorType.VALIDATION_ERROR,
                  {
                    provided_value: searchArgs.geographic_filters,
                    expected:
                      'object with optional fields: countries, continents, regions, cities, etc.',
                  }
                );
              }

              // Validate country codes if provided
              if (
                searchArgs.geographic_filters.countries &&
                searchArgs.geographic_filters.countries.length > 0
              ) {
                const countryValidation = validateCountryCodes(
                  searchArgs.geographic_filters.countries
                );
                if (!countryValidation.valid) {
                  return createErrorResponse(
                    this.name,
                    `Country code validation failed: Invalid country codes: ${countryValidation.invalid.join(', ')}`,
                    ErrorType.VALIDATION_ERROR,
                    {
                      invalid_codes: countryValidation.invalid,
                      valid_codes: countryValidation.valid,
                      documentation:
                        'Country codes must be ISO 3166-1 alpha-2 format (e.g., US, CN, GB)',
                    }
                  );
                }
              }
            }

            // ------------------------------------------------------------
            // Validate include_analytics parameter if provided
            // ------------------------------------------------------------
            const includeAnalyticsValidation = ParameterValidator.validateBoolean(
              searchArgs.include_analytics,
              'include_analytics',
              false
            );
            if (!includeAnalyticsValidation.isValid) {
              return createErrorResponse(
                this.name,
                'Include analytics parameter validation failed',
                ErrorType.VALIDATION_ERROR,
                undefined,
                includeAnalyticsValidation.errors
              );
            }

            const searchTools = createSearchTools(firewalla);
            const searchParams: SearchParams = {
              query: finalQuery,
              limit: searchArgs.limit,
              offset: searchArgs.offset,
              cursor: searchArgs.cursor,
              sort_by: searchArgs.sort_by,
              sort_order: searchArgs.sort_order,
              group_by: searchArgs.group_by,
              aggregate: searchArgs.aggregate,
              time_range: searchArgs.time_range,
              force_refresh: forceRefreshValidation.sanitizedValue as boolean,
              geographic_filters: searchArgs.geographic_filters,
              include_analytics:
                includeAnalyticsValidation.sanitizedValue as boolean,
            };

            // Use retry logic for search operations as they can be prone to timeouts
            const result = await withRetryAndTimeout(
              async () => searchTools.search_flows(searchParams),
              this.name,
              {
                maxAttempts: 2, // Conservative retry for search operations
                initialDelayMs: 2000, // Wait 2 seconds before retry
                shouldRetry: (error, attempt) => {
                  // Retry on timeouts and network errors, but not on validation errors
                  if (error instanceof TimeoutError) {
                    return true;
                  }
                  return isRetryableError(error) && attempt === 1; // Only retry once for search
                },
              }
            );
            const executionTime = Date.now() - startTime;

            // Process flow data with enhanced standardization
            let processedFlows = SafeAccess.safeArrayMap(
              (result as any).results,
              (flow: Flow) => ({
                timestamp: unixToISOStringOrNow(flow.ts),
                source_ip: SafeAccess.getNestedValue(
                  flow as any,
                  'source.ip',
                  'unknown'
                ),
                source_country: SafeAccess.getNestedValue(
                  flow as any,
                  'source.geo.country',
                  'unknown'
                ),
                source_city: SafeAccess.getNestedValue(
                  flow as any,
                  'source.geo.city',
                  'unknown'
                ),
                source_continent: SafeAccess.getNestedValue(
                  flow as any,
                  'source.geo.continent',
                  'unknown'
                ),
                destination_ip: SafeAccess.getNestedValue(
                  flow as any,
                  'destination.ip',
                  'unknown'
                ),
                destination_country: SafeAccess.getNestedValue(
                  flow as any,
                  'destination.geo.country',
                  'unknown'
                ),
                destination_city: SafeAccess.getNestedValue(
                  flow as any,
                  'destination.geo.city',
                  'unknown'
                ),
                destination_continent: SafeAccess.getNestedValue(
                  flow as any,
                  'destination.geo.continent',
                  'unknown'
                ),
                protocol: SafeAccess.getNestedValue(
                  flow as any,
                  'protocol',
                  'unknown'
                ),
                // bytes field is calculated as total traffic: download + upload
                bytes:
                  (SafeAccess.getNestedValue(flow as any, 'download', 0) as number) +
                  (SafeAccess.getNestedValue(flow as any, 'upload', 0) as number),
                blocked: SafeAccess.getNestedValue(flow as any, 'block', false),
                direction: SafeAccess.getNestedValue(
                  flow as any,
                  'direction',
                  'unknown'
                ),
                device: SafeAccess.getNestedValue(flow as any, 'device', {}),
              })
            );

            // Apply geographic enrichment pipeline for IP addresses
            processedFlows = await this.enrichGeoIfNeeded(processedFlows, [
              'source_ip',
              'destination_ip',
            ]);

            // Create metadata for standardized response
            const metadata: SearchMetadata = {
              query: SafeAccess.getNestedValue(
                result as any,
                'query',
                searchArgs.query || ''
              ) as string,
              entityType: 'flows',
              executionTime: SafeAccess.getNestedValue(
                result as any,
                'execution_time_ms',
                executionTime
              ) as number,
              cached: false,
              cursor: (result as any).next_cursor,
              hasMore: !!(result as any).next_cursor,
              limit: searchArgs.limit,
              aggregations: SafeAccess.getNestedValue(
                result as any,
                'aggregations',
                null
              ) as Record<string, any> | undefined,
            };

            // Create unified response with standardized metadata
            const unifiedResponseData = {
              flows: processedFlows,
              metadata,
              query_info: {
                original_query: searchArgs.query,
                final_query: finalQuery,
                applied_filters: {
                  geographic: !!searchArgs.geographic_filters,
                  time_range: !!searchArgs.time_range,
                  analytics: !!searchArgs.include_analytics,
                },
              },
            };

            // Return unified response
            return this.createUnifiedResponse(unifiedResponseData, {
              executionTimeMs: executionTime,
            });
          } catch (error: unknown) {
            if (error instanceof TimeoutError) {
              return createTimeoutErrorResponse(
                this.name,
                error.duration,
                10000 // Default timeout from timeout-manager
              );
            }

            // Handle retry failure errors with enhanced context
            if (error instanceof Error && error.name === 'RetryFailureError') {
              const { retryContext } = error as any;
              const { userGuidance } = error as any;

              return createErrorResponse(
                this.name,
                `Search flows operation failed after ${retryContext?.attempts || 'multiple'} attempts: ${error.message}`,
                ErrorType.SEARCH_ERROR,
                {
                  retry_attempts: retryContext?.attempts,
                  total_duration_ms: retryContext?.totalDurationMs,
                  final_error:
                    retryContext?.originalError instanceof Error
                      ? retryContext.originalError.message
                      : 'Unknown error',
                },
                userGuidance || [
                  'Multiple retry attempts failed',
                  'Try reducing the scope of your search query',
                  'Check network connectivity and try again later',
                ]
              );
            }

            const errorMessage =
              error instanceof Error ? error.message : 'Unknown error occurred';
            return createErrorResponse(
              this.name,
              `Failed to search flows: ${errorMessage}`,
              ErrorType.SEARCH_ERROR
            );
          }
        }
- src/tools/registry.ts:156-156 (registration) Tool registration in ToolRegistry constructor's registerHandlers() method.

      this.register(new SearchFlowsHandler());
- src/tools/registry.ts:67-73 (registration) Import statement for SearchFlowsHandler from handlers/search.ts

      import {
        SearchFlowsHandler,
        SearchAlarmsHandler,
        SearchRulesHandler,
        SearchDevicesHandler,
        SearchTargetListsHandler,
      } from './handlers/search.js';
- src/tools/handlers/search.ts:44-74 (schema) TypeScript interfaces defining input schema (SearchFlowsArgs extending BaseSearchArgs) for tool parameters including query, pagination, sorting, time/geographic filters.

      export interface BaseSearchArgs extends ToolArgs {
        query: string;
        limit: number;
        offset?: number;
        cursor?: string;
        sort_by?: string;
        sort_order?: 'asc' | 'desc';
        group_by?: string;
        aggregate?: boolean;
        force_refresh?: boolean;
      }

      // Search argument interfaces for type safety
      export interface SearchFlowsArgs extends BaseSearchArgs {
        time_range?: {
          start?: string;
          end?: string;
        };
        geographic_filters?: {
          countries?: string[];
          continents?: string[];
          regions?: string[];
          cities?: string[];
          asns?: string[];
          hosting_providers?: string[];
          exclude_vpn?: boolean;
          exclude_cloud?: boolean;
          min_risk_score?: number;
        };
        include_analytics?: boolean;
      }
- src/tools/handlers/search.ts:155-304 (helper) Shared validation helper function used by search_flows handler for common parameter validation (limit, query syntax/fields, cursor, group_by). Called at line 370 in execute().

      function validateCommonSearchParameters(
        args: BaseSearchArgs,
        toolName: string,
        entityType: 'flows' | 'alarms' | 'rules' | 'devices' | 'target_lists'
      ): CommonSearchValidationResult {
        // Validate optional limit parameter with default
        const limitValidation = ParameterValidator.validateNumber(
          args.limit,
          'limit',
          {
            required: false,
            defaultValue: 200,
            ...getLimitValidationConfig(toolName),
          }
        );

        if (!limitValidation.isValid) {
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              limitValidation.errors
            ),
          };
        }

        // Validate required query parameter
        const queryValidation = ParameterValidator.validateRequiredString(
          args.query,
          'query'
        );

        if (!queryValidation.isValid) {
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Query parameter validation failed',
              ErrorType.VALIDATION_ERROR,
              undefined,
              queryValidation.errors
            ),
          };
        }

        // Validate query syntax
        const querySyntaxValidation = validateFirewallaQuerySyntax(args.query);
        if (!querySyntaxValidation.isValid) {
          const examples = getExampleQueries(entityType);
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Invalid query syntax',
              ErrorType.VALIDATION_ERROR,
              {
                query: args.query,
                syntax_errors: querySyntaxValidation.errors,
                examples: examples.slice(0, 3),
                hint: 'Use field:value syntax with logical operators (AND, OR, NOT)',
              },
              querySyntaxValidation.errors
            ),
          };
        }

        // Validate field names in the query
        const fieldValidation = QuerySanitizer.validateQueryFields(
          args.query,
          entityType
        );
        if (!fieldValidation.isValid) {
          return {
            isValid: false,
            response: createErrorResponse(
              toolName,
              'Query contains invalid field names',
              ErrorType.VALIDATION_ERROR,
              {
                query: args.query,
                documentation:
                  entityType === 'alarms'
                    ? 'See /docs/error-handling-guide.md for troubleshooting'
                    : 'See /docs/query-syntax-guide.md for valid field names',
              },
              fieldValidation.errors
            ),
          };
        }

        // Validate cursor format if provided
        if (args.cursor !== undefined) {
          const cursorValidation = ParameterValidator.validateCursor(
            args.cursor,
            'cursor'
          );
          if (!cursorValidation.isValid) {
            return {
              isValid: false,
              response: createErrorResponse(
                toolName,
                'Invalid cursor format',
                ErrorType.VALIDATION_ERROR,
                undefined,
                cursorValidation.errors
              ),
            };
          }
        }

        // Validate group_by parameter if provided
        if (args.group_by !== undefined) {
          const groupByValidation = ParameterValidator.validateEnum(
            args.group_by,
            'group_by',
            SEARCH_FIELDS[entityType],
            false
          );
          if (!groupByValidation.isValid) {
            return {
              isValid: false,
              response: createErrorResponse(
                toolName,
                'Invalid group_by field',
                ErrorType.VALIDATION_ERROR,
                {
                  group_by: args.group_by,
                  valid_fields: SEARCH_FIELDS[entityType],
                  documentation: 'See /docs/query-syntax-guide.md for valid fields',
                },
                groupByValidation.errors
              ),
            };
          }
        }

        return {
          isValid: true,
          limit: args.limit,
          query: args.query,
          cursor: args.cursor,
          groupBy: args.group_by,
        };
      }
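The flow-normalization step in the handler listed above (nested lookups with defaults, and `bytes` computed as download plus upload) can be illustrated with a small standalone sketch. `getNested`, `RawFlow`, and `normalizeFlow` are hypothetical names written for this example; they are not the project's `SafeAccess` helper or `Flow` type, whose exact behavior is not shown here.

```typescript
// Hypothetical raw flow shape, loosely modeled on the fields the handler reads.
type RawFlow = Record<string, unknown>;

// Walk a dotted path such as "source.geo.country"; return the fallback if any step is missing.
function getNested<T>(obj: unknown, path: string, fallback: T): T {
  let current: unknown = obj;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') return fallback;
    current = (current as Record<string, unknown>)[key];
  }
  return current === undefined || current === null ? fallback : (current as T);
}

// Flatten a raw flow into snake_case fields with safe defaults.
function normalizeFlow(flow: RawFlow) {
  return {
    source_ip: getNested(flow, 'source.ip', 'unknown'),
    destination_country: getNested(flow, 'destination.geo.country', 'unknown'),
    protocol: getNested(flow, 'protocol', 'unknown'),
    // Total traffic: download + upload, each defaulting to 0.
    bytes: getNested(flow, 'download', 0) + getNested(flow, 'upload', 0),
    blocked: getNested(flow, 'block', false),
  };
}

const normalized = normalizeFlow({
  source: { ip: '192.168.1.10' },
  protocol: 'tcp',
  download: 1500,
  upload: 500,
});
console.log(normalized.bytes); // 2000
```

Missing fields fall back to their defaults, so `normalized.destination_country` is `'unknown'` and `normalized.blocked` is `false` for the sample flow.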