Skip to main content
Glama
agent.service.ts9.12 kB
import { Injectable, Logger } from '@nestjs/common'; import { IAgentService, AgentExecutionResponse, } from '../interfaces/agent-service.interface.js'; import { IAgent } from '../interfaces/agent.interface.js'; import { AgentConfig, MessageFromAgentIdDTO, MessageRequest, UpdateModelConfigDTO, } from '@snakagent/core'; import { AgentValidationError, AgentExecutionError, } from '../../common/errors/agent.errors.js'; import { ConfigurationService } from '../../config/configuration.js'; import { StarknetTransactionError } from '../../common/errors/starknet.errors.js'; import { Postgres } from '@snakagent/database'; import { BaseAgent, ChunkOutput, EventType, SnakAgent, UserRequest, } from '@snakagent/agents'; import { redisAgents, agents } from '@snakagent/database/queries'; import { message } from '@snakagent/database/queries'; @Injectable() export class AgentService implements IAgentService { private readonly logger = new Logger(AgentService.name); constructor(private readonly config: ConfigurationService) {} async syncAgentToRedis(agentId: string, userId: string): Promise<void> { try { const fetchQuery = new Postgres.Query( `SELECT id, user_id, row_to_json(profile) as profile, mcp_servers as "mcp_servers", prompts_id, row_to_json(graph) as graph, row_to_json(memory) as memory, row_to_json(rag) as rag, created_at, updated_at, avatar_image, avatar_mime_type FROM agents WHERE id = $1 AND user_id = $2`, [agentId, userId] ); const rows = await Postgres.query<AgentConfig.OutputWithId>(fetchQuery); const agent = rows[0]; if (!agent) return; await redisAgents.updateAgent(agent); this.logger.debug(`Synced agent ${agentId} to Redis`); } catch (err: any) { this.logger.warn( `Redis sync skipped for agent ${agentId}: ${err.message}` ); } } async handleUserRequest( agent: BaseAgent, userId: string, userRequest: MessageRequest ): Promise<AgentExecutionResponse> { this.logger.debug({ message: 'Processing agent request', request: userRequest.request, }); try { let result: any; const user_request: UserRequest = { request: userRequest.request || '', hitl_threshold: userRequest.hitl_threshold ?? undefined, }; const executeResult = agent.execute(user_request); // Check if the result is an AsyncGenerator if (Symbol.asyncIterator in executeResult) { for await (const chunk of executeResult) { if ( chunk.event === EventType.ON_CHAT_MODEL_END || chunk.event === EventType.ON_CHAIN_END ) { const messageId = await message.insert_message( agent.getAgentConfig().id, userId, chunk ); this.logger.debug( `Inserted message with ID: ${messageId.toLocaleString()}` ); if ( chunk.event === EventType.ON_CHAIN_END && chunk.metadata.final === true ) { result = chunk; return { status: 'success', data: result, }; } } } } else { // Handle Promise case result = await executeResult; return { status: 'success', data: result, }; } // If loop completes without returning, throw error if (!result) { throw new AgentExecutionError('Failed to process agent request', { originalError: 'No final chunk received', }); } this.logger.debug({ message: 'Agent request processed successfully', result: result, }); return { status: 'success', data: result, }; } catch (error: any) { this.logger.error('Error processing agent request', { error: { message: error.message, name: error.name, stack: error.stack, }, request: userRequest.request, }); if (error instanceof AgentValidationError) { throw error; } if (error.message?.includes('transaction')) { throw new StarknetTransactionError('Failed to execute transaction', { originalError: error.message, cause: error, }); } throw new AgentExecutionError('Failed to process agent request', { originalError: error.message, cause: error, }); } } async *handleUserRequestWebsocket( agent: BaseAgent, userRequest: MessageRequest, userId: string ): AsyncGenerator<ChunkOutput> { this.logger.debug({ message: 'Processing agent request', request: userRequest.request, }); try { const user_request: UserRequest = { request: userRequest.request || '', hitl_threshold: userRequest.hitl_threshold ?? undefined, }; const executeResult = agent.execute(user_request); // Check if the result is an AsyncGenerator if (Symbol.asyncIterator in executeResult) { for await (const chunk of executeResult) { if (chunk.metadata.final === true) { this.logger.debug('SupervisorService: Execution completed'); yield chunk; return; } yield chunk; } } else { throw new Error('Expected an AsyncGenerator from agent.execute()'); } } catch (error: any) { this.logger.error('Error processing agent request', { error: { message: error.message, name: error.name, stack: error.stack, }, request: userRequest, }); if (error instanceof AgentValidationError) { throw error; } if (error.message?.includes('transaction')) { throw new StarknetTransactionError('Failed to execute transaction', { originalError: error.message, cause: error, }); } throw new AgentExecutionError('Failed to process agent request', { originalError: error.message, cause: error, }); } } async getAllAgentsOfUser( userId: string ): Promise<AgentConfig.OutputWithoutUserId[]> { try { const res = await agents.getAllAgentsByUser(userId); return res; } catch (error) { this.logger.error(error); throw error; } } /** * Get all agents for a user from Redis * @param userId - User ID to fetch agents for * @returns Promise<AgentConfig.OutputWithoutUserId[]> - Array of agent configurations from Redis without user_id */ async getAllAgentsOfUserFromRedis( userId: string ): Promise<AgentConfig.OutputWithoutUserId[]> { try { const agents = await redisAgents.listAgentsByUser(userId); // Remove user_id from each agent to match the PostgreSQL behavior const agentsWithoutUserId = agents.map((agent) => { const { user_id, ...agentWithoutUserId } = agent; return agentWithoutUserId; }); return agentsWithoutUserId; } catch (error) { this.logger.error('Error fetching agents from Redis:', error); throw error; } } async getMessageFromAgentId( userRequest: MessageFromAgentIdDTO, userId: string ): Promise<ChunkOutput[]> { try { const limit = userRequest.limit_message || 10; const res = await agents.getMessagesOptimized( userRequest.agent_id, userRequest.thread_id, userId, false, limit, 0 ); this.logger.debug(`All messages:', ${JSON.stringify(res)} `); return res; } catch (error) { this.logger.error(error); throw error; } } async updateModelsConfig(model: UpdateModelConfigDTO, userId: string) { try { const res = await agents.updateModelConfig( userId, model.provider, model.modelName, model.temperature || 0.7, model.maxTokens || 4096 ); this.logger.debug(`Models config updated:', ${JSON.stringify(res)} `); } catch (error) { this.logger.error(error); throw error; } } async getAgentStatus(agent: IAgent): Promise<{ isReady: boolean; walletConnected: boolean; apiKeyValid: boolean; }> { try { const credentials = agent.getAccountCredentials(); // Check if the AI provider API keys are configured let apiKeyValid = true; // TODO add actual check for API key validity on the agent model return { isReady: Boolean(credentials && apiKeyValid), walletConnected: Boolean(credentials.accountPrivateKey), apiKeyValid, }; } catch (error) { this.logger.error('Error checking agent status', error); return { isReady: false, walletConnected: false, apiKeyValid: false, }; } } }

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/KasarLabs/snak'

If you have feedback or need assistance with the MCP directory API, please join our Discord server