From 091de12a9a735e71c119e543dab9f2999a36a5a1 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 22 Apr 2024 14:02:48 +0300 Subject: refactor(tvix/glue): move Fetch[er] into its own types, fetch lazily We actually want to delay fetching until we actually need the file. A simple evaluation asking for `.outPath` or `.drvPath` should work even in a pure offline environment. Before this CL, the fetching logic was quite distributed between tvix_store_io, and builtins/fetchers.rs. Rather than having various functions and conversions between structs, describe a Fetch as an enum type, with the fields describing the fetch. Define a store_path() function on top of `Fetch` which can be used to ask for the calculated store path (if the digest has been provided upfront). Have a `Fetcher` struct, and give it a `fetch_and_persist` function, taking a `Fetch` as well as a desired name, and have it deal with all the logic of persisting the PathInfos. It also returns a StorePathRef, similar to the `.store_path()` method on a `Fetch` struct. In a followup CL, we can extend KnownPaths to track fetches AND derivations, and then use `Fetcher` when we need to do IO into that store path. Change-Id: Ib39a96baeb661750a8706b461f8ba4abb342e777 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11500 Reviewed-by: raitobezarius Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/Cargo.lock | 2 + tvix/Cargo.nix | 8 + tvix/glue/Cargo.toml | 2 + tvix/glue/src/builtins/errors.rs | 16 +- tvix/glue/src/builtins/fetchers.rs | 355 +++++++++++----------------------- tvix/glue/src/fetchers.rs | 387 +++++++++++++++++++++++++++++++++++++ tvix/glue/src/lib.rs | 1 + tvix/glue/src/tvix_store_io.rs | 152 ++------------- 8 files changed, 531 insertions(+), 392 deletions(-) create mode 100644 tvix/glue/src/fetchers.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 364fbed662..b2be20a897 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4394,6 +4394,7 @@ dependencies = [ "hex-literal", "lazy_static", "magic", + "md-5", "nix 0.27.1", "nix-compat", "pin-project", @@ -4402,6 +4403,7 @@ dependencies = [ "rstest", "serde", "serde_json", + "sha1", "sha2", "tempfile", "thiserror", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 9efdb7dee9..25e5dccfd7 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -14101,6 +14101,10 @@ rec { name = "magic"; packageId = "magic"; } + { + name = "md-5"; + packageId = "md-5"; + } { name = "nix-compat"; packageId = "nix-compat"; @@ -14123,6 +14127,10 @@ rec { name = "serde_json"; packageId = "serde_json"; } + { + name = "sha1"; + packageId = "sha1"; + } { name = "sha2"; packageId = "sha2"; diff --git a/tvix/glue/Cargo.toml b/tvix/glue/Cargo.toml index 1c08616cfb..f280117674 100644 --- a/tvix/glue/Cargo.toml +++ b/tvix/glue/Cargo.toml @@ -25,6 +25,8 @@ thiserror = "1.0.38" serde = "1.0.195" serde_json = "1.0" sha2 = "0.10.8" +sha1 = "0.10.6" +md-5 = "0.10.6" walkdir = "2.4.0" [dependencies.async-compression] diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs index 53351cf902..5aced2bde4 100644 --- a/tvix/glue/src/builtins/errors.rs +++ b/tvix/glue/src/builtins/errors.rs @@ -41,17 +41,17 @@ pub enum FetcherError { #[error("Invalid hash type '{0}' for fetcher")] InvalidHashType(&'static str), - #[error("Error in store path for fetcher output: {0}")] - StorePath(#[from] BuildStorePathError), - #[error(transparent)] Http(#[from] reqwest::Error), -} -impl From for tvix_eval::ErrorKind { - fn from(err: FetcherError) -> Self { - tvix_eval::ErrorKind::TvixError(Rc::new(err)) - } + #[error(transparent)] + Io(#[from] std::io::Error), + + #[error(transparent)] + Import(#[from] tvix_castore::import::Error), + + #[error("Error calculating store path for fetcher output: {0}")] + StorePath(#[from] BuildStorePathError), } /// Errors related to `builtins.path` and `builtins.filterSource`, diff --git a/tvix/glue/src/builtins/fetchers.rs b/tvix/glue/src/builtins/fetchers.rs index d5735b7d09..ec5dd969bc 100644 --- a/tvix/glue/src/builtins/fetchers.rs +++ b/tvix/glue/src/builtins/fetchers.rs @@ -1,197 +1,74 @@ //! Contains builtins that fetch paths from the Internet -use crate::tvix_store_io::TvixStoreIO; -use bstr::ByteSlice; -use nix_compat::nixhash::{self, CAHash}; -use nix_compat::store_path::{build_ca_path, StorePathRef}; +use super::utils::select_string; +use crate::{ + fetchers::{url_basename, Fetch}, + tvix_store_io::TvixStoreIO, +}; +use nix_compat::nixhash; +use nix_compat::nixhash::NixHash; use std::rc::Rc; +use tracing::info; use tvix_eval::builtin_macros::builtins; +use tvix_eval::generators::Gen; use tvix_eval::generators::GenCo; -use tvix_eval::{CatchableErrorKind, ErrorKind, NixContextElement, NixString, Value}; - -use super::utils::select_string; -use super::{DerivationError, FetcherError}; - -/// Attempts to mimic `nix::libutil::baseNameOf` -fn url_basename(s: &str) -> &str { - if s.is_empty() { - return ""; - } - - let mut last = s.len() - 1; - if s.chars().nth(last).unwrap() == '/' && last > 0 { - last -= 1; - } - - if last == 0 { - return ""; - } +use tvix_eval::{CatchableErrorKind, ErrorKind, Value}; - let pos = match s[..=last].rfind('/') { - Some(pos) => { - if pos == last - 1 { - 0 - } else { - pos - } - } - None => 0, - }; - - &s[(pos + 1)..=last] -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum HashMode { - Flat, - Recursive, -} - -/// Struct representing the arguments passed to fetcher functions -#[derive(Debug, PartialEq, Eq)] -struct FetchArgs { +struct NixFetchArgs { url: String, - name: String, - hash: Option, + name: Option, + sha256: Option<[u8; 32]>, } -impl FetchArgs { - pub fn new( - url: String, - name: Option, - sha256: Option, - mode: HashMode, - ) -> nixhash::NixHashResult { - Ok(Self { - name: name.unwrap_or_else(|| url_basename(&url).to_owned()), +// `fetchurl` and `fetchTarball` accept a single argument, which can either be the URL (as string), +// or an attrset, where `url`, `sha256` and `name` keys are allowed. +async fn extract_fetch_args( + co: &GenCo, + args: Value, +) -> Result, ErrorKind> { + if let Ok(url) = args.to_str() { + // Get the raw bytes, not the ToString repr. + let url = String::from_utf8(url.as_bytes().to_vec()).map_err(|_| ErrorKind::Utf8)?; + return Ok(Ok(NixFetchArgs { url, - hash: sha256 - .map(|h| { - let hash = nixhash::from_str(&h, Some("sha256"))?; - Ok(match mode { - HashMode::Flat => Some(nixhash::CAHash::Flat(hash)), - HashMode::Recursive => Some(nixhash::CAHash::Nar(hash)), - }) - }) - .transpose()? - .flatten(), - }) + name: None, + sha256: None, + })); } - fn store_path(&self) -> Result, ErrorKind> { - let Some(h) = &self.hash else { - return Ok(None); - }; - build_ca_path(&self.name, h, Vec::::new(), false) - .map(Some) - .map_err(|e| FetcherError::from(e).into()) - } - - async fn extract( - co: &GenCo, - args: Value, - default_name: Option<&str>, - mode: HashMode, - ) -> Result, ErrorKind> { - if let Ok(url) = args.to_str() { - return Ok(Ok(FetchArgs::new( - url.to_str()?.to_owned(), - None, - None, - mode, - ) - .map_err(DerivationError::InvalidOutputHash)?)); - } - - let attrs = args.to_attrs().map_err(|_| ErrorKind::TypeError { - expected: "attribute set or string", - actual: args.type_of(), - })?; - - let url = match select_string(co, &attrs, "url").await? { - Ok(s) => s.ok_or_else(|| ErrorKind::AttributeNotFound { name: "url".into() })?, - Err(cek) => return Ok(Err(cek)), - }; - let name = match select_string(co, &attrs, "name").await? { - Ok(s) => s.or_else(|| default_name.map(|s| s.to_owned())), - Err(cek) => return Ok(Err(cek)), - }; - let sha256 = match select_string(co, &attrs, "sha256").await? { - Ok(s) => s, - Err(cek) => return Ok(Err(cek)), - }; + let attrs = args.to_attrs().map_err(|_| ErrorKind::TypeError { + expected: "attribute set or contextless string", + actual: args.type_of(), + })?; - Ok(Ok( - FetchArgs::new(url, name, sha256, mode).map_err(DerivationError::InvalidOutputHash)? - )) - } -} + let url = match select_string(co, &attrs, "url").await? { + Ok(s) => s.ok_or_else(|| ErrorKind::AttributeNotFound { name: "url".into() })?, + Err(cek) => return Ok(Err(cek)), + }; + let name = match select_string(co, &attrs, "name").await? { + Ok(s) => s, + Err(cek) => return Ok(Err(cek)), + }; + let sha256_str = match select_string(co, &attrs, "sha256").await? { + Ok(s) => s, + Err(cek) => return Ok(Err(cek)), + }; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum FetchMode { - Url, - Tarball, -} + // TODO: disallow other attrset keys, to match Nix' behaviour. -impl From for HashMode { - fn from(value: FetchMode) -> Self { - match value { - FetchMode::Url => HashMode::Flat, - FetchMode::Tarball => HashMode::Recursive, - } - } -} + // parse the sha256 string into a digest. + let sha256 = match sha256_str { + Some(sha256_str) => { + let nixhash = nixhash::from_str(&sha256_str, Some("sha256")) + // TODO: DerivationError::InvalidOutputHash should be moved to ErrorKind::InvalidHash and used here instead + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; -impl FetchMode { - fn default_name(self) -> Option<&'static str> { - match self { - FetchMode::Url => None, - FetchMode::Tarball => Some("source"), + Some(nixhash.digest_as_bytes().try_into().expect("is sha256")) } - } -} - -fn string_from_store_path(store_path: StorePathRef) -> NixString { - NixString::new_context_from( - NixContextElement::Plain(store_path.to_absolute_path()).into(), - store_path.to_absolute_path(), - ) -} - -async fn fetch( - state: Rc, - co: GenCo, - args: Value, - mode: FetchMode, -) -> Result { - let args = match FetchArgs::extract(&co, args, mode.default_name(), mode.into()).await? { - Ok(args) => args, - Err(cek) => return Ok(cek.into()), + None => None, }; - if let Some(store_path) = args.store_path()? { - if state.store_path_exists(store_path).await? { - return Ok(string_from_store_path(store_path).into()); - } - } - - let ca = args.hash; - let store_path = Rc::clone(&state).tokio_handle.block_on(async move { - match mode { - FetchMode::Url => { - state - .fetch_url( - &args.url, - &args.name, - ca.as_ref().map(|c| c.hash().into_owned()).as_ref(), - ) - .await - } - FetchMode::Tarball => state.fetch_tarball(&args.url, &args.name, ca).await, - } - })?; - - Ok(string_from_store_path(store_path.as_ref()).into()) + Ok(Ok(NixFetchArgs { url, name, sha256 })) } #[allow(unused_variables)] // for the `state` arg, for now @@ -199,7 +76,36 @@ async fn fetch( pub(crate) mod fetcher_builtins { use super::*; - use tvix_eval::generators::Gen; + /// Consumes a fetch. + /// If there is enough info to calculate the store path without fetching, + /// queue the fetch to be fetched lazily, and return the store path. + /// If there's not enough info to calculate it, do the fetch now, and then + /// return the store path. + fn fetch_lazy(state: Rc, name: String, fetch: Fetch) -> Result { + match fetch + .store_path(&name) + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))? + { + Some(store_path) => { + let path = store_path.to_absolute_path().into(); + // TODO: add fetch to fetcher + drop(fetch); + + Ok(Value::Path(Box::new(path))) + } + None => { + // If we don't have enough info, do the fetch now. + info!(?fetch, "triggering required fetch"); + + let (store_path, _root_node) = state + .tokio_handle + .block_on(async { state.fetcher.ingest_and_persist(&name, fetch).await }) + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; + + Ok(Value::Path(Box::new(store_path.to_absolute_path().into()))) + } + } + } #[builtin("fetchurl")] async fn builtin_fetchurl( @@ -207,7 +113,21 @@ pub(crate) mod fetcher_builtins { co: GenCo, args: Value, ) -> Result { - fetch(state, co, args, FetchMode::Url).await + let args = match extract_fetch_args(&co, args).await? { + Ok(args) => args, + Err(cek) => return Ok(Value::from(cek)), + }; + + // Derive the name from the URL basename if not set explicitly. + let name = args + .name + .unwrap_or_else(|| url_basename(&args.url).to_owned()); + + fetch_lazy( + state, + name, + Fetch::URL(args.url, args.sha256.map(NixHash::Sha256)), + ) } #[builtin("fetchTarball")] @@ -216,7 +136,18 @@ pub(crate) mod fetcher_builtins { co: GenCo, args: Value, ) -> Result { - fetch(state, co, args, FetchMode::Tarball).await + let args = match extract_fetch_args(&co, args).await? { + Ok(args) => args, + Err(cek) => return Ok(Value::from(cek)), + }; + + // Name defaults to "source" if not set explicitly. + const DEFAULT_NAME_FETCH_TARBALL: &str = "source"; + let name = args + .name + .unwrap_or_else(|| DEFAULT_NAME_FETCH_TARBALL.to_owned()); + + fetch_lazy(state, name, Fetch::Tarball(args.url, args.sha256)) } #[builtin("fetchGit")] @@ -228,71 +159,3 @@ pub(crate) mod fetcher_builtins { Err(ErrorKind::NotImplemented("fetchGit")) } } - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use nix_compat::store_path::StorePath; - - use super::*; - - #[test] - fn fetchurl_store_path() { - let url = "https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch"; - let sha256 = "0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax"; - let args = FetchArgs::new(url.into(), None, Some(sha256.into()), HashMode::Flat).unwrap(); - - assert_eq!( - args.store_path().unwrap().unwrap().to_owned(), - StorePath::from_str("06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch").unwrap() - ) - } - - #[test] - fn fetch_tarball_store_path() { - let url = "https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz"; - let sha256 = "1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm"; - let args = FetchArgs::new( - url.into(), - Some("source".into()), - Some(sha256.into()), - HashMode::Recursive, - ) - .unwrap(); - - assert_eq!( - args.store_path().unwrap().unwrap().to_owned(), - StorePath::from_str("7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source").unwrap() - ) - } - - mod url_basename { - use super::*; - - #[test] - fn empty_path() { - assert_eq!(url_basename(""), ""); - } - - #[test] - fn path_on_root() { - assert_eq!(url_basename("/dir"), "dir"); - } - - #[test] - fn relative_path() { - assert_eq!(url_basename("dir/foo"), "foo"); - } - - #[test] - fn root_with_trailing_slash() { - assert_eq!(url_basename("/"), ""); - } - - #[test] - fn trailing_slash() { - assert_eq!(url_basename("/dir/"), "dir"); - } - } -} diff --git a/tvix/glue/src/fetchers.rs b/tvix/glue/src/fetchers.rs new file mode 100644 index 0000000000..977be4a203 --- /dev/null +++ b/tvix/glue/src/fetchers.rs @@ -0,0 +1,387 @@ +use futures::TryStreamExt; +use md5::Md5; +use nix_compat::{ + nixhash::{CAHash, HashAlgo, NixHash}, + store_path::{build_ca_path, BuildStorePathError, StorePathRef}, +}; +use sha1::Sha1; +use sha2::{digest::Output, Digest, Sha256, Sha512}; +use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use tokio_util::io::InspectReader; +use tracing::warn; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{node::Node, FileNode}, +}; +use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; + +use crate::{builtins::FetcherError, decompression::DecompressedReader}; + +/// Representing options for doing a fetch. +#[derive(Clone, Debug)] +pub enum Fetch { + /// Fetch a literal file from the given URL, with an optional expected + /// NixHash of it. + /// TODO: check if this is *always* sha256, and if so, make it [u8; 32]. + URL(String, Option), + + /// Fetch a tarball from the given URL and unpack. + /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz. + /// The top-level path component of the files in the tarball is removed, + /// so it is best if the tarball contains a single directory at top level. + /// Optionally, a sha256 digest can be provided to verify the unpacked + /// contents against. + Tarball(String, Option<[u8; 32]>), + + /// TODO + Git(), +} + +impl Fetch { + /// If the [Fetch] contains an expected hash upfront, returns the resulting + /// store path. + /// This doesn't do any fetching. + pub fn store_path<'a>( + &self, + name: &'a str, + ) -> Result>, BuildStorePathError> { + let ca_hash = match self { + Fetch::URL(_, Some(nixhash)) => CAHash::Flat(nixhash.clone()), + Fetch::Tarball(_, Some(nar_sha256)) => CAHash::Nar(NixHash::Sha256(*nar_sha256)), + _ => return Ok(None), + }; + + // calculate the store path of this fetch + build_ca_path(name, &ca_hash, Vec::::new(), false).map(Some) + } +} + +/// Knows how to fetch a given [Fetch]. +pub struct Fetcher { + http_client: reqwest::Client, + blob_service: BS, + directory_service: DS, + path_info_service: PS, +} + +impl Fetcher { + pub fn new(blob_service: BS, directory_service: DS, path_info_service: PS) -> Self { + Self { + http_client: reqwest::Client::new(), + blob_service, + directory_service, + path_info_service, + } + } + + /// Constructs a HTTP request to the passed URL, and returns a AsyncReadBuf to it. + async fn download<'a>( + &self, + url: &str, + ) -> Result { + let resp = self.http_client.get(url).send().await?; + Ok(tokio_util::io::StreamReader::new( + resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(%e, "failed to get response body"); + std::io::Error::new(std::io::ErrorKind::BrokenPipe, e) + }), + )) + } +} + +/// Copies all data from the passed reader to the passed writer. +/// Afterwards, it also returns the resulting [Digest], as well as the number of +/// bytes copied. +/// The exact hash function used is left generic over all [Digest]. +async fn hash( + mut r: impl AsyncRead + Unpin, + mut w: impl AsyncWrite + Unpin, +) -> std::io::Result<(Output, u64)> { + let mut hasher = D::new(); + let bytes_copied = tokio::io::copy( + &mut InspectReader::new(&mut r, |d| hasher.write_all(d).unwrap()), + &mut w, + ) + .await?; + Ok((hasher.finalize(), bytes_copied)) +} + +impl Fetcher +where + BS: AsRef<(dyn BlobService + 'static)> + Send + Sync, + DS: AsRef<(dyn DirectoryService + 'static)>, + PS: PathInfoService, +{ + /// Ingest the data from a specified [Fetch]. + /// On success, return the root node, a content digest and length. + /// Returns an error if there was a failure during fetching, or the contents + /// didn't match the previously communicated hash contained inside the FetchArgs. + pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> { + match fetch { + Fetch::URL(url, exp_nixhash) => { + // Construct a AsyncRead reading from the data as its downloaded. + let mut r = self.download(&url).await?; + + // Construct a AsyncWrite to write into the BlobService. + let mut blob_writer = self.blob_service.open_write().await; + + // Copy the contents from the download reader to the blob writer. + // Calculate the digest of the file received, depending on the + // communicated expected hash (or sha256 if none provided). + let (actual_nixhash, blob_size) = match exp_nixhash + .as_ref() + .map(NixHash::algo) + .unwrap_or_else(|| HashAlgo::Sha256) + { + HashAlgo::Sha256 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Sha256(digest.into()), bytes_written), + )?, + HashAlgo::Md5 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Md5(digest.into()), bytes_written), + )?, + HashAlgo::Sha1 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| (NixHash::Sha1(digest.into()), bytes_written), + )?, + HashAlgo::Sha512 => hash::(&mut r, &mut blob_writer).await.map( + |(digest, bytes_written)| { + (NixHash::Sha512(Box::new(digest.into())), bytes_written) + }, + )?, + }; + + if let Some(exp_nixhash) = exp_nixhash { + if exp_nixhash != actual_nixhash { + return Err(FetcherError::HashMismatch { + url, + wanted: exp_nixhash, + got: actual_nixhash, + }); + } + } + + // Construct and return the FileNode describing the downloaded contents. + Ok(( + Node::File(FileNode { + name: vec![].into(), + digest: blob_writer.close().await?.into(), + size: blob_size, + executable: false, + }), + CAHash::Flat(actual_nixhash), + blob_size, + )) + } + Fetch::Tarball(url, exp_nar_sha256) => { + // Construct a AsyncRead reading from the data as its downloaded. + let r = self.download(&url).await?; + + // Pop compression. + let r = DecompressedReader::new(r); + // Open the archive. + let archive = tokio_tar::Archive::new(r); + + // Ingest the archive, get the root node + let node = tvix_castore::import::archive::ingest_archive( + &self.blob_service, + &self.directory_service, + archive, + ) + .await?; + + // If an expected NAR sha256 was provided, compare with the one + // calculated from our root node. + // Even if no expected NAR sha256 has been provided, we need + // the actual one later. + let (nar_size, actual_nar_sha256) = self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| { + // convert the generic Store error to an IO error. + FetcherError::Io(e.into()) + })?; + + if let Some(exp_nar_sha256) = exp_nar_sha256 { + if exp_nar_sha256 != actual_nar_sha256 { + return Err(FetcherError::HashMismatch { + url, + wanted: NixHash::Sha256(exp_nar_sha256), + got: NixHash::Sha256(actual_nar_sha256), + }); + } + } + + Ok(( + node, + CAHash::Nar(NixHash::Sha256(actual_nar_sha256)), + nar_size, + )) + } + Fetch::Git() => todo!(), + } + } + + /// Ingests the data from a specified [Fetch], persists the returned node + /// in the PathInfoService, and returns the calculated StorePath, as well as + /// the root node pointing to the contents. + /// The root node can be used to descend into the data without doing the + /// lookup to the PathInfoService again. + pub async fn ingest_and_persist<'a>( + &self, + name: &'a str, + fetch: Fetch, + ) -> Result<(StorePathRef<'a>, Node), FetcherError> { + // Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize. + let (mut node, ca_hash, size) = self.ingest(fetch).await?; + + // Calculate the store path to return later, which is done with the ca_hash. + let store_path = build_ca_path(name, &ca_hash, Vec::::new(), false)?; + + // Rename the node name to match the Store Path. + if let Node::File(file_node) = &mut node { + file_node.name = store_path.to_string().into(); + } else { + unreachable!("Tvix bug: do_fetch for URL returned non-FileNode"); + } + + // If the resulting hash is not a CAHash::Nar, we also need to invoke + // `calculate_nar` to calculate this representation, as it's required in + // the [PathInfo]. + let (nar_size, nar_sha256) = match &ca_hash { + CAHash::Flat(_nix_hash) => self + .path_info_service + .calculate_nar(&node) + .await + .map_err(|e| FetcherError::Io(e.into()))?, + CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256), + CAHash::Nar(_) => unreachable!("Tvix bug: fetch returned non-sha256 CAHash::Nar"), + CAHash::Text(_) => unreachable!("Tvix bug: fetch returned CAHash::Text"), + }; + + // Construct the PathInfo and persist it. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { node: Some(node) }), + references: vec![], + narinfo: Some(tvix_store::proto::NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: vec![], + deriver: None, + ca: Some(ca_hash.into()), + }), + }; + + let path_info = self + .path_info_service + .put(path_info) + .await + .map_err(|e| FetcherError::Io(e.into()))?; + + Ok((store_path, path_info.node.unwrap().node.unwrap())) + } +} + +/// Attempts to mimic `nix::libutil::baseNameOf` +pub(crate) fn url_basename(s: &str) -> &str { + if s.is_empty() { + return ""; + } + + let mut last = s.len() - 1; + if s.chars().nth(last).unwrap() == '/' && last > 0 { + last -= 1; + } + + if last == 0 { + return ""; + } + + let pos = match s[..=last].rfind('/') { + Some(pos) => { + if pos == last - 1 { + 0 + } else { + pos + } + } + None => 0, + }; + + &s[(pos + 1)..=last] +} + +#[cfg(test)] +mod tests { + mod fetch { + use nix_compat::nixbase32; + + use crate::fetchers::Fetch; + + use super::super::*; + + #[test] + fn fetchurl_store_path() { + let url = "https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch"; + let exp_nixhash = NixHash::Sha256( + nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax") + .unwrap(), + ); + + let fetch = Fetch::URL(url.into(), Some(exp_nixhash)); + assert_eq!( + "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch", + &fetch + .store_path("notmuch-extract-patch") + .unwrap() + .unwrap() + .to_string(), + ) + } + + #[test] + fn fetch_tarball_store_path() { + let url = "https://github.com/NixOS/nixpkgs/archive/91050ea1e57e50388fa87a3302ba12d188ef723a.tar.gz"; + let exp_nixbase32 = + nixbase32::decode_fixed("1hf6cgaci1n186kkkjq106ryf8mmlq9vnwgfwh625wa8hfgdn4dm") + .unwrap(); + let fetch = Fetch::Tarball(url.into(), Some(exp_nixbase32)); + + assert_eq!( + "7adgvk5zdfq4pwrhsm3n9lzypb12gw0g-source", + &fetch.store_path("source").unwrap().unwrap().to_string(), + ) + } + } + + mod url_basename { + use super::super::*; + + #[test] + fn empty_path() { + assert_eq!(url_basename(""), ""); + } + + #[test] + fn path_on_root() { + assert_eq!(url_basename("/dir"), "dir"); + } + + #[test] + fn relative_path() { + assert_eq!(url_basename("dir/foo"), "foo"); + } + + #[test] + fn root_with_trailing_slash() { + assert_eq!(url_basename("/"), ""); + } + + #[test] + fn trailing_slash() { + assert_eq!(url_basename("/dir/"), "dir"); + } + } +} diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs index f04d5ec3a0..8528f09e52 100644 --- a/tvix/glue/src/lib.rs +++ b/tvix/glue/src/lib.rs @@ -1,4 +1,5 @@ pub mod builtins; +pub mod fetchers; pub mod known_paths; pub mod refscan; pub mod tvix_build; diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 3c86f42630..0994c44dfa 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -4,10 +4,9 @@ use async_recursion::async_recursion; use bytes::Bytes; use futures::{StreamExt, TryStreamExt}; use nix_compat::nixhash::NixHash; -use nix_compat::store_path::{build_ca_path, StorePathRef}; +use nix_compat::store_path::StorePathRef; use nix_compat::{nixhash::CAHash, store_path::StorePath}; use sha2::{Digest, Sha256}; -use std::rc::Rc; use std::{ cell::RefCell, collections::BTreeSet, @@ -15,24 +14,22 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::io::AsyncBufRead; -use tokio_util::io::{InspectReader, SyncIoBridge}; +use tokio_util::io::SyncIoBridge; use tracing::{error, instrument, warn, Level}; use tvix_build::buildservice::BuildService; -use tvix_castore::import::archive::ingest_archive; -use tvix_eval::{ErrorKind, EvalIO, FileType, StdIO}; +use tvix_castore::proto::node::Node; +use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_store::utils::AsyncIoBridge; use tvix_castore::{ blobservice::BlobService, directoryservice::{self, DirectoryService}, - proto::{node::Node, FileNode, NamedNode}, + proto::NamedNode, B3Digest, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; -use crate::builtins::FetcherError; -use crate::decompression::DecompressedReader; +use crate::fetchers::Fetcher; use crate::known_paths::KnownPaths; use crate::tvix_build::derivation_to_build_request; @@ -60,7 +57,10 @@ pub struct TvixStoreIO { #[allow(dead_code)] build_service: Arc, pub(crate) tokio_handle: tokio::runtime::Handle, - http_client: reqwest::Client, + + pub(crate) fetcher: + Fetcher, Arc, Arc>, + pub(crate) known_paths: RefCell, } @@ -73,13 +73,13 @@ impl TvixStoreIO { tokio_handle: tokio::runtime::Handle, ) -> Self { Self { - blob_service, - directory_service, - path_info_service, + blob_service: blob_service.clone(), + directory_service: directory_service.clone(), + path_info_service: path_info_service.clone(), std_io: StdIO {}, build_service, tokio_handle, - http_client: reqwest::Client::new(), + fetcher: Fetcher::new(blob_service, directory_service, path_info_service), known_paths: Default::default(), } } @@ -358,130 +358,6 @@ impl TvixStoreIO { .await? .is_some()) } - - async fn download<'a>(&self, url: &str) -> Result { - let resp = self - .http_client - .get(url) - .send() - .await - .map_err(FetcherError::from)?; - Ok(tokio_util::io::StreamReader::new( - resp.bytes_stream().map_err(|e| { - let e = e.without_url(); - warn!(%e, "failed to get response body"); - io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) - }), - )) - } - - pub async fn fetch_url( - &self, - url: &str, - name: &str, - hash: Option<&NixHash>, - ) -> Result { - let mut sha = Sha256::new(); - let data = self.download(url).await?; - let mut data = InspectReader::new(data, |b| sha.update(b)); - let mut blob = self.blob_service.open_write().await; - let size = tokio::io::copy(&mut data, blob.as_mut()).await?; - drop(data); - let blob_digest = blob.close().await?; - let got = NixHash::Sha256(sha.finalize().into()); - - let hash = CAHash::Flat(if let Some(wanted) = hash { - if *wanted != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.clone(), - got, - } - .into()); - } - wanted.clone() - } else { - got - }); - - let path = build_ca_path(name, &hash, Vec::::new(), false) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let node = Node::File(FileNode { - name: path.to_string().into(), - digest: blob_digest.into(), - size, - executable: false, - }); - - let (nar_size, nar_sha256) = self - .path_info_service - .calculate_nar(&node) - .await - .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?; - - let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(node.clone()), - }), - references: vec![], - narinfo: Some(tvix_store::proto::NarInfo { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: vec![], - deriver: None, /* ? */ - ca: Some((&hash).into()), - }), - }; - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(path.to_owned()) - } - - pub async fn fetch_tarball( - &self, - url: &str, - name: &str, - ca: Option, - ) -> Result { - let data = self.download(url).await?; - let data = DecompressedReader::new(data); - let archive = tokio_tar::Archive::new(data); - let node = ingest_archive(&self.blob_service, &self.directory_service, archive) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - let (path_info, got, output_path) = self - .node_to_path_info( - name, - Path::new(""), - ca.clone().expect("TODO: support unspecified CA hash"), - node, - ) - .await?; - - if let Some(wanted) = &ca { - if *wanted.hash() != got { - return Err(FetcherError::HashMismatch { - url: url.to_owned(), - wanted: wanted.hash().into_owned(), - got, - } - .into()); - } - } - - self.path_info_service - .put(path_info) - .await - .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?; - - Ok(output_path) - } } impl EvalIO for TvixStoreIO { -- cgit 1.4.1