1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
use event_listener::Event;
use http::HeaderMap;
use once_cell::sync::OnceCell;
use std::{sync::Arc, time::Duration};

/// Holds the current state of a trailer for a response.
///
/// This object acts as a shared handle that can be cloned and polled from
/// multiple threads to wait for and act on the response trailer.
///
/// There are two typical workflows for accessing trailer headers:
///
/// - If you are consuming the response body and then accessing the headers
///   afterward, then all trailers are guaranteed to have arrived (if any).
///   [`Trailer::try_get`] will allow you to access them without extra overhead.
/// - If you are handling trailers in a separate task, callback, or thread, then
///   either [`Trailer::wait`] or [`Trailer::wait_async`] will allow you to wait
///   for the trailer headers to arrive and then handle them.
///
/// Note that in either approach, trailer headers are delivered to your
/// application as a single [`HeaderMap`]; it is not possible to handle
/// individual headers as they arrive.
#[derive(Clone, Debug)]
pub struct Trailer {
    // Reference-counted state. Cloning the handle clones the `Arc`, so every
    // clone observes the same header cell and the same ready event.
    shared: Arc<Shared>,
}

// State shared between `Trailer` handles and, when one exists, the
// `TrailerWriter` that publishes the headers.
#[derive(Debug)]
struct Shared {
    // The trailer headers. Unset until they are published; being a
    // `OnceCell`, the value can be stored at most once.
    headers: OnceCell<HeaderMap>,
    // Event that the writer notifies after storing `headers`, waking any
    // callers blocked in one of the `Trailer::wait*` methods.
    ready: Event,
}

impl Trailer {
    /// Get a populated trailer handle containing no headers.
    pub(crate) fn empty() -> &'static Self {
        static EMPTY: OnceCell<Trailer> = OnceCell::new();

        EMPTY.get_or_init(|| Self {
            shared: Arc::new(Shared {
                headers: OnceCell::from(HeaderMap::new()),
                ready: Event::new(),
            }),
        })
    }

    /// Returns true if the trailer has been received (if any).
    ///
    /// The trailer will not be received until the body stream associated with
    /// this response has been fully consumed.
    #[inline]
    pub fn is_ready(&self) -> bool {
        self.try_get().is_some()
    }

    /// Attempt to get the trailer headers without blocking. Returns `None` if
    /// the trailer has not been received yet.
    #[inline]
    pub fn try_get(&self) -> Option<&HeaderMap> {
        self.shared.headers.get()
    }

    /// Block the current thread until the trailer headers arrive, and then
    /// return them.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, block until they are set.
            listener.wait();

            // If we got the notification, then the headers are likely to be
            // set.
            if let Some(headers) = self.try_get() {
                return headers;
            }
        }
    }

    /// Block the current thread until the trailer headers arrive or a timeout
    /// expires.
    ///
    /// If the given timeout expired before the trailer arrived then `None` is
    /// returned.
    ///
    /// This is a blocking operation! If you are writing an asynchronous
    /// application, then you probably want to use [`Trailer::wait_async`]
    /// instead.
    pub fn wait_timeout(&self, timeout: Duration) -> Option<&HeaderMap> {
        // Fast path: If the headers are already set, return them.
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Headers not set, jump into the slow path by creating a new listener
        // for the ready event.
        let listener = self.shared.ready.listen();

        // Double-check that the headers are not set.
        if let Some(headers) = self.try_get() {
            return Some(headers);
        }

        // Otherwise, block with a timeout.
        if listener.wait_timeout(timeout) {
            self.try_get()
        } else {
            None
        }
    }

    /// Wait asynchronously until the trailer headers arrive, and then return
    /// them.
    pub async fn wait_async(&self) -> &HeaderMap {
        loop {
            // Fast path: If the headers are already set, return them.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Headers not set, jump into the slow path by creating a new
            // listener for the ready event.
            let listener = self.shared.ready.listen();

            // Double-check that the headers are not set.
            if let Some(headers) = self.try_get() {
                return headers;
            }

            // Otherwise, wait asynchronously until they are.
            listener.await;

            // If we got the notification, then the headers are likely to be
            // set.
            if let Some(headers) = self.try_get() {
                return headers;
            }
        }
    }
}

/// Crate-internal producer side of a [`Trailer`].
///
/// Collects trailer headers in a private map and publishes them to the shared
/// state when flushed (explicitly, or when the writer is dropped).
pub(crate) struct TrailerWriter {
    // State shared with every `Trailer` handle returned by `trailer()`.
    shared: Arc<Shared>,
    // Headers collected so far; becomes `None` once they have been flushed.
    headers: Option<HeaderMap>,
}

impl TrailerWriter {
    pub(crate) fn new() -> Self {
        Self {
            shared: Arc::new(Shared {
                headers: Default::default(),
                ready: Event::new(),
            }),
            headers: Some(HeaderMap::new()),
        }
    }

    pub(crate) fn trailer(&self) -> Trailer {
        Trailer {
            shared: self.shared.clone(),
        }
    }

    pub(crate) fn get_mut(&mut self) -> Option<&mut HeaderMap> {
        self.headers.as_mut()
    }

    #[inline]
    pub(crate) fn flush(&mut self) {
        if !self.flush_impl() {
            tracing::warn!("tried to flush trailer multiple times");
        }
    }

    fn flush_impl(&mut self) -> bool {
        if let Some(headers) = self.headers.take() {
            let _ = self.shared.headers.set(headers);

            // Wake up any calls waiting for the headers.
            self.shared.ready.notify(usize::max_value());

            true
        } else {
            false
        }
    }
}

impl Drop for TrailerWriter {
    fn drop(&mut self) {
        self.flush_impl();
    }
}