http.rs•5.51 kB
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Implementation of HTTP protocols
use crate::utils::rmcp_ext::ServerProvider;
use axum::Router;
use axum::http::StatusCode;
use axum::routing::get;
use rmcp::transport::sse_server::SseServerConfig;
use rmcp::transport::streamable_http_server::session::local::LocalSessionManager;
use rmcp::transport::streamable_http_server::{SessionManager, StreamableHttpServerConfig};
use rmcp::transport::{SseServer, StreamableHttpService};
use rmcp::{RoleServer, Service};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
/// Configuration for an HTTP MCP server
pub struct HttpServerConfig<M: SessionManager = LocalSessionManager> {
/// TCP address to bind to
pub bind: SocketAddr,
/// Parent cancellation token. `serve_with_config` will return a child token
pub ct: CancellationToken,
/// Streamable http server option
pub keep_alive: Option<Duration>,
/// Streamable http server option
pub stateful_mode: bool,
/// Streamable http server option
pub session_manager: Arc<M>,
}
/// An HTTP MCP server that supports both SSE and streamable HTTP.
pub struct HttpProtocol {}
impl HttpProtocol {
pub async fn serve_with_config<S: Service<RoleServer>, M: SessionManager>(
server_provider: impl Into<ServerProvider<S>>,
config: HttpServerConfig<M>,
) -> std::io::Result<CancellationToken> {
let server_provider = server_provider.into().0;
let ct = config.ct.child_token();
// Create a streamable http router
let sh_router = {
let sh_config = StreamableHttpServerConfig {
sse_keep_alive: config.keep_alive,
stateful_mode: config.stateful_mode,
};
let server_provider = server_provider.clone();
// TODO: internally, new() wraps the server provider closure with an Arc. We can avoid
// "double-Arc" by having
let sh_service =
StreamableHttpService::new(move || Ok(server_provider()), config.session_manager, sh_config);
Router::new().route_service("/", sh_service)
};
// Create an SSE router
let sse_router = {
let sse_config = SseServerConfig {
bind: config.bind,
// SSE server will create a child cancellation token for every transport that is created
// (see with_service() below)
ct: ct.clone(),
sse_keep_alive: config.keep_alive,
sse_path: "/".to_string(),
post_path: "/message".to_string(),
};
let (sse_server, sse_router) = SseServer::new(sse_config);
let _sse_ct = sse_server.with_service(move || server_provider());
sse_router
};
// Health and readiness
// See https://kubernetes.io/docs/concepts/configuration/liveness-readiness-startup-probes/
let health_router = {
Router::new()
// We may introduce a startup probe if we need to fetch/cache remote resources
// during initialization
// Ready: once we have the tool list we can process incoming requests
.route("/ready", get(async || (StatusCode::OK, "Ready\n")))
// Live: are we alive?
.route("/live", get(async || "Alive\n"))
};
// Put all things together
let main_router = Router::new()
.route("/", get(hello))
.route("/ping", get(async || (StatusCode::OK, "Ready\n")))
.nest("/mcp/sse", sse_router)
.nest("/mcp", sh_router)
.nest("/_health", health_router)
.with_state(());
// Start the http server
let listener = tokio::net::TcpListener::bind(config.bind).await?;
let server = axum::serve(listener, main_router).with_graceful_shutdown({
let ct = ct.clone();
async move {
ct.cancelled().await;
tracing::info!("http server cancelled");
}
});
// Await the server, or it will do nothing :-)
tokio::spawn(
async {
let _ = server.await;
}
.instrument(tracing::info_span!("http-server", bind_address = %config.bind)),
);
Ok(ct)
}
}
async fn hello() -> String {
let version = env!("CARGO_PKG_VERSION");
format!(
r#"Elasticsearch MCP server. Version {version}
Endpoints:
- streamable-http: /mcp
- sse: /mcp/sse
"#
)
}
#[cfg(test)]
mod tests {
#[test]
pub fn test_parts_in_extensions() {}
}