diff options
Diffstat (limited to 'bskylink/src/db')
-rw-r--r-- | bskylink/src/db/index.ts | 174 | ||||
-rw-r--r-- | bskylink/src/db/migrations/001-init.ts | 15 | ||||
-rw-r--r-- | bskylink/src/db/migrations/index.ts | 5 | ||||
-rw-r--r-- | bskylink/src/db/migrations/provider.ts | 8 | ||||
-rw-r--r-- | bskylink/src/db/schema.ts | 17 |
5 files changed, 219 insertions, 0 deletions
diff --git a/bskylink/src/db/index.ts b/bskylink/src/db/index.ts new file mode 100644 index 000000000..5f201cc07 --- /dev/null +++ b/bskylink/src/db/index.ts @@ -0,0 +1,174 @@ +import assert from 'assert' +import { + Kysely, + KyselyPlugin, + Migrator, + PluginTransformQueryArgs, + PluginTransformResultArgs, + PostgresDialect, + QueryResult, + RootOperationNode, + UnknownRow, +} from 'kysely' +import {default as Pg} from 'pg' + +import {dbLogger as log} from '../logger.js' +import {default as migrations} from './migrations/index.js' +import {DbMigrationProvider} from './migrations/provider.js' +import {DbSchema} from './schema.js' + +export class Database { + migrator: Migrator + destroyed = false + + constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) { + this.migrator = new Migrator({ + db, + migrationTableSchema: cfg.schema, + provider: new DbMigrationProvider(migrations), + }) + } + + static postgres(opts: PgOptions): Database { + const {schema, url, txLockNonce} = opts + const pool = + opts.pool ?? + new Pg.Pool({ + connectionString: url, + max: opts.poolSize, + maxUses: opts.poolMaxUses, + idleTimeoutMillis: opts.poolIdleTimeoutMs, + }) + + // Select count(*) and other pg bigints as js integer + Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10)) + + // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema) + if (schema && !/^[a-z_]+$/i.test(schema)) { + throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`) + } + + pool.on('error', onPoolError) + + const db = new Kysely<DbSchema>({ + dialect: new PostgresDialect({pool}), + }) + + return new Database(db, { + pool, + schema, + url, + txLockNonce, + }) + } + + async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> { + const leakyTxPlugin = new LeakyTxPlugin() + return this.db + .withPlugin(leakyTxPlugin) + .transaction() + .execute(txn => { + const dbTxn = new Database(txn, this.cfg) + return fn(dbTxn) + .catch(async err => { + leakyTxPlugin.endTx() + // ensure that all in-flight queries are flushed & the connection is open + await dbTxn.db.getExecutor().provideConnection(async () => {}) + throw err + }) + .finally(() => leakyTxPlugin.endTx()) + }) + } + + get schema(): string | undefined { + return this.cfg.schema + } + + get isTransaction() { + return this.db.isTransaction + } + + assertTransaction() { + assert(this.isTransaction, 'Transaction required') + } + + assertNotTransaction() { + assert(!this.isTransaction, 'Cannot be in a transaction') + } + + async close(): Promise<void> { + if (this.destroyed) return + await this.db.destroy() + this.destroyed = true + } + + async migrateToOrThrow(migration: string) { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateTo(migration) + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } + + async migrateToLatestOrThrow() { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateToLatest() + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } +} + +export default Database + +export type PgConfig = { + pool: Pg.Pool + url: string + schema?: string + txLockNonce?: string +} + +type PgOptions = { + url: string + pool?: Pg.Pool + schema?: string + poolSize?: number + poolMaxUses?: number + poolIdleTimeoutMs?: number + txLockNonce?: string +} + +class LeakyTxPlugin implements KyselyPlugin { + private txOver = false + + endTx() { + this.txOver = true + } + + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + if (this.txOver) { + throw new Error('tx already failed') + } + return args.node + } + + async transformResult( + args: PluginTransformResultArgs, + ): Promise<QueryResult<UnknownRow>> { + return args.result + } +} + +const onPoolError = (err: Error) => log.error({err}, 'db pool error') diff --git a/bskylink/src/db/migrations/001-init.ts b/bskylink/src/db/migrations/001-init.ts new file mode 100644 index 000000000..fe3bcf186 --- /dev/null +++ b/bskylink/src/db/migrations/001-init.ts @@ -0,0 +1,15 @@ +import {Kysely} from 'kysely' + +export async function up(db: Kysely<unknown>): Promise<void> { + await db.schema + .createTable('link') + .addColumn('id', 'varchar', col => col.primaryKey()) + .addColumn('type', 'smallint', col => col.notNull()) // integer enum: 1->starterpack + .addColumn('path', 'varchar', col => col.notNull()) + .addUniqueConstraint('link_path_unique', ['path']) + .execute() +} + +export async function down(db: Kysely<unknown>): Promise<void> { + await db.schema.dropTable('link').execute() +} diff --git a/bskylink/src/db/migrations/index.ts b/bskylink/src/db/migrations/index.ts new file mode 100644 index 000000000..05e4de937 --- /dev/null +++ b/bskylink/src/db/migrations/index.ts @@ -0,0 +1,5 @@ +import * as init from './001-init.js' + +export default { + '001': init, +} diff --git a/bskylink/src/db/migrations/provider.ts b/bskylink/src/db/migrations/provider.ts new file mode 100644 index 000000000..bef93a48f --- /dev/null +++ b/bskylink/src/db/migrations/provider.ts @@ -0,0 +1,8 @@ +import {Migration, MigrationProvider} from 'kysely' + +export class DbMigrationProvider implements MigrationProvider { + constructor(private migrations: Record<string, Migration>) {} + async getMigrations(): Promise<Record<string, Migration>> { + return this.migrations + } +} diff --git a/bskylink/src/db/schema.ts b/bskylink/src/db/schema.ts new file mode 100644 index 000000000..8d97f5800 --- /dev/null +++ b/bskylink/src/db/schema.ts @@ -0,0 +1,17 @@ +import {Selectable} from 'kysely' + +export type DbSchema = { + link: Link +} + +export interface Link { + id: string + type: LinkType + path: string +} + +export enum LinkType { + StarterPack = 1, +} + +export type LinkEntry = Selectable<Link> |