about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-20T20·36+0200
committerflokli <flokli@flokli.de>2024-03-28T07·58+0000
commit74023a07a4b3d8e99645217b04683c0e54e23be8 (patch)
treee5c7f42540efe6dc94666e2694031b14ccd6715b
parent05d3f21eaf0834d44b6f32a49ae3276ebcb6571c (diff)
refactor(tvix/castore/*): drop utils.rs and grpc directorysvc tests r/7795
This drops pretty much all of castore/utils.rs.

There were only two things left in there, both a bit messy and only used
for tests:

Some `gen_*_service()` helper functions. These can be expressed by
`from_addr("memory://")`.

The other thing was some plumbing code to test the gRPC layer, by
exposing a in-memory implementation via gRPC, and then connecting to
that channel via a gRPC client again.

Previous CLs moved the connection setup code to
{directory,blob}service::tests::utils, close to where we exercise them,
the new rstest-based tests.

The tests interacting directly on the gRPC types are removed, all
scenarios that were in there show now be covered through the rstest ones
on the trait level.

Change-Id: I450ccccf983b4c62145a25d81c36a40846664814
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11223
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
-rw-r--r--tvix/build/src/buildservice/from_addr.rs18
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs163
-rw-r--r--tvix/castore/src/directoryservice/tests/mod.rs3
-rw-r--r--tvix/castore/src/directoryservice/traverse.rs8
-rw-r--r--tvix/castore/src/lib.rs1
-rw-r--r--tvix/castore/src/proto/tests/grpc_blobservice.rs94
-rw-r--r--tvix/castore/src/proto/tests/grpc_directoryservice.rs246
-rw-r--r--tvix/castore/src/proto/tests/mod.rs2
-rw-r--r--tvix/castore/src/tests/import.rs19
-rw-r--r--tvix/castore/src/utils.rs89
10 files changed, 31 insertions, 612 deletions
diff --git a/tvix/build/src/buildservice/from_addr.rs b/tvix/build/src/buildservice/from_addr.rs
index ee2b4e50b4..f5c4e6a490 100644
--- a/tvix/build/src/buildservice/from_addr.rs
+++ b/tvix/build/src/buildservice/from_addr.rs
@@ -51,7 +51,10 @@ mod tests {
 
     use super::from_addr;
     use test_case::test_case;
-    use tvix_castore::utils::{gen_blob_service, gen_directory_service};
+    use tvix_castore::{
+        blobservice::{BlobService, MemoryBlobService},
+        directoryservice::{DirectoryService, MemoryDirectoryService},
+    };
 
     /// This uses an unsupported scheme.
     #[test_case("http://foo.example/test", false; "unsupported scheme")]
@@ -71,14 +74,13 @@ mod tests {
     #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
     #[tokio::test]
     async fn test_from_addr(uri_str: &str, is_ok: bool) {
+        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+        let directory_service: Arc<dyn DirectoryService> =
+            Arc::from(MemoryDirectoryService::default());
         assert_eq!(
-            from_addr(
-                uri_str,
-                Arc::from(gen_blob_service()),
-                Arc::from(gen_directory_service())
-            )
-            .await
-            .is_ok(),
+            from_addr(uri_str, blob_service, directory_service)
+                .await
+                .is_ok(),
             is_ok
         )
     }
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index fe410a3825..84cf01e167 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -220,18 +220,6 @@ pub struct GRPCPutter {
     )>,
 }
 
-impl GRPCPutter {
-    // allows checking if the tx part of the channel is closed.
-    // only used in the test case.
-    #[cfg(test)]
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
-}
-
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
@@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter {
 
 #[cfg(test)]
 mod tests {
-    use core::time;
-    use futures::StreamExt;
-    use std::{any::Any, time::Duration};
+    use std::time::Duration;
     use tempfile::TempDir;
     use tokio::net::UnixListener;
     use tokio_retry::{strategy::ExponentialBackoff, Retry};
     use tokio_stream::wrappers::UnixListenerStream;
 
     use crate::{
-        directoryservice::{
-            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
-            MemoryDirectoryService,
-        },
-        fixtures::{self, DIRECTORY_A, DIRECTORY_B},
+        directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService},
+        fixtures,
         proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
-        utils::gen_directorysvc_grpc_client,
     };
 
-    #[tokio::test]
-    async fn test() {
-        // create the GrpcDirectoryService
-        let directory_service =
-            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
-
-        // try to get DIRECTORY_A should return Ok(None)
-        assert_eq!(
-            None,
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must not fail")
-        );
-
-        // Now upload it
-        assert_eq!(
-            DIRECTORY_A.digest(),
-            directory_service
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect("must succeed")
-        );
-
-        // And retrieve it, compare for equality.
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must succeed")
-                .expect("must be some")
-        );
-
-        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
-        directory_service
-            .put(DIRECTORY_B.clone())
-            .await
-            .expect_err("must fail");
-
-        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
-        // will always fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-            handle.close().await.expect_err("must fail");
-        }
-
-        // Uploading A and then B should succeed, and closing should return the digest of B.
-        let mut handle = directory_service.put_multiple_start();
-        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
-        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-        let digest = handle.close().await.expect("must succeed");
-        assert_eq!(DIRECTORY_B.digest(), digest);
-
-        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
-        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
-        assert_eq!(
-            DIRECTORY_B.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-
-        // Uploading B and then A should fail, because B refers to A, which
-        // hasn't been uploaded yet.
-        // However, the client can burst, so we might not have received the
-        // error back from the server.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            // sending out B will always be fine
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // whether we will be able to put A as well depends on whether we
-            // already received the error about B.
-            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
-                // If we didn't, and this was Ok(_), …
-                // a subsequent close MUST fail (because it waits for the
-                // server)
-                handle.close().await.expect_err("must fail");
-            }
-        }
-
-        // Now we do the same test as before, send B, then A, but wait
-        // a long long time so we already received the error from the server
-        // (causing the internal stream to be closed).
-        // Uploading anything else subsequently should then fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // get a GRPCPutter, so we can peek at [is_closed].
-            let handle_any = &mut handle as &mut dyn Any;
-
-            // `unchecked_downcast_mut` is unstable for now,
-            // https://github.com/rust-lang/rust/issues/90850
-            // We do the same thing here.
-            // The reason for why we cannot use the checked downcast lies
-            // in the fact that:
-            // - GRPCPutter has type ID A
-            // - Box<GRPCPutter> has type ID B
-            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
-            // B seems different from C in this context.
-            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
-            // feature.
-            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
-            // of not making leak `is_closed` in the original trait.
-            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
-            let mut is_closed = false;
-            for _try in 1..1000 {
-                if handle.is_closed() {
-                    is_closed = true;
-                    break;
-                }
-                tokio::time::sleep(time::Duration::from_millis(10)).await;
-            }
-
-            assert!(
-                is_closed,
-                "expected channel to eventually close, but never happened"
-            );
-
-            handle
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect_err("must fail");
-        }
-    }
-
     /// This ensures connecting via gRPC works as expected.
     #[tokio::test]
     async fn test_valid_unix_path_ping_pong() {
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index 5eb2d1919e..cec49bb2c6 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi
     );
 
     // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, but then the close MUST fail.
+    // intermediate .put() might succeed, due to client-side bursting (in the
+    // case of gRPC), but then the close MUST fail.
     let mut handle = directory_service.put_multiple_start();
     if handle.put(broken_directory).await.is_ok() {
         assert!(
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 5c6975351b..573581edbd 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -87,14 +87,16 @@ where
 mod tests {
     use std::path::PathBuf;
 
-    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
-    use crate::utils::gen_directory_service;
+    use crate::{
+        directoryservice,
+        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
+    };
 
     use super::descend_to;
 
     #[tokio::test]
     async fn test_descend_to() {
-        let directory_service = gen_directory_service();
+        let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
         let mut handle = directory_service.put_multiple_start();
         handle
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index 1ce092135b..1a7ac6b4b4 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -12,7 +12,6 @@ pub mod fs;
 pub mod import;
 pub mod proto;
 pub mod tonic;
-pub mod utils;
 
 pub use digests::{B3Digest, B3_LEN};
 pub use errors::Error;
diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs
deleted file mode 100644
index fb202b7d8a..0000000000
--- a/tvix/castore/src/proto/tests/grpc_blobservice.rs
+++ /dev/null
@@ -1,94 +0,0 @@
-use crate::fixtures::{BLOB_A, BLOB_A_DIGEST};
-use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest};
-use crate::utils::gen_blobsvc_grpc_client;
-use tokio_stream::StreamExt;
-
-/// Trying to read a non-existent blob should return a not found error.
-#[tokio::test]
-async fn not_found_read() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    let resp = grpc_client
-        .read(ReadBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-        })
-        .await;
-
-    // We can't use unwrap_err here, because the Ok value doesn't implement
-    // debug.
-    if let Err(e) = resp {
-        assert_eq!(e.code(), tonic::Code::NotFound);
-    } else {
-        panic!("resp is not err")
-    }
-}
-
-/// Trying to stat a non-existent blob should return a not found error.
-#[tokio::test]
-async fn not_found_stat() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    let resp = grpc_client
-        .stat(StatBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-            ..Default::default()
-        })
-        .await
-        .expect_err("must fail");
-
-    // The resp should be a status with Code::NotFound
-    assert_eq!(resp.code(), tonic::Code::NotFound);
-}
-
-/// Put a blob in the store, get it back.
-#[tokio::test]
-async fn put_read_stat() {
-    let mut grpc_client = gen_blobsvc_grpc_client().await;
-
-    // Send blob A.
-    let put_resp = grpc_client
-        .put(tokio_stream::once(BlobChunk {
-            data: BLOB_A.clone(),
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest);
-
-    // Stat for the digest of A.
-    // We currently don't ask for more granular chunking data, as we don't
-    // expose it yet.
-    let _resp = grpc_client
-        .stat(StatBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-            ..Default::default()
-        })
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // Read the blob. It should return the same data.
-    let resp = grpc_client
-        .read(ReadBlobRequest {
-            digest: BLOB_A_DIGEST.clone().into(),
-        })
-        .await;
-
-    let mut rx = resp.ok().unwrap().into_inner();
-
-    // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
-    let item = rx
-        .next()
-        .await
-        .expect("must be some")
-        .expect("must succeed");
-
-    assert_eq!(BLOB_A.clone(), item.data);
-
-    // … and no more elements
-    assert!(rx.next().await.is_none());
-
-    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
-    // Test with some bigger blob too
-}
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
deleted file mode 100644
index dcd9a0ef01..0000000000
--- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs
+++ /dev/null
@@ -1,246 +0,0 @@
-use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
-use crate::proto::directory_service_client::DirectoryServiceClient;
-use crate::proto::get_directory_request::ByWhat;
-use crate::proto::GetDirectoryRequest;
-use crate::proto::{Directory, DirectoryNode, SymlinkNode};
-use crate::utils::gen_directorysvc_grpc_client;
-use tokio_stream::StreamExt;
-use tonic::transport::Channel;
-use tonic::Status;
-
-/// Send the specified GetDirectoryRequest.
-/// Returns an error in the case of an error response, or an error in one of
-/// the items in the stream, or a Vec<Directory> in the case of a successful
-/// request.
-async fn get_directories(
-    grpc_client: &mut DirectoryServiceClient<Channel>,
-    get_directory_request: GetDirectoryRequest,
-) -> Result<Vec<Directory>, Status> {
-    let resp = grpc_client.get(get_directory_request).await;
-
-    // if the response is an error itself, return the error, otherwise unpack
-    let stream = match resp {
-        Ok(resp) => resp,
-        Err(status) => return Err(status),
-    }
-    .into_inner();
-
-    let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
-
-    // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status>
-    directory_results.into_iter().collect()
-}
-
-/// Trying to get a non-existent Directory should return a not found error.
-#[tokio::test]
-async fn not_found() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    let resp = grpc_client
-        .get(GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
-            ..Default::default()
-        })
-        .await;
-
-    let stream = resp.expect("must succeed").into_inner();
-
-    let items: Vec<_> = stream.collect().await;
-
-    // The stream should contain one element, an error with Code::NotFound.
-    assert_eq!(1, items.len());
-    let item = items[0].clone();
-
-    assert!(item.is_err(), "must be err");
-    assert_eq!(
-        tonic::Code::NotFound,
-        item.unwrap_err().code(),
-        "must be err"
-    );
-}
-
-/// Put a Directory into the store, get it back.
-#[tokio::test]
-async fn put_get() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // send directory A.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(DIRECTORY_A.clone()))
-            .await
-            .expect("must succeed")
-            .into_inner()
-    };
-
-    // the sent root_digest should match the calculated digest
-    assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice());
-
-    // get it back
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
-            ..Default::default()
-        },
-    )
-    .await
-    .expect("must not error");
-
-    assert_eq!(vec![DIRECTORY_A.clone()], items);
-}
-
-/// Put multiple Directories into the store, and get them back
-#[tokio::test]
-async fn put_get_multiple() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // sending "b" (which refers to "a") without sending "a" first should fail.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(DIRECTORY_B.clone()))
-            .await
-            .expect_err("must fail")
-    };
-
-    assert_eq!(tonic::Code::InvalidArgument, put_resp.code());
-
-    // sending "a", then "b" should succeed, and the response should contain the digest of b.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                DIRECTORY_B.clone(),
-            ]))
-            .await
-            .expect("must succeed")
-            .into_inner()
-    };
-
-    assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest);
-
-    // now, request b, first in non-recursive mode.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: false,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to only get b.
-    assert_eq!(vec![DIRECTORY_B.clone()], items);
-
-    // now, request b, but in recursive mode.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get b, and then a, because that's how we traverse down.
-    assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items);
-}
-
-/// Put multiple Directories into the store, and omit duplicates.
-#[tokio::test]
-async fn put_get_dedup() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // Send "A", then "C", which refers to "A" two times
-    // Pretend we're a dumb client sending A twice.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                DIRECTORY_A.clone(),
-                DIRECTORY_C.clone(),
-            ]))
-            .await
-            .expect("must succeed")
-    };
-
-    assert_eq!(
-        DIRECTORY_C.digest().as_slice(),
-        put_resp.into_inner().root_digest
-    );
-
-    // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
-    let items = get_directories(
-        &mut grpc_client,
-        GetDirectoryRequest {
-            recursive: true,
-            by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())),
-        },
-    )
-    .await
-    .expect("must not error");
-
-    // We expect to get C, and then A (once, as the second A has been deduplicated).
-    assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
-}
-
-/// Trying to upload a Directory failing validation should fail.
-#[tokio::test]
-async fn put_reject_failed_validation() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // construct a broken Directory message that fails validation
-    let broken_directory = Directory {
-        symlinks: vec![SymlinkNode {
-            name: "".into(),
-            target: "doesntmatter".into(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // send it over, it must fail
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::once(broken_directory))
-            .await
-            .expect_err("must fail")
-    };
-
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-}
-
-/// Trying to upload a Directory with wrong size should fail.
-#[tokio::test]
-async fn put_reject_wrong_size() {
-    let mut grpc_client = gen_directorysvc_grpc_client().await;
-
-    // Construct a directory referring to DIRECTORY_A, but with wrong size.
-    let broken_parent_directory = Directory {
-        directories: vec![DirectoryNode {
-            name: "foo".into(),
-            digest: DIRECTORY_A.digest().into(),
-            size: 42,
-        }],
-        ..Default::default()
-    };
-    // Make sure we got the size wrong.
-    assert_ne!(
-        broken_parent_directory.directories[0].size,
-        DIRECTORY_A.size()
-    );
-
-    // now upload both (first A, then the broken parent). This must fail.
-    let put_resp = {
-        grpc_client
-            .put(tokio_stream::iter(vec![
-                DIRECTORY_A.clone(),
-                broken_parent_directory,
-            ]))
-            .await
-            .expect_err("must fail")
-    };
-    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
-}
diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs
index 8b62fadeb5..8d903bacb6 100644
--- a/tvix/castore/src/proto/tests/mod.rs
+++ b/tvix/castore/src/proto/tests/mod.rs
@@ -1,4 +1,2 @@
 mod directory;
 mod directory_nodes_iterator;
-mod grpc_blobservice;
-mod grpc_directoryservice;
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
index 99e993f36d..b44b71cd78 100644
--- a/tvix/castore/src/tests/import.rs
+++ b/tvix/castore/src/tests/import.rs
@@ -1,8 +1,9 @@
-use crate::blobservice::BlobService;
+use crate::blobservice::{self, BlobService};
+use crate::directoryservice;
 use crate::fixtures::*;
 use crate::import::ingest_path;
 use crate::proto;
-use crate::utils::{gen_blob_service, gen_directory_service};
+
 use std::sync::Arc;
 use tempfile::TempDir;
 
@@ -12,8 +13,8 @@ use std::os::unix::ffi::OsStrExt;
 #[cfg(target_family = "unix")]
 #[tokio::test]
 async fn symlink() {
-    let blob_service = gen_blob_service();
-    let directory_service = gen_directory_service();
+    let blob_service = blobservice::from_addr("memory://").await.unwrap();
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
@@ -43,8 +44,9 @@ async fn symlink() {
 
 #[tokio::test]
 async fn single_file() {
-    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-    let directory_service = gen_directory_service();
+    let blob_service =
+        Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>;
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
@@ -75,8 +77,9 @@ async fn single_file() {
 #[cfg(target_family = "unix")]
 #[tokio::test]
 async fn complicated() {
-    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-    let directory_service = gen_directory_service();
+    let blob_service =
+        Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>;
+    let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
     let tmpdir = TempDir::new().unwrap();
 
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
deleted file mode 100644
index ca68f9f26c..0000000000
--- a/tvix/castore/src/utils.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-//! A crate containing constructors to provide instances of a BlobService and
-//! DirectoryService. Only used for testing purposes, but across crates.
-//! Should be removed once we have a better concept of a "Service registry".
-use tonic::transport::{Channel, Endpoint, Server, Uri};
-
-use crate::{
-    blobservice::{BlobService, MemoryBlobService},
-    directoryservice::{DirectoryService, MemoryDirectoryService},
-    proto::{
-        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
-        directory_service_client::DirectoryServiceClient,
-        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
-        GRPCDirectoryServiceWrapper,
-    },
-};
-
-pub fn gen_blob_service() -> Box<dyn BlobService> {
-    Box::<MemoryBlobService>::default()
-}
-
-pub fn gen_directory_service() -> Box<dyn DirectoryService> {
-    Box::<MemoryDirectoryService>::default()
-}
-
-/// This will spawn the a gRPC server with a DirectoryService client, connect a
-/// gRPC DirectoryService client and return it.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
-    let (left, right) = tokio::io::duplex(64);
-
-    // spin up a server, which will only connect once, to the left side.
-    tokio::spawn(async {
-        // spin up a new DirectoryService
-        let mut server = Server::builder();
-        let router = server.add_service(DirectoryServiceServer::new(
-            GRPCDirectoryServiceWrapper::new(gen_directory_service()),
-        ));
-
-        router
-            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
-            .await
-    });
-
-    // Create a client, connecting to the right side. The URI is unused.
-    let mut maybe_right = Some(right);
-    DirectoryServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(right) }
-            }))
-            .await
-            .unwrap(),
-    )
-}
-
-/// This will spawn the a gRPC server with a BlobService client, connect a
-/// gRPC BlobService client and return it.
-#[allow(dead_code)]
-pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
-    let (left, right) = tokio::io::duplex(64);
-
-    // spin up a server, which will only connect once, to the left side.
-    tokio::spawn(async {
-        // spin up a new DirectoryService
-        let mut server = Server::builder();
-        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
-            gen_blob_service(),
-        )));
-
-        router
-            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
-            .await
-    });
-
-    // Create a client, connecting to the right side. The URI is unused.
-    let mut maybe_right = Some(right);
-    BlobServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(right) }
-            }))
-            .await
-            .unwrap(),
-    )
-}