//! Shared handle and writer for the trailer headers of an HTTP response.
use event_listener::Event;
use http::HeaderMap;
use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};
/// Holds the current state of a trailer for a response.
///
/// This object acts as a shared handle that can be cloned and polled from
/// multiple threads to wait for and act on the response trailer. Every clone
/// refers to the same underlying state, so cloning only bumps a reference
/// count.
///
/// There are two typical workflows for accessing trailer headers:
///
/// - If you are consuming the response body and then accessing the headers
/// afterward, then all trailers are guaranteed to have arrived (if any).
/// [`Trailer::try_get`] will allow you to access them without extra overhead.
/// - If you are handling trailers in a separate task, callback, or thread, then
/// either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
/// for the trailer headers to arrive and then handle them.
///
/// Note that in either approach, trailer headers are delivered to your
/// application as a single [`HeaderMap`]; it is not possible to handle
/// individual headers as they arrive.
#[derive(Clone, Debug)]
pub struct Trailer {
// State shared between all clones of this handle and the writer that
// eventually publishes the headers.
shared: Arc<Shared>,
}
/// State shared between a trailer writer and every [`Trailer`] handle created
/// from it.
#[derive(Debug)]
struct Shared {
// The trailer headers. Empty until the writer flushes; once set, the value
// is never replaced.
headers: OnceCell<HeaderMap>,
// Signalled by the writer right after `headers` has been set, to wake up
// anything blocked in one of the `Trailer::wait*` methods.
ready: Event,
}
impl Trailer {
    /// Get a populated trailer handle containing no headers.
    ///
    /// The handle is created lazily on first use and then shared for the rest
    /// of the program. Its headers are set from the start, so every wait
    /// method returns immediately on the fast path.
    pub(crate) fn empty() -> &'static Self {
        static EMPTY: OnceCell<Trailer> = OnceCell::new();

        EMPTY.get_or_init(|| Self {
            shared: Arc::new(Shared {
                headers: OnceCell::from(HeaderMap::new()),
                ready: Event::new(),
            }),
        })
    }

    /// Returns true if the trailer has been received (if any).
    ///
    /// The trailer will not be received until the body stream associated with
    /// this response has been fully consumed.
    #[inline]
    pub fn is_ready(&self) -> bool {
        self.try_get().is_some()
    }

    /// Attempt to get the trailer headers without blocking. Returns `None` if
    /// the trailer has not been received yet.
    #[inline]
    pub fn try_get(&self) -> Option<&HeaderMap> {
        self.shared.headers.get()
    }

    /// Block the current thread until the trailer headers arrive, and then
    /// return them.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them. This
            // also serves as the re-check after being woken up below.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set. They may have been
            // published between the first check and registering the listener,
            // in which case the notification was sent before we were
            // listening and would never reach us.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, block until notified, then loop around to re-check.
            listener.wait();
        }
    }

    /// Block the current thread until the trailer headers arrive or a timeout
    /// expires.
    ///
    /// Returns `None` if the trailer still has not arrived once the timeout
    /// has expired.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
        // Fast path: If the headers are already set, return them.
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Headers not set, jump into the slow path by creating a new listener
        // for the ready event.
        let listener = self.shared.ready.listen();

        // Double-check that the headers are not set (see `wait` for why).
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Otherwise, block with a timeout. Whether the wait ended because of
        // a notification or because the timeout elapsed is deliberately
        // ignored: the writer sets the headers *before* it notifies, so the
        // timeout can fire while the headers are already available. The cell
        // itself is the source of truth.
        let _ = listener.wait_timeout(timeout);
        self.try_get()
    }

    /// Wait asynchronously until the trailer headers arrive, and then return
    /// them.
    pub async fn wait_async(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them. This
            // also serves as the re-check after being woken up below.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set (see `wait` for why).
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, wait asynchronously until notified, then loop around
            // to re-check.
            listener.await;
        }
    }
}
/// The producing side of a trailer: collects trailer headers and publishes
/// them to every [`Trailer`] handle obtained from [`TrailerWriter::trailer`].
pub(crate) struct TrailerWriter {
// State shared with the `Trailer` handles handed out by this writer.
shared: Arc<Shared>,
// Headers being accumulated. `Some` until the writer is flushed, at which
// point the map is moved into `shared` and this becomes `None`.
headers: Option<HeaderMap>,
}
impl TrailerWriter {
    /// Create a writer with an empty, not-yet-published set of trailer
    /// headers.
    pub(crate) fn new() -> Self {
        Self {
            shared: Arc::new(Shared {
                headers: Default::default(),
                ready: Event::new(),
            }),
            headers: Some(HeaderMap::new()),
        }
    }

    /// Create a reader handle that observes the headers published by this
    /// writer.
    pub(crate) fn trailer(&self) -> Trailer {
        Trailer {
            shared: self.shared.clone(),
        }
    }

    /// Get mutable access to the headers collected so far, or `None` if the
    /// writer has already been flushed.
    pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
        self.headers.as_mut()
    }

    /// Publish the collected headers and wake up all waiters.
    ///
    /// Flushing more than once has no further effect other than logging a
    /// warning.
    #[inline]
    pub(crate) fn flush(&mut self) {
        if !self.flush_impl() {
            tracing::warn!("tried to flush trailer multiple times");
        }
    }

    /// Publish the collected headers if that has not happened yet.
    ///
    /// Returns `true` if this call published the headers, or `false` if they
    /// had already been flushed.
    fn flush_impl(&mut self) -> bool {
        if let Some(headers) = self.headers.take() {
            // `set` only fails if the cell is already populated; the result
            // is intentionally ignored.
            let _ = self.shared.headers.set(headers);

            // Wake up any calls waiting for the headers. This must happen
            // after the headers are set so that woken waiters can see them.
            self.shared.ready.notify(usize::MAX);

            true
        } else {
            false
        }
    }
}
impl Drop for TrailerWriter {
    /// Publish whatever headers were collected if the writer is dropped
    /// without an explicit flush, so that anything blocked waiting on the
    /// trailer is woken up.
    fn drop(&mut self) {
        // An earlier explicit flush is expected here, so unlike `flush` the
        // "already flushed" outcome is ignored rather than logged.
        let _ = self.flush_impl();
    }
}