Skip to main content
Glama
cameronsjo

MCP Server Template

by cameronsjo
zta.ts (10.1 kB)
/**
 * Zero Trust Architecture (ZTA) patterns
 *
 * Implements Zero Trust principles:
 * - Never trust, always verify
 * - Least privilege access
 * - Assume breach mentality
 * - Per-request authentication and authorization
 * - Continuous verification
 */

import { BlockList, isIP } from 'node:net';

import { createLogger } from './logger.js';
import { AuthenticationError, AuthorizationError } from './errors.js';
import { JwtValidator, ValidatedClaims, extractBearerToken } from './jwt.js';
import { getTraceContext } from './tracing.js';

const logger = createLogger('zta');

/**
 * Request context for ZTA evaluation
 */
export interface ZtaRequestContext {
  /** Authenticated identity (from JWT sub claim) */
  identity?: string;
  /** Client ID (for service-to-service) */
  clientId?: string;
  /** Source IP address */
  sourceIp: string;
  /** User agent */
  userAgent?: string;
  /** Requested resource/action */
  resource: string;
  /** HTTP method */
  method: string;
  /** Request timestamp */
  timestamp: Date;
  /** Validated JWT claims */
  claims?: ValidatedClaims;
  /** Session ID if applicable */
  sessionId?: string;
  /** Trace context */
  traceId?: string;
  spanId?: string;
}

/**
 * ZTA policy decision
 */
export interface ZtaPolicyDecision {
  /** Whether access is allowed */
  allowed: boolean;
  /** Reason for decision */
  reason: string;
  /** Additional conditions or warnings */
  conditions?: string[];
  /** Suggested actions (e.g., require MFA) */
  actions?: string[];
}

/**
 * Policy rule definition
 */
export interface ZtaPolicyRule {
  /** Rule name */
  name: string;
  /**
   * Resources this rule applies to. Supported patterns: `*` (everything),
   * `prefix/*` (the prefix itself and anything below it), or an exact name.
   */
  resources: string[];
  /** Required scopes; a request without validated claims is denied when set */
  requiredScopes?: string[];
  /** Allowed client IDs (if restricted) */
  allowedClients?: string[];
  /**
   * Allowed IP ranges: `*`, an exact address, or CIDR notation
   * (e.g. `10.0.0.0/8`, `fd00::/8`). Malformed entries never match.
   */
  allowedIpRanges?: string[];
  /**
   * Whether to require fresh authentication.
   * NOTE(review): no code in this file reads this flag; token freshness is
   * enforced only through `maxTokenAge`.
   */
  requireFreshAuth?: boolean;
  /**
   * Maximum token age in seconds. Unset or `0` disables the check. When set,
   * a request whose claims lack a numeric `iat` is denied.
   */
  maxTokenAge?: number;
}

/**
 * ZTA Policy Engine
 *
 * Evaluates access requests against configured policies. Access is denied by
 * default: a request is allowed only if at least one rule matches the
 * resource and every matching rule passes.
 */
export class ZtaPolicyEngine {
  private readonly rules: ZtaPolicyRule[] = [];

  /**
   * Compiled subnet matchers keyed by the CIDR string from the rule.
   * `null` marks a range that failed to parse, so it is only reported once.
   */
  private readonly subnetCache = new Map<string, BlockList | null>();

  constructor(_jwtValidator: JwtValidator | null = null) {
    // JWT validator can be used for additional token introspection if needed
  }

  /**
   * Add a policy rule
   *
   * @param rule - Rule appended to the engine's rule list.
   */
  addRule(rule: ZtaPolicyRule): void {
    this.rules.push(rule);
    logger.debug('Added ZTA policy rule', { name: rule.name, resources: rule.resources });
  }

  /**
   * Evaluate an access request
   *
   * @param context - Request attributes to check against the rules.
   * @returns The first denying rule's decision, a default denial when no rule
   *   matches the resource, or an allow decision carrying the conditions and
   *   actions collected from every matching rule.
   */
  async evaluate(context: ZtaRequestContext): Promise<ZtaPolicyDecision> {
    const { traceId, spanId } = getTraceContext();

    logger.debug('Evaluating ZTA policy', {
      identity: context.identity,
      resource: context.resource,
      method: context.method,
      traceId,
      spanId,
    });

    // Find matching rules
    const matchingRules = this.rules.filter((rule) =>
      this.resourceMatches(context.resource, rule.resources)
    );

    if (matchingRules.length === 0) {
      // Default deny if no rules match
      logger.warning('No matching ZTA rules, denying access', {
        resource: context.resource,
        traceId,
      });
      return {
        allowed: false,
        reason: 'No policy defined for this resource',
      };
    }

    // Evaluate all matching rules (all must pass)
    const conditions: string[] = [];
    const actions: string[] = [];

    for (const rule of matchingRules) {
      const ruleResult = await this.evaluateRule(rule, context);

      if (!ruleResult.allowed) {
        logger.warning('ZTA rule denied access', {
          rule: rule.name,
          reason: ruleResult.reason,
          identity: context.identity,
          resource: context.resource,
          traceId,
        });
        return ruleResult;
      }

      if (ruleResult.conditions) {
        conditions.push(...ruleResult.conditions);
      }
      if (ruleResult.actions) {
        actions.push(...ruleResult.actions);
      }
    }

    logger.info('ZTA policy evaluation passed', {
      identity: context.identity,
      resource: context.resource,
      rulesEvaluated: matchingRules.length,
      traceId,
    });

    return {
      allowed: true,
      reason: 'All policy rules passed',
      conditions: conditions.length > 0 ? conditions : undefined,
      actions: actions.length > 0 ? actions : undefined,
    };
  }

  /**
   * Evaluate a single rule
   *
   * Checks run in order: client ID, source IP, scopes, token freshness. Every
   * check fails closed: when the rule demands something the request cannot
   * prove (no claims, no `iat`, unparseable IP), access is denied.
   */
  private async evaluateRule(
    rule: ZtaPolicyRule,
    context: ZtaRequestContext
  ): Promise<ZtaPolicyDecision> {
    // Check client ID restrictions
    if (rule.allowedClients && rule.allowedClients.length > 0) {
      if (!context.clientId || !rule.allowedClients.includes(context.clientId)) {
        return {
          allowed: false,
          reason: `Client not authorized for this resource`,
        };
      }
    }

    // Check IP restrictions
    if (rule.allowedIpRanges && rule.allowedIpRanges.length > 0) {
      if (!this.ipInRanges(context.sourceIp, rule.allowedIpRanges)) {
        return {
          allowed: false,
          reason: 'Request from unauthorized IP range',
        };
      }
    }

    // Check required scopes
    if (rule.requiredScopes && rule.requiredScopes.length > 0) {
      // Without validated claims there is nothing to prove the scopes with;
      // skipping the check here would let anonymous requests through.
      if (!context.claims) {
        return {
          allowed: false,
          reason: 'Authentication required: no validated token claims',
          actions: ['authenticate'],
        };
      }

      const tokenScopes = this.extractScopes(context.claims);
      const missingScopes = rule.requiredScopes.filter((s) => !tokenScopes.includes(s));

      if (missingScopes.length > 0) {
        return {
          allowed: false,
          reason: `Missing required scopes: ${missingScopes.join(', ')}`,
        };
      }
    }

    // Check token freshness (0 keeps its historical meaning of "no limit")
    if (rule.maxTokenAge !== undefined && rule.maxTokenAge !== 0) {
      const issuedAt: unknown = context.claims?.iat;

      // A token whose age cannot be established must not count as fresh.
      if (typeof issuedAt !== 'number') {
        return {
          allowed: false,
          reason: 'Token issue time unavailable, re-authentication required',
          actions: ['reauthenticate'],
        };
      }

      const tokenAge = Math.floor(Date.now() / 1000) - issuedAt;
      if (tokenAge > rule.maxTokenAge) {
        return {
          allowed: false,
          reason: 'Token too old, re-authentication required',
          actions: ['reauthenticate'],
        };
      }
    }

    return {
      allowed: true,
      reason: `Rule '${rule.name}' passed`,
    };
  }

  /**
   * Check if resource matches any of the patterns
   */
  private resourceMatches(resource: string, patterns: string[]): boolean {
    return patterns.some((pattern) => {
      if (pattern === '*') return true;
      if (pattern.endsWith('/*')) {
        const prefix = pattern.slice(0, -2);
        // "a/*" covers "a" itself and "a/..." but not "ab".
        return resource === prefix || resource.startsWith(prefix + '/');
      }
      return resource === pattern;
    });
  }

  /**
   * Check if IP is in any of the allowed ranges
   *
   * `*` matches everything, an entry without `/` must equal the address
   * exactly, and an entry with `/` is matched as a CIDR subnet.
   */
  private ipInRanges(ip: string, ranges: string[]): boolean {
    return ranges.some((range) => {
      if (range === '*') return true;
      if (!range.includes('/')) {
        return ip === range;
      }
      return this.ipInCidr(ip, range);
    });
  }

  /**
   * Check whether `ip` falls inside the CIDR subnet `cidr`.
   *
   * Returns false (never throws) when either side is not a valid address.
   */
  private ipInCidr(ip: string, cidr: string): boolean {
    const ipFamily = isIP(ip);
    if (ipFamily === 0) return false; // e.g. 'unknown' or a malformed header value

    const subnet = this.compileSubnet(cidr);
    if (subnet === null) return false;

    try {
      // Per the Node.js documentation, BlockList also matches IPv4-mapped
      // IPv6 addresses (::ffff:a.b.c.d) against IPv4 subnets.
      return subnet.check(ip, ipFamily === 4 ? 'ipv4' : 'ipv6');
    } catch {
      return false;
    }
  }

  /**
   * Parse a CIDR string into a reusable matcher, caching the result.
   *
   * @returns The matcher, or `null` when the network address or prefix length
   *   is invalid (logged once per distinct range).
   */
  private compileSubnet(cidr: string): BlockList | null {
    const cached = this.subnetCache.get(cidr);
    if (cached !== undefined) return cached;

    const slashIndex = cidr.indexOf('/');
    const network = cidr.slice(0, slashIndex);
    const prefixText = cidr.slice(slashIndex + 1);

    const family = isIP(network);
    const prefix = /^\d{1,3}$/.test(prefixText) ? Number(prefixText) : Number.NaN;
    const maxPrefix = family === 4 ? 32 : 128;

    let compiled: BlockList | null = null;
    // NaN fails the comparison, so a non-numeric prefix is rejected here too.
    if (family !== 0 && prefix <= maxPrefix) {
      try {
        const list = new BlockList();
        list.addSubnet(network, prefix, family === 4 ? 'ipv4' : 'ipv6');
        compiled = list;
      } catch {
        compiled = null;
      }
    }

    if (compiled === null) {
      logger.warning('Ignoring invalid CIDR range in ZTA rule', { range: cidr });
    }

    this.subnetCache.set(cidr, compiled);
    return compiled;
  }

  /**
   * Extract scopes from claims
   *
   * Accepts the `scope` claim either as an array or as a space-delimited
   * string; a missing claim yields an empty list.
   */
  private extractScopes(claims: ValidatedClaims): string[] {
    const scopeClaim = claims.scope;
    if (!scopeClaim) return [];
    if (Array.isArray(scopeClaim)) return scopeClaim;
    return scopeClaim.split(' ').filter(Boolean);
  }
}

/**
 * ZTA middleware for Express
 *
 * Validates the bearer token (when both a token and a validator are present),
 * builds a request context and asks the policy engine for a decision.
 * Responds 401 when the token is rejected or when access is denied without
 * validated claims, 403 when access is denied with claims; otherwise calls
 * `next()`.
 *
 * @param policyEngine - Engine that decides whether the request may proceed.
 * @param jwtValidator - Validator for bearer tokens, or `null` to skip token
 *   validation (requests are then evaluated without claims).
 */
export function createZtaMiddleware(
  policyEngine: ZtaPolicyEngine,
  jwtValidator: JwtValidator | null
): (
  req: { headers: Record<string, string | undefined>; ip?: string; path: string; method: string },
  res: { status: (code: number) => { json: (body: unknown) => void } },
  next: () => void
) => Promise<void> {
  return async (req, res, next) => {
    const authHeader = req.headers['authorization'];
    const token = extractBearerToken(authHeader);

    let claims: ValidatedClaims | undefined;

    // Validate JWT if present and validator configured
    if (token && jwtValidator) {
      try {
        claims = await jwtValidator.validate(token);
      } catch (error) {
        if (error instanceof AuthenticationError) {
          res.status(401).json({ error: error.message });
          return;
        }
        throw error;
      }
    }

    // Only accept a string client_id; anything else is treated as absent.
    const rawClientId: unknown = claims?.client_id;

    // Build request context
    const context: ZtaRequestContext = {
      identity: claims?.sub,
      clientId: typeof rawClientId === 'string' ? rawClientId : undefined,
      // NOTE(review): the X-Forwarded-For fallback is client-controlled unless
      // a trusted proxy sets it — confirm the deployment's proxy configuration.
      sourceIp: req.ip ?? req.headers['x-forwarded-for']?.split(',')[0]?.trim() ?? 'unknown',
      userAgent: req.headers['user-agent'],
      resource: req.path,
      method: req.method,
      timestamp: new Date(),
      claims,
      ...getTraceContext(),
    };

    // Evaluate policy
    const decision = await policyEngine.evaluate(context);

    if (!decision.allowed) {
      logger.warning('ZTA policy denied request', {
        reason: decision.reason,
        identity: context.identity,
        resource: context.resource,
      });

      if (!claims) {
        res.status(401).json({ error: 'Authentication required' });
      } else {
        res.status(403).json({ error: decision.reason });
      }
      return;
    }

    next();
  };
}

/**
 * Per-request verification decorator
 *
 * Use this to wrap tool handlers for continuous verification: the policy is
 * re-evaluated on every invocation before the handler runs.
 *
 * @param policyEngine - Engine that decides whether the call may proceed.
 * @param resourceName - Resource name the invocation is evaluated against.
 * @param handler - Wrapped handler; receives the input and the ZTA context.
 * @returns A function that evaluates the policy and then delegates to `handler`.
 * @throws AuthorizationError when the policy engine denies the invocation.
 */
export function withZtaVerification<TInput, TOutput>(
  policyEngine: ZtaPolicyEngine,
  resourceName: string,
  handler: (input: TInput, context: ZtaRequestContext) => Promise<TOutput>
): (input: TInput, authContext: { claims?: ValidatedClaims; sourceIp?: string }) => Promise<TOutput> {
  return async (input, authContext) => {
    // Only accept a string client_id; anything else is treated as absent.
    const rawClientId: unknown = authContext.claims?.client_id;

    const context: ZtaRequestContext = {
      identity: authContext.claims?.sub,
      clientId: typeof rawClientId === 'string' ? rawClientId : undefined,
      sourceIp: authContext.sourceIp ?? 'unknown',
      resource: resourceName,
      method: 'TOOL_INVOKE',
      timestamp: new Date(),
      claims: authContext.claims,
      ...getTraceContext(),
    };

    const decision = await policyEngine.evaluate(context);

    if (!decision.allowed) {
      throw new AuthorizationError(decision.reason);
    }

    return handler(input, context);
  };
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/cameronsjo/mcp-server-template'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.