Line data Source code
1 : // CREATE TABLE AS SELECT tests for the control-op executor. Kept in a
2 : // separate file so `control_op_executor_test.cc` stays under the
3 : // cpp-lint file-length cap.
4 :
5 : #include <algorithm>
6 : #include <cstdint>
7 : #include <cstdlib>
8 : #include <filesystem>
9 : #include <memory>
10 : #include <random>
11 : #include <string>
12 : #include <system_error>
13 : #include <utility>
14 : #include <vector>
15 :
16 : #include "absl/status/status.h"
17 : #include "absl/strings/str_cat.h"
18 : #include "absl/strings/string_view.h"
19 : #include "backend/catalog/googlesql_catalog.h"
20 : #include "backend/engine/control/control_op_executor.h"
21 : #include "backend/engine/engine.h"
22 : #include "backend/schema/schema.h"
23 : #include "backend/storage/duckdb/duckdb_storage.h"
24 : #include "backend/storage/storage.h"
25 : #include "googlesql/public/analyzer.h"
26 : #include "googlesql/public/analyzer_options.h"
27 : #include "googlesql/public/analyzer_output.h"
28 : #include "googlesql/public/language_options.h"
29 : #include "googlesql/public/options.pb.h"
30 : #include "googlesql/public/types/type_factory.h"
31 : #include "googlesql/resolved_ast/resolved_ast.h"
32 : #include "gtest/gtest.h"
33 :
34 : namespace bigquery_emulator {
35 : namespace backend {
36 : namespace engine {
37 : namespace control {
38 : namespace {
39 :
40 : namespace fs = std::filesystem;
41 :
42 8 : ::googlesql::LanguageOptions MakeLanguageOptions() {
43 8 : ::googlesql::LanguageOptions language;
44 8 : language.EnableMaximumLanguageFeatures();
45 8 : language.set_product_mode(::googlesql::PRODUCT_EXTERNAL);
46 8 : language.set_name_resolution_mode(::googlesql::NAME_RESOLUTION_DEFAULT);
47 8 : return language;
48 8 : }
49 :
50 4 : ::googlesql::AnalyzerOptions MakeAnalyzerOptions() {
51 4 : ::googlesql::AnalyzerOptions options(MakeLanguageOptions());
52 4 : options.set_error_message_mode(::googlesql::ERROR_MESSAGE_ONE_LINE);
53 4 : options.set_attach_error_location_payload(true);
54 4 : options.CreateDefaultArenasIfNotSet();
55 4 : options.mutable_language()->SetSupportsAllStatementKinds();
56 4 : return options;
57 4 : }
58 :
59 : class ControlOpExecutorCtasTest : public ::testing::Test {
60 : protected:
61 4 : void SetUp() override {
62 4 : const char* tmpdir_env = std::getenv("TMPDIR");
63 4 : const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
64 4 : std::random_device rd;
65 4 : std::seed_seq seed{rd(), rd()};
66 4 : std::mt19937_64 rng(seed);
67 4 : data_dir_ =
68 4 : fs::path(tmpdir) / absl::StrCat("bqemu-control-op-ctas-test-", rng());
69 4 : std::error_code ec;
70 4 : fs::remove_all(data_dir_, ec);
71 4 : auto opened = storage::duckdb::DuckDBStorage::Open(data_dir_.string());
72 8 : ASSERT_TRUE(opened.ok()) << opened.status();
73 4 : storage_ = std::move(opened).value();
74 4 : executor_ = std::make_unique<ControlOpExecutor>(storage_.get());
75 4 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
76 4 : }
77 :
78 4 : void TearDown() override {
79 4 : executor_.reset();
80 4 : storage_.reset();
81 4 : std::error_code ec;
82 4 : fs::remove_all(data_dir_, ec);
83 4 : }
84 :
85 4 : QueryRequest MakeRequest(absl::string_view sql) {
86 4 : QueryRequest req;
87 4 : req.project_id = "proj-test";
88 4 : req.sql = std::string(sql);
89 4 : return req;
90 4 : }
91 :
92 1 : void CreatePeopleTable() {
93 1 : schema::TableSchema bq_schema;
94 1 : schema::ColumnSchema id;
95 1 : id.name = "id";
96 1 : id.type = schema::ColumnType::kInt64;
97 1 : id.mode = schema::ColumnMode::kRequired;
98 1 : bq_schema.columns.push_back(id);
99 1 : schema::ColumnSchema name;
100 1 : name.name = "name";
101 1 : name.type = schema::ColumnType::kString;
102 1 : name.mode = schema::ColumnMode::kNullable;
103 1 : bq_schema.columns.push_back(name);
104 1 : ASSERT_TRUE(
105 1 : storage_->CreateTable({"proj-test", "ds", "people"}, bq_schema).ok());
106 :
107 3 : auto make_row = [](int64_t id_val, std::string name_val) {
108 3 : storage::Row r;
109 3 : r.cells = {
110 3 : storage::Value::Int64(id_val),
111 3 : storage::Value::String(std::move(name_val)),
112 3 : };
113 3 : return r;
114 3 : };
115 1 : std::vector<storage::Row> rows = {
116 1 : make_row(1, "ada"),
117 1 : make_row(2, "linus"),
118 1 : make_row(3, "grace"),
119 1 : };
120 1 : ASSERT_TRUE(storage_
121 1 : ->AppendRows({"proj-test", "ds", "people"},
122 1 : absl::MakeConstSpan(rows))
123 1 : .ok());
124 1 : }
125 :
126 : struct CatalogBundle {
127 : std::unique_ptr<::googlesql::TypeFactory> type_factory{};
128 : std::unique_ptr<catalog::GoogleSqlCatalog> catalog{};
129 : };
130 4 : CatalogBundle MakeCatalog() {
131 4 : auto type_factory = std::make_unique<::googlesql::TypeFactory>();
132 4 : auto catalog = std::make_unique<catalog::GoogleSqlCatalog>(
133 4 : "proj-test", storage_.get(), type_factory.get(), MakeLanguageOptions());
134 4 : return {std::move(type_factory), std::move(catalog)};
135 4 : }
136 :
137 4 : absl::Status RunDdl(absl::string_view sql) {
138 4 : CatalogBundle bundle = MakeCatalog();
139 4 : ::googlesql::AnalyzerOptions options = MakeAnalyzerOptions();
140 4 : ::googlesql::TypeFactory type_factory;
141 4 : std::unique_ptr<const ::googlesql::AnalyzerOutput> output;
142 4 : absl::Status analyze = ::googlesql::AnalyzeStatement(
143 4 : sql, options, bundle.catalog.get(), &type_factory, &output);
144 4 : if (!analyze.ok()) return analyze;
145 4 : if (output == nullptr || output->resolved_statement() == nullptr) {
146 0 : return absl::InternalError(
147 0 : "ControlOpExecutorCtasTest::RunDdl: analyzer produced no resolved "
148 0 : "statement");
149 0 : }
150 4 : return executor_->ExecuteDdl(
151 4 : MakeRequest(sql), *output->resolved_statement(), bundle.catalog.get());
152 4 : }
153 :
154 : fs::path data_dir_{};
155 : std::unique_ptr<storage::duckdb::DuckDBStorage> storage_{};
156 : std::unique_ptr<ControlOpExecutor> executor_{};
157 : };
158 :
159 1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectMaterializesSourceRows) {
160 1 : CreatePeopleTable();
161 1 : absl::Status s =
162 1 : RunDdl("CREATE TABLE ds.people_copy AS SELECT id, name FROM ds.people");
163 2 : ASSERT_TRUE(s.ok()) << s;
164 :
165 1 : auto schema = storage_->GetSchema({"proj-test", "ds", "people_copy"});
166 2 : ASSERT_TRUE(schema.ok()) << schema.status();
167 1 : ASSERT_EQ(schema->columns.size(), 2u);
168 :
169 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "people_copy"});
170 2 : ASSERT_TRUE(scan.ok()) << scan.status();
171 1 : std::vector<std::pair<int64_t, std::string>> seen;
172 1 : storage::Row row;
173 4 : while (true) {
174 4 : auto has = (*scan)->Next(&row);
175 8 : ASSERT_TRUE(has.ok()) << has.status();
176 4 : if (!*has) break;
177 3 : ASSERT_EQ(row.cells.size(), 2u);
178 3 : seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
179 3 : }
180 1 : std::sort(seen.begin(), seen.end());
181 1 : std::vector<std::pair<int64_t, std::string>> want = {
182 1 : {1, "ada"}, {2, "linus"}, {3, "grace"}};
183 1 : EXPECT_EQ(seen, want);
184 1 : }
185 :
186 : TEST_F(ControlOpExecutorCtasTest,
187 1 : CreateTableAsSelectUnnestNarrowsToOutputSchema) {
188 : // Regression for TestCopiesAndExtracts / generateTableCTAS: UNNEST
189 : // ordinality adds `__bq_input_rn` to the inner scan emit; without the
190 : // outer projection narrow the drained DuckDB table has one extra
191 : // column and `arrow_to_bq` rejects the schema mismatch.
192 1 : absl::Status s = RunDdl(
193 1 : "CREATE TABLE ds.unnest_copy AS "
194 1 : "SELECT 2000 + r AS year, IF(r > 1, 'foo', 'bar') AS token "
195 1 : "FROM UNNEST(GENERATE_ARRAY(0, 2)) AS r");
196 2 : ASSERT_TRUE(s.ok()) << s;
197 :
198 1 : auto schema = storage_->GetSchema({"proj-test", "ds", "unnest_copy"});
199 2 : ASSERT_TRUE(schema.ok()) << schema.status();
200 1 : ASSERT_EQ(schema->columns.size(), 2u);
201 1 : ASSERT_EQ(schema->columns[0].name, "year");
202 1 : ASSERT_EQ(schema->columns[1].name, "token");
203 :
204 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "unnest_copy"});
205 2 : ASSERT_TRUE(scan.ok()) << scan.status();
206 1 : storage::Row row;
207 1 : int rows = 0;
208 4 : while (true) {
209 4 : auto has = (*scan)->Next(&row);
210 8 : ASSERT_TRUE(has.ok()) << has.status();
211 4 : if (!*has) break;
212 3 : ASSERT_EQ(row.cells.size(), 2u);
213 3 : ++rows;
214 3 : }
215 1 : EXPECT_EQ(rows, 3);
216 1 : }
217 :
218 1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectCrossJoinUnnestSubquery) {
219 : // Bench heavy-case setup: CTAS over a subquery whose FROM is two
220 : // standalone UNNEST relations cross-joined (BigQuery's >1M-row
221 : // GENERATE_ARRAY pattern).
222 1 : absl::Status s = RunDdl(
223 1 : "CREATE TABLE ds.cross_join_ctas AS "
224 1 : "SELECT id, MOD(id, 7) AS k "
225 1 : "FROM ("
226 1 : " SELECT n + (m - 1) * 10 AS id "
227 1 : " FROM UNNEST(GENERATE_ARRAY(1, 3)) AS n "
228 1 : " CROSS JOIN UNNEST(GENERATE_ARRAY(1, 2)) AS m"
229 1 : ")");
230 2 : ASSERT_TRUE(s.ok()) << s;
231 :
232 1 : auto schema = storage_->GetSchema({"proj-test", "ds", "cross_join_ctas"});
233 2 : ASSERT_TRUE(schema.ok()) << schema.status();
234 1 : ASSERT_EQ(schema->columns.size(), 2u);
235 :
236 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "cross_join_ctas"});
237 2 : ASSERT_TRUE(scan.ok()) << scan.status();
238 1 : int rows = 0;
239 1 : storage::Row row;
240 7 : while (true) {
241 7 : auto has = (*scan)->Next(&row);
242 14 : ASSERT_TRUE(has.ok()) << has.status();
243 7 : if (!*has) break;
244 6 : ASSERT_EQ(row.cells.size(), 2u);
245 6 : ++rows;
246 6 : }
247 1 : EXPECT_EQ(rows, 6);
248 1 : }
249 :
250 1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectExceptRowNumberDedup) {
251 1 : schema::TableSchema src_schema;
252 1 : schema::ColumnSchema id;
253 1 : id.name = "id";
254 1 : id.type = schema::ColumnType::kString;
255 1 : id.mode = schema::ColumnMode::kRequired;
256 1 : src_schema.columns.push_back(id);
257 1 : schema::ColumnSchema tie;
258 1 : tie.name = "tie_break";
259 1 : tie.type = schema::ColumnType::kInt64;
260 1 : tie.mode = schema::ColumnMode::kRequired;
261 1 : src_schema.columns.push_back(tie);
262 1 : schema::ColumnSchema value;
263 1 : value.name = "value";
264 1 : value.type = schema::ColumnType::kInt64;
265 1 : value.mode = schema::ColumnMode::kRequired;
266 1 : src_schema.columns.push_back(value);
267 1 : ASSERT_TRUE(
268 1 : storage_->CreateTable({"proj-test", "ds", "merge_src"}, src_schema).ok());
269 :
270 1 : std::vector<storage::Row> seed = {
271 1 : storage::Row{{storage::Value::String("a"),
272 1 : storage::Value::Int64(1),
273 1 : storage::Value::Int64(10)}},
274 1 : storage::Row{{storage::Value::String("a"),
275 1 : storage::Value::Int64(2),
276 1 : storage::Value::Int64(20)}},
277 1 : storage::Row{{storage::Value::String("b"),
278 1 : storage::Value::Int64(1),
279 1 : storage::Value::Int64(30)}},
280 1 : };
281 1 : ASSERT_TRUE(storage_
282 1 : ->AppendRows({"proj-test", "ds", "merge_src"},
283 1 : absl::MakeConstSpan(seed))
284 1 : .ok());
285 :
286 1 : absl::Status s = RunDdl(
287 1 : "CREATE OR REPLACE TABLE ds.merge_deduped AS "
288 1 : "SELECT * EXCEPT(rn) FROM ("
289 1 : " SELECT *, ROW_NUMBER() OVER ("
290 1 : " PARTITION BY id ORDER BY tie_break DESC"
291 1 : " ) AS rn FROM ds.merge_src"
292 1 : ") WHERE rn = 1");
293 2 : ASSERT_TRUE(s.ok()) << s;
294 :
295 1 : auto schema = storage_->GetSchema({"proj-test", "ds", "merge_deduped"});
296 2 : ASSERT_TRUE(schema.ok()) << schema.status();
297 1 : ASSERT_EQ(schema->columns.size(), 3u);
298 :
299 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "merge_deduped"});
300 2 : ASSERT_TRUE(scan.ok()) << scan.status();
301 1 : storage::Row row;
302 1 : int rows = 0;
303 3 : while (true) {
304 3 : auto has = (*scan)->Next(&row);
305 6 : ASSERT_TRUE(has.ok()) << has.status();
306 3 : if (!*has) break;
307 2 : ASSERT_EQ(row.cells.size(), 3u);
308 2 : ++rows;
309 2 : }
310 1 : EXPECT_EQ(rows, 2);
311 1 : }
312 :
313 : } // namespace
314 : } // namespace control
315 : } // namespace engine
316 : } // namespace backend
317 : } // namespace bigquery_emulator
|