about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/directoryservice/closure_validator.rs2
-rw-r--r--tvix/castore/src/proto/grpc_directoryservice_wrapper.rs106
2 files changed, 19 insertions, 89 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs
index ecd9acfca1..cc421d4ab5 100644
--- a/tvix/castore/src/directoryservice/closure_validator.rs
+++ b/tvix/castore/src/directoryservice/closure_validator.rs
@@ -39,7 +39,7 @@ pub struct ClosureValidator {
 impl ClosureValidator {
     /// Insert a new Directory into the closure.
     /// Perform individual Directory validation, validation of insertion order
-    // and size fields.
+    /// and size fields.
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
     pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
         let digest = directory.digest();
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
index 87eca3d149..7d741a3f07 100644
--- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,7 +1,7 @@
+use crate::directoryservice::ClosureValidator;
 use crate::proto;
 use crate::{directoryservice::DirectoryService, B3Digest};
 use futures::StreamExt;
-use std::collections::HashMap;
 use std::ops::Deref;
 use tokio::sync::mpsc::channel;
 use tokio_stream::wrappers::ReceiverStream;
@@ -88,97 +88,27 @@ where
         request: Request<Streaming<proto::Directory>>,
     ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
         let mut req_inner = request.into_inner();
-        // TODO: let this use DirectoryPutter to the store it's connected to,
-        // and move the validation logic into [SimplePutter].
-
-        // This keeps track of the seen directory keys, and their size.
-        // This is used to validate the size field of a reference to a previously sent directory.
-        // We don't need to keep the contents around, they're stored in the DB.
-        // https://github.com/rust-lang/rust-clippy/issues/5812
-        #[allow(clippy::mutable_key_type)]
-        let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new();
-        let mut last_directory_dgst: Option<B3Digest> = None;
-
-        // Consume directories, and insert them into the store.
-        // Reject directory messages that refer to Directories not sent in the same stream.
+
+        // We put all Directory messages we receive into ClosureValidator first.
+        let mut validator = ClosureValidator::default();
         while let Some(directory) = req_inner.message().await? {
-            // validate the directory itself.
-            if let Err(e) = directory.validate() {
-                warn!(err = %e, "invalid directory");
-                Err(Status::invalid_argument(format!(
-                    "directory failed validation: {}",
-                    e,
-                )))?;
-            }
+            validator.add(directory)?;
+        }
 
-            // for each child directory this directory refers to, we need
-            // to ensure it has been seen already in this stream, and that the size
-            // matches what we recorded.
-            for child_directory in &directory.directories {
-                let child_directory_digest: B3Digest = child_directory
-                    .digest
-                    .clone()
-                    .try_into()
-                    .map_err(|_e| Status::internal("invalid child directory digest len"))?;
-
-                match seen_directories_sizes.get(&child_directory_digest) {
-                    None => {
-                        return Err(Status::invalid_argument(format!(
-                            "child directory '{:?}' ({}) in directory '{}' not seen yet",
-                            child_directory.name,
-                            &child_directory_digest,
-                            &directory.digest(),
-                        )));
-                    }
-                    Some(seen_child_directory_size) => {
-                        if seen_child_directory_size != &child_directory.size {
-                            return Err(Status::invalid_argument(format!(
-                                    "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
-                                    child_directory.name,
-                                    &child_directory_digest,
-                                    &directory.digest(),
-                                    seen_child_directory_size,
-                                    child_directory.size,
-                            )));
-                        }
-                    }
-                }
-            }
+        // drain, which validates connectivity too.
+        let directories = validator.finalize()?;
 
-            // NOTE: We can't know if a directory we're receiving actually is
-            // part of the closure, because we receive directories from the leaf nodes up to
-            // the root.
-            // The only thing we could to would be doing a final check when the
-            // last Directory was received, that all Directories received so far are
-            // reachable from that (root) node.
-
-            let dgst = directory.digest();
-            seen_directories_sizes.insert(dgst.clone(), directory.size());
-            last_directory_dgst = Some(dgst.clone());
-
-            // check if the directory already exists in the database. We can skip
-            // inserting if it's already there, as that'd be a no-op.
-            match self.directory_service.get(&dgst).await {
-                Err(e) => {
-                    warn!(err = %e, "failed to check if directory already exists");
-                    return Err(e.into());
-                }
-                // skip if already exists
-                Ok(Some(_)) => {}
-                // insert if it doesn't already exist
-                Ok(None) => {
-                    self.directory_service.put(directory).await?;
-                }
-            }
+        let mut directory_putter = self.directory_service.put_multiple_start();
+        for directory in directories {
+            directory_putter.put(directory).await?;
         }
 
-        // We're done receiving. peek at last_directory_digest and either return the digest,
-        // or an error, if we received an empty stream.
-        match last_directory_dgst {
-            None => Err(Status::invalid_argument("no directories received")),
-            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
-                root_digest: last_directory_dgst.into(),
-            })),
-        }
+        // Properly close the directory putter. Peek at last_directory_digest
+        // and return it, or propagate errors.
+        let last_directory_dgst = directory_putter.close().await?;
+
+        Ok(Response::new(proto::PutDirectoryResponse {
+            root_digest: last_directory_dgst.into(),
+        }))
     }
 }