about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-20T21·28+0200
committerclbot <clbot@tvl.fyi>2024-03-24T17·42+0000
commit5f069a3eb8c3a089f1231bf4a618e4153736df96 (patch)
treed68eef9c5dde6991483a331e50dcfb62ae8a0507
parentc92ef2df64f4013e72037cefb548f68d158488cc (diff)
refactor(tvix/castore/directory): have SimplePutter use Validator r/7773
This simplifies a bunch of code, and gets rid of some TODOs.

Also, move it out of castore/utils, and into its own file.

Change-Id: Ie63e05a6cdfb2a73e878cf7107f9172aed1cdf13
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11224
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
-rw-r--r--tvix/castore/src/directoryservice/memory.rs4
-rw-r--r--tvix/castore/src/directoryservice/mod.rs3
-rw-r--r--tvix/castore/src/directoryservice/simple_putter.rs75
-rw-r--r--tvix/castore/src/directoryservice/sled.rs4
-rw-r--r--tvix/castore/src/directoryservice/utils.rs55
5 files changed, 82 insertions, 59 deletions
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index 528ffe2f2c..2cbbbd1b16 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -5,8 +5,8 @@ use std::sync::{Arc, RwLock};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::{traverse_directory, SimplePutter};
-use super::{DirectoryPutter, DirectoryService};
+use super::utils::traverse_directory;
+use super::{DirectoryPutter, DirectoryService, SimplePutter};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index b6d1c2fcab..f9d8e08b31 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -6,6 +6,7 @@ mod closure_validator;
 mod from_addr;
 mod grpc;
 mod memory;
+mod simple_putter;
 mod sled;
 mod traverse;
 mod utils;
@@ -14,8 +15,10 @@ pub use self::closure_validator::ClosureValidator;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
+pub use self::simple_putter::SimplePutter;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::descend_to;
+pub use self::utils::traverse_directory;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
new file mode 100644
index 0000000000..25617ebcac
--- /dev/null
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -0,0 +1,75 @@
+use super::ClosureValidator;
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+
+/// This is an implementation of DirectoryPutter that simply
+/// inserts individual Directory messages one by one, on close, after
+/// they successfully validated.
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+
+    directory_validator: Option<ClosureValidator>,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            directory_validator: Some(Default::default()),
+        }
+    }
+}
+
+#[async_trait]
+impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        match self.directory_validator {
+            None => return Err(Error::StorageError("already closed".to_string())),
+            Some(ref mut validator) => {
+                validator.add(directory)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "trace", skip_all, ret, err)]
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        match self.directory_validator.take() {
+            None => Err(Error::InvalidRequest("already closed".to_string())),
+            Some(validator) => {
+                // retrieve the validated directories.
+                let directories = validator.finalize()?;
+
+                // Get the root digest, which is at the end (cf. insertion order)
+                let root_digest = directories
+                    .last()
+                    .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
+                    .digest();
+
+                // call an individual put for each directory and await the insertion.
+                for directory in directories {
+                    let exp_digest = directory.digest();
+                    let actual_digest = self.directory_service.put(directory).await?;
+
+                    // ensure the digest the backend told us matches our expectations.
+                    if exp_digest != actual_digest {
+                        warn!(directory.digest_expected=%exp_digest, directory.digest_actual=%actual_digest, "unexpected digest");
+                        return Err(Error::StorageError(
+                            "got unexpected digest from backend during put".into(),
+                        ));
+                    }
+                }
+
+                Ok(root_digest)
+            }
+        }
+    }
+}
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 9acd385418..f41c5f7188 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -7,8 +7,8 @@ use std::path::Path;
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::{traverse_directory, SimplePutter};
-use super::DirectoryService;
+use super::utils::traverse_directory;
+use super::{DirectoryService, SimplePutter};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 6fa1a9e5fd..01c521076c 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -1,4 +1,3 @@
-use super::DirectoryPutter;
 use super::DirectoryService;
 use crate::proto;
 use crate::B3Digest;
@@ -6,8 +5,6 @@ use crate::Error;
 use async_stream::stream;
 use futures::stream::BoxStream;
 use std::collections::{HashSet, VecDeque};
-use tonic::async_trait;
-use tracing::instrument;
 use tracing::warn;
 
 /// Traverses a [proto::Directory] from the root to the children.
@@ -83,55 +80,3 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
 
     Box::pin(stream)
 }
-
-/// This is a simple implementation of a Directory uploader.
-/// TODO: verify connectivity? Factor out these checks into generic helpers?
-pub struct SimplePutter<DS: DirectoryService> {
-    directory_service: DS,
-    last_directory_digest: Option<B3Digest>,
-    closed: bool,
-}
-
-impl<DS: DirectoryService> SimplePutter<DS> {
-    pub fn new(directory_service: DS) -> Self {
-        Self {
-            directory_service,
-            closed: false,
-            last_directory_digest: None,
-        }
-    }
-}
-
-#[async_trait]
-impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        if self.closed {
-            return Err(Error::StorageError("already closed".to_string()));
-        }
-
-        let digest = self.directory_service.put(directory).await?;
-
-        // track the last directory digest
-        self.last_directory_digest = Some(digest);
-
-        Ok(())
-    }
-
-    #[instrument(level = "trace", skip_all, ret, err)]
-    async fn close(&mut self) -> Result<B3Digest, Error> {
-        if self.closed {
-            return Err(Error::StorageError("already closed".to_string()));
-        }
-
-        match &self.last_directory_digest {
-            Some(last_digest) => {
-                self.closed = true;
-                Ok(last_digest.clone())
-            }
-            None => Err(Error::InvalidRequest(
-                "no directories sent, can't show root digest".to_string(),
-            )),
-        }
-    }
-}