From 156a5a0fb6eeb231431a86f8d53f3ef50816c03d Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 28 Mar 2024 22:05:46 +0100 Subject: refactor(tvix/glue): drop ingest_entries_sync Make this function async, and do the block_on on the (single) callsite. Change-Id: Ib8b0b54ab5370fe02ef95f38a45d8866868a9d60 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11285 Reviewed-by: Connor Brewster Tested-by: BuildkiteCI --- tvix/glue/src/builtins/import.rs | 15 +++++++++------ tvix/glue/src/tvix_store_io.rs | 23 +++++++++++------------ 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 50b99690ee..88e483031f 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -102,12 +102,15 @@ async fn filtered_ingest( pin_mut!(entries_stream); - state - .ingest_entries_sync(entries_stream) - .map_err(|err| ErrorKind::IO { - path: Some(path.to_path_buf()), - error: err.into(), - }) + state.tokio_handle.block_on(async { + state + .ingest_entries(entries_stream) + .await + .map_err(|err| ErrorKind::IO { + path: Some(path.to_path_buf()), + error: err.into(), + }) + }) } #[builtins(state = "Rc")] diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index dc1974527b..cbda365c20 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -273,21 +273,20 @@ impl TvixStoreIO { .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e)) } - /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`] - /// with a [`tokio::runtime::Handle::block_on`] call for synchronicity. - pub(crate) fn ingest_entries_sync(&self, entries_stream: S) -> io::Result + /// This forwards the ingestion to the [`tvix_castore::import::ingest_entries`], + /// passing the blob_service and directory_service that's used. + /// The error is mapped to std::io::Error for simplicity. + pub(crate) async fn ingest_entries(&self, entries_stream: S) -> io::Result where S: Stream + Unpin, { - self.tokio_handle.block_on(async move { - tvix_castore::import::ingest_entries( - &self.blob_service, - &self.directory_service, - entries_stream, - ) - .await - .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) - }) + tvix_castore::import::ingest_entries( + &self.blob_service, + &self.directory_service, + entries_stream, + ) + .await + .map_err(|err| std::io::Error::new(io::ErrorKind::Other, err)) } pub(crate) async fn node_to_path_info( -- cgit 1.4.1