about summary refs log tree commit diff
path: root/tvix/cli
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/cli
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track when we're in a context
   with an async runtime in the context or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async worloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream ia hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/cli')
-rw-r--r--tvix/cli/Cargo.toml1
-rw-r--r--tvix/cli/src/main.rs9
-rw-r--r--tvix/cli/src/tvix_store_io.rs175
3 files changed, 108 insertions, 77 deletions
diff --git a/tvix/cli/Cargo.toml b/tvix/cli/Cargo.toml
index caebb169aa..a73d9be239 100644
--- a/tvix/cli/Cargo.toml
+++ b/tvix/cli/Cargo.toml
@@ -20,6 +20,7 @@ smol_str = "0.2.0"
 ssri = "7.0.0"
 thiserror = "1.0.38"
 tracing = "0.1.37"
+tokio = "1.28.0"
 
 [dependencies.wu-manber]
 git = "https://github.com/tvlfyi/wu-manber.git"
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs
index 1980ac1731..65970e1d1c 100644
--- a/tvix/cli/src/main.rs
+++ b/tvix/cli/src/main.rs
@@ -80,9 +80,16 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b
         directory_service.clone(),
     ));
 
+    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
+
     eval.io_handle = Box::new(tvix_io::TvixIO::new(
         known_paths.clone(),
-        TvixStoreIO::new(blob_service, directory_service, path_info_service),
+        TvixStoreIO::new(
+            blob_service,
+            directory_service,
+            path_info_service,
+            tokio_runtime.handle().clone(),
+        ),
     ));
 
     // bundle fetchurl.nix (used in nixpkgs) by resolving <nix> to
diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs
index 2c67338534..1a373a705f 100644
--- a/tvix/cli/src/tvix_store_io.rs
+++ b/tvix/cli/src/tvix_store_io.rs
@@ -2,6 +2,7 @@
 
 use nix_compat::store_path::{self, StorePath};
 use std::{io, path::Path, path::PathBuf, sync::Arc};
+use tokio::io::AsyncReadExt;
 use tracing::{error, instrument, warn};
 use tvix_eval::{EvalIO, FileType, StdIO};
 
@@ -27,6 +28,7 @@ pub struct TvixStoreIO {
     directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
     std_io: StdIO,
+    tokio_handle: tokio::runtime::Handle,
 }
 
 impl TvixStoreIO {
@@ -34,12 +36,14 @@ impl TvixStoreIO {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
         path_info_service: Arc<dyn PathInfoService>,
+        tokio_handle: tokio::runtime::Handle,
     ) -> Self {
         Self {
             blob_service,
             directory_service,
             path_info_service,
             std_io: StdIO {},
+            tokio_handle,
         }
     }
 
@@ -86,65 +90,6 @@ impl TvixStoreIO {
             sub_path,
         )?)
     }
-
-    /// Imports a given path on the filesystem into the store, and returns the
-    /// [PathInfo] describing the path, that was sent to
-    /// [PathInfoService].
-    /// While not part of the [EvalIO], it's still useful for clients who
-    /// care about the [PathInfo].
-    #[instrument(skip(self), ret, err)]
-    pub fn import_path_with_pathinfo(&self, path: &std::path::Path) -> Result<PathInfo, io::Error> {
-        // Call [import::ingest_path], which will walk over the given path and return a root_node.
-        let root_node = import::ingest_path(
-            self.blob_service.clone(),
-            self.directory_service.clone(),
-            path,
-        )
-        .expect("error during import_path");
-
-        // Render the NAR
-        let (nar_size, nar_sha256) = calculate_size_and_sha256(
-            &root_node,
-            self.blob_service.clone(),
-            self.directory_service.clone(),
-        )
-        .expect("error during nar calculation"); // TODO: handle error
-
-        // TODO: make a path_to_name helper function?
-        let name = path
-            .file_name()
-            .expect("path must not be ..")
-            .to_str()
-            .expect("path must be valid unicode");
-
-        let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
-
-        // assemble a new root_node with a name that is derived from the nar hash.
-        let root_node = root_node.rename(output_path.to_string().into_bytes().into());
-
-        // assemble the [PathInfo] object.
-        let path_info = PathInfo {
-            node: Some(tvix_store::proto::Node {
-                node: Some(root_node),
-            }),
-            // There's no reference scanning on path contents ingested like this.
-            references: vec![],
-            narinfo: Some(NarInfo {
-                nar_size,
-                nar_sha256: nar_sha256.to_vec().into(),
-                signatures: vec![],
-                reference_names: vec![],
-                // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
-                // do we need this anywhere?
-            }),
-        };
-
-        // put into [PathInfoService], and return the [PathInfo] that we get
-        // back from there (it might contain additional signatures).
-        let path_info = self.path_info_service.put(path_info)?;
-
-        Ok(path_info)
-    }
 }
 
 impl EvalIO for TvixStoreIO {
@@ -197,24 +142,33 @@ impl EvalIO for TvixStoreIO {
                                 )
                             })?;
 
-                        let reader = {
-                            let resp = self.blob_service.open_read(&digest)?;
-                            match resp {
-                                Some(blob_reader) => blob_reader,
-                                None => {
-                                    error!(
-                                        blob.digest = %digest,
-                                        "blob not found",
-                                    );
-                                    Err(io::Error::new(
-                                        io::ErrorKind::NotFound,
-                                        format!("blob {} not found", &digest),
-                                    ))?
+                        let blob_service = self.blob_service.clone();
+
+                        let task = self.tokio_handle.spawn(async move {
+                            let mut reader = {
+                                let resp = blob_service.open_read(&digest).await?;
+                                match resp {
+                                    Some(blob_reader) => blob_reader,
+                                    None => {
+                                        error!(
+                                            blob.digest = %digest,
+                                            "blob not found",
+                                        );
+                                        Err(io::Error::new(
+                                            io::ErrorKind::NotFound,
+                                            format!("blob {} not found", &digest),
+                                        ))?
+                                    }
                                 }
-                            }
-                        };
+                            };
 
-                        io::read_to_string(reader)
+                            let mut buf = String::new();
+
+                            reader.read_to_string(&mut buf).await?;
+                            Ok(buf)
+                        });
+
+                        self.tokio_handle.block_on(task).unwrap()
                     }
                     Node::Symlink(_symlink_node) => Err(io::Error::new(
                         io::ErrorKind::Unsupported,
@@ -296,7 +250,16 @@ impl EvalIO for TvixStoreIO {
 
     #[instrument(skip(self), ret, err)]
     fn import_path(&self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> {
-        let path_info = self.import_path_with_pathinfo(path)?;
+        let p = path.to_owned();
+        let blob_service = self.blob_service.clone();
+        let directory_service = self.directory_service.clone();
+        let path_info_service = self.path_info_service.clone();
+
+        let task = self.tokio_handle.spawn(async move {
+            import_path_with_pathinfo(blob_service, directory_service, path_info_service, &p).await
+        });
+
+        let path_info = self.tokio_handle.block_on(task).unwrap()?;
 
         // from the [PathInfo], extract the store path (as string).
         Ok({
@@ -320,3 +283,63 @@ impl EvalIO for TvixStoreIO {
         Some("/nix/store".to_string())
     }
 }
+
+/// Imports a given path on the filesystem into the store, and returns the
+/// [PathInfo] describing the path, that was sent to
+/// [PathInfoService].
+#[instrument(skip(blob_service, directory_service, path_info_service), ret, err)]
+async fn import_path_with_pathinfo(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+    path: &std::path::Path,
+) -> Result<PathInfo, io::Error> {
+    // Call [import::ingest_path], which will walk over the given path and return a root_node.
+    let root_node = import::ingest_path(blob_service.clone(), directory_service.clone(), path)
+        .await
+        .expect("error during import_path");
+
+    // Render the NAR. This is blocking.
+    let calc_task = tokio::task::spawn_blocking(move || {
+        let (nar_size, nar_sha256) =
+            calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
+                .expect("error during nar calculation"); // TODO: handle error
+        (nar_size, nar_sha256, root_node)
+    });
+    let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap();
+
+    // TODO: make a path_to_name helper function?
+    let name = path
+        .file_name()
+        .expect("path must not be ..")
+        .to_str()
+        .expect("path must be valid unicode");
+
+    let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
+
+    // assemble a new root_node with a name that is derived from the nar hash.
+    let root_node = root_node.rename(output_path.to_string().into_bytes().into());
+
+    // assemble the [PathInfo] object.
+    let path_info = PathInfo {
+        node: Some(tvix_store::proto::Node {
+            node: Some(root_node),
+        }),
+        // There's no reference scanning on path contents ingested like this.
+        references: vec![],
+        narinfo: Some(NarInfo {
+            nar_size,
+            nar_sha256: nar_sha256.to_vec().into(),
+            signatures: vec![],
+            reference_names: vec![],
+            // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6`
+            // do we need this anywhere?
+        }),
+    };
+
+    // put into [PathInfoService], and return the [PathInfo] that we get
+    // back from there (it might contain additional signatures).
+    let path_info = path_info_service.put(path_info)?;
+
+    Ok(path_info)
+}