 tvix/store/src/pathinfoservice/nix_http.rs | 196 ++++++----------
 1 file changed, 74 insertions(+), 122 deletions(-)
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index bdb0e2c3cb..cccd4805c6 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,6 +1,3 @@
-use std::io::{self, BufRead, Read, Write};
-
-use data_encoding::BASE64;
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
@@ -8,7 +5,10 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::{digest::FixedOutput, Digest, Sha256};
+use sha2::Digest;
+use std::io::{self, Write};
+use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::InspectReader;
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
@@ -32,8 +32,7 @@ use super::PathInfoService;
 ///
 /// The client is expected to be (indirectly) using the same [BlobService] and
 /// [DirectoryService], so able to fetch referred Directories and Blobs.
-/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not
-/// implemented and return an error if called.
+/// [PathInfoService::put] is not implemented and returns an error if called.
 /// TODO: what about reading from nix-cache-info?
 pub struct NixHTTPPathInfoService<BS, DS> {
     base_url: url::Url,
@@ -71,7 +70,7 @@ where
     BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
     DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
 {
-    #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))]
+    #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let narinfo_url = self
             .base_url
@@ -171,85 +170,83 @@ where
             )));
         }
 
-        // get an AsyncRead of the response body.
-        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
             let e = e.without_url();
             warn!(e=%e, "failed to get response body");
             io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
         }));
-        let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
 
-        // handle decompression, by wrapping the reader.
-        let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
-            Some("none") => Box::new(sync_r),
-            Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
-            Some(comp) => {
-                return Err(Error::InvalidRequest(
-                    format!("unsupported compression: {}", comp).to_string(),
-                ))
-            }
-            None => {
-                return Err(Error::InvalidRequest(
-                    "unsupported compression: bzip2".to_string(),
-                ))
+        // handle decompression, depending on the compression field.
+        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
             }
         };
-
-        let res = tokio::task::spawn_blocking({
-            let blob_service = self.blob_service.clone();
-            let directory_service = self.directory_service.clone();
-            move || -> io::Result<_> {
-                // Wrap the reader once more, so we can calculate NarSize and NarHash
-                let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
-                let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
-
-                let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
-
-                Ok((root_node, nar_hash, nar_size))
-            }
-        })
+        let mut nar_hash = sha2::Sha256::new();
+        let mut nar_size = 0;
+
+        // Assemble NarHash and NarSize as we read bytes.
+        let r = InspectReader::new(r, |b| {
+            nar_size += b.len() as u64;
+            nar_hash.write_all(b).unwrap();
+        });
+
+        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
+        let mut r = BufReader::new(r);
+
+        let root_node = crate::nar::ingest_nar(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
         .await
-        .unwrap();
-
-        match res {
-            Ok((root_node, nar_hash, nar_size)) => {
-                // ensure the ingested narhash and narsize do actually match.
-                if narinfo.nar_size != nar_size {
-                    warn!(
-                        narinfo.nar_size = narinfo.nar_size,
-                        http.nar_size = nar_size,
-                        "NARSize mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarSize mismatch".to_string(),
-                    ))?;
-                }
-                if narinfo.nar_hash != nar_hash {
-                    warn!(
-                        narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
-                        http.nar_hash = %NixHash::Sha256(nar_hash),
-                        "NarHash mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarHash mismatch".to_string(),
-                    ))?;
-                }
-
-                Ok(Some(PathInfo {
-                    node: Some(castorepb::Node {
-                        // set the name of the root node to the digest-name of the store path.
-                        node: Some(
-                            root_node.rename(narinfo.store_path.to_string().to_owned().into()),
-                        ),
-                    }),
-                    references: pathinfo.references,
-                    narinfo: pathinfo.narinfo,
-                }))
-            }
-            Err(e) => Err(e.into()),
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NarSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
+        }
+        let nar_hash: [u8; 32] = nar_hash.finalize().into();
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
         }
+
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
     }
 
     #[instrument(skip_all, fields(path_info=?_path_info))]
@@ -259,16 +256,6 @@ where
         ))
     }
 
-    #[instrument(skip_all, fields(root_node=?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        Err(Error::InvalidRequest(
-            "calculate_nar not supported for this backend".to_string(),
-        ))
-    }
-
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         Box::pin(futures::stream::once(async {
             Err(Error::InvalidRequest(
@@ -277,38 +264,3 @@ where
         }))
     }
 }
-
-/// Small helper reader implementing [std::io::Read].
-/// It can be used to wrap another reader, counts the number of bytes read
-/// and the sha256 digest of the contents.
-struct NarReader<R: Read> {
-    r: R,
-
-    sha256: sha2::Sha256,
-    bytes_read: u64,
-}
-
-impl<R: Read> NarReader<R> {
-    pub fn from(inner: R) -> Self {
-        Self {
-            r: inner,
-            sha256: Sha256::new(),
-            bytes_read: 0,
-        }
-    }
-
-    /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
-    pub fn into_inner(self) -> (R, [u8; 32], u64) {
-        (self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
-    }
-}
-
-impl<R: Read> Read for NarReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.r.read(buf).map(|n| {
-            self.bytes_read += n as u64;
-            self.sha256.write_all(&buf[..n]).unwrap();
-            n
-        })
-    }
-}
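
The core of this change is the move from the blocking NarReader on a SyncIoBridge to a fully async pipeline: the decompressed body is wrapped in tokio_util::io::InspectReader, which hashes and counts the bytes as ingest_nar consumes them. Below is a minimal, self-contained sketch of that pattern, not the tvix code itself: the helper name is hypothetical, xz is picked arbitrarily, draining into a Vec stands in for ingest_nar, and it assumes tokio, tokio-util (io-util feature), async-compression (tokio + xz features), and sha2 with its std Write impl, mirroring the calls used in the diff.

use sha2::Digest;
use std::io::Write;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use tokio_util::io::InspectReader;

// Hypothetical helper, not part of tvix: decompress an xz-compressed NAR body
// and compute its sha256 digest and size while the bytes stream through.
async fn hash_while_reading(
    compressed: impl AsyncRead + Unpin,
) -> std::io::Result<([u8; 32], u64)> {
    // async-compression's bufread decoders want an AsyncBufRead, so buffer first.
    let decoder = async_compression::tokio::bufread::XzDecoder::new(BufReader::new(compressed));

    let mut nar_hash = sha2::Sha256::new();
    let mut nar_size = 0u64;

    // InspectReader invokes the closure with every chunk read through it,
    // so the hash and size accumulate as a side effect of reading.
    let mut r = InspectReader::new(decoder, |chunk| {
        nar_size += chunk.len() as u64;
        nar_hash.write_all(chunk).unwrap();
    });

    // Stand-in for ingest_nar: just drain the reader.
    let mut sink = Vec::new();
    r.read_to_end(&mut sink).await?;

    Ok((nar_hash.finalize().into(), nar_size))
}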