about summary refs log tree commit diff
path: root/tvix/castore/src/import.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-04-18T18·51-0500
committerConnor Brewster <cbrewster@hey.com>2024-04-19T20·37+0000
commit259d7a3cfa214e7eab7b0862024d595489e92592 (patch)
treef6f52a334d5e5d26e6f3c323db0f8e56beaf56c8 /tvix/castore/src/import.rs
parent150106610e60e95267c0968a9679797b05db7f3d (diff)
refactor(tvix/castore): generalize store ingestion streams r/7979
Previously the store ingestion code was coupled to `walkdir::DirEntry`s
produced by the `walkdir` crate which made it impossible to reuse
ingesting from other sources like tarballs or NARs.

This introduces a `IngestionEntry` which carries enough information for
store ingestion and a future for computing the Blake3 digest of files.
This allows the producer to perform file uploads in a way that makes
sense for the source, ie. the filesystem upload could concurrently
upload multiple files at the same time, while the NAR ingestor will need
to ingest the entire blob before yielding the next blob in the stream.
In the future we can buffer small blobs and upload them concurrently,
but the full blob still needs to be read from the NAR before advancing.

Change-Id: I6d144063e2ba5b05e765bac1f27d41b3c8e7b283
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11462
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to '')
-rw-r--r--tvix/castore/src/import.rs452
1 files changed, 232 insertions, 220 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
index e16bda1f64..049c177efc 100644
--- a/tvix/castore/src/import.rs
+++ b/tvix/castore/src/import.rs
@@ -6,11 +6,14 @@ use crate::proto::Directory;
 use crate::proto::DirectoryNode;
 use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
+use crate::B3Digest;
 use crate::Error as CastoreError;
-use async_stream::stream;
-use futures::pin_mut;
+use futures::stream::BoxStream;
+use futures::Future;
 use futures::{Stream, StreamExt};
 use std::fs::FileType;
+use std::os::unix::fs::MetadataExt;
+use std::pin::Pin;
 use tracing::Level;
 
 #[cfg(target_family = "unix")]
@@ -26,9 +29,6 @@ use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
 
-#[cfg(debug_assertions)]
-use std::collections::HashSet;
-
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("failed to upload directory at {0}: {1}")]
@@ -65,274 +65,253 @@ impl From<Error> for std::io::Error {
     }
 }
 
-/// Walk the filesystem at a given path and returns a level-keyed list of directory entries.
+/// Walk the filesystem at a given path and returns a stream of ingestion entries.
 ///
 /// This is how [`ingest_path`] assembles the set of entries to pass on [`ingest_entries`].
 /// This low-level function can be used if additional filtering or processing is required on the
 /// entries.
 ///
-/// Level here is in the context of graph theory, e.g. 2-level nodes
-/// are nodes that are at depth 2.
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(fields(path), err)]
-pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
+#[instrument(fields(path), skip(blob_service))]
+fn walk_path_for_ingestion<'a, BS>(
+    blob_service: BS,
+    path: &'a Path,
+) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    BS: BlobService + Clone + 'a,
 {
-    let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
-    for entry in WalkDir::new(path.as_ref())
+    let iter = WalkDir::new(path)
         .follow_links(false)
         .follow_root_links(false)
-        .contents_first(false)
-        .sort_by_file_name()
-        .into_iter()
-    {
-        // Entry could be a NotFound, if the root path specified does not exist.
-        let entry = entry.map_err(|e| {
-            Error::UnableToOpen(
-                PathBuf::from(path.as_ref()),
-                e.into_io_error().expect("walkdir err must be some"),
-            )
-        })?;
-
-        if entry.depth() >= entries_per_depths.len() {
-            debug_assert!(
-                entry.depth() == entries_per_depths.len(),
-                "Received unexpected entry with depth {} during descent, previously at {}",
-                entry.depth(),
-                entries_per_depths.len()
-            );
-
-            entries_per_depths.push(vec![entry]);
-        } else {
-            entries_per_depths[entry.depth()].push(entry);
-        }
-    }
+        .contents_first(true)
+        .into_iter();
 
-    Ok(entries_per_depths)
+    dir_entry_iter_to_ingestion_stream(blob_service, iter, path)
 }
 
-/// Convert a leveled-key vector of filesystem entries into a stream of
-/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
-pub fn leveled_entries_to_stream(
-    entries_per_depths: Vec<Vec<DirEntry>>,
-) -> impl Stream<Item = DirEntry> {
-    stream! {
-        for level in entries_per_depths.into_iter().rev() {
-            for entry in level.into_iter() {
-                yield entry;
-            }
-        }
+/// Converts an iterator of [walkdir::DirEntry]s into a stream of ingestion entries.
+/// This can then be fed into [ingest_entries] to ingest all the entries into the castore.
+///
+/// The root is the [Path] in the filesystem that is being ingested into the castore.
+pub fn dir_entry_iter_to_ingestion_stream<'a, BS, I>(
+    blob_service: BS,
+    iter: I,
+    root: &'a Path,
+) -> BoxStream<'a, Result<IngestionEntry<'a>, Error>>
+where
+    BS: BlobService + Clone + 'a,
+    I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
+{
+    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+
+    let iter = iter.map(move |entry| match entry {
+        Ok(entry) => dir_entry_to_ingestion_entry(blob_service.clone(), &entry, prefix),
+        Err(error) => Err(Error::UnableToStat(
+            root.to_path_buf(),
+            error.into_io_error().expect("walkdir err must be some"),
+        )),
+    });
+
+    Box::pin(futures::stream::iter(iter))
+}
+
+/// Converts a [walkdir::DirEntry] into an [IngestionEntry], uploading blobs to the
+/// provided [BlobService].
+///
+/// The prefix path is stripped from the path of each entry. This is usually the parent path
+/// of the path being ingested so that the last element of the stream only has one component.
+fn dir_entry_to_ingestion_entry<'a, BS>(
+    blob_service: BS,
+    entry: &DirEntry,
+    prefix: &Path,
+) -> Result<IngestionEntry<'a>, Error>
+where
+    BS: BlobService + 'a,
+{
+    let file_type = entry.file_type();
+
+    let path = entry
+        .path()
+        .strip_prefix(prefix)
+        .expect("Tvix bug: failed to strip root path prefix")
+        .to_path_buf();
+
+    if file_type.is_dir() {
+        Ok(IngestionEntry::Dir { path })
+    } else if file_type.is_symlink() {
+        let target = std::fs::read_link(entry.path())
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+
+        Ok(IngestionEntry::Symlink { path, target })
+    } else if file_type.is_file() {
+        let metadata = entry
+            .metadata()
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+
+        // TODO: In the future, for small files, hash right away and upload async.
+        let digest = Box::pin(upload_blob_at_path(
+            blob_service,
+            entry.path().to_path_buf(),
+        ));
+
+        Ok(IngestionEntry::Regular {
+            path,
+            size: metadata.size(),
+            // If it's executable by the user, it'll become executable.
+            // This matches nix's dump() function behaviour.
+            executable: metadata.permissions().mode() & 64 != 0,
+            digest,
+        })
+    } else {
+        Ok(IngestionEntry::Unknown { path, file_type })
     }
 }
 
+/// Uploads the file at the provided [Path] the the [BlobService].
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let mut file = match tokio::fs::File::open(&path).await {
+        Ok(file) => file,
+        Err(e) => return Err(Error::UnableToRead(path, e)),
+    };
+
+    let mut writer = blob_service.open_write().await;
+
+    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+        return Err(Error::UnableToRead(path, e));
+    };
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::UnableToRead(path, e))?;
+
+    Ok(digest)
+}
+
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
 ///
 /// It does not follow symlinks at the root, they will be ingested as actual symlinks.
 #[instrument(skip(blob_service, directory_service), fields(path), err)]
-pub async fn ingest_path<'a, BS, DS, P>(
+pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
 ) -> Result<Node, Error>
 where
     P: AsRef<Path> + std::fmt::Debug,
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone,
     DS: AsRef<dyn DirectoryService>,
 {
-    // produce the leveled-key vector of DirEntry.
-    let entries_per_depths = walk_path_for_ingestion(path)?;
-    let direntry_stream = leveled_entries_to_stream(entries_per_depths);
-    pin_mut!(direntry_stream);
-
-    ingest_entries(blob_service, directory_service, direntry_stream).await
+    let entry_stream = walk_path_for_ingestion(blob_service, path.as_ref());
+    ingest_entries(directory_service, entry_stream).await
 }
 
-/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
-/// entries we are ingesting and verifying we are ingesting them in the right order.
-///
-/// That is, whenever we process an entry `L`, we would like to verify if we didn't process earlier
-/// an entry `P` such that `P` is an **ancestor** of `L`.
-///
-/// If such a thing happened, it means that we have processed something like:
+/// Ingests elements from the given stream of [DirEntry] into a the passed [DirectoryService].
 ///
-///```no_trust
-///        A
-///       / \
-///      B   C
-///     / \   \
-///    G  F    P <--------- processed before this one
-///           / \                                  |
-///          D  E                                  |
-///              \                                 |
-///               L  <-----------------------------+
-/// ```
+/// The stream must have the following invariants:
+/// - All children entries must come before their parents.
+/// - The last entry must be the root node which must have a single path component.
+/// - Every entry should have a unique path.
 ///
-/// This is exactly what must never happen.
+/// Internally we maintain a [HashMap] of [PathBuf] to partially populated [Directory] at that
+/// path. Once we receive an [IngestionEntry] for the directory itself, we remove it from the
+/// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter].
 ///
-/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
-/// i.e. the different remote services.
-#[derive(Default)]
-#[cfg(debug_assertions)]
-struct MerkleInvariantChecker {
-    seen: HashSet<PathBuf>,
-}
-
-#[cfg(debug_assertions)]
-impl MerkleInvariantChecker {
-    /// See a directory entry and remember it.
-    fn see(&mut self, node: &DirEntry) {
-        self.seen.insert(node.path().to_owned());
-    }
-
-    /// Returns a potential ancestor already seen for that directory entry.
-    fn find_ancestor<'a>(&self, node: &'a DirEntry) -> Option<&'a Path> {
-        node.path().ancestors().find(|p| self.seen.contains(*p))
-    }
-}
-
-/// Ingests elements from the given stream of [`DirEntry`] into a the passed [`BlobService`] and
-/// [`DirectoryService`].
-/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
+/// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<'a, BS, DS, S>(
-    blob_service: BS,
+pub async fn ingest_entries<'a, DS, S>(
     directory_service: DS,
     #[allow(unused_mut)] mut direntry_stream: S,
 ) -> Result<Node, Error>
 where
-    BS: AsRef<dyn BlobService>,
     DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = DirEntry> + std::marker::Unpin,
+    S: Stream<Item = Result<IngestionEntry<'a>, Error>> + Send + std::marker::Unpin,
 {
-    #[cfg(debug_assertions)]
-    let mut invariant_checker: MerkleInvariantChecker = Default::default();
-
-    #[cfg(debug_assertions)]
-    let mut direntry_stream = direntry_stream.inspect(|e| {
-        // If we find an ancestor before we see this entry, this means that the caller
-        // broke the contract, refer to the documentation of the invariant checker to
-        // understand the reasoning here.
-        if let Some(ancestor) = invariant_checker.find_ancestor(e) {
-            panic!(
-                "Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
-                ancestor.display(),
-                e.path().display()
-            );
-        }
-
-        invariant_checker.see(e);
-    });
-
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
     let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None;
 
-    // We need to process a directory's children before processing
-    // the directory itself in order to have all the data needed
-    // to compute the hash.
-
     let root_node = loop {
-        let entry = match direntry_stream.next().await {
-            Some(entry) => entry,
-            None => {
-                // The last entry of the stream must have depth 0, after which
-                // we break the loop manually.
-                panic!("Tvix bug: unexpected end of stream");
+        let mut entry = direntry_stream
+            .next()
+            .await
+            // The last entry of the stream must have 1 path component, after which
+            // we break the loop manually.
+            .expect("Tvix bug: unexpected end of stream")?;
+
+        let name = entry
+            .path()
+            .file_name()
+            // If this is the root node, it will have an empty name.
+            .unwrap_or_default()
+            .as_bytes()
+            .to_owned()
+            .into();
+
+        let node = match &mut entry {
+            IngestionEntry::Dir { .. } => {
+                // If the entry is a directory, we traversed all its children (and
+                // populated it in `directories`).
+                // If we don't have it in there, it's an empty directory.
+                let directory = directories
+                    .remove(entry.path())
+                    // In that case, it contained no children
+                    .unwrap_or_default();
+
+                let directory_size = directory.size();
+                let directory_digest = directory.digest();
+
+                // Use the directory_putter to upload the directory.
+                // If we don't have one yet (as that's the first one to upload),
+                // initialize the putter.
+                maybe_directory_putter
+                    .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                    .put(directory)
+                    .await?;
+
+                Node::Directory(DirectoryNode {
+                    name,
+                    digest: directory_digest.into(),
+                    size: directory_size,
+                })
+            }
+            IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
+                name,
+                target: target.as_os_str().as_bytes().to_owned().into(),
+            }),
+            IngestionEntry::Regular {
+                size,
+                executable,
+                digest,
+                ..
+            } => Node::File(FileNode {
+                name,
+                digest: digest.await?.into(),
+                size: *size,
+                executable: *executable,
+            }),
+            IngestionEntry::Unknown { path, file_type } => {
+                return Err(Error::UnsupportedFileType(path.clone(), *file_type));
             }
-        };
-        let file_type = entry.file_type();
-
-        let node = if file_type.is_dir() {
-            // If the entry is a directory, we traversed all its children (and
-            // populated it in `directories`).
-            // If we don't have it in there, it's an empty directory.
-            let directory = directories
-                .remove(entry.path())
-                // In that case, it contained no children
-                .unwrap_or_default();
-
-            let directory_size = directory.size();
-            let directory_digest = directory.digest();
-
-            // Use the directory_putter to upload the directory.
-            // If we don't have one yet (as that's the first one to upload),
-            // initialize the putter.
-            maybe_directory_putter
-                .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
-                .put(directory)
-                .await?;
-
-            Node::Directory(DirectoryNode {
-                name: entry.file_name().as_bytes().to_owned().into(),
-                digest: directory_digest.into(),
-                size: directory_size,
-            })
-        } else if file_type.is_symlink() {
-            let target: bytes::Bytes = std::fs::read_link(entry.path())
-                .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
-                .as_os_str()
-                .as_bytes()
-                .to_owned()
-                .into();
-
-            Node::Symlink(SymlinkNode {
-                name: entry.file_name().as_bytes().to_owned().into(),
-                target,
-            })
-        } else if file_type.is_file() {
-            let metadata = entry
-                .metadata()
-                .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
-
-            let mut file = tokio::fs::File::open(entry.path())
-                .await
-                .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
-
-            let mut writer = blob_service.as_ref().open_write().await;
-
-            if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-                return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
-            };
-
-            let digest = writer
-                .close()
-                .await
-                .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?;
-
-            Node::File(FileNode {
-                name: entry.file_name().as_bytes().to_vec().into(),
-                digest: digest.into(),
-                size: metadata.len(),
-                // If it's executable by the user, it'll become executable.
-                // This matches nix's dump() function behaviour.
-                executable: metadata.permissions().mode() & 64 != 0,
-            })
-        } else {
-            return Err(Error::UnsupportedFileType(
-                entry.path().to_path_buf(),
-                file_type,
-            ));
         };
 
-        if entry.depth() == 0 {
+        if entry.path().components().count() == 1 {
             break node;
-        } else {
-            // calculate the parent path, and make sure we register the node there.
-            // NOTE: entry.depth() > 0
-            let parent_path = entry.path().parent().unwrap().to_path_buf();
-
-            // record node in parent directory, creating a new [proto:Directory] if not there yet.
-            let parent_directory = directories.entry(parent_path).or_default();
-            match node {
-                Node::Directory(e) => parent_directory.directories.push(e),
-                Node::File(e) => parent_directory.files.push(e),
-                Node::Symlink(e) => parent_directory.symlinks.push(e),
-            }
         }
+
+        // record node in parent directory, creating a new [Directory] if not there yet.
+        directories
+            .entry(entry.path().parent().unwrap().to_path_buf())
+            .or_default()
+            .add(node);
     };
 
     // if there were directories uploaded, make sure we flush the putter, so
@@ -359,3 +338,36 @@ where
 
     Ok(root_node)
 }
+
+type BlobFut<'a> = Pin<Box<dyn Future<Output = Result<B3Digest, Error>> + Send + 'a>>;
+
+pub enum IngestionEntry<'a> {
+    Regular {
+        path: PathBuf,
+        size: u64,
+        executable: bool,
+        digest: BlobFut<'a>,
+    },
+    Symlink {
+        path: PathBuf,
+        target: PathBuf,
+    },
+    Dir {
+        path: PathBuf,
+    },
+    Unknown {
+        path: PathBuf,
+        file_type: FileType,
+    },
+}
+
+impl<'a> IngestionEntry<'a> {
+    fn path(&self) -> &Path {
+        match self {
+            IngestionEntry::Regular { path, .. } => path,
+            IngestionEntry::Symlink { path, .. } => path,
+            IngestionEntry::Dir { path } => path,
+            IngestionEntry::Unknown { path, .. } => path,
+        }
+    }
+}