diff --git a/Makefile b/Makefile index 17cd9016..c7eb7270 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,14 @@ # to run them locally when making specific optimisations. CI_RUN_BENCHMARKS ?= false +GENERATED_FILES += driver/sql/postgres/pgjournal/internal/xdb/queries.sql.go +GENERATED_FILES += driver/sql/postgres/pgkv/internal/xdb/queries.sql.go + -include .makefiles/Makefile -include .makefiles/pkg/go/v1/Makefile .makefiles/%: @curl -sfL https://makefiles.dev/v1 | bash /dev/stdin "$@" + +%.sql.go: %.sql + sqlc generate --file $(@D)/sqlc.yaml diff --git a/driver/sql/postgres/internal/bigint/doc.go b/driver/sql/postgres/internal/bigint/doc.go deleted file mode 100644 index a8d72a4f..00000000 --- a/driver/sql/postgres/internal/bigint/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package unsigned provides a type maps an unsigned 64-bit integer to a signed -// 64-bit integer for use in SQL statements and scan operations. -package bigint diff --git a/driver/sql/postgres/internal/bigint/unsigned.go b/driver/sql/postgres/internal/bigint/unsigned.go deleted file mode 100644 index a5462b46..00000000 --- a/driver/sql/postgres/internal/bigint/unsigned.go +++ /dev/null @@ -1,47 +0,0 @@ -package bigint - -import ( - "database/sql" - "database/sql/driver" - "fmt" - "math" -) - -// ConvertUnsigned returns a type that can be used in SQL statements and scan -// operations that encodes an unsigned 64-bit integer as a signed 64-bit integer -// (which is PostgreSQL's largest integer type). -// -// The encoding preserves order, such that sorting on the encoded values will -// produce the same order as sorting on the original unsigned values. This is -// the only guarantee provided by this type. -func ConvertUnsigned[T ~uint64](target *T) interface { - driver.Valuer - sql.Scanner -} { - return value[T]{target} -} - -type value[T ~uint64] struct { - Target *T -} - -func (v value[T]) Scan(src any) error { - if src, ok := src.(int64); ok { - unmarshal(src, v.Target) - return nil - } - - return fmt.Errorf("cannot scan %T into journal.Position", src) -} - -func (v value[T]) Value() (driver.Value, error) { - return marshal(*v.Target), nil -} - -func marshal[T ~uint64](target T) int64 { - return int64(target - (math.MaxInt64 + 1)) -} - -func unmarshal[T ~uint64](src int64, target *T) { - *target = T(src) + T(math.MaxInt64) + 1 -} diff --git a/driver/sql/postgres/internal/bigint/unsigned_test.go b/driver/sql/postgres/internal/bigint/unsigned_test.go deleted file mode 100644 index d9f75a4d..00000000 --- a/driver/sql/postgres/internal/bigint/unsigned_test.go +++ /dev/null @@ -1,100 +0,0 @@ -package bigint_test - -import ( - "math" - "math/rand" - "slices" - "testing" - - . "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/bigint" -) - -func TestConvertUnsigned(t *testing.T) { - cases := []struct { - Name string - Unsigned uint64 - Signed int64 - }{ - { - "zero", - 0, - math.MinInt64, - }, - { - "mid-point", - (math.MaxUint64 / 2) + 1, - 0, - }, - { - "max uint64", - math.MaxUint64, - math.MaxInt64, - }, - } - - for _, c := range cases { - t.Run(c.Name, func(t *testing.T) { - s, err := ConvertUnsigned(&c.Unsigned).Value() - if err != nil { - t.Fatal(err) - } - - if s != c.Signed { - t.Fatalf("unexpected encoded value: got %d, want %d", s, c.Signed) - } - - var u uint64 - if err := ConvertUnsigned(&u).Scan(c.Signed); err != nil { - t.Fatal(err) - } - - if u != c.Unsigned { - t.Fatalf("unexpected decoded value: got %d, want %d", u, c.Unsigned) - } - }) - } - - var ( - unsigned []uint64 - signed []int64 - ) - - // Test encoding/decoding of random values produces the original value. - for i := 0; i < 1000; i++ { - random := rand.Uint64() - - s, err := ConvertUnsigned(&random).Value() - if err != nil { - t.Fatal(err) - } - - var u uint64 - if err := ConvertUnsigned(&u).Scan(s); err != nil { - t.Fatal(err) - } - - if u != random { - t.Fatalf("unexpected decoded value: got %d, want %d", u, random) - } - - unsigned = append(unsigned, u) - signed = append(signed, s.(int64)) - } - - // Then sort those values and verify that the order is preserved. - slices.Sort(unsigned) - slices.Sort(signed) - - for i, u := range unsigned { - s := signed[i] - - x, err := ConvertUnsigned(&u).Value() - if err != nil { - t.Fatal(err) - } - - if x != s { - t.Fatalf("unexpected encoded value at index %d: got %d, want %d", i, x, s) - } - } -} diff --git a/driver/sql/postgres/internal/commonschema/doc.go b/driver/sql/postgres/internal/commonschema/doc.go new file mode 100644 index 00000000..73c4e737 --- /dev/null +++ b/driver/sql/postgres/internal/commonschema/doc.go @@ -0,0 +1,3 @@ +// Package commonschema contains common PostgreSQL schema elements used by +// multiple persistencekit drivers. +package commonschema diff --git a/driver/sql/postgres/pgkv/schema.go b/driver/sql/postgres/internal/commonschema/schema.go similarity index 55% rename from driver/sql/postgres/pgkv/schema.go rename to driver/sql/postgres/internal/commonschema/schema.go index 4b14affa..3c216b96 100644 --- a/driver/sql/postgres/pgkv/schema.go +++ b/driver/sql/postgres/internal/commonschema/schema.go @@ -1,4 +1,4 @@ -package pgkv +package commonschema import ( "context" @@ -11,17 +11,28 @@ import ( //go:embed schema.sql var schema string -// createSchema creates the PostgreSQL schema elements required by [BinaryStore]. -func createSchema( +// Create creates the PostgreSQL schema elements required by +// all PostgreSQL-based stores. +func Create( ctx context.Context, db *sql.DB, + additional ...string, ) error { return pgerror.Retry( ctx, db, func(tx *sql.Tx) error { - _, err := tx.ExecContext(ctx, schema) - return err + if _, err := tx.ExecContext(ctx, schema); err != nil { + return err + } + + for _, q := range additional { + if _, err := tx.ExecContext(ctx, q); err != nil { + return err + } + } + + return nil }, // Even though we use IF NOT EXISTS in the DDL, we still need to handle // conflicts due to a data race bug in PostgreSQL. diff --git a/driver/sql/postgres/internal/commonschema/schema.sql b/driver/sql/postgres/internal/commonschema/schema.sql new file mode 100644 index 00000000..560499a9 --- /dev/null +++ b/driver/sql/postgres/internal/commonschema/schema.sql @@ -0,0 +1,19 @@ +CREATE SCHEMA IF NOT EXISTS persistencekit; + +-- see `commonschema.Uint64` type +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM information_schema.domains + WHERE domain_schema = 'persistencekit' + AND domain_name = 'uint64' + ) THEN + + CREATE DOMAIN persistencekit.uint64 + AS BIGINT + DEFAULT -1::BIGINT << 63; + + END IF; +END +$$; diff --git a/driver/sql/postgres/internal/commonschema/uint64.go b/driver/sql/postgres/internal/commonschema/uint64.go new file mode 100644 index 00000000..2b140841 --- /dev/null +++ b/driver/sql/postgres/internal/commonschema/uint64.go @@ -0,0 +1,30 @@ +package commonschema + +import ( + "database/sql/driver" + "fmt" + "math" +) + +// Uint64 is a uint64 that is represented as a SIGNED BIGINT (64-bit) +// integer in PostgreSQL (which is PostgreSQL's largest integer type). +// +// The encoding preserves order, such that sorting on the encoded values will +// produce the same order as sorting on the original unsigned values. This is +// the only guarantee provided by this type. +type Uint64 uint64 + +// Scan implements [sql.Scanner]. +func (p *Uint64) Scan(src any) error { + if src, ok := src.(int64); ok { + *p = Uint64(uint64(src) + uint64(math.MaxInt64) + 1) + return nil + } + + return fmt.Errorf("cannot scan %T into bigint.Unsigned", src) +} + +// Value implements [driver.Valuer]. +func (p Uint64) Value() (driver.Value, error) { + return int64(p - (math.MaxInt64 + 1)), nil +} diff --git a/driver/sql/postgres/pgjournal/internal/xdb/db.go b/driver/sql/postgres/pgjournal/internal/xdb/db.go new file mode 100644 index 00000000..9d25a0c3 --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package xdb + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/driver/sql/postgres/pgjournal/internal/xdb/doc.go b/driver/sql/postgres/pgjournal/internal/xdb/doc.go new file mode 100644 index 00000000..95ee6189 --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/doc.go @@ -0,0 +1,2 @@ +// Package xdb contains the schema and queries for the journal store. +package xdb diff --git a/driver/sql/postgres/pgjournal/internal/xdb/models.go b/driver/sql/postgres/pgjournal/internal/xdb/models.go new file mode 100644 index 00000000..a43aae43 --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/models.go @@ -0,0 +1,5 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package xdb diff --git a/driver/sql/postgres/pgjournal/internal/xdb/queries.sql b/driver/sql/postgres/pgjournal/internal/xdb/queries.sql new file mode 100644 index 00000000..008fb8fc --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/queries.sql @@ -0,0 +1,60 @@ +-- name: UpsertJournal :one +INSERT INTO persistencekit.journal ( + name +) VALUES ( + sqlc.arg('name') +) ON CONFLICT (name) DO UPDATE SET + name = EXCLUDED.name +RETURNING id; + +-- name: UpdateBegin :execrows +UPDATE persistencekit.journal +SET "begin" = sqlc.arg('begin') +WHERE id = sqlc.arg('journal_id') +AND "begin" < sqlc.arg('begin'); + +-- name: IncrementEnd :execrows +UPDATE persistencekit.journal +SET "end" = "end" + 1 +WHERE id = sqlc.arg('journal_id') +AND "end" = sqlc.arg('end'); + +-- name: SelectBounds :one +SELECT + "begin", + "end" +FROM persistencekit.journal +WHERE id = sqlc.arg('journal_id'); + +-- name: SelectRecord :one +SELECT + record +FROM persistencekit.journal_record +WHERE journal_id = sqlc.arg('journal_id') +AND position = sqlc.arg('position'); + +-- name: SelectRecords :many +SELECT + position, + record +FROM persistencekit.journal_record +WHERE journal_id = sqlc.arg('journal_id') +AND position >= sqlc.arg('position') +ORDER BY position +LIMIT 500; + +-- name: InsertRecord :exec +INSERT INTO persistencekit.journal_record ( + journal_id, + position, + record +) VALUES ( + sqlc.arg('journal_id'), + sqlc.arg('position'), + sqlc.arg('record') +); + +-- name: DeleteRecords :exec +DELETE FROM persistencekit.journal_record +WHERE journal_id = sqlc.arg('journal_id') +AND position < sqlc.arg('end'); diff --git a/driver/sql/postgres/pgjournal/internal/xdb/queries.sql.go b/driver/sql/postgres/pgjournal/internal/xdb/queries.sql.go new file mode 100644 index 00000000..ecb51ee9 --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/queries.sql.go @@ -0,0 +1,192 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: queries.sql + +package xdb + +import ( + "context" + + "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" +) + +const deleteRecords = `-- name: DeleteRecords :exec +DELETE FROM persistencekit.journal_record +WHERE journal_id = $1 +AND position < $2 +` + +type DeleteRecordsParams struct { + JournalID int64 + End commonschema.Uint64 +} + +func (q *Queries) DeleteRecords(ctx context.Context, arg DeleteRecordsParams) error { + _, err := q.db.ExecContext(ctx, deleteRecords, arg.JournalID, arg.End) + return err +} + +const incrementEnd = `-- name: IncrementEnd :execrows +UPDATE persistencekit.journal +SET "end" = "end" + 1 +WHERE id = $1 +AND "end" = $2 +` + +type IncrementEndParams struct { + JournalID int64 + End commonschema.Uint64 +} + +func (q *Queries) IncrementEnd(ctx context.Context, arg IncrementEndParams) (int64, error) { + result, err := q.db.ExecContext(ctx, incrementEnd, arg.JournalID, arg.End) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const insertRecord = `-- name: InsertRecord :exec +INSERT INTO persistencekit.journal_record ( + journal_id, + position, + record +) VALUES ( + $1, + $2, + $3 +) +` + +type InsertRecordParams struct { + JournalID int64 + Position commonschema.Uint64 + Record []byte +} + +func (q *Queries) InsertRecord(ctx context.Context, arg InsertRecordParams) error { + _, err := q.db.ExecContext(ctx, insertRecord, arg.JournalID, arg.Position, arg.Record) + return err +} + +const selectBounds = `-- name: SelectBounds :one +SELECT + "begin", + "end" +FROM persistencekit.journal +WHERE id = $1 +` + +type SelectBoundsRow struct { + Begin commonschema.Uint64 + End commonschema.Uint64 +} + +func (q *Queries) SelectBounds(ctx context.Context, journalID int64) (SelectBoundsRow, error) { + row := q.db.QueryRowContext(ctx, selectBounds, journalID) + var i SelectBoundsRow + err := row.Scan(&i.Begin, &i.End) + return i, err +} + +const selectRecord = `-- name: SelectRecord :one +SELECT + record +FROM persistencekit.journal_record +WHERE journal_id = $1 +AND position = $2 +` + +type SelectRecordParams struct { + JournalID int64 + Position commonschema.Uint64 +} + +func (q *Queries) SelectRecord(ctx context.Context, arg SelectRecordParams) ([]byte, error) { + row := q.db.QueryRowContext(ctx, selectRecord, arg.JournalID, arg.Position) + var record []byte + err := row.Scan(&record) + return record, err +} + +const selectRecords = `-- name: SelectRecords :many +SELECT + position, + record +FROM persistencekit.journal_record +WHERE journal_id = $1 +AND position >= $2 +ORDER BY position +LIMIT 500 +` + +type SelectRecordsParams struct { + JournalID int64 + Position commonschema.Uint64 +} + +type SelectRecordsRow struct { + Position commonschema.Uint64 + Record []byte +} + +func (q *Queries) SelectRecords(ctx context.Context, arg SelectRecordsParams) ([]SelectRecordsRow, error) { + rows, err := q.db.QueryContext(ctx, selectRecords, arg.JournalID, arg.Position) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SelectRecordsRow + for rows.Next() { + var i SelectRecordsRow + if err := rows.Scan(&i.Position, &i.Record); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const updateBegin = `-- name: UpdateBegin :execrows +UPDATE persistencekit.journal +SET "begin" = $1 +WHERE id = $2 +AND "begin" < $1 +` + +type UpdateBeginParams struct { + Begin commonschema.Uint64 + JournalID int64 +} + +func (q *Queries) UpdateBegin(ctx context.Context, arg UpdateBeginParams) (int64, error) { + result, err := q.db.ExecContext(ctx, updateBegin, arg.Begin, arg.JournalID) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const upsertJournal = `-- name: UpsertJournal :one +INSERT INTO persistencekit.journal ( + name +) VALUES ( + $1 +) ON CONFLICT (name) DO UPDATE SET + name = EXCLUDED.name +RETURNING id +` + +func (q *Queries) UpsertJournal(ctx context.Context, name string) (int64, error) { + row := q.db.QueryRowContext(ctx, upsertJournal, name) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/driver/sql/postgres/pgjournal/internal/xdb/schema.go b/driver/sql/postgres/pgjournal/internal/xdb/schema.go new file mode 100644 index 00000000..8e61d092 --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/schema.go @@ -0,0 +1,10 @@ +package xdb + +import ( + _ "embed" +) + +// Schema is the PostgreSQL schema elements required by the journal store. +// +//go:embed schema.sql +var Schema string diff --git a/driver/sql/postgres/pgjournal/internal/xdb/schema.sql b/driver/sql/postgres/pgjournal/internal/xdb/schema.sql new file mode 100644 index 00000000..fd729aea --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/schema.sql @@ -0,0 +1,17 @@ +CREATE TABLE IF NOT EXISTS persistencekit.journal ( + id BIGSERIAL NOT NULL, + name TEXT NOT NULL, + "begin" persistencekit.uint64 NOT NULL, + "end" persistencekit.uint64 NOT NULL, + + PRIMARY KEY (id), + UNIQUE (name) +); + +CREATE TABLE IF NOT EXISTS persistencekit.journal_record ( + journal_id BIGINT NOT NULL, + position persistencekit.uint64 NOT NULL, + record BYTEA NOT NULL, + + PRIMARY KEY (journal_id, position) +); diff --git a/driver/sql/postgres/pgjournal/internal/xdb/sqlc.yaml b/driver/sql/postgres/pgjournal/internal/xdb/sqlc.yaml new file mode 100644 index 00000000..4ad5cc8c --- /dev/null +++ b/driver/sql/postgres/pgjournal/internal/xdb/sqlc.yaml @@ -0,0 +1,19 @@ +version: "2" +sql: + - engine: postgresql + queries: queries.sql + schema: + - ../../../internal/commonschema/schema.sql + - schema.sql + gen: + go: + package: xdb + out: . + sql_package: database/sql + sql_driver: github.com/jackc/pgx/v5 + omit_unused_structs: true + overrides: + - db_type: "persistencekit.uint64" + go_type: + import: "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" + type: Uint64 diff --git a/driver/sql/postgres/pgjournal/journal.go b/driver/sql/postgres/pgjournal/journal.go index 3136fea5..be42a129 100644 --- a/driver/sql/postgres/pgjournal/journal.go +++ b/driver/sql/postgres/pgjournal/journal.go @@ -5,16 +5,18 @@ import ( "database/sql" "fmt" - "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/bigint" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/pgjournal/internal/xdb" "github.com/dogmatiq/persistencekit/journal" ) // journ is an implementation of [journal.BinaryJournal] that persists to a PostgreSQL // database. type journ struct { - db *sql.DB - id uint64 - name string + db *sql.DB + queries *xdb.Queries + id int64 + name string } func (j *journ) Name() string { @@ -22,39 +24,23 @@ func (j *journ) Name() string { } func (j *journ) Bounds(ctx context.Context) (bounds journal.Interval, err error) { - row := j.db.QueryRowContext( - ctx, - `SELECT - j.encoded_begin, - j.encoded_end - FROM persistencekit.journal AS j - WHERE j.id = $1`, - j.id, - ) - - if err := row.Scan( - bigint.ConvertUnsigned(&bounds.Begin), - bigint.ConvertUnsigned(&bounds.End), - ); err != nil { + row, err := j.queries.SelectBounds(ctx, j.id) + if err != nil { return journal.Interval{}, fmt.Errorf("cannot query journal bounds: %w", err) } - return bounds, nil + return journal.Interval{ + Begin: journal.Position(row.Begin), + End: journal.Position(row.End), + }, nil } func (j *journ) Get(ctx context.Context, pos journal.Position) ([]byte, error) { - row := j.db.QueryRowContext( - ctx, - `SELECT record - FROM persistencekit.journal_record - WHERE journal_id = $1 - AND encoded_position = $2`, - j.id, - bigint.ConvertUnsigned(&pos), - ) - - var rec []byte - if err := row.Scan(&rec); err != nil { + rec, err := j.queries.SelectRecord(ctx, xdb.SelectRecordParams{ + JournalID: j.id, + Position: commonschema.Uint64(pos), + }) + if err != nil { if err == sql.ErrNoRows { return nil, journal.RecordNotFoundError{ Journal: j.Name(), @@ -72,64 +58,48 @@ func (j *journ) Range( pos journal.Position, fn journal.BinaryRangeFunc, ) error { - // TODO: "paginate" results across multiple queries to avoid loading - // everything into memory at once. - rows, err := j.db.QueryContext( - ctx, - `SELECT encoded_position, record - FROM persistencekit.journal_record - WHERE journal_id = $1 - AND encoded_position >= $2 - ORDER BY encoded_position`, - j.id, - bigint.ConvertUnsigned(&pos), - ) - if err != nil { - return fmt.Errorf("cannot query journal records: %w", err) + params := xdb.SelectRecordsParams{ + JournalID: j.id, + Position: commonschema.Uint64(pos), } - defer rows.Close() - - expectPos := pos - - for rows.Next() { - var ( - pos journal.Position - rec []byte - ) - if err := rows.Scan( - bigint.ConvertUnsigned(&pos), - &rec, - ); err != nil { - return fmt.Errorf("cannot scan journal record: %w", err) - } - if pos != expectPos { - return journal.RecordNotFoundError{ - Journal: j.Name(), - Position: expectPos, - } + for { + rows, err := j.queries.SelectRecords(ctx, params) + if err != nil { + return fmt.Errorf("cannot query journal records: %w", err) } - expectPos++ + if len(rows) == 0 { + if journal.Position(params.Position) == pos { + return journal.RecordNotFoundError{ + Journal: j.Name(), + Position: pos, + } + } - ok, err := fn(ctx, pos, rec) - if !ok || err != nil { - return err + return nil } - } - if err := rows.Err(); err != nil { - return fmt.Errorf("cannot range over journal records: %w", err) - } + for _, row := range rows { + if row.Position != params.Position { + return journal.RecordNotFoundError{ + Journal: j.Name(), + Position: journal.Position(params.Position), + } + } - if expectPos == pos { - return journal.RecordNotFoundError{ - Journal: j.Name(), - Position: pos, + ok, err := fn( + ctx, + journal.Position(params.Position), + row.Record, + ) + if !ok || err != nil { + return err + } + + params.Position++ } } - - return nil } func (j *journ) Append(ctx context.Context, pos journal.Position, rec []byte) error { @@ -139,22 +109,14 @@ func (j *journ) Append(ctx context.Context, pos journal.Position, rec []byte) er } defer tx.Rollback() - res, err := tx.ExecContext( - ctx, - `UPDATE persistencekit.journal - SET encoded_end = encoded_end + 1 - WHERE id = $1 - AND encoded_end = $2`, - j.id, - bigint.ConvertUnsigned(&pos), - ) - if err != nil { - return fmt.Errorf("cannot update journal bounds: %w", err) - } + queries := j.queries.WithTx(tx) - n, err := res.RowsAffected() + n, err := queries.IncrementEnd(ctx, xdb.IncrementEndParams{ + JournalID: j.id, + End: commonschema.Uint64(pos), + }) if err != nil { - return fmt.Errorf("cannot determine affected rows: %w", err) + return fmt.Errorf("cannot update journal bounds: %w", err) } if n == 0 { @@ -164,16 +126,13 @@ func (j *journ) Append(ctx context.Context, pos journal.Position, rec []byte) er } } - res, err = tx.ExecContext( - ctx, - `INSERT INTO persistencekit.journal_record - (journal_id, encoded_position, record) VALUES ($1, $2, $3)`, - j.id, - bigint.ConvertUnsigned(&pos), - rec, - ) - if err != nil { + if err := queries.InsertRecord(ctx, xdb.InsertRecordParams{ + JournalID: j.id, + Position: commonschema.Uint64(pos), + Record: rec, + }); err != nil { return fmt.Errorf("cannot insert journal record: %w", err) + } if err := tx.Commit(); err != nil { @@ -190,35 +149,24 @@ func (j *journ) Truncate(ctx context.Context, pos journal.Position) error { } defer tx.Rollback() - res, err := tx.ExecContext( - ctx, - `UPDATE persistencekit.journal - SET encoded_begin = $2 - WHERE id = $1 - AND encoded_begin < $2`, - j.id, - bigint.ConvertUnsigned(&pos), - ) + queries := j.queries.WithTx(tx) + + count, err := queries.UpdateBegin(ctx, xdb.UpdateBeginParams{ + JournalID: j.id, + Begin: commonschema.Uint64(pos), + }) if err != nil { return fmt.Errorf("cannot update journal bounds: %w", err) } - n, err := res.RowsAffected() - if err != nil { - return fmt.Errorf("cannot determine affected rows: %w", err) - } - if n == 0 { + if count == 0 { return nil } - if _, err := tx.ExecContext( - ctx, - `DELETE FROM persistencekit.journal_record - WHERE journal_id = $1 - AND encoded_position < $2`, - j.id, - bigint.ConvertUnsigned(&pos), - ); err != nil { + if err := queries.DeleteRecords(ctx, xdb.DeleteRecordsParams{ + JournalID: j.id, + End: commonschema.Uint64(pos), + }); err != nil { return fmt.Errorf("cannot truncate journal records: %w", err) } diff --git a/driver/sql/postgres/pgjournal/schema.go b/driver/sql/postgres/pgjournal/schema.go deleted file mode 100644 index 4b2d6f4d..00000000 --- a/driver/sql/postgres/pgjournal/schema.go +++ /dev/null @@ -1,30 +0,0 @@ -package pgjournal - -import ( - "context" - "database/sql" - _ "embed" - - "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/pgerror" -) - -//go:embed schema.sql -var schema string - -// createSchema creates the PostgreSQL schema elements required by [BinaryStore]. -func createSchema( - ctx context.Context, - db *sql.DB, -) error { - return pgerror.Retry( - ctx, - db, - func(tx *sql.Tx) error { - _, err := tx.ExecContext(ctx, schema) - return err - }, - // Even though we use IF NOT EXISTS in the DDL, we still need to handle - // conflicts due to a data race bug in PostgreSQL. - pgerror.CodeUniqueViolation, - ) -} diff --git a/driver/sql/postgres/pgjournal/schema.sql b/driver/sql/postgres/pgjournal/schema.sql deleted file mode 100644 index 0ae2736a..00000000 --- a/driver/sql/postgres/pgjournal/schema.sql +++ /dev/null @@ -1,19 +0,0 @@ -CREATE SCHEMA IF NOT EXISTS persistencekit; - -CREATE TABLE - IF NOT EXISTS persistencekit.journal ( - id BIGSERIAL NOT NULL, - name TEXT NOT NULL, - encoded_begin BIGINT NOT NULL DEFAULT -1::BIGINT << 63, -- see `bigint` package - encoded_end BIGINT NOT NULL DEFAULT -1::BIGINT << 63, -- see `bigint` package - PRIMARY KEY (id), - UNIQUE (name) - ); - -CREATE TABLE - IF NOT EXISTS persistencekit.journal_record ( - journal_id BIGINT NOT NULL, - encoded_position BIGINT NOT NULL, -- see `bigint` package - record BYTEA NOT NULL, - PRIMARY KEY (journal_id, encoded_position) - ); diff --git a/driver/sql/postgres/pgjournal/store.go b/driver/sql/postgres/pgjournal/store.go index aa8b28f1..96f1909a 100644 --- a/driver/sql/postgres/pgjournal/store.go +++ b/driver/sql/postgres/pgjournal/store.go @@ -5,7 +5,9 @@ import ( "database/sql" "fmt" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/pgerror" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/pgjournal/internal/xdb" "github.com/dogmatiq/persistencekit/journal" ) @@ -18,40 +20,20 @@ type BinaryStore struct { // Open returns the journal with the given name. func (s *BinaryStore) Open(ctx context.Context, name string) (journal.BinaryJournal, error) { - id, err := s.getID(ctx, name) - if err != nil { - return nil, err - } - return &journ{s.DB, id, name}, nil -} + queries := xdb.New(s.DB) -func (s *BinaryStore) getID(ctx context.Context, name string) (uint64, error) { for { - row := s.DB.QueryRowContext( - ctx, - `INSERT INTO persistencekit.journal ( - name - ) VALUES ( - $1 - ) ON CONFLICT (name) DO UPDATE SET - name = EXCLUDED.name - RETURNING id`, - name, - ) - - var id uint64 - err := row.Scan(&id) - + id, err := queries.UpsertJournal(ctx, name) if err == nil { - return id, nil + return &journ{s.DB, queries, id, name}, nil } if !pgerror.Is(err, pgerror.CodeUndefinedTable) { - return 0, fmt.Errorf("cannot scan journal ID: %w", err) + return nil, err } - if err := createSchema(ctx, s.DB); err != nil { - return 0, fmt.Errorf("cannot create journal schema: %w", err) + if err := commonschema.Create(ctx, s.DB, xdb.Schema); err != nil { + return nil, fmt.Errorf("cannot create journal schema: %w", err) } } } diff --git a/driver/sql/postgres/pgkv/internal/xdb/db.go b/driver/sql/postgres/pgkv/internal/xdb/db.go new file mode 100644 index 00000000..9d25a0c3 --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package xdb + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/driver/sql/postgres/pgkv/internal/xdb/doc.go b/driver/sql/postgres/pgkv/internal/xdb/doc.go new file mode 100644 index 00000000..c42fb1ed --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/doc.go @@ -0,0 +1,2 @@ +// Package xdb contains the schema and queries for the key/value store. +package xdb diff --git a/driver/sql/postgres/pgkv/internal/xdb/models.go b/driver/sql/postgres/pgkv/internal/xdb/models.go new file mode 100644 index 00000000..a43aae43 --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/models.go @@ -0,0 +1,5 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 + +package xdb diff --git a/driver/sql/postgres/pgkv/internal/xdb/queries.sql b/driver/sql/postgres/pgkv/internal/xdb/queries.sql new file mode 100644 index 00000000..d41c4be5 --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/queries.sql @@ -0,0 +1,16 @@ +-- name: UpsertKeyspace :one +INSERT INTO persistencekit.keyspace ( + name +) VALUES ( + sqlc.arg('name') +) ON CONFLICT (name) DO UPDATE SET + name = EXCLUDED.name +RETURNING id; + +-- name: SelectPair :one +SELECT + value, + revision +FROM persistencekit.keyspace_pair +WHERE keyspace_id = sqlc.arg('id') +AND key = sqlc.arg('key'); diff --git a/driver/sql/postgres/pgkv/internal/xdb/queries.sql.go b/driver/sql/postgres/pgkv/internal/xdb/queries.sql.go new file mode 100644 index 00000000..65df9b83 --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/queries.sql.go @@ -0,0 +1,53 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.30.0 +// source: queries.sql + +package xdb + +import ( + "context" +) + +const selectPair = `-- name: SelectPair :one +SELECT + value, + revision +FROM persistencekit.keyspace_pair +WHERE keyspace_id = $1 +AND key = $2 +` + +type SelectPairParams struct { + ID int64 + Key []byte +} + +type SelectPairRow struct { + Value []byte + Revision int64 +} + +func (q *Queries) SelectPair(ctx context.Context, arg SelectPairParams) (SelectPairRow, error) { + row := q.db.QueryRowContext(ctx, selectPair, arg.ID, arg.Key) + var i SelectPairRow + err := row.Scan(&i.Value, &i.Revision) + return i, err +} + +const upsertKeyspace = `-- name: UpsertKeyspace :one +INSERT INTO persistencekit.keyspace ( + name +) VALUES ( + $1 +) ON CONFLICT (name) DO UPDATE SET + name = EXCLUDED.name +RETURNING id +` + +func (q *Queries) UpsertKeyspace(ctx context.Context, name string) (int64, error) { + row := q.db.QueryRowContext(ctx, upsertKeyspace, name) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/driver/sql/postgres/pgkv/internal/xdb/schema.go b/driver/sql/postgres/pgkv/internal/xdb/schema.go new file mode 100644 index 00000000..194498ba --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/schema.go @@ -0,0 +1,10 @@ +package xdb + +import ( + _ "embed" +) + +// Schema is the PostgreSQL schema elements required by the kv store. +// +//go:embed schema.sql +var Schema string diff --git a/driver/sql/postgres/pgkv/schema.sql b/driver/sql/postgres/pgkv/internal/xdb/schema.sql similarity index 91% rename from driver/sql/postgres/pgkv/schema.sql rename to driver/sql/postgres/pgkv/internal/xdb/schema.sql index bbf6eab7..3b3bcb24 100644 --- a/driver/sql/postgres/pgkv/schema.sql +++ b/driver/sql/postgres/pgkv/internal/xdb/schema.sql @@ -1,5 +1,3 @@ -CREATE SCHEMA IF NOT EXISTS persistencekit; - CREATE TABLE IF NOT EXISTS persistencekit.keyspace ( id BIGSERIAL NOT NULL, diff --git a/driver/sql/postgres/pgkv/internal/xdb/sqlc.yaml b/driver/sql/postgres/pgkv/internal/xdb/sqlc.yaml new file mode 100644 index 00000000..4ad5cc8c --- /dev/null +++ b/driver/sql/postgres/pgkv/internal/xdb/sqlc.yaml @@ -0,0 +1,19 @@ +version: "2" +sql: + - engine: postgresql + queries: queries.sql + schema: + - ../../../internal/commonschema/schema.sql + - schema.sql + gen: + go: + package: xdb + out: . + sql_package: database/sql + sql_driver: github.com/jackc/pgx/v5 + omit_unused_structs: true + overrides: + - db_type: "persistencekit.uint64" + go_type: + import: "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" + type: Uint64 diff --git a/driver/sql/postgres/pgkv/keyspace.go b/driver/sql/postgres/pgkv/keyspace.go index 786c9184..02c91721 100644 --- a/driver/sql/postgres/pgkv/keyspace.go +++ b/driver/sql/postgres/pgkv/keyspace.go @@ -5,13 +5,15 @@ import ( "database/sql" "fmt" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/pgkv/internal/xdb" "github.com/dogmatiq/persistencekit/kv" ) type keyspace struct { - db *sql.DB - id uint64 - name string + db *sql.DB + queries *xdb.Queries + id int64 + name string } func (ks *keyspace) Name() string { @@ -19,24 +21,18 @@ func (ks *keyspace) Name() string { } func (ks *keyspace) Get(ctx context.Context, k []byte) (v []byte, r uint64, err error) { - row := ks.db.QueryRowContext( - ctx, - `SELECT value, revision - FROM persistencekit.keyspace_pair - WHERE keyspace_id = $1 - AND key = $2`, - ks.id, - k, - ) - - if err := row.Scan(&v, &r); err != nil { + row, err := ks.queries.SelectPair(ctx, xdb.SelectPairParams{ + ID: ks.id, + Key: k, + }) + if err != nil { if err == sql.ErrNoRows { return nil, 0, nil } - return nil, 0, fmt.Errorf("cannot scan keyspace pair: %w", err) + return nil, 0, fmt.Errorf("cannot select keyspace pair: %w", err) } - return v, r, nil + return row.Value, uint64(row.Revision), nil } func (ks *keyspace) Has(ctx context.Context, k []byte) (bool, error) { diff --git a/driver/sql/postgres/pgkv/store.go b/driver/sql/postgres/pgkv/store.go index 8612ba05..2d422bf8 100644 --- a/driver/sql/postgres/pgkv/store.go +++ b/driver/sql/postgres/pgkv/store.go @@ -5,7 +5,9 @@ import ( "database/sql" "fmt" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/commonschema" "github.com/dogmatiq/persistencekit/driver/sql/postgres/internal/pgerror" + "github.com/dogmatiq/persistencekit/driver/sql/postgres/pgkv/internal/xdb" "github.com/dogmatiq/persistencekit/kv" ) @@ -17,40 +19,21 @@ type BinaryStore struct { // Open returns the keyspace with the given name. func (s *BinaryStore) Open(ctx context.Context, name string) (kv.BinaryKeyspace, error) { - id, err := s.getID(ctx, name) - if err != nil { - return nil, err - } - return &keyspace{s.DB, id, name}, nil -} + queries := xdb.New(s.DB) -func (s *BinaryStore) getID(ctx context.Context, name string) (uint64, error) { for { - row := s.DB.QueryRowContext( - ctx, - `INSERT INTO persistencekit.keyspace ( - name - ) VALUES ( - $1 - ) ON CONFLICT (name) DO UPDATE SET - name = EXCLUDED.name - RETURNING id`, - name, - ) - - var id uint64 - err := row.Scan(&id) + id, err := queries.UpsertKeyspace(ctx, name) if err == nil { - return id, nil + return &keyspace{s.DB, queries, id, name}, nil } if !pgerror.Is(err, pgerror.CodeUndefinedTable) { - return 0, fmt.Errorf("cannot scan keyspace ID: %w", err) + return nil, err } - if err := createSchema(ctx, s.DB); err != nil { - return 0, fmt.Errorf("cannot create keyspace schema: %w", err) + if err := commonschema.Create(ctx, s.DB, xdb.Schema); err != nil { + return nil, fmt.Errorf("cannot create keyspace schema: %w", err) } } }