Line data Source code
1 : // StreamQueryResults / ExecuteQuery tests for `QueryService`.
2 :
3 : #include "frontend/handlers/query_test_fixture.h"
4 :
5 : namespace bigquery_emulator {
6 : namespace frontend {
7 : namespace {
8 1 : TEST_F(QueryServiceTest, ExecuteQuerySelect1StreamsSchemaThenRow) {
9 1 : v1::QueryRequest req = MakeRequest("SELECT 1 AS one");
10 1 : MessageCollector collector;
11 1 : ::grpc::Status status = StreamQueryResults(
12 1 : storage_.get(), req, collector.Writer(), engine_.get());
13 2 : ASSERT_TRUE(status.ok()) << status.error_message();
14 :
15 1 : const auto& messages = collector.messages();
16 : // Five messages: schema, one row, phase_timings, then the trailing
17 : // `statement_type` + `emulator_route` pair the gateway folds into
18 : // `Job.statistics.query.{statementType,emulatorRoute}`. See
19 : // `docs/ENGINE_POLICY.md`, in
20 : // particular Item 5.
21 1 : ASSERT_EQ(messages.size(), 5u);
22 :
23 1 : EXPECT_TRUE(messages[0].has_schema());
24 1 : EXPECT_EQ(messages[0].cells_size(), 0);
25 1 : ASSERT_EQ(messages[0].schema().fields_size(), 1);
26 1 : EXPECT_EQ(messages[0].schema().fields(0).name(), "one");
27 1 : EXPECT_EQ(messages[0].schema().fields(0).type(), "INT64");
28 :
29 1 : EXPECT_FALSE(messages[1].has_schema());
30 1 : ASSERT_EQ(messages[1].cells_size(), 1);
31 1 : EXPECT_EQ(messages[1].cells(0).string_value(), "1");
32 :
33 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
34 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
35 1 : ASSERT_NE(stmt, nullptr);
36 1 : EXPECT_EQ(stmt->statement_type(), "SELECT");
37 : // `SELECT 1` has no FROM clause -> semantic executor (see
38 : // `docs/ENGINE_POLICY.md`).
39 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
40 1 : ASSERT_NE(route, nullptr);
41 1 : EXPECT_EQ(route->emulator_route(), "semantic_executor");
42 1 : }
43 :
44 1 : TEST_F(QueryServiceTest, ExecuteQuerySelectFromTableStreamsAllRows) {
45 1 : CreatePeopleTable();
46 1 : std::vector<backend::storage::Row> rows;
47 3 : auto append = [&](int64_t id, std::string name) {
48 3 : backend::storage::Row r;
49 3 : r.cells = {
50 3 : backend::storage::Value::Int64(id),
51 3 : backend::storage::Value::String(std::move(name)),
52 3 : backend::storage::Value::Array({}),
53 3 : };
54 3 : rows.push_back(std::move(r));
55 3 : };
56 1 : append(1, "ada");
57 1 : append(2, "linus");
58 1 : append(3, "grace");
59 1 : ASSERT_TRUE(
60 1 : storage_->AppendRows({"proj-test", "ds", "t"}, absl::MakeConstSpan(rows))
61 1 : .ok());
62 :
63 1 : v1::QueryRequest req = MakeRequest("SELECT id, name FROM ds.t ORDER BY id");
64 1 : MessageCollector collector;
65 1 : ::grpc::Status status = StreamQueryResults(
66 1 : storage_.get(), req, collector.Writer(), engine_.get());
67 2 : ASSERT_TRUE(status.ok()) << status.error_message();
68 :
69 1 : const auto& messages = collector.messages();
70 : // Seven messages: schema, three rows, phase_timings, plus the
71 : // trailing `statement_type` + `emulator_route` pair.
72 1 : ASSERT_EQ(messages.size(), 7u);
73 :
74 1 : ASSERT_TRUE(messages[0].has_schema());
75 1 : ASSERT_EQ(messages[0].schema().fields_size(), 2);
76 1 : EXPECT_EQ(messages[0].schema().fields(0).name(), "id");
77 1 : EXPECT_EQ(messages[0].schema().fields(0).type(), "INT64");
78 1 : EXPECT_EQ(messages[0].schema().fields(1).name(), "name");
79 1 : EXPECT_EQ(messages[0].schema().fields(1).type(), "STRING");
80 :
81 1 : ASSERT_EQ(messages[1].cells_size(), 2);
82 1 : EXPECT_EQ(messages[1].cells(0).string_value(), "1");
83 1 : EXPECT_EQ(messages[1].cells(1).string_value(), "ada");
84 1 : ASSERT_EQ(messages[2].cells_size(), 2);
85 1 : EXPECT_EQ(messages[2].cells(0).string_value(), "2");
86 1 : EXPECT_EQ(messages[2].cells(1).string_value(), "linus");
87 1 : ASSERT_EQ(messages[3].cells_size(), 2);
88 1 : EXPECT_EQ(messages[3].cells(0).string_value(), "3");
89 1 : EXPECT_EQ(messages[3].cells(1).string_value(), "grace");
90 :
91 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
92 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
93 1 : ASSERT_NE(stmt, nullptr);
94 1 : EXPECT_EQ(stmt->statement_type(), "SELECT");
95 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
96 1 : ASSERT_NE(route, nullptr);
97 1 : EXPECT_EQ(route->emulator_route(), "duckdb_native");
98 1 : }
99 :
100 1 : TEST_F(QueryServiceTest, ExecuteQueryEmptyTableEmitsSchemaThenStatementType) {
101 1 : CreatePeopleTable();
102 1 : v1::QueryRequest req = MakeRequest("SELECT id, name FROM ds.t");
103 1 : MessageCollector collector;
104 1 : ::grpc::Status status = StreamQueryResults(
105 1 : storage_.get(), req, collector.Writer(), engine_.get());
106 2 : ASSERT_TRUE(status.ok()) << status.error_message();
107 : // Four messages: the schema (always emitted, even for zero-row
108 : // results), phase_timings, and the trailing `statement_type` +
109 : // `emulator_route` pair.
110 1 : ASSERT_EQ(collector.messages().size(), 4u);
111 1 : EXPECT_TRUE(collector.messages()[0].has_schema());
112 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
113 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
114 1 : ASSERT_NE(stmt, nullptr);
115 1 : EXPECT_EQ(stmt->statement_type(), "SELECT");
116 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
117 1 : ASSERT_NE(route, nullptr);
118 1 : EXPECT_EQ(route->emulator_route(), "duckdb_native");
119 1 : }
120 :
121 1 : TEST_F(QueryServiceTest, ExecuteQuerySyntaxErrorIsInvalidArgument) {
122 1 : v1::QueryRequest req = MakeRequest("SELECT FROM");
123 1 : MessageCollector collector;
124 1 : ::grpc::Status status = StreamQueryResults(
125 1 : storage_.get(), req, collector.Writer(), engine_.get());
126 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
127 2 : << status.error_message();
128 1 : EXPECT_TRUE(collector.messages().empty());
129 1 : }
130 :
131 1 : TEST_F(QueryServiceTest, ExecuteQueryUseLegacySqlIsInvalidArgument) {
132 1 : v1::QueryRequest req = MakeRequest("SELECT 1");
133 1 : req.set_use_legacy_sql(true);
134 1 : MessageCollector collector;
135 1 : ::grpc::Status status = StreamQueryResults(
136 1 : storage_.get(), req, collector.Writer(), engine_.get());
137 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
138 2 : << status.error_message();
139 1 : EXPECT_TRUE(collector.messages().empty());
140 1 : }
141 :
142 1 : TEST_F(QueryServiceTest, ExecuteQueryMissingProjectIsInvalidArgument) {
143 1 : v1::QueryRequest req;
144 1 : req.set_sql("SELECT 1");
145 1 : MessageCollector collector;
146 1 : ::grpc::Status status = StreamQueryResults(
147 1 : storage_.get(), req, collector.Writer(), engine_.get());
148 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
149 2 : << status.error_message();
150 1 : }
151 :
152 1 : TEST_F(QueryServiceTest, ExecuteQueryEmptySqlIsInvalidArgument) {
153 1 : v1::QueryRequest req = MakeRequest("");
154 1 : MessageCollector collector;
155 1 : ::grpc::Status status = StreamQueryResults(
156 1 : storage_.get(), req, collector.Writer(), engine_.get());
157 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::INVALID_ARGUMENT)
158 2 : << status.error_message();
159 1 : }
160 :
161 1 : TEST(QueryServiceWithoutStorageTest, ExecuteQueryReturnsFailedPrecondition) {
162 1 : v1::QueryRequest req;
163 1 : req.set_project_id("proj-test");
164 1 : req.set_sql("SELECT 1");
165 1 : std::vector<v1::QueryResultRow> messages;
166 1 : ::grpc::Status status = StreamQueryResults(
167 1 : /*storage=*/nullptr, req, [&](const v1::QueryResultRow& m) {
168 0 : messages.push_back(m);
169 0 : return true;
170 0 : });
171 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::FAILED_PRECONDITION)
172 2 : << status.error_message();
173 1 : EXPECT_TRUE(messages.empty());
174 1 : }
175 :
176 1 : TEST_F(QueryServiceTest, ExecuteQueryCancelledWriterReturnsCancelled) {
177 1 : v1::QueryRequest req = MakeRequest("SELECT 1 AS one");
178 1 : ::grpc::Status status = StreamQueryResults(
179 1 : storage_.get(),
180 1 : req,
181 1 : [](const v1::QueryResultRow&) { return false; },
182 1 : engine_.get());
183 2 : EXPECT_EQ(status.error_code(), ::grpc::StatusCode::CANCELLED)
184 2 : << status.error_message();
185 1 : }
186 :
187 : // ---------------------------------------------------------------------------
188 : // Statement classification
189 : //
190 : // `StreamQueryResults` analyzes the statement once up front so it can
191 : // pick the right engine entry point: SELECT keeps the existing
192 : // schema+rows protocol, INSERT/UPDATE/DELETE/MERGE routes through
193 : // ExecuteDml and emits a dml_stats summary, and DDL such as CREATE
194 : // TABLE / CREATE FUNCTION is executed (the tests below expect OK).
195 : // ---------------------------------------------------------------------------
196 :
197 1 : TEST_F(QueryServiceTest, ExecuteQueryInsertEmitsDmlStats) {
198 1 : CreatePeopleTable();
199 1 : v1::QueryRequest req = MakeRequest(
200 1 : "INSERT INTO ds.t (id, name, tags) "
201 1 : "VALUES (1, 'ada', ['math']), (2, 'linus', [])");
202 1 : MessageCollector collector;
203 1 : ::grpc::Status status = StreamQueryResults(
204 1 : storage_.get(), req, collector.Writer(), engine_.get());
205 2 : ASSERT_TRUE(status.ok()) << status.error_message();
206 :
207 : // DML response shape: dml_stats, phase_timings, then the trailing
208 : // statement_type + emulator_route pair.
209 1 : const auto& messages = collector.messages();
210 1 : ASSERT_EQ(messages.size(), 4u);
211 1 : EXPECT_FALSE(messages[0].has_schema());
212 1 : EXPECT_EQ(messages[0].cells_size(), 0);
213 1 : ASSERT_TRUE(messages[0].has_dml_stats());
214 1 : EXPECT_EQ(messages[0].dml_stats().inserted_row_count(), 2);
215 1 : EXPECT_EQ(messages[0].dml_stats().updated_row_count(), 0);
216 1 : EXPECT_EQ(messages[0].dml_stats().deleted_row_count(), 0);
217 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
218 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
219 1 : ASSERT_NE(stmt, nullptr);
220 1 : EXPECT_EQ(stmt->statement_type(), "INSERT");
221 : // INSERT against a user table routes through the semantic
222 : // executor's DML path (see `docs/ENGINE_POLICY.md`).
223 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
224 1 : ASSERT_NE(route, nullptr);
225 1 : EXPECT_EQ(route->emulator_route(), "semantic_executor");
226 :
227 : // Storage round-trip: the rows actually landed.
228 1 : auto scan = storage_->ScanRows({"proj-test", "ds", "t"});
229 2 : ASSERT_TRUE(scan.ok()) << scan.status();
230 1 : int rows_seen = 0;
231 1 : backend::storage::Row row;
232 3 : while (true) {
233 3 : auto has = (*scan)->Next(&row);
234 6 : ASSERT_TRUE(has.ok()) << has.status();
235 3 : if (!*has) break;
236 2 : ++rows_seen;
237 2 : }
238 1 : EXPECT_EQ(rows_seen, 2);
239 1 : }
240 :
241 1 : TEST_F(QueryServiceTest, ExecuteQueryDdlEmitsStatementType) {
242 : // CREATE TABLE routes through the coordinator's `kControlOp`
243 : // route to `backend/engine/control/control_op_executor`. The
244 : // reply stream carries exactly three messages: phase_timings,
245 : // the `statement_type` trailer the gateway folds into
246 : // `Job.statistics.query.statementType`, and the `emulator_route`
247 : // trailer the gateway folds into
248 : // `Job.statistics.query.emulatorRoute` (loopback-only). See
249 : // `docs/ENGINE_POLICY.md`, in particular Item 5. The new table's
250 : // schema is then read back from storage to confirm it was created.
251 1 : ASSERT_TRUE(storage_->CreateDataset({"proj-test", "ds"}, "US").ok());
252 1 : v1::QueryRequest req =
253 1 : MakeRequest("CREATE TABLE ds.new_table (id INT64, name STRING)");
254 1 : MessageCollector collector;
255 1 : ::grpc::Status status = StreamQueryResults(
256 1 : storage_.get(), req, collector.Writer(), engine_.get());
257 2 : ASSERT_TRUE(status.ok()) << status.error_message();
258 1 : ASSERT_EQ(collector.messages().size(), 3u);
259 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
260 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
261 1 : ASSERT_NE(stmt, nullptr);
262 1 : EXPECT_EQ(stmt->statement_type(), "CREATE_TABLE");
263 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
264 1 : ASSERT_NE(route, nullptr);
265 1 : EXPECT_EQ(route->emulator_route(), "control_op");
266 :
267 1 : auto sch = storage_->GetSchema({"proj-test", "ds", "new_table"});
268 2 : ASSERT_TRUE(sch.ok()) << sch.status();
269 1 : ASSERT_EQ(sch->columns.size(), 2u);
270 1 : EXPECT_EQ(sch->columns[0].name, "id");
271 1 : EXPECT_EQ(sch->columns[1].name, "name");
272 1 : }
273 :
274 1 : TEST_F(QueryServiceTest, ExecuteQuerySqlUdfCreateThenCall) {
275 1 : const char* kCreate = R"(CREATE FUNCTION customfunc(
276 1 : arr ARRAY<STRUCT<name STRING, val INT64>>
277 1 : ) AS (
278 1 : (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
279 1 : ))";
280 1 : MessageCollector create_collector;
281 1 : ::grpc::Status create_status = StreamQueryResults(storage_.get(),
282 1 : MakeRequest(kCreate),
283 1 : create_collector.Writer(),
284 1 : engine_.get());
285 2 : ASSERT_TRUE(create_status.ok()) << create_status.error_message();
286 :
287 1 : const char* kSelect = R"(SELECT customfunc([
288 1 : STRUCT<name STRING, val INT64>("foo", 10),
289 1 : STRUCT<name STRING, val INT64>("bar", 40),
290 1 : STRUCT<name STRING, val INT64>("foo", 20)
291 1 : ]))";
292 1 : MessageCollector select_collector;
293 1 : ::grpc::Status select_status = StreamQueryResults(storage_.get(),
294 1 : MakeRequest(kSelect),
295 1 : select_collector.Writer(),
296 1 : engine_.get());
297 2 : ASSERT_TRUE(select_status.ok()) << select_status.error_message();
298 1 : const auto& messages = select_collector.messages();
299 1 : ASSERT_GE(messages.size(), 2u);
300 1 : ASSERT_TRUE(messages[0].has_schema());
301 1 : ASSERT_FALSE(messages[1].has_schema());
302 1 : ASSERT_EQ(messages[1].cells_size(), 1);
303 1 : EXPECT_EQ(messages[1].cells(0).string_value(), "30");
304 1 : }
305 :
306 1 : TEST_F(QueryServiceTest, ExecuteQueryDeleteEmitsDmlStats) {
307 1 : CreatePeopleTable();
308 : // DELETE runs end-to-end and reports the `semantic_executor` route;
309 : // the reply carries one dml_stats message whose deleted_row_count
310 : // is 0 here, because `WHERE FALSE` matches no rows.
311 1 : v1::QueryRequest req = MakeRequest("DELETE FROM ds.t WHERE FALSE");
312 1 : MessageCollector collector;
313 1 : ::grpc::Status status = StreamQueryResults(
314 1 : storage_.get(), req, collector.Writer(), engine_.get());
315 2 : ASSERT_TRUE(status.ok()) << status.error_message();
316 1 : const auto& messages = collector.messages();
317 : // dml_stats, phase_timings, then the trailing statement_type +
318 : // emulator_route pair.
319 1 : ASSERT_EQ(messages.size(), 4u);
320 1 : ASSERT_TRUE(messages[0].has_dml_stats());
321 1 : EXPECT_EQ(messages[0].dml_stats().deleted_row_count(), 0);
322 1 : ASSERT_NE(collector.PhaseTimingsTrailer(), nullptr);
323 1 : const v1::QueryResultRow* stmt = collector.StatementTypeTrailer();
324 1 : ASSERT_NE(stmt, nullptr);
325 1 : EXPECT_EQ(stmt->statement_type(), "DELETE");
326 1 : const v1::QueryResultRow* route = collector.EmulatorRouteTrailer();
327 1 : ASSERT_NE(route, nullptr);
328 1 : EXPECT_EQ(route->emulator_route(), "semantic_executor");
329 1 : }
330 :
331 : } // namespace
332 : } // namespace frontend
333 : } // namespace bigquery_emulator
|