LCOV - code coverage report
Current view: top level - backend/engine/duckdb - duckdb_executor_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 100.0 % 238 238
Test Date: 2026-07-02 21:01:18 Functions: 100.0 % 6 6

            Line data    Source code
       1              : // Unit tests for the route-typed DuckDB executor. These exercise
       2              : // the contract the future `LocalCoordinatorEngine` relies on: each
       3              : // method takes an already-analyzed `ResolvedStatement` and produces
       4              : // the same wire-facing reply the legacy `DuckDBEngine` shim used
       5              : // to produce before it was deleted (per `docs/ENGINE_POLICY.md`)
       6              : // in favor of `LocalCoordinatorEngine`. We
       7              : // rebuild the analyzer up-front (the way the coordinator will) and
       8              : // hand the resolved root straight to the executor, so the executor's
       9              : // pre-resolution validation, transpiler invocation, DuckDB
      10              : // per-query connection, and Arrow result-row path are all on the
      11              : // critical path.
      12              : 
      13              : #include "backend/engine/duckdb/duckdb_executor.h"
      14              : 
      15              : #include <algorithm>
      16              : #include <memory>
      17              : #include <string>
      18              : #include <utility>
      19              : #include <vector>
      20              : 
      21              : #include "absl/status/status.h"
      22              : #include "absl/status/statusor.h"
      23              : #include "absl/strings/string_view.h"
      24              : #include "backend/engine/duckdb/duckdb_executor_test_fixture.h"
      25              : #include "backend/engine/duckdb/transpiler/transpiler.h"
      26              : #include "backend/schema/schema.h"
      27              : #include "backend/storage/storage.h"
      28              : #include "googlesql/resolved_ast/resolved_ast.h"
      29              : #include "gtest/gtest.h"
      30              : 
      31              : namespace bigquery_emulator {
      32              : namespace backend {
      33              : namespace engine {
      34              : namespace duckdb {
      35              : namespace {
      36              : 
      37            1 : TEST_F(DuckDbExecutorTest, ExecuteQuerySelectStarFromTableStreamsAllRows) {
      38              :   // The executor's smoke test: hand it a fully-analyzed
      39              :   // `SELECT * FROM ds.people` and confirm the wire output matches
      40              :   // the canonical three rows we seeded into storage. Covers the
      41              :   // analyze-then-execute split that the coordinator will rely on.
      42            1 :   CreatePeopleTable();
      43            1 :   CatalogBundle bundle = MakeCatalog();
      44            1 :   auto analyzed = Analyze("SELECT * FROM ds.people",
      45            1 :                           bundle.catalog.get(),
      46            1 :                           /*all_statements=*/false);
      47            2 :   ASSERT_TRUE(analyzed.ok()) << analyzed.status();
      48            1 :   const ::googlesql::ResolvedStatement* stmt =
      49            1 :       (*analyzed)->resolved_statement();
      50            1 :   ASSERT_NE(stmt, nullptr);
      51              : 
                          // Hand the already-resolved statement to the executor together
                          // with a request built from the same SQL text.
      52            1 :   absl::StatusOr<std::unique_ptr<RowSource>> source = executor_->ExecuteQuery(
      53            1 :       MakeRequest("SELECT * FROM ds.people"), *stmt, bundle.catalog.get());
      54            2 :   ASSERT_TRUE(source.ok()) << source.status();
      55              : 
                          // The reported schema must be exactly (id INT64, name STRING).
                          // The size check is an ASSERT so the indexed reads below are safe.
      56            1 :   const schema::TableSchema& s = (*source)->schema();
      57            1 :   ASSERT_EQ(s.columns.size(), 2u);
      58            1 :   EXPECT_EQ(s.columns[0].name, "id");
      59            1 :   EXPECT_EQ(s.columns[0].type, schema::ColumnType::kInt64);
      60            1 :   EXPECT_EQ(s.columns[1].name, "name");
      61            1 :   EXPECT_EQ(s.columns[1].type, schema::ColumnType::kString);
      62              : 
      63              :   // DuckDB does not promise a stable row order without ORDER BY.
      64            1 :   std::vector<std::pair<int64_t, std::string>> seen;
      65            1 :   storage::Row row;
                          // Drain the source: each Next() call must succeed, and a false
                          // payload ends the stream. The cell kinds are asserted before the
                          // typed accessors are used.
      66            4 :   while (true) {
      67            4 :     auto has = (*source)->Next(&row);
      68            8 :     ASSERT_TRUE(has.ok()) << has.status();
      69            4 :     if (!*has) break;
      70            3 :     ASSERT_EQ(row.cells.size(), 2u);
      71            3 :     ASSERT_EQ(row.cells[0].kind(), storage::Value::Kind::kInt64);
      72            3 :     ASSERT_EQ(row.cells[1].kind(), storage::Value::Kind::kString);
      73            3 :     seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
      74            3 :   }
      75            1 :   std::vector<std::pair<int64_t, std::string>> want = {
      76            1 :       {1, "ada"}, {2, "linus"}, {3, "grace"}};
                          // Sort both sides so the comparison is independent of row order.
      77            1 :   std::sort(seen.begin(), seen.end());
      78            1 :   std::sort(want.begin(), want.end());
      79            1 :   EXPECT_EQ(seen, want);
      80            1 : }
      81              : 
      82              : TEST_F(DuckDbExecutorTest,
      83            1 :        ExecuteQueryNarrowsColumnsWhenAnalyzerSchemaIsSubsetOfTable) {
      84              :   // Regression: the legacy executor stripped the wrapping pass-through
      85              :   // ProjectScan and handed the bare TableScan to the transpiler, which
      86              :   // emitted SELECT for *all* of the table's columns. With
      87              :   // `prune_unused_columns=false` (the analyzer setting both
      88              :   // `LocalCoordinatorEngine` and the legacy DuckDBEngine use), the
      89              :   // TableScan retains every storage column even when the user-spelled
      90              :   // SELECT only asks for a subset; the result chunk would then arrive
      91              :   // with one extra Arrow column that `arrow_to_bq::ChunkRowToCells`
      92              :   // refused to render against the analyzer-output schema:
      93              :   //
      94              :   //   arrow_to_bq: chunk has 3 columns but analyzer output schema has 2
      95              :   //
      96              :   // The fix: hand `EmitQueryStmt` the QueryStmt itself; the outermost
      97              :   // SELECT projects only the analyzer-output columns. Pin a 3-column
      98              :   // table source against a 2-column projection here so a regression
      99              :   // (e.g. someone re-introducing the strip-and-bypass shortcut) fails
     100              :   // at the executor unit-test level instead of the
     101              :   // `frontend/handlers/query_test.cc` integration suite.
                          //
                          // Fixture: table `wide` with three columns -- id (required INT64),
                          // name (nullable STRING) and tags (repeated STRING). Only the first
                          // two are selected by the query under test.
     102            1 :   schema::TableSchema bq_schema;
     103            1 :   schema::ColumnSchema id;
     104            1 :   id.name = "id";
     105            1 :   id.type = schema::ColumnType::kInt64;
     106            1 :   id.mode = schema::ColumnMode::kRequired;
     107            1 :   bq_schema.columns.push_back(id);
     108            1 :   schema::ColumnSchema name;
     109            1 :   name.name = "name";
     110            1 :   name.type = schema::ColumnType::kString;
     111            1 :   name.mode = schema::ColumnMode::kNullable;
     112            1 :   bq_schema.columns.push_back(name);
     113            1 :   schema::ColumnSchema tags;
     114            1 :   tags.name = "tags";
     115            1 :   tags.type = schema::ColumnType::kString;
     116            1 :   tags.mode = schema::ColumnMode::kRepeated;
     117            1 :   bq_schema.columns.push_back(tags);
     118            1 :   ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
     119            1 :   ASSERT_TRUE(
     120            1 :       storage_->CreateTable({"proj-test", "ds", "wide"}, bq_schema).ok());
     121            1 :   std::vector<storage::Row> rows;
                          // Stages one row (id, name, empty tags array) into `rows`.
     122            3 :   auto append = [&](int64_t v_id, std::string v_name) {
     123            3 :     storage::Row r;
     124            3 :     r.cells = {
     125            3 :         storage::Value::Int64(v_id),
     126            3 :         storage::Value::String(std::move(v_name)),
     127            3 :         storage::Value::Array({}),
     128            3 :     };
     129            3 :     rows.push_back(std::move(r));
     130            3 :   };
     131            1 :   append(1, "ada");
     132            1 :   append(2, "linus");
     133            1 :   append(3, "grace");
     134            1 :   ASSERT_TRUE(
     135            1 :       storage_
     136            1 :           ->AppendRows({"proj-test", "ds", "wide"}, absl::MakeConstSpan(rows))
     137            1 :           .ok());
     138              : 
                          // Analyze a projection of two of the three columns. ORDER BY id
                          // fixes the row order, which is why the result is compared to
                          // `want` below without sorting.
     139            1 :   CatalogBundle bundle = MakeCatalog();
     140            1 :   auto analyzed = Analyze("SELECT id, name FROM ds.wide ORDER BY id",
     141            1 :                           bundle.catalog.get(),
     142            1 :                           /*all_statements=*/false);
     143            2 :   ASSERT_TRUE(analyzed.ok()) << analyzed.status();
     144            1 :   const ::googlesql::ResolvedStatement* stmt =
     145            1 :       (*analyzed)->resolved_statement();
     146            1 :   ASSERT_NE(stmt, nullptr);
     147              : 
     148            1 :   absl::StatusOr<std::unique_ptr<RowSource>> source = executor_->ExecuteQuery(
     149            1 :       MakeRequest("SELECT id, name FROM ds.wide ORDER BY id"),
     150            1 :       *stmt,
     151            1 :       bundle.catalog.get());
     152            2 :   ASSERT_TRUE(source.ok()) << source.status();
     153              : 
                          // The output schema must contain only the two projected columns.
     154            1 :   const schema::TableSchema& s = (*source)->schema();
     155            1 :   ASSERT_EQ(s.columns.size(), 2u);
     156            1 :   EXPECT_EQ(s.columns[0].name, "id");
     157            1 :   EXPECT_EQ(s.columns[1].name, "name");
     158              : 
     159            1 :   std::vector<std::pair<int64_t, std::string>> seen;
     160            1 :   storage::Row row;
                          // Every streamed row must carry exactly two cells; a third cell
                          // would mean the unselected `tags` column leaked through.
     161            4 :   while (true) {
     162            4 :     auto has = (*source)->Next(&row);
     163            8 :     ASSERT_TRUE(has.ok()) << has.status();
     164            4 :     if (!*has) break;
     165            3 :     ASSERT_EQ(row.cells.size(), 2u);
     166            3 :     seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
     167            3 :   }
     168            1 :   std::vector<std::pair<int64_t, std::string>> want = {
     169            1 :       {1, "ada"}, {2, "linus"}, {3, "grace"}};
     170            1 :   EXPECT_EQ(seen, want);
     171            1 : }
     172              : 
     173            1 : TEST_F(DuckDbExecutorTest, ExecuteQueryRejectsNonQueryStatement) {
     174              :   // The coordinator is supposed to dispatch DDL through the control-op
     175              :   // route, not through the DuckDB executor; defensively the executor
     176              :   // returns INVALID_ARGUMENT (not UNIMPLEMENTED) when fed a non-query
     177              :   // statement on its query surface so a routing bug surfaces loudly
     178              :   // instead of looking like a transpiler gap.
     179            1 :   CatalogBundle bundle = MakeCatalog();
                          // NOTE(review): all_statements=true presumably lets the analyzer
                          // accept non-query statements such as CREATE TABLE -- confirm
                          // against the fixture's Analyze() helper.
     180            1 :   auto analyzed = Analyze("CREATE TABLE ds.t (id INT64)",
     181            1 :                           bundle.catalog.get(),
     182            1 :                           /*all_statements=*/true);
     183            2 :   ASSERT_TRUE(analyzed.ok()) << analyzed.status();
     184            1 :   const ::googlesql::ResolvedStatement* stmt =
     185            1 :       (*analyzed)->resolved_statement();
     186            1 :   ASSERT_NE(stmt, nullptr);
     187              : 
                          // Analysis succeeded, so the rejection has to come from
                          // ExecuteQuery itself. Only the status code is pinned, not the
                          // error message text.
     188            1 :   auto source = executor_->ExecuteQuery(
     189            1 :       MakeRequest("CREATE TABLE ds.t (id INT64)"), *stmt, bundle.catalog.get());
     190            1 :   ASSERT_FALSE(source.ok());
     191            2 :   EXPECT_EQ(source.status().code(), absl::StatusCode::kInvalidArgument)
     192            2 :       << source.status();
     193            1 : }
     194              : 
     195            1 : TEST_F(DuckDbExecutorTest, ExecuteBigframesCacheJoinShape) {
     196              :   // Regression for bigframes `cache()` join SQL: output schema column
     197              :   // `bfuid_col_4` must stay INT64 (amount=3), not pick up STRING
     198              :   // `column_0` ('John') from a misaligned DuckDB chunk.
                          //
                          // The query below is self-contained: all three inputs are inline
                          // UNNEST(ARRAY<STRUCT<...>>[...]) literals joined with two LEFT
                          // OUTER JOINs, so no table needs to be created in storage.
     199            1 :   static constexpr char kSql[] = R"sql(
     200            1 : SELECT `level_0`, `column_0`, `bfuid_col_2`, `bfuid_col_4`, `column_1`, `bfuid_col_14` AS `bfuid_col_15`, `bfuid_col_10` AS `bfuid_col_16`, `bfuid_col_13` AS `bfuid_col_17`, `bfuid_col_9` AS `bfuid_col_18`, `bfuid_col_11` AS `bfuid_col_19` FROM (SELECT
     201            1 :   `t11`.`level_0`, `t11`.`column_0`, `t11`.`bfuid_col_2`, `t11`.`bfuid_col_9`, `t11`.`bfuid_col_10`, `t11`.`bfuid_col_11`,
     202            1 :   `t6`.`bfuid_col_3`, `t6`.`bfuid_col_4`, `t6`.`column_1`, `t6`.`bfuid_col_13`, `t6`.`bfuid_col_14`
     203            1 : FROM (
     204            1 :   SELECT * FROM (
     205            1 :     SELECT `t7`.`level_0`, `t7`.`column_0`, `t8`.`bfuid_col_2`, `t7`.`bfuid_col_5` AS `bfuid_col_9`, `t8`.`bfuid_col_7` AS `bfuid_col_10`, `t8`.`bfuid_col_8` AS `bfuid_col_11`
     206            1 :     FROM (SELECT * FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` STRING, `bfuid_col_5` INT64>>[STRUCT(0, 'John', 0)]) AS `level_0`) AS `t1`) AS `t7`
     207            1 :     LEFT OUTER JOIN (
     208            1 :       SELECT `t2`.`level_0` AS `bfuid_col_1`, `t2`.`column_0` AS `bfuid_col_2`, `t2`.`bfuid_col_6` AS `bfuid_col_7`, TRUE AS `bfuid_col_8`
     209            1 :       FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` STRING, `bfuid_col_6` INT64>>[STRUCT(0, 'group_1', 0)]) AS `level_0`) AS `t2`
     210            1 :     ) AS `t8` ON COALESCE(`t7`.`level_0`, 0) = COALESCE(`t8`.`bfuid_col_1`, 0) AND COALESCE(`t7`.`level_0`, 1) = COALESCE(`t8`.`bfuid_col_1`, 1)
     211            1 :   ) AS `t9`
     212            1 : ) AS `t11`
     213            1 : LEFT OUTER JOIN (
     214            1 :   SELECT `t0`.`level_0` AS `bfuid_col_3`, `t0`.`column_0` AS `bfuid_col_4`, `t0`.`column_1`, `t0`.`bfuid_col_12` AS `bfuid_col_13`, TRUE AS `bfuid_col_14`
     215            1 :   FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` INT64, `column_1` BOOLEAN, `bfuid_col_12` INT64>>[STRUCT(0, 3, TRUE, 0)]) AS `level_0`) AS `t0`
     216            1 : ) AS `t6` ON COALESCE(`t11`.`level_0`, 0) = COALESCE(`t6`.`bfuid_col_3`, 0) AND COALESCE(`t11`.`level_0`, 1) = COALESCE(`t6`.`bfuid_col_3`, 1)) AS `t`
     217            1 : )sql";
     218            1 :   const std::string sql(kSql);
     219            1 :   CatalogBundle bundle = MakeCatalog();
     220            1 :   auto analyzed = Analyze(sql, bundle.catalog.get(), /*all_statements=*/false);
     221            2 :   ASSERT_TRUE(analyzed.ok()) << analyzed.status();
     222            1 :   const ::googlesql::ResolvedStatement* stmt =
     223            1 :       (*analyzed)->resolved_statement();
     224            1 :   ASSERT_NE(stmt, nullptr);
     225              : 
                          // Transpile the statement directly as well. This pins that the
                          // query transpiles to non-empty SQL, and the generated text is
                          // attached to the Next() failure message further down.
     226            1 :   const auto* query_stmt = stmt->GetAs<::googlesql::ResolvedQueryStmt>();
     227            1 :   ASSERT_NE(query_stmt, nullptr);
     228            1 :   transpiler::Transpiler transpiler;
     229            1 :   std::string transpiled = transpiler.Transpile(query_stmt);
     230            2 :   ASSERT_FALSE(transpiled.empty()) << "bigframes cache join must transpile";
     231              : 
     232            1 :   absl::StatusOr<std::unique_ptr<RowSource>> source =
     233            1 :       executor_->ExecuteQuery(MakeRequest(sql), *stmt, bundle.catalog.get());
     234            2 :   ASSERT_TRUE(source.ok()) << source.status();
     235              : 
                          // The outer SELECT lists ten columns with `bfuid_col_4` fourth, so
                          // it must be found by name at index 3. The index stays -1 if the
                          // name is missing, which fails the ASSERT_EQ below.
     236            1 :   const schema::TableSchema& out_schema = (*source)->schema();
     237            1 :   ASSERT_EQ(out_schema.columns.size(), 10u);
     238            1 :   int bfuid_col_4_idx = -1;
     239            4 :   for (size_t i = 0; i < out_schema.columns.size(); ++i) {
     240            4 :     if (out_schema.columns[i].name == "bfuid_col_4") {
     241            1 :       bfuid_col_4_idx = static_cast<int>(i);
     242            1 :       break;
     243            1 :     }
     244            4 :   }
     245            2 :   ASSERT_EQ(bfuid_col_4_idx, 3) << "bfuid_col_4 schema position drift";
     246              : 
                          // Only the first row is inspected: `bfuid_col_4` must carry the
                          // INT64 value 3 and `column_0` (index 1) the STRING 'John'.
     247            1 :   storage::Row row;
     248            1 :   auto has = (*source)->Next(&row);
     249            2 :   ASSERT_TRUE(has.ok()) << has.status() << "\nSQL:\n" << transpiled;
     250            1 :   ASSERT_TRUE(*has);
     251            1 :   ASSERT_EQ(row.cells.size(), 10u);
     252            1 :   EXPECT_EQ(row.cells[static_cast<size_t>(bfuid_col_4_idx)].int64_value(), 3);
     253            1 :   EXPECT_EQ(row.cells[1].string_value(), "John");
     254            1 : }
     255              : 
     256            1 : TEST_F(DuckDbExecutorTest, ExecuteDmlInsertSelectQualifyDedupesRows) {
                          // Runs an `INSERT INTO ... SELECT ... QUALIFY ROW_NUMBER() OVER
                          // (PARTITION BY id ORDER BY tie_break DESC) = 1` statement through
                          // ExecuteDml and checks that two rows are reported as inserted and
                          // that two rows can be scanned back from the destination table.
                          //
                          // Source and destination tables share one schema:
                          // id (required STRING), tie_break (required INT64),
                          // value (required INT64).
     257            1 :   schema::TableSchema src_schema;
     258            1 :   schema::ColumnSchema id;
     259            1 :   id.name = "id";
     260            1 :   id.type = schema::ColumnType::kString;
     261            1 :   id.mode = schema::ColumnMode::kRequired;
     262            1 :   src_schema.columns.push_back(id);
     263            1 :   schema::ColumnSchema tie;
     264            1 :   tie.name = "tie_break";
     265            1 :   tie.type = schema::ColumnType::kInt64;
     266            1 :   tie.mode = schema::ColumnMode::kRequired;
     267            1 :   src_schema.columns.push_back(tie);
     268            1 :   schema::ColumnSchema value;
     269            1 :   value.name = "value";
     270            1 :   value.type = schema::ColumnType::kInt64;
     271            1 :   value.mode = schema::ColumnMode::kRequired;
     272            1 :   src_schema.columns.push_back(value);
     273            1 :   ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
     274            1 :   ASSERT_TRUE(
     275            1 :       storage_->CreateTable({"proj-test", "ds", "ins_src"}, src_schema).ok());
     276            1 :   schema::TableSchema dst_schema = src_schema;
     277            1 :   ASSERT_TRUE(
     278            1 :       storage_->CreateTable({"proj-test", "ds", "ins_dst"}, dst_schema).ok());
     279              : 
                          // Seed three source rows: two share id "a" (tie_break 1 and 2) and
                          // one has id "b", so there are two distinct ids.
     280            1 :   std::vector<storage::Row> seed = {
     281            1 :       storage::Row{{storage::Value::String("a"),
     282            1 :                     storage::Value::Int64(1),
     283            1 :                     storage::Value::Int64(10)}},
     284            1 :       storage::Row{{storage::Value::String("a"),
     285            1 :                     storage::Value::Int64(2),
     286            1 :                     storage::Value::Int64(20)}},
     287            1 :       storage::Row{{storage::Value::String("b"),
     288            1 :                     storage::Value::Int64(1),
     289            1 :                     storage::Value::Int64(30)}},
     290            1 :   };
     291            1 :   ASSERT_TRUE(storage_
     292            1 :                   ->AppendRows({"proj-test", "ds", "ins_src"},
     293            1 :                                absl::MakeConstSpan(seed))
     294            1 :                   .ok());
     295              : 
     296            1 :   const std::string sql =
     297            1 :       "INSERT INTO ds.ins_dst (id, tie_break, value) "
     298            1 :       "SELECT * FROM ds.ins_src "
     299            1 :       "QUALIFY ROW_NUMBER() OVER ("
     300            1 :       "  PARTITION BY id ORDER BY tie_break DESC"
     301            1 :       ") = 1";
     302            1 :   CatalogBundle bundle = MakeCatalog();
     303            1 :   auto analyzed = Analyze(sql, bundle.catalog.get(), /*all_statements=*/true);
     304            2 :   ASSERT_TRUE(analyzed.ok()) << analyzed.status();
     305            1 :   const ::googlesql::ResolvedStatement* stmt =
     306            1 :       (*analyzed)->resolved_statement();
     307            1 :   ASSERT_NE(stmt, nullptr);
     308              : 
                          // One row per distinct id is expected to pass the QUALIFY filter,
                          // hence an inserted-row count of 2.
     309            1 :   auto result =
     310            1 :       executor_->ExecuteDml(MakeRequest(sql), *stmt, bundle.catalog.get());
     311            2 :   ASSERT_TRUE(result.ok()) << result.status();
     312            1 :   EXPECT_EQ(result->stats.inserted_row_count, 2);
     313              : 
                          // Read the destination table back through storage and count rows.
                          // NOTE(review): only the row count and cell count are verified,
                          // not which row per id was kept. The ORDER BY tie_break DESC
                          // suggests ("a", 2, 20) and ("b", 1, 30) -- consider asserting the
                          // cell values too.
     314            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "ins_dst"});
     315            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     316            1 :   storage::Row row;
     317            1 :   int rows = 0;
     318            3 :   while (true) {
     319            3 :     auto has = (*scan)->Next(&row);
     320            6 :     ASSERT_TRUE(has.ok()) << has.status();
     321            3 :     if (!*has) break;
     322            2 :     ASSERT_EQ(row.cells.size(), 3u);
     323            2 :     ++rows;
     324            2 :   }
     325            1 :   EXPECT_EQ(rows, 2);
     326            1 : }
     327              : 
     328              : }  // namespace
     329              : }  // namespace duckdb
     330              : }  // namespace engine
     331              : }  // namespace backend
     332              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1