about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/tests.rs
blob: 7480ca80822509925842df94e2333b773aff042a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
use std::io;
use std::pin::pin;

use test_case::test_case;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;

use super::B3Digest;
use super::BlobService;
use super::MemoryBlobService;
use super::SledBlobService;
use crate::fixtures;

// TODO: avoid having to list every service under test on each test function.
// Maybe something like rstest can be used?

/// Returns a new [MemoryBlobService] built via its `default()` constructor.
/// It is used as the "memory" backend in the test cases of this file.
fn gen_memory_blob_service() -> impl BlobService {
    MemoryBlobService::default()
}
/// Returns a new [SledBlobService] created with `new_temporary()`.
/// It is used as the "sled" backend in the test cases of this file.
///
/// # Panics
///
/// Panics if `new_temporary()` returns an error.
fn gen_sled_blob_service() -> impl BlobService {
    SledBlobService::new_temporary()
        .expect("temporary sled blob service must be creatable")
}

// TODO: add gRPC blob service here.

/// [BlobService::has] must report `false` for a digest that was never
/// written to the service.
#[test_case(gen_memory_blob_service(); "memory")]
#[test_case(gen_sled_blob_service(); "sled")]
fn has_nonexistent_false(blob_service: impl BlobService) {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        // The lookup itself must succeed; only its answer is negative.
        let present = blob_service
            .has(&fixtures::BLOB_A_DIGEST)
            .await
            .expect("must not fail");

        assert!(!present);
    })
}

/// [BlobService::open_read] on a digest that was never written must yield
/// `None` rather than a reader.
#[test_case(gen_memory_blob_service(); "memory")]
#[test_case(gen_sled_blob_service(); "sled")]
fn not_found_read(blob_service: impl BlobService) {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    runtime.block_on(async {
        // Opening must not error out; a missing blob is signalled by `None`.
        let maybe_reader = blob_service
            .open_read(&fixtures::BLOB_A_DIGEST)
            .await
            .expect("must not fail");

        assert!(maybe_reader.is_none());
    })
}

/// Put a blob in the store, check has, get it back.
/// We test both with small and big blobs.
///
/// The blob is written through [BlobService::open_write]; closing the writer
/// must return `blob_digest`. Afterwards [BlobService::has] must return true
/// and [BlobService::open_read] must hand back exactly `blob_contents`.
#[test_case(gen_memory_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "memory-small")]
#[test_case(gen_sled_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "sled-small")]
#[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")]
#[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")]
fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        let mut w = blob_service.open_write().await;

        let written = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
            .await
            .expect("copy must succeed");
        // Widen the slice length to u64 instead of narrowing the copied byte
        // count to usize, so the comparison cannot truncate.
        assert_eq!(
            blob_contents.len() as u64,
            written,
            "written bytes must match blob length"
        );

        let digest = w.close().await.expect("close must succeed");

        assert_eq!(*blob_digest, digest, "returned digest must be correct");

        assert!(
            blob_service.has(blob_digest).await.expect("must not fail"),
            "blob service should now have the blob"
        );

        // The reader is moved into `pin!` below, so this binding does not
        // need to be mutable.
        let r = blob_service
            .open_read(blob_digest)
            .await
            .expect("open_read must succeed")
            .expect("must be some");

        let mut buf: Vec<u8> = Vec::new();
        let mut pinned_reader = pin!(r);
        let read = tokio::io::copy(&mut pinned_reader, &mut buf)
            .await
            .expect("copy must succeed");

        assert_eq!(
            blob_contents.len() as u64,
            read,
            "read bytes must match blob length"
        );

        assert_eq!(blob_contents, buf, "read blob contents must match");
    })
}

/// Put a blob in the store, and seek inside it a bit.
///
/// After writing [fixtures::BLOB_B], the reader is moved forward with
/// `SeekFrom::Start` and `SeekFrom::Current`. Each seek must report the
/// expected position, and the 10 bytes read afterwards must match the
/// fixture at that position. Seeking to the end must succeed and yield no
/// more data; seeking past the end may either fail or succeed with no data.
#[test_case(gen_memory_blob_service(); "memory")]
#[test_case(gen_sled_blob_service(); "sled")]
fn put_seek(blob_service: impl BlobService) {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        let mut w = blob_service.open_write().await;

        // Copy straight from the fixture's bytes; no intermediate Vec needed.
        tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B[..]), &mut w)
            .await
            .expect("copy must succeed");
        w.close().await.expect("close must succeed");

        // open a blob for reading
        let mut r = blob_service
            .open_read(&fixtures::BLOB_B_DIGEST)
            .await
            .expect("open_read must succeed")
            .expect("must be some");

        // Position we expect the reader to be at, tracked alongside it.
        let mut pos: u64 = 0;

        // read the first 10 bytes, they must match the data in the fixture.
        {
            let mut buf = [0; 10];
            r.read_exact(&mut buf).await.expect("must succeed");

            assert_eq!(
                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
                buf,
                "expected first 10 bytes to match"
            );

            pos += buf.len() as u64;
        }

        // seek to the current position using SeekFrom::Start, which moves
        // the reader by 0 bytes.
        let p = r
            .seek(io::SeekFrom::Start(pos))
            .await
            .expect("must not fail");
        assert_eq!(pos, p);

        // read the next 10 bytes, they must match the data in the fixture.
        {
            let mut buf = [0; 10];
            r.read_exact(&mut buf).await.expect("must succeed");

            assert_eq!(
                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
                buf,
                "expected data to match"
            );

            pos += buf.len() as u64;
        }

        // seek forward by 5 bytes, using SeekFrom::Start.
        let p = r
            .seek(io::SeekFrom::Start(pos + 5))
            .await
            .expect("must not fail");
        pos += 5;
        assert_eq!(pos, p);

        // read the next 10 bytes, they must match the data in the fixture.
        {
            let mut buf = [0; 10];
            r.read_exact(&mut buf).await.expect("must succeed");

            assert_eq!(
                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
                buf,
                "expected data to match"
            );

            pos += buf.len() as u64;
        }

        // seek forward by 12345 bytes, using SeekFrom::Current.
        let p = r
            .seek(io::SeekFrom::Current(12345))
            .await
            .expect("must not fail");
        pos += 12345;
        assert_eq!(pos, p);

        // read the next 10 bytes, they must match the data in the fixture.
        {
            let mut buf = [0; 10];
            r.read_exact(&mut buf).await.expect("must succeed");

            assert_eq!(
                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
                buf,
                "expected data to match"
            );

            // `pos` is deliberately not advanced here: the remaining checks
            // work with the blob length instead of the tracked position.
        }

        let blob_len = fixtures::BLOB_B.len() as u64;

        // seeking to the end is okay…
        let p = r
            .seek(io::SeekFrom::Start(blob_len))
            .await
            .expect("must not fail");
        assert_eq!(blob_len, p);

        {
            // but it returns no more data.
            let mut buf: Vec<u8> = Vec::new();
            r.read_to_end(&mut buf).await.expect("must not fail");
            assert!(buf.is_empty(), "expected no more data to be read");
        }

        // seeking past the end…
        // should either be ok, but then return 0 bytes
        // (this matches the behaviour of a Cursor<Vec<u8>>),
        if r.seek(io::SeekFrom::Start(blob_len + 1)).await.is_ok() {
            let mut buf: Vec<u8> = Vec::new();
            r.read_to_end(&mut buf).await.expect("must not fail");
            assert!(buf.is_empty(), "expected no more data to be read");
        }
        // or not be okay.

        // TODO: this is only broken for the gRPC version
        // We expect seeking backwards or relative to the end to fail, i.e.
        // each of these seeks should return an error once asserted here:
        //  - SeekFrom::Current(-1)
        //  - SeekFrom::Start(<current position> - 1)
        //  - SeekFrom::End(0)
    })
}