//! MCP Operations
//!
//! Consolidated tool for MCP server lifecycle management.
use letta::{
types::tool::{McpServerConfig, TestMcpServerRequest, UpdateMcpServerRequest},
LettaClient,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::info;
use turbomcp::McpError;
use super::response_utils::{get_pagination_params, truncate_with_flag};
#[derive(Debug, Deserialize, Serialize, schemars::JsonSchema)]
#[serde(rename_all = "snake_case")]
pub enum McpOperation {
Add,
Update,
Delete,
Test,
Connect,
Resync,
Execute,
ListServers,
ListTools,
RegisterTool,
}
#[derive(Debug, Deserialize, schemars::JsonSchema)]
pub struct McpOpsRequest {
pub operation: McpOperation,
#[serde(skip_serializing_if = "Option::is_none")]
pub server_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub server_config: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_args: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub oauth_config: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pagination: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_heartbeat: Option<bool>,
}
#[derive(Debug, Serialize, Default)]
pub struct McpOpsResponse {
pub success: bool,
pub operation: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub servers: Option<Vec<Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tools: Option<Vec<Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub server_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tool_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub returned: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub truncated: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub output_length: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub hints: Option<Vec<String>>,
}
impl McpOpsResponse {
/// Create a success response with operation and message
fn success(operation: &str, message: &str) -> Self {
Self {
success: true,
operation: operation.to_string(),
message: message.to_string(),
data: None,
servers: None,
tools: None,
server_name: None,
tool_name: None,
total: None,
returned: None,
truncated: None,
output_length: None,
hints: None,
}
}
fn with_data(mut self, data: Value) -> Self {
self.data = Some(data);
self
}
fn with_server_name(mut self, name: String) -> Self {
self.server_name = Some(name);
self
}
fn with_hint(mut self, hint: &str) -> Self {
self.hints
.get_or_insert_with(Vec::new)
.push(hint.to_string());
self
}
}
// Constants for response size optimization
const DEFAULT_SERVERS_LIMIT: usize = 20;
const MAX_SERVERS_LIMIT: usize = 50;
const DEFAULT_TOOLS_LIMIT: usize = 30;
const MAX_TOOLS_LIMIT: usize = 100;
const MAX_DESCRIPTION_LENGTH: usize = 80;
const MAX_OUTPUT_LENGTH: usize = 3000;
// Common hint messages
mod hints {
pub const TEST_CONNECTION: &str = "Use 'test' operation to verify connection";
}
pub async fn handle_mcp_ops(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let operation_str = format!("{:?}", request.operation).to_lowercase();
info!(operation = %operation_str, "Executing MCP operation");
match request.operation {
McpOperation::Add => handle_add_server(client, request).await,
McpOperation::Update => handle_update_server(client, request).await,
McpOperation::Delete => handle_delete_server(client, request).await,
McpOperation::Test => handle_test_server(client, request).await,
McpOperation::Connect => handle_connect_server(client, request).await,
McpOperation::Resync => handle_resync_server(client, request).await,
McpOperation::Execute => handle_execute_tool(client, request).await,
McpOperation::ListServers => handle_list_servers(client, request).await,
McpOperation::ListTools => handle_list_tools(client, request).await,
McpOperation::RegisterTool => handle_register_tool(client, request).await,
}
}
async fn handle_add_server(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_config_value = request
.server_config
.ok_or_else(|| McpError::invalid_request("server_config required"))?;
// Deserialize Value to McpServerConfig
let server_config: McpServerConfig = serde_json::from_value(server_config_value)
.map_err(|e| McpError::invalid_request(format!("Invalid server_config: {}", e)))?;
// Extract server name and type from the enum variant
let (server_name, server_type) = match &server_config {
McpServerConfig::Sse(config) => (config.server_name.clone(), "sse"),
McpServerConfig::Stdio(config) => (config.server_name.clone(), "stdio"),
McpServerConfig::StreamableHttp(config) => (config.server_name.clone(), "http"),
};
client
.tools()
.add_mcp_server(server_config)
.await
.map_err(|e| McpError::internal(format!("Failed to add MCP server: {}", e)))?;
// Use json! macro instead of manual Map::insert - fewer allocations
let summary = serde_json::json!({
"server_name": server_name,
"server_type": server_type,
});
Ok(
McpOpsResponse::success("add", "MCP server added successfully")
.with_data(summary)
.with_hint(hints::TEST_CONNECTION),
)
}
async fn handle_update_server(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
let server_config_value = request
.server_config
.ok_or_else(|| McpError::invalid_request("server_config required"))?;
// Deserialize Value to UpdateMcpServerRequest
let update_request: UpdateMcpServerRequest = serde_json::from_value(server_config_value)
.map_err(|e| McpError::invalid_request(format!("Invalid server_config: {}", e)))?;
client
.tools()
.update_mcp_server(&server_name, update_request)
.await
.map_err(|e| McpError::internal(format!("Failed to update MCP server: {}", e)))?;
let summary = serde_json::json!({ "server_name": &server_name });
Ok(
McpOpsResponse::success("update", "MCP server updated successfully")
.with_data(summary)
.with_server_name(server_name)
.with_hint(hints::TEST_CONNECTION),
)
}
async fn handle_delete_server(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
client
.tools()
.delete_mcp_server(&server_name)
.await
.map_err(|e| McpError::internal(format!("Failed to delete MCP server: {}", e)))?;
Ok(
McpOpsResponse::success("delete", "MCP server deleted successfully")
.with_server_name(server_name),
)
}
async fn handle_test_server(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_config_value = request
.server_config
.ok_or_else(|| McpError::invalid_request("server_config required"))?;
// Deserialize Value to McpServerConfig for the flattened TestMcpServerRequest
let config: McpServerConfig = serde_json::from_value(server_config_value)
.map_err(|e| McpError::invalid_request(format!("Invalid server_config: {}", e)))?;
let test_request = TestMcpServerRequest { config };
let start_time = std::time::Instant::now();
let result = client
.tools()
.test_mcp_server(test_request)
.await
.map_err(|e| McpError::internal(format!("Failed to test MCP server: {}", e)))?;
let connection_time_ms = start_time.elapsed().as_millis() as i64;
// Extract tool names only from the result
let result_value = serde_json::to_value(&result)?;
let tool_names: Vec<&str> = result_value
.get("tools")
.and_then(|t| t.as_array())
.map(|tools| {
tools
.iter()
.filter_map(|tool| tool.get("name").and_then(|n| n.as_str()))
.collect()
})
.unwrap_or_default();
// Use json! macro for compact response
let test_result = serde_json::json!({
"status": "success",
"connection_time_ms": connection_time_ms,
"tools_available": tool_names.len(),
"tool_names": tool_names,
});
Ok(McpOpsResponse::success("test", "MCP server connection successful").with_data(test_result))
}
async fn handle_connect_server(
_client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
// TODO: Implement when SDK adds connect_mcp_server support
Ok(McpOpsResponse {
success: false,
operation: "connect".into(),
message: "Connect operation not yet implemented in Rust SDK".into(),
server_name: Some(server_name),
..Default::default()
})
}
async fn handle_resync_server(
_client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
// TODO: Implement when SDK adds resync support
Ok(McpOpsResponse {
success: false,
operation: "resync".into(),
message: "Resync operation not yet implemented in Rust SDK".into(),
server_name: Some(server_name),
hints: Some(vec![
"This operation will return summary with counts when implemented".into(),
]),
..Default::default()
})
}
async fn handle_execute_tool(
_client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
let tool_name = request
.tool_name
.ok_or_else(|| McpError::invalid_request("tool_name required"))?;
// TODO: Implement when SDK adds tool execution support
// When implemented, this should truncate output to MAX_OUTPUT_LENGTH
Ok(McpOpsResponse {
success: false,
operation: "execute".into(),
message: "Execute operation not yet implemented in Rust SDK".into(),
server_name: Some(server_name),
tool_name: Some(tool_name),
hints: Some(vec![format!(
"Output will be truncated to {} characters when implemented",
MAX_OUTPUT_LENGTH
)]),
..Default::default()
})
}
async fn handle_list_servers(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let result = client
.tools()
.list_mcp_servers()
.await
.map_err(|e| McpError::internal(format!("Failed to list MCP servers: {}", e)))?;
// SDK returns object with server names as keys
let all_servers: Vec<Value> = if let Value::Object(servers_map) = serde_json::to_value(&result)?
{
servers_map
.into_iter()
.map(|(name, config)| {
// Determine server type and extract metadata
let (server_type, status, tool_count, last_connected) =
if let Value::Object(ref config_obj) = config {
let server_type = config_obj
.get("config")
.and_then(|c| c.as_object())
.map(|c| {
if c.contains_key("command") {
"stdio"
} else if c.contains_key("url") {
"http"
} else {
"unknown"
}
})
.unwrap_or("unknown");
let status = config_obj
.get("status")
.and_then(|s| s.as_str())
.unwrap_or("unknown");
let tool_count = config_obj
.get("tools")
.and_then(|t| t.as_array())
.map(|t| t.len())
.unwrap_or(0);
let last_connected = config_obj.get("last_connected").cloned();
(server_type, status, tool_count, last_connected)
} else {
("unknown", "unknown", 0, None)
};
// Use json! macro - fewer allocations than manual Map::insert
serde_json::json!({
"name": name,
"server_type": server_type,
"status": status,
"tool_count": tool_count,
"last_connected": last_connected,
})
})
.collect()
} else {
vec![]
};
let total_count = all_servers.len();
let (limit, offset) = get_pagination_params(
&request.pagination,
DEFAULT_SERVERS_LIMIT,
MAX_SERVERS_LIMIT,
);
// Apply pagination
let paginated_servers: Vec<Value> = all_servers.into_iter().skip(offset).take(limit).collect();
let returned_count = paginated_servers.len();
let has_more = offset + returned_count < total_count;
let mut hints = vec![];
if has_more {
hints.push(format!(
"Showing {} of {} servers. Use pagination to see more.",
returned_count, total_count
));
}
hints.push("Use 'test' operation with server_name for full config".into());
Ok(McpOpsResponse {
success: true,
operation: "list_servers".into(),
message: format!("Found {} MCP servers", total_count),
servers: Some(paginated_servers),
total: Some(total_count),
returned: Some(returned_count),
hints: Some(hints),
..Default::default()
})
}
async fn handle_list_tools(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
let result = client
.tools()
.list_mcp_tools_by_server(&server_name)
.await
.map_err(|e| McpError::internal(format!("Failed to list MCP tools: {}", e)))?;
let all_tools: Vec<Value> = if let Value::Array(arr) = serde_json::to_value(&result)? {
arr.into_iter()
.map(|tool| {
if let Value::Object(mut tool_obj) = tool {
let name = tool_obj.remove("name");
let description = tool_obj
.remove("description")
.and_then(|d| d.as_str().map(String::from));
// Truncate description if needed
let (desc, was_truncated) = match description {
Some(d) => truncate_with_flag(&d, MAX_DESCRIPTION_LENGTH),
None => (String::new(), false),
};
// Use json! macro for cleaner code
let mut summary = serde_json::json!({
"name": name,
"server_name": &server_name,
"description": desc,
});
if was_truncated {
summary["description_truncated"] = serde_json::json!(true);
}
summary
} else {
tool
}
})
.collect()
} else if let Value::Object(obj) = serde_json::to_value(&result)? {
obj.get("tools")
.and_then(|t| t.as_array())
.cloned()
.unwrap_or_default()
} else {
vec![]
};
let total_count = all_tools.len();
let (limit, offset) =
get_pagination_params(&request.pagination, DEFAULT_TOOLS_LIMIT, MAX_TOOLS_LIMIT);
// Apply pagination
let paginated_tools: Vec<Value> = all_tools.into_iter().skip(offset).take(limit).collect();
let returned_count = paginated_tools.len();
let has_more = offset + returned_count < total_count;
let hints = if has_more {
Some(vec![format!(
"Showing {} of {} tools. Use pagination to see more.",
returned_count, total_count
)])
} else {
None
};
Ok(McpOpsResponse {
success: true,
operation: "list_tools".into(),
message: format!("Found {} tools on server {}", total_count, server_name),
tools: Some(paginated_tools),
server_name: Some(server_name),
total: Some(total_count),
returned: Some(returned_count),
hints,
..Default::default()
})
}
async fn handle_register_tool(
client: &LettaClient,
request: McpOpsRequest,
) -> Result<McpOpsResponse, McpError> {
let server_name = request
.server_name
.ok_or_else(|| McpError::invalid_request("server_name required"))?;
let tool_name = request
.tool_name
.ok_or_else(|| McpError::invalid_request("tool_name required"))?;
let result = client
.tools()
.add_mcp_tool(&server_name, &tool_name)
.await
.map_err(|e| McpError::internal(format!("Failed to register MCP tool: {}", e)))?;
Ok(McpOpsResponse {
success: true,
operation: "register_tool".into(),
message: format!(
"Tool {} from {} registered successfully in Letta",
tool_name, server_name
),
data: Some(serde_json::to_value(&result)?),
server_name: Some(server_name),
tool_name: Some(tool_name),
..Default::default()
})
}