Line data Source code
1 : #ifndef BIGQUERY_EMULATOR_BACKEND_STORAGE_STORAGE_H_
2 : #define BIGQUERY_EMULATOR_BACKEND_STORAGE_STORAGE_H_
3 :
4 : // Storage is the C++ engine's row store interface.
5 : //
6 : // `Storage` lives below the gRPC service boundary: the gateway never
7 : // touches it directly, the catalog handler does. The only concrete
8 : // implementation is `backend/storage/duckdb/duckdb_storage.{h,cc}`,
9 : // the persistent Parquet/Arrow-on-disk store backed by DuckDB.
10 : //
11 : // **Engine-agnostic types only**: this header MUST NOT mention
12 : // `googlesql::Value` or any other engine-specific type. The DuckDB
13 : // engine translates to its own representations at the call sites in
14 : // `backend/engine/`. See ROADMAP "Pluggable engine and storage" for
15 : // the rationale.
16 :
17 : #include <cstdint>
18 : #include <memory>
19 : #include <optional>
20 : #include <string>
21 : #include <utility>
22 : #include <variant>
23 : #include <vector>
24 :
25 : #include "absl/status/status.h"
26 : #include "absl/status/statusor.h"
27 : #include "absl/strings/string_view.h"
28 : #include "absl/types/span.h"
29 : #include "backend/schema/schema.h"
30 : #include "backend/storage/row_restriction.h"
31 : #include "backend/storage/table_governance.h"
32 :
33 : namespace bigquery_emulator {
34 : namespace backend {
35 : namespace storage {
36 :
37 : // Stable identifiers for catalog entries. These mirror the proto
38 : // `DatasetRef` / `TableRef` shape (proto/emulator.proto) but the
39 : // storage layer does not depend on the proto runtime.
struct DatasetId {
  // Project component of the dataset reference.
  std::string project_id;
  // Dataset component of the reference, scoped by `project_id`.
  std::string dataset_id;
};
44 :
struct TableId {
  // Project component of the table reference.
  std::string project_id;
  // Dataset component of the reference, scoped by `project_id`.
  std::string dataset_id;
  // Table component of the reference, scoped by the dataset.
  std::string table_id;
};
50 :
51 0 : inline bool operator==(const DatasetId& a, const DatasetId& b) {
52 0 : return a.project_id == b.project_id && a.dataset_id == b.dataset_id;
53 0 : }
54 0 : inline bool operator==(const TableId& a, const TableId& b) {
55 0 : return a.project_id == b.project_id && a.dataset_id == b.dataset_id &&
56 0 : a.table_id == b.table_id;
57 0 : }
58 :
59 : // Stable identifiers for persisted routines (UDF / UDAF / TVF /
60 : // procedure). Mirrors the BigQuery `RoutineReference` REST shape.
struct RoutineId {
  // Project component of the routine reference.
  std::string project_id;
  // Dataset component of the reference, scoped by `project_id`.
  std::string dataset_id;
  // Routine component of the reference, scoped by the dataset.
  std::string routine_id;
};
66 :
67 0 : inline bool operator==(const RoutineId& a, const RoutineId& b) {
68 0 : return a.project_id == b.project_id && a.dataset_id == b.dataset_id &&
69 0 : a.routine_id == b.routine_id;
70 0 : }
71 :
// Kind of routine stored in `catalog.duckdb`. Values are persisted as
// lowercase snake_case strings (see duckdb_storage_routines.cc).
enum class RoutineKind {
  // Numeric values are spelled out so the numbering stays fixed if
  // enumerators are ever reordered.
  kScalarFunction = 0,
  kAggregateFunction = 1,
  kTableValuedFunction = 2,
  kProcedure = 3,
};
80 :
81 : // Durable routine metadata. `ddl_sql` carries the original CREATE
82 : // statement (the source of truth for re-analysis at rehydrate time);
83 : // `signature_json` stores argument metadata (including ANY TYPE
84 : // markers) for REST round-trip without re-parsing the DDL body.
85 : struct RoutineRecord {
86 : RoutineId id;
87 : RoutineKind kind = RoutineKind::kScalarFunction;
88 : std::string language;
89 : std::string ddl_sql;
90 : bool is_temp = false;
91 : std::string signature_json;
92 : };
93 :
94 : // Durable logical-view metadata. `ddl_sql` is the original CREATE VIEW
95 : // statement (re-analyzed at rehydrate time into the view registry).
struct ViewId {
  // Project component of the view reference.
  std::string project_id;
  // Dataset component of the reference, scoped by `project_id`.
  std::string dataset_id;
  // View component of the reference, scoped by the dataset.
  std::string view_id;
};
101 :
102 : struct ViewRecord {
103 : ViewId id;
104 : std::string ddl_sql;
105 : std::string view_query;
106 : schema::TableSchema schema;
107 : };
108 :
// Table resource metadata returned by `Storage::GetTableResourceInfo`.
struct TableResourceInfo {
  // NOTE(review): presumably the BigQuery REST table `type` string
  // (e.g. TABLE / VIEW) -- confirm against the producer.
  std::string table_type;
  // NOTE(review): presumably populated only for view-like tables --
  // confirm against the producer.
  std::string view_query;
  // NOTE(review): presumably the DDL statement that created the
  // resource -- confirm against the producer.
  std::string ddl_sql;
};
114 :
115 : // Engine-agnostic cell value. The variant covers the BigQuery scalar
116 : // types plus ARRAY and STRUCT containers.
117 : //
118 : // We deliberately do not reuse `googlesql::Value` here; the DuckDB
119 : // engine marshals to and from `Value` at its own boundary. NULL is
120 : // an explicit kind so callers do not have to thread
121 : // `std::optional<Value>` through every API.
class Value {
 public:
  // Discriminator for the payload a Value currently carries.
  enum class Kind {
    kNull = 0,
    kBool = 1,
    kInt64 = 2,
    kFloat64 = 3,
    // STRING and BYTES payloads are both read through
    // `string_value()`; the kind records which of the two BigQuery
    // types the payload represents.
    kString = 4,
    kBytes = 5,
    kArray = 6,
    kStruct = 7,
  };

  // A default-constructed Value is NULL.
  Value() = default;

  // Named constructors, one per kind.
  static Value Null() {
    return Value{};
  }
  static Value Bool(bool v);
  static Value Int64(int64_t v);
  static Value Float64(double v);
  static Value String(std::string v);
  static Value Bytes(std::string v);
  static Value Array(std::vector<Value> elements);
  static Value Struct(std::vector<Value> fields);

  // Kind of the payload currently held.
  Kind kind() const {
    return kind_;
  }
  // True exactly when the kind is `Kind::kNull`.
  bool is_null() const {
    return kind() == Kind::kNull;
  }

  // Payload accessors. When the active kind does not match the
  // accessor, a default value (false / 0 / empty) comes back instead
  // of an error, which lets wire encoders fold values without an
  // inspection cascade. Check `kind()` first whenever the distinction
  // matters for correctness.
  bool bool_value() const;
  int64_t int64_value() const;
  double float64_value() const;
  const std::string& string_value() const;
  const std::vector<Value>& array_value() const;
  const std::vector<Value>& struct_value() const;

 private:
  // kString and kBytes share the std::string alternative of `data_`;
  // `kind_` tells which BigQuery type the bytes were parsed as.
  using Variant = std::variant<std::monostate,
                               bool,
                               int64_t,
                               double,
                               std::string,
                               std::vector<Value>>;

  Kind kind_ = Kind::kNull;
  Variant data_{};
};
183 :
184 : // One row in a table: cells are ordered by the column list of the
185 : // table's `schema::TableSchema`. A NULL cell is represented as
186 : // `Value::Null()`, never as a missing entry.
187 : struct Row {
188 : std::vector<Value> cells{};
189 : };
190 :
191 : // Forward iterator over a single scan of a table's rows. Storage
192 : // implementations may stream rows lazily; a single `RowIterator`
193 : // instance is owned by the caller and is **not** thread-safe.
194 : class RowIterator {
195 : public:
196 5 : virtual ~RowIterator() = default;
197 :
198 : // Pulls the next row into `*row`. Returns:
199 : // * `true` - a row was written.
200 : // * `false` - end of stream; `*row` is unchanged.
201 : // A non-OK status indicates a backend error mid-iteration; further
202 : // calls are undefined.
203 : //
204 : // `[[nodiscard]]`: the StatusOr carries the iteration error; a
205 : // caller that drops the result is silently swallowing a backend
206 : // failure that would otherwise surface to the gateway as a 5xx.
207 : [[nodiscard]] virtual absl::StatusOr<bool> Next(Row* row) = 0;
208 : };
209 :
210 : // ReadFilter constrains the rows a `CreateReadStream` iterator yields.
211 : //
212 : // The StorageRead gRPC surface (`ReadRows`) is wired on top of
213 : // `CreateReadStream`, so the filter shape mirrors what the public
214 : // `bigquery_emulator.v1.ReadOptions` proto can ask for. The DuckDB
215 : // backend pushes the knobs down natively (a SQL `LIMIT` / `WHERE`)
216 : // and the iterator surfaces them on the wire so the handler does
217 : // not have to re-apply them.
218 : //
219 : // `equality_predicate` carries the typed parse of
220 : // `<column> = <literal>` from `ReadOptions.row_restriction`. The raw
221 : // `row_restriction` string is left in for debugging / introspection
222 : // but the backends consume the parsed `equality_predicate` slot so
223 : // the parse happens exactly once per session. `selected_fields`
224 : // carries the per-column projection; see the field comments below.
225 : struct ReadFilter {
226 : // Maximum number of rows the iterator will yield before signaling
227 : // end-of-stream. <= 0 means "no limit" (return every row in the
228 : // table snapshot). Both backends enforce this knob so
229 : // ReadRows can honor caller-supplied caps without re-counting at
230 : // the handler layer.
231 : std::int64_t row_limit = 0;
232 :
233 : // Number of rows to skip from the head of the stream before the
234 : // first emitted row. ReadRows uses this to honor
235 : // `ReadRowsRequest.offset` so a caller resuming a stream after a
236 : // transient failure does not re-receive rows it already processed.
237 : // <= 0 means "start at the first row".
238 : std::int64_t offset = 0;
239 :
240 : // Subset of column names the caller wants returned. Empty means
241 : // "all columns". The DuckDB backend wires this
242 : // natively: the backend builds a projected schema from the
243 : // listed names and the `RowIterator` yields rows in that order.
244 : // The handler validates the names exist at session-create time, so
245 : // an unknown column surfaces as INVALID_ARGUMENT before the stream
246 : // opens; the backend does the same defensive check (re-validating
247 : // here would be redundant, but the helper is shared with the
248 : // CreateReadSession parse path so an unknown column never reaches
249 : // a real SELECT).
250 : std::vector<std::string> selected_fields;
251 :
252 : // Raw SQL-shaped predicate the caller supplied. Retained on the
253 : // filter for debugging / log lines.
254 : std::string row_restriction;
255 :
256 : // DuckDB `WHERE` body produced by `TranspileRowRestriction` (analyzer
257 : // + transpiler). Empty when no restriction was supplied.
258 : std::optional<std::string> where_sql;
259 :
260 : // Legacy typed parse of single-equality restrictions. Prefer
261 : // `where_sql`; kept so existing unit tests can exercise the narrow
262 : // parser without pulling in the analyzer.
263 : std::optional<EqualityPredicate> equality_predicate;
264 :
265 : // Inclusive lower bound on DuckDB `file_row_number` for this stream
266 : // partition. Defaults to 0 (table head).
267 : std::int64_t row_start = 0;
268 :
269 : // Exclusive upper bound on `file_row_number`. Negative means
270 : // unbounded (read through table tail).
271 : std::int64_t row_end = -1;
272 : };
273 :
274 : // Storage is the abstract interface every backend implements.
275 : //
276 : // All methods are **thread-safe**. The DuckDB-backed store relies on
277 : // DuckDB's own concurrency model.
278 : //
279 : // Lifetime: a `Storage` instance is created once at engine startup and
280 : // shared by every gRPC request handler.
281 : class Storage {
282 : public:
283 12 : virtual ~Storage() = default;
284 :
285 : // ------------------------------------------------------------------
286 : // Dataset CRUD. `location` is the BigQuery region the dataset is
287 : // pinned to (e.g. "US", "EU"); the DuckDB store stashes it for
288 : // round-tripping but otherwise ignores it. `delete_contents=true`
289 : // mirrors the BigQuery REST `deleteContents` query parameter on
290 : // `datasets.delete`.
291 : //
292 : // Status returns are `[[nodiscard]]` so callers cannot silently drop
293 : // dataset CRUD failures at compile time.
294 : // ------------------------------------------------------------------
295 : [[nodiscard]] virtual absl::Status CreateDataset(
296 : const DatasetId& id, absl::string_view location) = 0;
297 : [[nodiscard]] virtual absl::Status DropDataset(
298 : const DatasetId& id,
299 : bool delete_contents,
300 : absl::string_view rest_metadata_json = {}) = 0;
301 : [[nodiscard]] virtual absl::Status RestoreDataset(
302 0 : const DatasetId& id, std::int64_t deleted_ms = 0) {
303 0 : (void)id;
304 0 : (void)deleted_ms;
305 0 : return absl::UnimplementedError(
306 0 : "Storage::RestoreDataset is not implemented for this backend");
307 0 : }
308 :
309 : // Lists the datasets registered under `project_id`. Returns an empty
310 : // vector when the project has no datasets (i.e. never registered any
311 : // or all have been dropped). Implementations MUST return ids in
312 : // deterministic order so list pagination / callers that diff against
313 : // a prior listing are stable; the DuckDB backend orders
314 : // lexicographically by `dataset_id`. INVALID_ARGUMENT when
315 : // `project_id` is empty.
316 : //
317 : // Used by the gateway's `datasets.list` REST handler to enumerate
318 : // the catalog after `RegisterDataset` calls; the
319 : // `frontend/handlers/catalog.cc` gRPC wrapper exposes this through
320 : // the `Catalog.ListDatasets` RPC.
321 : [[nodiscard]] virtual absl::StatusOr<std::vector<DatasetId>> ListDatasets(
322 : absl::string_view project_id) const = 0;
323 :
324 : // ------------------------------------------------------------------
325 : // Table CRUD. `CreateTable` is idempotent at the dataset level only:
326 : // creating a table that already exists is an error
327 : // (ALREADY_EXISTS). `DropTable` is the inverse and is NOT_FOUND on
328 : // missing tables; callers that want "drop if exists" semantics
329 : // should swallow that status.
330 : // ------------------------------------------------------------------
331 : [[nodiscard]] virtual absl::Status CreateTable(
332 : const TableId& id, const schema::TableSchema& schema) = 0;
333 : [[nodiscard]] virtual absl::Status DropTable(const TableId& id) = 0;
334 : [[nodiscard]] virtual absl::Status RestoreTable(const TableId& id,
335 0 : std::int64_t deleted_ms = 0) {
336 0 : (void)id;
337 0 : (void)deleted_ms;
338 0 : return absl::UnimplementedError(
339 0 : "Storage::RestoreTable is not implemented for this backend");
340 0 : }
341 :
342 : // Lists the tables registered under `dataset_id`. Returns an empty
343 : // vector when the dataset is empty. NOT_FOUND when the dataset does
344 : // not exist; INVALID_ARGUMENT when `project_id` / `dataset_id` is
345 : // empty. Like ListDatasets, implementations MUST return ids in
346 : // deterministic (lexicographic) order. Used by the gateway's
347 : // `tables.list` REST handler.
348 : [[nodiscard]] virtual absl::StatusOr<std::vector<TableId>> ListTables(
349 : const DatasetId& dataset_id) const = 0;
350 :
351 : // Returns the schema the table was created with. NOT_FOUND if the
352 : // dataset or table does not exist.
353 : [[nodiscard]] virtual absl::StatusOr<schema::TableSchema> GetSchema(
354 : const TableId& id) const = 0;
355 :
356 : // Appends `rows` to `id` as a single batch. Implementations may
357 : // require all rows in the batch to share the table's schema shape
358 : // (cell count == column count); validation lives in the impl, not
359 : // here.
360 : [[nodiscard]] virtual absl::Status AppendRows(const TableId& id,
361 : absl::Span<const Row> rows) = 0;
362 :
363 : // Atomically replaces every row in `id` with `rows`. Used by the
364 : // DML engine's scan-and-rewrite path for UPDATE / DELETE / MERGE:
365 : // the engine pulls the existing rows, computes the post-mutation
366 : // shape, and hands the result back through this method so the
367 : // store can swap the row vector / parquet file in one shot.
368 : //
369 : // Same shape-check contract as `AppendRows`: row cell count must
370 : // equal the table's top-level column count, otherwise
371 : // INVALID_ARGUMENT. NOT_FOUND if the dataset / table does not
372 : // exist. The new row vector replaces the existing one in full,
373 : // including the empty-vector case (`rows.empty()` truncates the
374 : // table).
375 : [[nodiscard]] virtual absl::Status OverwriteRows(
376 : const TableId& id, absl::Span<const Row> rows) = 0;
377 :
378 : // Begins a fresh scan of `id`'s rows. The returned iterator captures
379 : // a snapshot at call time; rows appended afterward may or may not be
380 : // visible depending on the impl. NOT_FOUND if the table does not
381 : // exist.
382 : [[nodiscard]] virtual absl::StatusOr<std::unique_ptr<RowIterator>> ScanRows(
383 : const TableId& id) const = 0;
384 :
385 : // CreateReadStream is the StorageRead.ReadRows-shaped scan: same
386 : // snapshot semantics as `ScanRows`, but the returned iterator is
387 : // constrained by `filter` (see `ReadFilter` above). The DuckDB
388 : // backend pushes the limit / offset into the underlying SELECT so
389 : // we don't materialize rows we will never emit. The iterator is a
390 : // `RowIterator` so the `StorageReadService` handler does not
391 : // branch on backend type.
392 : //
393 : // `row_limit`, `offset`, `row_restriction` (via its typed
394 : // `equality_predicate` parse), and `selected_fields` are all
395 : // honored. See the field comments on `ReadFilter`
396 : // for the per-knob contract. NOT_FOUND if the table does not
397 : // exist.
398 : [[nodiscard]] virtual absl::StatusOr<std::unique_ptr<RowIterator>>
399 : CreateReadStream(const TableId& id, const ReadFilter& filter) const = 0;
400 :
401 : // Returns the row count of `id`'s on-disk snapshot at call time
402 : // (DuckDB: `COUNT(*)` over `read_parquet(..., file_row_number=true)`).
403 : // Used by Storage Read session minting to partition streams.
404 : [[nodiscard]] virtual absl::StatusOr<std::int64_t> CountRows(
405 : const TableId& id) const = 0;
406 :
407 : // When the backend persists table data as a Parquet snapshot, returns
408 : // the absolute path to that file so the DuckDB executor can attach it
409 : // via `read_parquet` without a row round-trip. Default: no snapshot.
410 : [[nodiscard]] virtual std::optional<std::string> ParquetSnapshotPath(
411 0 : const TableId& id) const {
412 0 : (void)id;
413 0 : return std::nullopt;
414 0 : }
415 :
416 : // When the backend supports historical Parquet snapshots, returns the
417 : // absolute path to the snapshot that was current at `as_of_ms` (Unix epoch
418 : // milliseconds). Default: same as `ParquetSnapshotPath` (ignores as-of).
419 : [[nodiscard]] virtual absl::StatusOr<std::optional<std::string>>
420 0 : ParquetSnapshotPathAt(const TableId& id, std::int64_t as_of_ms) const {
421 0 : (void)as_of_ms;
422 0 : return ParquetSnapshotPath(id);
423 0 : }
424 :
425 : // ------------------------------------------------------------------
426 : // Routine CRUD. Persists UDF / UDAF / TVF / procedure DDL so the
427 : // per-project registries can rehydrate across engine restarts.
428 : // `UpsertRoutine` replaces an existing row keyed by
429 : // (project_id, dataset_id, routine_id). Temp routines (`is_temp`)
430 : // are accepted for API symmetry but rehydration skips them.
431 : // ------------------------------------------------------------------
432 : [[nodiscard]] virtual absl::Status UpsertRoutine(
433 : const RoutineRecord& record) = 0;
434 : [[nodiscard]] virtual absl::Status DeleteRoutine(const RoutineId& id) = 0;
435 : [[nodiscard]] virtual absl::StatusOr<RoutineRecord> GetRoutine(
436 : const RoutineId& id) const = 0;
437 : [[nodiscard]] virtual absl::StatusOr<std::vector<RoutineRecord>> ListRoutines(
438 : const DatasetId& dataset_id) const = 0;
439 : [[nodiscard]] virtual absl::StatusOr<std::vector<RoutineRecord>>
440 : ListAllRoutines() const = 0;
441 :
442 : // ------------------------------------------------------------------
443 : // Logical view DDL persistence (CREATE VIEW only; materialized views
444 : // are storage-backed and do not use this table).
445 : // ------------------------------------------------------------------
446 : [[nodiscard]] virtual absl::Status UpsertView(const ViewRecord& record) = 0;
447 : [[nodiscard]] virtual absl::Status DeleteView(const ViewId& id) = 0;
448 : [[nodiscard]] virtual absl::StatusOr<std::vector<ViewRecord>> ListAllViews()
449 : const = 0;
450 :
451 : [[nodiscard]] virtual absl::StatusOr<TableResourceInfo> GetTableResourceInfo(
452 0 : const TableId& id) const {
453 0 : (void)id;
454 0 : return absl::UnimplementedError(
455 0 : "Storage backend does not expose table resource metadata");
456 0 : }
457 :
458 : // Row-access policies and column-level security metadata.
459 : [[nodiscard]] virtual absl::StatusOr<TableGovernance> GetTableGovernance(
460 0 : const TableId& id) const {
461 0 : (void)id;
462 0 : return TableGovernance{};
463 0 : }
464 : [[nodiscard]] virtual absl::Status UpsertRowAccessPolicy(
465 0 : const TableId& id, const RowAccessPolicyRecord& policy) {
466 0 : (void)id;
467 0 : (void)policy;
468 0 : return absl::UnimplementedError(
469 0 : "Storage backend does not persist row access policies");
470 0 : }
471 : [[nodiscard]] virtual absl::Status DeleteRowAccessPolicy(
472 0 : const TableId& id, absl::string_view policy_id) {
473 0 : (void)id;
474 0 : (void)policy_id;
475 0 : return absl::UnimplementedError(
476 0 : "Storage backend does not persist row access policies");
477 0 : }
478 : [[nodiscard]] virtual absl::Status SetColumnGovernance(
479 : const TableId& id,
480 : absl::string_view column_name,
481 0 : const ColumnGovernanceRecord& column) {
482 0 : (void)id;
483 0 : (void)column_name;
484 0 : (void)column;
485 0 : return absl::UnimplementedError(
486 0 : "Storage backend does not persist column governance");
487 0 : }
488 :
489 : // Persistent-store root (DuckDB catalog + external-source snapshots).
490 0 : [[nodiscard]] virtual absl::string_view data_dir() const {
491 0 : return "";
492 0 : }
493 : };
494 :
495 : } // namespace storage
496 : } // namespace backend
497 : } // namespace bigquery_emulator
498 :
499 : #endif // BIGQUERY_EMULATOR_BACKEND_STORAGE_STORAGE_H_
|