Skip to main content
Glama
inkog-io

inkog

inkog_audit_a2a

Audit Agent-to-Agent communications to detect delegation loops, privilege escalation, and data leakage in multi-agent systems using protocols like A2A, CrewAI, LangGraph, and AutoGen.

Instructions

Audit Agent-to-Agent (A2A) communications in multi-agent systems. Detects infinite delegation loops, privilege escalation, data leakage between agents, and unauthorized handoffs. Supports Google A2A protocol, CrewAI, LangGraph, and AutoGen. Use this when building or reviewing multi-agent systems to detect delegation vulnerabilities.

Input Schema

TableJSON Schema
NameRequiredDescriptionDefault
pathYesPath to multi-agent system codebase
protocolNoMulti-agent protocol hint (optional, will auto-detect if not specified)
check_delegation_chainsNoCheck for infinite delegation loops and unauthorized handoffs

Implementation Reference

  • The handler function for the 'inkog_audit_a2a' tool, which performs the multi-agent security audit by calling the Inkog API.
    async function auditA2AHandler(rawArgs: Record<string, unknown>): Promise<ToolResult> {
      // Validate arguments
      const parseResult = A2AArgsSchema.safeParse(rawArgs);
      if (!parseResult.success) {
        return {
          content: [
            {
              type: 'text',
              text: `Invalid arguments: ${parseResult.error.message}`,
            },
          ],
          isError: true,
        };
      }
    
      const args: A2AArgs = parseResult.data;
    
      try {
        // Read files from path
        const readResult = readDirectory(args.path);
    
        if (readResult.files.length === 0) {
          return {
            content: [
              {
                type: 'text',
                text: `No files found in: ${args.path}`,
              },
            ],
            isError: true,
          };
        }
    
        // Get relative paths
        const files = getRelativePaths(readResult.files, args.path);
    
        // Call Inkog API - first scan, then audit A2A
        const client = getClient();
    
        // Step 1: Run a scan to get a scan_id
        const scanResponse = await client.scan(files, { policy: 'balanced' });
        if (!scanResponse.success || !scanResponse.scan_id) {
          return {
            content: [
              {
                type: 'text',
                text: 'Scan failed: Unable to analyze files',
              },
            ],
            isError: true,
          };
        }
    
        // Step 2: Use scan_id to audit A2A
        const a2aOptions: { protocol?: A2AProtocol; checkDelegationChains?: boolean; scanId?: string } = {
          checkDelegationChains: args.check_delegation_chains,
          scanId: scanResponse.scan_id,
        };
        if (args.protocol) {
          a2aOptions.protocol = args.protocol;
        }
        const response = await client.auditA2A([], a2aOptions);
    
        // Build formatted output
        let output = '╔══════════════════════════════════════════════════════╗\n';
        output += '║        🤖 Agent-to-Agent Security Audit               ║\n';
        output += '╚══════════════════════════════════════════════════════╝\n\n';
    
        // Warning if topology is incomplete
        if (response.warning) {
          output += `⚠️  ${response.warning}\n\n`;
        }
    
        // Overview
        output += `📡 Protocol: ${formatProtocol(response.protocol)}\n`;
        output += `🤖 Agents Detected: ${safeLength(response.agents)}\n`;
        output += `🔗 Communication Channels: ${safeLength(response.communications)}\n`;
    
        // Risk assessment
        if (response.risk_assessment) {
          output += `📊 Overall Risk: ${formatRiskLevel(response.risk_assessment.overall_risk)}\n`;
        }
        output += '\n';
    
        // Trust analysis warnings
        if (response.trust_analysis) {
          const ta = response.trust_analysis;
          if (ta.circular_delegations && ta.circular_delegations.length > 0) {
            output += '⚠️  WARNING: Circular delegation chains detected (potential infinite loops)\n';
            for (const cycle of ta.circular_delegations) {
              output += `   Cycle: ${cycle.join(' → ')}\n`;
            }
            output += '\n';
          }
          if (ta.unguarded_delegations > 0) {
            output += `⚠️  WARNING: ${ta.unguarded_delegations} unguarded delegation(s) detected\n`;
          }
          if (ta.privilege_escalations > 0) {
            output += `⚠️  WARNING: ${ta.privilege_escalations} potential privilege escalation(s)\n`;
          }
          if (ta.cross_boundary_flows > 0) {
            output += `ℹ️  ${ta.cross_boundary_flows} cross-trust-boundary flow(s) detected\n`;
          }
          output += '\n';
        }
    
        // Findings summary
        const findings = safeArray(response.findings);
        if (findings.length === 0) {
          output += '✅ No multi-agent security issues detected!\n\n';
        } else {
          const critical = findings.filter((f) => f.severity.toUpperCase() === 'CRITICAL').length;
          const high = findings.filter((f) => f.severity.toUpperCase() === 'HIGH').length;
          const medium = findings.filter((f) => f.severity.toUpperCase() === 'MEDIUM').length;
          const low = findings.filter((f) => f.severity.toUpperCase() === 'LOW').length;
    
          output += `📋 Security Issues: ${findings.length}\n`;
          output += `   🔴 Critical: ${critical} | 🟠 High: ${high} | 🟡 Medium: ${medium} | 🟢 Low: ${low}\n\n`;
        }
    
        // Agent inventory
        const agents = safeArray(response.agents);
        if (agents.length > 0) {
          output += '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n';
          output += '🤖 AGENT INVENTORY\n\n';
          for (const agent of agents) {
            output += formatAgent(agent) + '\n';
          }
        }
    
        // Delegation graph visualization
        const communications = safeArray(response.communications);
        if (communications.length > 0) {
          output += '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n';
          output += '🔗 DELEGATION GRAPH\n';
          output += renderDelegationGraph(agents, communications);
          output += '\n';
    
          output += 'Communication Channels:\n';
          for (const comm of communications) {
            output += formatCommunication(comm) + '\n';
          }
          output += '\n';
          output += 'Legend: 🛡️  = has permission guards, ⚠️  = no guards, 🔐 = authenticated\n\n';
        }
    
        // Detailed findings
        if (findings.length > 0) {
          output += '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n';
          output += '🔍 SECURITY FINDINGS\n\n';
    
          // Group by type
          const groupedFindings = new Map<string, A2AFinding[]>();
          for (const finding of findings) {
            const type = finding.type;
            if (!groupedFindings.has(type)) {
              groupedFindings.set(type, []);
            }
            groupedFindings.get(type)!.push(finding);
          }
    
          for (const [type, typeFindings] of groupedFindings) {
            output += `${formatFindingType(type)}\n\n`;
            for (const finding of typeFindings) {
              output += formatFinding(finding) + '\n\n';
            }
          }
        }
    
        // Trust boundaries
        const trustBoundaries = safeArray(response.trust_analysis?.trust_boundaries);
        if (trustBoundaries.length > 0) {
          output += '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n';
          output += '🛡️  TRUST BOUNDARIES\n\n';
          for (const boundary of trustBoundaries) {
            output += `📦 ${boundary.name} [${boundary.trust_level}]\n`;
            if (boundary.description) {
              output += `   ${boundary.description}\n`;
            }
            output += `   Agents: ${safeJoin(boundary.agent_ids)}\n\n`;
          }
        }
    
        // Recommendations
        const recommendations = safeArray(response.risk_assessment?.recommendations);
        if (recommendations.length > 0) {
          output += '━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n\n';
          output += '💡 RECOMMENDATIONS\n\n';
          for (let i = 0; i < recommendations.length; i++) {
            output += `${i + 1}. ${recommendations[i]}\n`;
          }
        }
    
        // Risk summary
        if (response.risk_assessment?.summary) {
          output += '\n📊 SUMMARY\n';
          output += `   ${response.risk_assessment.summary}\n`;
        }
    
        // Footer
        output += '\n━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━\n';
        output += 'Multi-Agent Security Audit powered by Inkog AI Security Platform\n';
        output += 'Learn more: https://inkog.io/multi-agent-security\n';
    
        return {
          content: [
            {
              type: 'text',
              text: output,
            },
          ],
        };
      } catch (error) {
        if (error instanceof InkogAuthError) {
          return {
            content: [
              {
                type: 'text',
                text: '🔐 API Key Required\n\nGet your free key at https://app.inkog.io',
              },
            ],
            isError: true,
          };
        }
    
        if (error instanceof InkogRateLimitError) {
          return {
            content: [
              {
                type: 'text',
                text: `⏱️ Rate Limited\n\nToo many requests. Please retry after ${error.retryAfter} seconds.`,
              },
            ],
            isError: true,
          };
        }
    
        if (error instanceof InkogNetworkError) {
          return {
            content: [
              {
                type: 'text',
                text: `Network error: ${error.message}`,
              },
            ],
            isError: true,
          };
        }
    
        if (error instanceof InkogApiError) {
          return {
            content: [
              {
                type: 'text',
                text: `API error: ${error.message}${error.details ? `\n\nDetails: ${JSON.stringify(error.details)}` : ''}`,
              },
            ],
            isError: true,
          };
        }
    
        const message = error instanceof Error ? error.message : 'Unknown error occurred';
        return {
          content: [
            {
              type: 'text',
              text: `Error: ${message}`,
            },
          ],
          isError: true,
        };
      }
    }
  • Input validation schema for the 'inkog_audit_a2a' tool using zod.
    const A2AArgsSchema = z.object({
      path: z.string().describe('Path to multi-agent system codebase'),
      protocol: z
        .enum(['a2a', 'crewai', 'langgraph', 'autogen', 'custom', 'unknown'])
        .optional()
        .describe('Multi-agent protocol hint: a2a (Google), crewai, langgraph, autogen, or leave empty for auto-detect'),
      check_delegation_chains: z
        .boolean()
        .optional()
        .default(true)
        .describe('Check for infinite delegation loops and unauthorized handoffs'),
    });
    
    type A2AArgs = z.infer<typeof A2AArgsSchema>;
  • Registration of the 'inkog_audit_a2a' tool definition.
    export const auditA2aTool: ToolDefinition = {
      tool: {
        name: 'inkog_audit_a2a',
        description:
          'Audit Agent-to-Agent (A2A) communications in multi-agent systems. Detects infinite delegation loops, privilege escalation, data leakage between agents, and unauthorized handoffs. Supports Google A2A protocol, CrewAI, LangGraph, and AutoGen. Use this when building or reviewing multi-agent systems to detect delegation vulnerabilities.',
        inputSchema: {
          type: 'object',
          properties: {
            path: {
              type: 'string',
              description: 'Path to multi-agent system codebase',
            },
            protocol: {
              type: 'string',
              enum: ['a2a', 'crewai', 'langgraph', 'autogen', 'custom'],
              description: 'Multi-agent protocol hint (optional, will auto-detect if not specified)',
            },
            check_delegation_chains: {
              type: 'boolean',
              default: true,
              description: 'Check for infinite delegation loops and unauthorized handoffs',
            },
          },
          required: ['path'],
        },
      },
      handler: auditA2AHandler,
    };

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/inkog-io/inkog'

If you have feedback or need assistance with the MCP directory API, please join our Discord server