|
|
|
@ -21,6 +21,7 @@ import (
|
|
|
|
|
"context"
|
|
|
|
|
"database/sql"
|
|
|
|
|
"time"
|
|
|
|
|
"unsafe"
|
|
|
|
|
|
|
|
|
|
"github.com/superseriousbusiness/gotosocial/internal/db"
|
|
|
|
|
"github.com/superseriousbusiness/gotosocial/internal/gtserror"
|
|
|
|
@ -66,7 +67,7 @@ func WrapDB(db *bun.DB) *DB {
|
|
|
|
|
return &DB{
|
|
|
|
|
raw: rawdb{
|
|
|
|
|
errHook: errProc,
|
|
|
|
|
DB: db.DB,
|
|
|
|
|
db: db.DB,
|
|
|
|
|
},
|
|
|
|
|
bun: db,
|
|
|
|
|
}
|
|
|
|
@ -87,18 +88,7 @@ func (db *DB) PingContext(ctx context.Context) error { return db.bun.PingContext
|
|
|
|
|
// Close is a direct call-through to bun.DB.Close().
|
|
|
|
|
func (db *DB) Close() error { return db.bun.Close() }
|
|
|
|
|
|
|
|
|
|
// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout.
|
|
|
|
|
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx bun.Tx, err error) {
|
|
|
|
|
bundb := db.bun // use *bun.DB interface to return bun.Tx type
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
|
tx, err = bundb.BeginTx(ctx, opts)
|
|
|
|
|
err = db.raw.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
|
|
|
|
// ExecContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
|
|
|
|
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
@ -109,7 +99,7 @@ func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (resul
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
|
|
|
|
// QueryContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
|
|
|
|
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
@ -120,19 +110,40 @@ func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (rows
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout.
|
|
|
|
|
// QueryRowContext wraps bun.DB.ExecContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
|
|
|
|
|
bundb := db.bun // use underlying *bun.DB interface for their query formatting
|
|
|
|
|
_ = retryOnBusy(ctx, func() error {
|
|
|
|
|
row = bundb.QueryRowContext(ctx, query, args...)
|
|
|
|
|
err := db.raw.errHook(row.Err())
|
|
|
|
|
return err
|
|
|
|
|
if err := db.raw.errHook(row.Err()); err != nil {
|
|
|
|
|
updateRowError(row, err) // set new error
|
|
|
|
|
}
|
|
|
|
|
return row.Err()
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// BeginTx wraps bun.DB.BeginTx() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *DB) BeginTx(ctx context.Context, opts *sql.TxOptions) (tx Tx, err error) {
|
|
|
|
|
var buntx bun.Tx // captured bun.Tx
|
|
|
|
|
bundb := db.bun // use *bun.DB interface to return bun.Tx type
|
|
|
|
|
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
|
buntx, err = bundb.BeginTx(ctx, opts)
|
|
|
|
|
err = db.raw.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if err == nil {
|
|
|
|
|
// Wrap bun.Tx in our type.
|
|
|
|
|
tx = wrapTx(db, &buntx)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// RunInTx is functionally the same as bun.DB.RunInTx() but with retry-busy timeouts.
|
|
|
|
|
func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error {
|
|
|
|
|
func (db *DB) RunInTx(ctx context.Context, fn func(Tx) error) error {
|
|
|
|
|
// Attempt to start new transaction.
|
|
|
|
|
tx, err := db.BeginTx(ctx, nil)
|
|
|
|
|
if err != nil {
|
|
|
|
@ -143,24 +154,18 @@ func (db *DB) RunInTx(ctx context.Context, fn func(bun.Tx) error) error {
|
|
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
|
if !done {
|
|
|
|
|
// Rollback (with retry-backoff).
|
|
|
|
|
_ = retryOnBusy(ctx, func() error {
|
|
|
|
|
err := tx.Rollback()
|
|
|
|
|
return db.raw.errHook(err)
|
|
|
|
|
})
|
|
|
|
|
// Rollback tx.
|
|
|
|
|
_ = tx.Rollback()
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Perform supplied transaction
|
|
|
|
|
if err := fn(tx); err != nil {
|
|
|
|
|
return db.raw.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Commit (with retry-backoff).
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
|
err := tx.Commit()
|
|
|
|
|
return db.raw.errHook(err)
|
|
|
|
|
})
|
|
|
|
|
// Commit tx.
|
|
|
|
|
err = tx.Commit()
|
|
|
|
|
done = true
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
@ -275,39 +280,258 @@ type rawdb struct {
|
|
|
|
|
|
|
|
|
|
// embedded raw
|
|
|
|
|
// db interface
|
|
|
|
|
*sql.DB
|
|
|
|
|
db *sql.DB
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout.
|
|
|
|
|
// ExecContext wraps sql.DB.ExecContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *rawdb) ExecContext(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
|
result, err = db.DB.ExecContext(ctx, query, args...)
|
|
|
|
|
result, err = db.db.ExecContext(ctx, query, args...)
|
|
|
|
|
err = db.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout.
|
|
|
|
|
// QueryContext wraps sql.DB.QueryContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *rawdb) QueryContext(ctx context.Context, query string, args ...any) (rows *sql.Rows, err error) {
|
|
|
|
|
err = retryOnBusy(ctx, func() error {
|
|
|
|
|
rows, err = db.DB.QueryContext(ctx, query, args...)
|
|
|
|
|
rows, err = db.db.QueryContext(ctx, query, args...)
|
|
|
|
|
err = db.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout.
|
|
|
|
|
// QueryRowContext wraps sql.DB.QueryRowContext() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (db *rawdb) QueryRowContext(ctx context.Context, query string, args ...any) (row *sql.Row) {
|
|
|
|
|
_ = retryOnBusy(ctx, func() error {
|
|
|
|
|
row = db.DB.QueryRowContext(ctx, query, args...)
|
|
|
|
|
row = db.db.QueryRowContext(ctx, query, args...)
|
|
|
|
|
err := db.errHook(row.Err())
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Tx wraps a bun transaction instance
|
|
|
|
|
// to provide common per-dialect SQL error
|
|
|
|
|
// conversions to common types, and retries
|
|
|
|
|
// on busy commit/rollback (SQLite only).
|
|
|
|
|
type Tx struct {
|
|
|
|
|
// our own wrapped Tx type
|
|
|
|
|
// kept separate to the *bun.Tx
|
|
|
|
|
// type to be passed into query
|
|
|
|
|
// builders as bun.IConn iface
|
|
|
|
|
// (this prevents double firing
|
|
|
|
|
// bun query hooks).
|
|
|
|
|
//
|
|
|
|
|
// also holds per-dialect
|
|
|
|
|
// error hook function.
|
|
|
|
|
raw rawtx
|
|
|
|
|
|
|
|
|
|
// bun Tx interface we use
|
|
|
|
|
// for dialects, and improved
|
|
|
|
|
// struct marshal/unmarshaling.
|
|
|
|
|
bun *bun.Tx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// wrapTx wraps a given bun.Tx in our own wrapping Tx type.
|
|
|
|
|
func wrapTx(db *DB, tx *bun.Tx) Tx {
|
|
|
|
|
return Tx{
|
|
|
|
|
raw: rawtx{
|
|
|
|
|
errHook: db.raw.errHook,
|
|
|
|
|
tx: tx.Tx,
|
|
|
|
|
},
|
|
|
|
|
bun: tx,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ExecContext wraps bun.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx Tx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
|
|
|
|
buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
|
|
|
|
|
res, err := buntx.ExecContext(ctx, query, args...)
|
|
|
|
|
err = tx.raw.errHook(err)
|
|
|
|
|
return res, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryContext wraps bun.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
|
|
|
|
buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
|
|
|
|
|
rows, err := buntx.QueryContext(ctx, query, args...)
|
|
|
|
|
err = tx.raw.errHook(err)
|
|
|
|
|
return rows, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryRowContext wraps bun.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx Tx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
|
|
|
|
buntx := tx.bun // use underlying *bun.Tx interface for their query formatting
|
|
|
|
|
row := buntx.QueryRowContext(ctx, query, args...)
|
|
|
|
|
if err := tx.raw.errHook(row.Err()); err != nil {
|
|
|
|
|
updateRowError(row, err) // set new error
|
|
|
|
|
}
|
|
|
|
|
return row
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Commit wraps bun.Tx.Commit() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (tx Tx) Commit() (err error) {
|
|
|
|
|
buntx := tx.bun // use *bun.Tx interface
|
|
|
|
|
err = retryOnBusy(context.TODO(), func() error {
|
|
|
|
|
err = buntx.Commit()
|
|
|
|
|
err = tx.raw.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Rollback wraps bun.Tx.Rollback() with retry-busy timeout and our own error processing.
|
|
|
|
|
func (tx Tx) Rollback() (err error) {
|
|
|
|
|
buntx := tx.bun // use *bun.Tx interface
|
|
|
|
|
err = retryOnBusy(context.TODO(), func() error {
|
|
|
|
|
err = buntx.Rollback()
|
|
|
|
|
err = tx.raw.errHook(err)
|
|
|
|
|
return err
|
|
|
|
|
})
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Dialect is a direct call-through to bun.DB.Dialect().
|
|
|
|
|
func (tx Tx) Dialect() schema.Dialect {
|
|
|
|
|
return tx.bun.Dialect()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewValues(model interface{}) *bun.ValuesQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewValues(model).Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewMerge() *bun.MergeQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewMerge().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewSelect() *bun.SelectQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewSelect().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewInsert() *bun.InsertQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewInsert().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewUpdate() *bun.UpdateQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewUpdate().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewDelete() *bun.DeleteQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewDelete().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewRaw(query string, args ...interface{}) *bun.RawQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewRaw(query, args...).Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewCreateTable() *bun.CreateTableQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewCreateTable().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewDropTable() *bun.DropTableQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewDropTable().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewCreateIndex() *bun.CreateIndexQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewCreateIndex().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewDropIndex() *bun.DropIndexQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewDropIndex().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewTruncateTable() *bun.TruncateTableQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewTruncateTable().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewAddColumn() *bun.AddColumnQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewAddColumn().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (tx Tx) NewDropColumn() *bun.DropColumnQuery {
|
|
|
|
|
// note: passing in rawtx as conn iface so no double query-hook
|
|
|
|
|
// firing when passed through the bun.Tx.Query___() functions.
|
|
|
|
|
return tx.bun.NewDropColumn().Conn(&tx.raw)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type rawtx struct {
|
|
|
|
|
// dialect specific error
|
|
|
|
|
// processing function hook.
|
|
|
|
|
errHook func(error) error
|
|
|
|
|
|
|
|
|
|
// embedded raw
|
|
|
|
|
// tx interface
|
|
|
|
|
tx *sql.Tx
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ExecContext wraps sql.Tx.ExecContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx *rawtx) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
|
|
|
|
res, err := tx.tx.ExecContext(ctx, query, args...)
|
|
|
|
|
err = tx.errHook(err)
|
|
|
|
|
return res, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryContext wraps sql.Tx.QueryContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx *rawtx) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
|
|
|
|
rows, err := tx.tx.QueryContext(ctx, query, args...)
|
|
|
|
|
err = tx.errHook(err)
|
|
|
|
|
return rows, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// QueryRowContext wraps sql.Tx.QueryRowContext() with our own error processing, WITHOUT retry-busy timeouts (as will be mid-transaction).
|
|
|
|
|
func (tx *rawtx) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
|
|
|
|
row := tx.tx.QueryRowContext(ctx, query, args...)
|
|
|
|
|
if err := tx.errHook(row.Err()); err != nil {
|
|
|
|
|
updateRowError(row, err) // set new error
|
|
|
|
|
}
|
|
|
|
|
return row
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// updateRowError updates an sql.Row's internal error field using the unsafe package.
|
|
|
|
|
func updateRowError(sqlrow *sql.Row, err error) {
|
|
|
|
|
type row struct {
|
|
|
|
|
err error
|
|
|
|
|
rows *sql.Rows
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// compile-time check to ensure sql.Row not changed.
|
|
|
|
|
if unsafe.Sizeof(row{}) != unsafe.Sizeof(sql.Row{}) {
|
|
|
|
|
panic("sql.Row has changed definition")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// this code is awful and i must be shamed for this.
|
|
|
|
|
(*row)(unsafe.Pointer(sqlrow)).err = err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// retryOnBusy will retry given function on returned 'errBusy'.
|
|
|
|
|
func retryOnBusy(ctx context.Context, fn func() error) error {
|
|
|
|
|
var backoff time.Duration
|
|
|
|
|