about summary refs log tree commit diff
path: root/tvix/castore/src/import
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import')
-rw-r--r--tvix/castore/src/import/archive.rs188
-rw-r--r--tvix/castore/src/import/blobs.rs177
-rw-r--r--tvix/castore/src/import/error.rs43
-rw-r--r--tvix/castore/src/import/fs.rs91
-rw-r--r--tvix/castore/src/import/mod.rs241
5 files changed, 516 insertions, 224 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index adcfb871d5..cd5b1290e0 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,51 +1,58 @@
-use std::io::{Cursor, Write};
-use std::sync::Arc;
-use std::{collections::HashMap, path::PathBuf};
+//! Imports from an archive (tarballs)
+
+use std::collections::HashMap;
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
 use petgraph::Direction;
 use tokio::io::AsyncRead;
-use tokio::sync::Semaphore;
-use tokio::task::JoinSet;
 use tokio_stream::StreamExt;
 use tokio_tar::Archive;
-use tokio_util::io::InspectReader;
 use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
+use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
-use crate::B3Digest;
 
-/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
-/// background.
-///
-/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
-/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
-/// the blob can be represented using a u32 and will not cause an overflow.
-const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+use super::blobs::{self, ConcurrentBlobUploader};
 
-/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
-const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+type TarPathBuf = std::path::PathBuf;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("error reading archive entry: {0}")]
-    Io(#[from] std::io::Error),
+    #[error("unable to construct stream of entries: {0}")]
+    Entries(std::io::Error),
+
+    #[error("unable to read next entry: {0}")]
+    NextEntry(std::io::Error),
+
+    #[error("unable to read path for entry: {0}")]
+    PathRead(std::io::Error),
+
+    #[error("unable to convert path {0} for entry: {1}")]
+    PathConvert(TarPathBuf, std::io::Error),
+
+    #[error("unable to read size field for {0}: {1}")]
+    Size(TarPathBuf, std::io::Error),
+
+    #[error("unable to read mode field for {0}: {1}")]
+    Mode(TarPathBuf, std::io::Error),
+
+    #[error("unable to read link name field for {0}: {1}")]
+    LinkName(TarPathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+    EntryType(TarPathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
+    MissingSymlinkTarget(TarPathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
 
-    #[error("failed to import into castore {0}")]
-    Import(#[from] ImportError),
+    #[error(transparent)]
+    BlobUploadError(#[from] blobs::Error),
 }
 
 /// Ingests elements from the given tar [`Archive`] into a the passed [`BlobService`] and
@@ -55,10 +62,10 @@ pub async fn ingest_archive<BS, DS, R>(
     blob_service: BS,
     directory_service: DS,
     mut archive: Archive<R>,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     R: AsyncRead + Unpin,
 {
     // Since tarballs can have entries in any arbitrary order, we need to
@@ -68,105 +75,68 @@ where
     // In the first phase, collect up all the regular files and symlinks.
     let mut nodes = IngestionEntryGraph::new();
 
-    let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
-    let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
+    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
+    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
+        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
 
-    let mut entries_iter = archive.entries()?;
-    while let Some(mut entry) = entries_iter.try_next().await? {
-        let path: PathBuf = entry.path()?.into();
+        // construct a castore PathBuf, which we use in the produced IngestionEntry.
+        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
+            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size()?;
-
-                // If the blob is small enough, read it off the wire, compute the digest,
-                // and upload it to the [BlobService] in the background.
-                let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
-                    let mut buffer = Vec::with_capacity(header_size as usize);
-                    let mut hasher = blake3::Hasher::new();
-                    let mut reader = InspectReader::new(&mut entry, |bytes| {
-                        hasher.write_all(bytes).unwrap();
-                    });
-
-                    // Ensure that we don't buffer into memory until we've acquired a permit.
-                    // This prevents consuming too much memory when performing concurrent
-                    // blob uploads.
-                    let permit = semaphore
-                        .clone()
-                        // This cast is safe because ensure the header_size is less than
-                        // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
-                        .acquire_many_owned(header_size as u32)
-                        .await
-                        .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
-
-                    let digest: B3Digest = hasher.finalize().as_bytes().into();
-
-                    {
-                        let blob_service = blob_service.clone();
-                        let digest = digest.clone();
-                        async_blob_uploads.spawn({
-                            async move {
-                                let mut writer = blob_service.open_write().await;
-
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
-
-                                let blob_digest = writer.close().await?;
-
-                                assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
-
-                                // Make sure we hold the permit until we finish writing the blob
-                                // to the [BlobService].
-                                drop(permit);
-                                Ok(())
-                            }
-                        });
-                    }
-
-                    (size, digest)
-                } else {
-                    let mut writer = blob_service.open_write().await;
-
-                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
-
-                    let digest = writer.close().await?;
-
-                    (size, digest)
-                };
+                let size = header
+                    .size()
+                    .map_err(|e| Error::Size(tar_path.clone(), e))?;
+
+                let digest = blob_uploader
+                    .upload(&path, size, &mut entry)
+                    .await
+                    .map_err(Error::BlobUploadError)?;
+
+                let executable = entry
+                    .header()
+                    .mode()
+                    .map_err(|e| Error::Mode(tar_path, e))?
+                    & 64
+                    != 0;
 
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode()? & 64 != 0,
+                    executable,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()?
-                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
-                    .into(),
+                    .link_name()
+                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
+                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
+                    .into_owned()
+                    .into_os_string()
+                    .into_encoded_bytes(),
                 path,
             },
             // Push a bogus directory marker so we can make sure this directoy gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
         };
 
         nodes.add(entry)?;
     }
 
-    while let Some(result) = async_blob_uploads.join_next().await {
-        result.expect("task panicked")?;
-    }
+    blob_uploader.join().await.map_err(Error::BlobUploadError)?;
 
     let root_node = ingest_entries(
         directory_service,
@@ -193,7 +163,7 @@ where
 /// An error is returned if this is not the case and ingestion will fail.
 struct IngestionEntryGraph {
     graph: DiGraph<IngestionEntry, ()>,
-    path_to_index: HashMap<PathBuf, NodeIndex>,
+    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
     root_node: Option<NodeIndex>,
 }
 
@@ -218,7 +188,7 @@ impl IngestionEntryGraph {
     /// and new nodes are not directories, the node is replaced and is disconnected from its
     /// children.
     pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
-        let path = entry.path().to_path_buf();
+        let path = entry.path().to_owned();
 
         let index = match self.path_to_index.get(entry.path()) {
             Some(&index) => {
@@ -233,12 +203,12 @@ impl IngestionEntryGraph {
             None => self.graph.add_node(entry),
         };
 
-        // A path with 1 component is the root node
+        // for archives, a path with 1 component is the root node
         if path.components().count() == 1 {
             // We expect archives to contain a single root node, if there is another root node
             // entry with a different path name, this is unsupported.
             if let Some(root_node) = self.root_node {
-                if self.get_node(root_node).path() != path {
+                if self.get_node(root_node).path() != path.as_ref() {
                     return Err(Error::UnexpectedNumberOfTopLevelEntries);
                 }
             }
@@ -247,7 +217,7 @@ impl IngestionEntryGraph {
         } else if let Some(parent_path) = path.parent() {
             // Recursively add the parent node until it hits the root node.
             let parent_index = self.add(IngestionEntry::Dir {
-                path: parent_path.to_path_buf(),
+                path: parent_path.to_owned(),
             })?;
 
             // Insert an edge from the parent directory to the child entry.
@@ -332,23 +302,29 @@ mod test {
 
     lazy_static! {
         pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
+            path: "a".parse().unwrap()
+        };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
+            path: "b".parse().unwrap()
+        };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
+            path: "a/b".parse().unwrap()
+        };
         pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".into(),
+            path: "a".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".into(),
+            path: "a/b".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".into(),
+            path: "a/b/c".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs
new file mode 100644
index 0000000000..8135d871d6
--- /dev/null
+++ b/tvix/castore/src/import/blobs.rs
@@ -0,0 +1,177 @@
+use std::{
+    io::{Cursor, Write},
+    sync::Arc,
+};
+
+use tokio::{
+    io::AsyncRead,
+    sync::Semaphore,
+    task::{JoinError, JoinSet},
+};
+use tokio_util::io::InspectReader;
+
+use crate::{blobservice::BlobService, B3Digest, Path, PathBuf};
+
+/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
+/// background.
+///
+/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
+/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
+/// the blob can be represented using a u32 and will not cause an overflow.
+const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+
+/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
+const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // FUTUREWORK: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+
+    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
+    UnexpectedSize {
+        path: PathBuf,
+        wanted: u64,
+        got: u64,
+    },
+
+    #[error("blob upload join error: {0}")]
+    JoinError(#[from] JoinError),
+}
+
+/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
+/// This is useful when ingesting from sources like tarballs and archives which each blob entry
+/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
+/// round trip time with the blob service. The concurrent blob uploader will buffer small
+/// blobs in memory and upload them to the blob service in the background.
+///
+/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
+/// for all background jobs to complete and check for any errors.
+pub struct ConcurrentBlobUploader<BS> {
+    blob_service: BS,
+    upload_tasks: JoinSet<Result<(), Error>>,
+    upload_semaphore: Arc<Semaphore>,
+}
+
+impl<BS> ConcurrentBlobUploader<BS>
+where
+    BS: BlobService + Clone + 'static,
+{
+    /// Creates a new concurrent blob uploader which uploads blobs to the provided
+    /// blob service.
+    pub fn new(blob_service: BS) -> Self {
+        Self {
+            blob_service,
+            upload_tasks: JoinSet::new(),
+            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
+        }
+    }
+
+    /// Uploads a blob to the blob service. If the blob is small enough it will be read to a buffer
+    /// and uploaded in the background.
+    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
+    /// are uploaded in the background..
+    pub async fn upload<R>(
+        &mut self,
+        path: &Path,
+        expected_size: u64,
+        mut r: R,
+    ) -> Result<B3Digest, Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
+            let mut buffer = Vec::with_capacity(expected_size as usize);
+            let mut hasher = blake3::Hasher::new();
+            let mut reader = InspectReader::new(&mut r, |bytes| {
+                hasher.write_all(bytes).unwrap();
+            });
+
+            let permit = self
+                .upload_semaphore
+                .clone()
+                // This cast is safe because ensure the header_size is less than
+                // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
+                .acquire_many_owned(expected_size as u32)
+                .await
+                .unwrap();
+            let size = tokio::io::copy(&mut reader, &mut buffer)
+                .await
+                .map_err(|e| Error::BlobRead(path.into(), e))?;
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            if size != expected_size {
+                return Err(Error::UnexpectedSize {
+                    path: path.into(),
+                    wanted: expected_size,
+                    got: size,
+                });
+            }
+
+            self.upload_tasks.spawn({
+                let blob_service = self.blob_service.clone();
+                let expected_digest = digest.clone();
+                let path = path.to_owned();
+                let r = Cursor::new(buffer);
+                async move {
+                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
+
+                    assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");
+
+                    // Make sure we hold the permit until we finish writing the blob
+                    // to the [BlobService].
+                    drop(permit);
+                    Ok(())
+                }
+            });
+
+            return Ok(digest);
+        }
+
+        upload_blob(&self.blob_service, path, expected_size, r).await
+    }
+
+    /// Waits for all background upload jobs to complete, returning any upload errors.
+    pub async fn join(mut self) -> Result<(), Error> {
+        while let Some(result) = self.upload_tasks.join_next().await {
+            result??;
+        }
+        Ok(())
+    }
+}
+
+async fn upload_blob<BS, R>(
+    blob_service: &BS,
+    path: &Path,
+    expected_size: u64,
+    mut r: R,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+    R: AsyncRead + Unpin,
+{
+    let mut writer = blob_service.open_write().await;
+
+    let size = tokio::io::copy(&mut r, &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.into(), e))?;
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
+
+    if size != expected_size {
+        return Err(Error::UnexpectedSize {
+            path: path.into(),
+            wanted: expected_size,
+            got: size,
+        });
+    }
+
+    Ok(digest)
+}
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664de..e3fba617e0 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,39 +1,20 @@
-use std::{fs::FileType, path::PathBuf};
+use super::PathBuf;
 
 use crate::Error as CastoreError;
 
+/// Represents all error types that emitted by ingest_entries.
+/// It can represent errors uploading individual Directories and finalizing
+/// the upload.
+/// It also contains a generic error kind that'll carry ingestion-method
+/// specific errors.
 #[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum IngestionError<E: std::fmt::Display> {
+    #[error("error from producer: {0}")]
+    Producer(#[from] E),
+
     #[error("failed to upload directory at {0}: {1}")]
     UploadDirectoryError(PathBuf, CastoreError),
 
-    #[error("invalid encoding encountered for entry {0:?}")]
-    InvalidEncoding(PathBuf),
-
-    #[error("unable to stat {0}: {1}")]
-    UnableToStat(PathBuf, std::io::Error),
-
-    #[error("unable to open {0}: {1}")]
-    UnableToOpen(PathBuf, std::io::Error),
-
-    #[error("unable to read {0}: {1}")]
-    UnableToRead(PathBuf, std::io::Error),
-
-    #[error("unsupported file {0} type: {1:?}")]
-    UnsupportedFileType(PathBuf, FileType),
-}
-
-impl From<CastoreError> for Error {
-    fn from(value: CastoreError) -> Self {
-        match value {
-            CastoreError::InvalidRequest(_) => panic!("tvix bug"),
-            CastoreError::StorageError(_) => panic!("error"),
-        }
-    }
-}
-
-impl From<Error> for std::io::Error {
-    fn from(value: Error) -> Self {
-        std::io::Error::new(std::io::ErrorKind::Other, value)
-    }
+    #[error("failed to finalize directory upload: {0}")]
+    FinalizeDirectoryUpload(CastoreError),
 }
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6709d4a127..9d3ecfe6ab 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -1,8 +1,11 @@
+//! Import from a real filesystem.
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
+use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -10,13 +13,11 @@ use walkdir::WalkDir;
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::proto::node::Node;
+use crate::B3Digest;
 
 use super::ingest_entries;
-use super::upload_blob_at_path;
-use super::Error;
 use super::IngestionEntry;
-
-///! Imports that deal with a real filesystem.
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
@@ -30,11 +31,11 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
 {
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -55,13 +56,13 @@ where
 pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
     blob_service: BS,
     iter: I,
-    root: &'a Path,
+    root: &'a std::path::Path,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
 {
-    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
     Box::pin(
         futures::stream::iter(iter)
@@ -72,7 +73,7 @@ where
                         Ok(dir_entry) => {
                             dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                         }
-                        Err(e) => Err(Error::UnableToStat(
+                        Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
                             e.into_io_error().expect("walkdir err must be some"),
                         )),
@@ -91,32 +92,37 @@ where
 pub async fn dir_entry_to_ingestion_entry<BS>(
     blob_service: BS,
     entry: &DirEntry,
-    prefix: &Path,
+    prefix: &std::path::Path,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
 {
     let file_type = entry.file_type();
 
-    let path = entry
+    let fs_path = entry
         .path()
         .strip_prefix(prefix)
-        .expect("Tvix bug: failed to strip root path prefix")
-        .to_path_buf();
+        .expect("Tvix bug: failed to strip root path prefix");
+
+    // convert to castore PathBuf
+    let path = crate::path::PathBuf::from_host_path(fs_path, false)
+        .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e));
 
     if file_type.is_dir() {
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
+            .into_os_string()
+            .into_vec();
 
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob_at_path(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -127,6 +133,53 @@ where
             digest,
         })
     } else {
-        Ok(IngestionEntry::Unknown { path, file_type })
+        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
     }
 }
+
+/// Uploads the file at the provided [Path] the the [BlobService].
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob<BS>(
+    blob_service: BS,
+    path: impl AsRef<std::path::Path>,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let mut file = match tokio::fs::File::open(path.as_ref()).await {
+        Ok(file) => file,
+        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
+    };
+
+    let mut writer = blob_service.open_write().await;
+
+    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
+    };
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
+
+    Ok(digest)
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(std::path::PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(std::path::PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(std::path::PathBuf, std::io::Error),
+}
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index e9fdc750f8..4223fe5387 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -4,9 +4,9 @@
 //! Specific implementations, such as ingesting from the filesystem, live in
 //! child modules.
 
-use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryPutter;
 use crate::directoryservice::DirectoryService;
+use crate::path::{Path, PathBuf};
 use crate::proto::node::Node;
 use crate::proto::Directory;
 use crate::proto::DirectoryNode;
@@ -14,23 +14,17 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use std::fs::FileType;
 
 use tracing::Level;
 
-#[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-};
+use std::collections::HashMap;
 use tracing::instrument;
 
 mod error;
-pub use error::Error;
+pub use error::IngestionError;
 
 pub mod archive;
+pub mod blobs;
 pub mod fs;
 
 /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService].
@@ -51,10 +45,14 @@ pub mod fs;
 ///
 /// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
+pub async fn ingest_entries<DS, S, E>(
+    directory_service: DS,
+    mut entries: S,
+) -> Result<Node, IngestionError<E>>
 where
-    DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
+    DS: DirectoryService,
+    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
+    E: std::error::Error,
 {
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@@ -68,20 +66,11 @@ where
             // we break the loop manually.
             .expect("Tvix bug: unexpected end of stream")?;
 
-        debug_assert!(
-            entry
-                .path()
-                .components()
-                .all(|x| matches!(x, std::path::Component::Normal(_))),
-            "path may only contain normal components"
-        );
-
         let name = entry
             .path()
             .file_name()
             // If this is the root node, it will have an empty name.
             .unwrap_or_default()
-            .as_bytes()
             .to_owned()
             .into();
 
@@ -89,7 +78,8 @@ where
             IngestionEntry::Dir { .. } => {
                 // If the entry is a directory, we traversed all its children (and
                 // populated it in `directories`).
-                // If we don't have it in there, it's an empty directory.
+                // If we don't have it in directories, it's a directory without
+                // children.
                 let directory = directories
                     .remove(entry.path())
                     // In that case, it contained no children
@@ -102,9 +92,12 @@ where
                 // If we don't have one yet (as that's the first one to upload),
                 // initialize the putter.
                 maybe_directory_putter
-                    .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                    .get_or_insert_with(|| directory_service.put_multiple_start())
                     .put(directory)
-                    .await?;
+                    .await
+                    .map_err(|e| {
+                        IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
+                    })?;
 
                 Node::Directory(DirectoryNode {
                     name,
@@ -114,7 +107,7 @@ where
             }
             IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
                 name,
-                target: target.as_os_str().as_bytes().to_owned().into(),
+                target: target.to_owned().into(),
             }),
             IngestionEntry::Regular {
                 size,
@@ -127,23 +120,27 @@ where
                 size: *size,
                 executable: *executable,
             }),
-            IngestionEntry::Unknown { path, file_type } => {
-                return Err(Error::UnsupportedFileType(path.clone(), *file_type));
-            }
         };
 
-        if entry.path().components().count() == 1 {
+        let parent = entry
+            .path()
+            .parent()
+            .expect("Tvix bug: got entry with root node");
+
+        if parent == crate::Path::ROOT {
             break node;
+        } else {
+            // record node in parent directory, creating a new [Directory] if not there yet.
+            directories.entry(parent.to_owned()).or_default().add(node);
         }
-
-        // record node in parent directory, creating a new [Directory] if not there yet.
-        directories
-            .entry(entry.path().parent().unwrap().to_path_buf())
-            .or_default()
-            .add(node);
     };
 
     assert!(
+        entries.count().await == 0,
+        "Tvix bug: left over elements in the stream"
+    );
+
+    assert!(
         directories.is_empty(),
         "Tvix bug: left over directories after processing ingestion stream"
     );
@@ -152,7 +149,10 @@ where
     // they're all persisted to the backend.
     if let Some(mut directory_putter) = maybe_directory_putter {
         #[cfg_attr(not(debug_assertions), allow(unused))]
-        let root_directory_digest = directory_putter.close().await?;
+        let root_directory_digest = directory_putter
+            .close()
+            .await
+            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
 
         #[cfg(debug_assertions)]
         {
@@ -174,31 +174,6 @@ where
     Ok(root_node)
 }
 
-/// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
-async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
-where
-    BS: BlobService,
-{
-    let mut file = match tokio::fs::File::open(&path).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::UnableToRead(path, e)),
-    };
-
-    let mut writer = blob_service.open_write().await;
-
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::UnableToRead(path, e));
-    };
-
-    let digest = writer
-        .close()
-        .await
-        .map_err(|e| Error::UnableToRead(path, e))?;
-
-    Ok(digest)
-}
-
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum IngestionEntry {
     Regular {
@@ -209,15 +184,11 @@ pub enum IngestionEntry {
     },
     Symlink {
         path: PathBuf,
-        target: PathBuf,
+        target: Vec<u8>,
     },
     Dir {
         path: PathBuf,
     },
-    Unknown {
-        path: PathBuf,
-        file_type: FileType,
-    },
 }
 
 impl IngestionEntry {
@@ -226,7 +197,6 @@ impl IngestionEntry {
             IngestionEntry::Regular { path, .. } => path,
             IngestionEntry::Symlink { path, .. } => path,
             IngestionEntry::Dir { path } => path,
-            IngestionEntry::Unknown { path, .. } => path,
         }
     }
 
@@ -234,3 +204,138 @@ impl IngestionEntry {
         matches!(self, IngestionEntry::Dir { .. })
     }
 }
+
+#[cfg(test)]
+mod test {
+    use rstest::rstest;
+
+    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
+    use crate::proto::node::Node;
+    use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode};
+    use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST};
+
+    use super::ingest_entries;
+    use super::IngestionEntry;
+
+    #[rstest]
+    #[case::single_file(vec![IngestionEntry::Regular {
+        path: "foo".parse().unwrap(),
+        size: 42,
+        executable: true,
+        digest: DUMMY_DIGEST.clone(),
+    }],
+        Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true }
+    ))]
+    #[case::single_symlink(vec![IngestionEntry::Symlink {
+        path: "foo".parse().unwrap(),
+        target: b"blub".into(),
+    }],
+        Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()})
+    )]
+    #[case::single_dir(vec![IngestionEntry::Dir {
+        path: "foo".parse().unwrap(),
+    }],
+        Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()})
+    )]
+    #[case::dir_with_keep(vec![
+        IngestionEntry::Regular {
+            path: "foo/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "foo".parse().unwrap(),
+        },
+    ],
+        Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() })
+    )]
+    /// This is intentionally a bit unsorted, though it still satisfies all
+    /// requirements we have on the order of elements in the stream.
+    #[case::directory_complicated(vec![
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Regular {
+            path: "blub/keep/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "blub/keep".parse().unwrap(),
+        },
+        IngestionEntry::Symlink {
+            path: "blub/aa".parse().unwrap(),
+            target: b"/nix/store/somewhereelse".into(),
+        },
+        IngestionEntry::Dir {
+            path: "blub".parse().unwrap(),
+        },
+    ],
+        Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() })
+    )]
+    #[tokio::test]
+    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
+        let directory_service = MemoryDirectoryService::default();
+
+        let root_node = ingest_entries(
+            directory_service.clone(),
+            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
+        )
+        .await
+        .expect("must succeed");
+
+        assert_eq!(exp_root_node, root_node, "root node should match");
+    }
+
+    #[rstest]
+    #[should_panic]
+    #[case::empty_entries(vec![])]
+    #[should_panic]
+    #[case::missing_intermediate_dir(vec![
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+    ])]
+    #[should_panic]
+    #[case::leaf_after_parent(vec![
+        IngestionEntry::Dir {
+            path: "blub".parse().unwrap(),
+        },
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+    ])]
+    #[should_panic]
+    #[case::root_in_entry(vec![
+        IngestionEntry::Regular {
+            path: ".keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "".parse().unwrap(),
+        },
+    ])]
+    #[tokio::test]
+    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
+        let directory_service = MemoryDirectoryService::default();
+
+        let _ = ingest_entries(
+            directory_service.clone(),
+            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
+        )
+        .await;
+    }
+}