Line data Source code
// Unit + in-process gRPC tests for `StorageWriteService`.
//
// The shape mirrors `storage_read_test.cc`: SetUp opens a DuckDB
// storage instance backed by a fresh temporary data directory, the
// service is constructed against it, and each test pokes the handler
// with a hand-built request proto. The bidirectional `AppendRows` RPC
// needs a real `ServerReaderWriter`, so the streaming tests stand up an
// in-process gRPC server (the same `grpc::testing` transport
// `storage_read_test.cc` uses for `ReadRows`) and drive the streaming
// reply through a client stub.
11 :
12 : #include "frontend/handlers/storage_write.h"
13 :
14 : #include <cstdint>
15 : #include <cstdlib>
16 : #include <filesystem>
17 : #include <memory>
18 : #include <random>
19 : #include <string>
20 : #include <system_error>
21 : #include <utility>
22 : #include <vector>
23 :
24 : #include "absl/strings/str_cat.h"
25 : #include "backend/schema/schema.h"
26 : #include "backend/storage/duckdb/duckdb_storage.h"
27 : #include "backend/storage/storage.h"
28 : #include "grpcpp/grpcpp.h"
29 : #include "gtest/gtest.h"
30 : #include "proto/emulator.pb.h"
31 : #include "proto/storage_write.grpc.pb.h"
32 : #include "proto/storage_write.pb.h"
33 :
34 : namespace bigquery_emulator {
35 : namespace frontend {
36 : namespace {
37 :
38 : namespace fs = std::filesystem;
39 :
40 7 : fs::path MakeTempDataDir(absl::string_view prefix) {
41 7 : const char* tmpdir_env = std::getenv("TMPDIR");
42 7 : const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
43 7 : std::random_device rd;
44 7 : std::seed_seq seed{rd(), rd()};
45 7 : std::mt19937_64 rng(seed);
46 7 : fs::path out = fs::path(tmpdir) /
47 7 : absl::StrCat("bqemu-", std::string(prefix), "-", rng());
48 7 : std::error_code ec;
49 7 : fs::remove_all(out, ec);
50 7 : return out;
51 7 : }
52 :
53 : class StorageWriteServiceTest : public ::testing::Test {
54 : protected:
55 7 : void SetUp() override {
56 7 : data_dir_ = MakeTempDataDir("storage-write-test");
57 7 : auto opened =
58 7 : backend::storage::duckdb::DuckDBStorage::Open(data_dir_.string());
59 14 : ASSERT_TRUE(opened.ok()) << opened.status();
60 7 : storage_ = std::move(opened).value();
61 7 : service_ = std::make_unique<StorageWriteService>(storage_.get());
62 7 : }
63 :
64 7 : void TearDown() override {
65 7 : service_.reset();
66 7 : storage_.reset();
67 7 : std::error_code ec;
68 7 : fs::remove_all(data_dir_, ec);
69 7 : }
70 :
71 : // Two-column toy schema (id + name). Mirrors the StorageWrite
72 : // happy path; tests append rows against it and round-trip through
73 : // `Storage::ScanRows` to confirm the rows landed.
74 6 : void CreatePeopleTable() {
75 6 : backend::schema::TableSchema schema;
76 6 : backend::schema::ColumnSchema id;
77 6 : id.name = "id";
78 6 : id.type = backend::schema::ColumnType::kInt64;
79 6 : id.mode = backend::schema::ColumnMode::kRequired;
80 6 : schema.columns.push_back(id);
81 6 : backend::schema::ColumnSchema name;
82 6 : name.name = "name";
83 6 : name.type = backend::schema::ColumnType::kString;
84 6 : name.mode = backend::schema::ColumnMode::kNullable;
85 6 : schema.columns.push_back(name);
86 6 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
87 6 : ASSERT_TRUE(storage_->CreateTable({"proj-test", "ds", "t"}, schema).ok());
88 6 : }
89 :
90 : fs::path data_dir_{};
91 : std::unique_ptr<backend::storage::duckdb::DuckDBStorage> storage_{};
92 : std::unique_ptr<StorageWriteService> service_{};
93 : };
94 :
95 : // ---------------------------------------------------------------------------
// CreateWriteStream: validation + happy path; BatchCommitWriteStreams
// preconditions
97 : // ---------------------------------------------------------------------------
98 :
99 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsCommittedStream) {
100 1 : CreatePeopleTable();
101 1 : v1::CreateWriteStreamRequest req;
102 1 : req.set_parent("projects/proj-test/datasets/ds/tables/t");
103 1 : req.mutable_write_stream()->set_type(v1::WriteStream::COMMITTED);
104 1 : v1::WriteStream resp;
105 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
106 2 : ASSERT_TRUE(status.ok()) << status.error_message();
107 :
108 : // Stream id nests under the table path with a server-assigned
109 : // suffix; the helper mints `s1` for the first call.
110 1 : EXPECT_EQ(resp.name(), "projects/proj-test/datasets/ds/tables/t/streams/s1");
111 1 : EXPECT_EQ(resp.type(), v1::WriteStream::COMMITTED);
112 1 : ASSERT_EQ(resp.schema().fields_size(), 2);
113 1 : EXPECT_EQ(resp.schema().fields(0).name(), "id");
114 1 : EXPECT_EQ(resp.schema().fields(1).name(), "name");
115 1 : EXPECT_FALSE(resp.create_time().empty());
116 1 : EXPECT_EQ(service_->StreamsForTesting(), 1u);
117 1 : }
118 :
119 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamDefaultsToCommitted) {
120 : // BigQuery's documented default for an unspecified stream type is
121 : // COMMITTED. The emulator follows that.
122 1 : CreatePeopleTable();
123 1 : v1::CreateWriteStreamRequest req;
124 1 : req.set_parent("projects/proj-test/datasets/ds/tables/t");
125 1 : v1::WriteStream resp;
126 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
127 2 : ASSERT_TRUE(status.ok()) << status.error_message();
128 1 : EXPECT_EQ(resp.type(), v1::WriteStream::COMMITTED);
129 1 : }
130 :
131 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsPendingStream) {
132 1 : CreatePeopleTable();
133 1 : v1::CreateWriteStreamRequest req;
134 1 : req.set_parent("projects/proj-test/datasets/ds/tables/t");
135 1 : req.mutable_write_stream()->set_type(v1::WriteStream::PENDING);
136 1 : v1::WriteStream resp;
137 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
138 2 : ASSERT_TRUE(status.ok()) << status.error_message();
139 1 : EXPECT_EQ(resp.type(), v1::WriteStream::PENDING);
140 1 : EXPECT_EQ(service_->StreamsForTesting(), 1u);
141 1 : }
142 :
143 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsBufferedStream) {
144 1 : CreatePeopleTable();
145 1 : v1::CreateWriteStreamRequest req;
146 1 : req.set_parent("projects/proj-test/datasets/ds/tables/t");
147 1 : req.mutable_write_stream()->set_type(v1::WriteStream::BUFFERED);
148 1 : v1::WriteStream resp;
149 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
150 2 : ASSERT_TRUE(status.ok()) << status.error_message();
151 1 : EXPECT_EQ(resp.type(), v1::WriteStream::BUFFERED);
152 1 : EXPECT_FALSE(resp.name().empty());
153 1 : }
154 :
155 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamRejectsMalformedParent) {
156 1 : CreatePeopleTable();
157 1 : v1::CreateWriteStreamRequest req;
158 : // Drops the `tables/` segment; the parser refuses anything that
159 : // does not match `projects/{p}/datasets/{d}/tables/{t}`.
160 1 : req.set_parent("projects/proj-test/datasets/ds/t");
161 1 : v1::WriteStream resp;
162 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
163 1 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT);
164 1 : }
165 :
166 1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMissingTableIsNotFound) {
167 : // Dataset exists but the table does not. Storage::GetSchema returns
168 : // NotFound and the handler maps that onto gRPC NOT_FOUND so a
169 : // BigQuery REST 404 envelope can be synthesized at the gateway.
170 1 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
171 1 : v1::CreateWriteStreamRequest req;
172 1 : req.set_parent("projects/proj-test/datasets/ds/tables/missing");
173 1 : v1::WriteStream resp;
174 1 : ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
175 1 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
176 1 : }
177 :
178 1 : TEST_F(StorageWriteServiceTest, BatchCommitRequiresFinalizedPendingStream) {
179 1 : CreatePeopleTable();
180 1 : v1::CreateWriteStreamRequest create_req;
181 1 : create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
182 1 : create_req.mutable_write_stream()->set_type(v1::WriteStream::PENDING);
183 1 : v1::WriteStream stream;
184 1 : ASSERT_TRUE(service_->CreateWriteStream(nullptr, &create_req, &stream).ok());
185 :
186 1 : v1::BatchCommitWriteStreamsRequest commit_req;
187 1 : commit_req.set_parent("projects/proj-test/datasets/ds/tables/t");
188 1 : commit_req.add_write_streams(stream.name());
189 1 : v1::BatchCommitWriteStreamsResponse commit_resp;
190 1 : ::grpc::Status status =
191 1 : service_->BatchCommitWriteStreams(nullptr, &commit_req, &commit_resp);
192 1 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::FAILED_PRECONDITION);
193 1 : }
194 :
195 : } // namespace
196 : } // namespace frontend
197 : } // namespace bigquery_emulator
|