diff options
author | devin ivy <devinivy@gmail.com> | 2024-06-21 12:41:06 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-06-21 12:41:06 -0400 |
commit | 55812b03940852f1f91cd0a46b5c093601c854a9 (patch) | |
tree | 54956cb522786b1260b0a556f6f7c3ea1b0aed11 /bskylink/src/db/index.ts | |
parent | ba21fddd7897513fef663b826094878ad0ff1556 (diff) | |
download | voidsky-55812b03940852f1f91cd0a46b5c093601c854a9.tar.zst |
Bsky short link service (#4542)
* bskylink: scaffold service w/ initial config and schema * bskylink: implement link creation and redirects * bskylink: tidy * bskylink: tests * bskylink: tidy, add error handler * bskylink: add dockerfile * bskylink: add build * bskylink: fix some express plumbing * bskyweb: proxy fallthrough routes to link service redirects * bskyweb: build w/ link proxy * Add AASA to bskylink (#4588) --------- Co-authored-by: Hailey <me@haileyok.com>
Diffstat (limited to 'bskylink/src/db/index.ts')
-rw-r--r-- | bskylink/src/db/index.ts | 174 |
1 files changed, 174 insertions, 0 deletions
diff --git a/bskylink/src/db/index.ts b/bskylink/src/db/index.ts new file mode 100644 index 000000000..5f201cc07 --- /dev/null +++ b/bskylink/src/db/index.ts @@ -0,0 +1,174 @@ +import assert from 'assert' +import { + Kysely, + KyselyPlugin, + Migrator, + PluginTransformQueryArgs, + PluginTransformResultArgs, + PostgresDialect, + QueryResult, + RootOperationNode, + UnknownRow, +} from 'kysely' +import {default as Pg} from 'pg' + +import {dbLogger as log} from '../logger.js' +import {default as migrations} from './migrations/index.js' +import {DbMigrationProvider} from './migrations/provider.js' +import {DbSchema} from './schema.js' + +export class Database { + migrator: Migrator + destroyed = false + + constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) { + this.migrator = new Migrator({ + db, + migrationTableSchema: cfg.schema, + provider: new DbMigrationProvider(migrations), + }) + } + + static postgres(opts: PgOptions): Database { + const {schema, url, txLockNonce} = opts + const pool = + opts.pool ?? + new Pg.Pool({ + connectionString: url, + max: opts.poolSize, + maxUses: opts.poolMaxUses, + idleTimeoutMillis: opts.poolIdleTimeoutMs, + }) + + // Select count(*) and other pg bigints as js integer + Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10)) + + // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema) + if (schema && !/^[a-z_]+$/i.test(schema)) { + throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`) + } + + pool.on('error', onPoolError) + + const db = new Kysely<DbSchema>({ + dialect: new PostgresDialect({pool}), + }) + + return new Database(db, { + pool, + schema, + url, + txLockNonce, + }) + } + + async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> { + const leakyTxPlugin = new LeakyTxPlugin() + return this.db + .withPlugin(leakyTxPlugin) + .transaction() + .execute(txn => { + const dbTxn = new Database(txn, this.cfg) + return fn(dbTxn) + .catch(async err => { + leakyTxPlugin.endTx() + // ensure that all in-flight queries are flushed & the connection is open + await dbTxn.db.getExecutor().provideConnection(async () => {}) + throw err + }) + .finally(() => leakyTxPlugin.endTx()) + }) + } + + get schema(): string | undefined { + return this.cfg.schema + } + + get isTransaction() { + return this.db.isTransaction + } + + assertTransaction() { + assert(this.isTransaction, 'Transaction required') + } + + assertNotTransaction() { + assert(!this.isTransaction, 'Cannot be in a transaction') + } + + async close(): Promise<void> { + if (this.destroyed) return + await this.db.destroy() + this.destroyed = true + } + + async migrateToOrThrow(migration: string) { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateTo(migration) + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } + + async migrateToLatestOrThrow() { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateToLatest() + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } +} + +export default Database + +export type PgConfig = { + pool: Pg.Pool + url: string + schema?: string + txLockNonce?: string +} + +type PgOptions = { + url: string + pool?: Pg.Pool + schema?: string + poolSize?: number + poolMaxUses?: number + poolIdleTimeoutMs?: number + txLockNonce?: string +} + +class LeakyTxPlugin implements KyselyPlugin { + private txOver = false + + endTx() { + this.txOver = true + } + + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + if (this.txOver) { + throw new Error('tx already failed') + } + return args.node + } + + async transformResult( + args: PluginTransformResultArgs, + ): Promise<QueryResult<UnknownRow>> { + return args.result + } +} + +const onPoolError = (err: Error) => log.error({err}, 'db pool error') |