Skip to content

Commit c4e0fd2

Browse files
authored
refactor(http/retry): outline bounded replay buffer (#3597)
the `ReplayBody<B>` middleware makes use of a `BufList` type to hold a reference to bytes yielded by the inner body `B`. a `Data` enum is composed on top to this, to allow bodies to either return (a) a replay of a previous body, or (b) the iniial bytes yielded by the original body. this branch also takes the step of moving some code out of the `ReplayBody::poll_data(..)` trait method along with inlining `BufList::push_chunk(..)` , a small helper function that is only used once. this is intended to consolidate code related to buffering data yielded by the underlying `B`-typed body, and extricate logic concerning the bounding of this buffer from the now defunct `Body::poll_data()` trait method. see linkerd/linkerd2#8733 for more information about upgrading the proxy to hyper 1.0. this will help make subsequent changes to the model of `ReplayBody<B>` its corresponding `Body` implementation more reviewable, by proactively reorganizing things in advance. --- * refactor(http/retry): outline replay buffer the replay body uses this `BufList` type to hold a reference to bytes yielded by the inner body `B`. the `Data` enum is composed on top to this, to allow bodies to either return a replay of a previous body, or the iniial bytes yielded by the original body. this is all relatively self-contained, so we can move this into a small submodule. Signed-off-by: katelyn martin <[email protected]> * refactor(http/retry): outline replay body buffering this commit moves some code out of the `ReplayBody::poll_data(..)` trait method. this bit of code is where we take a chunk of a data yielded by the inner body, and push it into our replay buffer. if the capacity is exceeded, we flush the buffer. in either case, the code copies the chunk into a cheaply cloneable, contiguous `Bytes`. this is all related to the buffer, so we move it there. Signed-off-by: katelyn martin <[email protected]> * refactor(http/retry): inline `BufList::push_chunk` `push_chunk` is a small helper function that is only used once. now that we have moved our buffering code alongside this type, it's more straightforward to inline this function. Signed-off-by: katelyn martin <[email protected]> * refactor(http/retry): rename `BufList` to `Replay` this structure is responsible for acting as the `bytes::Buf` buffer for the replay of the initial body. this commit renames this to articulate that relationship more directly. Signed-off-by: katelyn martin <[email protected]> * docs(http/retry): polish `Replay` documentation Signed-off-by: katelyn martin <[email protected]> --------- Signed-off-by: katelyn martin <[email protected]>
1 parent 4f7f715 commit c4e0fd2

File tree

2 files changed

+192
-176
lines changed

2 files changed

+192
-176
lines changed

linkerd/http/retry/src/replay.rs

Lines changed: 15 additions & 176 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
1-
use bytes::{Buf, BufMut, Bytes, BytesMut};
1+
use bytes::Buf;
22
use http::HeaderMap;
33
use http_body::{Body, SizeHint};
44
use linkerd_error::Error;
55
use linkerd_http_box::BoxBody;
66
use parking_lot::Mutex;
7-
use std::{collections::VecDeque, io::IoSlice, pin::Pin, sync::Arc, task::Context, task::Poll};
7+
use std::{pin::Pin, sync::Arc, task::Context, task::Poll};
88
use thiserror::Error;
99

10+
pub use self::buffer::{Data, Replay};
11+
12+
mod buffer;
13+
1014
/// Unit tests for [`ReplayBody<B>`].
1115
#[cfg(test)]
1216
mod tests;
@@ -46,21 +50,6 @@ pub struct ReplayBody<B = BoxBody> {
4650
#[error("replay body discarded after reaching maximum buffered bytes limit")]
4751
pub struct Capped;
4852

49-
/// Data returned by `ReplayBody`'s `http_body::Body` implementation is either
50-
/// `Bytes` returned by the initial body, or a list of all `Bytes` chunks
51-
/// returned by the initial body (when replaying it).
52-
#[derive(Debug)]
53-
pub enum Data {
54-
Initial(Bytes),
55-
Replay(BufList),
56-
}
57-
58-
/// Body data composed of multiple `Bytes` chunks.
59-
#[derive(Clone, Debug, Default)]
60-
pub struct BufList {
61-
bufs: VecDeque<Bytes>,
62-
}
63-
6453
#[derive(Debug)]
6554
struct SharedState<B> {
6655
body: Mutex<Option<BodyState<B>>>,
@@ -77,7 +66,7 @@ struct SharedState<B> {
7766

7867
#[derive(Debug)]
7968
struct BodyState<B> {
80-
buf: BufList,
69+
replay: Replay,
8170
trailers: Option<HeaderMap>,
8271
rest: B,
8372
is_completed: bool,
@@ -113,7 +102,7 @@ impl<B: Body> ReplayBody<B> {
113102
was_empty: body.is_end_stream(),
114103
}),
115104
state: Some(BodyState {
116-
buf: Default::default(),
105+
replay: Default::default(),
117106
trailers: None,
118107
rest: body,
119108
is_completed: false,
@@ -180,7 +169,7 @@ where
180169
// when polling the inner body.
181170
tracing::trace!(
182171
replay_body = this.replay_body,
183-
buf.has_remaining = state.buf.has_remaining(),
172+
buf.has_remaining = state.replay.has_remaining(),
184173
body.is_completed = state.is_completed,
185174
body.max_bytes_remaining = state.max_bytes,
186175
"ReplayBody::poll_data"
@@ -189,11 +178,11 @@ where
189178
// If we haven't replayed the buffer yet, and its not empty, return the
190179
// buffered data first.
191180
if this.replay_body {
192-
if state.buf.has_remaining() {
181+
if state.replay.has_remaining() {
193182
tracing::trace!("Replaying body");
194183
// Don't return the buffered data again on the next poll.
195184
this.replay_body = false;
196-
return Poll::Ready(Some(Ok(Data::Replay(state.buf.clone()))));
185+
return Poll::Ready(Some(Ok(Data::Replay(state.replay.clone()))));
197186
}
198187

199188
if state.is_capped() {
@@ -215,7 +204,7 @@ where
215204
// Poll the inner body for more data. If the body has ended, remember
216205
// that so that future clones will not try polling it again (as
217206
// described above).
218-
let mut data = {
207+
let data = {
219208
tracing::trace!("Polling initial body");
220209
match futures::ready!(Pin::new(&mut state.rest).poll_data(cx)) {
221210
Some(Ok(data)) => data,
@@ -230,25 +219,9 @@ where
230219

231220
// If we have buffered the maximum number of bytes, allow *this* body to
232221
// continue, but don't buffer any more.
233-
let length = data.remaining();
234-
state.max_bytes = state.max_bytes.saturating_sub(length);
235-
let chunk = if state.is_capped() {
236-
// If there's data in the buffer, discard it now, since we won't
237-
// allow any clones to have a complete body.
238-
if state.buf.has_remaining() {
239-
tracing::debug!(
240-
buf.size = state.buf.remaining(),
241-
"Buffered maximum capacity, discarding buffer"
242-
);
243-
state.buf = Default::default();
244-
}
245-
data.copy_to_bytes(length)
246-
} else {
247-
// Buffer and return the bytes.
248-
state.buf.push_chunk(data)
249-
};
222+
let chunk = state.record_bytes(data);
250223

251-
Poll::Ready(Some(Ok(Data::Initial(chunk))))
224+
Poll::Ready(Some(Ok(chunk)))
252225
}
253226

254227
/// Polls for an optional **single** [`HeaderMap`] of trailers.
@@ -324,7 +297,7 @@ where
324297

325298
// Otherwise, if we're holding the state but have dropped the inner
326299
// body, the entire body is buffered so we know the exact size hint.
327-
let buffered = state.buf.remaining() as u64;
300+
let buffered = state.replay.remaining() as u64;
328301
let rest_hint = state.rest.size_hint();
329302

330303
// Otherwise, add the inner body's size hint to the amount of buffered
@@ -361,140 +334,6 @@ impl<B> Drop for ReplayBody<B> {
361334
}
362335
}
363336

364-
// === impl Data ===
365-
366-
impl Buf for Data {
367-
#[inline]
368-
fn remaining(&self) -> usize {
369-
match self {
370-
Data::Initial(buf) => buf.remaining(),
371-
Data::Replay(bufs) => bufs.remaining(),
372-
}
373-
}
374-
375-
#[inline]
376-
fn chunk(&self) -> &[u8] {
377-
match self {
378-
Data::Initial(buf) => buf.chunk(),
379-
Data::Replay(bufs) => bufs.chunk(),
380-
}
381-
}
382-
383-
#[inline]
384-
fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize {
385-
match self {
386-
Data::Initial(buf) => buf.chunks_vectored(iovs),
387-
Data::Replay(bufs) => bufs.chunks_vectored(iovs),
388-
}
389-
}
390-
391-
#[inline]
392-
fn advance(&mut self, amt: usize) {
393-
match self {
394-
Data::Initial(buf) => buf.advance(amt),
395-
Data::Replay(bufs) => bufs.advance(amt),
396-
}
397-
}
398-
399-
#[inline]
400-
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
401-
match self {
402-
Data::Initial(buf) => buf.copy_to_bytes(len),
403-
Data::Replay(bufs) => bufs.copy_to_bytes(len),
404-
}
405-
}
406-
}
407-
408-
// === impl BufList ===
409-
410-
impl BufList {
411-
fn push_chunk(&mut self, mut data: impl Buf) -> Bytes {
412-
let len = data.remaining();
413-
// `data` is (almost) certainly a `Bytes`, so `copy_to_bytes` should
414-
// internally be a cheap refcount bump almost all of the time.
415-
// But, if it isn't, this will copy it to a `Bytes` that we can
416-
// now clone.
417-
let bytes = data.copy_to_bytes(len);
418-
// Buffer a clone of the bytes read on this poll.
419-
self.bufs.push_back(bytes.clone());
420-
// Return the bytes
421-
bytes
422-
}
423-
}
424-
425-
impl Buf for BufList {
426-
fn remaining(&self) -> usize {
427-
self.bufs.iter().map(Buf::remaining).sum()
428-
}
429-
430-
fn chunk(&self) -> &[u8] {
431-
self.bufs.front().map(Buf::chunk).unwrap_or(&[])
432-
}
433-
434-
fn chunks_vectored<'iovs>(&'iovs self, iovs: &mut [IoSlice<'iovs>]) -> usize {
435-
// Are there more than zero iovecs to write to?
436-
if iovs.is_empty() {
437-
return 0;
438-
}
439-
440-
// Loop over the buffers in the replay buffer list, and try to fill as
441-
// many iovecs as we can from each buffer.
442-
let mut filled = 0;
443-
for buf in &self.bufs {
444-
filled += buf.chunks_vectored(&mut iovs[filled..]);
445-
if filled == iovs.len() {
446-
return filled;
447-
}
448-
}
449-
450-
filled
451-
}
452-
453-
fn advance(&mut self, mut amt: usize) {
454-
while amt > 0 {
455-
let rem = self.bufs[0].remaining();
456-
// If the amount to advance by is less than the first buffer in
457-
// the buffer list, advance that buffer's cursor by `amt`,
458-
// and we're done.
459-
if rem > amt {
460-
self.bufs[0].advance(amt);
461-
return;
462-
}
463-
464-
// Otherwise, advance the first buffer to its end, and
465-
// continue.
466-
self.bufs[0].advance(rem);
467-
amt -= rem;
468-
469-
self.bufs.pop_front();
470-
}
471-
}
472-
473-
fn copy_to_bytes(&mut self, len: usize) -> Bytes {
474-
// If the length of the requested `Bytes` is <= the length of the front
475-
// buffer, we can just use its `copy_to_bytes` implementation (which is
476-
// just a reference count bump).
477-
match self.bufs.front_mut() {
478-
Some(first) if len <= first.remaining() => {
479-
let buf = first.copy_to_bytes(len);
480-
// If we consumed the first buffer, also advance our "cursor" by
481-
// popping it.
482-
if first.remaining() == 0 {
483-
self.bufs.pop_front();
484-
}
485-
486-
buf
487-
}
488-
_ => {
489-
assert!(len <= self.remaining(), "`len` greater than remaining");
490-
let mut buf = BytesMut::with_capacity(len);
491-
buf.put(self.take(len));
492-
buf.freeze()
493-
}
494-
}
495-
}
496-
}
497-
498337
// === impl BodyState ===
499338

500339
impl<B> BodyState<B> {

0 commit comments

Comments
 (0)