about summary refs log tree commit diff
path: root/tvix/glue/src/tvix_store_io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/tvix_store_io.rs')
-rw-r--r--tvix/glue/src/tvix_store_io.rs107
1 files changed, 103 insertions, 4 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index c09f0098e4..30ab97c0ca 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -4,7 +4,12 @@ use async_recursion::async_recursion;
 use bytes::Bytes;
 use futures::Stream;
 use futures::{StreamExt, TryStreamExt};
+use nix_compat::nixhash::NixHash;
+use nix_compat::store_path::{build_ca_path, StorePathRef};
 use nix_compat::{nixhash::CAHash, store_path::StorePath};
+use sha2::{Digest, Sha256};
+use std::marker::Unpin;
+use std::rc::Rc;
 use std::{
     cell::RefCell,
     collections::BTreeSet,
@@ -15,17 +20,18 @@ use std::{
 use tokio::io::AsyncReadExt;
 use tracing::{error, instrument, warn, Level};
 use tvix_build::buildservice::BuildService;
-use tvix_eval::{EvalIO, FileType, StdIO};
+use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO};
 use walkdir::DirEntry;
 
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::{self, DirectoryService},
-    proto::{node::Node, NamedNode},
+    proto::{node::Node, FileNode, NamedNode},
     B3Digest,
 };
 use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo};
 
+use crate::builtins::FetcherError;
 use crate::known_paths::KnownPaths;
 use crate::tvix_build::derivation_to_build_request;
 
@@ -51,7 +57,8 @@ pub struct TvixStoreIO {
     std_io: StdIO,
     #[allow(dead_code)]
     build_service: Arc<dyn BuildService>,
-    tokio_handle: tokio::runtime::Handle,
+    pub(crate) tokio_handle: tokio::runtime::Handle,
+    http_client: reqwest::Client,
     pub(crate) known_paths: RefCell<KnownPaths>,
 }
 
@@ -70,6 +77,7 @@ impl TvixStoreIO {
             std_io: StdIO {},
             build_service,
             tokio_handle,
+            http_client: reqwest::Client::new(),
             known_paths: Default::default(),
         }
     }
@@ -278,7 +286,7 @@ impl TvixStoreIO {
     /// with a [`tokio::runtime::Handle::block_on`] call for synchronicity.
     pub(crate) fn ingest_entries_sync<S>(&self, entries_stream: S) -> io::Result<Node>
     where
-        S: Stream<Item = DirEntry> + std::marker::Unpin,
+        S: Stream<Item = DirEntry> + Unpin,
     {
         self.tokio_handle.block_on(async move {
             tvix_castore::import::ingest_entries(
@@ -346,6 +354,97 @@ impl TvixStoreIO {
                 .await
         })
     }
+
+    pub async fn store_path_exists<'a>(&'a self, store_path: StorePathRef<'a>) -> io::Result<bool> {
+        Ok(self
+            .path_info_service
+            .as_ref()
+            .get(*store_path.digest())
+            .await?
+            .is_some())
+    }
+
+    pub async fn fetch_url(
+        &self,
+        url: &str,
+        name: &str,
+        hash: Option<&NixHash>,
+    ) -> Result<StorePath, ErrorKind> {
+        let resp = self
+            .http_client
+            .get(url)
+            .send()
+            .await
+            .map_err(FetcherError::from)?;
+        let mut sha = Sha256::new();
+        let mut data = tokio_util::io::StreamReader::new(
+            resp.bytes_stream()
+                .inspect_ok(|data| {
+                    sha.update(data);
+                })
+                .map_err(|e| {
+                    let e = e.without_url();
+                    warn!(%e, "failed to get response body");
+                    io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
+                }),
+        );
+
+        let mut blob = self.blob_service.open_write().await;
+        let size = tokio::io::copy(&mut data, blob.as_mut()).await?;
+        let blob_digest = blob.close().await?;
+        let got = NixHash::Sha256(sha.finalize().into());
+
+        let hash = CAHash::Flat(if let Some(wanted) = hash {
+            if *wanted != got {
+                return Err(FetcherError::HashMismatch {
+                    url: url.to_owned(),
+                    wanted: wanted.clone(),
+                    got,
+                }
+                .into());
+            }
+            wanted.clone()
+        } else {
+            got
+        });
+
+        let path = build_ca_path(name, &hash, Vec::<String>::new(), false)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+        let node = Node::File(FileNode {
+            name: path.to_string().into(),
+            digest: blob_digest.into(),
+            size,
+            executable: false,
+        });
+
+        let (nar_size, nar_sha256) = self
+            .path_info_service
+            .calculate_nar(&node)
+            .await
+            .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?;
+
+        let path_info = PathInfo {
+            node: Some(tvix_castore::proto::Node {
+                node: Some(node.clone()),
+            }),
+            references: vec![],
+            narinfo: Some(tvix_store::proto::NarInfo {
+                nar_size,
+                nar_sha256: nar_sha256.to_vec().into(),
+                signatures: vec![],
+                reference_names: vec![],
+                deriver: None, /* ? */
+                ca: Some((&hash).into()),
+            }),
+        };
+
+        self.path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+        Ok(path.to_owned())
+    }
 }
 
 impl EvalIO for TvixStoreIO {