7�0�
r*B�70�0�0��ٷm��Rm��v�,%�����E�E_o�1Z|���}��^Zܪ{5�I�#x � ��b�hL*���
����
J$�`0�qEA�@�0�bJ1B Dy�4�1 ));
assert!(ContentLength(0) < 123);
assert!(0 < ContentLength(123));
}
}
use std::sync::Arc;
use actix_utils:/// The "Content-Length" header field indicates the associated representation's data length as a
/// decimal non-negative integer number of octets.
///
/// # ABNF
/// ```plain
/// Content-Length = 1*DIGIT
/// ```
///
/// [RFC 9110 §8.6]: https://www.rfc-editor.org/rfc/rfc9110#name-content-length
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct ContentLength(usize);
impl ContentLength {
/// Returns Content-Length value.
pub fn into_inner(&self) -> usize {
self.0
}
}
impl str::FromStr for ContentLength {
type Err = <usize as str::FromStr>::Err;
#[inline]
fn from_str(val: &str) -> Result<Self, Self::Err> {
let val = val.trim();
// decoder prevents this case
debug_assert!(!val.starts_with('+'));
val.parse().map(Self)
}
}
impl TryIntoHeaderValue for ContentLength {
type Error = Infallible;
fn try_into_value(self) -> Result<HeaderValue, Self::Error> {
Ok(HeaderValue::from(self.0))
}
}
impl Header for C/
/// Also see the [Forwarded header's MDN docs][mdn] for field semantics.
///
/// [RFC 7239]: https://datatracker.ietf.org/doc/html/rfc7239
/// [mdn]: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Forwarded
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(test, derive(Default))]
pub struct Forwarded {
/// The interface where the request came in to the proxy server. The identifier can be:
///
/// - an obfuscated identifier (such as "hidden" or "secret"). This should be treated as the
/// default.
/// - an IP address (v4 or v6, optionally with a port. IPv6 address are quoted and enclosed in
/// square brackets)
/// - "unknown" when the preceding entity is not known (and you still want to indicate that
/// forwarding of the request was made)
by: Option<String>,
/// The client that initiated the request and subsequent proxies in a chain of proxies. The
/// identifier has the same possible values as the by directive.
r#for: Vec<S.as_mut().project() {
BodyHashFutProj::PayloadNone { inner_fut, hash } => {
let inner = ready!(inner_fut.poll(cx))?;
Poll::Ready(Ok(BodyHash {
inner,
hash: mem::take(hash),
}))
}
BodyHashFutProj::Inner {
inner_fut,
hasher,
mut forked_payload,
} => {
// poll original extractor
match inner_fut.poll(cx)? {
Poll::Ready(inner) => {
trace!("inner extractor complete");
let next = BodyHashFut::InnerDone {
inner: Some(inner),
hasher: mem::replace(hasher, D::new()),
forked_payload: mem::replace(forked_payload, dev::Payload::None),
};
self.set(next);
// re-enter poll in done state
Create a relative or absolute redirect.
///
/// _This feature has [graduated to Actix Web][graduated]. Further development will occur there._
///
/// See [`Redirect`] docs for usage details.
///
/// # Examples
/// ```
/// use actix_web::App;
/// use actix_web_lab::web as web_lab;
///
/// let app = App::new()
/// .service(web_lab::redirect("/one", "/two"));
/// ```
///
/// [graduated]: https://docs.rs/actix-web/4/actix_web/web/struct.Redirect.html
#[allow(deprecated)]
#[deprecated(since = "0.19.0", note = "Type has graduated to Actix Web.")]
pub fn redirect(from: impl Into<Cow<'static, str>>, to: impl Into<Cow<'static, str>>) -> Redirect {
Redirect::new(from, to)
}
/// Constructs a new Single-page Application (SPA) builder.
///
/// See [`Spa`] docs for more details.
///
/// # Examples
/// ```
/// # use actix_web::App;
/// # use actix_web_lab::web::spa;
/// let app = App::new()
/// // ...api routes...
/// .service(
/// spa()
/// .index_file("./examples/assets/spa.html")
/// fut: Cell::new(Some(Box::pin(init()))),
}),
}
}
/// Returns reference to result of lazy `T` value, initializing if necessary.
pub async fn get(&self) -> &T {
self.inner
.cell
.get_or_init(|| async move {
match self.inner.fut.take() {
Some(fut) => fut.await,
None => panic!("LazyData instance has previously been poisoned"),
}
})
.await
}
}
impl<T: 'static> FromRequest for LazyData<T> {
type Error = Error;
type Future = Ready<Result<Self, Error>>;
#[inline]
fn from_request(req: &HttpRequest, _: &mut dev::Payload) -> Self::Future {
if let Some(lazy) = req.app_data::<LazyData<T>>() {
ready(Ok(lazy.clone()))
} else {
debug!(
"Failed to extract `LazyData<{}>` for `{}` handler. For the Data extractor to work \
correctly, wrap the data with `LazyData::nreq.headers().get("Last-Event-ID"));
common_countdown(n.try_into().unwrap())
}
fn common_countdown(n: i32) -> impl Responder {
let countdown_stream = stream::unfold((false, n), |(started, n)| async move {
// allow first countdown value to yield immediately
if started {
sleep(Duration::from_secs(1)).await;
}
if n > 0 {
let data = sse::Data::new(n.to_string())
.event("countdown")
.id(n.to_string());
Some((Ok::<_, Infallible>(sse::Event::Data(data)), (true, n - 1)))
} else {
None
}
});
sse::Sse::from_stream(countdown_stream).with_retry_duration(Duration::from_secs(5))
}
#[get("/time")]
async fn timestamp() -> impl Responder {
let (sender, sse) = sse::channel(2);
actix_web::rt::spawn(async move {
loop {
let time = time::OffsetDateTime::now_utc();
let msg = sse::Data::new(time.format(&Rfc3339).unwrap()).event("timestamp");
der((
header::CONTENT_LENGTH,
header::HeaderValue::from_static("9"),
))
.set_payload(Bytes::from_static(b"name=test"))
.to_http_parts();
let s = UrlEncodedForm::<MyObject, 8>::from_request(&req, &mut pl).await;
let err = format!("{}", s.unwrap_err());
assert!(
err.contains("payload is larger") && err.contains("than allowed"),
"unexpected error string: {err:?}"
);
}
#[actix_web::test]
async fn test_form_body() {
let (req, mut pl) = TestRequest::default().to_http_parts();
let form =
UrlEncodedFormBody::<MyObject, DEFAULT_URL_ENCODED_FORM_LIMIT>::new(&req, &mut pl)
.await;
assert!(err_eq(form.unwrap_err(), UrlencodedError::ContentType));
let (req, mut pl) = TestRequest::default()
.insert_header((
header::CONTENT_TYPE,
header::HeaderValue::from_static("application/text"),
ix_web::test]
async fn compat_compat() {
let _ = App::new().wrap(Compat::new(from_fn(noop)));
let _ = App::new().wrap(Compat::new(from_fn(mutate_body_type)));
}
#[actix_web::test]
async fn feels_good() {
let app = test::init_service(
App::new()
.wrap(from_fn(mutate_body_type))
.wrap(from_fn(add_res_header))
.wrap(Logger::default())
.wrap(from_fn(noop))
.default_service(web::to(HttpResponse::NotFound)),
)
.await;
let req = test::TestRequest::default().to_request();
let res = test::call_service(&app, req).await;
assert!(res.headers().contains_key(header::WARNING));
}
#[actix_web::test]
async fn closure_capture_and_return_from_fn() {
let app = test::init_service(
App::new()
.wrap(Logger::default())
.wrap(MyMw(true).into_middleware())
.wrap(Logger::default()),
fut: Box::pin(T::from_request(req, payload)),
counter_pl: counter,
size: 0,
},
}
}
}
pub struct BodyLimitFut<T, const LIMIT: usize>
where
T: FromRequest + 'static,
T::Error: fmt::Debug + fmt::Display,
{
inner: Inner<T, LIMIT>,
}
impl<T, const LIMIT: usize> BodyLimitFut<T, LIMIT>
where
T: FromRequest + 'static,
T::Error: fmt::Debug + fmt::Display,
{
fn new_error(err: BodyLimitError<T>) -> Self {
Self {
inner: Inner::Error { err: Some(err) },
}
}
}
enum Inner<T, const LIMIT: usize>
where
T: FromRequest + 'static,
T::Error: fmt::Debug + fmt::Display,
{
Error {
err: Option<BodyLimitError<T>>,
},
Body {
/// Wrapped extractor future.
fut: Pin<Box<T::Future>>,
/// Forked request payload.
counter_pl: dev::Payload,
/// Running payload size count.
size: usize,
},
}
impl<T, const LIMIT: usize> Unpin for Inner<
use crate::util::{InfallibleStream, MutWriter};
pin_project! {
/// A buffered CSV serializing body stream.
///
/// This has significant memory efficiency advantages over returning an array of CSV rows when
/// the data set is very large because it avoids buffering the entire response.
///
/// # Examples
/// ```
/// # use actix_web::Responder;
/// # use actix_web_lab::respond::Csv;
/// # use futures_core::Stream;
/// fn streaming_data_source() -> impl Stream<Item = [String; 2]> {
/// // get item stream from source
/// # futures_util::stream::empty()
/// }
///
/// async fn handler() -> impl Responder {
/// let data_stream = streaming_data_source();
///
/// Csv::new_infallible(data_stream)
/// .into_responder()
/// }
/// ```
pub struct Csv<S> {
// The wrapped item stream.
#[pin]
stream: S,
}
}
impl<S> Csv<S> {
/// Constructs a new `Csv` from a stream of rows.
string. Example: `/users?n=100`.
//!
//! Also includes a low-efficiency route to demonstrate the difference.
use std::io::{self, Write as _};
use actix_web::{
get,
web::{self, BufMut as _, BytesMut},
App, HttpResponse, HttpServer, Responder,
};
use actix_web_lab::respond::NdJson;
use futures_core::Stream;
use futures_util::{stream, StreamExt as _};
use rand::{distributions::Alphanumeric, Rng as _};
use serde::Deserialize;
use serde_json::json;
use tracing::info;
fn streaming_data_source(n: u32) -> impl Stream<Item = Result<serde_json::Value, io::Error>> {
stream::repeat_with(|| {
Ok(json!({
"email": random_email(),
"address": random_address(),
}))
})
.take(n as usize)
}
#[derive(Debug, Deserialize)]
struct Opts {
n: Option<u32>,
}
/// This handler streams data as NDJSON to the client in a fast and memory efficient way.
///
/// A real data source might be a downstream server, database query, or other external resource.
#[get("/users")]
as body,
hex!(
"cf83e135 7eefb8bd f1542850 d66d8007 d620e405 0b5715dc 83f4a921 d36ce9ce
47d0d13c 5d85f2b0 ff8318d2 877eec2f 63b931bd 47417a81 a538327a f927da3e"
)
.as_ref()
);
let (req, _) =
test::TestRequest::default()
.to_request()
.replace_payload(dev::Payload::Stream {
payload: Box::pin(
stream::iter([b"a", b"b", b"c"].map(|b| Bytes::from_static(b))).map(Ok),
) as BoxedPayloadStream,
});
let body = test::call_and_read_body(&app, req).await;
assert_eq!(
body,
hex!("ba7816bf 8f01cfea 414140de 5dae2223 b00361a3 96177a9c b410ff61 f20015ad").as_ref()
);
}
#[actix_web::test]
async fn type_alias_equivalence() {
let app = test::init_service(
App::new()
.route(
"/alias",
web::get().to(|body: BodySha256<Bytes>| async move {
Bytes::copy_from_slice(body.hash()unt = self.static_resources_mount.into_owned();
let files = {
let index_file = index_file.clone();
Files::new(&static_resources_mount, static_resources_location)
// HACK: FilesService will try to read a directory listing unless index_file is provided
// FilesService will fail to load the index_file and will then call our default_handler
.index_file("extremely-unlikely-to-exist-!@$%^&*.txt")
.default_handler(move |req| serve_index(req, index_file.clone()))
};
SpaService { index_file, files }
}
}
#[derive(Debug)]
struct SpaService {
index_file: String,
files: Files,
}
impl HttpServiceFactory for SpaService {
fn register(self, config: &mut actix_web::dev::AppService) {
// let Files register its mount path as-is
self.files.register(config);
// also define a root prefix handler directed towards our SPA index
let rdef = ResourceDef::root_prefix("");
t!("{s}, {s:?}"), "test, Query(Id { id: \"test\" })");
s.id = "test1".to_string();
let s = s.into_inner();
assert_eq!(s.id, "test1");
}
#[actix_web::test]
#[should_panic]
async fn test_tuple_panic() {
let req = TestRequest::with_uri("/?one=1&two=2").to_srv_request();
let (req, mut pl) = req.into_parts();
Query::<(u32, u32)>::from_request(&req, &mut pl)
.await
.unwrap();
}
}
//! Hashing utilities for Actix Web.
//!
//! # Crate Features
//! All features are enabled by default.
//! - `blake2`: Blake2 types
//! - `blake3`: Blake3 types
//! - `md5`: MD5 types 🚩
//! - `md4`: MD4 types 🚩
//! - `sha1`: SHA-1 types 🚩
//! - `sha2`: SHA-2 types
//! - `sha3`: SHA-3 types
//!
//! # Security Warning 🚩
//! The `md4`, `md5`, and `sha1` types are included for completeness and interoperability but they
//! are considered cryptographically broken by modern standards. For security critical use cases,
//! you should move to Some(code) if path_altered => {
let mut res = HttpResponse::with_body(code, ());
res.headers_mut().insert(
header::LOCATION,
req.head_mut().uri.to_string().parse().unwrap(),
);
NormalizePathFuture::redirect(req.into_response(res))
}
_ => NormalizePathFuture::service(self.service.call(req)),
}
}
}
pin_project! {
pub struct NormalizePathFuture<S: Service<ServiceRequest>, B> {
#[pin] inner: Inner<S, B>,
}
}
impl<S: Service<ServiceRequest>, B> NormalizePathFuture<S, B> {
fn service(fut: S::Future) -> Self {
Self {
inner: Inner::Service {
fut,
_body: PhantomData,
},
}
}
fn redirect(res: ServiceResponse<()>) -> Self {
Self {
inner: Inner::Redirect { res: Some(res) },
}
}
}
pin_project! {
#[project = InnerProj]
enum Inner<S: Ser let uri = format!("{scheme}://www.{host}{path}");
let res = Redirect::to(uri).respond_to(&req);
drop(conn_info);
return Ok(ServiceResponse::new(req, res).map_into_right_body());
}
drop(conn_info);
let req = ServiceRequest::from_parts(req, pl);
Ok(next.call(req).await?.map_into_left_body())
}
#[cfg(test)]
mod test_super {
use actix_web::{
dev::ServiceFactory,
http::{header, StatusCode},
test, web, App, HttpResponse,
};
use super::*;
use crate::middleware::from_fn;
fn test_app() -> App<
impl ServiceFactory<
ServiceRequest,
Response = ServiceResponse<impl MessageBody>,
Config = (),
InitError = (),
Error = Error,
>,
> {
App::new().wrap(from_fn(redirect_to_www)).route(
"/",
web::get().to(|| async { HttpResponse::Ok().body("content") }),
)
}
#[actix_web::test]
async fn redirect_non_www() {
es;
use tokio::{
io::AsyncWrite,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
/// Returns an `AsyncWrite` response body writer and its associated body type.
///
/// # Examples
/// ```
/// # use actix_web::{HttpResponse, web};
/// use tokio::io::AsyncWriteExt as _;
/// use actix_web_lab::body;
///
/// # async fn index() {
/// let (mut wrt, body) = body::writer();
///
/// let _ = tokio::spawn(async move {
/// wrt.write_all(b"body from another thread").await
/// });
///
/// HttpResponse::Ok().body(body)
/// # ;}
/// ```
pub fn writer() -> (Writer, impl MessageBody) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
(Writer { tx }, BodyStream { rx })
}
/// An `AsyncWrite` response body writer.
#[derive(Debug, Clone)]
pub struct Writer {
tx: UnboundedSender<Bytes>,
}
impl AsyncWrite for Writer {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
self.tx
.send(Bytes(), "data: foo\n\n");
let st = stream::repeat(Ok::<_, Infallible>(Event::Data(Data::new("foo")))).take(2);
let sse = Sse::from_stream(st);
assert_eq!(
body::to_bytes(sse).await.unwrap(),
"data: foo\n\ndata: foo\n\n",
);
}
#[actix_web::test]
async fn appropriate_headers_are_set_on_responder() {
let st = stream::empty::<Result<_, Infallible>>();
let sse = Sse::from_stream(st);
let res = sse.respond_to(&TestRequest::default().to_http_request());
assert_response_matches!(res, OK;
"content-type" => "text/event-stream"
"content-encoding" => "identity"
"cache-control" => "no-cache"
);
}
#[actix_web::test]
async fn messages_are_received_from_sender() {
let (sender, mut sse) = channel(9);
assert!(poll_fn(|cx| Pin::new(&mut sse).poll_next(cx))
.now_or_never()
.is_none());
sender.send(Data::new("bar").event("foo"amples
/// ```
/// #[actix_web::main] async fn test() {
/// use actix_web_lab::sse;
///
/// let (sender, sse_stream) = sse::channel(5);
/// sender.try_send(sse::Data::new("my data").event("my event name")).unwrap();
/// sender.try_send(sse::Event::Comment("my comment".into())).unwrap();
/// # } test();
/// ```
pub fn try_send(&self, msg: impl Into<Event>) -> Result<(), TrySendError> {
self.tx.try_send(msg.into()).map_err(|err| match err {
mpsc::error::TrySendError::Full(ev) => TrySendError::Full(ev),
mpsc::error::TrySendError::Closed(ev) => TrySendError::Closed(ev),
})
}
}
pin_project! {
/// Server-sent events (`text/event-stream`) responder.
///
/// Constructed with an [SSE channel](channel) or [using your own stream](Self::from_stream).
#[must_use]
#[derive(Debug)]
pub struct Sse<S> {
#[pin]
stream: S,
keep_alive: Option<Interval>,
retry_interval: Option<Duration>,
p_data(Data::new(AbcSigningKey([0; 32])))
.route(
"/",
web::post().to(|body: RequestSignature<Bytes, AbcApi>| async move {
let (body, sig) = body.into_parts();
let sig = sig.into_bytes().to_vec();
format!("{body:?}\n\n{sig:x?}")
}),
)
})
.workers(1)
.bind(("127.0.0.1", 8080))?
.run()
.await
}
//! Expiremental testing utilities.
#[doc(inline)]
#[cfg(test)]
pub(crate) use crate::test_header_macros::{header_round_trip_test, header_test_module};
#[doc(inline)]
pub use crate::test_request_macros::test_request;
#[doc(inline)]
pub use crate::test_response_macros::assert_response_matches;
pub use crate::test_services::echo_path_service;
//! Semantic server-sent events (SSE) responder with a channel-like interface.
//!
//! # Examples
//! ```no_run
//! use std::{convert::Infallible, time::Duration};
//! use actix_web::{Responder, get};
//! use actix_web_lab::sse;
//!
/ranges: unsafe {
IpCidrCombiner::from_cidr_vec_unchecked(ipv4_cidr_vec, ipv6_cidr_vec)
},
}
}
}
/// Fetched trusted Cloudflare IP addresses from their API.
#[cfg(feature = "fetch-ips")]
pub async fn fetch_trusted_cf_ips() -> Result<TrustedIps, Err> {
let client = awc::Client::new();
tracing::debug!("fetching cloudflare ips");
let mut res = client.get(CF_URL_IPS).send().await.map_err(|err| {
tracing::error!("{err}");
Err::Fetch
})?;
tracing::debug!("parsing response");
let res = res.json::<CfIpsResponse>().await.map_err(|err| {
tracing::error!("{err}");
Err::Fetch
})?;
TrustedIps::try_from_response(res)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn cf_ips_from_response() {
let res = CfIpsResponse::Failure { success: false };
assert!(Trustednse::Success { result } => result,
CfIpsResponse::Failure { .. } => {
tracing::error!("parsing response returned success: false");
return Err(Err::Fetch);
}
};
let mut cidr_ranges = IpCidrCombiner::new();
for cidr in ips.ipv4_cidrs {
cidr_ranges.push(IpCidr::V4(cidr));
}
for cidr in ips.ipv6_cidrs {
cidr_ranges.push(IpCidr::V6(cidr));
}
Ok(Self { cidr_ranges })
}
/// Add trusted IP range to list.
pub fn with_ip_range(mut self, cidr: IpCidr) -> Self {
self.cidr_ranges.push(cidr);
self
}
/// Returns true if `ip` is controlled by Cloudflare.
pub fn contains(&self, ip: IpAddr) -> bool {
self.cidr_ranges.contains(ip)
}
}
impl Clone for TrustedIps {
fn clone(&self) -> Self {
let ipv4_cidr_vec = self.cidr_ranges.get_ipv4_cidrs().to_vec();
let ipv6_cidr_vec = self.cidr_ranges.get_ipv6_cidrs().to_vec();
//! Utilities for working with Actix Web types.
// stuff in here comes in and out of usage
#![allow(dead_code)]
use std::{
convert::Infallible,
io,
pin::Pin,
task::{ready, Context, Poll},
};
use actix_http::{error::PayloadError, BoxedPayloadStream};
use actix_web::{dev, web::BufMut};
use futures_core::Stream;
use futures_util::StreamExt as _;
use local_channel::mpsc;
/// Returns an effectively cloned payload that supports streaming efficiently.
///
/// The cloned payload:
/// - yields identical chunks;
/// - does not poll ahead of the original;
/// - does not poll significantly slower than the original;
/// - receives an error signal if the original errors, but details are opaque to the copy.
///
/// If the payload is forked in one of the extractors used in a handler, then the original _must_ be
/// read in another extractor or else the request will hang.
pub fn fork_request_payload(orig_payload: &mut dev::Payload) -> dev::Payload {
const TARGET: &str = concat!(module_path!(), "::fork_q).unwrap(), expect);
}
#[track_caller]
pub(crate) fn assert_parse_fail<
H: Header + fmt::Debug,
I: IntoIterator<Item = V>,
V: AsRef<[u8]>,
>(
headers: I,
) {
let req = req_from_raw_headers::<H, _, _>(headers);
H::parse(&req).unwrap_err();
}
}
#[cfg(test)]
pub(crate) use header_test_helpers::{assert_parse_eq, assert_parse_fail};
use actix_client_ip_cloudflare::{fetch_trusted_cf_ips, CfConnectingIp, TrustedClientIp};
use actix_web::{get, web::Header, App, HttpServer, Responder};
#[get("/raw-header")]
async fn header(Header(client_ip): Header<CfConnectingIp>) -> impl Responder {
match client_ip {
CfConnectingIp::Trusted(_ip) => unreachable!(),
CfConnectingIp::Untrusted(ip) => format!("Possibly fake client IP: {ip}"),
}
}
#[get("/client-ip")]
async fn trusted_client_ip(client_ip: TrustedClientIp) -> impl Responder {
format!("Trusted client IP: {client_ip}")
}
#[actix_web::main]
async fn main() -> std::io::Res by: Some("203.0.113.43".to_owned()),
r#for: vec!["192.0.2.60".to_owned()],
host: Some("rust-lang.org".to_owned()),
proto: Some("https".to_owned()),
};
assert_eq!(
fwd.try_into_value().unwrap(),
r#"by="203.0.113.43"; for="192.0.2.60"; host="rust-lang.org"; proto="https""#
);
}
#[test]
fn case_sensitivity() {
assert_parse_eq::<Forwarded, _, _>(
["For=192.0.2.60"],
Forwarded {
r#for: vec!["192.0.2.60".to_owned()],
..Forwarded::default()
},
);
}
#[test]
fn weird_whitespace() {
assert_parse_eq::<Forwarded, _, _>(
["for= 1.2.3.4; proto= https"],
Forwarded {
r#for: vec!["1.2.3.4".to_owned()],
proto: Some("https".to_owned()),
..Forwarded::default()
},
);
assert_parse_eq::<Forwarded, _, _>(
[" for = 1.2. let mut proto = None;
let mut r#for = vec![];
// "for=1.2.3.4, for=5.6.7.8; scheme=https"
for (name, val) in val
.split(';')
// ["for=1.2.3.4, for=5.6.7.8", " proto=https"]
.flat_map(|vals| vals.split(','))
// ["for=1.2.3.4", " for=5.6.7.8", " proto=https"]
.flat_map(|pair| {
let mut items = pair.trim().splitn(2, '=');
Some((items.next()?, items.next()?))
})
{
// [(name , val ), ... ]
// [("for", "1.2.3.4"), ("for", "5.6.7.8"), ("scheme", "https")]
match name.trim().to_lowercase().as_str() {
"by" => {
// multiple values on other properties have no defined semantics
by.get_or_insert_with(|| unquote(val));
}
"for" => {
// parameter order is defined to be client first and last proxy last
tractor that
/// _takes_ the payload. In this case, the resulting hash will be as if an empty input was given to
/// the hasher.
///
/// # Example
/// ```
/// use actix_web::{Responder, web};
/// use actix_hash::BodyHash;
/// use sha2::Sha256;
///
/// # type T = u64;
/// async fn hash_payload(form: BodyHash<web::Json<T>, Sha256>) -> impl Responder {
/// if !form.verify_slice(b"correct-signature") {
/// // return unauthorized error
/// }
///
/// "Ok"
/// }
/// ```
#[derive(Debug, Clone)]
pub struct BodyHash<T, D: Digest> {
inner: T,
hash: GenericArray<u8, D::OutputSize>,
}
impl<T, D: Digest> BodyHash<T, D> {
/// Returns hash slice.
pub fn hash(&self) -> &[u8] {
self.hash.as_slice()
}
/// Returns hash output size.
pub fn hash_size(&self) -> usize {
self.hash.len()
}
/// Verifies HMAC hash against provided `tag` using constant-time equality.
pub fn verify_slice(&self, tag: &[u8]) -> bool {
use subtle::ConstantTimeEq as _;
use async_trait::async_trait;
use bytes::{BufMut as _, BytesMut};
use ed25519_dalek::{PublicKey, Signature, Verifier as _};
use hex_literal::hex;
use once_cell::sync::Lazy;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys};
use tracing::info;
const APP_PUBLIC_KEY_BYTES: &[u8] =
&hex!("d7d9a14753b591be99a0c5721be8083b1e486c3fcdc6ac08bfb63a6e5c204569");
static SIG_HDR_NAME: HeaderName = HeaderName::from_static("x-signature-ed25519");
static TS_HDR_NAME: HeaderName = HeaderName::from_static("x-signature-timestamp");
static APP_PUBLIC_KEY: Lazy<PublicKey> =
Lazy::new(|| PublicKey::from_bytes(APP_PUBLIC_KEY_BYTES).unwrap());
#[derive(Debug)]
struct DiscordWebhook {
/// Signature taken from webhook request header.
candidate_signature: Signature,
/// Cloned payload state.
chunks: Vec<Bytes>,
}
impl DiscordWebhook {
fn get_timestamp(req: &HttpRequest) -> Result<&[u8], Error> {
req.headers()
.get(&TS_HDR_NAME)
s_core::ready;
use pin_project_lite::pin_project;
/// Creates a middleware from an async function that is used as a mapping function for a
/// [`ServiceResponse`].
///
/// # Examples
/// Adds header:
/// ```
/// # use actix_web_lab::middleware::map_response;
/// use actix_web::{body::MessageBody, dev::ServiceResponse, http::header};
///
/// async fn add_header(
/// mut res: ServiceResponse<impl MessageBody>,
/// ) -> actix_web::Result<ServiceResponse<impl MessageBody>> {
/// res.headers_mut()
/// .insert(header::WARNING, header::HeaderValue::from_static("42"));
///
/// Ok(res)
/// }
/// # actix_web::App::new().wrap(map_response(add_header));
/// ```
///
/// Maps body:
/// ```
/// # use actix_web_lab::middleware::map_response;
/// use actix_web::{body::MessageBody, dev::ServiceResponse};
///
/// async fn mutate_body_type(
/// res: ServiceResponse<impl MessageBody + 'static>,
/// ) -> actix_web::Result<ServiceResponse<impl MessageBody>> {
/// Ok(res.map_into_left_body::<()>())
/// }b);
// catch panics in service call
AssertUnwindSafe(self.service.call(req))
.catch_unwind()
.map(move |maybe_res| match maybe_res {
Ok(res) => res,
Err(panic_err) => {
// invoke callback with panic arg
(cb)(&panic_err);
// continue unwinding
panic::resume_unwind(panic_err)
}
})
.boxed_local()
}
}
#[cfg(test)]
mod tests {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use actix_web::{
dev::Service as _,
test,
web::{self, ServiceConfig},
App,
};
use super::*;
fn configure_test_app(cfg: &mut ServiceConfig) {
cfg.route("/", web::get().to(|| async { "content" })).route(
"/disco",
#[allow(unreachable_code)]
web::get().to(|| async {
panic!("the disco");
""
//
/// /// Deserialize payload with a higher 32MiB limit.
/// #[post("/big-payload")]
/// async fn big_payload(info: UrlEncodedForm<Info, LIMIT_32_MB>) -> String {
/// format!("Welcome {}!", info.username)
/// }
/// ```
#[doc(alias = "html_form", alias = "html form", alias = "form")]
#[derive(Debug, Deref, DerefMut, Display)]
pub struct UrlEncodedForm<T, const LIMIT: usize = DEFAULT_URL_ENCODED_FORM_LIMIT>(pub T);
impl<T, const LIMIT: usize> UrlEncodedForm<T, LIMIT> {
/// Unwraps into inner `T` value.
pub fn into_inner(self) -> T {
self.0
}
}
/// See [here](#extractor) for example of usage as an extractor.
impl<T: DeserializeOwned, const LIMIT: usize> FromRequest for UrlEncodedForm<T, LIMIT> {
type Error = Error;
type Future = UrlEncodedFormExtractFut<T, LIMIT>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
UrlEncodedFormExtractFut {
req: Some(req.clone()),
fut: UrlEncodedFormBody::new(req, payloice {
service: boxed::rc_service(service),
mw_fn: Rc::clone(&self.mw_fn),
_phantom: PhantomData,
}))
}
}
/// Middleware service for [`from_fn`].
pub struct MiddlewareFnService<F, B, Es> {
service: RcService<ServiceRequest, ServiceResponse<B>, Error>,
mw_fn: Rc<F>,
_phantom: PhantomData<(B, Es)>,
}
impl<F, Fut, B, B2> Service<ServiceRequest> for MiddlewareFnService<F, B, ()>
where
F: Fn(ServiceRequest, Next<B>) -> Fut,
Fut: Future<Output = Result<ServiceResponse<B2>, Error>>,
B2: MessageBody,
{
type Response = ServiceResponse<B2>;
type Error = Error;
type Future = Fut;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
(self.mw_fn)(
req,
Next::<B> {
service: Rc::clone(&self.service),
},
)
}
}
macro_rules! impl_middleware_fn_service {
($($ext_type:ident),*) => {
impl<S, F, Fut, B, B2, $($ext_type),*> Trans assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR);
}
#[actix_web::test]
async fn test_override_data() {
let srv = init_service(
App::new().app_data(LocalData::new(1usize)).service(
web::resource("/")
.app_data(LocalData::new(10usize))
.route(web::get().to(|data: LocalData<usize>| {
assert_eq!(*data, 10);
HttpResponse::Ok()
})),
),
)
.await;
let req = TestRequest::default().to_request();
let resp = srv.call(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
}
#[actix_web::test]
async fn test_data_from_rc() {
let data_new = LocalData::new(String::from("test-123"));
let data_from_rc = LocalData::from(Rc::new(String::from("test-123")));
assert_eq!(data_new.0, data_from_rc.0);
}
#[actix_web::test]
async fn test_data_from_dyn_rc() nsume_chunk(&mut self, _req: &HttpRequest, chunk: Bytes) -> Result<(), Self::Error> {
Digest::update(&mut self.hasher, &chunk);
Ok(())
}
async fn finalize(self, _req: &HttpRequest) -> Result<Self::Signature, Self::Error> {
println!("using key: {:X?}", &self.key);
let mut hmac = <SimpleHmac<Sha512>>::new_from_slice(&self.key).unwrap();
let payload_hash = self.hasher.finalize();
println!("payload hash: {payload_hash:X?}");
Mac::update(&mut hmac, &payload_hash);
Ok(hmac.finalize())
}
fn verify(
signature: Self::Signature,
req: &HttpRequest,
) -> Result<Self::Signature, Self::Error> {
let user_sig = get_user_signature(req)?;
let user_sig = CtOutput::new(GenericArray::from_slice(&user_sig).to_owned());
if signature == user_sig {
Ok(signature)
} else {
Err(error::ErrorUnauthorized(
"given signature does not match calculated signature",
form: web::Json<HashMap<String, String>>,
}
fn main() {}
use std::io;
use actix_web::{
error,
http::header::HeaderValue,
middleware::Logger,
web::{self, Bytes},
App, Error, HttpRequest, HttpServer,
};
use actix_web_lab::extract::{RequestSignature, RequestSignatureScheme};
use async_trait::async_trait;
use digest::{CtOutput, Digest, Mac};
use generic_array::GenericArray;
use hmac::SimpleHmac;
use sha2::{Sha256, Sha512};
use tracing::info;
#[allow(non_upper_case_globals)]
const db: () = ();
async fn lookup_public_key_in_db<T>(_db: &(), val: T) -> T {
val
}
/// Extracts user's public key from request and pretends to look up secret key in the DB.
async fn get_base64_api_key(req: &HttpRequest) -> actix_web::Result<Vec<u8>> {
// public key, not encryption key
let pub_key = req
.headers()
.get("Api-Key")
.map(HeaderValue::as_bytes)
.map(base64::decode)
.transpose()
.map_err(|_| error::ErrorInternalServerError("invalid api key"))?app, req).await;
assert_eq!(res.status(), StatusCode::OK);
let body = test::read_body(res).await;
assert_eq!(body, "content");
}
#[actix_web::test]
async fn catch_panic_return_internal_server_error_response() {
let app = test::init_service(test_app()).await;
let req = test::TestRequest::with_uri("/disco").to_request();
let err = match app.call(req).await {
Ok(_) => panic!("unexpected Ok response"),
Err(err) => err,
};
let res = err.error_response();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
let body = to_bytes(res.into_body()).await.unwrap();
assert!(body.is_empty());
}
}
use std::{
pin::Pin,
task::{Context, Poll},
};
use actix_web::body::{BodySize, MessageBody};
use bytes::Bytes;
use tokio::sync::mpsc::{error::SendError, UnboundedReceiver, UnboundedSender};
use crate::BoxError;
/// Returns a sender half and a receiver half that can be used as a body ton: Cow<'static, str>,
}
impl Spa {
/// Location of the SPA index file.
///
/// This file will be served if:
/// - the Actix Web router has reached this service, indicating that none of the API routes
/// matched the URL path;
/// - and none of the static resources handled matched.
///
/// The default is "./index.html". I.e., the `index.html` file located in the directory that
/// the server is running from.
pub fn index_file(mut self, index_file: impl Into<Cow<'static, str>>) -> Self {
self.index_file = index_file.into();
self
}
/// The URL path prefix that static files should be served from.
///
/// The default is "/". I.e., static files are served from the root URL path.
pub fn static_resources_mount(
mut self,
static_resources_mount: impl Into<Cow<'static, str>>,
) -> Self {
self.static_resources_mount = static_resources_mount.into();
self
}
/// The location in the filesystem to ser #[doc = concat!("# type Hasher = ", stringify!($digest), ";")]
#[doc = concat!("# const OutSize: usize = ", $out_size, ";")]
/// # assert_eq!(
/// # digest::generic_array::GenericArray::<u8,
/// # <Hasher as digest::OutputSizeUser>::OutputSize
/// # >::default().len(),
/// # OutSize
/// # );
/// ```
#[cfg(feature = $feature)]
pub type $name<T> = BodyHash<T, $digest>;
};
}
// Obsolete
body_hash_alias!(BodyMd4, md4::Md4, "md4", "MD4", 16);
body_hash_alias!(BodyMd5, md5::Md5, "md5", "MD5", 16);
body_hash_alias!(BodySha1, sha1::Sha1, "sha1", "SHA-1", 20);
// SHA-2
body_hash_alias!(BodySha224, sha2::Sha224, "sha2", "SHA-224", 28);
body_hash_alias!(BodySha256, sha2::Sha256, "sha2", "SHA-256", 32);
body_hash_alias!(BodySha384, sha2::Sha384, "sha2", "SHA-384", 48);
body_hash_alias!(BodySha512, sha2::Sha512, "sha2", "SHA-512", 64);
// SHA-3
body_hash_alias!(BodySha3_224, sha3::Sha3_224, "sha3", "SHA-3-224", e_slash.replace_all(&path, "/");
// Ensure root paths are still resolvable. If resulting path is blank after previous
// step it means the path was one or more slashes. Reduce to single slash.
let path = if path.is_empty() { "/" } else { path.as_ref() };
// Check whether the path has been changed
//
// This check was previously implemented as string length comparison
//
// That approach fails when a trailing slash is added,
// and a duplicate slash is removed,
// since the length of the strings remains the same
//
// For example, the path "/v1//s" will be normalized to "/v1/s/"
// Both of the paths have the same length,
// so the change can not be deduced from the length comparison
if path != original_path {
let mut parts = head.uri.clone().into_parts();
let query = parts.path_and_query.as_ref().and3, 21, 247, 0])));
dbg!(ips.contains(IpAddr::from([103, 21, 248, 0])));
}
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
web::Redirect,
Error, Responder,
};
use crate::middleware_from_fn::Next;
/// A function middleware to redirect traffic to `www.` if not already there.
///
/// # Examples
/// ```
/// # use actix_web::App;
/// use actix_web_lab::middleware::{from_fn, redirect_to_www};
///
/// App::new()
/// .wrap(from_fn(redirect_to_www))
/// # ;
/// ```
pub async fn redirect_to_www(
req: ServiceRequest,
next: Next<impl MessageBody + 'static>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
#![allow(clippy::await_holding_refcell_ref)] // RefCell is dropped before await
let (req, pl) = req.into_parts();
let conn_info = req.connection_info();
if !conn_info.host().starts_with("www.") {
let scheme = conn_info.scheme();
let host = conn_info.host();
let path = req.uri().path();
let uri = form$hdr_name:expr => $hdr_val:expr)+; @raw $payload:expr) => {{
assert_response_matches!($res, $status; $($hdr_name => $hdr_val)+);
assert_eq!(::actix_web::test::read_body($res).await, $payload);
}};
($res:ident, $status:ident; @json $payload:tt) => {{
assert_response_matches!($res, $status);
assert_eq!(
::actix_web::test::read_body_json::<$crate::__reexports::serde_json::Value, _>($res).await,
$crate::__reexports::serde_json::json!($payload),
);
}};
}
pub use assert_response_matches;
#[cfg(test)]
mod tests {
use actix_web::{
dev::ServiceResponse, http::header::ContentType, test::TestRequest, HttpResponse,
};
use super::*;
#[actix_web::test]
async fn response_matching() {
let res = ServiceResponse::new(
TestRequest::default().to_http_request(),
HttpResponse::Created()
.insert_header(("date", "today"))
.insert_header(("set-cookie", "a=b"))
(Some(Ok(Event::retry_to_bytes(retry))));
}
if let Poll::Ready(msg) = this.stream.poll_next(cx) {
return match msg {
Some(Ok(msg)) => Poll::Ready(Some(Ok(msg.into_bytes()))),
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
};
}
if let Some(ref mut keep_alive) = this.keep_alive {
if keep_alive.poll_tick(cx).is_ready() {
return Poll::Ready(Some(Ok(Event::keep_alive_bytes())));
}
}
Poll::Pending
}
}
/// Create server-sent events (SSE) channel pair.
///
/// The `buffer` argument controls how many unsent messages can be stored without waiting.
///
/// The first item in the tuple is the sender half. Much like a regular channel, it can be cloned,
/// sent to another thread/task, and send event messages to the response stream. It provides several
/// methods that represent the event-stream format.
///
/// The second ite.into());
self
}
/// Sets `id` field.
pub fn set_id(&mut self, id: impl Into<ByteString>) {
self.id = Some(id.into());
}
/// Sets `event` name field, returning a new data message.
pub fn event(mut self, event: impl Into<ByteString>) -> Self {
self.event = Some(event.into());
self
}
/// Sets `event` name field.
pub fn set_event(&mut self, event: impl Into<ByteString>) {
self.event = Some(event.into());
}
}
impl From<Data> for Event {
fn from(data: Data) -> Self {
Self::Data(data)
}
}
/// Server-sent events message containing one or more fields.
#[must_use]
#[derive(Debug, Clone)]
pub enum Event {
/// A `data` message with optional ID and event name.
///
/// Data messages looks like this in the response stream.
/// ```plain
/// event: foo
/// id: 42
/// data: my data
///
/// data: {
/// data: "multiline": "data"
/// data: }
/// ```
Data(Data),
/// A comm(noop)));
let _ = App::new().wrap(Compat::new(map_response_body(mutate_body_type)));
}
#[actix_web::test]
async fn feels_good() {
let app = test::init_service(
App::new()
.default_service(web::to(HttpResponse::Ok))
.wrap(map_response_body(|_req, body| async move { Ok(body) }))
.wrap(map_response_body(noop))
.wrap(Logger::default())
.wrap(map_response_body(mutate_body_type)),
)
.await;
let req = test::TestRequest::default().to_request();
let body = test::call_and_read_body(&app, req).await;
assert_eq!(body, "foo");
}
}
use actix_web::{
http::{Method, StatusCode},
web, App, HttpResponse, Responder,
};
use actix_web_lab_derive::FromRequest;
#[derive(Debug, FromRequest)]
struct RequestParts {
method: Method,
pool: web::Data<u32>,
body: String,
body2: String,
#[from_request(copy_from_app_data)]
copied_data: u64,
}
asstedIps::try_from_response(res).is_err());
}
}
//! For path segment extractor documentation, see [`Path`].
use actix_router::PathDeserializer;
use actix_utils::future::{ready, Ready};
use actix_web::{
dev::Payload,
error::{Error, ErrorNotFound},
FromRequest, HttpRequest,
};
use derive_more::{AsRef, Display, From};
use serde::de;
use tracing::debug;
/// Extract typed data from request path segments.
///
/// Alternative to `web::Path` extractor from Actix Web that allows deconstruction, but omits the
/// implementation of `Deref`.
///
/// Unlike, [`HttpRequest::match_info`], this extractor will fully percent-decode dynamic segments,
/// including `/`, `%`, and `+`.
///
/// # Examples
/// ```
/// use actix_web::get;
/// use actix_web_lab::extract::Path;
///
/// // extract path info from "/{name}/{count}/index.html" into tuple
/// // {name} - deserialize a String
/// // {count} - deserialize a u32
/// #[get("/{name}/{count}/index.html")]
/// async fn index(Path((name, count)): Path<(String, u32 else {
buf.extend_from_slice(&chunk);
}
}
None => {
let json = serde_json::from_slice::<T>(buf)
.map_err(JsonPayloadError::Deserialize)?;
return Poll::Ready(Ok(json));
}
}
},
JsonBody::Error(e) => Poll::Ready(Err(e.take().unwrap())),
}
}
}
#[cfg(test)]
mod tests {
use actix_web::{http::header, test::TestRequest, web::Bytes};
use serde::{Deserialize, Serialize};
use super::*;
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct MyObject {
name: String,
}
fn json_eq(err: JsonPayloadError, other: JsonPayloadError) -> bool {
match err {
JsonPayloadError::Overflow { .. } => {
matches!(other, JsonPayloadError::Overflow { .. })
}
JsonPayloadError::OverflowKnownLength { .. }format!("error from original stream: {err}"),
))))
.unwrap(),
}
}));
tracing::trace!(target: TARGET, "creating proxy payload");
*orig_payload = dev::Payload::from(proxy_stream);
dev::Payload::Stream {
payload: Box::pin(rx),
}
}
/// An `io::Write`r that only requires mutable reference and assumes that there is space available
/// in the buffer for every write operation or that it can be extended implicitly (like
/// `bytes::BytesMut`, for example).
///
/// This is slightly faster (~10%) than `bytes::buf::Writer` in such cases because it does not
/// perform a remaining length check before writing.
pub(crate) struct MutWriter<'a, B>(pub(crate) &'a mut B);
impl<'a, B> MutWriter<'a, B> {
pub fn get_ref(&self) -> &B {
self.0
}
}
impl<'a, B: BufMut> io::Write for MutWriter<'a, B> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.0.put_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> iof.into_chunk_stream())
}
/// Creates a `Responder` type with a line-by-line serializing stream and `text/plain`
/// content-type header.
pub fn into_responder(self) -> impl Responder
where
S: 'static,
T: 'static,
E: 'static,
{
HttpResponse::Ok()
.content_type(mime::TEXT_PLAIN_UTF_8)
.message_body(self.into_body_stream())
.unwrap()
}
/// Creates a stream of serialized chunks.
pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
self.stream.map_ok(write_display)
}
}
fn write_display(item: impl fmt::Display) -> Bytes {
let mut buf = BytesMut::new();
let mut wrt = MutWriter(&mut buf);
writeln!(wrt, "{item}").unwrap();
buf.freeze()
}
#[cfg(test)]
mod tests {
use std::error::Error as StdError;
use actix_web::body;
use futures_util::stream;
use super::*;
#[actix_web::test]
async fn serializes_into_body() {
let ndjson_body = Dio.map(|proto| format!("proto=\"{proto}\"")))
.join("; ")
.try_into_value()
}
}
impl Header for Forwarded {
fn name() -> HeaderName {
header::FORWARDED
}
fn parse<M: HttpMessage>(msg: &M) -> Result<Self, ParseError> {
let combined = msg
.headers()
.get_all(Self::name())
.filter_map(|hdr| hdr.to_str().ok())
.filter_map(|hdr_str| match hdr_str.trim() {
"" => None,
val => Some(val),
})
.collect::<Vec<_>>();
if combined.is_empty() {
return Err(ParseError::Header);
}
// pass to FromStr impl as if it were one concatenated header with semicolon joiners
// https://datatracker.ietf.org/doc/html/rfc7239#section-7.1
combined.join(";").parse().map_err(|_| ParseError::Header)
}
}
/// Trim whitespace then any quote marks.
fn unquote(val: &str) -> &str {
val.trim().trim_start_matches('"').trim_end_matche/// Should equal the `Host` request header field as received by the proxy.
pub fn host(&self) -> Option<&str> {
self.host.as_deref()
}
/// Returns the "proto" identifier, if set.
///
/// Indicates which protocol was used to make the request (typically "http" or "https").
pub fn proto(&self) -> Option<&str> {
self.proto.as_deref()
}
/// Adds an identifier to the "for" chain.
///
/// Useful when re-forwarding a request and needing to update the request headers with previous
/// proxy's address.
pub fn push_for(&mut self, identifier: impl Into<String>) {
self.r#for.push(identifier.into())
}
/// Returns true if all of the fields are empty.
fn has_no_info(&self) -> bool {
self.by.is_none() && self.r#for.is_empty() && self.host.is_none() && self.proto.is_none()
}
// TODO: parse with trusted IP ranges fn
}
impl str::FromStr for Forwarded {
type Err = Infallible;
#[inline]
fn from_str(val: &str) -> Re // Ignore Pending because its possible the inner extractor never
// polls the payload stream and ignore errors because they will be
// propagated by original payload polls.
Poll::Ready(Some(Err(_))) | Poll::Pending => break,
}
}
Poll::Pending
}
}
}
BodyHashFutProj::InnerDone {
inner,
hasher,
forked_payload,
} => {
let mut pl = Pin::new(forked_payload);
// drain forked payload
loop {
match pl.as_mut().poll_next(cx) {
// update hasher with chunks
Poll::Ready(Some(Ok(chunk))) => hasher.update(&chunk),
// when drain is complete, finalize hash and return parts
keys: Vec<PrivateKey> = pkcs8_private_keys(key_file)
.unwrap()
.into_iter()
.map(PrivateKey)
.collect();
// exit if no keys could be parsed
if keys.is_empty() {
eprintln!("Could not locate PKCS 8 private keys.");
std::process::exit(1);
}
config.with_single_cert(cert_chain, keys.remove(0)).unwrap()
}
use std::{io, time::Duration};
use actix_web::{
get,
http::{
self,
header::{ContentEncoding, ContentType},
},
App, HttpResponse, HttpServer, Responder,
};
use actix_web_lab::body;
use async_zip::{write::ZipFileWriter, ZipEntryBuilder};
use tokio::{
fs,
io::{AsyncWrite, AsyncWriteExt as _},
};
fn zip_to_io_err(err: async_zip::error::ZipError) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
async fn read_dir<W>(zipper: &mut ZipFileWriter<W>) -> io::Result<()>
where
W: AsyncWrite + Unpin,
{
let mut path = fs::canonicalize(env!("CARGO_MANIFEST_DIR")).await?;
path.push("examples");
sync::OnceCell;
use tracing::debug;
/// A lazy extractor for thread-local data.
///
/// Using `LazyData` as an extractor will not initialize the data; [`get`](Self::get) must be used.
pub struct LazyData<T> {
inner: Rc<LazyDataInner<T>>,
}
struct LazyDataInner<T> {
cell: OnceCell<T>,
fut: Cell<Option<LocalBoxFuture<'static, T>>>,
}
impl<T> Clone for LazyData<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<T: fmt::Debug> fmt::Debug for LazyData<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Lazy")
.field("cell", &self.inner.cell)
.field("fut", &"..")
.finish()
}
}
impl<T> LazyData<T> {
/// Constructs a new `LazyData` extractor with the given initialization function.
///
/// Initialization functions must return a future that resolves to `T`.
pub fn new<F, Fut>(init: F) -> LazyData<T>
where
F: FnOnce() -> Fut,
Fut assert!(s.is_err());
let err_str = s.unwrap_err().to_string();
assert_eq!(
err_str,
"URL encoded payload is larger (9 bytes) than allowed (limit: 8 bytes).",
);
}
}
//! Panic reporter middleware.
//!
//! See [`PanicReporter`] for docs.
use std::{
any::Any,
future::{ready, Ready},
panic::{self, AssertUnwindSafe},
rc::Rc,
};
use actix_web::dev::{forward_ready, Service, Transform};
use futures_core::future::LocalBoxFuture;
use futures_util::FutureExt as _;
type PanicCallback = Rc<dyn Fn(&(dyn Any + Send))>;
/// A middleware that triggers a callback when the worker is panicking.
///
/// Mostly useful for logging or metrics publishing. The callback received the object with which
/// panic was originally invoked to allow down-casting.
///
/// # Examples
/// ```ignore
/// # use actix_web::App;
/// use actix_web_lab::middleware::PanicReporter;
/// use metrics::increment_counter;
///
/// App::new()
/// .wrap(PanicReporter::new(|_| increme::Bytes};
use serde::{Deserialize, Serialize};
use super::*;
#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct MyObject {
name: String,
}
fn err_eq(err: UrlencodedError, other: UrlencodedError) -> bool {
match err {
UrlencodedError::Overflow { .. } => {
matches!(other, UrlencodedError::Overflow { .. })
}
UrlencodedError::ContentType => matches!(other, UrlencodedError::ContentType),
_ => false,
}
}
#[actix_web::test]
async fn test_extract() {
let (req, mut pl) = TestRequest::default()
.insert_header(header::ContentType::form_url_encoded())
.insert_header((
header::CONTENT_LENGTH,
header::HeaderValue::from_static("9"),
))
.set_payload(Bytes::from_static(b"name=test"))
.to_http_parts();
let s =
UrlEncodedForm::<MyObject, DEFAULT_URL_ENCODED_FORM_LIMIT>::fro let mw_fn = Rc::clone(&self.mw_fn);
let service = Rc::clone(&self.service);
Box::pin(async move {
let ($($ext_type,)*) = req.extract::<($($ext_type,)*)>().await?;
(mw_fn)($($ext_type),*, req, Next::<B> { service }).await
})
}
}
};
}
impl_middleware_fn_service!(E1);
impl_middleware_fn_service!(E1, E2);
impl_middleware_fn_service!(E1, E2, E3);
impl_middleware_fn_service!(E1, E2, E3, E4);
impl_middleware_fn_service!(E1, E2, E3, E4, E5);
impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6);
impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7);
impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7, E8);
impl_middleware_fn_service!(E1, E2, E3, E4, E5, E6, E7, E8, E9);
/// Wraps the "next" service in the middleware chain.
pub struct Next<B> {
service: RcService<ServiceRequest, ServiceResponse<B>, Error>,
}
impl<B> Next<B> {
/// Equivalent to `Service::call(self, req)`.
pub fn cal/ This works when Sized is required
let dyn_rc_box: Rc<Box<dyn TestTrait>> = Rc::new(Box::new(A {}));
let data_arc_box = LocalData::from(dyn_rc_box);
// This works when Data Sized Bound is removed
let dyn_rc: Rc<dyn TestTrait> = Rc::new(A {});
let data_arc = LocalData::from(dyn_rc);
assert_eq!(data_arc_box.get_num(), data_arc.get_num())
}
#[actix_web::test]
async fn test_get_ref_from_dyn_data() {
let dyn_rc: Rc<dyn TestTrait> = Rc::new(A {});
let data_arc = LocalData::from(dyn_rc);
let ref_data: &dyn TestTrait = &*data_arc;
assert_eq!(data_arc.get_num(), ref_data.get_num())
}
}
use std::io;
use actix_web::{get, App, HttpServer, Responder};
use actix_web_lab::respond::Cbor;
use serde::Serialize;
use tracing::info;
#[derive(Debug, Serialize)]
struct Test {
one: u32,
two: String,
}
#[get("/")]
async fn index() -> impl Responder {
Cbor(Test {
one: 42,
two: "two".to_owned(),
})
}
#[ [456, 789],
]))
.into_body_stream();
let body_bytes = body::to_bytes(ndjson_body)
.await
.map_err(Into::<Box<dyn StdError>>::into)
.unwrap();
const EXP_BYTES: &str = "123,456\n\
789,12\n\
345,678\n\
901,234\n\
456,789\n";
assert_eq!(body_bytes, EXP_BYTES);
}
}
use std::{any::type_name, ops::Deref, rc::Rc};
use actix_utils::future::{err, ok, Ready};
use actix_web::{dev::Payload, error, Error, FromRequest, HttpRequest};
use tracing::debug;
/// A thread-local equivalent to [`SharedData`](crate::extract::SharedData).
#[doc(alias = "state")]
#[derive(Debug)]
pub struct LocalData<T: ?Sized>(Rc<T>);
impl<T> LocalData<T> {
/// Constructs a new `LocalData` instance.
pub fn new(item: T) -> LocalData<T> {
LocalData(Rc::new(item))
}
}
impl<T: ?Sized> Deref for LocalData<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0
}
}
impl<T: ?Sized> Clone for Lo-results", n_items))
// alternative if you need more control of the HttpResponse:
//
// HttpResponse::Ok()
// .insert_header(("content-type", NdJson::mime()))
// .insert_header(("num-results", n_items))
// .body(NdJson::new(data_stream).into_body_stream())
}
/// A comparison route that loads all the data into memory before sending it to the client.
///
/// If you provide a high number in the query string like `?n=300000` you should be able to observe
/// increasing memory usage of the process in your process monitor.
#[get("/users-high-mem")]
async fn get_high_mem_user_list(opts: web::Query<Opts>) -> impl Responder {
let n_items = opts.n.unwrap_or(10);
let mut stream = streaming_data_source(n_items);
// buffer all data from the source into a Bytes container
let mut buf = BytesMut::new().writer();
while let Some(Ok(item)) = stream.next().await {
serde_json::to_writer(&mut buf, &item).unwrap();
buf.write_all(b"\n").unwrap();
}
Hrn self.tx.send(Err(err)).map_err(|SendError(err)| match err {
Ok(_) => unreachable!(),
Err(err) => err,
});
}
Ok(())
}
}
#[derive(Debug)]
struct Receiver<E> {
rx: UnboundedReceiver<Result<Bytes, E>>,
}
impl<E> Receiver<E> {
fn new(rx: UnboundedReceiver<Result<Bytes, E>>) -> Self {
Self { rx }
}
}
impl<E> MessageBody for Receiver<E>
where
E: Into<BoxError>,
{
type Error = E;
fn size(&self) -> BodySize {
BodySize::Stream
}
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Bytes, Self::Error>>> {
self.rx.poll_recv(cx)
}
}
#[cfg(test)]
mod tests {
use std::io;
use super::*;
static_assertions::assert_impl_all!(Sender<io::Error>: Send, Sync, Unpin);
static_assertions::assert_impl_all!(Receiver<io::Error>: Send, Sync, Unpin, MessageBody);
}
use actix_hash::{BodyHash, BodySha256};
use actix_http::BoxedPayloadStreamregister_service(
rdef,
None,
fn_service(move |req| serve_index(req, self.index_file.clone())),
None,
);
}
}
async fn serve_index(
req: ServiceRequest,
index_file: String,
) -> Result<ServiceResponse, actix_web::Error> {
trace!("serving default SPA page");
let (req, _) = req.into_parts();
let file = NamedFile::open_async(&index_file).await?;
let res = file.into_response(&req);
Ok(ServiceResponse::new(req, res))
}
impl Default for Spa {
fn default() -> Self {
Self {
index_file: Cow::Borrowed("./index.html"),
static_resources_mount: Cow::Borrowed("/"),
static_resources_location: Cow::Borrowed("./"),
}
}
}
// Code mostly copied from `tower`:
// https://github.com/tower-rs/tower/tree/5064987f/tower/src/load_shed
//! Load-shedding middleware.
use std::{
cell::Cell,
error::Error as StdError,
fmt,
future::Future,
pin::Pin,
task::{ready, Contexrde(rename = "user")]
/// users: Vec<String>,
/// }
///
/// // Deserialize `LogsParams` struct from query string.
/// // This handler gets called only if the request's query parameters contain both fields.
/// // A valid request path for this handler would be `/logs?type=reports&user=foo&user=bar"`.
/// #[get("/logs")]
/// async fn index(info: Query<LogsParams>) -> impl Responder {
/// let LogsParams { log_type, users } = info.into_inner();
/// format!("Logs request for type={log_type} and user list={users:?}!")
/// }
///
/// // Or use destructuring, which is equivalent to `.into_inner()`.
/// #[get("/debug2")]
/// async fn debug2(Query(info): Query<LogsParams>) -> impl Responder {
/// dbg!("Authorization object = {info:?}");
/// "OK"
/// }
/// ```
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Query<T>(pub T);
impl_more::impl_deref_and_mut!(<T> in Query<T> => T);
impl_more::forward_display!(<T> in Query<T>);
impl<T> Query<T> {
/// Unwrap into inner `T` value.
redirects,
}))
}
}
pub struct NormalizePathService<S> {
service: S,
merge_slash: Regex,
trailing_slash_behavior: TrailingSlash,
use_redirects: Option<StatusCode>,
}
impl<S, B> Service<ServiceRequest> for NormalizePathService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
{
type Response = ServiceResponse<EitherBody<B, ()>>;
type Error = Error;
type Future = NormalizePathFuture<S, B>;
actix_service::forward_ready!(service);
fn call(&self, mut req: ServiceRequest) -> Self::Future {
let head = req.head_mut();
let mut path_altered = false;
let original_path = head.uri.path();
// An empty path here means that the URI has no valid path. We skip normalization in this
// case, because adding a path can make the URI invalid
if !original_path.is_empty() {
// Either adds a string to the end (duplicates will be removed anyways) or trims all
ly.
///
/// # Normalization Steps
/// - Merges consecutive slashes into one. (For example, `/path//one` always becomes `/path/one`.)
/// - Appends a trailing slash if one is not present, removes one if present, or keeps trailing
/// slashes as-is, depending on which [`TrailingSlash`] variant is supplied
/// to [`new`](NormalizePath::new()).
///
/// # Default Behavior
/// The default constructor chooses to strip trailing slashes from the end of paths with them
/// ([`TrailingSlash::Trim`]). The implication is that route definitions should be defined without
/// trailing slashes or else they will be inaccessible (or vice versa when using the
/// `TrailingSlash::Always` behavior), as shown in the example tests below.
///
/// # Examples
/// ```
/// use actix_web::{web, middleware, App};
///
/// # actix_web::rt::System::new().block_on(async {
/// let app = App::new()
/// .wrap(middleware::NormalizePath::trim())
/// .route("/test", web::get().to(|| async { "test" }))
/// .route("/unmatchable/", web: = match self.0 {
true => req.into_response("short-circuited").map_into_right_body(),
false => next.call(req).await?.map_into_left_body(),
};
res.headers_mut()
.insert(header::WARNING, HeaderValue::from_static("42"));
Ok(res)
}
pub fn into_middleware<S, B>(
self,
) -> impl Transform<
S,
ServiceRequest,
Response = ServiceResponse<impl MessageBody>,
Error = Error,
InitError = (),
>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error> + 'static,
B: MessageBody + 'static,
{
let this = Rc::new(self);
from_fn(move |req, next| {
let this = Rc::clone(&this);
async move { Self::mw_cb(&this, req, next).await }
})
}
}
#[actix_web::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
let bind = ("127.0.0.1", 8080);
inond item is the responder and can, therefore, be used as a handler return type directly.
/// The stream will be closed after all [senders](SseSender) are dropped.
///
/// Read more about server-sent events in [this MDN article][mdn-sse].
///
/// See [module docs](self) for usage example.
///
/// [mdn-sse]: https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
pub fn channel(buffer: usize) -> (Sender, Sse<ChannelStream>) {
let (tx, rx) = mpsc::channel(buffer);
(
Sender { tx },
Sse {
stream: ChannelStream(rx),
keep_alive: None,
retry_interval: None,
},
)
}
/// Stream implementation for channel-based SSE [`Sender`].
#[derive(Debug)]
pub struct ChannelStream(mpsc::Receiver<Event>);
impl Stream for ChannelStream {
type Item = Result<Event, Infallible>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_recv(cx).map(|ev| ev.map(Ok))
A comment message.
///
/// Comments look like this in the response stream.
/// ```plain
/// : my comment
///
/// : another comment
/// ```
Comment(ByteString),
}
#[doc(hidden)]
#[deprecated(since = "0.17.0", note = "Renamed to `Event`. Prefer `sse::Event`.")]
pub type SseMessage = Event;
impl Event {
/// Splits data into lines and prepend each line with `prefix`.
fn line_split_with_prefix(buf: &mut BytesMut, prefix: &'static str, data: ByteString) {
// initial buffer size guess is len(data) + 10 lines of prefix + EOLs + EOF
buf.reserve(data.len() + (10 * (prefix.len() + 1)) + 1);
// append prefix + space + line to buffer
for line in data.split('\n') {
buf.put_slice(prefix.as_bytes());
buf.put_slice(line.as_bytes());
buf.put_u8(b'\n');
}
}
/// Serializes message into event-stream format.
fn into_bytes(self) -> Bytes {
let mut buf = BytesMut::new();
match self {
e_stream) = sse::channel(10);
//!
//! // note: sender will typically be spawned or handed off somewhere else
//! let _ = sender.send(sse::Event::Comment("my comment".into())).await;
//! let _ = sender.send(sse::Data::new("my data").event("chat_msg")).await;
//!
//! sse_stream.with_retry_duration(Duration::from_secs(10))
//! }
//!
//! #[get("/from-stream")]
//! async fn from_stream() -> impl Responder {
//! let event_stream = futures_util::stream::iter([
//! Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))),
//! ]);
//!
//! sse::Sse::from_stream(event_stream)
//! .with_keep_alive(Duration::from_secs(5))
//! }
//! ```
//!
//! Complete usage examples can be found in the examples directory of the source code repo.
#![doc(
alias = "server sent",
alias = "server-sent",
alias = "server sent events",
alias = "server-sent events",
alias = "event-stream"
)]
use std::{
convert::Infallible,
pin::Pin,
task::{Context, Poll},
time::p();
}
#[actix_web::test]
async fn test_request_extract() {
let mut req = TestRequest::with_uri("/name/user1/?id=test").to_srv_request();
let resource = ResourceDef::new("/{key}/{value}/");
resource.capture_match_info(req.match_info_mut());
let (req, mut pl) = req.into_parts();
let s = Path::<MyStruct>::from_request(&req, &mut pl).await.unwrap();
assert_eq!(format!("{s}"), "MyStruct(name, user1)");
assert_eq!(
format!("{s:?}"),
"Path(MyStruct { key: \"name\", value: \"user1\" })"
);
let mut s = s.into_inner();
assert_eq!(s.key, "name");
assert_eq!(s.value, "user1");
s.value = "user2".to_string();
assert_eq!(s.value, "user2");
let Path(s) = Path::<(String, String)>::from_request(&req, &mut pl)
.await
.unwrap();
assert_eq!(s.0, "name");
assert_eq!(s.1, "user1");
let mut req = TestRequest::with_uri("/name/32/").to_b enum Err {
Fetch,
}
impl_more::impl_display_enum!(Err, Fetch => "failed to fetch");
impl std::error::Error for Err {}
#[derive(Debug, Deserialize)]
pub struct CfIpsResult {
ipv4_cidrs: Vec<cidr_utils::cidr::Ipv4Cidr>,
ipv6_cidrs: Vec<cidr_utils::cidr::Ipv6Cidr>,
}
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum CfIpsResponse {
Success { result: CfIpsResult },
Failure { success: bool },
}
/// Trusted IP ranges.
#[derive(Debug)]
pub struct TrustedIps {
cidr_ranges: IpCidrCombiner,
}
impl TrustedIps {
pub fn try_from_response(res: CfIpsResponse) -> Result<Self, Err> {
let ips = match res {
CfIpsResponse::Success { result } => result,
CfIpsResponse::Failure { .. } => {
tracing::error!("parsing response returned success: false");
return Err(Err::Fetch);
}
};
let mut cidr_ranges = IpCidrCombiner::new();
for cidr in ips.ipv4_cidrs {
cidr_ranges.push(IpCidr::Vet req = this.req.take().unwrap();
debug!(
"Failed to deserialize Json<{}> from payload in handler: {}",
core::any::type_name::<T>(),
req.match_name().unwrap_or_else(|| req.path())
);
Err(err.into())
}
Ok(data) => Ok(Json(data)),
};
Poll::Ready(res)
}
}
/// Future that resolves to some `T` when parsed from a JSON payload.
///
/// Can deserialize any type `T` that implements [`Deserialize`][serde::Deserialize].
///
/// Returns error if:
/// - `Content-Type` is not `application/json`.
/// - `Content-Length` is greater than `LIMIT`.
/// - The payload, when consumed, is not valid JSON.
pub enum JsonBody<T, const LIMIT: usize> {
Error(Option<JsonPayloadError>),
Body {
/// Length as reported by `Content-Length` header, if present.
length: Option<usize>,
// #[cfg(feature = "__compress")]
// payload: Decompress<Payload>,
// next load in handler loads new value
let extracted_data = SwapData::<NonCopy>::extract(&req).await.unwrap();
assert_eq!(**extracted_data.load(), NonCopy(80));
// initial extracted data stays the same
assert_eq!(*initial_data, NonCopy(42));
}
}
//! Experimental body types.
//!
//! Analogous to the `body` module in Actix Web.
pub use crate::{
body_async_write::{writer, Writer},
body_channel::{channel, Sender},
infallible_body_stream::{new_infallible_body_stream, new_infallible_sized_stream},
};
//! Experimental typed headers.
pub use crate::{
cache_control::{CacheControl, CacheDirective},
content_length::ContentLength,
forwarded::Forwarded,
strict_transport_security::StrictTransportSecurity,
};
#[cfg(test)]
mod header_test_helpers {
use std::fmt;
use actix_http::header::Header;
use actix_web::{test, HttpRequest};
fn req_from_raw_headers<H: Header, I: IntoIterator<Item = V>, V: AsRef<[u8]>>(
header_lines: I,
)e {
fn eq(&self, other: &ContentLength) -> bool {
*self == other.0
}
}
impl PartialOrd<usize> for ContentLength {
fn partial_cmp(&self, other: &usize) -> Option<std::cmp::Ordering> {
self.0.partial_cmp(other)
}
}
impl PartialOrd<ContentLength> for usize {
fn partial_cmp(&self, other: &ContentLength) -> Option<std::cmp::Ordering> {
self.partial_cmp(&other.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::header::{assert_parse_eq, assert_parse_fail};
#[test]
fn missing_header() {
assert_parse_fail::<ContentLength, _, _>([""; 0]);
assert_parse_fail::<ContentLength, _, _>([""]);
}
#[test]
fn bad_header() {
assert_parse_fail::<ContentLength, _, _>(["-123"]);
assert_parse_fail::<ContentLength, _, _>(["123_456"]);
assert_parse_fail::<ContentLength, _, _>(["123.456"]);
// too large for u64 (2^64, 2^64 + 1)
assert_parse_fail::<ContentLength, _, _>(["18446744073709551616"]);
//! Experimental route guards.
//!
//! Analogous to the `guard` module in Actix Web.
#[allow(deprecated)]
pub use crate::acceptable::Acceptable;
//! Extractor for client IP addresses when proxied through Cloudflare.
// #![forbid(unsafe_code)] // urgh why cidr-utils
#![deny(rust_2018_idioms, nonstandard_style)]
#![warn(future_incompatible)]
// #![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
mod extract;
mod fetch_cf_ips;
mod header_v4;
// mod header_v6;
pub use self::extract::TrustedClientIp;
#[cfg(feature = "fetch-ips")]
pub use self::fetch_cf_ips::fetch_trusted_cf_ips;
pub use self::fetch_cf_ips::{TrustedIps, CF_URL_IPS};
pub use self::header_v4::CfConnectingIp;
//! Forwarded typed header.
//!
//! See [`Forwarded`] docs.
use std::{convert::Infallible, str};
use actix_web::{
error::ParseError,
http::header::{self, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
HttpMessage,
};
use itertools::Itertools as _;
// TODO: implement typed parsing of Node identifiers as pergest::{generic_array::GenericArray, Digest};
use futures_core::Stream as _;
use pin_project_lite::pin_project;
use tracing::trace;
/// Parts of the resulting body hash extractor.
pub struct BodyHashParts<T> {
/// Extracted item.
pub inner: T,
/// Bytes of the calculated hash.
pub hash_bytes: Vec<u8>,
}
/// Wraps an extractor and calculates a body checksum hash alongside.
///
/// If your extractor would usually be `T` and you want to create a hash of type `D` then you need
/// to use `BodyHash<T, D>`. E.g., `BodyHash<String, Sha256>`.
///
/// Any hasher that implements [`Digest`] can be used. Type aliases for common hashing algorithms
/// are available at the crate root.
///
/// # Errors
/// This extractor produces no errors of its own and all errors from the underlying extractor are
/// propagated correctly; for example, if the payload limits are exceeded.
///
/// # When Used On The Wrong Extractor
/// Use on a non-body extractor is tolerated unless it is used after a different extractor thignature"))?
.ok_or_else(|| error::ErrorUnauthorized("signature not provided"))?
.try_into()
.map_err(|_| error::ErrorInternalServerError("invalid signature"))?;
Ok(Signature::from(sig))
}
}
#[async_trait(?Send)]
impl RequestSignatureScheme for DiscordWebhook {
type Signature = (BytesMut, Signature);
type Error = Error;
async fn init(req: &HttpRequest) -> Result<Self, Self::Error> {
let ts = Self::get_timestamp(req)?.to_owned();
let candidate_signature = Self::get_signature(req)?;
Ok(Self {
candidate_signature,
chunks: vec![Bytes::from(ts)],
})
}
async fn consume_chunk(&mut self, _req: &HttpRequest, chunk: Bytes) -> Result<(), Self::Error> {
self.chunks.push(chunk);
Ok(())
}
async fn finalize(self, _req: &HttpRequest) -> Result<Self::Signature, Self::Error> {
let buf_len = self.chunks.iter().map(|chunk| chunk.len()).sum();
let mut buf = ByteRc<F>,
#[pin]
state: MapResFutState<SvcFut, FnFut>,
}
}
pin_project! {
#[project = MapResFutStateProj]
enum MapResFutState<SvcFut, FnFut> {
Svc { #[pin] fut: SvcFut },
Fn { #[pin] fut: FnFut },
}
}
impl<SvcFut, B, F, FnFut, B2> Future for MapResFut<SvcFut, F, FnFut>
where
SvcFut: Future<Output = Result<ServiceResponse<B>, Error>>,
F: Fn(ServiceResponse<B>) -> FnFut,
FnFut: Future<Output = Result<ServiceResponse<B2>, Error>>,
{
type Output = Result<ServiceResponse<B2>, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.as_mut().project();
match this.state.as_mut().project() {
MapResFutStateProj::Svc { fut } => {
let res = ready!(fut.poll(cx))?;
let fut = (this.mw_fn)(res);
this.state.set(MapResFutState::Fn { fut });
self.poll(cx)
}
MapResFutStateProj::Fn { fut } => fu chunk stream.
///
/// This could be stabilized into Actix Web as `SizedStream::from_infallible()`.
pub fn new_infallible_sized_stream<S: Stream<Item = Bytes>>(
size: u64,
stream: S,
) -> SizedStream<InfallibleStream<S>> {
SizedStream::new(size, InfallibleStream::new(stream))
}
use std::{convert::Infallible, io, time::Duration};
use actix_web::{get, middleware::Logger, App, HttpRequest, HttpServer, Responder};
use actix_web_lab::{extract::Path, respond::Html, sse};
use futures_util::stream;
use time::format_description::well_known::Rfc3339;
use tokio::time::sleep;
#[get("/")]
async fn index() -> impl Responder {
Html(include_str!("./assets/sse.html").to_string())
}
/// Countdown event stream starting from 8.
#[get("/countdown")]
async fn countdown(req: HttpRequest) -> impl Responder {
// note: a more production-ready implementation might want to use the lastEventId header
// sent by the reconnecting browser after the _retry_ period
tracing::debug!("lastEventId: {:?}", req.headere> UrlEncodedFormBody<T, LIMIT> {
/// Create a new future to decode a URL-encoded request payload.
pub fn new(req: &HttpRequest, payload: &mut Payload) -> Self {
// check content-type
let can_parse_form = if let Ok(Some(mime)) = req.mime_type() {
mime == mime::APPLICATION_WWW_FORM_URLENCODED
} else {
false
};
if !can_parse_form {
return UrlEncodedFormBody::Error(Some(UrlencodedError::ContentType));
}
let length = req
.headers()
.get(&header::CONTENT_LENGTH)
.and_then(|l| l.to_str().ok())
.and_then(|s| s.parse::<usize>().ok());
// Notice the content-length is not checked against config limit here.
// As the internal usage always call UrlEncodedBody::limit after UrlEncodedBody::new.
// And limit check to return an error variant of UrlEncodedBody happens there.
let payload = payload.take();
if let Some(len) = length {
new()
/// .wrap(from_fn(my_mw))
/// # ;
/// ```
///
/// It is also possible to write a middleware that automatically uses extractors, similar to request
/// handlers, by declaring them as the first parameters:
/// ```
/// # use std::collections::HashMap;
/// # use actix_web::{
/// # App, Error,
/// # body::MessageBody,
/// # dev::{ServiceRequest, ServiceResponse, Service as _},
/// # web,
/// # };
/// use actix_web_lab::middleware::Next;
///
/// async fn my_extracting_mw(
/// string_body: String,
/// query: web::Query<HashMap<String, String>>,
/// req: ServiceRequest,
/// next: Next<impl MessageBody>,
/// ) -> Result<ServiceResponse<impl MessageBody>, Error> {
/// // pre-processing
/// next.call(req).await
/// // post-processing
/// }
/// # actix_web::App::new().wrap(actix_web_lab::middleware::from_fn(my_extracting_mw));
pub fn from_fn<F, Es>(mw_fn: F) -> MiddlewareFn<F, Es> {
MiddlewareFn {
mw_fn: Rc::new(mw_fn),
_phantom: PhantomData,
}g, PartialEq, Eq, AsRef, Display, From)]
pub struct BodyLimit<T, const LIMIT: usize = DEFAULT_BODY_LIMIT> {
inner: T,
}
impl<T, const LIMIT: usize> BodyLimit<T, LIMIT> {
/// Returns inner extracted type.
pub fn into_inner(self) -> T {
self.inner
}
}
impl<T, const LIMIT: usize> FromRequest for BodyLimit<T, LIMIT>
where
T: FromRequest + 'static,
T::Error: fmt::Debug + fmt::Display,
{
type Error = BodyLimitError<T>;
type Future = BodyLimitFut<T, LIMIT>;
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
// fast check of Content-Length header
match req.get_header::<ContentLength>() {
// CL header indicated that payload would be too large
Some(len) if len > LIMIT => return BodyLimitFut::new_error(BodyLimitError::Overflow),
_ => {}
}
let counter = crate::util::fork_request_payload(payload);
BodyLimitFut {
inner: Inner::Body {
fut: Box::pin( if let Some(st) = req.app_data::<LocalData<T>>() {
ok(st.clone())
} else {
debug!(
"Failed to extract `LocalData<{}>` for `{}` handler. For the LocalData extractor \
to work correctly, wrap the data with `LocalData::new()` and pass it to \
`App::app_data()`. Ensure that types align in both the set and retrieve calls.",
type_name::<T>(),
req.match_name().unwrap_or_else(|| req.path())
);
err(error::ErrorInternalServerError(
"Requested application data is not configured correctly. \
View/enable debug logs for more details.",
))
}
}
}
#[cfg(test)]
mod tests {
use actix_web::{
dev::Service,
http::StatusCode,
test::{init_service, TestRequest},
web, App, HttpResponse,
};
use super::*;
trait TestTrait {
fn get_num(&self) -> i32;
}
struct A {}
impl TestTrhttp://{}:{}", &bind.0, &bind.1);
HttpServer::new(|| {
App::new()
.service(get_user_list)
.service(get_high_mem_user_list)
})
.workers(1)
.bind(bind)?
.run()
.await
}
fn random_email() -> String {
let rng = rand::thread_rng();
let id: String = rng
.sample_iter(Alphanumeric)
.take(10)
.map(char::from)
.collect();
format!("user_{id}@example.com")
}
fn random_address() -> String {
let mut rng = rand::thread_rng();
let street_no: u16 = rng.gen_range(10..99);
format!("{street_no} Random Street")
}
/// Create a `TestRequest` using a DSL that looks kinda like on-the-wire HTTP/1.x requests.
///
/// # Examples
/// ```
/// use actix_web::test::TestRequest;
/// use actix_web_lab::test_request;
///
/// let _req: TestRequest = test_request! {
/// POST "/";
/// "Origin" => "example.com"
/// "Access-Control-Request-Method" => "POST"
/// "Access-Control-Request-Headers" => "Content-Type, X-CSRFxFuture;
use futures_util::FutureExt as _;
/// A middleware to catch panics in wrapped handlers and middleware, returning empty 500 responses.
///
/// **This middleware should never be used as replacement for proper error handling.** See [this
/// thread](https://github.com/actix/actix-web/issues/1501#issuecomment-627517783) for historical
/// discussion on why Actix Web does not do this by default.
///
/// It is recommended that this middleware be registered last. That is, `wrap`ed after everything
/// else except `Logger`.
///
/// # Examples
/// ```
/// # use actix_web::App;
/// use actix_web_lab::middleware::CatchPanic;
///
/// App::new()
/// .wrap(CatchPanic::default())
/// # ;
/// ```
///
/// ```ignore
/// // recommended wrap order
/// App::new()
/// .wrap(NormalizePath::default())
/// .wrap(CatchPanic::default()) // <- after everything except logger
/// .wrap(Logger::default())
/// # ;
/// ```
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct CatchPanic;
impl<S, B> rr(err)) => return Poll::Ready(Err(Overloaded::Service(err))),
res => res.is_ready(),
};
self.is_ready.set(is_ready);
// But we always report Ready, so that layers above don't wait until
// the inner service is ready (the entire point of this layer!)
Poll::Ready(Ok(()))
}
fn call(&self, req: Req) -> Self::Future {
if self.is_ready.get() {
// readiness only counts once, you need to check again!
self.is_ready.set(false);
LoadShedFuture::called(self.inner.call(req))
} else {
LoadShedFuture::overloaded()
}
}
}
pin_project! {
/// Future for [`LoadShedService`].
pub struct LoadShedFuture<F> {
#[pin]
state: LoadShedFutureState<F>,
}
}
pin_project! {
#[project = LoadShedFutureStateProj]
enum LoadShedFutureState<F> {
Called { #[pin] fut: F },
Overloaded,
}
}
impl<F> LoadShedFuture<F> {
pub(crate) fn called(fut: F) -> Se:Blake2s256, "blake2", "Blake2s", 32);
// Blake3
body_hash_alias!(BodyBlake3, blake3::Hasher, "blake3", "Blake3", 32);
//! Demonstrates forking a request payload so that multiple extractors can derive data from a body.
//!
//! ```sh
//! curl -X POST localhost:8080/ -d 'foo'
//!
//! # or using HTTPie
//! http POST :8080/ --raw foo
//! ```
use std::io;
use actix_web::{dev, middleware, web, App, FromRequest, HttpRequest, HttpServer};
use actix_web_lab::util::fork_request_payload;
use futures_util::{future::LocalBoxFuture, TryFutureExt as _};
use tokio::try_join;
use tracing::info;
struct TwoBodies<T, U>(T, U);
impl<T, U> TwoBodies<T, U> {
fn into_parts(self) -> (T, U) {
(self.0, self.1)
}
}
impl<T, U> FromRequest for TwoBodies<T, U>
where
T: FromRequest,
T::Future: 'static,
U: FromRequest,
U::Future: 'static,
{
type Error = actix_web::Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, pl: &mut dev::Payload////something?query=test",
];
for uri in test_uris {
let req = TestRequest::with_uri(uri).to_request();
let res = call_service(&app, req).await;
assert!(res.status().is_success(), "Failed uri: {uri}");
}
}
#[actix_web::test]
async fn always_trailing_slashes() {
let app = init_service(
App::new()
.wrap(NormalizePath::new(TrailingSlash::Always))
.service(web::resource("/").to(HttpResponse::Ok))
.service(web::resource("/v1/something/").to(HttpResponse::Ok))
.service(
web::resource("/v2/something/")
.guard(fn_guard(|ctx| ctx.head().uri.query() == Some("query=test")))
.to(HttpResponse::Ok),
),
)
.await;
let test_uris = vec semantics.
///
/// Use this instead of `NormalizePath::default()` to avoid deprecation warning.
pub fn trim() -> Self {
Self::new(TrailingSlash::Trim)
}
/// Configures middleware to respond to requests with non-normalized paths with a 307 redirect.
///
/// If configured
///
/// For example, a request with the path `/api//v1/foo/` would receive a response with a
/// `Location: /api/v1/foo` header (assuming `Trim` trailing slash behavior.)
///
/// To customize the status code, use [`use_redirects_with`](Self::use_redirects_with).
pub fn use_redirects(mut self) -> Self {
self.use_redirects = Some(StatusCode::TEMPORARY_REDIRECT);
self
}
.body("Hello World!"),
/// );
///
/// assert_response_matches!(res, CREATED;
/// "date" => "today"
/// "set-cookie" => "a=b";
/// @raw "Hello World!"
/// );
///
/// let res = ServiceResponse::new(
/// TestRequest::default().to_http_request(),
/// HttpResponse::Created()
/// .content_type(ContentType::json())
/// .insert_header(("date", "today"))
/// .insert_header(("set-cookie", "a=b"))
/// .body(r#"{"abc":"123"}"#),
/// );
///
/// assert_response_matches!(res, CREATED; @json { "abc": "123" });
/// # });
/// ```
#[macro_export]
macro_rules! assert_response_matches {
($res:ident, $status:ident) => {{
assert_eq!($res.status(), ::actix_web::http::StatusCode::$status)
}};
($res:ident, $status:ident; $($hdr_name:expr => $hdr_val:expr)+) => {{
assert_response_matches!($res, $status);
$(
assert_eq!(
$res.headers().get(::actix_web::http::header::HeaderName::from_static($hdr_name)).unwrap(),
assert_eq!(
Event::Data(Data {
id: Some("42".into()),
event: Some("bar".into()),
data: "foo".into()
})
.into_bytes(),
"id: 42\nevent: bar\ndata: foo\n\n"
);
}
#[test]
fn retry_is_first_msg() {
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
{
let (_sender, mut sse) = channel(9);
assert!(Pin::new(&mut sse).poll_next(&mut cx).is_pending());
}
{
let (_sender, sse) = channel(9);
let mut sse = sse.with_retry_duration(Duration::from_millis(42));
match Pin::new(&mut sse).poll_next(&mut cx) {
Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes, "retry: 42\n\n"),
res => panic!("poll should return retry message, got {res:?}"),
}
}
}
#[actix_web::test]
async fn dropping_responder_causes_send_fails() {
let (sender,(source))] Event),
/// The receiving ([`Sse`]) has been dropped, likely because the client disconnected.
#[display(fmt = "channel closed")]
Closed(#[error(not(source))] Event),
}
#[doc(hidden)]
#[deprecated(
since = "0.17.0",
note = "Renamed to `TrySendError`. Prefer `sse::TrySendError`."
)]
pub type SseTrySendError = TrySendError;
/// Server-sent events data message containing a `data` field and optional `id` and `event` fields.
///
/// Since it implements `Into<SseMessage>`, this can be passed directly to [`send`](SseSender::send)
/// or [`try_send`](SseSender::try_send).
///
/// # Examples
/// ```
/// # #[actix_web::main] async fn test() {
/// use std::convert::Infallible;
/// use actix_web::body;
/// use serde::Serialize;
/// use futures_util::stream;
/// use actix_web_lab::sse;
///
/// #[derive(serde::Serialize)]
/// struct Foo {
/// bar: u32,
/// }
///
/// let sse = sse::Sse::from_stream(stream::iter([
/// Ok::<_, Infallible>(sse::Event::Data(sse::Data::new("foo"))),
/// .app_data(web::Data::new(42u32))
.default_service(web::to(handler))
});
let res = srv.post("/").send_body("foo").await.unwrap();
assert_eq!(res.status(), StatusCode::OK);
}
//! Alternative approach to using `BodyHmac` type using more flexible `RequestSignature` type.
use std::io;
use actix_web::{
middleware::Logger,
web::{self, Bytes, Data},
App, Error, FromRequest, HttpRequest, HttpServer,
};
use actix_web_lab::extract::{RequestSignature, RequestSignatureScheme};
use async_trait::async_trait;
use digest::{CtOutput, Mac};
use hmac::SimpleHmac;
use sha2::Sha256;
use tracing::info;
struct AbcSigningKey([u8; 32]);
/// Grabs variable signing key from app data.
async fn get_signing_key<Key>(req: &HttpRequest) -> actix_web::Result<[u8; 32]> {
let key = Data::<AbcSigningKey>::extract(req).into_inner()?.0;
Ok(key)
}
#[derive(Debug)]
struct AbcApi {
/// Payload hash state.
hmac: SimpleHmac<Sha256>,
}
#[async_trait(?Send)]
impl RequestSignatureScheme for Aody},
/// web::{BufMut as _, BytesMut},
/// HttpRequest,
/// };
///
/// async fn append_bytes(
/// _req: HttpRequest,
/// body: impl MessageBody
/// ) -> actix_web::Result<impl MessageBody> {
/// let buf = body::to_bytes(body).await.ok().unwrap();
///
/// let mut body = BytesMut::from(&buf[..]);
/// body.put_slice(b" - hope you like things ruining your payload format");
///
/// Ok(body)
/// }
/// # actix_web::App::new().wrap(map_response_body(append_bytes));
/// ```
pub fn map_response_body<F>(mapper_fn: F) -> MapResBodyMiddleware<F> {
MapResBodyMiddleware {
mw_fn: Rc::new(mapper_fn),
}
}
/// Middleware transform for [`map_response_body`].
pub struct MapResBodyMiddleware<F> {
mw_fn: Rc<F>,
}
impl<S, F, Fut, B, B2> Transform<S, ServiceRequest> for MapResBodyMiddleware<F>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
F: Fn(HttpRequest, B) -> Fut,
Fut: Future<Output = Result<B2, Error>>,
B2: MessageBody,
{
tains("JSON payload (16 bytes) is larger than allowed (limit: 10 bytes)."),
"unexpected error string: {err:?}"
);
let (req, mut pl) = TestRequest::default()
.insert_header(header::ContentType::json())
.insert_header((
header::CONTENT_LENGTH,
header::HeaderValue::from_static("16"),
))
.set_payload(Bytes::from_static(b"{\"name\": \"test\"}"))
.to_http_parts();
let s = Json::<MyObject, 10>::from_request(&req, &mut pl).await;
let err = format!("{}", s.unwrap_err());
assert!(
err.contains("larger than allowed"),
"unexpected error string: {err:?}"
);
}
#[actix_web::test]
async fn test_json_body() {
let (req, mut pl) = TestRequest::default().to_http_parts();
let json = JsonBody::<MyObject, DEFAULT_JSON_LIMIT>::new(&req, &mut pl).await;
assert!(json_eq(json.unwrap_err(), JsonPayloadError::ContentType));
zeOwned;
use tracing::debug;
/// Default JSON payload size limit of 2MiB.
pub const DEFAULT_JSON_LIMIT: usize = 2_097_152;
/// JSON extractor with const-generic payload size limit.
///
/// `Json` is used to extract typed data from JSON request payloads.
///
/// # Extractor
/// To extract typed data from a request body, the inner type `T` must implement the
/// [`serde::Deserialize`] trait.
///
/// Use the `LIMIT` const generic parameter to control the payload size limit. The default limit
/// that is exported (`DEFAULT_LIMIT`) is 2MiB.
///
/// ```
/// use actix_web::{post, App};
/// use actix_web_lab::extract::{DEFAULT_JSON_LIMIT, Json};
/// use serde::Deserialize;
///
/// #[derive(Deserialize)]
/// struct Info {
/// username: String,
/// }
///
/// /// Deserialize `Info` from request's body.
/// #[post("/")]
/// async fn index(info: Json<Info>) -> String {
/// format!("Welcome {}!", info.username)
/// }
///
/// const LIMIT_32_MB: usize = 33_554_432;
///
/// /// Deserialize payload with a higher 32Mi /// async fn handler() -> impl Responder {
/// let data_stream = streaming_data_source();
///
/// DisplayStream::new_infallible(data_stream)
/// .into_responder()
/// }
/// ```
pub struct DisplayStream<S> {
// The wrapped item stream.
#[pin]
stream: S,
}
}
impl<S> DisplayStream<S> {
/// Constructs a new `DisplayStream` from a stream of lines.
pub fn new(stream: S) -> Self {
Self { stream }
}
}
impl<S> DisplayStream<S> {
/// Constructs a new `DisplayStream` from an infallible stream of lines.
pub fn new_infallible(stream: S) -> DisplayStream<InfallibleStream<S>> {
DisplayStream::new(InfallibleStream::new(stream))
}
}
impl<S, T, E> DisplayStream<S>
where
S: Stream<Item = Result<T, E>>,
T: fmt::Display,
E: Into<Box<dyn StdError>> + 'static,
{
/// Creates a chunked body stream that serializes as CSV on-the-fly.
pub fn into_body_stream(self) -> impl MessageBody {
BodyStr
#[test]
fn for_multiple() {
let fwd = Forwarded {
r#for: vec!["192.0.2.60".to_owned(), "198.51.100.17".to_owned()],
..Forwarded::default()
};
assert_eq!(fwd.for_client().unwrap(), "192.0.2.60");
assert_parse_eq::<Forwarded, _, _>(["for=192.0.2.60, for=198.51.100.17"], fwd);
}
}
//! Expiremental responders and response helpers.
pub use crate::{csv::Csv, display_stream::DisplayStream, html::Html, ndjson::NdJson};
#[cfg(feature = "cbor")]
pub use crate::cbor::Cbor;
#[cfg(feature = "msgpack")]
pub use crate::msgpack::{MessagePack, MessagePackNamed};
//! Content-Length typed header.
//!
//! See [`ContentLength`] docs.
use std::{convert::Infallible, str};
use actix_web::{
error::ParseError,
http::header::{
from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue, CONTENT_LENGTH,
},
HttpMessage,
};
/// `Content-Length` header, defined in [RFC 9110 §8.6].
///
/// The "Content-Length" header field indicatw `Forwarded` header from a single "for" identifier.
pub fn new_for(r#for: impl Into<String>) -> Self {
Self {
by: None,
r#for: vec![r#for.into()],
host: None,
proto: None,
}
}
/// Returns first "for" parameter which is typically the client's identifier.
pub fn for_client(&self) -> Option<&str> {
// Taking the first value for each property is correct because spec states that first "for"
// value is client and rest are proxies. We collect them in the order they are read.
//
// ```plain
// > In a chain of proxy servers where this is fully utilized, the first
// > "for" parameter will disclose the client where the request was first
// > made, followed by any subsequent proxy identifiers.
// - https://datatracker.ietf.org/doc/html/rfc7239#section-5.2
// ```
self.r#for.first().map(String::as_str)
}
/// Returns iterator over the "for" chain.
/nue, // we can't read the file
};
let filename = match entry.file_name().into_string() {
Ok(filename) => filename,
Err(_) => continue, // the file has a non UTF-8 name
};
let mut entry = zipper
.write_entry_stream(ZipEntryBuilder::new(
filename,
async_zip::Compression::Deflate,
))
.await
.map_err(zip_to_io_err)?;
tokio::io::copy(&mut file, &mut entry).await?;
entry.close().await.map_err(zip_to_io_err)?;
}
Ok(())
}
#[get("/")]
async fn index() -> impl Responder {
let (wrt, body) = body::writer();
// allow response to be started while this is processing
#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(async move {
let mut zipper = async_zip::write::ZipFileWriter::new(wrt);
if let Err(err) = read_dir(&mut zipper).await {
tracing::warn!("Failed to write files from directory to zip: {err}")
tatic")
/// .static_resources_location("./examples/assets")
/// .finish()
/// );
/// ```
#[cfg(feature = "spa")]
pub fn spa() -> Spa {
Spa::default()
}
//! MessagePack responder.
use actix_web::{HttpRequest, HttpResponse, Responder};
use bytes::Bytes;
use derive_more::{Deref, DerefMut, Display};
use mime::Mime;
use once_cell::sync::Lazy;
use serde::Serialize;
static MSGPACK_MIME: Lazy<Mime> = Lazy::new(|| "application/msgpack".parse().unwrap());
/// MessagePack responder.
///
/// If you require the fields to be named, use [`MessagePackNamed`].
#[derive(Debug, Deref, DerefMut, Display)]
pub struct MessagePack<T>(pub T);
impl<T: Serialize> Responder for MessagePack<T> {
type Body = Bytes;
fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
let body = Bytes::from(rmp_serde::to_vec(&self.0).unwrap());
HttpResponse::Ok()
.content_type(MSGPACK_MIME.clone())
.message_body(body)
.unwrap()
}
}
/// Macing::warn!("client disconnected; could not send SSE message");
break;
}
sleep(Duration::from_secs(10)).await;
}
});
sse.with_keep_alive(Duration::from_secs(3))
}
#[actix_web::main]
async fn main() -> io::Result<()> {
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
tracing::info!("starting HTTP server at http://localhost:8080");
HttpServer::new(|| {
App::new()
.service(index)
.service(countdown)
.service(countdown_from)
.service(timestamp)
.wrap(Logger::default())
})
.workers(2)
.bind(("127.0.0.1", 8080))?
.run()
.await
}
use std::{
future::{ready, Future, Ready},
marker::PhantomData,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use actix_service::{forward_ready, Service, Transform};
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
Error,
};
use futures_core::read
/// Constructs new panic reporter middleware with `callback`.
pub fn new(callback: impl Fn(&(dyn Any + Send)) + 'static) -> Self {
Self {
cb: Rc::new(callback),
}
}
}
impl std::fmt::Debug for PanicReporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PanicReporter")
.field("cb", &"<callback>")
.finish()
}
}
impl<S, Req> Transform<S, Req> for PanicReporter
where
S: Service<Req>,
S::Future: 'static,
{
type Response = S::Response;
type Error = S::Error;
type Transform = PanicReporterMiddleware<S>;
type InitError = ();
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ready(Ok(PanicReporterMiddleware {
service: Rc::new(service),
cb: Rc::clone(&self.cb),
}))
}
}
pub struct PanicReporterMiddleware<S> {
service: Rc<S>,
cb: PanicCallback,
}
immBody::Error(Some(UrlencodedError::Overflow {
size: len,
limit: LIMIT,
}));
}
}
UrlEncodedFormBody::Body {
length,
payload,
buf: web::BytesMut::with_capacity(8192),
_res: PhantomData,
}
}
}
impl<T: DeserializeOwned, const LIMIT: usize> Future for UrlEncodedFormBody<T, LIMIT> {
type Output = Result<T, UrlencodedError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
match this {
UrlEncodedFormBody::Body { buf, payload, .. } => loop {
let res = ready!(Pin::new(&mut *payload).poll_next(cx));
match res {
Some(chunk) => {
let chunk = chunk?;
let buf_len = buf.len() + chunk.len();
if buf_len > LIMIT {
return Poll::Ready(Err(Urlenclt<Self::Response, Self::Error>>;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
self.service.call(req)
}
}
#[cfg(test)]
mod tests {
use actix_web::{
http::header::{self, HeaderValue},
middleware::{Compat, Logger},
test, web, App, HttpResponse,
};
use super::*;
async fn noop<B>(req: ServiceRequest, next: Next<B>) -> Result<ServiceResponse<B>, Error> {
next.call(req).await
}
async fn add_res_header<B>(
req: ServiceRequest,
next: Next<B>,
) -> Result<ServiceResponse<B>, Error> {
let mut res = next.call(req).await?;
res.headers_mut()
.insert(header::WARNING, HeaderValue::from_static("42"));
Ok(res)
}
async fn mutate_body_type(
req: ServiceRequest,
next: Next<impl MessageBody + 'static>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
let res = next.call(req).await?;
Ok(res.map_into_left_body:::Overflow"),
}
}
}
impl<T> ResponseError for BodyLimitError<T>
where
T: FromRequest + 'static,
T::Error: fmt::Debug + fmt::Display,
{
}
#[cfg(test)]
mod tests {
use actix_web::{http::header, test::TestRequest};
use bytes::Bytes;
use super::*;
static_assertions::assert_impl_all!(BodyLimitFut<(), 100>: Unpin);
static_assertions::assert_impl_all!(BodyLimitFut<Bytes, 100>: Unpin);
#[actix_web::test]
async fn within_limit() {
let (req, mut pl) = TestRequest::default()
.insert_header(header::ContentType::plaintext())
.insert_header((
header::CONTENT_LENGTH,
header::HeaderValue::from_static("9"),
))
.set_payload(Bytes::from_static(b"123456789"))
.to_http_parts();
let body = BodyLimit::<Bytes, 10>::from_request(&req, &mut pl).await;
assert_eq!(
body.ok().unwrap().into_inner(),
Bytes::from_static(b"123456789")
);
}
actix_utils::future::ok;
use actix_web::{
body::BoxBody,
dev::{fn_service, Service, ServiceRequest, ServiceResponse},
http::StatusCode,
Error, HttpResponseBuilder,
};
/// Creates service that always responds with given status code and echoes request path as response
/// body.
pub fn echo_path_service(
status_code: StatusCode,
) -> impl Service<ServiceRequest, Response = ServiceResponse<BoxBody>, Error = Error> {
fn_service(move |req: ServiceRequest| {
let path = req.path().to_owned();
ok(req.into_response(HttpResponseBuilder::new(status_code).body(path)))
})
}
use std::{convert::Infallible, error::Error as StdError};
use actix_web::{
body::{BodyStream, MessageBody},
HttpResponse, Responder,
};
use bytes::{Bytes, BytesMut};
use futures_core::Stream;
use futures_util::TryStreamExt as _;
use mime::Mime;
use pin_project_lite::pin_project;
use serde::Serialize;
use crate::util::{InfallibleStream, MutWriter};
pin_project! {
/// A buffered CSV serializing hex!("03ac6742 16f3e15c 761ee1a5 e255f067 953623c8 b388b445 9e13f978 d7c846f4").as_ref()
);
}
#[actix_web::test]
async fn use_on_wrong_extractor_in_wrong_order() {
let app = test::init_service(App::new().route(
"/",
web::get().to(
|_body: Json<u64, 4>, null: BodyHash<(), Sha256>| async move {
Bytes::copy_from_slice(null.hash())
},
),
))
.await;
let req = test::TestRequest::default().set_json(1234).to_request();
let res = test::call_service(&app, req).await;
assert_eq!(res.status(), StatusCode::OK);
let body = test::read_body(res).await;
// if the hash wrapper is on a non-body extractor _and_ a body extractor has already taken the
// payload, this should return the empty input hash
assert_eq!(
body,
hex!("e3b0c442 98fc1c14 9afbf4c8 996fb924 27ae41e4 649b934c a495991b 7852b855").as_ref()
);
}
//! How to use `NdJson` as an efficient streaming response type.
//!
//! The same techniquedy type.
///
/// # Examples
/// ```
/// # use actix_web::{HttpResponse, web};
/// use std::convert::Infallible;
/// use actix_web_lab::body;
///
/// # async fn index() {
/// let (mut body_tx, body) = body::channel::<Infallible>();
///
/// let _ = web::block(move || {
/// body_tx.send(web::Bytes::from_static(b"body from another thread")).unwrap();
/// });
///
/// HttpResponse::Ok().body(body)
/// # ;}
/// ```
pub fn channel<E: Into<BoxError>>() -> (Sender<E>, impl MessageBody) {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
(Sender::new(tx), Receiver::new(rx))
}
/// A channel-like sender for body chunks.
#[derive(Debug, Clone)]
pub struct Sender<E> {
tx: UnboundedSender<Result<Bytes, E>>,
}
impl<E> Sender<E> {
fn new(tx: UnboundedSender<Result<Bytes, E>>) -> Self {
Self { tx }
}
/// Submits a chunk of bytes to the response body stream.
///
/// # Errors
/// Errors if other side of channel body was dropped, returning `chunk`.
pub fn send(&mut self, _env(env_logger::Env::new().default_filter_or("info"));
info!("staring server at http://localhost:8080");
HttpServer::new(|| {
App::new()
.wrap(middleware::Logger::default().log_target("@"))
.route(
"/",
web::post().to(|body: TwoBodies<String, web::Bytes>| async move {
let (string, bytes) = body.into_parts();
// proves that body was extracted twice since the bytes extracted are byte-equal to
// the string, without forking the request payload, the bytes parts would be empty
assert_eq!(string.as_bytes(), &bytes);
// echo string
string
}),
)
})
.workers(1)
.bind(("127.0.0.1", 8080))?
.run()
.await
}
use std::borrow::Cow;
use actix_files::{Files, NamedFile};
use actix_service::fn_service;
use actix_web::dev::{HttpServiceFactory, ResourceDef, ServiceRequest, ServiceResponseequest for Query<T> {
type Error = Error;
type Future = Ready<Result<Self, Error>>;
#[inline]
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
serde_html_form::from_str::<T>(req.query_string())
.map(|val| ready(Ok(Query(val))))
.unwrap_or_else(move |e| {
let err = QueryPayloadError::Deserialize(e);
debug!(
"Failed during Query extractor deserialization. \
Request path: {:?}",
req.path()
);
ready(Err(err.into()))
})
}
}
#[cfg(test)]
mod tests {
use actix_web::test::TestRequest;
use derive_more::Display;
use serde::Deserialize;
use super::*;
#[derive(Deserialize, Debug, Display)]
struct Id {
id: String,
}