about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--tvix/glue/src/tvix_store_io.rs93
1 files changed, 71 insertions, 22 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index f0f2f5cf91..8f44d2fe83 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -15,9 +15,11 @@ use std::{
     path::{Path, PathBuf},
     sync::Arc,
 };
-use tokio_util::io::SyncIoBridge;
+use tokio::io::AsyncBufRead;
+use tokio_util::io::{InspectReader, SyncIoBridge};
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
+use tvix_castore::import::archive::ingest_archive;
 use tvix_castore::import::fs::dir_entry_iter_to_ingestion_stream;
 use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use tvix_store::utils::AsyncIoBridge;
@@ -32,6 +34,7 @@ use tvix_castore::{
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
 use crate::builtins::FetcherError;
+use crate::decompression::DecompressedReader;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -298,7 +301,7 @@ impl TvixStoreIO {
         path: &Path,
         ca: CAHash,
         root_node: Node,
-    ) -> io::Result<(PathInfo, StorePath)> {
+    ) -> io::Result<(PathInfo, NixHash, StorePath)> {
         // Ask the PathInfoService for the NAR size and sha256
         // We always need it no matter what is the actual hash mode
         // because the path info construct a narinfo which *always*
@@ -327,7 +330,11 @@ impl TvixStoreIO {
         let path_info =
             tvix_store::import::derive_nar_ca_path_info(nar_size, nar_sha256, Some(ca), root_node);
 
-        Ok((path_info, output_path.to_owned()))
+        Ok((
+            path_info,
+            NixHash::Sha256(nar_sha256),
+            output_path.to_owned(),
+        ))
     }
 
     pub(crate) async fn register_node_in_path_info_service(
@@ -337,7 +344,7 @@ impl TvixStoreIO {
         ca: CAHash,
         root_node: Node,
     ) -> io::Result<StorePath> {
-        let (path_info, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
+        let (path_info, _, output_path) = self.node_to_path_info(name, path, ca, root_node).await?;
         let _path_info = self.path_info_service.as_ref().put(path_info).await?;
 
         Ok(output_path)
@@ -372,33 +379,34 @@ impl TvixStoreIO {
             .is_some())
     }
 
-    pub async fn fetch_url(
-        &self,
-        url: &str,
-        name: &str,
-        hash: Option<&NixHash>,
-    ) -> Result<StorePath, ErrorKind> {
+    async fn download<'a>(&self, url: &str) -> Result<impl AsyncBufRead + Unpin + 'a, ErrorKind> {
         let resp = self
             .http_client
             .get(url)
             .send()
             .await
             .map_err(FetcherError::from)?;
-        let mut sha = Sha256::new();
-        let mut data = tokio_util::io::StreamReader::new(
-            resp.bytes_stream()
-                .inspect_ok(|data| {
-                    sha.update(data);
-                })
-                .map_err(|e| {
-                    let e = e.without_url();
-                    warn!(%e, "failed to get response body");
-                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
-                }),
-        );
+        Ok(tokio_util::io::StreamReader::new(
+            resp.bytes_stream().map_err(|e| {
+                let e = e.without_url();
+                warn!(%e, "failed to get response body");
+                io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+            }),
+        ))
+    }
 
+    pub async fn fetch_url(
+        &self,
+        url: &str,
+        name: &str,
+        hash: Option<&NixHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let mut sha = Sha256::new();
+        let data = self.download(url).await?;
+        let mut data = InspectReader::new(data, |b| sha.update(b));
         let mut blob = self.blob_service.open_write().await;
         let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
+        drop(data);
         let blob_digest = blob.close().await?;
         let got = NixHash::Sha256(sha.finalize().into());
 
@@ -453,6 +461,47 @@ impl TvixStoreIO {
 
         Ok(path.to_owned())
     }
+
+    pub async fn fetch_tarball(
+        &self,
+        url: &str,
+        name: &str,
+        ca: Option<CAHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let data = self.download(url).await?;
+        let data = DecompressedReader::new(data);
+        let archive = tokio_tar::Archive::new(data);
+        let node = ingest_archive(&self.blob_service, &self.directory_service, archive)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        let (path_info, got, output_path) = self
+            .node_to_path_info(
+                name,
+                Path::new(""),
+                ca.clone().expect("TODO: support unspecified CA hash"),
+                node,
+            )
+            .await?;
+
+        if let Some(wanted) = &ca {
+            if *wanted.hash() != got {
+                return Err(FetcherError::HashMismatch {
+                    url: url.to_owned(),
+                    wanted: wanted.hash().into_owned(),
+                    got,
+                }
+                .into());
+            }
+        }
+
+        self.path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        Ok(output_path)
+    }
 }
 
 impl EvalIO for TvixStoreIO {