import { Injectable, Logger } from '@nestjs/common';
import { AgentExecutionResponse } from '../interfaces/agent-service.interface.js';
import {
AgentConfig,
MessageFromAgentIdDTO,
MessageRequest,
UpdateModelConfigDTO,
} from '@snakagent/core';
import {
AgentValidationError,
AgentExecutionError,
} from '../../common/errors/agent.errors.js';
import { ConfigurationService } from '../../config/configuration.js';
import {
BaseAgent,
ChunkOutput,
EventType,
SnakAgent,
UserRequest,
} from '@snakagent/agents';
import { agents, redisAgents } from '@snakagent/database/queries';
import { message } from '@snakagent/database/queries';
@Injectable()
export class AgentService {
private readonly logger = new Logger(AgentService.name);
constructor(private readonly config: ConfigurationService) {}
async handleUserRequest(
agent: BaseAgent,
userId: string,
userRequest: MessageRequest
): Promise<AgentExecutionResponse> {
this.logger.debug({
message: 'Processing agent request',
request: userRequest.content,
});
try {
let result: any;
const user_request: UserRequest = {
request: userRequest.content || '',
hitl_threshold: userRequest.hitl_threshold ?? undefined,
};
const executeResult = agent.execute(user_request);
// Check if the result is an AsyncGenerator
if (Symbol.asyncIterator in executeResult) {
for await (const chunk of executeResult) {
if (
chunk.event === EventType.ON_CHAT_MODEL_END ||
chunk.event === EventType.ON_CHAIN_END
) {
const messageId = await message.insert_message(
agent.getAgentConfig().id,
userId,
chunk
);
this.logger.debug(
`Inserted message with ID: ${messageId.toLocaleString()}`
);
if (
chunk.event === EventType.ON_CHAIN_END &&
chunk.metadata.final === true
) {
result = chunk;
return {
status: 'success',
data: result,
};
}
}
}
} else {
// Handle Promise case
result = await executeResult;
return {
status: 'success',
data: result,
};
}
// If loop completes without returning, throw error
if (!result) {
throw new AgentExecutionError('Failed to process agent request', {
originalError: 'No final chunk received',
});
}
this.logger.debug({
message: 'Agent request processed successfully',
result: result,
});
return {
status: 'success',
data: result,
};
} catch (error: any) {
this.logger.error('Error processing agent request', {
error: {
message: error.message,
name: error.name,
stack: error.stack,
},
request: userRequest.content,
});
if (error instanceof AgentValidationError) {
throw error;
}
throw new AgentExecutionError('Failed to process agent request', {
originalError: error.message,
cause: error,
});
}
}
async *handleUserRequestWebsocket(
agent: BaseAgent,
userRequest: MessageRequest,
userId: string
): AsyncGenerator<ChunkOutput> {
this.logger.debug({
message: 'Processing agent request',
request: userRequest.content,
});
try {
const user_request: UserRequest = {
request: userRequest.content || '',
hitl_threshold: userRequest.hitl_threshold ?? undefined,
};
const executeResult = agent.execute(user_request);
// Check if the result is an AsyncGenerator
if (Symbol.asyncIterator in executeResult) {
for await (const chunk of executeResult) {
if (chunk.metadata.final === true) {
this.logger.debug('SupervisorService: Execution completed');
yield chunk;
return;
}
yield chunk;
}
} else {
throw new Error('Expected an AsyncGenerator from agent.execute()');
}
} catch (error: any) {
this.logger.error('Error processing agent request', {
error: {
message: error.message,
name: error.name,
stack: error.stack,
},
request: userRequest,
});
if (error instanceof AgentValidationError) {
throw error;
}
throw new AgentExecutionError('Failed to process agent request', {
originalError: error.message,
cause: error,
});
}
}
async getAllAgentsOfUser(
userId: string
): Promise<AgentConfig.OutputWithoutUserId[]> {
try {
const res = await agents.getAllAgentsByUser(userId);
return res;
} catch (error) {
this.logger.error(error);
throw error;
}
}
/**
* Get all agents for a user from Redis
* @param userId - User ID to fetch agents for
* @returns Promise<AgentConfig.OutputWithoutUserId[]> - Array of agent configurations from Redis without user_id
*/
async getAllAgentsOfUserFromRedis(
userId: string
): Promise<AgentConfig.OutputWithoutUserId[]> {
try {
const agents = await redisAgents.listAgentsByUser(userId);
// Remove user_id from each agent to match the PostgreSQL behavior
const agentsWithoutUserId = agents.map((agent) => {
const { user_id, ...agentWithoutUserId } = agent;
return agentWithoutUserId;
});
return agentsWithoutUserId;
} catch (error) {
this.logger.error('Error fetching agents from Redis:', error);
throw error;
}
}
async getMessageFromAgentId(
userRequest: MessageFromAgentIdDTO,
userId: string
): Promise<ChunkOutput[]> {
try {
const limit = userRequest.limit_message || 10;
const res = await agents.getMessagesOptimized(
userRequest.agent_id,
userRequest.thread_id,
userId,
false,
limit,
0
);
this.logger.debug(`All messages:', ${JSON.stringify(res)} `);
return res;
} catch (error) {
this.logger.error(error);
throw error;
}
}
async updateModelsConfig(model: UpdateModelConfigDTO, userId: string) {
try {
const res = await agents.updateModelConfig(
userId,
model.provider,
model.modelName,
model.temperature || 0.7,
model.maxTokens || 4096
);
this.logger.debug(`Models config updated:', ${JSON.stringify(res)} `);
} catch (error) {
this.logger.error(error);
throw error;
}
}
}