about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/simple_putter.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-20T21·28+0200
committerclbot <clbot@tvl.fyi>2024-03-24T17·42+0000
commit5f069a3eb8c3a089f1231bf4a618e4153736df96 (patch)
treed68eef9c5dde6991483a331e50dcfb62ae8a0507 /tvix/castore/src/directoryservice/simple_putter.rs
parentc92ef2df64f4013e72037cefb548f68d158488cc (diff)
refactor(tvix/castore/directory): have SimplePutter use Validator r/7773
This simplifies a bunch of code, and gets rid of some TODOs.

Also, move it out of castore/utils, and into its own file.

Change-Id: Ie63e05a6cdfb2a73e878cf7107f9172aed1cdf13
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11224
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice/simple_putter.rs')
-rw-r--r--tvix/castore/src/directoryservice/simple_putter.rs75
1 files changed, 75 insertions, 0 deletions
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
new file mode 100644
index 0000000000..25617ebcac
--- /dev/null
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -0,0 +1,75 @@
+use super::ClosureValidator;
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+
+/// This is an implementation of DirectoryPutter that simply
+/// inserts individual Directory messages one by one, on close, after
+/// they successfully validated.
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+
+    directory_validator: Option<ClosureValidator>,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            directory_validator: Some(Default::default()),
+        }
+    }
+}
+
+#[async_trait]
+impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        match self.directory_validator {
+            None => return Err(Error::StorageError("already closed".to_string())),
+            Some(ref mut validator) => {
+                validator.add(directory)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "trace", skip_all, ret, err)]
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        match self.directory_validator.take() {
+            None => Err(Error::InvalidRequest("already closed".to_string())),
+            Some(validator) => {
+                // retrieve the validated directories.
+                let directories = validator.finalize()?;
+
+                // Get the root digest, which is at the end (cf. insertion order)
+                let root_digest = directories
+                    .last()
+                    .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
+                    .digest();
+
+                // call an individual put for each directory and await the insertion.
+                for directory in directories {
+                    let exp_digest = directory.digest();
+                    let actual_digest = self.directory_service.put(directory).await?;
+
+                    // ensure the digest the backend told us matches our expectations.
+                    if exp_digest != actual_digest {
+                        warn!(directory.digest_expected=%exp_digest, directory.digest_actual=%actual_digest, "unexpected digest");
+                        return Err(Error::StorageError(
+                            "got unexpected digest from backend during put".into(),
+                        ));
+                    }
+                }
+
+                Ok(root_digest)
+            }
+        }
+    }
+}