LCOV - code coverage report
Current view: top level - frontend/handlers - storage_write_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 100.0 % 119 119
Test Date: 2026-07-02 21:01:18 Functions: 100.0 % 11 11

            Line data    Source code
       1              : // Unit + in-process gRPC tests for `StorageWriteService`.
       2              : //
       3              : // The shape mirrors `storage_read_test.cc`: an in-memory DuckDB
       4              : // storage instance is fabricated under SetUp, the service is
       5              : // constructed against it, and each test pokes the handler with a
       6              : // hand-built request proto. The bidirectional `AppendRows` RPC needs
       7              : // a real `ServerReaderWriter`, so the streaming tests stand up an
       8              : // in-process gRPC server (same `grpc::testing` transport
       9              : // `storage_read_test.cc` uses for `ReadRows`) and drive the streaming
      10              : // reply through a client stub.
      11              : 
      12              : #include "frontend/handlers/storage_write.h"
      13              : 
      14              : #include <cstdint>
      15              : #include <cstdlib>
      16              : #include <filesystem>
      17              : #include <memory>
      18              : #include <random>
      19              : #include <string>
      20              : #include <system_error>
      21              : #include <utility>
      22              : #include <vector>
      23              : 
      24              : #include "absl/strings/str_cat.h"
      25              : #include "backend/schema/schema.h"
      26              : #include "backend/storage/duckdb/duckdb_storage.h"
      27              : #include "backend/storage/storage.h"
      28              : #include "grpcpp/grpcpp.h"
      29              : #include "gtest/gtest.h"
      30              : #include "proto/emulator.pb.h"
      31              : #include "proto/storage_write.grpc.pb.h"
      32              : #include "proto/storage_write.pb.h"
      33              : 
      34              : namespace bigquery_emulator {
      35              : namespace frontend {
      36              : namespace {
      37              : 
      38              : namespace fs = std::filesystem;
      39              : 
      40            7 : fs::path MakeTempDataDir(absl::string_view prefix) {
      41            7 :   const char* tmpdir_env = std::getenv("TMPDIR");
      42            7 :   const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
      43            7 :   std::random_device rd;
      44            7 :   std::seed_seq seed{rd(), rd()};
      45            7 :   std::mt19937_64 rng(seed);
      46            7 :   fs::path out = fs::path(tmpdir) /
      47            7 :                  absl::StrCat("bqemu-", std::string(prefix), "-", rng());
      48            7 :   std::error_code ec;
      49            7 :   fs::remove_all(out, ec);
      50            7 :   return out;
      51            7 : }
      52              : 
      53              : class StorageWriteServiceTest : public ::testing::Test {
      54              :  protected:
      55            7 :   void SetUp() override {
      56            7 :     data_dir_ = MakeTempDataDir("storage-write-test");
      57            7 :     auto opened =
      58            7 :         backend::storage::duckdb::DuckDBStorage::Open(data_dir_.string());
      59           14 :     ASSERT_TRUE(opened.ok()) << opened.status();
      60            7 :     storage_ = std::move(opened).value();
      61            7 :     service_ = std::make_unique<StorageWriteService>(storage_.get());
      62            7 :   }
      63              : 
      64            7 :   void TearDown() override {
      65            7 :     service_.reset();
      66            7 :     storage_.reset();
      67            7 :     std::error_code ec;
      68            7 :     fs::remove_all(data_dir_, ec);
      69            7 :   }
      70              : 
      71              :   // Two-column toy schema (id + name). Mirrors the StorageWrite
      72              :   // happy path; tests append rows against it and round-trip through
      73              :   // `Storage::ScanRows` to confirm the rows landed.
      74            6 :   void CreatePeopleTable() {
      75            6 :     backend::schema::TableSchema schema;
      76            6 :     backend::schema::ColumnSchema id;
      77            6 :     id.name = "id";
      78            6 :     id.type = backend::schema::ColumnType::kInt64;
      79            6 :     id.mode = backend::schema::ColumnMode::kRequired;
      80            6 :     schema.columns.push_back(id);
      81            6 :     backend::schema::ColumnSchema name;
      82            6 :     name.name = "name";
      83            6 :     name.type = backend::schema::ColumnType::kString;
      84            6 :     name.mode = backend::schema::ColumnMode::kNullable;
      85            6 :     schema.columns.push_back(name);
      86            6 :     ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
      87            6 :     ASSERT_TRUE(storage_->CreateTable({"proj-test", "ds", "t"}, schema).ok());
      88            6 :   }
      89              : 
      90              :   fs::path data_dir_{};
      91              :   std::unique_ptr<backend::storage::duckdb::DuckDBStorage> storage_{};
      92              :   std::unique_ptr<StorageWriteService> service_{};
      93              : };
      94              : 
      95              : // ---------------------------------------------------------------------------
      96              : // CreateWriteStream: validation + happy path
      97              : // ---------------------------------------------------------------------------
      98              : 
      99            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsCommittedStream) {
     100            1 :   CreatePeopleTable();
     101            1 :   v1::CreateWriteStreamRequest req;
     102            1 :   req.set_parent("projects/proj-test/datasets/ds/tables/t");
     103            1 :   req.mutable_write_stream()->set_type(v1::WriteStream::COMMITTED);
     104            1 :   v1::WriteStream resp;
     105            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     106            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     107              : 
     108              :   // Stream id nests under the table path with a server-assigned
     109              :   // suffix; the helper mints `s1` for the first call.
     110            1 :   EXPECT_EQ(resp.name(), "projects/proj-test/datasets/ds/tables/t/streams/s1");
     111            1 :   EXPECT_EQ(resp.type(), v1::WriteStream::COMMITTED);
     112            1 :   ASSERT_EQ(resp.schema().fields_size(), 2);
     113            1 :   EXPECT_EQ(resp.schema().fields(0).name(), "id");
     114            1 :   EXPECT_EQ(resp.schema().fields(1).name(), "name");
     115            1 :   EXPECT_FALSE(resp.create_time().empty());
     116            1 :   EXPECT_EQ(service_->StreamsForTesting(), 1u);
     117            1 : }
     118              : 
     119            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamDefaultsToCommitted) {
     120              :   // BigQuery's documented default for an unspecified stream type is
     121              :   // COMMITTED. The emulator follows that.
     122            1 :   CreatePeopleTable();
     123            1 :   v1::CreateWriteStreamRequest req;
     124            1 :   req.set_parent("projects/proj-test/datasets/ds/tables/t");
     125            1 :   v1::WriteStream resp;
     126            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     127            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     128            1 :   EXPECT_EQ(resp.type(), v1::WriteStream::COMMITTED);
     129            1 : }
     130              : 
     131            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsPendingStream) {
     132            1 :   CreatePeopleTable();
     133            1 :   v1::CreateWriteStreamRequest req;
     134            1 :   req.set_parent("projects/proj-test/datasets/ds/tables/t");
     135            1 :   req.mutable_write_stream()->set_type(v1::WriteStream::PENDING);
     136            1 :   v1::WriteStream resp;
     137            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     138            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     139            1 :   EXPECT_EQ(resp.type(), v1::WriteStream::PENDING);
     140            1 :   EXPECT_EQ(service_->StreamsForTesting(), 1u);
     141            1 : }
     142              : 
     143            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMintsBufferedStream) {
     144            1 :   CreatePeopleTable();
     145            1 :   v1::CreateWriteStreamRequest req;
     146            1 :   req.set_parent("projects/proj-test/datasets/ds/tables/t");
     147            1 :   req.mutable_write_stream()->set_type(v1::WriteStream::BUFFERED);
     148            1 :   v1::WriteStream resp;
     149            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     150            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     151            1 :   EXPECT_EQ(resp.type(), v1::WriteStream::BUFFERED);
     152            1 :   EXPECT_FALSE(resp.name().empty());
     153            1 : }
     154              : 
     155            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamRejectsMalformedParent) {
     156            1 :   CreatePeopleTable();
     157            1 :   v1::CreateWriteStreamRequest req;
     158              :   // Drops the `tables/` segment; the parser refuses anything that
     159              :   // does not match `projects/{p}/datasets/{d}/tables/{t}`.
     160            1 :   req.set_parent("projects/proj-test/datasets/ds/t");
     161            1 :   v1::WriteStream resp;
     162            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     163            1 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT);
     164            1 : }
     165              : 
     166            1 : TEST_F(StorageWriteServiceTest, CreateWriteStreamMissingTableIsNotFound) {
     167              :   // Dataset exists but the table does not. Storage::GetSchema returns
     168              :   // NotFound and the handler maps that onto gRPC NOT_FOUND so a
     169              :   // BigQuery REST 404 envelope can be synthesized at the gateway.
     170            1 :   ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
     171            1 :   v1::CreateWriteStreamRequest req;
     172            1 :   req.set_parent("projects/proj-test/datasets/ds/tables/missing");
     173            1 :   v1::WriteStream resp;
     174            1 :   ::grpc::Status status = service_->CreateWriteStream(nullptr, &req, &resp);
     175            1 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND);
     176            1 : }
     177              : 
     178            1 : TEST_F(StorageWriteServiceTest, BatchCommitRequiresFinalizedPendingStream) {
     179            1 :   CreatePeopleTable();
     180            1 :   v1::CreateWriteStreamRequest create_req;
     181            1 :   create_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     182            1 :   create_req.mutable_write_stream()->set_type(v1::WriteStream::PENDING);
     183            1 :   v1::WriteStream stream;
     184            1 :   ASSERT_TRUE(service_->CreateWriteStream(nullptr, &create_req, &stream).ok());
     185              : 
     186            1 :   v1::BatchCommitWriteStreamsRequest commit_req;
     187            1 :   commit_req.set_parent("projects/proj-test/datasets/ds/tables/t");
     188            1 :   commit_req.add_write_streams(stream.name());
     189            1 :   v1::BatchCommitWriteStreamsResponse commit_resp;
     190            1 :   ::grpc::Status status =
     191            1 :       service_->BatchCommitWriteStreams(nullptr, &commit_req, &commit_resp);
     192            1 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::FAILED_PRECONDITION);
     193            1 : }
     194              : 
     195              : }  // namespace
     196              : }  // namespace frontend
     197              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1