about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/glue/src/tvix_store_io.rs323
1 files changed, 166 insertions, 157 deletions
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index c7133e1d10..1f709906de 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -131,167 +131,176 @@ impl TvixStoreIO {
                     .borrow()
                     .get_fetch_for_output_path(store_path);
 
-                if let Some((name, fetch)) = maybe_fetch {
-                    info!(?fetch, "triggering lazy fetch");
-                    let (sp, root_node) = self
-                        .fetcher
-                        .ingest_and_persist(&name, fetch)
-                        .await
-                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
-
-                    debug_assert_eq!(
-                        sp.to_string(),
-                        store_path.to_string(),
-                        "store path returned from fetcher should match"
-                    );
-
-                    return Ok(Some(root_node));
-                }
-
-                // Look up the derivation for this output path.
-                let (drv_path, drv) = {
-                    let known_paths = self.known_paths.borrow();
-                    match known_paths.get_drv_path_for_output_path(store_path) {
-                        Some(drv_path) => (
-                            drv_path.to_owned(),
-                            known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
-                        ),
-                        None => {
-                            warn!(store_path=%store_path, "no drv found");
-                            // let StdIO take over
-                            return Ok(None);
-                        }
+                match maybe_fetch {
+                    Some((name, fetch)) => {
+                        info!(?fetch, "triggering lazy fetch");
+                        let (sp, root_node) = self
+                            .fetcher
+                            .ingest_and_persist(&name, fetch)
+                            .await
+                            .map_err(|e| {
+                            std::io::Error::new(std::io::ErrorKind::InvalidData, e)
+                        })?;
+
+                        debug_assert_eq!(
+                            sp.to_string(),
+                            store_path.to_string(),
+                            "store path returned from fetcher should match"
+                        );
+
+                        root_node
                     }
-                };
-
-                warn!("triggering build");
-
-                // derivation_to_build_request needs castore nodes for all inputs.
-                // Provide them, which means, here is where we recursively build
-                // all dependencies.
-                #[allow(clippy::mutable_key_type)]
-                let input_nodes: BTreeSet<Node> =
-                    futures::stream::iter(drv.input_derivations.iter())
-                        .map(|(input_drv_path, output_names)| {
-                            // look up the derivation object
-                            let input_drv = {
-                                let known_paths = self.known_paths.borrow();
-                                known_paths
-                                    .get_drv_by_drvpath(input_drv_path)
-                                    .unwrap_or_else(|| panic!("{} not found", input_drv_path))
-                                    .to_owned()
+                    None => {
+                        // Look up the derivation for this output path.
+                        let (drv_path, drv) = {
+                            let known_paths = self.known_paths.borrow();
+                            match known_paths.get_drv_path_for_output_path(store_path) {
+                                Some(drv_path) => (
+                                    drv_path.to_owned(),
+                                    known_paths.get_drv_by_drvpath(drv_path).unwrap().to_owned(),
+                                ),
+                                None => {
+                                    warn!(store_path=%store_path, "no drv found");
+                                    // let StdIO take over
+                                    return Ok(None);
+                                }
+                            }
+                        };
+
+                        warn!("triggering build");
+
+                        // derivation_to_build_request needs castore nodes for all inputs.
+                        // Provide them, which means, here is where we recursively build
+                        // all dependencies.
+                        #[allow(clippy::mutable_key_type)]
+                        let input_nodes: BTreeSet<Node> =
+                            futures::stream::iter(drv.input_derivations.iter())
+                                .map(|(input_drv_path, output_names)| {
+                                    // look up the derivation object
+                                    let input_drv = {
+                                        let known_paths = self.known_paths.borrow();
+                                        known_paths
+                                            .get_drv_by_drvpath(input_drv_path)
+                                            .unwrap_or_else(|| {
+                                                panic!("{} not found", input_drv_path)
+                                            })
+                                            .to_owned()
+                                    };
+
+                                    // convert output names to actual paths
+                                    let output_paths: Vec<StorePath> = output_names
+                                        .iter()
+                                        .map(|output_name| {
+                                            input_drv
+                                                .outputs
+                                                .get(output_name)
+                                                .expect("missing output_name")
+                                                .path
+                                                .as_ref()
+                                                .expect("missing output path")
+                                                .clone()
+                                        })
+                                        .collect();
+                                    // For each output, ask for the castore node.
+                                    // We're in a per-derivation context, so if they're
+                                    // not built yet they'll all get built together.
+                                    // If they don't need to build, we can however still
+                                    // substitute all in parallel (if they don't need to
+                                    // be built) - so we turn this into a stream of streams.
+                                    // It's up to the builder to deduplicate same build requests.
+                                    futures::stream::iter(output_paths.into_iter()).map(
+                                        |output_path| async move {
+                                            let node = self
+                                                .store_path_to_node(&output_path, Path::new(""))
+                                                .await?;
+
+                                            if let Some(node) = node {
+                                                Ok(node)
+                                            } else {
+                                                Err(io::Error::other("no node produced"))
+                                            }
+                                        },
+                                    )
+                                })
+                                .flatten()
+                                .buffer_unordered(10) // TODO: make configurable
+                                .try_collect()
+                                .await?;
+
+                        // TODO: check if input sources are sufficiently dealth with,
+                        // I think yes, they must be imported into the store by other
+                        // operations, so dealt with in the Some(…) match arm
+
+                        // synthesize the build request.
+                        let build_request = derivation_to_build_request(&drv, input_nodes)?;
+
+                        // create a build
+                        let build_result = self
+                            .build_service
+                            .as_ref()
+                            .do_build(build_request)
+                            .await
+                            .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+
+                        // TODO: refscan?
+
+                        // For each output, insert a PathInfo.
+                        for output in &build_result.outputs {
+                            let root_node = output.node.as_ref().expect("invalid root node");
+
+                            // calculate the nar representation
+                            let (nar_size, nar_sha256) =
+                                self.path_info_service.calculate_nar(root_node).await?;
+
+                            // assemble the PathInfo to persist
+                            let path_info = PathInfo {
+                                node: Some(tvix_castore::proto::Node {
+                                    node: Some(root_node.clone()),
+                                }),
+                                references: vec![], // TODO: refscan
+                                narinfo: Some(tvix_store::proto::NarInfo {
+                                    nar_size,
+                                    nar_sha256: Bytes::from(nar_sha256.to_vec()),
+                                    signatures: vec![],
+                                    reference_names: vec![], // TODO: refscan
+                                    deriver: Some(tvix_store::proto::StorePath {
+                                        name: drv_path
+                                            .name()
+                                            .strip_suffix(".drv")
+                                            .expect("missing .drv suffix")
+                                            .to_string(),
+                                        digest: drv_path.digest().to_vec().into(),
+                                    }),
+                                    ca: drv.fod_digest().map(
+                                        |fod_digest| -> tvix_store::proto::nar_info::Ca {
+                                            (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(
+                                                fod_digest,
+                                            )))
+                                                .into()
+                                        },
+                                    ),
+                                }),
                             };
 
-                            // convert output names to actual paths
-                            let output_paths: Vec<StorePath> = output_names
-                                .iter()
-                                .map(|output_name| {
-                                    input_drv
-                                        .outputs
-                                        .get(output_name)
-                                        .expect("missing output_name")
-                                        .path
-                                        .as_ref()
-                                        .expect("missing output path")
-                                        .clone()
-                                })
-                                .collect();
-                            // For each output, ask for the castore node.
-                            // We're in a per-derivation context, so if they're
-                            // not built yet they'll all get built together.
-                            // If they don't need to build, we can however still
-                            // substitute all in parallel (if they don't need to
-                            // be built) - so we turn this into a stream of streams.
-                            // It's up to the builder to deduplicate same build requests.
-                            futures::stream::iter(output_paths.into_iter()).map(
-                                |output_path| async move {
-                                    let node = self
-                                        .store_path_to_node(&output_path, Path::new(""))
-                                        .await?;
-
-                                    if let Some(node) = node {
-                                        Ok(node)
-                                    } else {
-                                        Err(io::Error::other("no node produced"))
-                                    }
-                                },
-                            )
-                        })
-                        .flatten()
-                        .buffer_unordered(10) // TODO: make configurable
-                        .try_collect()
-                        .await?;
-
-                // TODO: check if input sources are sufficiently dealth with,
-                // I think yes, they must be imported into the store by other
-                // operations, so dealt with in the Some(…) match arm
-
-                // synthesize the build request.
-                let build_request = derivation_to_build_request(&drv, input_nodes)?;
-
-                // create a build
-                let build_result = self
-                    .build_service
-                    .as_ref()
-                    .do_build(build_request)
-                    .await
-                    .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-
-                // TODO: refscan?
-
-                // For each output, insert a PathInfo.
-                for output in &build_result.outputs {
-                    let root_node = output.node.as_ref().expect("invalid root node");
-
-                    // calculate the nar representation
-                    let (nar_size, nar_sha256) =
-                        self.path_info_service.calculate_nar(root_node).await?;
-
-                    // assemble the PathInfo to persist
-                    let path_info = PathInfo {
-                        node: Some(tvix_castore::proto::Node {
-                            node: Some(root_node.clone()),
-                        }),
-                        references: vec![], // TODO: refscan
-                        narinfo: Some(tvix_store::proto::NarInfo {
-                            nar_size,
-                            nar_sha256: Bytes::from(nar_sha256.to_vec()),
-                            signatures: vec![],
-                            reference_names: vec![], // TODO: refscan
-                            deriver: Some(tvix_store::proto::StorePath {
-                                name: drv_path
-                                    .name()
-                                    .strip_suffix(".drv")
-                                    .expect("missing .drv suffix")
-                                    .to_string(),
-                                digest: drv_path.digest().to_vec().into(),
-                            }),
-                            ca: drv.fod_digest().map(
-                                |fod_digest| -> tvix_store::proto::nar_info::Ca {
-                                    (&CAHash::Nar(nix_compat::nixhash::NixHash::Sha256(fod_digest)))
-                                        .into()
-                                },
-                            ),
-                        }),
-                    };
-
-                    self.path_info_service
-                        .put(path_info)
-                        .await
-                        .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
-                }
+                            self.path_info_service
+                                .put(path_info)
+                                .await
+                                .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))?;
+                        }
 
-                // find the output for the store path requested
-                build_result
-                    .outputs
-                    .into_iter()
-                    .find(|output_node| {
-                        output_node.node.as_ref().expect("invalid node").get_name()
-                            == store_path.to_string().as_bytes()
-                    })
-                    .expect("build didn't produce the store path")
-                    .node
-                    .expect("invalid node")
+                        // find the output for the store path requested
+                        build_result
+                            .outputs
+                            .into_iter()
+                            .find(|output_node| {
+                                output_node.node.as_ref().expect("invalid node").get_name()
+                                    == store_path.to_string().as_bytes()
+                            })
+                            .expect("build didn't produce the store path")
+                            .node
+                            .expect("invalid node")
+                    }
+                }
             }
         };