about summary refs log tree commit diff
path: root/bskylink/src/db
diff options
context:
space:
mode:
Diffstat (limited to 'bskylink/src/db')
-rw-r--r--bskylink/src/db/index.ts174
-rw-r--r--bskylink/src/db/migrations/001-init.ts15
-rw-r--r--bskylink/src/db/migrations/index.ts5
-rw-r--r--bskylink/src/db/migrations/provider.ts8
-rw-r--r--bskylink/src/db/schema.ts17
5 files changed, 219 insertions, 0 deletions
diff --git a/bskylink/src/db/index.ts b/bskylink/src/db/index.ts
new file mode 100644
index 000000000..5f201cc07
--- /dev/null
+++ b/bskylink/src/db/index.ts
@@ -0,0 +1,174 @@
+import assert from 'assert'
+import {
+  Kysely,
+  KyselyPlugin,
+  Migrator,
+  PluginTransformQueryArgs,
+  PluginTransformResultArgs,
+  PostgresDialect,
+  QueryResult,
+  RootOperationNode,
+  UnknownRow,
+} from 'kysely'
+import {default as Pg} from 'pg'
+
+import {dbLogger as log} from '../logger.js'
+import {default as migrations} from './migrations/index.js'
+import {DbMigrationProvider} from './migrations/provider.js'
+import {DbSchema} from './schema.js'
+
+export class Database {
+  migrator: Migrator
+  destroyed = false
+
+  constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) {
+    this.migrator = new Migrator({
+      db,
+      migrationTableSchema: cfg.schema,
+      provider: new DbMigrationProvider(migrations),
+    })
+  }
+
+  static postgres(opts: PgOptions): Database {
+    const {schema, url, txLockNonce} = opts
+    const pool =
+      opts.pool ??
+      new Pg.Pool({
+        connectionString: url,
+        max: opts.poolSize,
+        maxUses: opts.poolMaxUses,
+        idleTimeoutMillis: opts.poolIdleTimeoutMs,
+      })
+
+    // Select count(*) and other pg bigints as js integer
+    Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10))
+
+    // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
+    if (schema && !/^[a-z_]+$/i.test(schema)) {
+      throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`)
+    }
+
+    pool.on('error', onPoolError)
+
+    const db = new Kysely<DbSchema>({
+      dialect: new PostgresDialect({pool}),
+    })
+
+    return new Database(db, {
+      pool,
+      schema,
+      url,
+      txLockNonce,
+    })
+  }
+
+  async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
+    const leakyTxPlugin = new LeakyTxPlugin()
+    return this.db
+      .withPlugin(leakyTxPlugin)
+      .transaction()
+      .execute(txn => {
+        const dbTxn = new Database(txn, this.cfg)
+        return fn(dbTxn)
+          .catch(async err => {
+            leakyTxPlugin.endTx()
+            // ensure that all in-flight queries are flushed & the connection is open
+            await dbTxn.db.getExecutor().provideConnection(async () => {})
+            throw err
+          })
+          .finally(() => leakyTxPlugin.endTx())
+      })
+  }
+
+  get schema(): string | undefined {
+    return this.cfg.schema
+  }
+
+  get isTransaction() {
+    return this.db.isTransaction
+  }
+
+  assertTransaction() {
+    assert(this.isTransaction, 'Transaction required')
+  }
+
+  assertNotTransaction() {
+    assert(!this.isTransaction, 'Cannot be in a transaction')
+  }
+
+  async close(): Promise<void> {
+    if (this.destroyed) return
+    await this.db.destroy()
+    this.destroyed = true
+  }
+
+  async migrateToOrThrow(migration: string) {
+    if (this.schema) {
+      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
+    }
+    const {error, results} = await this.migrator.migrateTo(migration)
+    if (error) {
+      throw error
+    }
+    if (!results) {
+      throw new Error('An unknown failure occurred while migrating')
+    }
+    return results
+  }
+
+  async migrateToLatestOrThrow() {
+    if (this.schema) {
+      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
+    }
+    const {error, results} = await this.migrator.migrateToLatest()
+    if (error) {
+      throw error
+    }
+    if (!results) {
+      throw new Error('An unknown failure occurred while migrating')
+    }
+    return results
+  }
+}
+
+export default Database
+
+export type PgConfig = {
+  pool: Pg.Pool
+  url: string
+  schema?: string
+  txLockNonce?: string
+}
+
+type PgOptions = {
+  url: string
+  pool?: Pg.Pool
+  schema?: string
+  poolSize?: number
+  poolMaxUses?: number
+  poolIdleTimeoutMs?: number
+  txLockNonce?: string
+}
+
+class LeakyTxPlugin implements KyselyPlugin {
+  private txOver = false
+
+  endTx() {
+    this.txOver = true
+  }
+
+  transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
+    if (this.txOver) {
+      throw new Error('tx already failed')
+    }
+    return args.node
+  }
+
+  async transformResult(
+    args: PluginTransformResultArgs,
+  ): Promise<QueryResult<UnknownRow>> {
+    return args.result
+  }
+}
+
+const onPoolError = (err: Error) => log.error({err}, 'db pool error')
diff --git a/bskylink/src/db/migrations/001-init.ts b/bskylink/src/db/migrations/001-init.ts
new file mode 100644
index 000000000..fe3bcf186
--- /dev/null
+++ b/bskylink/src/db/migrations/001-init.ts
@@ -0,0 +1,15 @@
+import {Kysely} from 'kysely'
+
+export async function up(db: Kysely<unknown>): Promise<void> {
+  await db.schema
+    .createTable('link')
+    .addColumn('id', 'varchar', col => col.primaryKey())
+    .addColumn('type', 'smallint', col => col.notNull()) // integer enum: 1->starterpack
+    .addColumn('path', 'varchar', col => col.notNull())
+    .addUniqueConstraint('link_path_unique', ['path'])
+    .execute()
+}
+
+export async function down(db: Kysely<unknown>): Promise<void> {
+  await db.schema.dropTable('link').execute()
+}
diff --git a/bskylink/src/db/migrations/index.ts b/bskylink/src/db/migrations/index.ts
new file mode 100644
index 000000000..05e4de937
--- /dev/null
+++ b/bskylink/src/db/migrations/index.ts
@@ -0,0 +1,5 @@
+import * as init from './001-init.js'
+
+export default {
+  '001': init,
+}
diff --git a/bskylink/src/db/migrations/provider.ts b/bskylink/src/db/migrations/provider.ts
new file mode 100644
index 000000000..bef93a48f
--- /dev/null
+++ b/bskylink/src/db/migrations/provider.ts
@@ -0,0 +1,8 @@
+import {Migration, MigrationProvider} from 'kysely'
+
+export class DbMigrationProvider implements MigrationProvider {
+  constructor(private migrations: Record<string, Migration>) {}
+  async getMigrations(): Promise<Record<string, Migration>> {
+    return this.migrations
+  }
+}
diff --git a/bskylink/src/db/schema.ts b/bskylink/src/db/schema.ts
new file mode 100644
index 000000000..8d97f5800
--- /dev/null
+++ b/bskylink/src/db/schema.ts
@@ -0,0 +1,17 @@
+import {Selectable} from 'kysely'
+
+export type DbSchema = {
+  link: Link
+}
+
+export interface Link {
+  id: string
+  type: LinkType
+  path: string
+}
+
+export enum LinkType {
+  StarterPack = 1,
+}
+
+export type LinkEntry = Selectable<Link>