From ba46b1818a5065fd84d969bc4bf2e6f7aa98d9c7 Mon Sep 17 00:00:00 2001 From: edef Date: Thu, 25 Apr 2024 23:27:58 +0000 Subject: fix(nix-compat/bytes): make BytesReader less hazardous We now *never* return the final bytes until we've read the padding in full, so read_exact is safe to use. This is implemented by TrailerReader, which splits the phases of reading (and validating) the final 8-byte block, and providing the contained payload bytes to the caller. Change-Id: I0d05946a8af9c260a18d71d2b763ba7a68b3c27f Reviewed-on: https://cl.tvl.fyi/c/depot/+/11518 Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/nix-compat/src/wire/bytes/reader.rs | 507 ----------------------- tvix/nix-compat/src/wire/bytes/reader/mod.rs | 471 +++++++++++++++++++++ tvix/nix-compat/src/wire/bytes/reader/trailer.rs | 175 ++++++++ 3 files changed, 646 insertions(+), 507 deletions(-) delete mode 100644 tvix/nix-compat/src/wire/bytes/reader.rs create mode 100644 tvix/nix-compat/src/wire/bytes/reader/mod.rs create mode 100644 tvix/nix-compat/src/wire/bytes/reader/trailer.rs diff --git a/tvix/nix-compat/src/wire/bytes/reader.rs b/tvix/nix-compat/src/wire/bytes/reader.rs deleted file mode 100644 index 18a8c6c686..0000000000 --- a/tvix/nix-compat/src/wire/bytes/reader.rs +++ /dev/null @@ -1,507 +0,0 @@ -use std::{ - io, - ops::{Bound, RangeBounds, RangeInclusive}, - pin::Pin, - task::{self, ready, Poll}, -}; -use tokio::io::AsyncRead; - -use super::{padding_len, BytesPacketPosition, LEN_SIZE}; - -/// Reads a "bytes wire packet" from the underlying reader. -/// The format is the same as in [crate::wire::bytes::read_bytes], -/// however this structure provides a [AsyncRead] interface, -/// allowing to not having to pass around the entire payload in memory. -/// -/// After being constructed with the underlying reader and an allowed size, -/// subsequent requests to poll_read will return payload data until the end -/// of the packet is reached. -/// -/// Internally, it will first read over the size packet, filling payload_size, -/// ensuring it fits allowed_size, then return payload data. -/// It will only signal EOF (returning `Ok(())` without filling the buffer anymore) -/// when all padding has been successfully consumed too. -/// -/// This also means, it's important for a user to always read to the end, -/// and not just call read_exact - otherwise it might not skip over the -/// padding, and return garbage when reading the next packet. -/// -/// In case of an error due to size constraints, or in case of not reading -/// all the way to the end (and getting a EOF), the underlying reader is no -/// longer usable and might return garbage. -pub struct BytesReader { - inner: R, - allowed_size: RangeInclusive, - payload_size: [u8; 8], - state: BytesPacketPosition, -} - -impl BytesReader -where - R: AsyncRead + Unpin, -{ - /// Constructs a new BytesReader, using the underlying passed reader. - pub fn new>(r: R, allowed_size: S) -> Self { - let user_len_min = match allowed_size.start_bound() { - Bound::Included(&n) => n, - Bound::Excluded(&n) => n.saturating_add(1), - Bound::Unbounded => 0, - }; - - let user_len_max = match allowed_size.end_bound() { - Bound::Included(&n) => n, - Bound::Excluded(&n) => n.checked_sub(1).unwrap(), - Bound::Unbounded => u64::MAX, - }; - - Self { - inner: r, - allowed_size: user_len_min..=user_len_max, - payload_size: [0; 8], - state: BytesPacketPosition::Size(0), - } - } - - /// Construct a new BytesReader with a known, and already-read size. - pub fn with_size(r: R, size: u64) -> Self { - Self { - inner: r, - allowed_size: size..=size, - payload_size: u64::to_le_bytes(size), - state: if size != 0 { - BytesPacketPosition::Payload(0) - } else { - BytesPacketPosition::Padding(0) - }, - } - } -} -/// Returns an error if the passed usize is 0. -#[inline] -fn ensure_nonzero_bytes_read(bytes_read: usize) -> Result { - if bytes_read == 0 { - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "underlying reader returned EOF", - )) - } else { - Ok(bytes_read) - } -} - -impl AsyncRead for BytesReader -where - R: AsyncRead + Unpin, -{ - fn poll_read( - self: Pin<&mut Self>, - cx: &mut task::Context, - buf: &mut tokio::io::ReadBuf, - ) -> Poll> { - let this = self.get_mut(); - - // Use a loop, so we can deal with (multiple) state transitions. - loop { - match this.state { - BytesPacketPosition::Size(LEN_SIZE) => { - // used in case an invalid size was signalled. - Err(io::Error::new( - io::ErrorKind::InvalidData, - "signalled package size not in allowed range", - ))? - } - BytesPacketPosition::Size(pos) => { - // try to read more of the size field. - // We wrap a ReadBuf around this.payload_size here, and set_filled. - let mut read_buf = tokio::io::ReadBuf::new(&mut this.payload_size); - read_buf.advance(pos); - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut read_buf))?; - - ensure_nonzero_bytes_read(read_buf.filled().len() - pos)?; - - let total_size_read = read_buf.filled().len(); - if total_size_read == LEN_SIZE { - // If the entire payload size was read, parse it - let payload_size = u64::from_le_bytes(this.payload_size); - - if !this.allowed_size.contains(&payload_size) { - // If it's not in the allowed - // range, transition to failure mode - // `BytesPacketPosition::Size(LEN_SIZE)`, where only - // an error is returned. - this.state = BytesPacketPosition::Size(LEN_SIZE) - } else if payload_size == 0 { - // If the payload size is 0, move on to reading padding directly. - this.state = BytesPacketPosition::Padding(0) - } else { - // Else, transition to reading the payload. - this.state = BytesPacketPosition::Payload(0) - } - } else { - // If we still need to read more of payload size, update - // our position in the state. - this.state = BytesPacketPosition::Size(total_size_read) - } - } - BytesPacketPosition::Payload(pos) => { - let signalled_size = u64::from_le_bytes(this.payload_size); - // We don't enter this match arm at all if we're expecting empty payload - debug_assert!(signalled_size > 0, "signalled size must be larger than 0"); - - // Read from the underlying reader into buf - // We cap the ReadBuf to the size of the payload, as we - // don't want to leak padding to the caller. - let bytes_read = ensure_nonzero_bytes_read({ - // Reducing these two u64 to usize on 32bits is fine - we - // only care about not reading too much, not too less. - let mut limited_buf = buf.take((signalled_size - pos) as usize); - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut limited_buf))?; - limited_buf.filled().len() - })?; - - // SAFETY: we just did populate this, but through limited_buf. - unsafe { buf.assume_init(bytes_read) } - buf.advance(bytes_read); - - if pos + bytes_read as u64 == signalled_size { - // If we now read all payload, transition to padding - // state. - this.state = BytesPacketPosition::Padding(0); - } else { - // if we didn't read everything yet, update our position - // in the state. - this.state = BytesPacketPosition::Payload(pos + bytes_read as u64); - } - - // We return from poll_read here. - // This is important, as any error (or even Pending) from - // the underlying reader on the next read (be it padding or - // payload) would require us to roll back buf, as generally - // a AsyncRead::poll_read may not advance the buffer in case - // of a nonsuccessful read. - // It can't be misinterpreted as EOF, as we definitely *did* - // write something into buf if we come to here (we pass - // `ensure_nonzero_bytes_read`). - return Ok(()).into(); - } - BytesPacketPosition::Padding(pos) => { - // Consume whatever padding is left, ensuring it's all null - // bytes. Only return `Ready(Ok(()))` once we're past the - // padding (or in cases where polling the inner reader - // returns `Poll::Pending`). - let signalled_size = u64::from_le_bytes(this.payload_size); - let total_padding_len = padding_len(signalled_size) as usize; - - let padding_len_remaining = total_padding_len - pos; - if padding_len_remaining != 0 { - // create a buffer only accepting the number of remaining padding bytes. - let mut buf = [0; 8]; - let mut padding_buf = tokio::io::ReadBuf::new(&mut buf); - let mut padding_buf = padding_buf.take(padding_len_remaining); - - // read into padding_buf. - ready!(Pin::new(&mut this.inner).poll_read(cx, &mut padding_buf))?; - let bytes_read = ensure_nonzero_bytes_read(padding_buf.filled().len())?; - - this.state = BytesPacketPosition::Padding(pos + bytes_read); - - // ensure the bytes are not null bytes - if !padding_buf.filled().iter().all(|e| *e == b'\0') { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "padding is not all zeroes", - )) - .into(); - } - - // if we still have padding to read, run the loop again. - continue; - } - // return EOF - return Ok(()).into(); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use crate::wire::bytes::write_bytes; - use hex_literal::hex; - use lazy_static::lazy_static; - use rstest::rstest; - use tokio::io::AsyncReadExt; - use tokio_test::{assert_err, io::Builder}; - - use super::*; - - /// The maximum length of bytes packets we're willing to accept in the test - /// cases. - const MAX_LEN: u64 = 1024; - - lazy_static! { - pub static ref LARGE_PAYLOAD: Vec = (0..255).collect::>().repeat(4 * 1024); - } - - /// Helper function, calling the (simpler) write_bytes with the payload. - /// We use this to create data we want to read from the wire. - async fn produce_packet_bytes(payload: &[u8]) -> Vec { - let mut exp = vec![]; - write_bytes(&mut exp, payload).await.unwrap(); - exp - } - - /// Read bytes packets of various length, and ensure read_to_end returns the - /// expected payload. - #[rstest] - #[case::empty(&[])] // empty bytes packet - #[case::size_1b(&[0xff])] // 1 bytes payload - #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) - #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) - #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet - #[tokio::test] - async fn read_payload_correct(#[case] payload: &[u8]) { - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await) - .build(); - - let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); - let mut buf = Vec::new(); - r.read_to_end(&mut buf).await.expect("must succeed"); - - assert_eq!(payload, &buf[..]); - } - - /// Read bytes packets of various length, and ensure read_to_end returns the - /// expected payload. - #[rstest] - #[case::empty(&[])] // empty bytes packet - #[case::size_1b(&[0xff])] // 1 bytes payload - #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) - #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) - #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet - #[tokio::test] - async fn read_payload_correct_known(#[case] payload: &[u8]) { - let packet = produce_packet_bytes(payload).await; - - let size = u64::from_le_bytes({ - let mut buf = [0; 8]; - buf.copy_from_slice(&packet[..8]); - buf - }); - - let mut mock = Builder::new().read(&packet[8..]).build(); - - let mut r = BytesReader::with_size(&mut mock, size); - let mut buf = Vec::new(); - r.read_to_end(&mut buf).await.expect("must succeed"); - - assert_eq!(payload, &buf[..]); - } - - /// Fail if the bytes packet is larger than allowed - #[tokio::test] - async fn read_bigger_than_allowed_fail() { - let payload = LARGE_PAYLOAD.as_slice(); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet - .build(); - - let mut r = BytesReader::new(&mut mock, ..2048); - let mut buf = Vec::new(); - assert_err!(r.read_to_end(&mut buf).await); - } - - /// Fail if the bytes packet is smaller than allowed - #[tokio::test] - async fn read_smaller_than_allowed_fail() { - let payload = &[0x00, 0x01, 0x02]; - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet - .build(); - - let mut r = BytesReader::new(&mut mock, 1024..2048); - let mut buf = Vec::new(); - assert_err!(r.read_to_end(&mut buf).await); - } - - /// Fail if the padding is not all zeroes - #[tokio::test] - async fn read_fail_if_nonzero_padding() { - let payload = &[0x00, 0x01, 0x02]; - let mut packet_bytes = produce_packet_bytes(payload).await; - // Flip some bits in the padding - packet_bytes[12] = 0xff; - let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - r.read_to_end(&mut buf).await.expect_err("must fail"); - } - - /// Start a 9 bytes payload packet, but have the underlying reader return - /// EOF in the middle of the size packet (after 4 bytes). - /// We should get an unexpected EOF error, already when trying to read the - /// first byte (of payload) - #[tokio::test] - async fn read_9b_eof_during_size() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..4]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0u8; 1]; - - assert_eq!( - r.read_exact(&mut buf).await.expect_err("must fail").kind(), - std::io::ErrorKind::UnexpectedEof - ); - - assert_eq!(&[0], &buf, "buffer should stay empty"); - } - - /// Start a 9 bytes payload packet, but have the underlying reader return - /// EOF in the middle of the payload (4 bytes into the payload). - /// We should get an unexpected EOF error, after reading the first 4 bytes - /// (successfully). - #[tokio::test] - async fn read_9b_eof_during_payload() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..8 + 4]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0; 9]; - - r.read_exact(&mut buf[..4]).await.expect("must succeed"); - - assert_eq!( - r.read_exact(&mut buf[4..=4]) - .await - .expect_err("must fail") - .kind(), - std::io::ErrorKind::UnexpectedEof - ); - } - - /// Start a 9 bytes payload packet, but return an error at various stages *after* the actual payload. - /// read_exact with a 9 bytes buffer is expected to succeed, but any further - /// read, as well as read_to_end are expected to fail. - #[rstest] - #[case::before_padding(8 + 9)] - #[case::during_padding(8 + 9 + 2)] - #[case::after_padding(8 + 9 + padding_len(9) as usize)] - #[tokio::test] - async fn read_9b_eof_after_payload(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0; 9]; - - // read_exact of the payload will succeed, but a subsequent read will - // return UnexpectedEof error. - r.read_exact(&mut buf).await.expect("should succeed"); - assert_eq!( - r.read_exact(&mut buf[4..=4]) - .await - .expect_err("must fail") - .kind(), - std::io::ErrorKind::UnexpectedEof - ); - - // read_to_end will fail. - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..8 + payload.len()]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - assert_eq!( - r.read_to_end(&mut buf).await.expect_err("must fail").kind(), - std::io::ErrorKind::UnexpectedEof - ); - } - - /// Start a 9 bytes payload packet, but return an error after a certain position. - /// Ensure that error is propagated. - #[rstest] - #[case::during_size(4)] - #[case::before_payload(8)] - #[case::during_payload(8 + 4)] - #[case::before_padding(8 + 4)] - #[case::during_padding(8 + 9 + 2)] - #[tokio::test] - async fn propagate_error_from_reader(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - let err = r.read_to_end(&mut buf).await.expect_err("must fail"); - assert_eq!( - err.kind(), - std::io::ErrorKind::Other, - "error kind must match" - ); - - assert_eq!( - err.into_inner().unwrap().to_string(), - "foo", - "error payload must contain foo" - ); - } - - /// If there's an error right after the padding, we don't propagate it, as - /// we're done reading. We just return EOF. - #[tokio::test] - async fn no_error_after_eof() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await) - .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - r.read_to_end(&mut buf).await.expect("must succeed"); - assert_eq!(buf.as_slice(), payload); - } - - /// Introduce various stalls in various places of the packet, to ensure we - /// handle these cases properly, too. - #[rstest] - #[case::beginning(0)] - #[case::before_payload(8)] - #[case::during_payload(8 + 4)] - #[case::before_padding(8 + 4)] - #[case::during_padding(8 + 9 + 2)] - #[tokio::test] - async fn read_payload_correct_pending(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .wait(Duration::from_nanos(0)) - .read(&produce_packet_bytes(payload).await[offset..]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); - let mut buf = Vec::new(); - r.read_to_end(&mut buf).await.expect("must succeed"); - - assert_eq!(payload, &buf[..]); - } -} diff --git a/tvix/nix-compat/src/wire/bytes/reader/mod.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs new file mode 100644 index 0000000000..8d4eab78f3 --- /dev/null +++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs @@ -0,0 +1,471 @@ +use std::{ + io, + ops::{Bound, RangeBounds}, + pin::Pin, + task::{self, ready, Poll}, +}; +use tokio::io::{AsyncRead, ReadBuf}; + +use trailer::TrailerReader; +mod trailer; + +/// Reads a "bytes wire packet" from the underlying reader. +/// The format is the same as in [crate::wire::bytes::read_bytes], +/// however this structure provides a [AsyncRead] interface, +/// allowing to not having to pass around the entire payload in memory. +/// +/// After being constructed with the underlying reader and an allowed size, +/// subsequent requests to poll_read will return payload data until the end +/// of the packet is reached. +/// +/// Internally, it will first read over the size packet, filling payload_size, +/// ensuring it fits allowed_size, then return payload data. +/// +/// It will not return the final bytes before all padding has been successfully +/// consumed as well, but the full length of the reader must be consumed. +/// +/// In case of an error due to size constraints, or in case of not reading +/// all the way to the end (and getting a EOF), the underlying reader is no +/// longer usable and might return garbage. +pub struct BytesReader { + state: State, +} + +#[derive(Debug)] +enum State { + Size { + reader: Option, + /// Minimum length (inclusive) + user_len_min: u64, + /// Maximum length (inclusive) + user_len_max: u64, + filled: u8, + buf: [u8; 8], + }, + Body { + reader: Option, + consumed: u64, + user_len: u64, + }, + Trailer(TrailerReader), +} + +impl BytesReader +where + R: AsyncRead + Unpin, +{ + /// Constructs a new BytesReader, using the underlying passed reader. + pub fn new>(reader: R, allowed_size: S) -> Self { + let user_len_min = match allowed_size.start_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n.saturating_add(1), + Bound::Unbounded => 0, + }; + + let user_len_max = match allowed_size.end_bound() { + Bound::Included(&n) => n, + Bound::Excluded(&n) => n.checked_sub(1).unwrap(), + Bound::Unbounded => u64::MAX, + }; + + Self { + state: State::Size { + reader: Some(reader), + user_len_min, + user_len_max, + filled: 0, + buf: [0; 8], + }, + } + } + + /// Construct a new BytesReader with a known, and already-read size. + pub fn with_size(reader: R, size: u64) -> Self { + Self { + state: State::Body { + reader: Some(reader), + consumed: 0, + user_len: size, + }, + } + } +} + +impl AsyncRead for BytesReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context, + buf: &mut ReadBuf, + ) -> Poll> { + let this = &mut self.get_mut().state; + + loop { + match this { + State::Size { + reader, + user_len_min, + user_len_max, + filled: 8, + buf, + } => { + let reader = reader.take().unwrap(); + + let data_len = u64::from_le_bytes(*buf); + if data_len < *user_len_min || data_len > *user_len_max { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid size")) + .into(); + } + + *this = State::Body { + reader: Some(reader), + consumed: 0, + user_len: data_len, + }; + } + State::Size { + reader, + filled, + buf, + .. + } => { + let reader = reader.as_mut().unwrap(); + + let mut read_buf = ReadBuf::new(&mut buf[..]); + read_buf.advance(*filled as usize); + ready!(Pin::new(reader).poll_read(cx, &mut read_buf))?; + + let new_filled = read_buf.filled().len() as u8; + if *filled == new_filled { + return Err(io::ErrorKind::UnexpectedEof.into()).into(); + } + + *filled = new_filled; + } + State::Body { + reader, + consumed, + user_len, + } => { + let body_len = *user_len & !7; + let remaining = body_len - *consumed; + + let reader = if remaining == 0 { + let reader = reader.take().unwrap(); + let user_len = (*user_len & 7) as u8; + *this = State::Trailer(TrailerReader::new(reader, user_len)); + continue; + } else { + reader.as_mut().unwrap() + }; + + let mut bytes_read = 0; + ready!(with_limited(buf, remaining, |buf| { + let ret = Pin::new(reader).poll_read(cx, buf); + bytes_read = buf.initialized().len(); + ret + }))?; + + *consumed += bytes_read as u64; + + return if bytes_read != 0 { + Ok(()) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + } + .into(); + } + State::Trailer(reader) => { + return Pin::new(reader).poll_read(cx, buf); + } + } + } + } +} + +/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it. +/// After `f` returns, we propagate the filled cursor advancement back to `buf`. +fn with_limited(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R { + let mut nbuf = buf.take(n.try_into().unwrap_or(usize::MAX)); + let ptr = nbuf.initialized().as_ptr(); + let ret = f(&mut nbuf); + + // SAFETY: `ReadBuf::take` only returns the *unfilled* section of `buf`, + // so anything filled is new, initialized data. + // + // We verify that `nbuf` still points to the same buffer, + // so we're sure it hasn't been swapped out. + unsafe { + // ensure our buffer hasn't been swapped out + assert_eq!(nbuf.initialized().as_ptr(), ptr); + + let n = nbuf.filled().len(); + buf.assume_init(n); + buf.advance(n); + } + + ret +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::wire::bytes::{padding_len, write_bytes}; + use hex_literal::hex; + use lazy_static::lazy_static; + use rstest::rstest; + use tokio::io::AsyncReadExt; + use tokio_test::{assert_err, io::Builder}; + + use super::*; + + /// The maximum length of bytes packets we're willing to accept in the test + /// cases. + const MAX_LEN: u64 = 1024; + + lazy_static! { + pub static ref LARGE_PAYLOAD: Vec = (0..255).collect::>().repeat(4 * 1024); + } + + /// Helper function, calling the (simpler) write_bytes with the payload. + /// We use this to create data we want to read from the wire. + async fn produce_packet_bytes(payload: &[u8]) -> Vec { + let mut exp = vec![]; + write_bytes(&mut exp, payload).await.unwrap(); + exp + } + + /// Read bytes packets of various length, and ensure read_to_end returns the + /// expected payload. + #[rstest] + #[case::empty(&[])] // empty bytes packet + #[case::size_1b(&[0xff])] // 1 bytes payload + #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) + #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) + #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet + #[tokio::test] + async fn read_payload_correct(#[case] payload: &[u8]) { + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await) + .build(); + + let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); + let mut buf = Vec::new(); + r.read_to_end(&mut buf).await.expect("must succeed"); + + assert_eq!(payload, &buf[..]); + } + + /// Read bytes packets of various length, and ensure read_to_end returns the + /// expected payload. + #[rstest] + #[case::empty(&[])] // empty bytes packet + #[case::size_1b(&[0xff])] // 1 bytes payload + #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) + #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) + #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet + #[tokio::test] + async fn read_payload_correct_known(#[case] payload: &[u8]) { + let packet = produce_packet_bytes(payload).await; + + let size = u64::from_le_bytes({ + let mut buf = [0; 8]; + buf.copy_from_slice(&packet[..8]); + buf + }); + + let mut mock = Builder::new().read(&packet[8..]).build(); + + let mut r = BytesReader::with_size(&mut mock, size); + let mut buf = Vec::new(); + r.read_to_end(&mut buf).await.expect("must succeed"); + + assert_eq!(payload, &buf[..]); + } + + /// Fail if the bytes packet is larger than allowed + #[tokio::test] + async fn read_bigger_than_allowed_fail() { + let payload = LARGE_PAYLOAD.as_slice(); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet + .build(); + + let mut r = BytesReader::new(&mut mock, ..2048); + let mut buf = Vec::new(); + assert_err!(r.read_to_end(&mut buf).await); + } + + /// Fail if the bytes packet is smaller than allowed + #[tokio::test] + async fn read_smaller_than_allowed_fail() { + let payload = &[0x00, 0x01, 0x02]; + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet + .build(); + + let mut r = BytesReader::new(&mut mock, 1024..2048); + let mut buf = Vec::new(); + assert_err!(r.read_to_end(&mut buf).await); + } + + /// Fail if the padding is not all zeroes + #[tokio::test] + async fn read_fail_if_nonzero_padding() { + let payload = &[0x00, 0x01, 0x02]; + let mut packet_bytes = produce_packet_bytes(payload).await; + // Flip some bits in the padding + packet_bytes[12] = 0xff; + let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + let mut buf = Vec::new(); + + r.read_to_end(&mut buf).await.expect_err("must fail"); + } + + /// Start a 9 bytes payload packet, but have the underlying reader return + /// EOF in the middle of the size packet (after 4 bytes). + /// We should get an unexpected EOF error, already when trying to read the + /// first byte (of payload) + #[tokio::test] + async fn read_9b_eof_during_size() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..4]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + let mut buf = [0u8; 1]; + + assert_eq!( + r.read_exact(&mut buf).await.expect_err("must fail").kind(), + std::io::ErrorKind::UnexpectedEof + ); + + assert_eq!(&[0], &buf, "buffer should stay empty"); + } + + /// Start a 9 bytes payload packet, but have the underlying reader return + /// EOF in the middle of the payload (4 bytes into the payload). + /// We should get an unexpected EOF error, after reading the first 4 bytes + /// (successfully). + #[tokio::test] + async fn read_9b_eof_during_payload() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..8 + 4]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + let mut buf = [0; 9]; + + r.read_exact(&mut buf[..4]).await.expect("must succeed"); + + assert_eq!( + r.read_exact(&mut buf[4..=4]) + .await + .expect_err("must fail") + .kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + /// Start a 9 bytes payload packet, but don't supply the necessary padding. + /// This is expected to always fail before returning the final data. + #[rstest] + #[case::before_padding(8 + 9)] + #[case::during_padding(8 + 9 + 2)] + #[case::after_padding(8 + 9 + padding_len(9) as usize - 1)] + #[tokio::test] + async fn read_9b_eof_after_payload(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + + // read_exact of the payload *body* will succeed, but a subsequent read will + // return UnexpectedEof error. + assert_eq!(r.read_exact(&mut [0; 8]).await.unwrap(), 8); + assert_eq!( + r.read_exact(&mut [0]).await.unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + /// Start a 9 bytes payload packet, but return an error after a certain position. + /// Ensure that error is propagated. + #[rstest] + #[case::during_size(4)] + #[case::before_payload(8)] + #[case::during_payload(8 + 4)] + #[case::before_padding(8 + 4)] + #[case::during_padding(8 + 9 + 2)] + #[tokio::test] + async fn propagate_error_from_reader(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + let mut buf = Vec::new(); + + let err = r.read_to_end(&mut buf).await.expect_err("must fail"); + assert_eq!( + err.kind(), + std::io::ErrorKind::Other, + "error kind must match" + ); + + assert_eq!( + err.into_inner().unwrap().to_string(), + "foo", + "error payload must contain foo" + ); + } + + /// If there's an error right after the padding, we don't propagate it, as + /// we're done reading. We just return EOF. + #[tokio::test] + async fn no_error_after_eof() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await) + .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN); + let mut buf = Vec::new(); + + r.read_to_end(&mut buf).await.expect("must succeed"); + assert_eq!(buf.as_slice(), payload); + } + + /// Introduce various stalls in various places of the packet, to ensure we + /// handle these cases properly, too. + #[rstest] + #[case::beginning(0)] + #[case::before_payload(8)] + #[case::during_payload(8 + 4)] + #[case::before_padding(8 + 4)] + #[case::during_padding(8 + 9 + 2)] + #[tokio::test] + async fn read_payload_correct_pending(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .wait(Duration::from_nanos(0)) + .read(&produce_packet_bytes(payload).await[offset..]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); + let mut buf = Vec::new(); + r.read_to_end(&mut buf).await.expect("must succeed"); + + assert_eq!(payload, &buf[..]); + } +} diff --git a/tvix/nix-compat/src/wire/bytes/reader/trailer.rs b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs new file mode 100644 index 0000000000..d2b867c2c3 --- /dev/null +++ b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs @@ -0,0 +1,175 @@ +use std::{ + pin::Pin, + task::{self, ready, Poll}, +}; + +use tokio::io::{self, AsyncRead, ReadBuf}; + +#[derive(Debug)] +pub enum TrailerReader { + Reading { + reader: R, + user_len: u8, + filled: u8, + buf: [u8; 8], + }, + Releasing { + off: u8, + len: u8, + buf: [u8; 8], + }, + Done, +} + +impl TrailerReader { + pub fn new(reader: R, user_len: u8) -> Self { + if user_len == 0 { + return Self::Done; + } + + assert!(user_len < 8, "payload in trailer must be less than 8 bytes"); + Self::Reading { + reader, + user_len, + filled: 0, + buf: [0; 8], + } + } +} + +impl AsyncRead for TrailerReader { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context, + user_buf: &mut ReadBuf, + ) -> Poll> { + let this = self.get_mut(); + + loop { + match this { + &mut Self::Reading { + reader: _, + user_len, + filled: 8, + buf, + } => { + *this = Self::Releasing { + off: 0, + len: user_len, + buf, + }; + } + Self::Reading { + reader, + user_len, + filled, + buf, + } => { + let mut read_buf = ReadBuf::new(&mut buf[..]); + read_buf.advance(*filled as usize); + ready!(Pin::new(reader).poll_read(cx, &mut read_buf))?; + + let new_filled = read_buf.filled().len() as u8; + if *filled == new_filled { + return Err(io::ErrorKind::UnexpectedEof.into()).into(); + } + + *filled = new_filled; + + // ensure the padding is all zeroes + if (u64::from_le_bytes(*buf) >> (*user_len * 8)) != 0 { + return Err(io::ErrorKind::InvalidData.into()).into(); + } + } + Self::Releasing { off: 8, .. } => { + *this = Self::Done; + } + Self::Releasing { off, len, buf } => { + assert_ne!(user_buf.remaining(), 0); + + let buf = &buf[*off as usize..*len as usize]; + let buf = &buf[..usize::min(buf.len(), user_buf.remaining())]; + + user_buf.put_slice(buf); + *off += buf.len() as u8; + + break; + } + Self::Done => break, + } + } + + Ok(()).into() + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + use tokio::io::AsyncReadExt; + + use super::*; + + #[tokio::test] + async fn unexpected_eof() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + assert_eq!( + reader.read_to_end(&mut buf).await.unwrap_err().kind(), + io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn invalid_padding() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x01, 0x00]) + .wait(Duration::ZERO) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + assert_eq!( + reader.read_to_end(&mut buf).await.unwrap_err().kind(), + io::ErrorKind::InvalidData + ); + } + + #[tokio::test] + async fn success() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .wait(Duration::ZERO) + .read(&[0x00, 0x00, 0x00, 0x00, 0x00]) + .build(); + + let mut reader = TrailerReader::new(reader, 2); + + let mut buf = vec![]; + reader.read_to_end(&mut buf).await.unwrap(); + + assert_eq!(buf, &[0xed, 0xef]); + } + + #[tokio::test] + async fn no_padding() { + let reader = tokio_test::io::Builder::new().build(); + let mut reader = TrailerReader::new(reader, 0); + + let mut buf = vec![]; + reader.read_to_end(&mut buf).await.unwrap(); + assert!(buf.is_empty()); + } +} -- cgit 1.4.1