LCOV - code coverage report
Current view: top level - frontend/handlers - storage_read_grpc_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 99.7 % 369 368
Test Date: 2026-07-02 21:01:18 Functions: 100.0 % 15 15

            Line data    Source code
       1              : // In-process gRPC tests for `StorageReadService::ReadRows`.
       2              : 
       3              : #include "absl/strings/str_cat.h"
       4              : #include "absl/types/span.h"
       5              : #include "frontend/handlers/storage_read_test_fixture.h"
       6              : #include "grpcpp/create_channel.h"
       7              : #include "grpcpp/grpcpp.h"
       8              : #include "grpcpp/security/credentials.h"
       9              : #include "grpcpp/security/server_credentials.h"
      10              : #include "grpcpp/server.h"
      11              : #include "grpcpp/server_builder.h"
      12              : #include "proto/emulator.pb.h"
      13              : #include "proto/storage_read.grpc.pb.h"
      14              : 
      15              : namespace bigquery_emulator {
      16              : namespace frontend {
      17              : namespace {
      18              : 
      19              : namespace fs = std::filesystem;
      20              : 
      21              : class StorageReadGrpcTest : public ::testing::Test {
      22              :  protected:
      23           10 :   void SetUp() override {
      24           10 :     data_dir_ = MakeTempDataDir("storage-read-grpc-test");
      25           10 :     auto opened =
      26           10 :         backend::storage::duckdb::DuckDBStorage::Open(data_dir_.string());
      27           20 :     ASSERT_TRUE(opened.ok()) << opened.status();
      28           10 :     storage_ = std::move(opened).value();
      29           10 :     service_ = std::make_unique<StorageReadService>(storage_.get());
      30              : 
      31              :     // Bind to localhost:0 so the OS picks a free port. Mirrors the
      32              :     // shape that `frontend/server/server.cc` uses for the production
      33              :     // engine.
      34           10 :     int bound_port = 0;
      35           10 :     ::grpc::ServerBuilder builder;
      36           10 :     builder.AddListeningPort(
      37           10 :         "127.0.0.1:0", ::grpc::InsecureServerCredentials(), &bound_port);
      38           10 :     builder.RegisterService(service_.get());
      39           10 :     server_ = builder.BuildAndStart();
      40           10 :     ASSERT_NE(server_, nullptr);
      41           10 :     ASSERT_NE(bound_port, 0);
      42              : 
      43           10 :     const std::string target = absl::StrCat("127.0.0.1:", bound_port);
      44           10 :     channel_ =
      45           10 :         ::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials());
      46           10 :     ASSERT_NE(channel_, nullptr);
      47           10 :     stub_ = v1::StorageRead::NewStub(channel_);
      48           10 :     ASSERT_NE(stub_, nullptr);
      49           10 :   }
      50              : 
      51           10 :   void TearDown() override {
      52           10 :     if (server_ != nullptr) server_->Shutdown();
      53           10 :     stub_.reset();
      54           10 :     channel_.reset();
      55           10 :     server_.reset();
      56           10 :     service_.reset();
      57           10 :     storage_.reset();
      58           10 :     std::error_code ec;
      59           10 :     fs::remove_all(data_dir_, ec);
      60           10 :   }
      61              : 
      62              :   // Creates a fixed `proj-test.ds.t` table with the three-column
      63              :   // people schema (id, name, tags). Tests append rows to it and
      64              :   // round-trip them through ReadRows.
      65           10 :   void CreatePeopleTable() {
      66           10 :     backend::schema::TableSchema schema;
      67           10 :     backend::schema::ColumnSchema id;
      68           10 :     id.name = "id";
      69           10 :     id.type = backend::schema::ColumnType::kInt64;
      70           10 :     id.mode = backend::schema::ColumnMode::kRequired;
      71           10 :     schema.columns.push_back(id);
      72           10 :     backend::schema::ColumnSchema name;
      73           10 :     name.name = "name";
      74           10 :     name.type = backend::schema::ColumnType::kString;
      75           10 :     name.mode = backend::schema::ColumnMode::kNullable;
      76           10 :     schema.columns.push_back(name);
      77           10 :     backend::schema::ColumnSchema tags;
      78           10 :     tags.name = "tags";
      79           10 :     tags.type = backend::schema::ColumnType::kString;
      80           10 :     tags.mode = backend::schema::ColumnMode::kRepeated;
      81           10 :     schema.columns.push_back(tags);
      82           10 :     ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
      83           10 :     ASSERT_TRUE(storage_->CreateTable({"proj-test", "ds", "t"}, schema).ok());
      84           10 :   }
      85              : 
      86              :   // Appends `n` rows with id=0..n-1 and a derived name/tags shape.
      87              :   // Mirrors the StorageReadServiceTest schema so the two suites can
      88              :   // share assertions.
      89            8 :   void AppendPeople(int64_t n) {
      90            8 :     std::vector<backend::storage::Row> rows;
      91            8 :     rows.reserve(n);
      92          325 :     for (int64_t i = 0; i < n; ++i) {
      93          317 :       backend::storage::Row r;
      94          317 :       r.cells.push_back(backend::storage::Value::Int64(i));
      95          317 :       r.cells.push_back(
      96          317 :           backend::storage::Value::String(absl::StrCat("person-", i)));
      97          317 :       std::vector<backend::storage::Value> tag_values;
      98          317 :       tag_values.push_back(
      99          317 :           backend::storage::Value::String(absl::StrCat("t", i)));
     100          317 :       r.cells.push_back(backend::storage::Value::Array(std::move(tag_values)));
     101          317 :       rows.push_back(std::move(r));
     102          317 :     }
     103            8 :     ASSERT_TRUE(
     104            8 :         storage_
     105            8 :             ->AppendRows({"proj-test", "ds", "t"}, absl::MakeConstSpan(rows))
     106            8 :             .ok());
     107            8 :   }
     108              : 
     109              :   fs::path data_dir_{};
     110              :   std::unique_ptr<backend::storage::duckdb::DuckDBStorage> storage_{};
     111              :   std::unique_ptr<StorageReadService> service_{};
     112              :   std::unique_ptr<::grpc::Server> server_{};
     113              :   std::shared_ptr<::grpc::Channel> channel_{};
     114              :   std::unique_ptr<v1::StorageRead::Stub> stub_{};
     115              : };
     116              : 
     117            1 : TEST_F(StorageReadGrpcTest, ReadRowsRoundTripsRowsInOrder) {
     118            1 :   CreatePeopleTable();
     119            1 :   AppendPeople(/*n=*/250);
     120              : 
     121              :   // Step 1: mint a session via the wire.
     122            1 :   ::grpc::ClientContext create_ctx;
     123            1 :   v1::CreateReadSessionRequest create_req;
     124            1 :   create_req.set_parent("projects/proj-test");
     125            1 :   create_req.mutable_read_session()->set_table(
     126            1 :       "projects/proj-test/datasets/ds/tables/t");
     127            1 :   v1::ReadSession session;
     128            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     129            1 :   ASSERT_EQ(session.streams_size(), 1);
     130            1 :   const std::string stream_name = session.streams(0).name();
     131              : 
     132              :   // Step 2: drain the stream. With 250 rows and a page size of 100
     133              :   // we expect three pages: 100, 100, 50.
     134            1 :   ::grpc::ClientContext read_ctx;
     135            1 :   v1::ReadRowsRequest read_req;
     136            1 :   read_req.set_read_stream(stream_name);
     137            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     138            1 :   ASSERT_NE(reader, nullptr);
     139              : 
     140            1 :   std::vector<v1::DataRow> rows;
     141            1 :   std::vector<int64_t> page_sizes;
     142            1 :   v1::ReadRowsResponse page;
     143            4 :   while (reader->Read(&page)) {
     144            3 :     page_sizes.push_back(page.row_count());
     145          250 :     for (const auto& row : page.rows()) {
     146          250 :       rows.push_back(row);
     147          250 :     }
     148            3 :   }
     149            1 :   ::grpc::Status status = reader->Finish();
     150            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     151              : 
     152              :   // Paging: 100 + 100 + 50.
     153            1 :   ASSERT_EQ(page_sizes.size(), 3u);
     154            1 :   EXPECT_EQ(page_sizes[0], 100);
     155            1 :   EXPECT_EQ(page_sizes[1], 100);
     156            1 :   EXPECT_EQ(page_sizes[2], 50);
     157              : 
     158              :   // Row contents: 250 rows in id order with the expected name/tags.
     159            1 :   ASSERT_EQ(rows.size(), 250u);
     160          251 :   for (int64_t i = 0; i < 250; ++i) {
     161          250 :     ASSERT_EQ(rows[i].cells_size(), 3);
     162              :     // INT64 cells ride on Cell.string_value as the decimal repr;
     163              :     // mirrors the catalog handler's wire shape so a single gateway
     164              :     // decoder works for both tabledata.list and ReadRows.
     165          250 :     EXPECT_EQ(rows[i].cells(0).string_value(), absl::StrCat(i));
     166          250 :     EXPECT_EQ(rows[i].cells(1).string_value(), absl::StrCat("person-", i));
     167          250 :     ASSERT_EQ(rows[i].cells(2).array().elements_size(), 1);
     168          250 :     EXPECT_EQ(rows[i].cells(2).array().elements(0).string_value(),
     169          250 :               absl::StrCat("t", i));
     170          250 :   }
     171            1 : }
     172              : 
     173            1 : TEST_F(StorageReadGrpcTest, ReadRowsHonorsOffset) {
     174            1 :   CreatePeopleTable();
     175            1 :   AppendPeople(/*n=*/20);
     176              : 
     177            1 :   ::grpc::ClientContext create_ctx;
     178            1 :   v1::CreateReadSessionRequest create_req;
     179            1 :   create_req.set_parent("projects/proj-test");
     180            1 :   create_req.mutable_read_session()->set_table(
     181            1 :       "projects/proj-test/datasets/ds/tables/t");
     182            1 :   v1::ReadSession session;
     183            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     184              : 
     185            1 :   ::grpc::ClientContext read_ctx;
     186            1 :   v1::ReadRowsRequest read_req;
     187            1 :   read_req.set_read_stream(session.streams(0).name());
     188              :   // Skip the first 15 rows; remaining 5 land in a single page.
     189            1 :   read_req.set_offset(15);
     190            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     191            1 :   ASSERT_NE(reader, nullptr);
     192              : 
     193            1 :   std::vector<v1::DataRow> rows;
     194            1 :   v1::ReadRowsResponse page;
     195            2 :   while (reader->Read(&page)) {
     196            1 :     for (const auto& row : page.rows())
     197            5 :       rows.push_back(row);
     198            1 :   }
     199            1 :   ::grpc::Status status = reader->Finish();
     200            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     201            1 :   ASSERT_EQ(rows.size(), 5u);
     202            6 :   for (size_t i = 0; i < rows.size(); ++i) {
     203            5 :     EXPECT_EQ(rows[i].cells(0).string_value(), absl::StrCat(15 + i));
     204            5 :   }
     205            1 : }
     206              : 
     207            1 : TEST_F(StorageReadGrpcTest, ReadRowsEmptyTableYieldsZeroPages) {
     208              :   // An empty table still mints a valid session; ReadRows must
     209              :   // succeed with no pages emitted (the client loop reads 0 messages
     210              :   // then sees Finish() return OK).
     211            1 :   CreatePeopleTable();
     212            1 :   ::grpc::ClientContext create_ctx;
     213            1 :   v1::CreateReadSessionRequest create_req;
     214            1 :   create_req.set_parent("projects/proj-test");
     215            1 :   create_req.mutable_read_session()->set_table(
     216            1 :       "projects/proj-test/datasets/ds/tables/t");
     217            1 :   v1::ReadSession session;
     218            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     219              : 
     220            1 :   ::grpc::ClientContext read_ctx;
     221            1 :   v1::ReadRowsRequest read_req;
     222            1 :   read_req.set_read_stream(session.streams(0).name());
     223            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     224            1 :   ASSERT_NE(reader, nullptr);
     225              : 
     226            1 :   v1::ReadRowsResponse page;
     227            1 :   int pages = 0;
     228            1 :   while (reader->Read(&page))
     229            0 :     ++pages;
     230            1 :   ::grpc::Status status = reader->Finish();
     231            2 :   EXPECT_TRUE(status.ok()) << status.error_message();
     232            1 :   EXPECT_EQ(pages, 0);
     233            1 : }
     234              : 
     235            1 : TEST_F(StorageReadGrpcTest, ReadRowsUnknownStreamIsNotFound) {
     236            1 :   CreatePeopleTable();
     237            1 :   ::grpc::ClientContext read_ctx;
     238            1 :   v1::ReadRowsRequest read_req;
     239              :   // The handler mints session names starting at s1; an `s99` session
     240              :   // was never created so the handler MUST surface NOT_FOUND.
     241            1 :   read_req.set_read_stream(
     242            1 :       "projects/proj-test/locations/-/sessions/s99/streams/0");
     243            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     244            1 :   ASSERT_NE(reader, nullptr);
     245              : 
     246            1 :   v1::ReadRowsResponse page;
     247            1 :   while (reader->Read(&page)) {}
     248            1 :   ::grpc::Status status = reader->Finish();
     249            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::NOT_FOUND)
     250            2 :       << status.error_message();
     251            1 : }
     252              : 
     253              : // selected_fields projection
     254              : // end-to-end. The session is minted with `selected_fields = [id]`,
     255              : // so the response schema lists only `id` and ReadRows yields one
     256              : // cell per row instead of three. The projection survives a
     257              : // round-trip through the in-process gRPC server, which is the same
     258              : // codec path the production engine uses.
     259            1 : TEST_F(StorageReadGrpcTest, ReadRowsProjectsToSelectedFields) {
     260            1 :   CreatePeopleTable();
     261            1 :   AppendPeople(/*n=*/4);
     262              : 
     263            1 :   ::grpc::ClientContext create_ctx;
     264            1 :   v1::CreateReadSessionRequest create_req;
     265            1 :   create_req.set_parent("projects/proj-test");
     266            1 :   create_req.mutable_read_session()->set_table(
     267            1 :       "projects/proj-test/datasets/ds/tables/t");
     268            1 :   create_req.mutable_read_session()
     269            1 :       ->mutable_read_options()
     270            1 :       ->add_selected_fields("name");
     271            1 :   create_req.mutable_read_session()
     272            1 :       ->mutable_read_options()
     273            1 :       ->add_selected_fields("id");
     274            1 :   v1::ReadSession session;
     275            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     276              : 
     277              :   // Schema reflects the caller-supplied projection order, not the
     278              :   // table's declared column order.
     279            1 :   ASSERT_EQ(session.schema().fields_size(), 2);
     280            1 :   EXPECT_EQ(session.schema().fields(0).name(), "name");
     281            1 :   EXPECT_EQ(session.schema().fields(1).name(), "id");
     282              : 
     283            1 :   ::grpc::ClientContext read_ctx;
     284            1 :   v1::ReadRowsRequest read_req;
     285            1 :   read_req.set_read_stream(session.streams(0).name());
     286            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     287            1 :   ASSERT_NE(reader, nullptr);
     288            1 :   std::vector<v1::DataRow> rows;
     289            1 :   v1::ReadRowsResponse page;
     290            2 :   while (reader->Read(&page)) {
     291            1 :     for (const auto& row : page.rows())
     292            4 :       rows.push_back(row);
     293            1 :   }
     294            1 :   ::grpc::Status status = reader->Finish();
     295            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     296            1 :   ASSERT_EQ(rows.size(), 4u);
     297            5 :   for (size_t i = 0; i < rows.size(); ++i) {
     298              :     // Two cells per row, in projected order: name then id.
     299            4 :     ASSERT_EQ(rows[i].cells_size(), 2);
     300            4 :     EXPECT_EQ(rows[i].cells(0).string_value(), absl::StrCat("person-", i));
     301            4 :     EXPECT_EQ(rows[i].cells(1).string_value(), absl::StrCat(i));
     302            4 :   }
     303            1 : }
     304              : 
     305              : // row_restriction pushdown end-to-end. CreateReadSession
     306              : // parses the predicate, stashes it on the SessionState, and ReadRows
     307              : // hands it to `CreateReadStream` which filters before paginating.
     308            1 : TEST_F(StorageReadGrpcTest, ReadRowsHonorsRowRestriction) {
     309            1 :   CreatePeopleTable();
     310            1 :   AppendPeople(/*n=*/10);
     311              : 
     312            1 :   ::grpc::ClientContext create_ctx;
     313            1 :   v1::CreateReadSessionRequest create_req;
     314            1 :   create_req.set_parent("projects/proj-test");
     315            1 :   create_req.mutable_read_session()->set_table(
     316            1 :       "projects/proj-test/datasets/ds/tables/t");
     317            1 :   create_req.mutable_read_session()
     318            1 :       ->mutable_read_options()
     319            1 :       ->set_row_restriction("id = 5");
     320            1 :   v1::ReadSession session;
     321            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     322              : 
     323            1 :   ::grpc::ClientContext read_ctx;
     324            1 :   v1::ReadRowsRequest read_req;
     325            1 :   read_req.set_read_stream(session.streams(0).name());
     326            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     327            1 :   ASSERT_NE(reader, nullptr);
     328            1 :   std::vector<v1::DataRow> rows;
     329            1 :   v1::ReadRowsResponse page;
     330            2 :   while (reader->Read(&page)) {
     331            1 :     for (const auto& row : page.rows())
     332            1 :       rows.push_back(row);
     333            1 :   }
     334            1 :   ::grpc::Status status = reader->Finish();
     335            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     336            1 :   ASSERT_EQ(rows.size(), 1u);
     337            1 :   EXPECT_EQ(rows[0].cells(0).string_value(), "5");
     338            1 :   EXPECT_EQ(rows[0].cells(1).string_value(), "person-5");
     339            1 : }
     340              : 
     341            1 : TEST_F(StorageReadGrpcTest, ReadRowsHonorsRangeRowRestriction) {
     342            1 :   CreatePeopleTable();
     343            1 :   AppendPeople(/*n=*/10);
     344              : 
     345            1 :   ::grpc::ClientContext create_ctx;
     346            1 :   v1::CreateReadSessionRequest create_req;
     347            1 :   create_req.set_parent("projects/proj-test");
     348            1 :   create_req.mutable_read_session()->set_table(
     349            1 :       "projects/proj-test/datasets/ds/tables/t");
     350            1 :   create_req.mutable_read_session()
     351            1 :       ->mutable_read_options()
     352            1 :       ->set_row_restriction("id > 7");
     353            1 :   v1::ReadSession session;
     354            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     355              : 
     356            1 :   ::grpc::ClientContext read_ctx;
     357            1 :   v1::ReadRowsRequest read_req;
     358            1 :   read_req.set_read_stream(session.streams(0).name());
     359            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     360            1 :   std::vector<v1::DataRow> rows;
     361            1 :   v1::ReadRowsResponse page;
     362            2 :   while (reader->Read(&page)) {
     363            1 :     for (const auto& row : page.rows())
     364            2 :       rows.push_back(row);
     365            1 :   }
     366            1 :   ASSERT_TRUE(reader->Finish().ok());
     367            1 :   ASSERT_EQ(rows.size(), 2u);
     368            1 :   EXPECT_EQ(rows[0].cells(0).string_value(), "8");
     369            1 :   EXPECT_EQ(rows[1].cells(0).string_value(), "9");
     370            1 : }
     371              : 
     372            1 : TEST_F(StorageReadGrpcTest, CreateReadSessionMintsMultipleStreams) {
     373            1 :   CreatePeopleTable();
     374            1 :   AppendPeople(/*n=*/8);
     375              : 
     376            1 :   ::grpc::ClientContext create_ctx;
     377            1 :   v1::CreateReadSessionRequest create_req;
     378            1 :   create_req.set_parent("projects/proj-test");
     379            1 :   create_req.mutable_read_session()->set_table(
     380            1 :       "projects/proj-test/datasets/ds/tables/t");
     381            1 :   create_req.set_max_stream_count(4);
     382            1 :   v1::ReadSession session;
     383            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     384            1 :   ASSERT_EQ(session.streams_size(), 4);
     385              : 
     386            1 :   std::vector<v1::DataRow> all_rows;
     387            5 :   for (int i = 0; i < session.streams_size(); ++i) {
     388            4 :     ::grpc::ClientContext read_ctx;
     389            4 :     v1::ReadRowsRequest read_req;
     390            4 :     read_req.set_read_stream(session.streams(i).name());
     391            4 :     auto reader = stub_->ReadRows(&read_ctx, read_req);
     392            4 :     v1::ReadRowsResponse page;
     393            8 :     while (reader->Read(&page)) {
     394            4 :       for (const auto& row : page.rows())
     395            8 :         all_rows.push_back(row);
     396            4 :     }
     397            4 :     ASSERT_TRUE(reader->Finish().ok());
     398            4 :   }
     399            1 :   ASSERT_EQ(all_rows.size(), 8u);
     400            1 : }
     401              : 
     402            1 : TEST_F(StorageReadGrpcTest, SplitReadStreamPartitionsRemainingRange) {
     403            1 :   CreatePeopleTable();
     404            1 :   AppendPeople(/*n=*/10);
     405              : 
     406            1 :   ::grpc::ClientContext create_ctx;
     407            1 :   v1::CreateReadSessionRequest create_req;
     408            1 :   create_req.set_parent("projects/proj-test");
     409            1 :   create_req.mutable_read_session()->set_table(
     410            1 :       "projects/proj-test/datasets/ds/tables/t");
     411            1 :   v1::ReadSession session;
     412            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     413            1 :   const std::string stream_name = session.streams(0).name();
     414              : 
     415            1 :   ::grpc::ClientContext split_ctx;
     416            1 :   v1::SplitReadStreamRequest split_req;
     417            1 :   split_req.set_name(stream_name);
     418            1 :   split_req.set_fraction(0.5);
     419            1 :   v1::SplitReadStreamResponse split_resp;
     420            1 :   ASSERT_TRUE(stub_->SplitReadStream(&split_ctx, split_req, &split_resp).ok());
     421              : 
     422            2 :   auto drain = [&](const std::string& name) {
     423            2 :     ::grpc::ClientContext read_ctx;
     424            2 :     v1::ReadRowsRequest read_req;
     425            2 :     read_req.set_read_stream(name);
     426            2 :     auto reader = stub_->ReadRows(&read_ctx, read_req);
     427            2 :     std::vector<int64_t> ids;
     428            2 :     v1::ReadRowsResponse page;
     429            4 :     while (reader->Read(&page)) {
     430           10 :       for (const auto& row : page.rows()) {
     431           10 :         ids.push_back(std::stoll(row.cells(0).string_value()));
     432           10 :       }
     433            2 :     }
     434            2 :     EXPECT_TRUE(reader->Finish().ok());
     435            2 :     return ids;
     436            2 :   };
     437              : 
     438            1 :   const auto primary = drain(split_resp.primary_stream().name());
     439            1 :   const auto remainder = drain(split_resp.remainder_stream().name());
     440            1 :   ASSERT_EQ(primary.size() + remainder.size(), 10u);
     441            1 :   EXPECT_EQ(primary.front(), 0);
     442            1 :   EXPECT_EQ(remainder.back(), 9);
     443            1 : }
     444              : 
     445            1 : TEST_F(StorageReadGrpcTest, ReadRowsSchemaDriftIsFailedPrecondition) {
     446              :   // Mint a session against the people table, then drop+recreate the
     447              :   // table with a different schema. The follow-up ReadRows must
     448              :   // detect the drift and return FAILED_PRECONDITION; the gateway
     449              :   // translates that to BigQuery's "schema changed" error.
     450            1 :   CreatePeopleTable();
     451            1 :   AppendPeople(5);
     452              : 
     453            1 :   ::grpc::ClientContext create_ctx;
     454            1 :   v1::CreateReadSessionRequest create_req;
     455            1 :   create_req.set_parent("projects/proj-test");
     456            1 :   create_req.mutable_read_session()->set_table(
     457            1 :       "projects/proj-test/datasets/ds/tables/t");
     458            1 :   v1::ReadSession session;
     459            1 :   ASSERT_TRUE(stub_->CreateReadSession(&create_ctx, create_req, &session).ok());
     460              : 
     461              :   // Drop and recreate with an incompatible shape (1 column instead
     462              :   // of 3). The session's stashed schema is stale; ReadRows trips
     463              :   // the drift check.
     464            1 :   const backend::storage::TableId tbl{"proj-test", "ds", "t"};
     465            1 :   ASSERT_TRUE(storage_->DropTable(tbl).ok());
     466            1 :   backend::schema::TableSchema other;
     467            1 :   backend::schema::ColumnSchema only;
     468            1 :   only.name = "only";
     469            1 :   only.type = backend::schema::ColumnType::kInt64;
     470            1 :   only.mode = backend::schema::ColumnMode::kNullable;
     471            1 :   other.columns.push_back(only);
     472            1 :   ASSERT_TRUE(storage_->CreateTable(tbl, other).ok());
     473              : 
     474            1 :   ::grpc::ClientContext read_ctx;
     475            1 :   v1::ReadRowsRequest read_req;
     476            1 :   read_req.set_read_stream(session.streams(0).name());
     477            1 :   auto reader = stub_->ReadRows(&read_ctx, read_req);
     478            1 :   v1::ReadRowsResponse page;
     479            1 :   while (reader->Read(&page)) {}
     480            1 :   ::grpc::Status status = reader->Finish();
     481            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::FAILED_PRECONDITION)
     482            2 :       << status.error_message();
     483            1 : }
     484              : 
     485              : }  // namespace
     486              : }  // namespace frontend
     487              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1