Diffstat (limited to 'tvix/castore/src/import')
-rw-r--r-- | tvix/castore/src/import/archive.rs | 188
-rw-r--r-- | tvix/castore/src/import/blobs.rs | 177
-rw-r--r-- | tvix/castore/src/import/error.rs | 43
-rw-r--r-- | tvix/castore/src/import/fs.rs | 91
-rw-r--r-- | tvix/castore/src/import/mod.rs | 241
5 files changed, 516 insertions, 224 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index adcfb871d5..cd5b1290e0 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,51 +1,58 @@
-use std::io::{Cursor, Write};
-use std::sync::Arc;
-use std::{collections::HashMap, path::PathBuf};
+//! Imports from an archive (tarballs)
+
+use std::collections::HashMap;
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
 use petgraph::Direction;
 use tokio::io::AsyncRead;
-use tokio::sync::Semaphore;
-use tokio::task::JoinSet;
 use tokio_stream::StreamExt;
 use tokio_tar::Archive;
-use tokio_util::io::InspectReader;
 use tracing::{instrument, warn, Level};
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::import::{ingest_entries, Error as ImportError, IngestionEntry};
+use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
-use crate::B3Digest;
-
-/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
-/// background.
-///
-/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
-/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
-/// the blob can be represented using a u32 and will not cause an overflow.
-const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+use super::blobs::{self, ConcurrentBlobUploader};
 
-/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
-const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+type TarPathBuf = std::path::PathBuf;
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
-    #[error("error reading archive entry: {0}")]
-    Io(#[from] std::io::Error),
+    #[error("unable to construct stream of entries: {0}")]
+    Entries(std::io::Error),
+
+    #[error("unable to read next entry: {0}")]
+    NextEntry(std::io::Error),
+
+    #[error("unable to read path for entry: {0}")]
+    PathRead(std::io::Error),
+
+    #[error("unable to convert path {0} for entry: {1}")]
+    PathConvert(TarPathBuf, std::io::Error),
+
+    #[error("unable to read size field for {0}: {1}")]
+    Size(TarPathBuf, std::io::Error),
+
+    #[error("unable to read mode field for {0}: {1}")]
+    Mode(TarPathBuf, std::io::Error),
+
+    #[error("unable to read link name field for {0}: {1}")]
+    LinkName(TarPathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    UnsupportedTarEntry(PathBuf, tokio_tar::EntryType),
+    EntryType(TarPathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
+    MissingSymlinkTarget(TarPathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
 
-    #[error("failed to import into castore {0}")]
-    Import(#[from] ImportError),
+    #[error(transparent)]
+    BlobUploadError(#[from] blobs::Error),
 }
 
 /// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and
@@ -55,10 +62,10 @@ pub async fn ingest_archive<BS, DS, R>(
     blob_service: BS,
     directory_service: DS,
     mut archive: Archive<R>,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
     BS: BlobService + Clone + 'static,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
     R: AsyncRead + Unpin,
 {
     // Since tarballs can have entries in any arbitrary order, we need to
@@ -68,105 +75,68 @@ where
     // In the first phase, collect up all the regular files and symlinks.
     let mut nodes = IngestionEntryGraph::new();
 
-    let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE));
-    let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new();
+    let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
+    let mut entries_iter = archive.entries().map_err(Error::Entries)?;
+    while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
+        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
 
-    let mut entries_iter = archive.entries()?;
-    while let Some(mut entry) = entries_iter.try_next().await? {
-        let path: PathBuf = entry.path()?.into();
+        // construct a castore PathBuf, which we use in the produced IngestionEntry.
+        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
+            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
 
         let header = entry.header();
         let entry = match header.entry_type() {
             tokio_tar::EntryType::Regular
             | tokio_tar::EntryType::GNUSparse
             | tokio_tar::EntryType::Continuous => {
-                let header_size = header.size()?;
-
-                // If the blob is small enough, read it off the wire, compute the digest,
-                // and upload it to the [BlobService] in the background.
-                let (size, digest) = if header_size <= CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
-                    let mut buffer = Vec::with_capacity(header_size as usize);
-                    let mut hasher = blake3::Hasher::new();
-                    let mut reader = InspectReader::new(&mut entry, |bytes| {
-                        hasher.write_all(bytes).unwrap();
-                    });
-
-                    // Ensure that we don't buffer into memory until we've acquired a permit.
-                    // This prevents consuming too much memory when performing concurrent
-                    // blob uploads.
-                    let permit = semaphore
-                        .clone()
-                        // This cast is safe because ensure the header_size is less than
-                        // CONCURRENT_BLOB_UPLOAD_THRESHOLD which is a u32.
-                        .acquire_many_owned(header_size as u32)
-                        .await
-                        .unwrap();
-                    let size = tokio::io::copy(&mut reader, &mut buffer).await?;
-
-                    let digest: B3Digest = hasher.finalize().as_bytes().into();
-
-                    {
-                        let blob_service = blob_service.clone();
-                        let digest = digest.clone();
-                        async_blob_uploads.spawn({
-                            async move {
-                                let mut writer = blob_service.open_write().await;
-
-                                tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?;
-
-                                let blob_digest = writer.close().await?;
-
-                                assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
-
-                                // Make sure we hold the permit until we finish writing the blob
-                                // to the [BlobService].
-                                drop(permit);
-                                Ok(())
-                            }
-                        });
-                    }
-
-                    (size, digest)
-                } else {
-                    let mut writer = blob_service.open_write().await;
-
-                    let size = tokio::io::copy(&mut entry, &mut writer).await?;
-
-                    let digest = writer.close().await?;
-
-                    (size, digest)
-                };
+                let size = header
+                    .size()
+                    .map_err(|e| Error::Size(tar_path.clone(), e))?;
+
+                let digest = blob_uploader
+                    .upload(&path, size, &mut entry)
+                    .await
+                    .map_err(Error::BlobUploadError)?;
+
+                let executable = entry
+                    .header()
+                    .mode()
+                    .map_err(|e| Error::Mode(tar_path, e))?
+                    & 64
+                    != 0;
 
                 IngestionEntry::Regular {
                     path,
                     size,
-                    executable: entry.header().mode()? & 64 != 0,
+                    executable,
                     digest,
                 }
             }
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
-                    .link_name()?
-                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
-                    .into(),
+                    .link_name()
+                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
+                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
+                    .into_owned()
+                    .into_os_string()
+                    .into_encoded_bytes(),
                 path,
             },
             // Push a bogus directory marker so we can make sure this directory gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)),
+            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
         };
 
         nodes.add(entry)?;
     }
 
-    while let Some(result) = async_blob_uploads.join_next().await {
-        result.expect("task panicked")?;
-    }
+    blob_uploader.join().await.map_err(Error::BlobUploadError)?;
 
     let root_node = ingest_entries(
         directory_service,
@@ -193,7 +163,7 @@ where
 /// An error is returned if this is not the case and ingestion will fail.
 struct IngestionEntryGraph {
     graph: DiGraph<IngestionEntry, ()>,
-    path_to_index: HashMap<PathBuf, NodeIndex>,
+    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
     root_node: Option<NodeIndex>,
 }
 
@@ -218,7 +188,7 @@ impl IngestionEntryGraph {
     /// and new nodes are not directories, the node is replaced and is disconnected from its
     /// children.
     pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
-        let path = entry.path().to_path_buf();
+        let path = entry.path().to_owned();
 
         let index = match self.path_to_index.get(entry.path()) {
             Some(&index) => {
@@ -233,12 +203,12 @@ impl IngestionEntryGraph {
             None => self.graph.add_node(entry),
         };
 
-        // A path with 1 component is the root node
+        // for archives, a path with 1 component is the root node
        if path.components().count() == 1 {
             // We expect archives to contain a single root node, if there is another root node
             // entry with a different path name, this is unsupported.
             if let Some(root_node) = self.root_node {
-                if self.get_node(root_node).path() != path {
+                if self.get_node(root_node).path() != path.as_ref() {
                     return Err(Error::UnexpectedNumberOfTopLevelEntries);
                 }
             }
@@ -247,7 +217,7 @@ impl IngestionEntryGraph {
         } else if let Some(parent_path) = path.parent() {
             // Recursively add the parent node until it hits the root node.
             let parent_index = self.add(IngestionEntry::Dir {
-                path: parent_path.to_path_buf(),
+                path: parent_path.to_owned(),
             })?;
 
             // Insert an edge from the parent directory to the child entry.
@@ -332,23 +302,29 @@ mod test {
     lazy_static! {
         pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
+            path: "a".parse().unwrap()
+        };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
+            path: "b".parse().unwrap()
+        };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
+            path: "a/b".parse().unwrap()
+        };
         pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".into(),
+            path: "a".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".into(),
+            path: "a/b".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".into(),
+            path: "a/b/c".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs
new file mode 100644
index 0000000000..8135d871d6
--- /dev/null
+++ b/tvix/castore/src/import/blobs.rs
@@ -0,0 +1,177 @@
+use std::{
+    io::{Cursor, Write},
+    sync::Arc,
+};
+
+use tokio::{
+    io::AsyncRead,
+    sync::Semaphore,
+    task::{JoinError, JoinSet},
+};
+use tokio_util::io::InspectReader;
+
+use crate::{blobservice::BlobService, B3Digest, Path, PathBuf};
+
+/// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
+/// background.
+///
+/// This is a u32 since we acquire a weighted semaphore using the size of the blob.
+/// [Semaphore::acquire_many_owned] takes a u32, so we need to ensure the size of
+/// the blob can be represented using a u32 and will not cause an overflow.
+const CONCURRENT_BLOB_UPLOAD_THRESHOLD: u32 = 1024 * 1024;
+
+/// The maximum amount of bytes allowed to be buffered in memory to perform async blob uploads.
+const MAX_BUFFER_SIZE: usize = 128 * 1024 * 1024;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unable to read blob contents for {0}: {1}")]
+    BlobRead(PathBuf, std::io::Error),
+
+    // FUTUREWORK: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(PathBuf, std::io::Error),
+
+    #[error("unexpected size for {path} wanted: {wanted} got: {got}")]
+    UnexpectedSize {
+        path: PathBuf,
+        wanted: u64,
+        got: u64,
+    },
+
+    #[error("blob upload join error: {0}")]
+    JoinError(#[from] JoinError),
+}
+
+/// The concurrent blob uploader provides a mechanism for concurrently uploading small blobs.
+/// This is useful when ingesting from sources like tarballs and archives where each blob entry
+/// must be read sequentially. Ingesting many small blobs sequentially becomes slow due to
+/// round trip time with the blob service. The concurrent blob uploader will buffer small
+/// blobs in memory and upload them to the blob service in the background.
+///
+/// Once all blobs have been uploaded, make sure to call [ConcurrentBlobUploader::join] to wait
+/// for all background jobs to complete and check for any errors.
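+///
+/// A minimal usage sketch (an illustration, assuming an async context and a
+/// `blob_service` handle satisfying `BlobService + Clone + 'static`):
+///
+/// ```ignore
+/// let mut uploader = ConcurrentBlobUploader::new(blob_service);
+/// // `path`, `size` and `reader` come from whatever source is being ingested.
+/// let digest = uploader.upload(&path, size, &mut reader).await?;
+/// // ... upload further blobs ...
+/// uploader.join().await?;
+/// ```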
+pub struct ConcurrentBlobUploader<BS> {
+    blob_service: BS,
+    upload_tasks: JoinSet<Result<(), Error>>,
+    upload_semaphore: Arc<Semaphore>,
+}
+
+impl<BS> ConcurrentBlobUploader<BS>
+where
+    BS: BlobService + Clone + 'static,
+{
+    /// Creates a new concurrent blob uploader which uploads blobs to the provided
+    /// blob service.
+    pub fn new(blob_service: BS) -> Self {
+        Self {
+            blob_service,
+            upload_tasks: JoinSet::new(),
+            upload_semaphore: Arc::new(Semaphore::new(MAX_BUFFER_SIZE)),
+        }
+    }
+
+    /// Uploads a blob to the blob service. If the blob is small enough it will be read into a
+    /// buffer and uploaded in the background.
+    /// This will read the entirety of the provided reader unless an error occurs, even if blobs
+    /// are uploaded in the background.
+    pub async fn upload<R>(
+        &mut self,
+        path: &Path,
+        expected_size: u64,
+        mut r: R,
+    ) -> Result<B3Digest, Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        if expected_size < CONCURRENT_BLOB_UPLOAD_THRESHOLD as u64 {
+            let mut buffer = Vec::with_capacity(expected_size as usize);
+            let mut hasher = blake3::Hasher::new();
+            let mut reader = InspectReader::new(&mut r, |bytes| {
+                hasher.write_all(bytes).unwrap();
+            });
+
+            let permit = self
+                .upload_semaphore
+                .clone()
+                // This cast is safe because we ensure expected_size is less than
+                // CONCURRENT_BLOB_UPLOAD_THRESHOLD, which is a u32.
+                .acquire_many_owned(expected_size as u32)
+                .await
+                .unwrap();
+            let size = tokio::io::copy(&mut reader, &mut buffer)
+                .await
+                .map_err(|e| Error::BlobRead(path.into(), e))?;
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            if size != expected_size {
+                return Err(Error::UnexpectedSize {
+                    path: path.into(),
+                    wanted: expected_size,
+                    got: size,
+                });
+            }
+
+            self.upload_tasks.spawn({
+                let blob_service = self.blob_service.clone();
+                let expected_digest = digest.clone();
+                let path = path.to_owned();
+                let r = Cursor::new(buffer);
+                async move {
+                    let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
+
+                    assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");
+
+                    // Make sure we hold the permit until we finish writing the blob
+                    // to the [BlobService].
+                    drop(permit);
+                    Ok(())
+                }
+            });
+
+            return Ok(digest);
+        }
+
+        upload_blob(&self.blob_service, path, expected_size, r).await
+    }
+
+    /// Waits for all background upload jobs to complete, returning any upload errors.
+    pub async fn join(mut self) -> Result<(), Error> {
+        while let Some(result) = self.upload_tasks.join_next().await {
+            result??;
+        }
+        Ok(())
+    }
+}
+
+async fn upload_blob<BS, R>(
+    blob_service: &BS,
+    path: &Path,
+    expected_size: u64,
+    mut r: R,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+    R: AsyncRead + Unpin,
+{
+    let mut writer = blob_service.open_write().await;
+
+    let size = tokio::io::copy(&mut r, &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(path.into(), e))?;
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.into(), e))?;
+
+    if size != expected_size {
+        return Err(Error::UnexpectedSize {
+            path: path.into(),
+            wanted: expected_size,
+            got: size,
+        });
+    }
+
+    Ok(digest)
+}
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 15dd0664de..e3fba617e0 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,39 +1,20 @@
-use std::{fs::FileType, path::PathBuf};
+use super::PathBuf;
 
 use crate::Error as CastoreError;
 
+/// Represents all error types that are emitted by ingest_entries.
+/// It can represent errors uploading individual Directories and finalizing
+/// the upload.
+/// It also contains a generic error kind that'll carry ingestion-method
+/// specific errors.
 #[derive(Debug, thiserror::Error)]
-pub enum Error {
+pub enum IngestionError<E: std::fmt::Display> {
+    #[error("error from producer: {0}")]
+    Producer(#[from] E),
+
     #[error("failed to upload directory at {0}: {1}")]
     UploadDirectoryError(PathBuf, CastoreError),
 
-    #[error("invalid encoding encountered for entry {0:?}")]
-    InvalidEncoding(PathBuf),
-
-    #[error("unable to stat {0}: {1}")]
-    UnableToStat(PathBuf, std::io::Error),
-
-    #[error("unable to open {0}: {1}")]
-    UnableToOpen(PathBuf, std::io::Error),
-
-    #[error("unable to read {0}: {1}")]
-    UnableToRead(PathBuf, std::io::Error),
-
-    #[error("unsupported file {0} type: {1:?}")]
-    UnsupportedFileType(PathBuf, FileType),
-}
-
-impl From<CastoreError> for Error {
-    fn from(value: CastoreError) -> Self {
-        match value {
-            CastoreError::InvalidRequest(_) => panic!("tvix bug"),
-            CastoreError::StorageError(_) => panic!("error"),
-        }
-    }
-}
-
-impl From<Error> for std::io::Error {
-    fn from(value: Error) -> Self {
-        std::io::Error::new(std::io::ErrorKind::Other, value)
-    }
+    #[error("failed to finalize directory upload: {0}")]
+    FinalizeDirectoryUpload(CastoreError),
 }
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6709d4a127..9d3ecfe6ab 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -1,8 +1,11 @@
+//! Import from a real filesystem.
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
+use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -10,13 +13,11 @@ use walkdir::WalkDir;
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::proto::node::Node;
+use crate::B3Digest;
 
 use super::ingest_entries;
-use super::upload_blob_at_path;
-use super::Error;
 use super::IngestionEntry;
-
-///! Imports that deal with a real filesystem.
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
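+///
+/// A minimal calling sketch (an illustration, assuming an async context and
+/// service handles satisfying the bounds below):
+///
+/// ```ignore
+/// let root_node = ingest_path(blob_service, directory_service, "/some/path").await?;
+/// ```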
@@ -30,11 +31,11 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
 {
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -55,13 +56,13 @@ where
 pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
     blob_service: BS,
     iter: I,
-    root: &'a Path,
+    root: &'a std::path::Path,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
 {
-    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
     Box::pin(
         futures::stream::iter(iter)
@@ -72,7 +73,7 @@ where
                     Ok(dir_entry) => {
                         dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                     }
-                    Err(e) => Err(Error::UnableToStat(
+                    Err(e) => Err(Error::Stat(
                         prefix.to_path_buf(),
                         e.into_io_error().expect("walkdir err must be some"),
                     )),
@@ -91,32 +92,37 @@ where
 pub async fn dir_entry_to_ingestion_entry<BS>(
     blob_service: BS,
     entry: &DirEntry,
-    prefix: &Path,
+    prefix: &std::path::Path,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
 {
     let file_type = entry.file_type();
 
-    let path = entry
+    let fs_path = entry
         .path()
         .strip_prefix(prefix)
-        .expect("Tvix bug: failed to strip root path prefix")
-        .to_path_buf();
+        .expect("Tvix bug: failed to strip root path prefix");
+
+    // convert to castore PathBuf
+    let path = crate::path::PathBuf::from_host_path(fs_path, false)
+        .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e));
 
     if file_type.is_dir() {
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
+            .into_os_string()
+            .into_vec();
 
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob_at_path(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -127,6 +133,53 @@ where
             digest,
         })
     } else {
-        Ok(IngestionEntry::Unknown { path, file_type })
+        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
     }
 }
+
+/// Uploads the file at the provided [Path] to the [BlobService].
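+///
+/// Sketch of a call (hypothetical caller; `blob_service` is any [BlobService] handle):
+///
+/// ```ignore
+/// let digest = upload_blob(blob_service, "/tmp/some-file").await?;
+/// ```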
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob<BS>(
+    blob_service: BS,
+    path: impl AsRef<std::path::Path>,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let mut file = match tokio::fs::File::open(path.as_ref()).await {
+        Ok(file) => file,
+        Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)),
+    };
+
+    let mut writer = blob_service.open_write().await;
+
+    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+        return Err(Error::BlobRead(path.as_ref().to_path_buf(), e));
+    };
+
+    let digest = writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?;
+
+    Ok(digest)
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(std::path::PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(std::path::PathBuf, std::io::Error),
+
+    // TODO: proper error for blob finalize
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(std::path::PathBuf, std::io::Error),
+}
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index e9fdc750f8..4223fe5387 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -4,9 +4,9 @@
 //! Specific implementations, such as ingesting from the filesystem, live in
 //! child modules.
 
-use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryPutter;
 use crate::directoryservice::DirectoryService;
+use crate::path::{Path, PathBuf};
 use crate::proto::node::Node;
 use crate::proto::Directory;
 use crate::proto::DirectoryNode;
@@ -14,23 +14,17 @@ use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::B3Digest;
 use futures::{Stream, StreamExt};
-use std::fs::FileType;
 use tracing::Level;
 
-#[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-};
+use std::collections::HashMap;
 
 use tracing::instrument;
 
 mod error;
-pub use error::Error;
+pub use error::IngestionError;
 
 pub mod archive;
+pub mod blobs;
 pub mod fs;
 
 /// Ingests [IngestionEntry] from the given stream into the passed [DirectoryService].
@@ -51,10 +45,14 @@ pub mod fs;
 ///
 /// On success, returns the root node.
 #[instrument(skip_all, ret(level = Level::TRACE), err)]
-pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error>
+pub async fn ingest_entries<DS, S, E>(
+    directory_service: DS,
+    mut entries: S,
+) -> Result<Node, IngestionError<E>>
 where
-    DS: AsRef<dyn DirectoryService>,
-    S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin,
+    DS: DirectoryService,
+    S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin,
+    E: std::error::Error,
 {
     // For a given path, this holds the [Directory] structs as they are populated.
     let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
@@ -68,20 +66,11 @@ where
             // we break the loop manually.
             .expect("Tvix bug: unexpected end of stream")?;
 
-        debug_assert!(
-            entry
-                .path()
-                .components()
-                .all(|x| matches!(x, std::path::Component::Normal(_))),
-            "path may only contain normal components"
-        );
-
         let name = entry
             .path()
             .file_name()
             // If this is the root node, it will have an empty name.
             .unwrap_or_default()
-            .as_bytes()
             .to_owned()
             .into();
 
@@ -89,7 +78,8 @@ where
             IngestionEntry::Dir { .. } => {
                 // If the entry is a directory, we traversed all its children (and
                 // populated it in `directories`).
-                // If we don't have it in there, it's an empty directory.
+                // If we don't have it in directories, it's a directory without
+                // children.
                 let directory = directories
                     .remove(entry.path())
                     // In that case, it contained no children
@@ -102,9 +92,12 @@
                 // If we don't have one yet (as that's the first one to upload),
                 // initialize the putter.
                 maybe_directory_putter
-                    .get_or_insert_with(|| directory_service.as_ref().put_multiple_start())
+                    .get_or_insert_with(|| directory_service.put_multiple_start())
                     .put(directory)
-                    .await?;
+                    .await
+                    .map_err(|e| {
+                        IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
+                    })?;
 
                 Node::Directory(DirectoryNode {
                     name,
@@ -114,7 +107,7 @@
             }
             IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
                 name,
-                target: target.as_os_str().as_bytes().to_owned().into(),
+                target: target.to_owned().into(),
             }),
             IngestionEntry::Regular {
                 size,
@@ -127,23 +120,27 @@
                 size: *size,
                 executable: *executable,
             }),
-            IngestionEntry::Unknown { path, file_type } => {
-                return Err(Error::UnsupportedFileType(path.clone(), *file_type));
-            }
         };
 
-        if entry.path().components().count() == 1 {
+        let parent = entry
+            .path()
+            .parent()
+            .expect("Tvix bug: got entry with root node");
+
+        if parent == crate::Path::ROOT {
             break node;
+        } else {
+            // record node in parent directory, creating a new [Directory] if not there yet.
+            directories.entry(parent.to_owned()).or_default().add(node);
         }
-
-        // record node in parent directory, creating a new [Directory] if not there yet.
-        directories
-            .entry(entry.path().parent().unwrap().to_path_buf())
-            .or_default()
-            .add(node);
     };
 
     assert!(
+        entries.count().await == 0,
+        "Tvix bug: left over elements in the stream"
+    );
+
+    assert!(
         directories.is_empty(),
         "Tvix bug: left over directories after processing ingestion stream"
     );
@@ -152,7 +149,10 @@
     // they're all persisted to the backend.
     if let Some(mut directory_putter) = maybe_directory_putter {
         #[cfg_attr(not(debug_assertions), allow(unused))]
-        let root_directory_digest = directory_putter.close().await?;
+        let root_directory_digest = directory_putter
+            .close()
+            .await
+            .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?;
 
         #[cfg(debug_assertions)]
         {
@@ -174,31 +174,6 @@ where
     Ok(root_node)
 }
 
-/// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path), err)]
-async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error>
-where
-    BS: BlobService,
-{
-    let mut file = match tokio::fs::File::open(&path).await {
-        Ok(file) => file,
-        Err(e) => return Err(Error::UnableToRead(path, e)),
-    };
-
-    let mut writer = blob_service.open_write().await;
-
-    if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
-        return Err(Error::UnableToRead(path, e));
-    };
-
-    let digest = writer
-        .close()
-        .await
-        .map_err(|e| Error::UnableToRead(path, e))?;
-
-    Ok(digest)
-}
-
 #[derive(Debug, Clone, Eq, PartialEq)]
 pub enum IngestionEntry {
     Regular {
@@ -209,15 +184,11 @@ pub enum IngestionEntry {
     },
     Symlink {
         path: PathBuf,
-        target: PathBuf,
+        target: Vec<u8>,
     },
     Dir {
         path: PathBuf,
     },
-    Unknown {
-        path: PathBuf,
-        file_type: FileType,
-    },
 }
 
 impl IngestionEntry {
@@ -226,7 +197,6 @@ impl IngestionEntry {
             IngestionEntry::Regular { path, .. } => path,
             IngestionEntry::Symlink { path, .. } => path,
             IngestionEntry::Dir { path } => path,
-            IngestionEntry::Unknown { path, .. } => path,
         }
     }
 
@@ -234,3 +204,138 @@ impl IngestionEntry {
         matches!(self, IngestionEntry::Dir { .. })
     }
 }
+
+#[cfg(test)]
+mod test {
+    use rstest::rstest;
+
+    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
+    use crate::proto::node::Node;
+    use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode};
+    use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST};
+
+    use super::ingest_entries;
+    use super::IngestionEntry;
+
+    #[rstest]
+    #[case::single_file(vec![IngestionEntry::Regular {
+        path: "foo".parse().unwrap(),
+        size: 42,
+        executable: true,
+        digest: DUMMY_DIGEST.clone(),
+    }],
+        Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true }
+    ))]
+    #[case::single_symlink(vec![IngestionEntry::Symlink {
+        path: "foo".parse().unwrap(),
+        target: b"blub".into(),
+    }],
+        Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()})
+    )]
+    #[case::single_dir(vec![IngestionEntry::Dir {
+        path: "foo".parse().unwrap(),
+    }],
+        Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()})
+    )]
+    #[case::dir_with_keep(vec![
+        IngestionEntry::Regular {
+            path: "foo/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "foo".parse().unwrap(),
+        },
+    ],
+        Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() })
+    )]
+    /// This is intentionally a bit unsorted, though it still satisfies all
+    /// requirements we have on the order of elements in the stream.
+    #[case::directory_complicated(vec![
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Regular {
+            path: "blub/keep/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "blub/keep".parse().unwrap(),
+        },
+        IngestionEntry::Symlink {
+            path: "blub/aa".parse().unwrap(),
+            target: b"/nix/store/somewhereelse".into(),
+        },
+        IngestionEntry::Dir {
+            path: "blub".parse().unwrap(),
+        },
+    ],
+        Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size: DIRECTORY_COMPLICATED.size() })
+    )]
+    #[tokio::test]
+    async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
+        let directory_service = MemoryDirectoryService::default();
+
+        let root_node = ingest_entries(
+            directory_service.clone(),
+            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
+        )
+        .await
+        .expect("must succeed");
+
+        assert_eq!(exp_root_node, root_node, "root node should match");
+    }
+
+    #[rstest]
+    #[should_panic]
+    #[case::empty_entries(vec![])]
+    #[should_panic]
+    #[case::missing_intermediate_dir(vec![
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+    ])]
+    #[should_panic]
+    #[case::leaf_after_parent(vec![
+        IngestionEntry::Dir {
+            path: "blub".parse().unwrap(),
+        },
+        IngestionEntry::Regular {
+            path: "blub/.keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+    ])]
+    #[should_panic]
+    #[case::root_in_entry(vec![
+        IngestionEntry::Regular {
+            path: ".keep".parse().unwrap(),
+            size: 0,
+            executable: false,
+            digest: EMPTY_BLOB_DIGEST.clone(),
+        },
+        IngestionEntry::Dir {
+            path: "".parse().unwrap(),
+        },
+    ])]
+    #[tokio::test]
+    async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) {
+        let directory_service = MemoryDirectoryService::default();
+
+        let _ = ingest_entries(
+            directory_service.clone(),
+            futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
+        )
+        .await;
+    }
+}
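For context, a minimal end-to-end sketch of driving the entry-stream API this commit introduces. This is an illustration only: it assumes the crate is consumed as tvix_castore with tokio, that MemoryDirectoryService is exported as used in the tests above, and that entries arrive leaves-first with the root entry last.

    use futures::stream;
    use tvix_castore::directoryservice::MemoryDirectoryService;
    use tvix_castore::import::{ingest_entries, IngestionEntry};

    #[tokio::main]
    async fn main() {
        let directory_service = MemoryDirectoryService::default();

        // Children must come before the directory that contains them;
        // the single root entry comes last.
        let entries = vec![
            IngestionEntry::Symlink {
                path: "root/link".parse().unwrap(),
                target: b"/nix/store/somewhere".to_vec(),
            },
            IngestionEntry::Dir {
                path: "root".parse().unwrap(),
            },
        ];

        let root_node = ingest_entries(
            directory_service,
            stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
        )
        .await
        .expect("ingestion failed");

        println!("root node: {:?}", root_node);
    }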