LCOV - code coverage report
Current view: top level - backend/engine/control - control_op_executor_ctas_test.cc (source / functions) Coverage Total Hit
Test: _coverage_report.dat Lines: 98.3 % 229 225
Test Date: 2026-07-02 21:01:18 Functions: 100.0 % 13 13

            Line data    Source code
       1              : // CREATE TABLE AS SELECT tests for the control-op executor. Kept in a
       2              : // separate file so `control_op_executor_test.cc` stays under the
       3              : // cpp-lint file-length cap.
       4              : 
       5              : #include <algorithm>
       6              : #include <cstdint>
       7              : #include <cstdlib>
       8              : #include <filesystem>
       9              : #include <memory>
      10              : #include <random>
      11              : #include <string>
      12              : #include <system_error>
      13              : #include <utility>
      14              : #include <vector>
      15              : 
      16              : #include "absl/status/status.h"
      17              : #include "absl/strings/str_cat.h"
      18              : #include "absl/strings/string_view.h"
      19              : #include "backend/catalog/googlesql_catalog.h"
      20              : #include "backend/engine/control/control_op_executor.h"
      21              : #include "backend/engine/engine.h"
      22              : #include "backend/schema/schema.h"
      23              : #include "backend/storage/duckdb/duckdb_storage.h"
      24              : #include "backend/storage/storage.h"
      25              : #include "googlesql/public/analyzer.h"
      26              : #include "googlesql/public/analyzer_options.h"
      27              : #include "googlesql/public/analyzer_output.h"
      28              : #include "googlesql/public/language_options.h"
      29              : #include "googlesql/public/options.pb.h"
      30              : #include "googlesql/public/types/type_factory.h"
      31              : #include "googlesql/resolved_ast/resolved_ast.h"
      32              : #include "gtest/gtest.h"
      33              : 
      34              : namespace bigquery_emulator {
      35              : namespace backend {
      36              : namespace engine {
      37              : namespace control {
      38              : namespace {
      39              : 
      40              : namespace fs = std::filesystem;
      41              : 
      42            8 : ::googlesql::LanguageOptions MakeLanguageOptions() {
      43            8 :   ::googlesql::LanguageOptions language;
      44            8 :   language.EnableMaximumLanguageFeatures();
      45            8 :   language.set_product_mode(::googlesql::PRODUCT_EXTERNAL);
      46            8 :   language.set_name_resolution_mode(::googlesql::NAME_RESOLUTION_DEFAULT);
      47            8 :   return language;
      48            8 : }
      49              : 
      50            4 : ::googlesql::AnalyzerOptions MakeAnalyzerOptions() {
      51            4 :   ::googlesql::AnalyzerOptions options(MakeLanguageOptions());
      52            4 :   options.set_error_message_mode(::googlesql::ERROR_MESSAGE_ONE_LINE);
      53            4 :   options.set_attach_error_location_payload(true);
      54            4 :   options.CreateDefaultArenasIfNotSet();
      55            4 :   options.mutable_language()->SetSupportsAllStatementKinds();
      56            4 :   return options;
      57            4 : }
      58              : 
      59              : class ControlOpExecutorCtasTest : public ::testing::Test {
      60              :  protected:
      61            4 :   void SetUp() override {
      62            4 :     const char* tmpdir_env = std::getenv("TMPDIR");
      63            4 :     const std::string tmpdir = tmpdir_env != nullptr ? tmpdir_env : "/tmp";
      64            4 :     std::random_device rd;
      65            4 :     std::seed_seq seed{rd(), rd()};
      66            4 :     std::mt19937_64 rng(seed);
      67            4 :     data_dir_ =
      68            4 :         fs::path(tmpdir) / absl::StrCat("bqemu-control-op-ctas-test-", rng());
      69            4 :     std::error_code ec;
      70            4 :     fs::remove_all(data_dir_, ec);
      71            4 :     auto opened = storage::duckdb::DuckDBStorage::Open(data_dir_.string());
      72            8 :     ASSERT_TRUE(opened.ok()) << opened.status();
      73            4 :     storage_ = std::move(opened).value();
      74            4 :     executor_ = std::make_unique<ControlOpExecutor>(storage_.get());
      75            4 :     ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
      76            4 :   }
      77              : 
      78            4 :   void TearDown() override {
      79            4 :     executor_.reset();
      80            4 :     storage_.reset();
      81            4 :     std::error_code ec;
      82            4 :     fs::remove_all(data_dir_, ec);
      83            4 :   }
      84              : 
      85            4 :   QueryRequest MakeRequest(absl::string_view sql) {
      86            4 :     QueryRequest req;
      87            4 :     req.project_id = "proj-test";
      88            4 :     req.sql = std::string(sql);
      89            4 :     return req;
      90            4 :   }
      91              : 
      92            1 :   void CreatePeopleTable() {
      93            1 :     schema::TableSchema bq_schema;
      94            1 :     schema::ColumnSchema id;
      95            1 :     id.name = "id";
      96            1 :     id.type = schema::ColumnType::kInt64;
      97            1 :     id.mode = schema::ColumnMode::kRequired;
      98            1 :     bq_schema.columns.push_back(id);
      99            1 :     schema::ColumnSchema name;
     100            1 :     name.name = "name";
     101            1 :     name.type = schema::ColumnType::kString;
     102            1 :     name.mode = schema::ColumnMode::kNullable;
     103            1 :     bq_schema.columns.push_back(name);
     104            1 :     ASSERT_TRUE(
     105            1 :         storage_->CreateTable({"proj-test", "ds", "people"}, bq_schema).ok());
     106              : 
     107            3 :     auto make_row = [](int64_t id_val, std::string name_val) {
     108            3 :       storage::Row r;
     109            3 :       r.cells = {
     110            3 :           storage::Value::Int64(id_val),
     111            3 :           storage::Value::String(std::move(name_val)),
     112            3 :       };
     113            3 :       return r;
     114            3 :     };
     115            1 :     std::vector<storage::Row> rows = {
     116            1 :         make_row(1, "ada"),
     117            1 :         make_row(2, "linus"),
     118            1 :         make_row(3, "grace"),
     119            1 :     };
     120            1 :     ASSERT_TRUE(storage_
     121            1 :                     ->AppendRows({"proj-test", "ds", "people"},
     122            1 :                                  absl::MakeConstSpan(rows))
     123            1 :                     .ok());
     124            1 :   }
     125              : 
     126              :   struct CatalogBundle {
     127              :     std::unique_ptr<::googlesql::TypeFactory> type_factory{};
     128              :     std::unique_ptr<catalog::GoogleSqlCatalog> catalog{};
     129              :   };
     130            4 :   CatalogBundle MakeCatalog() {
     131            4 :     auto type_factory = std::make_unique<::googlesql::TypeFactory>();
     132            4 :     auto catalog = std::make_unique<catalog::GoogleSqlCatalog>(
     133            4 :         "proj-test", storage_.get(), type_factory.get(), MakeLanguageOptions());
     134            4 :     return {std::move(type_factory), std::move(catalog)};
     135            4 :   }
     136              : 
     137            4 :   absl::Status RunDdl(absl::string_view sql) {
     138            4 :     CatalogBundle bundle = MakeCatalog();
     139            4 :     ::googlesql::AnalyzerOptions options = MakeAnalyzerOptions();
     140            4 :     ::googlesql::TypeFactory type_factory;
     141            4 :     std::unique_ptr<const ::googlesql::AnalyzerOutput> output;
     142            4 :     absl::Status analyze = ::googlesql::AnalyzeStatement(
     143            4 :         sql, options, bundle.catalog.get(), &type_factory, &output);
     144            4 :     if (!analyze.ok()) return analyze;
     145            4 :     if (output == nullptr || output->resolved_statement() == nullptr) {
     146            0 :       return absl::InternalError(
     147            0 :           "ControlOpExecutorCtasTest::RunDdl: analyzer produced no resolved "
     148            0 :           "statement");
     149            0 :     }
     150            4 :     return executor_->ExecuteDdl(
     151            4 :         MakeRequest(sql), *output->resolved_statement(), bundle.catalog.get());
     152            4 :   }
     153              : 
     154              :   fs::path data_dir_{};
     155              :   std::unique_ptr<storage::duckdb::DuckDBStorage> storage_{};
     156              :   std::unique_ptr<ControlOpExecutor> executor_{};
     157              : };
     158              : 
     159            1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectMaterializesSourceRows) {
     160            1 :   CreatePeopleTable();
     161            1 :   absl::Status s =
     162            1 :       RunDdl("CREATE TABLE ds.people_copy AS SELECT id, name FROM ds.people");
     163            2 :   ASSERT_TRUE(s.ok()) << s;
     164              : 
     165            1 :   auto schema = storage_->GetSchema({"proj-test", "ds", "people_copy"});
     166            2 :   ASSERT_TRUE(schema.ok()) << schema.status();
     167            1 :   ASSERT_EQ(schema->columns.size(), 2u);
     168              : 
     169            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "people_copy"});
     170            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     171            1 :   std::vector<std::pair<int64_t, std::string>> seen;
     172            1 :   storage::Row row;
     173            4 :   while (true) {
     174            4 :     auto has = (*scan)->Next(&row);
     175            8 :     ASSERT_TRUE(has.ok()) << has.status();
     176            4 :     if (!*has) break;
     177            3 :     ASSERT_EQ(row.cells.size(), 2u);
     178            3 :     seen.emplace_back(row.cells[0].int64_value(), row.cells[1].string_value());
     179            3 :   }
     180            1 :   std::sort(seen.begin(), seen.end());
     181            1 :   std::vector<std::pair<int64_t, std::string>> want = {
     182            1 :       {1, "ada"}, {2, "linus"}, {3, "grace"}};
     183            1 :   EXPECT_EQ(seen, want);
     184            1 : }
     185              : 
     186              : TEST_F(ControlOpExecutorCtasTest,
     187            1 :        CreateTableAsSelectUnnestNarrowsToOutputSchema) {
     188              :   // Regression for TestCopiesAndExtracts / generateTableCTAS: UNNEST
     189              :   // ordinality adds `__bq_input_rn` to the inner scan emit; without the
     190              :   // outer projection narrow the drained DuckDB table has one extra
     191              :   // column and `arrow_to_bq` rejects the schema mismatch.
     192            1 :   absl::Status s = RunDdl(
     193            1 :       "CREATE TABLE ds.unnest_copy AS "
     194            1 :       "SELECT 2000 + r AS year, IF(r > 1, 'foo', 'bar') AS token "
     195            1 :       "FROM UNNEST(GENERATE_ARRAY(0, 2)) AS r");
     196            2 :   ASSERT_TRUE(s.ok()) << s;
     197              : 
     198            1 :   auto schema = storage_->GetSchema({"proj-test", "ds", "unnest_copy"});
     199            2 :   ASSERT_TRUE(schema.ok()) << schema.status();
     200            1 :   ASSERT_EQ(schema->columns.size(), 2u);
     201            1 :   ASSERT_EQ(schema->columns[0].name, "year");
     202            1 :   ASSERT_EQ(schema->columns[1].name, "token");
     203              : 
     204            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "unnest_copy"});
     205            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     206            1 :   storage::Row row;
     207            1 :   int rows = 0;
     208            4 :   while (true) {
     209            4 :     auto has = (*scan)->Next(&row);
     210            8 :     ASSERT_TRUE(has.ok()) << has.status();
     211            4 :     if (!*has) break;
     212            3 :     ASSERT_EQ(row.cells.size(), 2u);
     213            3 :     ++rows;
     214            3 :   }
     215            1 :   EXPECT_EQ(rows, 3);
     216            1 : }
     217              : 
     218            1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectCrossJoinUnnestSubquery) {
     219              :   // Bench heavy-case setup: CTAS over a subquery whose FROM is two
     220              :   // standalone UNNEST relations cross-joined (BigQuery's >1M-row
     221              :   // GENERATE_ARRAY pattern).
     222            1 :   absl::Status s = RunDdl(
     223            1 :       "CREATE TABLE ds.cross_join_ctas AS "
     224            1 :       "SELECT id, MOD(id, 7) AS k "
     225            1 :       "FROM ("
     226            1 :       "  SELECT n + (m - 1) * 10 AS id "
     227            1 :       "  FROM UNNEST(GENERATE_ARRAY(1, 3)) AS n "
     228            1 :       "  CROSS JOIN UNNEST(GENERATE_ARRAY(1, 2)) AS m"
     229            1 :       ")");
     230            2 :   ASSERT_TRUE(s.ok()) << s;
     231              : 
     232            1 :   auto schema = storage_->GetSchema({"proj-test", "ds", "cross_join_ctas"});
     233            2 :   ASSERT_TRUE(schema.ok()) << schema.status();
     234            1 :   ASSERT_EQ(schema->columns.size(), 2u);
     235              : 
     236            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "cross_join_ctas"});
     237            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     238            1 :   int rows = 0;
     239            1 :   storage::Row row;
     240            7 :   while (true) {
     241            7 :     auto has = (*scan)->Next(&row);
     242           14 :     ASSERT_TRUE(has.ok()) << has.status();
     243            7 :     if (!*has) break;
     244            6 :     ASSERT_EQ(row.cells.size(), 2u);
     245            6 :     ++rows;
     246            6 :   }
     247            1 :   EXPECT_EQ(rows, 6);
     248            1 : }
     249              : 
     250            1 : TEST_F(ControlOpExecutorCtasTest, CreateTableAsSelectExceptRowNumberDedup) {
     251            1 :   schema::TableSchema src_schema;
     252            1 :   schema::ColumnSchema id;
     253            1 :   id.name = "id";
     254            1 :   id.type = schema::ColumnType::kString;
     255            1 :   id.mode = schema::ColumnMode::kRequired;
     256            1 :   src_schema.columns.push_back(id);
     257            1 :   schema::ColumnSchema tie;
     258            1 :   tie.name = "tie_break";
     259            1 :   tie.type = schema::ColumnType::kInt64;
     260            1 :   tie.mode = schema::ColumnMode::kRequired;
     261            1 :   src_schema.columns.push_back(tie);
     262            1 :   schema::ColumnSchema value;
     263            1 :   value.name = "value";
     264            1 :   value.type = schema::ColumnType::kInt64;
     265            1 :   value.mode = schema::ColumnMode::kRequired;
     266            1 :   src_schema.columns.push_back(value);
     267            1 :   ASSERT_TRUE(
     268            1 :       storage_->CreateTable({"proj-test", "ds", "merge_src"}, src_schema).ok());
     269              : 
     270            1 :   std::vector<storage::Row> seed = {
     271            1 :       storage::Row{{storage::Value::String("a"),
     272            1 :                     storage::Value::Int64(1),
     273            1 :                     storage::Value::Int64(10)}},
     274            1 :       storage::Row{{storage::Value::String("a"),
     275            1 :                     storage::Value::Int64(2),
     276            1 :                     storage::Value::Int64(20)}},
     277            1 :       storage::Row{{storage::Value::String("b"),
     278            1 :                     storage::Value::Int64(1),
     279            1 :                     storage::Value::Int64(30)}},
     280            1 :   };
     281            1 :   ASSERT_TRUE(storage_
     282            1 :                   ->AppendRows({"proj-test", "ds", "merge_src"},
     283            1 :                                absl::MakeConstSpan(seed))
     284            1 :                   .ok());
     285              : 
     286            1 :   absl::Status s = RunDdl(
     287            1 :       "CREATE OR REPLACE TABLE ds.merge_deduped AS "
     288            1 :       "SELECT * EXCEPT(rn) FROM ("
     289            1 :       "  SELECT *, ROW_NUMBER() OVER ("
     290            1 :       "    PARTITION BY id ORDER BY tie_break DESC"
     291            1 :       "  ) AS rn FROM ds.merge_src"
     292            1 :       ") WHERE rn = 1");
     293            2 :   ASSERT_TRUE(s.ok()) << s;
     294              : 
     295            1 :   auto schema = storage_->GetSchema({"proj-test", "ds", "merge_deduped"});
     296            2 :   ASSERT_TRUE(schema.ok()) << schema.status();
     297            1 :   ASSERT_EQ(schema->columns.size(), 3u);
     298              : 
     299            1 :   auto scan = storage_->ScanRows({"proj-test", "ds", "merge_deduped"});
     300            2 :   ASSERT_TRUE(scan.ok()) << scan.status();
     301            1 :   storage::Row row;
     302            1 :   int rows = 0;
     303            3 :   while (true) {
     304            3 :     auto has = (*scan)->Next(&row);
     305            6 :     ASSERT_TRUE(has.ok()) << has.status();
     306            3 :     if (!*has) break;
     307            2 :     ASSERT_EQ(row.cells.size(), 3u);
     308            2 :     ++rows;
     309            2 :   }
     310            1 :   EXPECT_EQ(rows, 2);
     311            1 : }
     312              : 
     313              : }  // namespace
     314              : }  // namespace control
     315              : }  // namespace engine
     316              : }  // namespace backend
     317              : }  // namespace bigquery_emulator
        

Generated by: LCOV version 2.0-1