Line data Source code
1 : // Tests for the `RowSource` adapters.
2 : //
3 : // `MaterializedRowSource` and `DrainRowSource` are the
4 : // composition primitives downstream plans use to wrap fast-path
5 : // row producers as semantic-executor inputs. The tests here pin
6 : // the streaming contract (rows come out in order, EOF is sticky,
7 : // schema is preserved) so future plans can rely on it without
8 : // re-deriving the semantics from the source.
9 :
10 : #include "backend/engine/semantic/row_source.h"
11 :
12 : #include <cstdint>
13 : #include <memory>
14 : #include <utility>
15 : #include <vector>
16 :
17 : #include "absl/status/status.h"
18 : #include "absl/status/statusor.h"
19 : #include "backend/engine/engine.h"
20 : #include "backend/schema/schema.h"
21 : #include "backend/storage/storage.h"
22 : #include "gtest/gtest.h"
23 :
24 : namespace bigquery_emulator {
25 : namespace backend {
26 : namespace engine {
27 : namespace semantic {
28 : namespace {
29 :
// Test helper: builds a `schema::TableSchema` containing exactly one
// column, named "x", with type `kInt64` and mode `kNullable`.
30 3 : schema::TableSchema MakeSingleInt64Schema() {
31 3 : schema::TableSchema s;
32 3 : schema::ColumnSchema c;
33 3 : c.name = "x";
34 3 : c.type = schema::ColumnType::kInt64;
35 3 : c.mode = schema::ColumnMode::kNullable;
36 3 : s.columns.push_back(c);
37 3 : return s;
38 3 : }
39 :
// Test helper: builds a `storage::Row` whose only cell is the INT64
// value `v` (created via `storage::Value::Int64`).
40 5 : storage::Row MakeIntRow(int64_t v) {
41 5 : storage::Row r;
42 5 : r.cells.push_back(storage::Value::Int64(v));
43 5 : return r;
44 5 : }
45 :
// Feeds a two-row source (values 1 and 2) through `Next` four times:
// the first two calls must succeed and yield the rows in insertion
// order, and the third and fourth must both succeed while reporting
// "no more rows".
46 1 : TEST(MaterializedRowSourceTest, StreamsRowsAndEofIsSticky) {
47 1 : std::vector<storage::Row> rows = {MakeIntRow(1), MakeIntRow(2)};
48 1 : MaterializedRowSource src(MakeSingleInt64Schema(), std::move(rows));
// The schema handed to the constructor is visible through `schema()`;
// only the column count is checked here.
49 1 : EXPECT_EQ(src.schema().columns.size(), 1u);
50 :
// `row` is reused as the output buffer for every `Next` call below.
51 1 : storage::Row row;
52 1 : auto has = src.Next(&row);
53 2 : ASSERT_TRUE(has.ok()) << has.status();
54 1 : EXPECT_TRUE(*has);
55 1 : EXPECT_EQ(row.cells[0].int64_value(), 1);
56 :
57 1 : has = src.Next(&row);
58 2 : ASSERT_TRUE(has.ok()) << has.status();
59 1 : EXPECT_TRUE(*has);
60 1 : EXPECT_EQ(row.cells[0].int64_value(), 2);
61 :
// Both rows consumed: the next call is the first EOF.
62 1 : has = src.Next(&row);
63 2 : ASSERT_TRUE(has.ok()) << has.status();
64 1 : EXPECT_FALSE(*has);
65 :
66 : // EOF is sticky: a second Next call after EOF must still return
67 : // false; the row buffer is left unchanged.
// NOTE(review): the "row buffer is left unchanged" part is stated but
// not asserted; nothing below inspects `row` after EOF.
68 1 : has = src.Next(&row);
69 2 : ASSERT_TRUE(has.ok()) << has.status();
70 1 : EXPECT_FALSE(*has);
71 1 : }
72 :
// Calling `Next` with a null output pointer on a source built from an
// empty row list must return a non-OK status with code
// `kInvalidArgument`.
73 1 : TEST(MaterializedRowSourceTest, NullRowArgumentRejected) {
74 1 : MaterializedRowSource src(MakeSingleInt64Schema(), {});
75 1 : auto has = src.Next(nullptr);
76 1 : ASSERT_FALSE(has.ok());
77 1 : EXPECT_EQ(has.status().code(), absl::StatusCode::kInvalidArgument);
78 1 : }
79 :
// Drains a three-row upstream (10, 20, 30) through `DrainRowSource`
// and checks that the result exposes a one-column schema and yields
// the same values in the same order.
80 1 : TEST(DrainRowSourceTest, ConsumesEverythingFromUpstream) {
81 : // Build an upstream MaterializedRowSource (acting as the
82 : // analog of a DuckDB-backed RowSource for test purposes) and
83 : // drain it through the adapter; the adapter must preserve
84 : // schema and produce the same rows.
85 1 : std::vector<storage::Row> rows = {
86 1 : MakeIntRow(10), MakeIntRow(20), MakeIntRow(30)};
87 1 : MaterializedRowSource upstream(MakeSingleInt64Schema(), std::move(rows));
88 :
89 1 : auto drained_or = DrainRowSource(upstream);
90 2 : ASSERT_TRUE(drained_or.ok()) << drained_or.status();
// Move the payload out of the status wrapper.
// NOTE(review): `drained` is used via `->` below, so it is presumably
// a smart pointer to a row source; confirm against row_source.h.
91 1 : auto drained = *std::move(drained_or);
92 1 : ASSERT_EQ(drained->schema().columns.size(), 1u);
93 :
94 1 : std::vector<int64_t> seen;
95 1 : storage::Row row;
// Pull rows until `Next` reports EOF, collecting cell 0 of each row.
// The loop has no iteration cap: it ends only on EOF or when the
// ASSERT below fails.
96 4 : while (true) {
97 4 : auto has = drained->Next(&row);
98 8 : ASSERT_TRUE(has.ok()) << has.status();
99 4 : if (!*has) break;
100 3 : seen.push_back(row.cells[0].int64_value());
101 3 : }
102 1 : EXPECT_EQ(seen, (std::vector<int64_t>{10, 20, 30}));
103 1 : }
104 :
105 : } // namespace
106 : } // namespace semantic
107 : } // namespace engine
108 : } // namespace backend
109 : } // namespace bigquery_emulator
|