From 516c6dc572581872167851f0a68afc9025ca1350 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 1 May 2024 13:52:04 +0300 Subject: refactor(tvix/castore/import): use crate Path[Buf] in IngestionEntry This explicitly splits ingestion-method-specific path types from the castore types. Change-Id: Ia3b16105fadb8d52927a4ed79dc4b34efdf4311b Reviewed-on: https://cl.tvl.fyi/c/depot/+/11563 Autosubmit: flokli Reviewed-by: Connor Brewster Tested-by: BuildkiteCI --- tvix/castore/src/import/archive.rs | 81 ++++++++++++++++++++++---------------- tvix/castore/src/import/error.rs | 2 +- tvix/castore/src/import/fs.rs | 36 +++++++++-------- tvix/castore/src/import/mod.rs | 24 +++-------- 4 files changed, 74 insertions(+), 69 deletions(-) diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index 2da8b945d9..fb8ef9a50b 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,8 +1,8 @@ //! Imports from an archive (tarballs) +use std::collections::HashMap; use std::io::{Cursor, Write}; use std::sync::Arc; -use std::{collections::HashMap, path::PathBuf}; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; @@ -21,6 +21,8 @@ use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; use crate::B3Digest; +type TarPathBuf = std::path::PathBuf; + /// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the /// background. /// @@ -41,29 +43,32 @@ pub enum Error { NextEntry(std::io::Error), #[error("unable to read path for entry: {0}")] - Path(std::io::Error), + PathRead(std::io::Error), + + #[error("unable to convert path {0} for entry: {1}")] + PathConvert(TarPathBuf, std::io::Error), #[error("unable to read size field for {0}: {1}")] - Size(PathBuf, std::io::Error), + Size(TarPathBuf, std::io::Error), #[error("unable to read mode field for {0}: {1}")] - Mode(PathBuf, std::io::Error), + Mode(TarPathBuf, std::io::Error), #[error("unable to read link name field for {0}: {1}")] - LinkName(PathBuf, std::io::Error), + LinkName(TarPathBuf, std::io::Error), #[error("unable to read blob contents for {0}: {1}")] - BlobRead(PathBuf, std::io::Error), + BlobRead(TarPathBuf, std::io::Error), - // TODO: proper error for blob finalize + // FUTUREWORK: proper error for blob finalize #[error("unable to finalize blob {0}: {1}")] - BlobFinalize(PathBuf, std::io::Error), + BlobFinalize(TarPathBuf, std::io::Error), #[error("unsupported tar entry {0} type: {1:?}")] - EntryType(PathBuf, tokio_tar::EntryType), + EntryType(TarPathBuf, tokio_tar::EntryType), #[error("symlink missing target {0}")] - MissingSymlinkTarget(PathBuf), + MissingSymlinkTarget(TarPathBuf), #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, @@ -94,7 +99,11 @@ where let mut entries_iter = archive.entries().map_err(Error::Entries)?; while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { - let path: PathBuf = entry.path().map_err(Error::Path)?.into(); + let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into(); + + // construct a castore PathBuf, which we use in the produced IngestionEntry. + let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true) + .map_err(|e| Error::PathConvert(tar_path.clone(), e))?; let header = entry.header(); let entry = match header.entry_type() { @@ -103,7 +112,7 @@ where | tokio_tar::EntryType::Continuous => { let header_size = header .size() - .map_err(|e| Error::Size(path.to_path_buf(), e))?; + .map_err(|e| Error::Size(tar_path.clone(), e))?; // If the blob is small enough, read it off the wire, compute the digest, // and upload it to the [BlobService] in the background. @@ -126,7 +135,7 @@ where .unwrap(); let size = tokio::io::copy(&mut reader, &mut buffer) .await - .map_err(|e| Error::Size(path.to_path_buf(), e))?; + .map_err(|e| Error::Size(tar_path.clone(), e))?; let digest: B3Digest = hasher.finalize().as_bytes().into(); @@ -134,18 +143,18 @@ where let blob_service = blob_service.clone(); let digest = digest.clone(); async_blob_uploads.spawn({ - let path = path.clone(); + let tar_path = tar_path.clone(); async move { let mut writer = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(buffer), &mut writer) .await - .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; + .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; let blob_digest = writer .close() .await - .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; + .map_err(|e| Error::BlobFinalize(tar_path, e))?; assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); @@ -163,12 +172,12 @@ where let size = tokio::io::copy(&mut entry, &mut writer) .await - .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?; + .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; let digest = writer .close() .await - .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?; + .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?; (size, digest) }; @@ -176,7 +185,7 @@ where let executable = entry .header() .mode() - .map_err(|e| Error::Mode(path.to_path_buf(), e))? + .map_err(|e| Error::Mode(tar_path, e))? & 64 != 0; @@ -190,8 +199,8 @@ where tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry .link_name() - .map_err(|e| Error::LinkName(path.to_path_buf(), e))? - .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? + .map_err(|e| Error::LinkName(tar_path.clone(), e))? + .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))? .into_owned() .into_os_string() .into_encoded_bytes(), @@ -200,11 +209,11 @@ where // Push a bogus directory marker so we can make sure this directoy gets // created. We don't know the digest and size until after reading the full // tarball. - tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() }, + tokio_tar::EntryType::Directory => IngestionEntry::Dir { path }, tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, - entry_type => return Err(Error::EntryType(path, entry_type).into()), + entry_type => return Err(Error::EntryType(tar_path, entry_type).into()), }; nodes.add(entry)?; @@ -239,7 +248,7 @@ where /// An error is returned if this is not the case and ingestion will fail. struct IngestionEntryGraph { graph: DiGraph, - path_to_index: HashMap, + path_to_index: HashMap, root_node: Option, } @@ -264,7 +273,7 @@ impl IngestionEntryGraph { /// and new nodes are not directories, the node is replaced and is disconnected from its /// children. pub fn add(&mut self, entry: IngestionEntry) -> Result { - let path = entry.path().to_path_buf(); + let path = entry.path().to_owned(); let index = match self.path_to_index.get(entry.path()) { Some(&index) => { @@ -284,7 +293,7 @@ impl IngestionEntryGraph { // We expect archives to contain a single root node, if there is another root node // entry with a different path name, this is unsupported. if let Some(root_node) = self.root_node { - if self.get_node(root_node).path() != path { + if self.get_node(root_node).path() != &path { return Err(Error::UnexpectedNumberOfTopLevelEntries); } } @@ -293,7 +302,7 @@ impl IngestionEntryGraph { } else if let Some(parent_path) = path.parent() { // Recursively add the parent node until it hits the root node. let parent_index = self.add(IngestionEntry::Dir { - path: parent_path.to_path_buf(), + path: parent_path.to_owned(), })?; // Insert an edge from the parent directory to the child entry. @@ -378,23 +387,29 @@ mod test { lazy_static! { pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); - pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() }; - pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() }; - pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() }; + pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { + path: "a".parse().unwrap() + }; + pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { + path: "b".parse().unwrap() + }; + pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { + path: "a/b".parse().unwrap() + }; pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { - path: "a".into(), + path: "a".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { - path: "a/b".into(), + path: "a/b".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { - path: "a/b/c".into(), + path: "a/b/c".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 3c6689dce5..e3fba617e0 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use super::PathBuf; use crate::Error as CastoreError; diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 9a0bb5855a..b8cfac86f8 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -6,8 +6,6 @@ use std::fs::FileType; use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; -use std::path::Path; -use std::path::PathBuf; use tracing::instrument; use walkdir::DirEntry; use walkdir::WalkDir; @@ -35,7 +33,7 @@ pub async fn ingest_path( path: P, ) -> Result> where - P: AsRef + std::fmt::Debug, + P: AsRef + std::fmt::Debug, BS: BlobService + Clone, DS: AsRef, { @@ -58,13 +56,13 @@ where pub fn dir_entries_to_ingestion_stream<'a, BS, I>( blob_service: BS, iter: I, - root: &'a Path, + root: &'a std::path::Path, ) -> BoxStream<'a, Result> where BS: BlobService + Clone + 'a, I: Iterator> + Send + 'a, { - let prefix = root.parent().unwrap_or_else(|| Path::new("")); + let prefix = root.parent().unwrap_or_else(|| std::path::Path::new("")); Box::pin( futures::stream::iter(iter) @@ -94,18 +92,21 @@ where pub async fn dir_entry_to_ingestion_entry( blob_service: BS, entry: &DirEntry, - prefix: &Path, + prefix: &std::path::Path, ) -> Result where BS: BlobService, { let file_type = entry.file_type(); - let path = entry + let fs_path = entry .path() .strip_prefix(prefix) - .expect("Tvix bug: failed to strip root path prefix") - .to_path_buf(); + .expect("Tvix bug: failed to strip root path prefix"); + + // convert to castore PathBuf + let path = crate::path::PathBuf::from_host_path(fs_path, false) + .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e)); if file_type.is_dir() { Ok(IngestionEntry::Dir { path }) @@ -132,13 +133,16 @@ where digest, }) } else { - return Err(Error::FileType(path, file_type)); + return Err(Error::FileType(fs_path.to_path_buf(), file_type)); } } /// Uploads the file at the provided [Path] the the [BlobService]. #[instrument(skip(blob_service), fields(path), err)] -async fn upload_blob(blob_service: BS, path: impl AsRef) -> Result +async fn upload_blob( + blob_service: BS, + path: impl AsRef, +) -> Result where BS: BlobService, { @@ -164,18 +168,18 @@ where #[derive(Debug, thiserror::Error)] pub enum Error { #[error("unsupported file type at {0}: {1:?}")] - FileType(PathBuf, FileType), + FileType(std::path::PathBuf, FileType), #[error("unable to stat {0}: {1}")] - Stat(PathBuf, std::io::Error), + Stat(std::path::PathBuf, std::io::Error), #[error("unable to open {0}: {1}")] - Open(PathBuf, std::io::Error), + Open(std::path::PathBuf, std::io::Error), #[error("unable to read {0}: {1}")] - BlobRead(PathBuf, std::io::Error), + BlobRead(std::path::PathBuf, std::io::Error), // TODO: proper error for blob finalize #[error("unable to finalize blob {0}: {1}")] - BlobFinalize(PathBuf, std::io::Error), + BlobFinalize(std::path::PathBuf, std::io::Error), } diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index bf21001822..53ebc2b339 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -6,6 +6,7 @@ use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::path::PathBuf; use crate::proto::node::Node; use crate::proto::Directory; use crate::proto::DirectoryNode; @@ -16,13 +17,7 @@ use futures::{Stream, StreamExt}; use tracing::Level; -#[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, -}; +use std::collections::HashMap; use tracing::instrument; mod error; @@ -70,20 +65,11 @@ where // we break the loop manually. .expect("Tvix bug: unexpected end of stream")?; - debug_assert!( - entry - .path() - .components() - .all(|x| matches!(x, std::path::Component::Normal(_))), - "path may only contain normal components" - ); - let name = entry .path() .file_name() // If this is the root node, it will have an empty name. .unwrap_or_default() - .as_bytes() .to_owned() .into(); @@ -108,7 +94,7 @@ where .put(directory) .await .map_err(|e| { - IngestionError::UploadDirectoryError(entry.path().to_path_buf(), e) + IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; Node::Directory(DirectoryNode { @@ -140,7 +126,7 @@ where // record node in parent directory, creating a new [Directory] if not there yet. directories - .entry(entry.path().parent().unwrap().to_path_buf()) + .entry(entry.path().parent().unwrap().to_owned()) .or_default() .add(node); }; @@ -197,7 +183,7 @@ pub enum IngestionEntry { } impl IngestionEntry { - fn path(&self) -> &Path { + fn path(&self) -> &PathBuf { match self { IngestionEntry::Regular { path, .. } => path, IngestionEntry::Symlink { path, .. } => path, -- cgit 1.4.1