about summary refs log tree commit diff
path: root/bskylink/src/db/index.ts
diff options
context:
space:
mode:
authordevin ivy <devinivy@gmail.com>2024-06-21 12:41:06 -0400
committerGitHub <noreply@github.com>2024-06-21 12:41:06 -0400
commit55812b03940852f1f91cd0a46b5c093601c854a9 (patch)
tree54956cb522786b1260b0a556f6f7c3ea1b0aed11 /bskylink/src/db/index.ts
parentba21fddd7897513fef663b826094878ad0ff1556 (diff)
downloadvoidsky-55812b03940852f1f91cd0a46b5c093601c854a9.tar.zst
Bsky short link service (#4542)
* bskylink: scaffold service w/ initial config and schema

* bskylink: implement link creation and redirects

* bskylink: tidy

* bskylink: tests

* bskylink: tidy, add error handler

* bskylink: add dockerfile

* bskylink: add build

* bskylink: fix some express plumbing

* bskyweb: proxy fallthrough routes to link service redirects

* bskyweb: build w/ link proxy

* Add AASA to bskylink (#4588)

---------

Co-authored-by: Hailey <me@haileyok.com>
Diffstat (limited to 'bskylink/src/db/index.ts')
-rw-r--r--bskylink/src/db/index.ts174
1 files changed, 174 insertions, 0 deletions
diff --git a/bskylink/src/db/index.ts b/bskylink/src/db/index.ts
new file mode 100644
index 000000000..5f201cc07
--- /dev/null
+++ b/bskylink/src/db/index.ts
@@ -0,0 +1,174 @@
+import assert from 'assert'
+import {
+  Kysely,
+  KyselyPlugin,
+  Migrator,
+  PluginTransformQueryArgs,
+  PluginTransformResultArgs,
+  PostgresDialect,
+  QueryResult,
+  RootOperationNode,
+  UnknownRow,
+} from 'kysely'
+import {default as Pg} from 'pg'
+
+import {dbLogger as log} from '../logger.js'
+import {default as migrations} from './migrations/index.js'
+import {DbMigrationProvider} from './migrations/provider.js'
+import {DbSchema} from './schema.js'
+
+export class Database {
+  migrator: Migrator
+  destroyed = false
+
+  constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) {
+    this.migrator = new Migrator({
+      db,
+      migrationTableSchema: cfg.schema,
+      provider: new DbMigrationProvider(migrations),
+    })
+  }
+
+  static postgres(opts: PgOptions): Database {
+    const {schema, url, txLockNonce} = opts
+    const pool =
+      opts.pool ??
+      new Pg.Pool({
+        connectionString: url,
+        max: opts.poolSize,
+        maxUses: opts.poolMaxUses,
+        idleTimeoutMillis: opts.poolIdleTimeoutMs,
+      })
+
+    // Select count(*) and other pg bigints as js integer
+    Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10))
+
+    // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
+    if (schema && !/^[a-z_]+$/i.test(schema)) {
+      throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`)
+    }
+
+    pool.on('error', onPoolError)
+
+    const db = new Kysely<DbSchema>({
+      dialect: new PostgresDialect({pool}),
+    })
+
+    return new Database(db, {
+      pool,
+      schema,
+      url,
+      txLockNonce,
+    })
+  }
+
+  async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
+    const leakyTxPlugin = new LeakyTxPlugin()
+    return this.db
+      .withPlugin(leakyTxPlugin)
+      .transaction()
+      .execute(txn => {
+        const dbTxn = new Database(txn, this.cfg)
+        return fn(dbTxn)
+          .catch(async err => {
+            leakyTxPlugin.endTx()
+            // ensure that all in-flight queries are flushed & the connection is open
+            await dbTxn.db.getExecutor().provideConnection(async () => {})
+            throw err
+          })
+          .finally(() => leakyTxPlugin.endTx())
+      })
+  }
+
+  get schema(): string | undefined {
+    return this.cfg.schema
+  }
+
+  get isTransaction() {
+    return this.db.isTransaction
+  }
+
+  assertTransaction() {
+    assert(this.isTransaction, 'Transaction required')
+  }
+
+  assertNotTransaction() {
+    assert(!this.isTransaction, 'Cannot be in a transaction')
+  }
+
+  async close(): Promise<void> {
+    if (this.destroyed) return
+    await this.db.destroy()
+    this.destroyed = true
+  }
+
+  async migrateToOrThrow(migration: string) {
+    if (this.schema) {
+      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
+    }
+    const {error, results} = await this.migrator.migrateTo(migration)
+    if (error) {
+      throw error
+    }
+    if (!results) {
+      throw new Error('An unknown failure occurred while migrating')
+    }
+    return results
+  }
+
+  async migrateToLatestOrThrow() {
+    if (this.schema) {
+      await this.db.schema.createSchema(this.schema).ifNotExists().execute()
+    }
+    const {error, results} = await this.migrator.migrateToLatest()
+    if (error) {
+      throw error
+    }
+    if (!results) {
+      throw new Error('An unknown failure occurred while migrating')
+    }
+    return results
+  }
+}
+
+export default Database
+
+export type PgConfig = {
+  pool: Pg.Pool
+  url: string
+  schema?: string
+  txLockNonce?: string
+}
+
+type PgOptions = {
+  url: string
+  pool?: Pg.Pool
+  schema?: string
+  poolSize?: number
+  poolMaxUses?: number
+  poolIdleTimeoutMs?: number
+  txLockNonce?: string
+}
+
+class LeakyTxPlugin implements KyselyPlugin {
+  private txOver = false
+
+  endTx() {
+    this.txOver = true
+  }
+
+  transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
+    if (this.txOver) {
+      throw new Error('tx already failed')
+    }
+    return args.node
+  }
+
+  async transformResult(
+    args: PluginTransformResultArgs,
+  ): Promise<QueryResult<UnknownRow>> {
+    return args.result
+  }
+}
+
+const onPoolError = (err: Error) => log.error({err}, 'db pool error')