about summary refs log tree commit diff
path: root/tvix/castore/src/import/fs.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import/fs.rs')
-rw-r--r-- tvix/castore/src/import/fs.rs 91
1 files changed, 72 insertions, 19 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 6709d4a127..9d3ecfe6ab 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -1,8 +1,11 @@
+//! Import from a real filesystem.
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
+use std::fs::FileType;
+use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -10,13 +13,11 @@ use walkdir::WalkDir;
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::proto::node::Node;
+use crate::B3Digest;
 
 use super::ingest_entries;
-use super::upload_blob_at_path;
-use super::Error;
 use super::IngestionEntry;
-
-///! Imports that deal with a real filesystem.
+use super::IngestionError;
 
 /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
 /// [DirectoryService]. It returns the root node or an error.
@@ -30,11 +31,11 @@ pub async fn ingest_path<BS, DS, P>(
     blob_service: BS,
     directory_service: DS,
     path: P,
-) -> Result<Node, Error>
+) -> Result<Node, IngestionError<Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
-    DS: AsRef<dyn DirectoryService>,
+    DS: DirectoryService,
 {
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -55,13 +56,13 @@ where
 pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
     blob_service: BS,
     iter: I,
-    root: &'a Path,
+    root: &'a std::path::Path,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
 {
-    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
     Box::pin(
         futures::stream::iter(iter)
@@ -72,7 +73,7 @@ where
                         Ok(dir_entry) => {
                             dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
                         }
-                        Err(e) => Err(Error::UnableToStat(
+                        Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
                             e.into_io_error().expect("walkdir err must be some"),
                         )),
@@ -91,32 +92,37 @@ where
 pub async fn dir_entry_to_ingestion_entry<BS>(
     blob_service: BS,
     entry: &DirEntry,
-    prefix: &Path,
+    prefix: &std::path::Path,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
 {
     let file_type = entry.file_type();
 
-    let path = entry
+    let fs_path = entry
         .path()
         .strip_prefix(prefix)
-        .expect("Tvix bug: failed to strip root path prefix")
-        .to_path_buf();
+        .expect("Tvix bug: failed to strip root path prefix");
+
+    // convert to castore PathBuf
+    let path = crate::path::PathBuf::from_host_path(fs_path, false)
+        .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e));
 
     if file_type.is_dir() {
         Ok(IngestionEntry::Dir { path })
     } else if file_type.is_symlink() {
         let target = std::fs::read_link(entry.path())
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))?
+            .into_os_string()
+            .into_vec();
 
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
-            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+            .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob_at_path(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -127,6 +133,53 @@ where
             digest,
         })
     } else {
-        Ok(IngestionEntry::Unknown { path, file_type })
+        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
     }
 }
+
+/// Uploads the file at the provided [std::path::Path] to the [BlobService],
+/// returning the [B3Digest] produced when the blob writer is closed.
+#[instrument(skip(blob_service), fields(path), err)]
+async fn upload_blob<BS>(
+    blob_service: BS,
+    path: impl AsRef<std::path::Path>,
+) -> Result<B3Digest, Error>
+where
+    BS: BlobService,
+{
+    let fs_path = path.as_ref();
+    // Failing to open the file is reported as Error::Open, distinct from read errors.
+    let mut file = tokio::fs::File::open(fs_path)
+        .await
+        .map_err(|e| Error::Open(fs_path.to_path_buf(), e))?;
+
+    let mut writer = blob_service.open_write().await;
+
+    tokio::io::copy(&mut file, &mut writer)
+        .await
+        .map_err(|e| Error::BlobRead(fs_path.to_path_buf(), e))?;
+
+    writer
+        .close()
+        .await
+        .map_err(|e| Error::BlobFinalize(fs_path.to_path_buf(), e))
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("unsupported file type at {0}: {1:?}")]
+    FileType(std::path::PathBuf, FileType),
+
+    #[error("unable to stat {0}: {1}")]
+    Stat(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    Open(std::path::PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    BlobRead(std::path::PathBuf, std::io::Error),
+
+    // TODO: use a dedicated error type for blob finalization instead of a raw std::io::Error
+    #[error("unable to finalize blob {0}: {1}")]
+    BlobFinalize(std::path::PathBuf, std::io::Error),
+}