From 345cebaebb1bf37b0bf251032428f5ed85dd26e3 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 19 Mar 2024 16:26:18 +0200 Subject: refactor(tvix/castore/blob): drop simplefs This functionality is provided by the object store backend too (using `objectstore+file://$some_path`). This backend also supports content-defined chunking and compresses chunks with zstd. Change-Id: I5968c713112c400d23897c59db06b6c713c9d8cb Reviewed-on: https://cl.tvl.fyi/c/depot/+/11205 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/castore/src/blobservice/from_addr.rs | 11 +- tvix/castore/src/blobservice/mod.rs | 2 - tvix/castore/src/blobservice/simplefs.rs | 196 ------------------------------ 3 files changed, 1 insertion(+), 208 deletions(-) delete mode 100644 tvix/castore/src/blobservice/simplefs.rs diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 1f37ef61ed..1473ee7e9c 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -3,8 +3,7 @@ use url::Url; use crate::{proto::blob_service_client::BlobServiceClient, Error}; use super::{ - BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService, - SimpleFilesystemBlobService, SledBlobService, + BlobService, GRPCBlobService, MemoryBlobService, ObjectStoreBlobService, SledBlobService, }; /// Constructs a new instance of a [BlobService] from an URI. @@ -13,7 +12,6 @@ use super::{ /// - `memory://` ([MemoryBlobService]) /// - `sled://` ([SledBlobService]) /// - `grpc+*://` ([GRPCBlobService]) -/// - `simplefs://` ([SimpleFilesystemBlobService]) /// /// See their `from_url` methods for more details about their syntax. pub async fn from_addr(uri: &str) -> Result, crate::Error> { @@ -58,13 +56,6 @@ pub async fn from_addr(uri: &str) -> Result, crate::Error> let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); Box::new(GRPCBlobService::from_client(client)) } - "simplefs" => { - if url.path().is_empty() { - return Err(Error::StorageError("Invalid filesystem path".to_string())); - } - - Box::new(SimpleFilesystemBlobService::new(url.path().into()).await?) - } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index 7324fcf516..5eb88a4794 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -11,7 +11,6 @@ mod grpc; mod memory; mod naive_seeker; mod object_store; -mod simplefs; mod sled; #[cfg(test)] @@ -23,7 +22,6 @@ pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; pub use self::object_store::ObjectStoreBlobService; -pub use self::simplefs::SimpleFilesystemBlobService; pub use self::sled::SledBlobService; /// The base trait all BlobService services need to implement. diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs deleted file mode 100644 index 2dcb24f342..0000000000 --- a/tvix/castore/src/blobservice/simplefs.rs +++ /dev/null @@ -1,196 +0,0 @@ -use std::{ - io, - path::{Path, PathBuf}, - pin::pin, - task::Poll, -}; - -use bytes::Buf; -use data_encoding::HEXLOWER; -use pin_project_lite::pin_project; -use tokio::io::AsyncWriteExt; -use tonic::async_trait; -use tracing::instrument; - -use crate::B3Digest; - -use super::{BlobReader, BlobService, BlobWriter}; - -/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant -/// filesystem. -/// -/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All -/// blobs received are staged in that `tmp` directory, then they are moved **atomically** into -/// `blobs/B3DIGEST[:2]/B3DIGEST[2:]` in a sharding style, e.g. `abcdef` gets turned into `ab/cdef` -/// -/// **Disclaimer** : This very simple implementation is subject to change and does not give any -/// final guarantees on the on-disk format. -/// TODO: migrate to object_store? -#[derive(Clone)] -pub struct SimpleFilesystemBlobService { - /// Where the blobs are located on a filesystem already mounted. - path: PathBuf, -} - -impl SimpleFilesystemBlobService { - pub async fn new(path: PathBuf) -> std::io::Result { - tokio::fs::create_dir_all(&path).await?; - tokio::fs::create_dir_all(path.join("tmp")).await?; - tokio::fs::create_dir_all(path.join("blobs")).await?; - - Ok(Self { path }) - } -} - -fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf { - let prefix = HEXLOWER.encode(&digest.as_slice()[..2]); - let pathname = HEXLOWER.encode(digest.as_slice()); - - root.join("blobs").join(prefix).join(pathname) -} - -#[async_trait] -impl BlobService for SimpleFilesystemBlobService { - #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] - async fn has(&self, digest: &B3Digest) -> io::Result { - Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?) - } - - #[instrument(skip_all, err, fields(blob.digest=%digest))] - async fn open_read(&self, digest: &B3Digest) -> io::Result>> { - let dst_path = derive_path(&self.path, digest); - let reader = match tokio::fs::File::open(dst_path).await { - Ok(file) => { - let reader: Box = Box::new(file); - Ok(Some(reader)) - } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), - Err(e) => Err(e), - }; - - Ok(reader?) - } - - #[instrument(skip_all)] - async fn open_write(&self) -> Box { - let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await { - Ok(file) => Ok(file), - Err(e) => match e { - async_tempfile::Error::Io(io_error) => Err(io_error), - async_tempfile::Error::InvalidFile => Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "invalid or missing file specified", - )), - async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "invalid or missing directory specified", - )), - }, - }; - - Box::new(SimpleFilesystemBlobWriter { - root: self.path.clone(), - file, - digester: blake3::Hasher::new(), - }) - } -} - -pin_project! { - struct SimpleFilesystemBlobWriter { - root: PathBuf, - file: std::io::Result, - digester: blake3::Hasher - } -} - -impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter { - fn poll_write( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - if let Err(e) = self.file.as_mut() { - return Poll::Ready(Err(std::mem::replace( - e, - std::io::Error::new( - std::io::ErrorKind::NotConnected, - "this file is already closed", - ), - ))); - } - - let writer = self.file.as_mut().unwrap(); - match pin!(writer).poll_write(cx, buf) { - Poll::Ready(Ok(n)) => { - let this = self.project(); - this.digester.update(buf.take(n).into_inner()); - Poll::Ready(Ok(n)) - } - Poll::Ready(Err(e)) => Poll::Ready(Err(e)), - Poll::Pending => Poll::Pending, - } - } - - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - if let Err(e) = self.file.as_mut() { - return Poll::Ready(Err(std::mem::replace( - e, - std::io::Error::new( - std::io::ErrorKind::NotConnected, - "this file is already closed", - ), - ))); - } - - let writer = self.file.as_mut().unwrap(); - pin!(writer).poll_flush(cx) - } - - fn poll_shutdown( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - if let Err(e) = self.file.as_mut() { - return Poll::Ready(Err(std::mem::replace( - e, - std::io::Error::new( - std::io::ErrorKind::NotConnected, - "this file is already closed", - ), - ))); - } - - let writer = self.file.as_mut().unwrap(); - pin!(writer).poll_shutdown(cx) - } -} - -#[async_trait] -impl BlobWriter for SimpleFilesystemBlobWriter { - async fn close(&mut self) -> io::Result { - if let Err(e) = self.file.as_mut() { - return Err(std::mem::replace( - e, - std::io::Error::new( - std::io::ErrorKind::NotConnected, - "this file is already closed", - ), - )); - } - - let writer = self.file.as_mut().unwrap(); - writer.sync_all().await?; - writer.flush().await?; - - let digest: B3Digest = self.digester.finalize().as_bytes().into(); - let dst_path = derive_path(&self.root, &digest); - tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?; - tokio::fs::rename(writer.file_path(), dst_path).await?; - - Ok(digest) - } -} -- cgit 1.4.1