LCOV - code coverage report
Current view: top level - frontend/handlers - storage_write_grpc_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 100.0 % 245 245
Test Date: 2026-07-02 21:01:18 Functions: 100.0 % 12 12

            Line data    Source code
       1              : // In-process gRPC tests for `StorageWriteService` streaming RPCs.
       2              : 
       3              : #include <cstdint>
       4              : #include <cstdlib>
       5              : #include <filesystem>
       6              : #include <memory>
       7              : #include <random>
       8              : #include <string>
       9              : #include <system_error>
      10              : #include <utility>
      11              : #include <vector>
      12              : 
      13              : #include "absl/strings/str_cat.h"
      14              : #include "backend/schema/schema.h"
      15              : #include "backend/storage/duckdb/duckdb_storage.h"
      16              : #include "backend/storage/storage.h"
      17              : #include "frontend/handlers/storage_write.h"
      18              : #include "grpcpp/create_channel.h"
      19              : #include "grpcpp/grpcpp.h"
      20              : #include "grpcpp/security/credentials.h"
      21              : #include "grpcpp/security/server_credentials.h"
      22              : #include "grpcpp/server.h"
      23              : #include "grpcpp/server_builder.h"
      24              : #include "gtest/gtest.h"
      25              : #include "proto/storage_write.grpc.pb.h"
      26              : #include "proto/storage_write.pb.h"
      27              : 
      28              : namespace bigquery_emulator {
      29              : namespace frontend {
      30              : namespace {
      31              : 
      32              : namespace fs = std::filesystem;
      33              : 
      34            7 : fs::path MakeTempDataDir(absl::string_view prefix) {
      35            7 :   const char* tmpdir_env = std::getenv("TMPDIR");
      36            7 :   const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
      37            7 :   std::random_device rd;
      38            7 :   std::seed_seq seed{rd(), rd()};
      39            7 :   std::mt19937_64 rng(seed);
      40            7 :   fs::path out = fs::path(tmpdir) /
      41            7 :                  absl::StrCat("bqemu-", std::string(prefix), "-", rng());
      42            7 :   std::error_code ec;
      43            7 :   fs::remove_all(out, ec);
      44            7 :   return out;
      45            7 : }
      46              : 
      47              : class StorageWriteGrpcTest : public ::testing::Test {
      48              :  protected:
      49            7 :   void SetUp() override {
      50            7 :     data_dir_ = MakeTempDataDir("storage-write-grpc-test");
      51            7 :     auto opened =
      52            7 :         backend::storage::duckdb::DuckDBStorage::Open(data_dir_.string());
      53           14 :     ASSERT_TRUE(opened.ok()) << opened.status();
      54            7 :     storage_ = std::move(opened).value();
      55            7 :     service_ = std::make_unique<StorageWriteService>(storage_.get());
      56              : 
      57            7 :     int bound_port = 0;
      58            7 :     ::grpc::ServerBuilder builder;
      59            7 :     builder.AddListeningPort(
      60            7 :         "127.0.0.1:0", ::grpc::InsecureServerCredentials(), &bound_port);
      61            7 :     builder.RegisterService(service_.get());
      62            7 :     server_ = builder.BuildAndStart();
      63            7 :     ASSERT_NE(server_, nullptr);
      64            7 :     ASSERT_NE(bound_port, 0);
      65              : 
      66            7 :     const std::string target = absl::StrCat("127.0.0.1:", bound_port);
      67            7 :     channel_ =
      68            7 :         ::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials());
      69            7 :     ASSERT_NE(channel_, nullptr);
      70            7 :     stub_ = v1::StorageWrite::NewStub(channel_);
      71            7 :     ASSERT_NE(stub_, nullptr);
      72            7 :   }
      73              : 
      74            7 :   void TearDown() override {
      75            7 :     if (server_ != nullptr) server_->Shutdown();
      76            7 :     stub_.reset();
      77            7 :     channel_.reset();
      78            7 :     server_.reset();
      79            7 :     service_.reset();
      80            7 :     storage_.reset();
      81            7 :     std::error_code ec;
      82            7 :     fs::remove_all(data_dir_, ec);
      83            7 :   }
      84              : 
      85            6 :   void CreatePeopleTable() {
      86            6 :     backend::schema::TableSchema schema;
      87            6 :     backend::schema::ColumnSchema id;
      88            6 :     id.name = "id";
      89            6 :     id.type = backend::schema::ColumnType::kInt64;
      90            6 :     id.mode = backend::schema::ColumnMode::kRequired;
      91            6 :     schema.columns.push_back(id);
      92            6 :     backend::schema::ColumnSchema name;
      93            6 :     name.name = "name";
      94            6 :     name.type = backend::schema::ColumnType::kString;
      95            6 :     name.mode = backend::schema::ColumnMode::kNullable;
      96            6 :     schema.columns.push_back(name);
      97            6 :     ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
      98            6 :     ASSERT_TRUE(storage_->CreateTable({"proj-test", "ds", "t"}, schema).ok());
      99            6 :   }
     100              : 
     101              :   v1::AppendRowsRequest BuildAppendRequest(const std::string& stream_name,
     102              :                                            int64_t first_id,
     103            7 :                                            int64_t count) {
     104            7 :     v1::AppendRowsRequest req;
     105            7 :     req.set_write_stream(stream_name);
     106            7 :     auto* proto_rows = req.mutable_proto_rows();
     107           23 :     for (int64_t i = 0; i < count; ++i) {
     108           16 :       auto* row = proto_rows->add_rows();
     109           16 :       row->add_cells()->set_string_value(absl::StrCat(first_id + i));
     110           16 :       row->add_cells()->set_string_value(absl::StrCat("person-", first_id + i));
     111           16 :     }
     112            7 :     return req;
     113            7 :   }
     114              : 
     115              :   fs::path data_dir_{};
     116              :   std::unique_ptr<backend::storage::duckdb::DuckDBStorage> storage_{};
     117              :   std::unique_ptr<StorageWriteService> service_{};
     118              :   std::unique_ptr<::grpc::Server> server_{};
     119              :   std::shared_ptr<::grpc::Channel> channel_{};
     120              :   std::unique_ptr<v1::StorageWrite::Stub> stub_{};
     121              : };
     122              : 
     123            1 : TEST_F(StorageWriteGrpcTest, AppendRowsCommitsToCommittedStream) {
     124            1 :   CreatePeopleTable();
     125            1 :   v1::CreateWriteStreamRequest create_req;
     126            1 :   create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     127            1 :   create_req.mutable_write_stream()->set_type(v1::WriteStream::COMMITTED);
     128            1 :   v1::WriteStream stream;
     129            1 :   ::grpc::ClientContext create_ctx;
     130            1 :   ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
     131              : 
     132            1 :   ::grpc::ClientContext append_ctx;
     133            1 :   auto stream_rw = stub_->AppendRows(&append_ctx);
     134            1 :   ASSERT_NE(stream_rw, nullptr);
     135              : 
     136            1 :   v1::AppendRowsRequest req1 = BuildAppendRequest(stream.name(), 0, 2);
     137            1 :   req1.set_trace_id("trace-1");
     138            1 :   ASSERT_TRUE(stream_rw->Write(req1));
     139              : 
     140            1 :   v1::AppendRowsResponse resp1;
     141            1 :   ASSERT_TRUE(stream_rw->Read(&resp1));
     142            1 :   ASSERT_EQ(resp1.response_case(), v1::AppendRowsResponse::kAppendResult);
     143            1 :   EXPECT_EQ(resp1.append_result().offset(), 0);
     144            1 :   EXPECT_EQ(resp1.row_count(), 2);
     145            1 :   EXPECT_EQ(resp1.trace_id(), "trace-1");
     146              : 
     147            1 :   v1::AppendRowsRequest req2 = BuildAppendRequest("", 2, 3);
     148            1 :   ASSERT_TRUE(stream_rw->Write(req2));
     149            1 :   v1::AppendRowsResponse resp2;
     150            1 :   ASSERT_TRUE(stream_rw->Read(&resp2));
     151            1 :   ASSERT_EQ(resp2.response_case(), v1::AppendRowsResponse::kAppendResult);
     152            1 :   EXPECT_EQ(resp2.append_result().offset(), 2);
     153            1 :   EXPECT_EQ(resp2.row_count(), 3);
     154              : 
     155            1 :   ASSERT_TRUE(stream_rw->WritesDone());
     156            1 :   ASSERT_TRUE(stream_rw->Finish().ok());
     157              : 
     158            1 :   auto iter_or = storage_->ScanRows({"proj-test", "ds", "t"});
     159            2 :   ASSERT_TRUE(iter_or.ok()) << iter_or.status();
     160            1 :   std::unique_ptr<backend::storage::RowIterator> iter = std::move(*iter_or);
     161            1 :   std::vector<backend::storage::Row> rows;
     162            1 :   backend::storage::Row row;
     163            6 :   while (true) {
     164            6 :     auto has = iter->Next(&row);
     165            6 :     ASSERT_TRUE(has.ok());
     166            6 :     if (!*has) break;
     167            5 :     rows.push_back(row);
     168            5 :   }
     169            1 :   ASSERT_EQ(rows.size(), 5u);
     170            1 : }
     171              : 
     172            1 : TEST_F(StorageWriteGrpcTest, AppendRowsToDefaultStreamMintsLazily) {
     173            1 :   CreatePeopleTable();
     174            1 :   ::grpc::ClientContext append_ctx;
     175            1 :   auto stream_rw = stub_->AppendRows(&append_ctx);
     176            1 :   ASSERT_NE(stream_rw, nullptr);
     177            1 :   const std::string default_stream =
     178            1 :       "projects/proj-test/datasets/ds/tables/t/streams/_default";
     179            1 :   v1::AppendRowsRequest req = BuildAppendRequest(default_stream, 0, 4);
     180            1 :   ASSERT_TRUE(stream_rw->Write(req));
     181            1 :   v1::AppendRowsResponse resp;
     182            1 :   ASSERT_TRUE(stream_rw->Read(&resp));
     183            1 :   ASSERT_EQ(resp.response_case(), v1::AppendRowsResponse::kAppendResult);
     184            1 :   EXPECT_EQ(resp.row_count(), 4);
     185            1 :   ASSERT_TRUE(stream_rw->WritesDone());
     186            1 :   ASSERT_TRUE(stream_rw->Finish().ok());
     187            1 :   EXPECT_EQ(service_->StreamsForTesting(), 1u);
     188            1 : }
     189              : 
     190            1 : TEST_F(StorageWriteGrpcTest, AppendRowsShapeMismatchSurfaceOnEnvelope) {
     191            1 :   CreatePeopleTable();
     192            1 :   v1::CreateWriteStreamRequest create_req;
     193            1 :   create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     194            1 :   v1::WriteStream stream;
     195            1 :   ::grpc::ClientContext create_ctx;
     196            1 :   ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
     197              : 
     198            1 :   ::grpc::ClientContext append_ctx;
     199            1 :   auto stream_rw = stub_->AppendRows(&append_ctx);
     200            1 :   ASSERT_NE(stream_rw, nullptr);
     201              : 
     202            1 :   v1::AppendRowsRequest bad;
     203            1 :   bad.set_write_stream(stream.name());
     204            1 :   bad.mutable_proto_rows()->add_rows()->add_cells()->set_string_value("0");
     205            1 :   ASSERT_TRUE(stream_rw->Write(bad));
     206            1 :   v1::AppendRowsResponse resp_bad;
     207            1 :   ASSERT_TRUE(stream_rw->Read(&resp_bad));
     208            1 :   ASSERT_EQ(resp_bad.response_case(), v1::AppendRowsResponse::kErrorMessage);
     209              : 
     210            1 :   v1::AppendRowsRequest good = BuildAppendRequest("", 0, 1);
     211            1 :   ASSERT_TRUE(stream_rw->Write(good));
     212            1 :   v1::AppendRowsResponse resp_good;
     213            1 :   ASSERT_TRUE(stream_rw->Read(&resp_good));
     214            1 :   EXPECT_EQ(resp_good.response_case(), v1::AppendRowsResponse::kAppendResult);
     215              : 
     216            1 :   ASSERT_TRUE(stream_rw->WritesDone());
     217            1 :   ASSERT_TRUE(stream_rw->Finish().ok());
     218            1 : }
     219              : 
     220            1 : TEST_F(StorageWriteGrpcTest, AppendRowsUnknownStreamIsNotFound) {
     221            1 :   CreatePeopleTable();
     222            1 :   ::grpc::ClientContext append_ctx;
     223            1 :   auto stream_rw = stub_->AppendRows(&append_ctx);
     224            1 :   ASSERT_NE(stream_rw, nullptr);
     225            1 :   v1::AppendRowsRequest req = BuildAppendRequest(
     226            1 :       "projects/proj-test/datasets/ds/tables/t/streams/s99", 0, 1);
     227            1 :   ASSERT_TRUE(stream_rw->Write(req));
     228            1 :   v1::AppendRowsResponse resp;
     229            1 :   while (stream_rw->Read(&resp)) {}
     230            1 :   ::grpc::Status status = stream_rw->Finish();
     231            1 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
     232            1 : }
     233              : 
     234            1 : TEST_F(StorageWriteGrpcTest, GetWriteStreamReturnsRecordedMetadata) {
     235            1 :   CreatePeopleTable();
     236            1 :   v1::CreateWriteStreamRequest create_req;
     237            1 :   create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     238            1 :   v1::WriteStream stream;
     239            1 :   ::grpc::ClientContext create_ctx;
     240            1 :   ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
     241              : 
     242            1 :   ::grpc::ClientContext get_ctx;
     243            1 :   v1::GetWriteStreamRequest get_req;
     244            1 :   get_req.set_name(stream.name());
     245            1 :   v1::WriteStream got;
     246            1 :   ASSERT_TRUE(stub_->GetWriteStream(&get_ctx, get_req, &got).ok());
     247            1 :   EXPECT_EQ(got.name(), stream.name());
     248            1 :   EXPECT_EQ(got.type(), v1::WriteStream::COMMITTED);
     249            1 : }
     250              : 
     251            1 : TEST_F(StorageWriteGrpcTest, GetWriteStreamUnknownIsNotFound) {
     252            1 :   ::grpc::ClientContext get_ctx;
     253            1 :   v1::GetWriteStreamRequest get_req;
     254            1 :   get_req.set_name("projects/proj-test/datasets/ds/tables/t/streams/ghost");
     255            1 :   v1::WriteStream got;
     256            1 :   ::grpc::Status status = stub_->GetWriteStream(&get_ctx, get_req, &got);
     257            1 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
     258            1 : }
     259              : 
     260            1 : TEST_F(StorageWriteGrpcTest, BufferedStreamFlushCommitsRows) {
     261            1 :   CreatePeopleTable();
     262            1 :   v1::CreateWriteStreamRequest create_req;
     263            1 :   create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     264            1 :   create_req.mutable_write_stream()->set_type(v1::WriteStream::BUFFERED);
     265            1 :   v1::WriteStream stream;
     266            1 :   ::grpc::ClientContext create_ctx;
     267            1 :   ASSERT_TRUE(stub_->CreateWriteStream(&create_ctx, create_req, &stream).ok());
     268              : 
     269            1 :   ::grpc::ClientContext append_ctx;
     270            1 :   auto stream_rw = stub_->AppendRows(&append_ctx);
     271            1 :   ASSERT_NE(stream_rw, nullptr);
     272            1 :   ASSERT_TRUE(stream_rw->Write(BuildAppendRequest(stream.name(), 0, 2)));
     273            1 :   v1::AppendRowsResponse resp1;
     274            1 :   ASSERT_TRUE(stream_rw->Read(&resp1));
     275            1 :   ASSERT_EQ(resp1.response_case(), v1::AppendRowsResponse::kAppendResult);
     276            1 :   ASSERT_TRUE(stream_rw->Write(BuildAppendRequest("", 2, 3)));
     277            1 :   v1::AppendRowsResponse resp2;
     278            1 :   ASSERT_TRUE(stream_rw->Read(&resp2));
     279            1 :   ASSERT_EQ(resp2.response_case(), v1::AppendRowsResponse::kAppendResult);
     280            1 :   ASSERT_TRUE(stream_rw->WritesDone());
     281            1 :   ASSERT_TRUE(stream_rw->Finish().ok());
     282              : 
     283            1 :   auto pre_flush = storage_->ScanRows({"proj-test", "ds", "t"});
     284            1 :   ASSERT_TRUE(pre_flush.ok());
     285            1 :   std::unique_ptr<backend::storage::RowIterator> pre_iter =
     286            1 :       std::move(*pre_flush);
     287            1 :   backend::storage::Row row;
     288            1 :   auto has = pre_iter->Next(&row);
     289            1 :   ASSERT_TRUE(has.ok());
     290            1 :   EXPECT_FALSE(*has);
     291              : 
     292            1 :   v1::FlushRowsRequest flush_req;
     293            1 :   flush_req.set_write_stream(stream.name());
     294            1 :   flush_req.set_offset(4);
     295            1 :   v1::FlushRowsResponse flush_resp;
     296            1 :   ::grpc::ClientContext flush_ctx;
     297            1 :   ASSERT_TRUE(stub_->FlushRows(&flush_ctx, flush_req, &flush_resp).ok());
     298              : 
     299            1 :   v1::FinalizeWriteStreamRequest finalize_req;
     300            1 :   finalize_req.set_name(stream.name());
     301            1 :   v1::FinalizeWriteStreamResponse finalize_resp;
     302            1 :   ::grpc::ClientContext finalize_ctx;
     303            1 :   ASSERT_TRUE(
     304            1 :       stub_->FinalizeWriteStream(&finalize_ctx, finalize_req, &finalize_resp)
     305            1 :           .ok());
     306            1 :   EXPECT_EQ(finalize_resp.row_count(), 5);
     307              : 
     308            1 :   auto iter_or = storage_->ScanRows({"proj-test", "ds", "t"});
     309            1 :   ASSERT_TRUE(iter_or.ok());
     310            1 :   std::unique_ptr<backend::storage::RowIterator> iter = std::move(*iter_or);
     311            1 :   std::size_t count = 0;
     312            6 :   while (true) {
     313            6 :     auto next = iter->Next(&row);
     314            6 :     ASSERT_TRUE(next.ok());
     315            6 :     if (!*next) break;
     316            5 :     ++count;
     317            5 :   }
     318            1 :   EXPECT_EQ(count, 5u);
     319            1 : }
     320              : 
     321              : }  // namespace
     322              : }  // namespace frontend
     323              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1