Line data Source code
1 : // In-process gRPC tests for `StorageWriteService` streaming RPCs.
2 :
3 : #include <cstdint>
4 : #include <cstdlib>
5 : #include <filesystem>
6 : #include <memory>
7 : #include <random>
8 : #include <string>
9 : #include <system_error>
10 : #include <utility>
11 : #include <vector>
12 :
13 : #include "absl/strings/str_cat.h"
14 : #include "backend/schema/schema.h"
15 : #include "backend/storage/duckdb/duckdb_storage.h"
16 : #include "backend/storage/storage.h"
17 : #include "frontend/handlers/storage_write.h"
18 : #include "grpcpp/create_channel.h"
19 : #include "grpcpp/grpcpp.h"
20 : #include "grpcpp/security/credentials.h"
21 : #include "grpcpp/security/server_credentials.h"
22 : #include "grpcpp/server.h"
23 : #include "grpcpp/server_builder.h"
24 : #include "gtest/gtest.h"
25 : #include "proto/storage_write.grpc.pb.h"
26 : #include "proto/storage_write.pb.h"
27 :
28 : namespace bigquery_emulator {
29 : namespace frontend {
30 : namespace {
31 :
32 : namespace fs = std::filesystem;
33 :
34 7 : fs::path MakeTempDataDir(absl::string_view prefix) {
35 7 : const char* tmpdir_env = std::getenv("TMPDIR");
36 7 : const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
37 7 : std::random_device rd;
38 7 : std::seed_seq seed{rd(), rd()};
39 7 : std::mt19937_64 rng(seed);
40 7 : fs::path out = fs::path(tmpdir) /
41 7 : absl::StrCat("bqemu-", std::string(prefix), "-", rng());
42 7 : std::error_code ec;
43 7 : fs::remove_all(out, ec);
44 7 : return out;
45 7 : }
46 :
47 : class StorageWriteGrpcTest : public ::testing::Test {
48 : protected:
49 7 : void SetUp() override {
50 7 : data_dir_ = MakeTempDataDir("storage-write-grpc-test");
51 7 : auto opened =
52 7 : backend::storage::duckdb::DuckDBStorage::Open(data_dir_.string());
53 14 : ASSERT_TRUE(opened.ok()) << opened.status();
54 7 : storage_ = std::move(opened).value();
55 7 : service_ = std::make_unique<StorageWriteService>(storage_.get());
56 :
57 7 : int bound_port = 0;
58 7 : ::grpc::ServerBuilder builder;
59 7 : builder.AddListeningPort(
60 7 : "127.0.0.1:0", ::grpc::InsecureServerCredentials(), &bound_port);
61 7 : builder.RegisterService(service_.get());
62 7 : server_ = builder.BuildAndStart();
63 7 : ASSERT_NE(server_, nullptr);
64 7 : ASSERT_NE(bound_port, 0);
65 :
66 7 : const std::string target = absl::StrCat("127.0.0.1:", bound_port);
67 7 : channel_ =
68 7 : ::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials());
69 7 : ASSERT_NE(channel_, nullptr);
70 7 : stub_ = v1::StorageWrite::NewStub(channel_);
71 7 : ASSERT_NE(stub_, nullptr);
72 7 : }
73 :
74 7 : void TearDown() override {
75 7 : if (server_ != nullptr) server_->Shutdown();
76 7 : stub_.reset();
77 7 : channel_.reset();
78 7 : server_.reset();
79 7 : service_.reset();
80 7 : storage_.reset();
81 7 : std::error_code ec;
82 7 : fs::remove_all(data_dir_, ec);
83 7 : }
84 :
85 6 : void CreatePeopleTable() {
86 6 : backend::schema::TableSchema schema;
87 6 : backend::schema::ColumnSchema id;
88 6 : id.name = "id";
89 6 : id.type = backend::schema::ColumnType::kInt64;
90 6 : id.mode = backend::schema::ColumnMode::kRequired;
91 6 : schema.columns.push_back(id);
92 6 : backend::schema::ColumnSchema name;
93 6 : name.name = "name";
94 6 : name.type = backend::schema::ColumnType::kString;
95 6 : name.mode = backend::schema::ColumnMode::kNullable;
96 6 : schema.columns.push_back(name);
97 6 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
98 6 : ASSERT_TRUE(storage_->CreateTable({"proj-test", "ds", "t"}, schema).ok());
99 6 : }
100 :
101 : v1::AppendRowsRequest BuildAppendRequest(const std::string& stream_name,
102 : int64_t first_id,
103 7 : int64_t count) {
104 7 : v1::AppendRowsRequest req;
105 7 : req.set_write_stream(stream_name);
106 7 : auto* proto_rows = req.mutable_proto_rows();
107 23 : for (int64_t i = 0; i < count; ++i) {
108 16 : auto* row = proto_rows->add_rows();
109 16 : row->add_cells()->set_string_value(absl::StrCat(first_id + i));
110 16 : row->add_cells()->set_string_value(absl::StrCat("person-", first_id + i));
111 16 : }
112 7 : return req;
113 7 : }
114 :
115 : fs::path data_dir_{};
116 : std::unique_ptr<backend::storage::duckdb::DuckDBStorage> storage_{};
117 : std::unique_ptr<StorageWriteService> service_{};
118 : std::unique_ptr<::grpc::Server> server_{};
119 : std::shared_ptr<::grpc::Channel> channel_{};
120 : std::unique_ptr<v1::StorageWrite::Stub> stub_{};
121 : };
122 :
123 1 : TEST_F(StorageWriteGrpcTest, AppendRowsCommitsToCommittedStream) {
124 1 : CreatePeopleTable();
125 1 : v1::CreateWriteStreamRequest create_req;
126 1 : create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
127 1 : create_req.mutable_write_stream()->set_type(v1::WriteStream::COMMITTED);
128 1 : v1::WriteStream stream;
129 1 : ::grpc::ClientContext create_ctx;
130 1 : ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
131 :
132 1 : ::grpc::ClientContext append_ctx;
133 1 : auto stream_rw = stub_->AppendRows(&append_ctx);
134 1 : ASSERT_NE(stream_rw, nullptr);
135 :
136 1 : v1::AppendRowsRequest req1 = BuildAppendRequest(stream.name(), 0, 2);
137 1 : req1.set_trace_id("trace-1");
138 1 : ASSERT_TRUE(stream_rw->Write(req1));
139 :
140 1 : v1::AppendRowsResponse resp1;
141 1 : ASSERT_TRUE(stream_rw->Read(&resp1));
142 1 : ASSERT_EQ(resp1.response_case(), v1::AppendRowsResponse::kAppendResult);
143 1 : EXPECT_EQ(resp1.append_result().offset(), 0);
144 1 : EXPECT_EQ(resp1.row_count(), 2);
145 1 : EXPECT_EQ(resp1.trace_id(), "trace-1");
146 :
147 1 : v1::AppendRowsRequest req2 = BuildAppendRequest("", 2, 3);
148 1 : ASSERT_TRUE(stream_rw->Write(req2));
149 1 : v1::AppendRowsResponse resp2;
150 1 : ASSERT_TRUE(stream_rw->Read(&resp2));
151 1 : ASSERT_EQ(resp2.response_case(), v1::AppendRowsResponse::kAppendResult);
152 1 : EXPECT_EQ(resp2.append_result().offset(), 2);
153 1 : EXPECT_EQ(resp2.row_count(), 3);
154 :
155 1 : ASSERT_TRUE(stream_rw->WritesDone());
156 1 : ASSERT_TRUE(stream_rw->Finish().ok());
157 :
158 1 : auto iter_or = storage_->ScanRows({"proj-test", "ds", "t"});
159 2 : ASSERT_TRUE(iter_or.ok()) << iter_or.status();
160 1 : std::unique_ptr<backend::storage::RowIterator> iter = std::move(*iter_or);
161 1 : std::vector<backend::storage::Row> rows;
162 1 : backend::storage::Row row;
163 6 : while (true) {
164 6 : auto has = iter->Next(&row);
165 6 : ASSERT_TRUE(has.ok());
166 6 : if (!*has) break;
167 5 : rows.push_back(row);
168 5 : }
169 1 : ASSERT_EQ(rows.size(), 5u);
170 1 : }
171 :
172 1 : TEST_F(StorageWriteGrpcTest, AppendRowsToDefaultStreamMintsLazily) {
173 1 : CreatePeopleTable();
174 1 : ::grpc::ClientContext append_ctx;
175 1 : auto stream_rw = stub_->AppendRows(&append_ctx);
176 1 : ASSERT_NE(stream_rw, nullptr);
177 1 : const std::string default_stream =
178 1 : "projects/proj-test/datasets/ds/tables/t/streams/_default";
179 1 : v1::AppendRowsRequest req = BuildAppendRequest(default_stream, 0, 4);
180 1 : ASSERT_TRUE(stream_rw->Write(req));
181 1 : v1::AppendRowsResponse resp;
182 1 : ASSERT_TRUE(stream_rw->Read(&resp));
183 1 : ASSERT_EQ(resp.response_case(), v1::AppendRowsResponse::kAppendResult);
184 1 : EXPECT_EQ(resp.row_count(), 4);
185 1 : ASSERT_TRUE(stream_rw->WritesDone());
186 1 : ASSERT_TRUE(stream_rw->Finish().ok());
187 1 : EXPECT_EQ(service_->StreamsForTesting(), 1u);
188 1 : }
189 :
190 1 : TEST_F(StorageWriteGrpcTest, AppendRowsShapeMismatchSurfaceOnEnvelope) {
191 1 : CreatePeopleTable();
192 1 : v1::CreateWriteStreamRequest create_req;
193 1 : create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
194 1 : v1::WriteStream stream;
195 1 : ::grpc::ClientContext create_ctx;
196 1 : ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
197 :
198 1 : ::grpc::ClientContext append_ctx;
199 1 : auto stream_rw = stub_->AppendRows(&append_ctx);
200 1 : ASSERT_NE(stream_rw, nullptr);
201 :
202 1 : v1::AppendRowsRequest bad;
203 1 : bad.set_write_stream(stream.name());
204 1 : bad.mutable_proto_rows()->add_rows()->add_cells()->set_string_value("0");
205 1 : ASSERT_TRUE(stream_rw->Write(bad));
206 1 : v1::AppendRowsResponse resp_bad;
207 1 : ASSERT_TRUE(stream_rw->Read(&resp_bad));
208 1 : ASSERT_EQ(resp_bad.response_case(), v1::AppendRowsResponse::kErrorMessage);
209 :
210 1 : v1::AppendRowsRequest good = BuildAppendRequest("", 0, 1);
211 1 : ASSERT_TRUE(stream_rw->Write(good));
212 1 : v1::AppendRowsResponse resp_good;
213 1 : ASSERT_TRUE(stream_rw->Read(&resp_good));
214 1 : EXPECT_EQ(resp_good.response_case(), v1::AppendRowsResponse::kAppendResult);
215 :
216 1 : ASSERT_TRUE(stream_rw->WritesDone());
217 1 : ASSERT_TRUE(stream_rw->Finish().ok());
218 1 : }
219 :
220 1 : TEST_F(StorageWriteGrpcTest, AppendRowsUnknownStreamIsNotFound) {
221 1 : CreatePeopleTable();
222 1 : ::grpc::ClientContext append_ctx;
223 1 : auto stream_rw = stub_->AppendRows(&append_ctx);
224 1 : ASSERT_NE(stream_rw, nullptr);
225 1 : v1::AppendRowsRequest req = BuildAppendRequest(
226 1 : "projects/proj-test/datasets/ds/tables/t/streams/s99", 0, 1);
227 1 : ASSERT_TRUE(stream_rw->Write(req));
228 1 : v1::AppendRowsResponse resp;
229 1 : while (stream_rw->Read(&resp)) {}
230 1 : ::grpc::Status status = stream_rw->Finish();
231 1 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
232 1 : }
233 :
234 1 : TEST_F(StorageWriteGrpcTest, GetWriteStreamReturnsRecordedMetadata) {
235 1 : CreatePeopleTable();
236 1 : v1::CreateWriteStreamRequest create_req;
237 1 : create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
238 1 : v1::WriteStream stream;
239 1 : ::grpc::ClientContext create_ctx;
240 1 : ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
241 :
242 1 : ::grpc::ClientContext get_ctx;
243 1 : v1::GetWriteStreamRequest get_req;
244 1 : get_req.set_name(stream.name());
245 1 : v1::WriteStream got;
246 1 : ASSERT_TRUE(stub_->GetWriteStream(&get_ctx, get_req, &got).ok());
247 1 : EXPECT_EQ(got.name(), stream.name());
248 1 : EXPECT_EQ(got.type(), v1::WriteStream::COMMITTED);
249 1 : }
250 :
251 1 : TEST_F(StorageWriteGrpcTest, GetWriteStreamUnknownIsNotFound) {
252 1 : ::grpc::ClientContext get_ctx;
253 1 : v1::GetWriteStreamRequest get_req;
254 1 : get_req.set_name("projects/proj-test/datasets/ds/tables/t/streams/ghost");
255 1 : v1::WriteStream got;
256 1 : ::grpc::Status status = stub_->GetWriteStream(&get_ctx, get_req, &got);
257 1 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
258 1 : }
259 :
260 1 : TEST_F(StorageWriteGrpcTest, BufferedStreamFlushCommitsRows) {
261 1 : CreatePeopleTable();
262 1 : v1::CreateWriteStreamRequest create_req;
263 1 : create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
264 1 : create_req.mutable_write_stream()->set_type(v1::WriteStream::BUFFERED);
265 1 : v1::WriteStream stream;
266 1 : ::grpc::ClientContext create_ctx;
267 1 : ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
268 :
269 1 : ::grpc::ClientContext append_ctx;
270 1 : auto stream_rw = stub_->AppendRows(&append_ctx);
271 1 : ASSERT_NE(stream_rw, nullptr);
272 1 : ASSERT_TRUE(stream_rw->Write(BuildAppendRequest(stream.name(), 0, 2)));
273 1 : v1::AppendRowsResponse resp1;
274 1 : ASSERT_TRUE(stream_rw->Read(&resp1));
275 1 : ASSERT_EQ(resp1.response_case(), v1::AppendRowsResponse::kAppendResult);
276 1 : ASSERT_TRUE(stream_rw->Write(BuildAppendRequest("", 2, 3)));
277 1 : v1::AppendRowsResponse resp2;
278 1 : ASSERT_TRUE(stream_rw->Read(&resp2));
279 1 : ASSERT_EQ(resp2.response_case(), v1::AppendRowsResponse::kAppendResult);
280 1 : ASSERT_TRUE(stream_rw->WritesDone());
281 1 : ASSERT_TRUE(stream_rw->Finish().ok());
282 :
283 1 : auto pre_flush = storage_->ScanRows({"proj-test", "ds", "t"});
284 1 : ASSERT_TRUE(pre_flush.ok());
285 1 : std::unique_ptr<backend::storage::RowIterator> pre_iter =
286 1 : std::move(*pre_flush);
287 1 : backend::storage::Row row;
288 1 : auto has = pre_iter->Next(&row);
289 1 : ASSERT_TRUE(has.ok());
290 1 : EXPECT_FALSE(*has);
291 :
292 1 : v1::FlushRowsRequest flush_req;
293 1 : flush_req.set_write_stream(stream.name());
294 1 : flush_req.set_offset(4);
295 1 : v1::FlushRowsResponse flush_resp;
296 1 : ::grpc::ClientContext flush_ctx;
297 1 : ASSERT_TRUE(stub_->FlushRows(&flush_ctx, flush_req, &flush_resp).ok());
298 :
299 1 : v1::FinalizeWriteStreamRequest finalize_req;
300 1 : finalize_req.set_name(stream.name());
301 1 : v1::FinalizeWriteStreamResponse finalize_resp;
302 1 : ::grpc::ClientContext finalize_ctx;
303 1 : ASSERT_TRUE(
304 1 : stub_->FinalizeWriteStream(&finalize_ctx, finalize_req, &finalize_resp)
305 1 : .ok());
306 1 : EXPECT_EQ(finalize_resp.row_count(), 5);
307 :
308 1 : auto iter_or = storage_->ScanRows({"proj-test", "ds", "t"});
309 1 : ASSERT_TRUE(iter_or.ok());
310 1 : std::unique_ptr<backend::storage::RowIterator> iter = std::move(*iter_or);
311 1 : std::size_t count = 0;
312 6 : while (true) {
313 6 : auto next = iter->Next(&row);
314 6 : ASSERT_TRUE(next.ok());
315 6 : if (!*next) break;
316 5 : ++count;
317 5 : }
318 1 : EXPECT_EQ(count, 5u);
319 1 : }
320 :
321 : } // namespace
322 : } // namespace frontend
323 : } // namespace bigquery_emulator
|