From 259d7a3cfa214e7eab7b0862024d595489e92592 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Thu, 18 Apr 2024 13:51:28 -0500 Subject: refactor(tvix/castore): generalize store ingestion streams Previously the store ingestion code was coupled to `walkdir::DirEntry`s produced by the `walkdir` crate which made it impossible to reuse ingesting from other sources like tarballs or NARs. This introduces a `IngestionEntry` which carries enough information for store ingestion and a future for computing the Blake3 digest of files. This allows the producer to perform file uploads in a way that makes sense for the source, ie. the filesystem upload could concurrently upload multiple files at the same time, while the NAR ingestor will need to ingest the entire blob before yielding the next blob in the stream. In the future we can buffer small blobs and upload them concurrently, but the full blob still needs to be read from the NAR before advancing. Change-Id: I6d144063e2ba5b05e765bac1f27d41b3c8e7b283 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11462 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/store/src/bin/tvix-store.rs | 7 +++++-- tvix/store/src/import.rs | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) (limited to 'tvix/store') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 15f37d301f..1f172d65c6 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -5,6 +5,7 @@ use futures::future::try_join_all; use nix_compat::path_info::ExportedPathInfo; use serde::Deserialize; use serde::Serialize; +use std::os::unix::ffi::OsStrExt; use std::path::PathBuf; use std::sync::Arc; use tokio_listener::Listener; @@ -430,9 +431,11 @@ async fn main() -> Result<(), Box> { } let path: PathBuf = elem.path.to_absolute_path().into(); + let root_name = path.file_name().unwrap().as_bytes().to_vec().into(); // Ingest the given path - let root_node = - ingest_path(blob_service.clone(), directory_service.clone(), path).await?; + let root_node = ingest_path(blob_service.clone(), directory_service.clone(), &path) + .await? + .rename(root_name); // Create and upload a PathInfo pointing to the root_node, // annotated with information we have from the reference graph. diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index e6d451834c..5cff29a9e5 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -112,12 +112,12 @@ pub async fn import_path_as_nar_ca( ) -> Result where P: AsRef + std::fmt::Debug, - BS: AsRef + Clone, + BS: BlobService + Clone, DS: AsRef, PS: AsRef, { let root_node = - tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?; + tvix_castore::import::ingest_path(blob_service, directory_service, path.as_ref()).await?; // Ask the PathInfoService for the NAR size and sha256 let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?; -- cgit 1.4.1