about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2024-04-29T17·34+0000
committeredef <edef@edef.eu>2024-05-08T06·03+0000
commit51e0f78e9317c1234bc982dcaa280c0d3674d164 (patch)
tree057023b4f0aa6545a757f69da0978d5028618590 /tvix
parentebad318ab3accf34dff84a49d18b96a7efd15c22 (diff)
feat(nix-compat/wire/bytes/reader): support buffered reading r/8086
If our underlying reader supports AsyncBufRead, then we can too.

Change-Id: If4b948c983400ca591c1c475bbcf7dc00d562040
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11545
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r--tvix/nix-compat/src/nar/reader/async/mod.rs10
-rw-r--r--tvix/nix-compat/src/wire/bytes/reader/mod.rs188
-rw-r--r--tvix/store/src/nar/import.rs3
3 files changed, 195 insertions, 6 deletions
diff --git a/tvix/nix-compat/src/nar/reader/async/mod.rs b/tvix/nix-compat/src/nar/reader/async/mod.rs
index aaf00faf44..048283a67c 100644
--- a/tvix/nix-compat/src/nar/reader/async/mod.rs
+++ b/tvix/nix-compat/src/nar/reader/async/mod.rs
@@ -94,6 +94,16 @@ impl<'a, 'r> AsyncRead for FileReader<'a, 'r> {
     }
 }
 
+impl<'a, 'r> AsyncBufRead for FileReader<'a, 'r> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
+        Pin::new(&mut self.get_mut().inner).poll_fill_buf(cx)
+    }
+
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        Pin::new(&mut self.get_mut().inner).consume(amt)
+    }
+}
+
 /// A directory iterator, yielding a sequence of [Node]s.
 /// It must be fully consumed before reading further from the [DirReader] that produced it, if any.
 pub struct DirReader<'a, 'r> {
diff --git a/tvix/nix-compat/src/wire/bytes/reader/mod.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
index 4a8cfd1f65..6bd376c06f 100644
--- a/tvix/nix-compat/src/wire/bytes/reader/mod.rs
+++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs
@@ -6,7 +6,7 @@ use std::{
     pin::Pin,
     task::{self, ready, Poll},
 };
-use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncReadExt, ReadBuf};
 
 use trailer::{read_trailer, ReadTrailer, Trailer};
 
@@ -146,12 +146,12 @@ impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
                         *this = State::ReadTrailer(read_trailer(reader, tail_len));
                         continue;
                     } else {
-                        reader.as_mut().unwrap()
+                        Pin::new(reader.as_mut().unwrap())
                     };
 
                     let mut bytes_read = 0;
                     ready!(with_limited(buf, remaining, |buf| {
-                        let ret = Pin::new(reader).poll_read(cx, buf);
+                        let ret = reader.poll_read(cx, buf);
                         bytes_read = buf.initialized().len();
                         ret
                     }))?;
@@ -185,6 +185,96 @@ impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> {
     }
 }
 
+#[allow(private_bounds)]
+impl<R: AsyncBufRead + Unpin, T: Tag> AsyncBufRead for BytesReader<R, T> {
+    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<io::Result<&[u8]>> {
+        let this = &mut self.get_mut().state;
+
+        loop {
+            match this {
+                // This state comes *after* the following case,
+                // but we can't keep it in logical order because
+                // that would lengthen the borrow lifetime.
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } if {
+                    let (body_len, _) = split_user_len(*user_len);
+                    let remaining = body_len - *consumed;
+
+                    remaining == 0
+                } =>
+                {
+                    let reader = reader.take().unwrap();
+                    let (_, tail_len) = split_user_len(*user_len);
+
+                    *this = State::ReadTrailer(read_trailer(reader, tail_len));
+                }
+                State::Body {
+                    reader,
+                    consumed,
+                    user_len,
+                } => {
+                    let (body_len, _) = split_user_len(*user_len);
+                    let remaining = body_len - *consumed;
+
+                    let reader = Pin::new(reader.as_mut().unwrap());
+
+                    match ready!(reader.poll_fill_buf(cx))? {
+                        &[] => {
+                            return Err(io::ErrorKind::UnexpectedEof.into()).into();
+                        }
+                        mut buf => {
+                            if buf.len() as u64 > remaining {
+                                buf = &buf[..remaining as usize];
+                            }
+
+                            return Ok(buf).into();
+                        }
+                    }
+                }
+                State::ReadTrailer(fut) => {
+                    *this = State::ReleaseTrailer {
+                        consumed: 0,
+                        data: ready!(Pin::new(fut).poll(cx))?,
+                    };
+                }
+                State::ReleaseTrailer { consumed, data } => {
+                    return Ok(&data[*consumed as usize..]).into();
+                }
+            }
+        }
+    }
+
+    fn consume(mut self: Pin<&mut Self>, amt: usize) {
+        match &mut self.state {
+            State::Body {
+                reader,
+                consumed,
+                user_len,
+            } => {
+                let reader = Pin::new(reader.as_mut().unwrap());
+                let (body_len, _) = split_user_len(*user_len);
+
+                *consumed = consumed
+                    .checked_add(amt as u64)
+                    .filter(|&consumed| consumed <= body_len)
+                    .expect("consumed out of bounds");
+
+                reader.consume(amt);
+            }
+            State::ReadTrailer(_) => unreachable!(),
+            State::ReleaseTrailer { consumed, data } => {
+                *consumed = amt
+                    .checked_add(*consumed as usize)
+                    .filter(|&consumed| consumed <= data.len())
+                    .expect("consumed out of bounds") as u8;
+            }
+        }
+    }
+}
+
 /// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it.
 /// After `f` returns, we propagate the filled cursor advancement back to `buf`.
 fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R {
@@ -217,7 +307,7 @@ mod tests {
     use hex_literal::hex;
     use lazy_static::lazy_static;
     use rstest::rstest;
-    use tokio::io::AsyncReadExt;
+    use tokio::io::{AsyncReadExt, BufReader};
     use tokio_test::io::Builder;
 
     use super::*;
@@ -261,6 +351,34 @@ mod tests {
         assert_eq!(payload, &buf[..]);
     }
 
+    /// Read bytes packets of various length, and ensure copy_buf reads the
+    /// expected payload.
+    #[rstest]
+    #[case::empty(&[])] // empty bytes packet
+    #[case::size_1b(&[0xff])] // 1 bytes payload
+    #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding)
+    #[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding)
+    #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet
+    #[tokio::test]
+    async fn read_payload_correct_readbuf(#[case] payload: &[u8]) {
+        let mut mock = BufReader::new(
+            Builder::new()
+                .read(&produce_packet_bytes(payload).await)
+                .build(),
+        );
+
+        let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64)
+            .await
+            .unwrap();
+
+        let mut buf = Vec::new();
+        tokio::io::copy_buf(&mut r, &mut buf)
+            .await
+            .expect("copy_buf must succeed");
+
+        assert_eq!(payload, &buf[..]);
+    }
+
     /// Fail if the bytes packet is larger than allowed
     #[tokio::test]
     async fn read_bigger_than_allowed_fail() {
@@ -459,6 +577,48 @@ mod tests {
         );
     }
 
+    /// Start a 9 bytes payload packet, but return an error after a certain position.
+    /// Ensure that error is propagated (AsyncReadBuf case)
+    #[rstest]
+    #[case::during_size(4)]
+    #[case::before_payload(8)]
+    #[case::during_payload(8 + 4)]
+    #[case::before_padding(8 + 4)]
+    #[case::during_padding(8 + 9 + 2)]
+    #[tokio::test]
+    async fn propagate_error_from_reader_buffered(#[case] offset: usize) {
+        let payload = &hex!("FF0102030405060708");
+        let mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await[..offset])
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+        let mut mock = BufReader::new(mock);
+
+        // Either length reading or data reading can fail, depending on which test case we're in.
+        let err: io::Error = async {
+            let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?;
+            let mut buf = Vec::new();
+
+            tokio::io::copy_buf(&mut r, &mut buf).await?;
+
+            Ok(())
+        }
+        .await
+        .expect_err("must fail");
+
+        assert_eq!(
+            err.kind(),
+            std::io::ErrorKind::Other,
+            "error kind must match"
+        );
+
+        assert_eq!(
+            err.into_inner().unwrap().to_string(),
+            "foo",
+            "error payload must contain foo"
+        );
+    }
+
     /// If there's an error right after the padding, we don't propagate it, as
     /// we're done reading. We just return EOF.
     #[tokio::test]
@@ -476,6 +636,26 @@ mod tests {
         assert_eq!(buf.as_slice(), payload);
     }
 
+    /// If there's an error right after the padding, we don't propagate it, as
+    /// we're done reading. We just return EOF.
+    #[tokio::test]
+    async fn no_error_after_eof_buffered() {
+        let payload = &hex!("FF0102030405060708");
+        let mock = Builder::new()
+            .read(&produce_packet_bytes(payload).await)
+            .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo"))
+            .build();
+        let mut mock = BufReader::new(mock);
+
+        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
+        let mut buf = Vec::new();
+
+        tokio::io::copy_buf(&mut r, &mut buf)
+            .await
+            .expect("must succeed");
+        assert_eq!(buf.as_slice(), payload);
+    }
+
     /// Introduce various stalls in various places of the packet, to ensure we
     /// handle these cases properly, too.
     #[rstest]
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index cc62c1a4e9..7a9f1231e7 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -70,8 +70,7 @@ where
         } => {
             let (digest, size) = {
                 let mut blob_writer = blob_service.open_write().await;
-                // TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader
-                let size = tokio::io::copy(&mut reader, &mut blob_writer).await?;
+                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
 
                 (blob_writer.close().await?, size)
             };