Line data Source code
// Unit tests for the route-typed DuckDB executor. These exercise the
// contract `LocalCoordinatorEngine` relies on: each method takes an
// already-analyzed `ResolvedStatement` and produces the same
// wire-facing reply that the legacy `DuckDBEngine` shim produced before
// it was removed in favor of `LocalCoordinatorEngine` (see
// `docs/ENGINE_POLICY.md`). Each test runs the analyzer up front (the
// way the coordinator does) and hands the resolved root straight to the
// executor. That keeps all of the following on the critical path:
// the executor's validation of the resolved statement, the transpiler
// invocation, the per-query DuckDB connection, and the Arrow result-row
// path.
12 :
13 : #include "backend/engine/duckdb/duckdb_executor.h"
14 :
15 : #include <algorithm>
16 : #include <memory>
17 : #include <string>
18 : #include <utility>
19 : #include <vector>
20 :
21 : #include "absl/status/status.h"
22 : #include "absl/status/statusor.h"
23 : #include "absl/strings/string_view.h"
24 : #include "backend/engine/duckdb/duckdb_executor_test_fixture.h"
25 : #include "backend/engine/duckdb/transpiler/transpiler.h"
26 : #include "backend/schema/schema.h"
27 : #include "backend/storage/storage.h"
28 : #include "googlesql/resolved_ast/resolved_ast.h"
29 : #include "gtest/gtest.h"
30 :
31 : namespace bigquery_emulator {
32 : namespace backend {
33 : namespace engine {
34 : namespace duckdb {
35 : namespace {
36 :
37 1 : TEST_F(DuckDbExecutorTest, ExecuteQuerySelectStarFromTableStreamsAllRows) {
38 : // The executor's smoke test: hand it a fully-analyzed
39 : // `SELECT * FROM ds.people` and confirm the wire output matches
40 : // the canonical three rows we seeded into storage. Covers the
41 : // analyze-then-execute split that the coordinator will rely on.
42 1 : CreatePeopleTable();
43 1 : CatalogBundle bundle = MakeCatalog();
44 1 : auto analyzed = Analyze("SELECT * FROM ds.people",
45 1 : bundle.catalog.get(),
46 1 : /*all_statements=*/false);
47 2 : ASSERT_TRUE(analyzed.ok()) << analyzed.status();
48 1 : const ::googlesql::ResolvedStatement* stmt =
49 1 : (*analyzed)->resolved_statement();
50 1 : ASSERT_NE(stmt, nullptr);
51 :
52 1 : absl::StatusOr<std::unique_ptr<RowSource>> source = executor_->ExecuteQuery(
53 1 : MakeRequest("SELECT * FROM ds.people"), *stmt, bundle.catalog.get());
54 2 : ASSERT_TRUE(source.ok()) << source.status();
55 :
56 1 : const schema::TableSchema& s = (*source)->schema();
57 1 : ASSERT_EQ(s.columns.size(), 2u);
58 1 : EXPECT_EQ(s.columns[0].name, "id");
59 1 : EXPECT_EQ(s.columns[0].type, schema::ColumnType::kInt64);
60 1 : EXPECT_EQ(s.columns[1].name, "name");
61 1 : EXPECT_EQ(s.columns[1].type, schema::ColumnType::kString);
62 :
63 : // DuckDB does not promise a stable row order without ORDER BY.
64 1 : std::vector<std::pair<int64_t, std::string>> seen;
65 1 : storage::Row row;
66 4 : while (true) {
67 4 : auto has = (*source)->Next(&row);
68 8 : ASSERT_TRUE(has.ok()) << has.status();
69 4 : if (!*has) break;
70 3 : ASSERT_EQ(row.cells.size(), 2u);
71 3 : ASSERT_EQ(row.cells[0].kind(), storage::Value::Kind::kInt64);
72 3 : ASSERT_EQ(row.cells[1].kind(), storage::Value::Kind::kString);
73 3 : seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
74 3 : }
75 1 : std::vector<std::pair<int64_t, std::string>> want = {
76 1 : {1, "ada"}, {2, "linus"}, {3, "grace"}};
77 1 : std::sort(seen.begin(), seen.end());
78 1 : std::sort(want.begin(), want.end());
79 1 : EXPECT_EQ(seen, want);
80 1 : }
81 :
82 : TEST_F(DuckDbExecutorTest,
83 1 : ExecuteQueryNarrowsColumnsWhenAnalyzerSchemaIsSubsetOfTable) {
84 : // Regression: the legacy executor stripped the wrapping pass-through
85 : // ProjectScan and handed the bare TableScan to the transpiler, which
86 : // emitted SELECT for *all* of the table's columns. With
87 : // `prune_unused_columns=false` (the analyzer setting both
88 : // `LocalCoordinatorEngine` and the legacy DuckDBEngine use), the
89 : // TableScan retains every storage column even when the user-spelled
90 : // SELECT only asks for a subset; the result chunk would then arrive
91 : // with one extra Arrow column that `arrow_to_bq::ChunkRowToCells`
92 : // refused to render against the analyzer-output schema:
93 : //
94 : // arrow_to_bq: chunk has 3 columns but analyzer output schema has 2
95 : //
96 : // The fix: hand `EmitQueryStmt` the QueryStmt itself; the outermost
97 : // SELECT projects only the analyzer-output columns. Pin a 3-column
98 : // table source against a 2-column projection here so a regression
99 : // (e.g. someone re-introducing the strip-and-bypass shortcut) fails
100 : // at the executor unit-test level instead of the
101 : // `frontend/handlers/query_test.cc` integration suite.
102 1 : schema::TableSchema bq_schema;
103 1 : schema::ColumnSchema id;
104 1 : id.name = "id";
105 1 : id.type = schema::ColumnType::kInt64;
106 1 : id.mode = schema::ColumnMode::kRequired;
107 1 : bq_schema.columns.push_back(id);
108 1 : schema::ColumnSchema name;
109 1 : name.name = "name";
110 1 : name.type = schema::ColumnType::kString;
111 1 : name.mode = schema::ColumnMode::kNullable;
112 1 : bq_schema.columns.push_back(name);
113 1 : schema::ColumnSchema tags;
114 1 : tags.name = "tags";
115 1 : tags.type = schema::ColumnType::kString;
116 1 : tags.mode = schema::ColumnMode::kRepeated;
117 1 : bq_schema.columns.push_back(tags);
118 1 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
119 1 : ASSERT_TRUE(
120 1 : storage_->CreateTable({"proj-test", "ds", "wide"}, bq_schema).ok());
121 1 : std::vector<storage::Row> rows;
122 3 : auto append = [&](int64_t v_id, std::string v_name) {
123 3 : storage::Row r;
124 3 : r.cells = {
125 3 : storage::Value::Int64(v_id),
126 3 : storage::Value::String(std::move(v_name)),
127 3 : storage::Value::Array({}),
128 3 : };
129 3 : rows.push_back(std::move(r));
130 3 : };
131 1 : append(1, "ada");
132 1 : append(2, "linus");
133 1 : append(3, "grace");
134 1 : ASSERT_TRUE(
135 1 : storage_
136 1 : ->AppendRows({"proj-test", "ds", "wide"}, absl::MakeConstSpan(rows))
137 1 : .ok());
138 :
139 1 : CatalogBundle bundle = MakeCatalog();
140 1 : auto analyzed = Analyze("SELECT id, name FROM ds.wide ORDER BY id",
141 1 : bundle.catalog.get(),
142 1 : /*all_statements=*/false);
143 2 : ASSERT_TRUE(analyzed.ok()) << analyzed.status();
144 1 : const ::googlesql::ResolvedStatement* stmt =
145 1 : (*analyzed)->resolved_statement();
146 1 : ASSERT_NE(stmt, nullptr);
147 :
148 1 : absl::StatusOr<std::unique_ptr<RowSource>> source = executor_->ExecuteQuery(
149 1 : MakeRequest("SELECT id, name FROM ds.wide ORDER BY id"),
150 1 : *stmt,
151 1 : bundle.catalog.get());
152 2 : ASSERT_TRUE(source.ok()) << source.status();
153 :
154 1 : const schema::TableSchema& s = (*source)->schema();
155 1 : ASSERT_EQ(s.columns.size(), 2u);
156 1 : EXPECT_EQ(s.columns[0].name, "id");
157 1 : EXPECT_EQ(s.columns[1].name, "name");
158 :
159 1 : std::vector<std::pair<int64_t, std::string>> seen;
160 1 : storage::Row row;
161 4 : while (true) {
162 4 : auto has = (*source)->Next(&row);
163 8 : ASSERT_TRUE(has.ok()) << has.status();
164 4 : if (!*has) break;
165 3 : ASSERT_EQ(row.cells.size(), 2u);
166 3 : seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
167 3 : }
168 1 : std::vector<std::pair<int64_t, std::string>> want = {
169 1 : {1, "ada"}, {2, "linus"}, {3, "grace"}};
170 1 : EXPECT_EQ(seen, want);
171 1 : }
172 :
173 1 : TEST_F(DuckDbExecutorTest, ExecuteQueryRejectsNonQueryStatement) {
174 : // The coordinator is supposed to dispatch DDL through the control-op
175 : // route, not through the DuckDB executor; defensively the executor
176 : // returns INVALID_ARGUMENT (not UNIMPLEMENTED) when fed a non-query
177 : // statement on its query surface so a routing bug surfaces loudly
178 : // instead of looking like a transpiler gap.
179 1 : CatalogBundle bundle = MakeCatalog();
180 1 : auto analyzed = Analyze("CREATE TABLE ds.t (id INT64)",
181 1 : bundle.catalog.get(),
182 1 : /*all_statements=*/true);
183 2 : ASSERT_TRUE(analyzed.ok()) << analyzed.status();
184 1 : const ::googlesql::ResolvedStatement* stmt =
185 1 : (*analyzed)->resolved_statement();
186 1 : ASSERT_NE(stmt, nullptr);
187 :
188 1 : auto source = executor_->ExecuteQuery(
189 1 : MakeRequest("CREATE TABLE ds.t (id INT64)"), *stmt, bundle.catalog.get());
190 1 : ASSERT_FALSE(source.ok());
191 2 : EXPECT_EQ(source.status().code(), absl::StatusCode::kInvalidArgument)
192 2 : << source.status();
193 1 : }
194 :
195 1 : TEST_F(DuckDbExecutorTest, ExecuteBigframesCacheJoinShape) {
196 : // Regression for bigframes `cache()` join SQL: output schema column
197 : // `bfuid_col_4` must stay INT64 (amount=3), not pick up STRING
198 : // `column_0` ('John') from a misaligned DuckDB chunk.
199 1 : static constexpr char kSql[] = R"sql(
200 1 : SELECT `level_0`, `column_0`, `bfuid_col_2`, `bfuid_col_4`, `column_1`, `bfuid_col_14` AS `bfuid_col_15`, `bfuid_col_10` AS `bfuid_col_16`, `bfuid_col_13` AS `bfuid_col_17`, `bfuid_col_9` AS `bfuid_col_18`, `bfuid_col_11` AS `bfuid_col_19` FROM (SELECT
201 1 : `t11`.`level_0`, `t11`.`column_0`, `t11`.`bfuid_col_2`, `t11`.`bfuid_col_9`, `t11`.`bfuid_col_10`, `t11`.`bfuid_col_11`,
202 1 : `t6`.`bfuid_col_3`, `t6`.`bfuid_col_4`, `t6`.`column_1`, `t6`.`bfuid_col_13`, `t6`.`bfuid_col_14`
203 1 : FROM (
204 1 : SELECT * FROM (
205 1 : SELECT `t7`.`level_0`, `t7`.`column_0`, `t8`.`bfuid_col_2`, `t7`.`bfuid_col_5` AS `bfuid_col_9`, `t8`.`bfuid_col_7` AS `bfuid_col_10`, `t8`.`bfuid_col_8` AS `bfuid_col_11`
206 1 : FROM (SELECT * FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` STRING, `bfuid_col_5` INT64>>[STRUCT(0, 'John', 0)]) AS `level_0`) AS `t1`) AS `t7`
207 1 : LEFT OUTER JOIN (
208 1 : SELECT `t2`.`level_0` AS `bfuid_col_1`, `t2`.`column_0` AS `bfuid_col_2`, `t2`.`bfuid_col_6` AS `bfuid_col_7`, TRUE AS `bfuid_col_8`
209 1 : FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` STRING, `bfuid_col_6` INT64>>[STRUCT(0, 'group_1', 0)]) AS `level_0`) AS `t2`
210 1 : ) AS `t8` ON COALESCE(`t7`.`level_0`, 0) = COALESCE(`t8`.`bfuid_col_1`, 0) AND COALESCE(`t7`.`level_0`, 1) = COALESCE(`t8`.`bfuid_col_1`, 1)
211 1 : ) AS `t9`
212 1 : ) AS `t11`
213 1 : LEFT OUTER JOIN (
214 1 : SELECT `t0`.`level_0` AS `bfuid_col_3`, `t0`.`column_0` AS `bfuid_col_4`, `t0`.`column_1`, `t0`.`bfuid_col_12` AS `bfuid_col_13`, TRUE AS `bfuid_col_14`
215 1 : FROM (SELECT * FROM UNNEST(ARRAY<STRUCT<`level_0` INT64, `column_0` INT64, `column_1` BOOLEAN, `bfuid_col_12` INT64>>[STRUCT(0, 3, TRUE, 0)]) AS `level_0`) AS `t0`
216 1 : ) AS `t6` ON COALESCE(`t11`.`level_0`, 0) = COALESCE(`t6`.`bfuid_col_3`, 0) AND COALESCE(`t11`.`level_0`, 1) = COALESCE(`t6`.`bfuid_col_3`, 1)) AS `t`
217 1 : )sql";
218 1 : const std::string sql(kSql);
219 1 : CatalogBundle bundle = MakeCatalog();
220 1 : auto analyzed = Analyze(sql, bundle.catalog.get(), /*all_statements=*/false);
221 2 : ASSERT_TRUE(analyzed.ok()) << analyzed.status();
222 1 : const ::googlesql::ResolvedStatement* stmt =
223 1 : (*analyzed)->resolved_statement();
224 1 : ASSERT_NE(stmt, nullptr);
225 :
226 1 : const auto* query_stmt = stmt->GetAs<::googlesql::ResolvedQueryStmt>();
227 1 : ASSERT_NE(query_stmt, nullptr);
228 1 : transpiler::Transpiler transpiler;
229 1 : std::string transpiled = transpiler.Transpile(query_stmt);
230 2 : ASSERT_FALSE(transpiled.empty()) << "bigframes cache join must transpile";
231 :
232 1 : absl::StatusOr<std::unique_ptr<RowSource>> source =
233 1 : executor_->ExecuteQuery(MakeRequest(sql), *stmt, bundle.catalog.get());
234 2 : ASSERT_TRUE(source.ok()) << source.status();
235 :
236 1 : const schema::TableSchema& out_schema = (*source)->schema();
237 1 : ASSERT_EQ(out_schema.columns.size(), 10u);
238 1 : int bfuid_col_4_idx = -1;
239 4 : for (size_t i = 0; i < out_schema.columns.size(); ++i) {
240 4 : if (out_schema.columns[i].name == "bfuid_col_4") {
241 1 : bfuid_col_4_idx = static_cast<int>(i);
242 1 : break;
243 1 : }
244 4 : }
245 2 : ASSERT_EQ(bfuid_col_4_idx, 3) << "bfuid_col_4 schema position drift";
246 :
247 1 : storage::Row row;
248 1 : auto has = (*source)->Next(&row);
249 2 : ASSERT_TRUE(has.ok()) << has.status() << "\nSQL:\n" << transpiled;
250 1 : ASSERT_TRUE(*has);
251 1 : ASSERT_EQ(row.cells.size(), 10u);
252 1 : EXPECT_EQ(row.cells[static_cast<size_t>(bfuid_col_4_idx)].int64_value(), 3);
253 1 : EXPECT_EQ(row.cells[1].string_value(), "John");
254 1 : }
255 :
256 1 : TEST_F(DuckDbExecutorTest, ExecuteDmlInsertSelectQualifyDedupesRows) {
257 1 : schema::TableSchema src_schema;
258 1 : schema::ColumnSchema id;
259 1 : id.name = "id";
260 1 : id.type = schema::ColumnType::kString;
261 1 : id.mode = schema::ColumnMode::kRequired;
262 1 : src_schema.columns.push_back(id);
263 1 : schema::ColumnSchema tie;
264 1 : tie.name = "tie_break";
265 1 : tie.type = schema::ColumnType::kInt64;
266 1 : tie.mode = schema::ColumnMode::kRequired;
267 1 : src_schema.columns.push_back(tie);
268 1 : schema::ColumnSchema value;
269 1 : value.name = "value";
270 1 : value.type = schema::ColumnType::kInt64;
271 1 : value.mode = schema::ColumnMode::kRequired;
272 1 : src_schema.columns.push_back(value);
273 1 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
274 1 : ASSERT_TRUE(
275 1 : storage_->CreateTable({"proj-test", "ds", "ins_src"}, src_schema).ok());
276 1 : schema::TableSchema dst_schema = src_schema;
277 1 : ASSERT_TRUE(
278 1 : storage_->CreateTable({"proj-test", "ds", "ins_dst"}, dst_schema).ok());
279 :
280 1 : std::vector<storage::Row> seed = {
281 1 : storage::Row{{storage::Value::String("a"),
282 1 : storage::Value::Int64(1),
283 1 : storage::Value::Int64(10)}},
284 1 : storage::Row{{storage::Value::String("a"),
285 1 : storage::Value::Int64(2),
286 1 : storage::Value::Int64(20)}},
287 1 : storage::Row{{storage::Value::String("b"),
288 1 : storage::Value::Int64(1),
289 1 : storage::Value::Int64(30)}},
290 1 : };
291 1 : ASSERT_TRUE(storage_
292 1 : ->AppendRows({"proj-test", "ds", "ins_src"},
293 1 : absl::MakeConstSpan(seed))
294 1 : .ok());
295 :
296 1 : const std::string sql =
297 1 : "INSERT INTO ds.ins_dst (id, tie_break, value) "
298 1 : "SELECT * FROM ds.ins_src "
299 1 : "QUALIFY ROW_NUMBER() OVER ("
300 1 : " PARTITION BY id ORDER BY tie_break DESC"
301 1 : ") = 1";
302 1 : CatalogBundle bundle = MakeCatalog();
303 1 : auto analyzed = Analyze(sql, bundle.catalog.get(), /*all_statements=*/true);
304 2 : ASSERT_TRUE(analyzed.ok()) << analyzed.status();
305 1 : const ::googlesql::ResolvedStatement* stmt =
306 1 : (*analyzed)->resolved_statement();
307 1 : ASSERT_NE(stmt, nullptr);
308 :
309 1 : auto result =
310 1 : executor_->ExecuteDml(MakeRequest(sql), *stmt, bundle.catalog.get());
311 2 : ASSERT_TRUE(result.ok()) << result.status();
312 1 : EXPECT_EQ(result->stats.inserted_row_count, 2);
313 :
314 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "ins_dst"});
315 2 : ASSERT_TRUE(scan.ok()) << scan.status();
316 1 : storage::Row row;
317 1 : int rows = 0;
318 3 : while (true) {
319 3 : auto has = (*scan)->Next(&row);
320 6 : ASSERT_TRUE(has.ok()) << has.status();
321 3 : if (!*has) break;
322 2 : ASSERT_EQ(row.cells.size(), 3u);
323 2 : ++rows;
324 2 : }
325 1 : EXPECT_EQ(rows, 2);
326 1 : }
327 :
328 : } // namespace
329 : } // namespace duckdb
330 : } // namespace engine
331 : } // namespace backend
332 : } // namespace bigquery_emulator
|