use pin_project_lite::pin_project; use tokio::io::AsyncRead; pin_project! { /// Wraps an existing AsyncRead, and allows querying for the digest of all /// data read "through" it. /// The hash function is configurable by type parameter. pub struct HashingReader where R: AsyncRead, H: digest::Digest, { #[pin] inner: R, hasher: H, } } pub type B3HashingReader = HashingReader; impl HashingReader where R: AsyncRead, H: digest::Digest, { pub fn from(r: R) -> Self { Self { inner: r, hasher: H::new(), } } /// Return the digest. pub fn digest(self) -> digest::Output { self.hasher.finalize() } } impl tokio::io::AsyncRead for HashingReader where R: AsyncRead, H: digest::Digest, { fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { let buf_filled_len_before = buf.filled().len(); let this = self.project(); let ret = this.inner.poll_read(cx, buf); // write everything new filled into the hasher. this.hasher.update(&buf.filled()[buf_filled_len_before..]); ret } } #[cfg(test)] mod tests { use std::io::Cursor; use test_case::test_case; use crate::fixtures::BLOB_A; use crate::fixtures::BLOB_A_DIGEST; use crate::fixtures::BLOB_B; use crate::fixtures::BLOB_B_DIGEST; use crate::fixtures::EMPTY_BLOB_DIGEST; use crate::{B3Digest, B3HashingReader}; #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")] #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")] #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")] #[tokio::test] async fn test_b3_hashing_reader(data: &[u8], b3_digest: &B3Digest) { let r = Cursor::new(data); let mut hr = B3HashingReader::from(r); tokio::io::copy(&mut hr, &mut tokio::io::sink()) .await .expect("read must succeed"); assert_eq!(*b3_digest, hr.digest().into()); } }