about summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-04-18T18·51-0500
committerConnor Brewster <cbrewster@hey.com>2024-04-19T20·37+0000
commit259d7a3cfa214e7eab7b0862024d595489e92592 (patch)
treef6f52a334d5e5d26e6f3c323db0f8e56beaf56c8
parent150106610e60e95267c0968a9679797b05db7f3d (diff)
refactor(tvix/castore): generalize store ingestion streams r/7979
Previously the store ingestion code was coupled to `walkdir::DirEntry`s
produced by the `walkdir` crate which made it impossible to reuse
ingesting from other sources like tarballs or NARs.

This introduces a `IngestionEntry` which carries enough information for
store ingestion and a future for computing the Blake3 digest of files.
This allows the producer to perform file uploads in a way that makes
sense for the source, ie. the filesystem upload could concurrently
upload multiple files at the same time, while the NAR ingestor will need
to ingest the entire blob before yielding the next blob in the stream.
In the future we can buffer small blobs and upload them concurrently,
but the full blob still needs to be read from the NAR before advancing.

Change-Id: I6d144063e2ba5b05e765bac1f27d41b3c8e7b283
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11462
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
-rw-r--r--tvix/castore/src/import.rs452
-rw-r--r--tvix/glue/src/builtins/import.rs28
-rw-r--r--tvix/glue/src/tvix_store_io.rs23
-rw-r--r--tvix/store/src/bin/tvix-store.rs7
-rw-r--r--tvix/store/src/import.rs4
5 files changed, 256 insertions, 258 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
index e16bda1f64..049c177efc 100644
--- a/tvix/castore/src/import.rs
+++ b/tvix/castore/src/import.rs
@@ -6,11 +6,14 @@ use crate::proto::Directory;
 use crate::proto::DirectoryNode;
 use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
+use crate::B3Digest;
 use crate::Error as CastoreError;
-use async_stream::stream;
-use futures::pin_mut;
+use futures::stream::BoxStream;
+use futures::Future;
 use futures::{Stream, StreamExt};
 use std::fs::FileType;
+use std::os::unix::fs::MetadataExt;
+use std::pin::Pin;
 use tracing::Level;
 
 #[cfg(target_family = "unix")]
@@ -26,9 +29,6 @@ use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
 
-#[cfg(debug_assertions)]
-use std::collections::HashSet;
-
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("failed to upload directory at {0}: {1}")]
@@ -65,274 +65,253 @@ impl From<Error> for std::io::Error {
     }
 }
 
-/// Walk the filesystem at a given path and returns a level-keyed list of directory entries.
+/// Walk the filesystem at a given path and returns a stream of ingestion entries.
 ///
 /// This is how [`ingest_path`] assembles the set of entries to pass on [`ingest_entries`].
 /// This low-level function can be used if additional filtering or processing is required on the
 /// entries.
 ///
-/// Level here is in the context of graph theory, e.g. 2-level nodes
-/// are nodes that are at depth 2.
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(fields(path), err)]
-pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
+#[instrument(fields(path), skip(blob_service))]
+fn walk_path_for_ingestion<'a, BS>(
+    blob_service: BS,
+    path: &'a Path,
+) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    BS: BlobService + Clone + 'a,
 {
-    let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
-    for entry in WalkDir::new(path.as_ref())
+    let iter = WalkDir::new(path)
         .follow_links(false)
         .follow_root_links(false)
-        .contents_first(false)
-        .sort_by_file_name()
-        .into_iter()
-    {
-        // Entry could be a NotFound, if the root path specified does not exist.
-        let entry = entry.map_err(|e| {
-            Error::UnableToOpen(
-                PathBuf::from(path.as_ref()),
-                e.into_io_error().expect("walkdir err must be some"),
-            )
-        })?;
-
-        if entry.depth() >= entries_per_depths.len() {
-            debug_assert!(
-                entry.depth() == entries_per_depths.len(),
-                "Received unexpected entry with depth {} during descent, previously at {}",
-                entry.depth(),
-                entries_per_depths.len()
-            );
-
-            entries_per_depths.push(vec![entry]);
-        } else {
-            entries_per_depths[entry.depth()].push(entry);
-        }
-    }
+        .contents_first(true)
+        .into_iter();
 
-    Ok(entries_per_depths)
+    dir_entry_iter_to_ingestion_stream(blob_service, iter, path)
 }
 
-/// Convert a leveled-key vector of filesystem entries into a stream of
-/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
-pub fn leveled_entries_to_stream(
-    entries_per_depths: Vec<Vec<DirEntry>>,
-) -> impl Stream<Item = DirEntry> {
-    stream! {
-        for level in entries_per_depths.into_iter().rev() {
-            for entry in level.into_iter() {
-                yield entry;
-            }
-        }
+/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
+/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
+///
+/// The root is the [Path] in the filesystem that is being ingested into the castore.
+pub fn dir_entry_iter_to_ingestion_stream<'a, BS, I>(
+    blob_service: BS,
+    iter: I,
+    root: &'a Path,
+) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
+where
+    BS: BlobService + Clone + 'a,
+    I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
+{
+    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+
+    let iter = iter.map(move |entry| match entry {
+        Ok(entry) => dir_entry_to_ingestion_entry(blob_service.clone(), &entry, prefix),
+        Err(error) => Err(Error::UnableToStat(
+            root.to_path_buf(),
+            error.into_io_error().expect("walkdir err must be some"),
+        )),
+    });
+
+    Box::pin(futures::stream::iter(iter))
+}
+
+/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
+/// provided [BlobService].
+///
+/// The prefix path is stripped from the path of each entry. This is usually the parent path
+/// of the path being ingested so that the last element of the stream only has one component.
+fn dir_entry_to_ingestion_entry<'a, BS>(
+    blob_service: BS,
+    entry: &DirEntry,
+    prefix: &Path,
+) -> Result<IngestionEntry<'a>, Error>
+where
+    BS: BlobService + 'a,
+{
+    let file_type = entry.file_type();
+
+    let path = entry
+        .path()
+        .strip_prefix(prefix)
+        .expect("Tvix bug: failed to strip root path prefix")
+        .to_path_buf();
+
+    if file_type.is_dir() {
+        Ok(IngestionEntry::Dir { path })
+    } else if file_type.is_symlink() {
+        let target = std::fs::read_link(entry.path())
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+
+        Ok(IngestionEntry::Symlink { path, target })
+    } else if file_type.is_file() {
+        let metadata = entry
+            .metadata()
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+
+        // TODO: In the future, for small files, hash right away and upload async.
+        let digest = Box::pin(upload_blob_at_path(
+            blob_service,
+            entry.path().to_path_buf(),
+        ));
+
+        Ok(IngestionEntry::Regular {
+            path,
+            size: metadata.size(),
+            // If it's executable by the user, it'll become executable.
+            // This matches nix's dump() function behaviour.
+            executable: metadata.permissions().mode() & 64 != 0,
+            digest,
+        })
+    } else {
+        Ok(IngestionEntry::Unknown { path, file_type })
     }
 }
 
+/// Uploads the file at the provided [Path] the the [BlobService].
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let mut file = match tokio::fs::File::open(&path).await {
+        Ok(file) => file,
+        Err(e) => return Err(Error::UnableToRead(path, e)),
+    };
+
+    let mut writer = blob_service.open_write().await;
+
+    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+        return Err(Error::UnableToRead(path, e));
+    };
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::UnableToRead(path, e))?;
+
+    Ok(digest)
+}
+
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
 ///
 /// It does not follow symlinks at the root, they will be ingested as actual symlinks.
 #[instrument(skip(blob_service, directory_service), fields(path), err)]
-pub async fn ingest_path<'a, BS, DS, P>(
+pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
 ) -> Result<Node, Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
     DS: AsRef<dyn DirectoryService>,
 {
-    // produce the leveled-key vector of DirEntry.
-    let entries_per_depths = walk_path_for_ingestion(path)?;
-    let direntry_stream = leveled_entries_to_stream(entries_per_depths);
-    pin_mut!(direntry_stream);
-
-    ingest_entries(blob_service, directory_service, direntry_stream).await
+    let entry_stream = walk_path_for_ingestion(blob_service, path.as_ref());
+    ingest_entries(directory_service, entry_stream).await
 }
 
-/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
-/// entries we are ingesting and verifying we are ingesting them in the right order.
-///
-/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier
-/// an entry `P` such that `P` is an **ancestor** of `L`.
-///
-/// If such a thing happened, it means that we have processed something like:
+/// Ingests elements from the given stream of [DirEntry] into a the passed [DirectoryService].
 ///
-///```no_trust
-///        A
-///       / \
-///      B   C
-///     / \   \
-///    G  F    P <--------- processed before this one
-///           / \                                  |
-///          D  E                                  |
-///              \                                 |
-///               L  <-----------------------------+
-/// ```
+/// The stream must have the following invariants:
+/// - All children entries must come before their parents.
+/// - The last entry must be the root node which must have a single path component.
+/// - Every entry should have a unique path.
 ///
-/// This is exactly what must never happen.
+/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
+/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
+/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
-/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
-/// i.e. the different remote services.
-#[derive(Default)]
-#[cfg(debug_assertions)]
-struct MerkleInvariantChecker {
-    seen: HashSet<PathBuf>,
-}
-
-#[cfg(debug_assertions)]
-impl MerkleInvariantChecker {
-    /// See a directory entry and remember it.
-    fn see(&mut self, node: &DirEntry) {
-        self.seen.insert(node.path().to_owned());
-    }
-
-    /// Returns a potential ancestor already seen for that directory entry.
-    fn find_ancestor<'a>(&self, node: &'a DirEntry) -> Option<&'a Path> {
-        node.path().ancestors().find(|p| self.seen.contains(*p))
-    }
-}
-
-/// Ingests elements from the given stream of [`DirEntry`] into a the passed [`BlobService`] and
-/// [`DirectoryService`].
-/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
+/// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<'a, BS, DS, S>(
-    blob_service: BS,
+pub async fn ingest_entries<'a, DS, S>(
     directory_service: DS,
     #[allow(unused_mut)] mut direntry_stream: S,
 ) -> Result<Node, Error>
 where
-    BS: AsRef<dyn BlobService>,
     DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = DirEntry> + std::marker::Unpin,
+    S: Stream<Item = Result<IngestionEntry<'a>, Error>> + Send + std::marker::Unpin,
 {
-    #[cfg(debug_assertions)]
-    let mut invariant_checker: MerkleInvariantChecker = Default::default();
-
-    #[cfg(debug_assertions)]
-    let mut direntry_stream = direntry_stream.inspect(|e| {
-        // If we find an ancestor before we see this entry, this means that the caller
-        // broke the contract, refer to the documentation of the invariant checker to
-        // understand the reasoning here.
-        if let Some(ancestor) = invariant_checker.find_ancestor(e) {
-            panic!(
-                "Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
-                ancestor.display(),
-                e.path().display()
-            );
-        }
-
-        invariant_checker.see(e);
-    });
-
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
-    // We need to process a directory's children before processing
-    // the directory itself in order to have all the data needed
-    // to compute the hash.
-
     let root_node = loop {
-        let entry = match direntry_stream.next().await {
-            Some(entry) => entry,
-            None => {
-                // The last entry of the stream must have depth 0, after which
-                // we break the loop manually.
-                panic!("Tvix bug: unexpected end of stream");
+        let mut entry = direntry_stream
+            .next()
+            .await
+            // The last entry of the stream must have 1 path component, after which
+            // we break the loop manually.
+            .expect("Tvix bug: unexpected end of stream")?;
+
+        let name = entry
+            .path()
+            .file_name()
+            // If this is the root node, it will have an empty name.
+            .unwrap_or_default()
+            .as_bytes()
+            .to_owned()
+            .into();
+
+        let node = match &mut entry {
+            IngestionEntry::Dir { .. } => {
+                // If the entry is a directory, we traversed all its children (and
+                // populated it in `directories`).
+                // If we don't have it in there, it's an empty directory.
+                let directory = directories
+                    .remove(entry.path())
+                    // In that case, it contained no children
+                    .unwrap_or_default();
+
+                let directory_size = directory.size();
+                let directory_digest = directory.digest();
+
+                // Use the directory_putter to upload the directory.
+                // If we don't have one yet (as that's the first one to upload),
+                // initialize the putter.
+                maybe_directory_putter
+                    .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                    .put(directory)
+                    .await?;
+
+                Node::Directory(DirectoryNode {
+                    name,
+                    digest: directory_digest.into(),
+                    size: directory_size,
+                })
+            }
+            IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
+                name,
+                target: target.as_os_str().as_bytes().to_owned().into(),
+            }),
+            IngestionEntry::Regular {
+                size,
+                executable,
+                digest,
+                ..
+            } => Node::File(FileNode {
+                name,
+                digest: digest.await?.into(),
+                size: *size,
+                executable: *executable,
+            }),
+            IngestionEntry::Unknown { path, file_type } => {
+                return Err(Error::UnsupportedFileType(path.clone(), *file_type));
             }
-        };
-        let file_type = entry.file_type();
-
-        let node = if file_type.is_dir() {
-            // If the entry is a directory, we traversed all its children (and
-            // populated it in `directories`).
-            // If we don't have it in there, it's an empty directory.
-            let directory = directories
-                .remove(entry.path())
-                // In that case, it contained no children
-                .unwrap_or_default();
-
-            let directory_size = directory.size();
-            let directory_digest = directory.digest();
-
-            // Use the directory_putter to upload the directory.
-            // If we don't have one yet (as that's the first one to upload),
-            // initialize the putter.
-            maybe_directory_putter
-                .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
-                .put(directory)
-                .await?;
-
-            Node::Directory(DirectoryNode {
-                name: entry.file_name().as_bytes().to_owned().into(),
-                digest: directory_digest.into(),
-                size: directory_size,
-            })
-        } else if file_type.is_symlink() {
-            let target: bytes::Bytes = std::fs::read_link(entry.path())
-                .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
-                .as_os_str()
-                .as_bytes()
-                .to_owned()
-                .into();
-
-            Node::Symlink(SymlinkNode {
-                name: entry.file_name().as_bytes().to_owned().into(),
-                target,
-            })
-        } else if file_type.is_file() {
-            let metadata = entry
-                .metadata()
-                .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
-
-            let mut file = tokio::fs::File::open(entry.path())
-                .await
-                .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
-
-            let mut writer = blob_service.as_ref().open_write().await;
-
-            if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-                return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
-            };
-
-            let digest = writer
-                .close()
-                .await
-                .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?;
-
-            Node::File(FileNode {
-                name: entry.file_name().as_bytes().to_vec().into(),
-                digest: digest.into(),
-                size: metadata.len(),
-                // If it's executable by the user, it'll become executable.
-                // This matches nix's dump() function behaviour.
-                executable: metadata.permissions().mode() & 64 != 0,
-            })
-        } else {
-            return Err(Error::UnsupportedFileType(
-                entry.path().to_path_buf(),
-                file_type,
-            ));
         };
 
-        if entry.depth() == 0 {
+        if entry.path().components().count() == 1 {
             break node;
-        } else {
-            // calculate the parent path, and make sure we register the node there.
-            // NOTE: entry.depth() > 0
-            let parent_path = entry.path().parent().unwrap().to_path_buf();
-
-            // record node in parent directory, creating a new [proto:Directory] if not there yet.
-            let parent_directory = directories.entry(parent_path).or_default();
-            match node {
-                Node::Directory(e) => parent_directory.directories.push(e),
-                Node::File(e) => parent_directory.files.push(e),
-                Node::Symlink(e) => parent_directory.symlinks.push(e),
-            }
         }
+
+        // record node in parent directory, creating a new [Directory] if not there yet.
+        directories
+            .entry(entry.path().parent().unwrap().to_path_buf())
+            .or_default()
+            .add(node);
     };
 
     // if there were directories uploaded, make sure we flush the putter, so
@@ -359,3 +338,36 @@ where
 
     Ok(root_node)
 }
+
+type BlobFut<'a> = Pin<Box<dyn Future<Output = Result<B3Digest, Error>> + Send + 'a>>;
+
+pub enum IngestionEntry<'a> {
+    Regular {
+        path: PathBuf,
+        size: u64,
+        executable: bool,
+        digest: BlobFut<'a>,
+    },
+    Symlink {
+        path: PathBuf,
+        target: PathBuf,
+    },
+    Dir {
+        path: PathBuf,
+    },
+    Unknown {
+        path: PathBuf,
+        file_type: FileType,
+    },
+}
+
+impl<'a> IngestionEntry<'a> {
+    fn path(&self) -> &Path {
+        match self {
+            IngestionEntry::Regular { path, .. } => path,
+            IngestionEntry::Symlink { path, .. } => path,
+            IngestionEntry::Dir { path } => path,
+            IngestionEntry::Unknown { path, .. } => path,
+        }
+    }
+}
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 800f8ddc17..639095c459 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -1,9 +1,7 @@
 //! Implements builtins used to import paths in the store.
 
 use crate::builtins::errors::ImportError;
-use futures::pin_mut;
 use std::path::Path;
-use tvix_castore::import::leveled_entries_to_stream;
 use tvix_eval::{
     builtin_macros::builtins,
     generators::{self, GenCo},
@@ -18,17 +16,15 @@ async fn filtered_ingest(
     path: &Path,
     filter: Option<&Value>,
 ) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
-    // produce the leveled-key vector of DirEntry.
-    let mut entries_per_depths: Vec<Vec<walkdir::DirEntry>> = vec![Vec::new()];
+    let mut entries: Vec<walkdir::DirEntry> = vec![];
     let mut it = walkdir::WalkDir::new(path)
         .follow_links(false)
         .follow_root_links(false)
         .contents_first(false)
-        .sort_by_file_name()
         .into_iter();
 
     // Skip root node.
-    entries_per_depths[0].push(
+    entries.push(
         it.next()
             .ok_or_else(|| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
@@ -85,28 +81,14 @@ async fn filtered_ingest(
             continue;
         }
 
-        if entry.depth() >= entries_per_depths.len() {
-            debug_assert!(
-                entry.depth() == entries_per_depths.len(),
-                "Received unexpected entry with depth {} during descent, previously at {}",
-                entry.depth(),
-                entries_per_depths.len()
-            );
-
-            entries_per_depths.push(vec![entry]);
-        } else {
-            entries_per_depths[entry.depth()].push(entry);
-        }
-
-        // FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
+        entries.push(entry);
     }
 
-    let direntry_stream = leveled_entries_to_stream(entries_per_depths);
-    pin_mut!(direntry_stream);
+    let entries_iter = entries.into_iter().rev().map(Ok);
 
     state.tokio_handle.block_on(async {
         state
-            .ingest_entries(direntry_stream)
+            .ingest_dir_entries(entries_iter, path)
             .await
             .map_err(|err| ErrorKind::IO {
                 path: Some(path.to_path_buf()),
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 10a5902785..46575743c4 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -2,13 +2,11 @@
 
 use async_recursion::async_recursion;
 use bytes::Bytes;
-use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
 use nix_compat::nixhash::NixHash;
 use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
 use sha2::{Digest, Sha256};
-use std::marker::Unpin;
 use std::rc::Rc;
 use std::{
     cell::RefCell,
@@ -20,6 +18,7 @@ use std::{
 use tokio_util::io::SyncIoBridge;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
 use walkdir::DirEntry;
@@ -278,17 +277,19 @@ impl TvixStoreIO {
     /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`],
     /// passing the blob_service and directory_service that's used.
     /// The error is mapped to std::io::Error for simplicity.
-    pub(crate) async fn ingest_entries<S>(&self, entries_stream: S) -> io::Result<Node>
+    pub(crate) async fn ingest_dir_entries<'a, I>(
+        &'a self,
+        iter: I,
+        root: &Path,
+    ) -> io::Result<Node>
     where
-        S: Stream<Item = DirEntry> + Unpin,
+        I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + Sync + 'a,
     {
-        tvix_castore::import::ingest_entries(
-            &self.blob_service,
-            &self.directory_service,
-            entries_stream,
-        )
-        .await
-        .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
+        let entries_stream = dir_entry_iter_to_ingestion_stream(&self.blob_service, iter, root);
+
+        tvix_castore::import::ingest_entries(&self.directory_service, entries_stream)
+            .await
+            .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err))
     }
 
     pub(crate) async fn node_to_path_info(
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 15f37d301f..1f172d65c6 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -5,6 +5,7 @@ use futures::future::try_join_all;
 use nix_compat::path_info::ExportedPathInfo;
 use serde::Deserialize;
 use serde::Serialize;
+use std::os::unix::ffi::OsStrExt;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio_listener::Listener;
@@ -430,9 +431,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 }
 
                 let path: PathBuf = elem.path.to_absolute_path().into();
+                let root_name = path.file_name().unwrap().as_bytes().to_vec().into();
                 // Ingest the given path
-                let root_node =
-                    ingest_path(blob_service.clone(), directory_service.clone(), path).await?;
+                let root_node = ingest_path(blob_service.clone(), directory_service.clone(), &path)
+                    .await?
+                    .rename(root_name);
 
                 // Create and upload a PathInfo pointing to the root_node,
                 // annotated with information we have from the reference graph.
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index e6d451834c..5cff29a9e5 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -112,12 +112,12 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, P>(
 ) -> Result<StorePath, std::io::Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
-    BS: AsRef<dyn BlobService> + Clone,
+    BS: BlobService + Clone,
     DS: AsRef<dyn DirectoryService>,
     PS: AsRef<dyn PathInfoService>,
 {
     let root_node =
-        tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?;
+        tvix_castore::import::ingest_path(blob_service, directory_service, path.as_ref()).await?;
 
     // Ask the PathInfoService for the NAR size and sha256
     let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?;