about summary refs log tree commit diff
path: root/tvix/store/src/nar/import.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-05-18T20·07-0500
committerConnor Brewster <cbrewster@hey.com>2024-05-20T15·21+0000
commite7be3422566b36e5bd3aeaaf7d47537dfd050a5c (patch)
treea1c529eddf255ae172980867d80bd2415c1e3cb3 /tvix/store/src/nar/import.rs
parentb0aaff25fa68a7b55b870b59024ca2b40c658f33 (diff)
feat(tvix/store): concurrently upload small blobs during nar ingestion r/8159
Currently all blobs are uploaded serially when ingesting NARs. If a NAR
contains many, small blobs, ingestion may become slow if there is a lot
of round-trip latency to the blob service.

This makes the NAR ingester use the ConcurrentBlobUploader which allows
for buffering small blobs in memory so they can be uploaded concurrently
to the blob service without blocking further deserialization.

Change-Id: I093a73770232df12d9a11e5d901b99c08505c3cb
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11694
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to '')
-rw-r--r--tvix/store/src/nar/import.rs34
1 files changed, 22 insertions, 12 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 3d7c50014a..36122d419d 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -3,7 +3,10 @@ use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
-    import::{ingest_entries, IngestionEntry, IngestionError},
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
     proto::{node::Node, NamedNode},
     PathBuf,
 };
@@ -18,7 +21,7 @@ pub async fn ingest_nar<R, BS, DS>(
 ) -> Result<Node, IngestionError<Error>>
 where
     R: AsyncBufRead + Unpin + Send,
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
     DS: DirectoryService,
 {
     // open the NAR for reading.
@@ -29,14 +32,22 @@ where
     let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
     let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
         let res = produce_nar_inner(
-            blob_service,
+            &mut blob_uploader,
             root_node,
             "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
             tx.clone(),
         )
         .await;
 
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
         tx.send(res)
             .await
             .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
@@ -54,13 +65,13 @@ where
 }
 
 async fn produce_nar_inner<BS>(
-    blob_service: BS,
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
     node: nar_reader::Node<'_, '_>,
     path: PathBuf,
     tx: mpsc::Sender<Result<IngestionEntry, Error>>,
 ) -> Result<IngestionEntry, Error>
 where
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
 {
     Ok(match node {
         nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
@@ -68,12 +79,8 @@ where
             executable,
             mut reader,
         } => {
-            let (digest, size) = {
-                let mut blob_writer = blob_service.open_write().await;
-                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
-
-                (blob_writer.close().await?, size)
-            };
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
 
             IngestionEntry::Regular {
                 path,
@@ -91,7 +98,7 @@ where
                     .expect("Tvix bug: failed to join name");
 
                 let entry = Box::pin(produce_nar_inner(
-                    blob_service.clone(),
+                    blob_uploader,
                     entry.node,
                     path,
                     tx.clone(),
@@ -112,6 +119,9 @@ where
 pub enum Error {
     #[error(transparent)]
     IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]