about summary refs log tree commit diff
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2024-05-01T10·52+0300
committer clbot <clbot@tvl.fyi> 2024-05-02T15·26+0000
commit 516c6dc572581872167851f0a68afc9025ca1350 (patch)
tree 2049788097482d242fbd2fee65f39fe1e38507c6
parent abc0553eb8a0015bc277f73c44b0b73424b76aae (diff)
refactor(tvix/castore/import): use crate Path[Buf] in IngestionEntry r/8067
This explicitly splits ingestion-method-specific path types from the
castore types.

Change-Id: Ia3b16105fadb8d52927a4ed79dc4b34efdf4311b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11563
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
-rw-r--r-- tvix/castore/src/import/archive.rs 81
-rw-r--r-- tvix/castore/src/import/error.rs 2
-rw-r--r-- tvix/castore/src/import/fs.rs 36
-rw-r--r-- tvix/castore/src/import/mod.rs 24
4 files changed, 74 insertions, 69 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index 2da8b945d9..fb8ef9a50b 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,8 +1,8 @@
 //! Imports from an archive (tarballs)
 
+use std::collections::HashMap;
 use std::io::{Cursor, Write};
 use std::sync::Arc;
-use std::{collections::HashMap, path::PathBuf};
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
@@ -21,6 +21,8 @@ use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
+type TarPathBuf = std::path::PathBuf;
+
 /// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
 /// background.
 ///
@@ -41,29 +43,32 @@ pub enum Error {
     NextEntry(std::io::Error),
 
     #[error("unable to read path for entry: {0}")]
-    Path(std::io::Error),
+    PathRead(std::io::Error),
+
+    #[error("unable to convert path {0} for entry: {1}")]
+    PathConvert(TarPathBuf, std::io::Error),
 
     #[error("unable to read size field for {0}: {1}")]
-    Size(PathBuf, std::io::Error),
+    Size(TarPathBuf, std::io::Error),
 
     #[error("unable to read mode field for {0}: {1}")]
-    Mode(PathBuf, std::io::Error),
+    Mode(TarPathBuf, std::io::Error),
 
     #[error("unable to read link name field for {0}: {1}")]
-    LinkName(PathBuf, std::io::Error),
+    LinkName(TarPathBuf, std::io::Error),
 
     #[error("unable to read blob contents for {0}: {1}")]
-    BlobRead(PathBuf, std::io::Error),
+    BlobRead(TarPathBuf, std::io::Error),
 
-    // TODO: proper error for blob finalize
+    // FUTUREWORK: proper error for blob finalize
     #[error("unable to finalize blob {0}: {1}")]
-    BlobFinalize(PathBuf, std::io::Error),
+    BlobFinalize(TarPathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    EntryType(PathBuf, tokio_tar::EntryType),
+    EntryType(TarPathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
+    MissingSymlinkTarget(TarPathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
@@ -94,7 +99,11 @@ where
 
     let mut entries_iter = archive.entries().map_err(Error::Entries)?;
     while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
-        let path: PathBuf = entry.path().map_err(Error::Path)?.into();
+        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
+
+        // construct a castore PathBuf, which we use in the produced IngestionEntry.
+        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
+            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
 
         let header = entry.header();
         let entry = match header.entry_type() {
@@ -103,7 +112,7 @@ where
             | tokio_tar::EntryType::Continuous => {
                 let header_size = header
                     .size()
-                    .map_err(|e| Error::Size(path.to_path_buf(), e))?;
+                    .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -126,7 +135,7 @@ where
                         .unwrap();
                     let size = tokio::io::copy(&mut reader, &mut buffer)
                         .await
-                        .map_err(|e| Error::Size(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -134,18 +143,18 @@ where
                         let blob_service = blob_service.clone();
                         let digest = digest.clone();
                         async_blob_uploads.spawn({
-                            let path = path.clone();
+                            let tar_path = tar_path.clone();
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
                                 tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
                                     .await
-                                    .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
+                                    .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
                                 let blob_digest = writer
                                     .close()
                                     .await
-                                    .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
+                                    .map_err(|e| Error::BlobFinalize(tar_path, e))?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -163,12 +172,12 @@ where
 
                     let size = tokio::io::copy(&mut entry, &mut writer)
                         .await
-                        .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
                     let digest = writer
                         .close()
                         .await
-                        .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?;
 
                     (size, digest)
                 };
@@ -176,7 +185,7 @@ where
                 let executable = entry
                     .header()
                     .mode()
-                    .map_err(|e| Error::Mode(path.to_path_buf(), e))?
+                    .map_err(|e| Error::Mode(tar_path, e))?
                     & 64
                     != 0;
 
@@ -190,8 +199,8 @@ where
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
                     .link_name()
-                    .map_err(|e| Error::LinkName(path.to_path_buf(), e))?
-                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
+                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
+                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
                     .into_owned()
                     .into_os_string()
                     .into_encoded_bytes(),
@@ -200,11 +209,11 @@ where
             // Push a bogus directory marker so we can make sure this directory gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::EntryType(path, entry_type).into()),
+            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
         };
 
         nodes.add(entry)?;
@@ -239,7 +248,7 @@ where
 /// An error is returned if this is not the case and ingestion will fail.
 struct IngestionEntryGraph {
     graph: DiGraph<IngestionEntry, ()>,
-    path_to_index: HashMap<PathBuf, NodeIndex>,
+    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
     root_node: Option<NodeIndex>,
 }
 
@@ -264,7 +273,7 @@ impl IngestionEntryGraph {
     /// and new nodes are not directories, the node is replaced and is disconnected from its
     /// children.
     pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
-        let path = entry.path().to_path_buf();
+        let path = entry.path().to_owned();
 
         let index = match self.path_to_index.get(entry.path()) {
             Some(&index) => {
@@ -284,7 +293,7 @@ impl IngestionEntryGraph {
             // We expect archives to contain a single root node, if there is another root node
             // entry with a different path name, this is unsupported.
             if let Some(root_node) = self.root_node {
-                if self.get_node(root_node).path() != path {
+                if self.get_node(root_node).path() != &path {
                     return Err(Error::UnexpectedNumberOfTopLevelEntries);
                 }
             }
@@ -293,7 +302,7 @@ impl IngestionEntryGraph {
         } else if let Some(parent_path) = path.parent() {
             // Recursively add the parent node until it hits the root node.
             let parent_index = self.add(IngestionEntry::Dir {
-                path: parent_path.to_path_buf(),
+                path: parent_path.to_owned(),
             })?;
 
             // Insert an edge from the parent directory to the child entry.
@@ -378,23 +387,29 @@ mod test {
 
     lazy_static! {
         pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
+            path: "a".parse().unwrap()
+        };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
+            path: "b".parse().unwrap()
+        };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
+            path: "a/b".parse().unwrap()
+        };
         pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".into(),
+            path: "a".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".into(),
+            path: "a/b".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".into(),
+            path: "a/b/c".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs
index 3c6689dce5..e3fba617e0 100644
--- a/tvix/castore/src/import/error.rs
+++ b/tvix/castore/src/import/error.rs
@@ -1,4 +1,4 @@
-use std::path::PathBuf;
+use super::PathBuf;
 
 use crate::Error as CastoreError;
 
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 9a0bb5855a..b8cfac86f8 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -6,8 +6,6 @@ use std::fs::FileType;
 use std::os::unix::ffi::OsStringExt;
 use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
-use std::path::Path;
-use std::path::PathBuf;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -35,7 +33,7 @@ pub async fn ingest_path<BS, DS, P>(
     path: P,
 ) -> Result<Node, IngestionError<Error>>
 where
-    P: AsRef<Path> + std::fmt::Debug,
+    P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
     DS: AsRef<dyn DirectoryService>,
 {
@@ -58,13 +56,13 @@ where
 pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
     blob_service: BS,
     iter: I,
-    root: &'a Path,
+    root: &'a std::path::Path,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
 {
-    let prefix = root.parent().unwrap_or_else(|| Path::new(""));
+    let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
     Box::pin(
         futures::stream::iter(iter)
@@ -94,18 +92,21 @@ where
 pub async fn dir_entry_to_ingestion_entry<BS>(
     blob_service: BS,
     entry: &DirEntry,
-    prefix: &Path,
+    prefix: &std::path::Path,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
 {
     let file_type = entry.file_type();
 
-    let path = entry
+    let fs_path = entry
         .path()
         .strip_prefix(prefix)
-        .expect("Tvix bug: failed to strip root path prefix")
-        .to_path_buf();
+        .expect("Tvix bug: failed to strip root path prefix");
+
+    // convert to castore PathBuf
+    let path = crate::path::PathBuf::from_host_path(fs_path, false)
+        .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e));
 
     if file_type.is_dir() {
         Ok(IngestionEntry::Dir { path })
@@ -132,13 +133,16 @@ where
             digest,
         })
     } else {
-        return Err(Error::FileType(path, file_type));
+        return Err(Error::FileType(fs_path.to_path_buf(), file_type));
     }
 }
 
 /// Uploads the file at the provided [Path] to the [BlobService].
 #[instrument(skip(blob_service), fields(path), err)]
-async fn upload_blob<BS>(blob_service: BS, path: impl AsRef<Path>) -> Result<B3Digest, Error>
+async fn upload_blob<BS>(
+    blob_service: BS,
+    path: impl AsRef<std::path::Path>,
+) -> Result<B3Digest, Error>
 where
     BS: BlobService,
 {
@@ -164,18 +168,18 @@ where
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("unsupported file type at {0}: {1:?}")]
-    FileType(PathBuf, FileType),
+    FileType(std::path::PathBuf, FileType),
 
     #[error("unable to stat {0}: {1}")]
-    Stat(PathBuf, std::io::Error),
+    Stat(std::path::PathBuf, std::io::Error),
 
     #[error("unable to open {0}: {1}")]
-    Open(PathBuf, std::io::Error),
+    Open(std::path::PathBuf, std::io::Error),
 
     #[error("unable to read {0}: {1}")]
-    BlobRead(PathBuf, std::io::Error),
+    BlobRead(std::path::PathBuf, std::io::Error),
 
     // TODO: proper error for blob finalize
     #[error("unable to finalize blob {0}: {1}")]
-    BlobFinalize(PathBuf, std::io::Error),
+    BlobFinalize(std::path::PathBuf, std::io::Error),
 }
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index bf21001822..53ebc2b339 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -6,6 +6,7 @@
 
 use crate::directoryservice::DirectoryPutter;
 use crate::directoryservice::DirectoryService;
+use crate::path::PathBuf;
 use crate::proto::node::Node;
 use crate::proto::Directory;
 use crate::proto::DirectoryNode;
@@ -16,13 +17,7 @@ use futures::{Stream, StreamExt};
 
 use tracing::Level;
 
-#[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-
-use std::{
-    collections::HashMap,
-    path::{Path, PathBuf},
-};
+use std::collections::HashMap;
 use tracing::instrument;
 
 mod error;
@@ -70,20 +65,11 @@ where
             // we break the loop manually.
             .expect("Tvix bug: unexpected end of stream")?;
 
-        debug_assert!(
-            entry
-                .path()
-                .components()
-                .all(|x| matches!(x, std::path::Component::Normal(_))),
-            "path may only contain normal components"
-        );
-
         let name = entry
             .path()
             .file_name()
             // If this is the root node, it will have an empty name.
             .unwrap_or_default()
-            .as_bytes()
             .to_owned()
             .into();
 
@@ -108,7 +94,7 @@ where
                     .put(directory)
                     .await
                     .map_err(|e| {
-                        IngestionError::UploadDirectoryError(entry.path().to_path_buf(), e)
+                        IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
                     })?;
 
                 Node::Directory(DirectoryNode {
@@ -140,7 +126,7 @@ where
 
         // record node in parent directory, creating a new [Directory] if not there yet.
         directories
-            .entry(entry.path().parent().unwrap().to_path_buf())
+            .entry(entry.path().parent().unwrap().to_owned())
             .or_default()
             .add(node);
     };
@@ -197,7 +183,7 @@ pub enum IngestionEntry {
 }
 
 impl IngestionEntry {
-    fn path(&self) -> &Path {
+    fn path(&self) -> &PathBuf {
         match self {
             IngestionEntry::Regular { path, .. } => path,
             IngestionEntry::Symlink { path, .. } => path,