about summary refs log tree commit diff
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2024-03-20T19·42+0200
committer clbot <clbot@tvl.fyi> 2024-03-24T19·55+0000
commit f7281d8fd52e7c921e03531d7189aee8b1635bd4 (patch)
tree bb13cb97237a50058be8e607961af52a94fd3b8c
parent 5f069a3eb8c3a089f1231bf4a618e4153736df96 (diff)
refactor(tvix/castore/directory/grpc_wrapper): use ClosureValidator r/7774
This greatly simplifies the code in this function, replacing it with a
much better tested (and more capable!) version of the validation logic.

It also enables the gRPC server frontend to make use of the
DirectoryPutter interface. While this might not be too visible in terms
of latency thanks to gRPC streams bursting, it also enables further
optimizations later (such as bucketing of directory closures).

Change-Id: I21f805aa72377dd5266de3b525905d9f445337d6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11221
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
-rw-r--r-- tvix/castore/src/directoryservice/closure_validator.rs 2
-rw-r--r-- tvix/castore/src/proto/grpc_directoryservice_wrapper.rs 106
2 files changed, 19 insertions, 89 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs
index ecd9acfca1..cc421d4ab5 100644
--- a/tvix/castore/src/directoryservice/closure_validator.rs
+++ b/tvix/castore/src/directoryservice/closure_validator.rs
@@ -39,7 +39,7 @@ pub struct ClosureValidator {
 impl ClosureValidator {
     /// Insert a new Directory into the closure.
     /// Perform individual Directory validation, validation of insertion order
-    // and size fields.
+    /// and size fields.
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
     pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
         let digest = directory.digest();
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
index 87eca3d149..7d741a3f07 100644
--- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,7 +1,7 @@
+use crate::directoryservice::ClosureValidator;
 use crate::proto;
 use crate::{directoryservice::DirectoryService, B3Digest};
 use futures::StreamExt;
-use std::collections::HashMap;
 use std::ops::Deref;
 use tokio::sync::mpsc::channel;
 use tokio_stream::wrappers::ReceiverStream;
@@ -88,97 +88,27 @@ where
         request: Request<Streaming<proto::Directory>>,
     ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
         let mut req_inner = request.into_inner();
-        // TODO: let this use DirectoryPutter to the store it's connected to,
-        // and move the validation logic into [SimplePutter].
-
-        // This keeps track of the seen directory keys, and their size.
-        // This is used to validate the size field of a reference to a previously sent directory.
-        // We don't need to keep the contents around, they're stored in the DB.
-        // https://github.com/rust-lang/rust-clippy/issues/5812
-        #[allow(clippy::mutable_key_type)]
-        let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new();
-        let mut last_directory_dgst: Option<B3Digest> = None;
-
-        // Consume directories, and insert them into the store.
-        // Reject directory messages that refer to Directories not sent in the same stream.
+
+        // We put all Directory messages we receive into ClosureValidator first.
+        let mut validator = ClosureValidator::default();
         while let Some(directory) = req_inner.message().await? {
-            // validate the directory itself.
-            if let Err(e) = directory.validate() {
-                warn!(err = %e, "invalid directory");
-                Err(Status::invalid_argument(format!(
-                    "directory failed validation: {}",
-                    e,
-                )))?;
-            }
+            validator.add(directory)?;
+        }
 
-            // for each child directory this directory refers to, we need
-            // to ensure it has been seen already in this stream, and that the size
-            // matches what we recorded.
-            for child_directory in &directory.directories {
-                let child_directory_digest: B3Digest = child_directory
-                    .digest
-                    .clone()
-                    .try_into()
-                    .map_err(|_e| Status::internal("invalid child directory digest len"))?;
-
-                match seen_directories_sizes.get(&child_directory_digest) {
-                    None => {
-                        return Err(Status::invalid_argument(format!(
-                            "child directory '{:?}' ({}) in directory '{}' not seen yet",
-                            child_directory.name,
-                            &child_directory_digest,
-                            &directory.digest(),
-                        )));
-                    }
-                    Some(seen_child_directory_size) => {
-                        if seen_child_directory_size != &child_directory.size {
-                            return Err(Status::invalid_argument(format!(
-                                    "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
-                                    child_directory.name,
-                                    &child_directory_digest,
-                                    &directory.digest(),
-                                    seen_child_directory_size,
-                                    child_directory.size,
-                            )));
-                        }
-                    }
-                }
-            }
+        // drain, which validates connectivity too.
+        let directories = validator.finalize()?;
 
-            // NOTE: We can't know if a directory we're receiving actually is
-            // part of the closure, because we receive directories from the leaf nodes up to
-            // the root.
-            // The only thing we could to would be doing a final check when the
-            // last Directory was received, that all Directories received so far are
-            // reachable from that (root) node.
-
-            let dgst = directory.digest();
-            seen_directories_sizes.insert(dgst.clone(), directory.size());
-            last_directory_dgst = Some(dgst.clone());
-
-            // check if the directory already exists in the database. We can skip
-            // inserting if it's already there, as that'd be a no-op.
-            match self.directory_service.get(&dgst).await {
-                Err(e) => {
-                    warn!(err = %e, "failed to check if directory already exists");
-                    return Err(e.into());
-                }
-                // skip if already exists
-                Ok(Some(_)) => {}
-                // insert if it doesn't already exist
-                Ok(None) => {
-                    self.directory_service.put(directory).await?;
-                }
-            }
+        let mut directory_putter = self.directory_service.put_multiple_start();
+        for directory in directories {
+            directory_putter.put(directory).await?;
         }
 
-        // We're done receiving. peek at last_directory_digest and either return the digest,
-        // or an error, if we received an empty stream.
-        match last_directory_dgst {
-            None => Err(Status::invalid_argument("no directories received")),
-            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
-                root_digest: last_directory_dgst.into(),
-            })),
-        }
+        // Properly close the directory putter. Peek at last_directory_digest
+        // and return it, or propagate errors.
+        let last_directory_dgst = directory_putter.close().await?;
+
+        Ok(Response::new(proto::PutDirectoryResponse {
+            root_digest: last_directory_dgst.into(),
+        }))
     }
 }