From 538d5fc8eef825f97add6a34f708d9a6b87a3ebd Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 16 Apr 2024 17:48:47 +0300 Subject: fix(tvix/castore/blobservice/grpc): don't use NaiveSeeker for now Userland likes to seek backwards, and until we have store composition and can serve chunks from a local cache, we need to buffer the individual chunks in memory. Change-Id: I66978a0722d5f55ed4a9a49d116cecb64a01995d Reviewed-on: https://cl.tvl.fyi/c/depot/+/11448 Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/castore/src/blobservice/grpc.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 632acef158..5663cd3838 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,10 +1,15 @@ -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader}; +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, }; use futures::sink::SinkExt; -use std::{io, pin::pin, sync::Arc, task::Poll}; +use std::{ + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -55,11 +60,10 @@ impl BlobService for GRPCBlobService { #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result>> { // First try to get a list of chunks. In case there's only one chunk returned, - // or the backend does not support chunking, return a NaiveSeeker. - // Otherwise use a ChunkedReader. - // TODO: we should check if we want to replace NaiveSeeker with a simple - // Cursor on the raw `Vec`, as seeking backwards is something that - // clients generally do. + // buffer its data into a Vec, otherwise use a ChunkedReader. + // We previously used NaiveSeeker here, but userland likes to seek backwards too often, + // and without store composition this will get very noisy. + // FUTUREWORK: use CombinedBlobService and store composition. match self.chunks(digest).await { Ok(None) => Ok(None), Ok(Some(chunks)) => { @@ -82,9 +86,13 @@ impl BlobService for GRPCBlobService { }); // Use StreamReader::new to convert to an AsyncRead. - let data_reader = tokio_util::io::StreamReader::new(data_stream); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut buf = Vec::new(); + // TODO: only do this up to a certain limit. + tokio::io::copy(&mut data_reader, &mut buf).await?; - Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + Ok(Some(Box::new(Cursor::new(buf)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), -- cgit 1.4.1