diff options
Diffstat (limited to 'bskylink/src')
-rw-r--r-- | bskylink/src/bin.ts | 24 | ||||
-rw-r--r-- | bskylink/src/config.ts | 82 | ||||
-rw-r--r-- | bskylink/src/context.ts | 33 | ||||
-rw-r--r-- | bskylink/src/db/index.ts | 174 | ||||
-rw-r--r-- | bskylink/src/db/migrations/001-init.ts | 15 | ||||
-rw-r--r-- | bskylink/src/db/migrations/index.ts | 5 | ||||
-rw-r--r-- | bskylink/src/db/migrations/provider.ts | 8 | ||||
-rw-r--r-- | bskylink/src/db/schema.ts | 17 | ||||
-rw-r--r-- | bskylink/src/index.ts | 45 | ||||
-rw-r--r-- | bskylink/src/logger.ts | 4 | ||||
-rw-r--r-- | bskylink/src/routes/create.ts | 111 | ||||
-rw-r--r-- | bskylink/src/routes/health.ts | 20 | ||||
-rw-r--r-- | bskylink/src/routes/index.ts | 17 | ||||
-rw-r--r-- | bskylink/src/routes/redirect.ts | 40 | ||||
-rw-r--r-- | bskylink/src/routes/siteAssociation.ts | 13 | ||||
-rw-r--r-- | bskylink/src/routes/util.ts | 23 | ||||
-rw-r--r-- | bskylink/src/util.ts | 8 |
17 files changed, 639 insertions, 0 deletions
diff --git a/bskylink/src/bin.ts b/bskylink/src/bin.ts new file mode 100644 index 000000000..17f068841 --- /dev/null +++ b/bskylink/src/bin.ts @@ -0,0 +1,24 @@ +import {Database, envToCfg, httpLogger, LinkService, readEnv} from './index.js' + +async function main() { + const env = readEnv() + const cfg = envToCfg(env) + if (cfg.db.migrationUrl) { + const migrateDb = Database.postgres({ + url: cfg.db.migrationUrl, + schema: cfg.db.schema, + }) + await migrateDb.migrateToLatestOrThrow() + await migrateDb.close() + } + const link = await LinkService.create(cfg) + await link.start() + httpLogger.info('link service is running') + process.on('SIGTERM', async () => { + httpLogger.info('link service is stopping') + await link.destroy() + httpLogger.info('link service is stopped') + }) +} + +main() diff --git a/bskylink/src/config.ts b/bskylink/src/config.ts new file mode 100644 index 000000000..ce409cccc --- /dev/null +++ b/bskylink/src/config.ts @@ -0,0 +1,82 @@ +import {envInt, envList, envStr} from '@atproto/common' + +export type Config = { + service: ServiceConfig + db: DbConfig +} + +export type ServiceConfig = { + port: number + version?: string + hostnames: string[] + appHostname: string +} + +export type DbConfig = { + url: string + migrationUrl?: string + pool: DbPoolConfig + schema?: string +} + +export type DbPoolConfig = { + size: number + maxUses: number + idleTimeoutMs: number +} + +export type Environment = { + port?: number + version?: string + hostnames: string[] + appHostname?: string + dbPostgresUrl?: string + dbPostgresMigrationUrl?: string + dbPostgresSchema?: string + dbPostgresPoolSize?: number + dbPostgresPoolMaxUses?: number + dbPostgresPoolIdleTimeoutMs?: number +} + +export const readEnv = (): Environment => { + return { + port: envInt('LINK_PORT'), + version: envStr('LINK_VERSION'), + hostnames: envList('LINK_HOSTNAMES'), + appHostname: envStr('LINK_APP_HOSTNAME'), + dbPostgresUrl: envStr('LINK_DB_POSTGRES_URL'), + dbPostgresMigrationUrl: envStr('LINK_DB_POSTGRES_MIGRATION_URL'), + dbPostgresSchema: envStr('LINK_DB_POSTGRES_SCHEMA'), + dbPostgresPoolSize: envInt('LINK_DB_POSTGRES_POOL_SIZE'), + dbPostgresPoolMaxUses: envInt('LINK_DB_POSTGRES_POOL_MAX_USES'), + dbPostgresPoolIdleTimeoutMs: envInt( + 'LINK_DB_POSTGRES_POOL_IDLE_TIMEOUT_MS', + ), + } +} + +export const envToCfg = (env: Environment): Config => { + const serviceCfg: ServiceConfig = { + port: env.port ?? 3000, + version: env.version, + hostnames: env.hostnames, + appHostname: env.appHostname || 'bsky.app', + } + if (!env.dbPostgresUrl) { + throw new Error('Must configure postgres url (LINK_DB_POSTGRES_URL)') + } + const dbCfg: DbConfig = { + url: env.dbPostgresUrl, + migrationUrl: env.dbPostgresMigrationUrl, + schema: env.dbPostgresSchema, + pool: { + idleTimeoutMs: env.dbPostgresPoolIdleTimeoutMs ?? 10000, + maxUses: env.dbPostgresPoolMaxUses ?? Infinity, + size: env.dbPostgresPoolSize ?? 10, + }, + } + return { + service: serviceCfg, + db: dbCfg, + } +} diff --git a/bskylink/src/context.ts b/bskylink/src/context.ts new file mode 100644 index 000000000..7e6f2f34e --- /dev/null +++ b/bskylink/src/context.ts @@ -0,0 +1,33 @@ +import {Config} from './config.js' +import Database from './db/index.js' + +export type AppContextOptions = { + cfg: Config + db: Database +} + +export class AppContext { + cfg: Config + db: Database + abortController = new AbortController() + + constructor(private opts: AppContextOptions) { + this.cfg = this.opts.cfg + this.db = this.opts.db + } + + static async fromConfig(cfg: Config, overrides?: Partial<AppContextOptions>) { + const db = Database.postgres({ + url: cfg.db.url, + schema: cfg.db.schema, + poolSize: cfg.db.pool.size, + poolMaxUses: cfg.db.pool.maxUses, + poolIdleTimeoutMs: cfg.db.pool.idleTimeoutMs, + }) + return new AppContext({ + cfg, + db, + ...overrides, + }) + } +} diff --git a/bskylink/src/db/index.ts b/bskylink/src/db/index.ts new file mode 100644 index 000000000..5f201cc07 --- /dev/null +++ b/bskylink/src/db/index.ts @@ -0,0 +1,174 @@ +import assert from 'assert' +import { + Kysely, + KyselyPlugin, + Migrator, + PluginTransformQueryArgs, + PluginTransformResultArgs, + PostgresDialect, + QueryResult, + RootOperationNode, + UnknownRow, +} from 'kysely' +import {default as Pg} from 'pg' + +import {dbLogger as log} from '../logger.js' +import {default as migrations} from './migrations/index.js' +import {DbMigrationProvider} from './migrations/provider.js' +import {DbSchema} from './schema.js' + +export class Database { + migrator: Migrator + destroyed = false + + constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) { + this.migrator = new Migrator({ + db, + migrationTableSchema: cfg.schema, + provider: new DbMigrationProvider(migrations), + }) + } + + static postgres(opts: PgOptions): Database { + const {schema, url, txLockNonce} = opts + const pool = + opts.pool ?? + new Pg.Pool({ + connectionString: url, + max: opts.poolSize, + maxUses: opts.poolMaxUses, + idleTimeoutMillis: opts.poolIdleTimeoutMs, + }) + + // Select count(*) and other pg bigints as js integer + Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10)) + + // Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema) + if (schema && !/^[a-z_]+$/i.test(schema)) { + throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`) + } + + pool.on('error', onPoolError) + + const db = new Kysely<DbSchema>({ + dialect: new PostgresDialect({pool}), + }) + + return new Database(db, { + pool, + schema, + url, + txLockNonce, + }) + } + + async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> { + const leakyTxPlugin = new LeakyTxPlugin() + return this.db + .withPlugin(leakyTxPlugin) + .transaction() + .execute(txn => { + const dbTxn = new Database(txn, this.cfg) + return fn(dbTxn) + .catch(async err => { + leakyTxPlugin.endTx() + // ensure that all in-flight queries are flushed & the connection is open + await dbTxn.db.getExecutor().provideConnection(async () => {}) + throw err + }) + .finally(() => leakyTxPlugin.endTx()) + }) + } + + get schema(): string | undefined { + return this.cfg.schema + } + + get isTransaction() { + return this.db.isTransaction + } + + assertTransaction() { + assert(this.isTransaction, 'Transaction required') + } + + assertNotTransaction() { + assert(!this.isTransaction, 'Cannot be in a transaction') + } + + async close(): Promise<void> { + if (this.destroyed) return + await this.db.destroy() + this.destroyed = true + } + + async migrateToOrThrow(migration: string) { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateTo(migration) + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } + + async migrateToLatestOrThrow() { + if (this.schema) { + await this.db.schema.createSchema(this.schema).ifNotExists().execute() + } + const {error, results} = await this.migrator.migrateToLatest() + if (error) { + throw error + } + if (!results) { + throw new Error('An unknown failure occurred while migrating') + } + return results + } +} + +export default Database + +export type PgConfig = { + pool: Pg.Pool + url: string + schema?: string + txLockNonce?: string +} + +type PgOptions = { + url: string + pool?: Pg.Pool + schema?: string + poolSize?: number + poolMaxUses?: number + poolIdleTimeoutMs?: number + txLockNonce?: string +} + +class LeakyTxPlugin implements KyselyPlugin { + private txOver = false + + endTx() { + this.txOver = true + } + + transformQuery(args: PluginTransformQueryArgs): RootOperationNode { + if (this.txOver) { + throw new Error('tx already failed') + } + return args.node + } + + async transformResult( + args: PluginTransformResultArgs, + ): Promise<QueryResult<UnknownRow>> { + return args.result + } +} + +const onPoolError = (err: Error) => log.error({err}, 'db pool error') diff --git a/bskylink/src/db/migrations/001-init.ts b/bskylink/src/db/migrations/001-init.ts new file mode 100644 index 000000000..fe3bcf186 --- /dev/null +++ b/bskylink/src/db/migrations/001-init.ts @@ -0,0 +1,15 @@ +import {Kysely} from 'kysely' + +export async function up(db: Kysely<unknown>): Promise<void> { + await db.schema + .createTable('link') + .addColumn('id', 'varchar', col => col.primaryKey()) + .addColumn('type', 'smallint', col => col.notNull()) // integer enum: 1->starterpack + .addColumn('path', 'varchar', col => col.notNull()) + .addUniqueConstraint('link_path_unique', ['path']) + .execute() +} + +export async function down(db: Kysely<unknown>): Promise<void> { + await db.schema.dropTable('link').execute() +} diff --git a/bskylink/src/db/migrations/index.ts b/bskylink/src/db/migrations/index.ts new file mode 100644 index 000000000..05e4de937 --- /dev/null +++ b/bskylink/src/db/migrations/index.ts @@ -0,0 +1,5 @@ +import * as init from './001-init.js' + +export default { + '001': init, +} diff --git a/bskylink/src/db/migrations/provider.ts b/bskylink/src/db/migrations/provider.ts new file mode 100644 index 000000000..bef93a48f --- /dev/null +++ b/bskylink/src/db/migrations/provider.ts @@ -0,0 +1,8 @@ +import {Migration, MigrationProvider} from 'kysely' + +export class DbMigrationProvider implements MigrationProvider { + constructor(private migrations: Record<string, Migration>) {} + async getMigrations(): Promise<Record<string, Migration>> { + return this.migrations + } +} diff --git a/bskylink/src/db/schema.ts b/bskylink/src/db/schema.ts new file mode 100644 index 000000000..8d97f5800 --- /dev/null +++ b/bskylink/src/db/schema.ts @@ -0,0 +1,17 @@ +import {Selectable} from 'kysely' + +export type DbSchema = { + link: Link +} + +export interface Link { + id: string + type: LinkType + path: string +} + +export enum LinkType { + StarterPack = 1, +} + +export type LinkEntry = Selectable<Link> diff --git a/bskylink/src/index.ts b/bskylink/src/index.ts new file mode 100644 index 000000000..ca425eee8 --- /dev/null +++ b/bskylink/src/index.ts @@ -0,0 +1,45 @@ +import events from 'node:events' +import http from 'node:http' + +import cors from 'cors' +import express from 'express' +import {createHttpTerminator, HttpTerminator} from 'http-terminator' + +import {Config} from './config.js' +import {AppContext} from './context.js' +import {default as routes, errorHandler} from './routes/index.js' + +export * from './config.js' +export * from './db/index.js' +export * from './logger.js' + +export class LinkService { + public server?: http.Server + private terminator?: HttpTerminator + + constructor(public app: express.Application, public ctx: AppContext) {} + + static async create(cfg: Config): Promise<LinkService> { + let app = express() + app.use(cors()) + + const ctx = await AppContext.fromConfig(cfg) + app = routes(ctx, app) + app.use(errorHandler) + + return new LinkService(app, ctx) + } + + async start() { + this.server = this.app.listen(this.ctx.cfg.service.port) + this.server.keepAliveTimeout = 90000 + this.terminator = createHttpTerminator({server: this.server}) + await events.once(this.server, 'listening') + } + + async destroy() { + this.ctx.abortController.abort() + await this.terminator?.terminate() + await this.ctx.db.close() + } +} diff --git a/bskylink/src/logger.ts b/bskylink/src/logger.ts new file mode 100644 index 000000000..25bb590a1 --- /dev/null +++ b/bskylink/src/logger.ts @@ -0,0 +1,4 @@ +import {subsystemLogger} from '@atproto/common' + +export const httpLogger = subsystemLogger('bskylink') +export const dbLogger = subsystemLogger('bskylink:db') diff --git a/bskylink/src/routes/create.ts b/bskylink/src/routes/create.ts new file mode 100644 index 000000000..db7c3f809 --- /dev/null +++ b/bskylink/src/routes/create.ts @@ -0,0 +1,111 @@ +import assert from 'node:assert' + +import bodyParser from 'body-parser' +import {Express, Request} from 'express' + +import {AppContext} from '../context.js' +import {LinkType} from '../db/schema.js' +import {randomId} from '../util.js' +import {handler} from './util.js' + +export default function (ctx: AppContext, app: Express) { + return app.post( + '/link', + bodyParser.json(), + handler(async (req, res) => { + let path: string + if (typeof req.body?.path === 'string') { + path = req.body.path + } else { + return res.status(400).json({ + error: 'InvalidPath', + message: '"path" parameter is missing or not a string', + }) + } + if (!path.startsWith('/')) { + return res.status(400).json({ + error: 'InvalidPath', + message: + '"path" parameter must be formatted as a path, starting with a "/"', + }) + } + const parts = getPathParts(path) + if (parts.length === 3 && parts[0] === 'start') { + // link pattern: /start/{did}/{rkey} + if (!parts[1].startsWith('did:')) { + // enforce strong links + return res.status(400).json({ + error: 'InvalidPath', + message: + '"path" parameter for starter pack must contain the actor\'s DID', + }) + } + const id = await ensureLink(ctx, LinkType.StarterPack, parts) + return res.json({url: getUrl(ctx, req, id)}) + } + return res.status(400).json({ + error: 'InvalidPath', + message: '"path" parameter does not have a known format', + }) + }), + ) +} + +const ensureLink = async (ctx: AppContext, type: LinkType, parts: string[]) => { + const normalizedPath = normalizedPathFromParts(parts) + const created = await ctx.db.db + .insertInto('link') + .values({ + id: randomId(), + type, + path: normalizedPath, + }) + .onConflict(oc => oc.column('path').doNothing()) + .returningAll() + .executeTakeFirst() + if (created) { + return created.id + } + const found = await ctx.db.db + .selectFrom('link') + .selectAll() + .where('path', '=', normalizedPath) + .executeTakeFirstOrThrow() + return found.id +} + +const getUrl = (ctx: AppContext, req: Request, id: string) => { + if (!ctx.cfg.service.hostnames.length) { + assert(req.headers.host, 'request must be made with host header') + const baseUrl = + req.protocol === 'http' && req.headers.host.startsWith('localhost:') + ? `http://${req.headers.host}` + : `https://${req.headers.host}` + return `${baseUrl}/${id}` + } + const baseUrl = ctx.cfg.service.hostnames.includes(req.headers.host) + ? `https://${req.headers.host}` + : `https://${ctx.cfg.service.hostnames[0]}` + return `${baseUrl}/${id}` +} + +const normalizedPathFromParts = (parts: string[]): string => { + return ( + '/' + + parts + .map(encodeURIComponent) + .map(part => part.replaceAll('%3A', ':')) // preserve colons + .join('/') + ) +} + +const getPathParts = (path: string): string[] => { + if (path === '/') return [] + if (path.endsWith('/')) { + path = path.slice(0, -1) // ignore trailing slash + } + return path + .slice(1) // remove leading slash + .split('/') + .map(decodeURIComponent) +} diff --git a/bskylink/src/routes/health.ts b/bskylink/src/routes/health.ts new file mode 100644 index 000000000..c8a30c59e --- /dev/null +++ b/bskylink/src/routes/health.ts @@ -0,0 +1,20 @@ +import {Express} from 'express' +import {sql} from 'kysely' + +import {AppContext} from '../context.js' +import {handler} from './util.js' + +export default function (ctx: AppContext, app: Express) { + return app.get( + '/_health', + handler(async (_req, res) => { + const {version} = ctx.cfg.service + try { + await sql`select 1`.execute(ctx.db.db) + return res.send({version}) + } catch (err) { + return res.status(503).send({version, error: 'Service Unavailable'}) + } + }), + ) +} diff --git a/bskylink/src/routes/index.ts b/bskylink/src/routes/index.ts new file mode 100644 index 000000000..f60b99bcb --- /dev/null +++ b/bskylink/src/routes/index.ts @@ -0,0 +1,17 @@ +import {Express} from 'express' + +import {AppContext} from '../context.js' +import {default as create} from './create.js' +import {default as health} from './health.js' +import {default as redirect} from './redirect.js' +import {default as siteAssociation} from './siteAssociation.js' + +export * from './util.js' + +export default function (ctx: AppContext, app: Express) { + app = health(ctx, app) // GET /_health + app = siteAssociation(ctx, app) // GET /.well-known/apple-app-site-association + app = create(ctx, app) // POST /link + app = redirect(ctx, app) // GET /:linkId (should go last due to permissive matching) + return app +} diff --git a/bskylink/src/routes/redirect.ts b/bskylink/src/routes/redirect.ts new file mode 100644 index 000000000..7791ea815 --- /dev/null +++ b/bskylink/src/routes/redirect.ts @@ -0,0 +1,40 @@ +import assert from 'node:assert' + +import {DAY, SECOND} from '@atproto/common' +import {Express} from 'express' + +import {AppContext} from '../context.js' +import {handler} from './util.js' + +export default function (ctx: AppContext, app: Express) { + return app.get( + '/:linkId', + handler(async (req, res) => { + const linkId = req.params.linkId + assert( + typeof linkId === 'string', + 'express guarantees id parameter is a string', + ) + const found = await ctx.db.db + .selectFrom('link') + .selectAll() + .where('id', '=', linkId) + .executeTakeFirst() + if (!found) { + // potentially broken or mistyped link— send user to the app + res.setHeader('Location', `https://${ctx.cfg.service.appHostname}`) + res.setHeader('Cache-Control', 'no-store') + return res.status(302).end() + } + // build url from original url in order to preserve query params + const url = new URL( + req.originalUrl, + `https://${ctx.cfg.service.appHostname}`, + ) + url.pathname = found.path + res.setHeader('Location', url.href) + res.setHeader('Cache-Control', `max-age=${(7 * DAY) / SECOND}`) + return res.status(301).end() + }), + ) +} diff --git a/bskylink/src/routes/siteAssociation.ts b/bskylink/src/routes/siteAssociation.ts new file mode 100644 index 000000000..ae3b42e30 --- /dev/null +++ b/bskylink/src/routes/siteAssociation.ts @@ -0,0 +1,13 @@ +import {Express} from 'express' + +import {AppContext} from '../context.js' + +export default function (ctx: AppContext, app: Express) { + return app.get('/.well-known/apple-app-site-association', (req, res) => { + res.json({ + appclips: { + apps: ['B3LX46C5HS.xyz.blueskyweb.app.AppClip'], + }, + }) + }) +} diff --git a/bskylink/src/routes/util.ts b/bskylink/src/routes/util.ts new file mode 100644 index 000000000..bcac64b01 --- /dev/null +++ b/bskylink/src/routes/util.ts @@ -0,0 +1,23 @@ +import {ErrorRequestHandler, Request, RequestHandler, Response} from 'express' + +import {httpLogger} from '../logger.js' + +export type Handler = (req: Request, res: Response) => Awaited<void> + +export const handler = (runHandler: Handler): RequestHandler => { + return async (req, res, next) => { + try { + await runHandler(req, res) + } catch (err) { + next(err) + } + } +} + +export const errorHandler: ErrorRequestHandler = (err, _req, res, next) => { + httpLogger.error({err}, 'request error') + if (res.headersSent) { + return next(err) + } + return res.status(500).end('server error') +} diff --git a/bskylink/src/util.ts b/bskylink/src/util.ts new file mode 100644 index 000000000..0b57dd5c5 --- /dev/null +++ b/bskylink/src/util.ts @@ -0,0 +1,8 @@ +import {randomBytes} from 'node:crypto' + +import {toString} from 'uint8arrays' + +// 40bit random id of 5-7 characters +export const randomId = () => { + return toString(randomBytes(5), 'base58btc') +} |