LCOV - code coverage report
Current view: top level - frontend/handlers - query_execute_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 98.8 % 256 253
Test Date: 2026-07-02 21:01:18 Functions: 93.8 % 16 15

            Line data    Source code
       1              : // StreamQueryResults / ExecuteQuery tests for `QueryService`.
       2              : 
       3              : #include "frontend/handlers/query_test_fixture.h"
       4              : 
       5              : namespace bigquery_emulator {
       6              : namespace frontend {
       7              : namespace {
       8            1 : TEST_F(QueryServiceTest, ExecuteQuerySelect1StreamsSchemaThenRow) {
       9            1 :   v1::QueryRequest req = MakeRequest("SELECT 1 AS one");
      10            1 :   MessageCollector collector;
      11            1 :   ::grpc::Status status = StreamQueryResults(
      12            1 :       storage_.get(), req, collector.Writer(), engine_.get());
      13            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
      14              : 
      15            1 :   const auto& messages = collector.messages();
      16              :   // Five messages: schema, one row, phase_timings, then the trailing
      17              :   // `statement_type` + `emulator_route` pair the gateway folds into
      18              :   // `Job.statistics.query.{statementType,emulatorRoute}`.
      19              :   //
      20              :   // See `docs/ENGINE_POLICY.md` Item 5.
      21            1 :   ASSERT_EQ(messages.size(), 5u);
      22              : 
      23            1 :   EXPECT_TRUE(messages[0].has_schema());
      24            1 :   EXPECT_EQ(messages[0].cells_size(), 0);
      25            1 :   ASSERT_EQ(messages[0].schema().fields_size(), 1);
      26            1 :   EXPECT_EQ(messages[0].schema().fields(0).name(), "one");
      27            1 :   EXPECT_EQ(messages[0].schema().fields(0).type(), "INT64");
      28              : 
      29            1 :   EXPECT_FALSE(messages[1].has_schema());
      30            1 :   ASSERT_EQ(messages[1].cells_size(), 1);
      31            1 :   EXPECT_EQ(messages[1].cells(0).string_value(), "1");
      32              : 
      33            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
      34            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
      35            1 :   ASSERT_NE(stmt, nullptr);
      36            1 :   EXPECT_EQ(stmt->statement_type(), "SELECT");
      37              :   // `SELECT 1` has no FROM clause -> semantic executor (see
      38              :   // `docs/ENGINE_POLICY.md`).
      39            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
      40            1 :   ASSERT_NE(route, nullptr);
      41            1 :   EXPECT_EQ(route->emulator_route(), "semantic_executor");
      42            1 : }
      43              : 
      44            1 : TEST_F(QueryServiceTest, ExecuteQuerySelectFromTableStreamsAllRows) {
      45            1 :   CreatePeopleTable();
      46            1 :   std::vector<backend::storage::Row> rows;
      47            3 :   auto append = [&](int64_t id, std::string name) {
      48            3 :     backend::storage::Row r;
      49            3 :     r.cells = {
      50            3 :         backend::storage::Value::Int64(id),
      51            3 :         backend::storage::Value::String(std::move(name)),
      52            3 :         backend::storage::Value::Array({}),
      53            3 :     };
      54            3 :     rows.push_back(std::move(r));
      55            3 :   };
      56            1 :   append(1, "ada");
      57            1 :   append(2, "linus");
      58            1 :   append(3, "grace");
      59            1 :   ASSERT_TRUE(
      60            1 :       storage_->AppendRows({"proj-test", "ds", "t"}, absl::MakeConstSpan(rows))
      61            1 :           .ok());
      62              : 
      63            1 :   v1::QueryRequest req = MakeRequest("SELECT id, name FROM ds.t ORDER BY id");
      64            1 :   MessageCollector collector;
      65            1 :   ::grpc::Status status = StreamQueryResults(
      66            1 :       storage_.get(), req, collector.Writer(), engine_.get());
      67            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
      68              : 
      69            1 :   const auto& messages = collector.messages();
      70              :   // Seven messages: schema, three rows, phase_timings, plus the
      71              :   // trailing `statement_type` + `emulator_route` pair.
      72            1 :   ASSERT_EQ(messages.size(), 7u);
      73              : 
      74            1 :   ASSERT_TRUE(messages[0].has_schema());
      75            1 :   ASSERT_EQ(messages[0].schema().fields_size(), 2);
      76            1 :   EXPECT_EQ(messages[0].schema().fields(0).name(), "id");
      77            1 :   EXPECT_EQ(messages[0].schema().fields(0).type(), "INT64");
      78            1 :   EXPECT_EQ(messages[0].schema().fields(1).name(), "name");
      79            1 :   EXPECT_EQ(messages[0].schema().fields(1).type(), "STRING");
      80              : 
      81            1 :   ASSERT_EQ(messages[1].cells_size(), 2);
      82            1 :   EXPECT_EQ(messages[1].cells(0).string_value(), "1");
      83            1 :   EXPECT_EQ(messages[1].cells(1).string_value(), "ada");
      84            1 :   ASSERT_EQ(messages[2].cells_size(), 2);
      85            1 :   EXPECT_EQ(messages[2].cells(0).string_value(), "2");
      86            1 :   EXPECT_EQ(messages[2].cells(1).string_value(), "linus");
      87            1 :   ASSERT_EQ(messages[3].cells_size(), 2);
      88            1 :   EXPECT_EQ(messages[3].cells(0).string_value(), "3");
      89            1 :   EXPECT_EQ(messages[3].cells(1).string_value(), "grace");
      90              : 
      91            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
      92            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
      93            1 :   ASSERT_NE(stmt, nullptr);
      94            1 :   EXPECT_EQ(stmt->statement_type(), "SELECT");
      95            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
      96            1 :   ASSERT_NE(route, nullptr);
      97            1 :   EXPECT_EQ(route->emulator_route(), "duckdb_native");
      98            1 : }
      99              : 
     100            1 : TEST_F(QueryServiceTest, ExecuteQueryEmptyTableEmitsSchemaThenStatementType) {
     101            1 :   CreatePeopleTable();
     102            1 :   v1::QueryRequest req = MakeRequest("SELECT id, name FROM ds.t");
     103            1 :   MessageCollector collector;
     104            1 :   ::grpc::Status status = StreamQueryResults(
     105            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     106            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     107              :   // Four messages: the schema (always emitted, even for zero-row
     108              :   // results), phase_timings, and the trailing `statement_type` +
     109              :   // `emulator_route` pair.
     110            1 :   ASSERT_EQ(collector.messages().size(), 4u);
     111            1 :   EXPECT_TRUE(collector.messages()[0].has_schema());
     112            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
     113            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
     114            1 :   ASSERT_NE(stmt, nullptr);
     115            1 :   EXPECT_EQ(stmt->statement_type(), "SELECT");
     116            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
     117            1 :   ASSERT_NE(route, nullptr);
     118            1 :   EXPECT_EQ(route->emulator_route(), "duckdb_native");
     119            1 : }
     120              : 
     121            1 : TEST_F(QueryServiceTest, ExecuteQuerySyntaxErrorIsInvalidArgument) {
     122            1 :   v1::QueryRequest req = MakeRequest("SELECT FROM");
     123            1 :   MessageCollector collector;
     124            1 :   ::grpc::Status status = StreamQueryResults(
     125            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     126            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
     127            2 :       << status.error_message();
     128            1 :   EXPECT_TRUE(collector.messages().empty());
     129            1 : }
     130              : 
     131            1 : TEST_F(QueryServiceTest, ExecuteQueryUseLegacySqlIsInvalidArgument) {
     132            1 :   v1::QueryRequest req = MakeRequest("SELECT 1");
     133            1 :   req.set_use_legacy_sql(true);
     134            1 :   MessageCollector collector;
     135            1 :   ::grpc::Status status = StreamQueryResults(
     136            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     137            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
     138            2 :       << status.error_message();
     139            1 :   EXPECT_TRUE(collector.messages().empty());
     140            1 : }
     141              : 
     142            1 : TEST_F(QueryServiceTest, ExecuteQueryMissingProjectIsInvalidArgument) {
     143            1 :   v1::QueryRequest req;
     144            1 :   req.set_sql("SELECT 1");
     145            1 :   MessageCollector collector;
     146            1 :   ::grpc::Status status = StreamQueryResults(
     147            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     148            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
     149            2 :       << status.error_message();
     150            1 : }
     151              : 
     152            1 : TEST_F(QueryServiceTest, ExecuteQueryEmptySqlIsInvalidArgument) {
     153            1 :   v1::QueryRequest req = MakeRequest("");
     154            1 :   MessageCollector collector;
     155            1 :   ::grpc::Status status = StreamQueryResults(
     156            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     157            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
     158            2 :       << status.error_message();
     159            1 : }
     160              : 
     161            1 : TEST(QueryServiceWithoutStorageTest, ExecuteQueryReturnsFailedPrecondition) {
     162            1 :   v1::QueryRequest req;
     163            1 :   req.set_project_id("proj-test");
     164            1 :   req.set_sql("SELECT 1");
     165            1 :   std::vector<v1::QueryResultRow> messages;
     166            1 :   ::grpc::Status status = StreamQueryResults(
     167            1 :       /*storage=*/nullptr, req, [&](const v1::QueryResultRow& m) {
     168            0 :         messages.push_back(m);
     169            0 :         return true;
     170            0 :       });
     171            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::FAILED_PRECONDITION)
     172            2 :       << status.error_message();
     173            1 :   EXPECT_TRUE(messages.empty());
     174            1 : }
     175              : 
     176            1 : TEST_F(QueryServiceTest, ExecuteQueryCancelledWriterReturnsCancelled) {
     177            1 :   v1::QueryRequest req = MakeRequest("SELECT 1 AS one");
     178            1 :   ::grpc::Status status = StreamQueryResults(
     179            1 :       storage_.get(),
     180            1 :       req,
     181            1 :       [](const v1::QueryResultRow&) { return false; },
     182            1 :       engine_.get());
     183            2 :   EXPECT_EQ(status.error_code(), ::grpc::StatusCode::CANCELLED)
     184            2 :       << status.error_message();
     185            1 : }
     186              : 
     187              : // ---------------------------------------------------------------------------
     188              : // Statement classification
     189              : //
     190              : // `StreamQueryResults` analyzes the statement once up front so it can
     191              : // pick the right engine entry point: SELECT keeps the existing
     192              : // schema+rows protocol, INSERT/UPDATE/DELETE/MERGE routes through
     193              : // ExecuteDml and emits a dml_stats summary, and DDL such as CREATE
     194              : // TABLE runs through the control-op route rather than being rejected.
     195              : // ---------------------------------------------------------------------------
     196              : 
     197            1 : TEST_F(QueryServiceTest, ExecuteQueryInsertEmitsDmlStats) {
     198            1 :   CreatePeopleTable();
     199            1 :   v1::QueryRequest req = MakeRequest(
     200            1 :       "INSERT INTO ds.t (id, name, tags) "
     201            1 :       "VALUES (1, 'ada', ['math']), (2, 'linus', [])");
     202            1 :   MessageCollector collector;
     203            1 :   ::grpc::Status status = StreamQueryResults(
     204            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     205            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     206              : 
     207              :   // DML response shape: dml_stats, phase_timings, then the trailing
     208              :   // statement_type + emulator_route pair.
     209            1 :   const auto& messages = collector.messages();
     210            1 :   ASSERT_EQ(messages.size(), 4u);
     211            1 :   EXPECT_FALSE(messages[0].has_schema());
     212            1 :   EXPECT_EQ(messages[0].cells_size(), 0);
     213            1 :   ASSERT_TRUE(messages[0].has_dml_stats());
     214            1 :   EXPECT_EQ(messages[0].dml_stats().inserted_row_count(), 2);
     215            1 :   EXPECT_EQ(messages[0].dml_stats().updated_row_count(), 0);
     216            1 :   EXPECT_EQ(messages[0].dml_stats().deleted_row_count(), 0);
     217            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
     218            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
     219            1 :   ASSERT_NE(stmt, nullptr);
     220            1 :   EXPECT_EQ(stmt->statement_type(), "INSERT");
     221              :   // INSERT against a user table routes through the semantic
     222              :   // executor's DML path (see `docs/ENGINE_POLICY.md`).
     223            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
     224            1 :   ASSERT_NE(route, nullptr);
     225            1 :   EXPECT_EQ(route->emulator_route(), "semantic_executor");
     226              : 
     227              :   // Storage round-trip: the rows actually landed.
     228            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "t"});
     229            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     230            1 :   int rows_seen = 0;
     231            1 :   backend::storage::Row row;
     232            3 :   while (true) {
     233            3 :     auto has = (*scan)->Next(&row);
     234            6 :     ASSERT_TRUE(has.ok()) << has.status();
     235            3 :     if (!*has) break;
     236            2 :     ++rows_seen;
     237            2 :   }
     238            1 :   EXPECT_EQ(rows_seen, 2);
     239            1 : }
     240              : 
     241            1 : TEST_F(QueryServiceTest, ExecuteQueryDdlEmitsStatementType) {
     242              :   // CREATE TABLE routes through the coordinator's `kControlOp`
     243              :   // route to `backend/engine/control/control_op_executor`. The
     244              :   // reply stream carries exactly three messages: the
     245              :   // `phase_timings` message, the `statement_type` trailer the
     246              :   // gateway folds into `Job.statistics.query.statementType`, and
     247              :   // the `emulator_route` trailer the gateway folds into
     248              :   // `Job.statistics.query.emulatorRoute` (loopback-only).
     249              :   //
     250              :   // See `docs/ENGINE_POLICY.md` Item 5.
     251            1 :   ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
     252            1 :   v1::QueryRequest req =
     253            1 :       MakeRequest("CREATE TABLE ds.new_table (id INT64, name STRING)");
     254            1 :   MessageCollector collector;
     255            1 :   ::grpc::Status status = StreamQueryResults(
     256            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     257            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     258            1 :   ASSERT_EQ(collector.messages().size(), 3u);
     259            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
     260            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
     261            1 :   ASSERT_NE(stmt, nullptr);
     262            1 :   EXPECT_EQ(stmt->statement_type(), "CREATE_TABLE");
     263            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
     264            1 :   ASSERT_NE(route, nullptr);
     265            1 :   EXPECT_EQ(route->emulator_route(), "control_op");
     266              : 
     267            1 :   auto sch = storage_->GetSchema({"proj-test", "ds", "new_table"});
     268            2 :   ASSERT_TRUE(sch.ok()) << sch.status();
     269            1 :   ASSERT_EQ(sch->columns.size(), 2u);
     270            1 :   EXPECT_EQ(sch->columns[0].name, "id");
     271            1 :   EXPECT_EQ(sch->columns[1].name, "name");
     272            1 : }
     273              : 
     274            1 : TEST_F(QueryServiceTest, ExecuteQuerySqlUdfCreateThenCall) {
     275            1 :   const char* kCreate = R"(CREATE FUNCTION customfunc(
     276            1 :   arr ARRAY<STRUCT<name STRING, val INT64>>
     277            1 : ) AS (
     278            1 :   (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
     279            1 : ))";
     280            1 :   MessageCollector create_collector;
     281            1 :   ::grpc::Status create_status = StreamQueryResults(storage_.get(),
     282            1 :                                                     MakeRequest(kCreate),
     283            1 :                                                     create_collector.Writer(),
     284            1 :                                                     engine_.get());
     285            2 :   ASSERT_TRUE(create_status.ok()) << create_status.error_message();
     286              : 
     287            1 :   const char* kSelect = R"(SELECT customfunc([
     288            1 :   STRUCT<name STRING, val INT64>("foo", 10),
     289            1 :   STRUCT<name STRING, val INT64>("bar", 40),
     290            1 :   STRUCT<name STRING, val INT64>("foo", 20)
     291            1 : ]))";
     292            1 :   MessageCollector select_collector;
     293            1 :   ::grpc::Status select_status = StreamQueryResults(storage_.get(),
     294            1 :                                                     MakeRequest(kSelect),
     295            1 :                                                     select_collector.Writer(),
     296            1 :                                                     engine_.get());
     297            2 :   ASSERT_TRUE(select_status.ok()) << select_status.error_message();
     298            1 :   const auto& messages = select_collector.messages();
     299            1 :   ASSERT_GE(messages.size(), 2u);
     300            1 :   ASSERT_TRUE(messages[0].has_schema());
     301            1 :   ASSERT_FALSE(messages[1].has_schema());
     302            1 :   ASSERT_EQ(messages[1].cells_size(), 1);
     303            1 :   EXPECT_EQ(messages[1].cells(0).string_value(), "30");
     304            1 : }
     305              : 
     306            1 : TEST_F(QueryServiceTest, ExecuteQueryDeleteEmitsDmlStats) {
     307            1 :   CreatePeopleTable();
     308              :   // DELETE runs end-to-end against the DuckDB engine; the handler
     309              :   // streams a single dml_stats message with the matching
     310              :   // deletedRowCount.
     311            1 :   v1::QueryRequest req = MakeRequest("DELETE FROM ds.t WHERE FALSE");
     312            1 :   MessageCollector collector;
     313            1 :   ::grpc::Status status = StreamQueryResults(
     314            1 :       storage_.get(), req, collector.Writer(), engine_.get());
     315            2 :   ASSERT_TRUE(status.ok()) << status.error_message();
     316            1 :   const auto& messages = collector.messages();
     317              :   // dml_stats, phase_timings, then the trailing statement_type +
     318              :   // emulator_route pair.
     319            1 :   ASSERT_EQ(messages.size(), 4u);
     320            1 :   ASSERT_TRUE(messages[0].has_dml_stats());
     321            1 :   EXPECT_EQ(messages[0].dml_stats().deleted_row_count(), 0);
     322            1 :   ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
     323            1 :   const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
     324            1 :   ASSERT_NE(stmt, nullptr);
     325            1 :   EXPECT_EQ(stmt->statement_type(), "DELETE");
     326            1 :   const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
     327            1 :   ASSERT_NE(route, nullptr);
     328            1 :   EXPECT_EQ(route->emulator_route(), "semantic_executor");
     329            1 : }
     330              : 
     331              : }  // namespace
     332              : }  // namespace frontend
     333              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1