about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/sled.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/sled.rs')
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs169
1 files changed, 73 insertions, 96 deletions
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 7b6d7fd7ab..eb3cf2ff1b 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -1,140 +1,117 @@
 use super::PathInfoService;
-use crate::nar::calculate_size_and_sha256;
 use crate::proto::PathInfo;
-use futures::stream::iter;
+use async_stream::try_stream;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use prost::Message;
 use std::path::Path;
 use tonic::async_trait;
+use tracing::instrument;
 use tracing::warn;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use tvix_castore::Error;
 
 /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
 ///
 /// The PathInfo messages are stored as encoded protos, and keyed by their output hash,
 /// as that's currently the only request type available.
-pub struct SledPathInfoService<BS, DS> {
+pub struct SledPathInfoService {
     db: sled::Db,
-
-    blob_service: BS,
-    directory_service: DS,
 }
 
-impl<BS, DS> SledPathInfoService<BS, DS> {
-    pub fn new<P: AsRef<Path>>(
-        p: P,
-        blob_service: BS,
-        directory_service: DS,
-    ) -> Result<Self, sled::Error> {
+impl SledPathInfoService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
         let config = sled::Config::default()
             .use_compression(false) // is a required parameter
             .path(p);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 
-    pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> {
+    pub fn new_temporary() -> Result<Self, sled::Error> {
         let config = sled::Config::default().temporary(true);
         let db = config.open()?;
 
-        Ok(Self {
-            db,
-            blob_service,
-            directory_service,
-        })
+        Ok(Self { db })
     }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for SledPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => match PathInfo::decode(&*data) {
-                Ok(path_info) => Ok(Some(path_info)),
-                Err(e) => {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve PathInfo: {}", e);
+            Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+        })?;
+        match resp {
+            None => Ok(None),
+            Some(data) => {
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
                     warn!("failed to decode stored PathInfo: {}", e);
-                    Err(Error::StorageError(format!(
-                        "failed to decode stored PathInfo: {}",
-                        e
-                    )))
-                }
-            },
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+                Ok(Some(path_info))
             }
         }
     }
 
+    #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))]
     async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> {
         // Call validate on the received PathInfo message.
-        match path_info.validate() {
-            Err(e) => Err(Error::InvalidRequest(format!(
-                "failed to validate PathInfo: {}",
-                e
-            ))),
-            // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database.
-            // This overwrites existing PathInfo objects.
-            Ok(nix_path) => match self
-                .db
-                .insert(*nix_path.digest(), path_info.encode_to_vec())
-            {
-                Ok(_) => Ok(path_info),
-                Err(e) => {
-                    warn!("failed to insert PathInfo: {}", e);
-                    Err(Error::StorageError(format! {
-                        "failed to insert PathInfo: {}", e
-                    }))
-                }
-            },
-        }
-    }
+        let store_path = path_info
+            .validate()
+            .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?;
+
+        // In case the PathInfo is valid, we were able to parse a StorePath.
+        // Store it in the database, keyed by its digest.
+        // This overwrites existing PathInfo objects.
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let k = *store_path.digest();
+            let data = path_info.encode_to_vec();
+            move || db.insert(k, data)
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
+        Ok(path_info)
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        Box::pin(iter(self.db.iter().values().map(|v| match v {
-            Ok(data) => {
-                // we retrieved some bytes
-                match PathInfo::decode(&*data) {
-                    Ok(path_info) => Ok(path_info),
-                    Err(e) => {
-                        warn!("failed to decode stored PathInfo: {}", e);
-                        Err(Error::StorageError(format!(
-                            "failed to decode stored PathInfo: {}",
-                            e
-                        )))
-                    }
-                }
-            }
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass around it to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+
+                yield path_info
             }
-        })))
+        })
     }
 }