about summary refs log tree commit diff
path: root/bskylink/src/cache/safelinkClient.ts
blob: 547a9c84383fbaefa641ec7d56ecf3799f225af9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
import {
  AtpAgent,
  CredentialSession,
  type ToolsOzoneSafelinkDefs,
  type ToolsOzoneSafelinkQueryEvents,
} from '@atproto/api'
import {ExpiredTokenError} from '@atproto/api/dist/client/types/com/atproto/server/confirmEmail.js'
import {MINUTE} from '@atproto/common'
import {LRUCache} from 'lru-cache'

import {type ServiceConfig} from '../config.js'
import type Database from '../db/index.js'
import {type SafelinkRule} from '../db/schema.js'
import {redirectLogger} from '../logger.js'

// Delay (ms) before the next Ozone poll after a productive fetch or a token refresh.
const SAFELINK_MIN_FETCH_INTERVAL = 1000
// Delay (ms) before the next Ozone poll when nothing new arrived or an error occurred.
const SAFELINK_MAX_FETCH_INTERVAL = 10 * SAFELINK_MIN_FETCH_INTERVAL
// Matches a leading URI scheme such as `https:`. Used both to detect scheme-less input
// and to strip the scheme off normalized URLs.
const SCHEME_REGEX = /^[A-Za-z][A-Za-z\d+.-]*:/

/**
 * Resolves safelink rules for outbound links and keeps the local `safelink_rule` table in
 * sync with Ozone.
 *
 * Lookups consult two LRU caches (keyed by normalized URL and normalized domain) before the
 * database. The 'ok' sentinel records that no rule exists for a key. Rule changes arrive by
 * polling `tools.ozone.safelink.queryEvents` (see `runFetchEvents`).
 */
export class SafelinkClient {
  private domainCache: LRUCache<string, SafelinkRule | 'ok'>
  private urlCache: LRUCache<string, SafelinkRule | 'ok'>

  private db: Database

  private ozoneAgent: OzoneAgent

  // Last known Ozone event cursor; undefined until loaded from or saved to the database.
  private cursor?: string

  constructor({cfg, db}: {cfg: ServiceConfig; db: Database}) {
    this.domainCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 10000,
    })

    this.urlCache = new LRUCache<string, SafelinkRule | 'ok'>({
      max: 25000,
    })

    this.db = db

    // NOTE(review): the non-null assertions assume the safelink settings are configured
    // whenever this client is constructed — confirm at the call site.
    this.ozoneAgent = new OzoneAgent(
      cfg.safelinkPdsUrl!,
      cfg.safelinkAgentIdentifier!,
      cfg.safelinkAgentPass!,
    )
  }

  /**
   * Returns the rule that applies to `link`, or 'ok' when none applies.
   *
   * A URL rule takes precedence. When there is no URL rule, the domain rule (or 'ok') is
   * returned. Fails open: if the link cannot be normalized, or a database lookup fails,
   * 'ok' is returned.
   */
  public async tryFindRule(link: string): Promise<SafelinkRule | 'ok'> {
    let url: string
    let domain: string
    try {
      url = SafelinkClient.normalizeUrl(link)
      domain = SafelinkClient.normalizeDomain(link)
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: link},
        'failed to normalize looked up link',
      )
      // fail open
      return 'ok'
    }

    // Check the URL first (cache, then DB). An 'ok' here only means "no URL rule", so we
    // must still fall through to the domain check; otherwise a blocking domain rule would
    // be bypassed. Conversely, a cached domain 'ok' must never short-circuit the URL check.
    const urlRule = await this.lookupCached(this.urlCache, url, 'url')
    if (urlRule !== 'ok') {
      return urlRule
    }

    return this.lookupCached(this.domainCache, domain, 'domain')
  }

  /**
   * Returns the cached result for `key`, falling back to the database on a cache miss.
   *
   * A definitive DB answer (a rule, or 'ok' for "no rule") is cached. A DB error is
   * logged and treated as 'ok' *without* caching, so a transient failure is not remembered.
   */
  private async lookupCached(
    cache: LRUCache<string, SafelinkRule | 'ok'>,
    key: string,
    pattern: ToolsOzoneSafelinkDefs.PatternType,
  ): Promise<SafelinkRule | 'ok'> {
    const cached = cache.get(key)
    if (cached !== undefined) {
      return cached
    }

    let rule: SafelinkRule | undefined
    try {
      rule = await this.getRule(this.db, key, pattern)
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: key, pattern},
        'failed to look up safelink rule',
      )
      // fail open, but don't cache the failure
      return 'ok'
    }

    const result = rule ?? 'ok'
    cache.set(key, result)
    return result
  }

  /**
   * Fetches the most recently created rule stored for `url` with the given pattern.
   *
   * @returns the rule, or undefined when none exists. Database errors are thrown.
   */
  private async getRule(
    db: Database,
    url: string,
    pattern: ToolsOzoneSafelinkDefs.PatternType,
  ): Promise<SafelinkRule | undefined> {
    return db.db
      .selectFrom('safelink_rule')
      .selectAll()
      .where('url', '=', url)
      .where('pattern', '=', pattern)
      .orderBy('createdAt', 'desc')
      .executeTakeFirst()
  }

  /**
   * Normalizes a rule's URL the same way `tryFindRule` normalizes lookups, so that stored
   * rules match lookup keys. Patterns other than 'url'/'domain' are returned unchanged.
   *
   * @returns the normalized URL, or undefined (after logging) if normalization fails.
   */
  private static normalizeRuleUrl(rule: SafelinkRule): string | undefined {
    try {
      if (rule.pattern === 'url') {
        return SafelinkClient.normalizeUrl(rule.url)
      }
      if (rule.pattern === 'domain') {
        return SafelinkClient.normalizeDomain(rule.url)
      }
      return rule.url
    } catch (e) {
      redirectLogger.error(
        {error: e, inputUrl: rule.url},
        'failed to normalize rule input URL',
      )
      return undefined
    }
  }

  /**
   * Inserts `rule` (with a normalized URL) into `safelink_rule`. Insert errors are logged
   * rather than thrown. The caller is responsible for cache invalidation.
   *
   * NOTE(review): because errors are swallowed per statement, a database that aborts the
   * enclosing transaction on a failed statement (e.g. Postgres) would silently drop the rest
   * of the batch — confirm this is acceptable.
   *
   * @returns the normalized URL, or undefined if the rule was skipped.
   */
  private async addRule(
    db: Database,
    rule: SafelinkRule,
  ): Promise<string | undefined> {
    const url = SafelinkClient.normalizeRuleUrl(rule)
    if (url === undefined) {
      return undefined
    }

    // Awaited so the insert completes inside the caller's transaction.
    await db.db
      .insertInto('safelink_rule')
      .values({
        id: rule.id,
        eventType: rule.eventType,
        url,
        pattern: rule.pattern,
        action: rule.action,
        createdAt: rule.createdAt,
      })
      .execute()
      .catch(err => {
        redirectLogger.error(
          {error: err, rule},
          'failed to add rule to database',
        )
      })

    return url
  }

  /**
   * Deletes every stored row matching the rule's pattern and normalized URL. Delete errors
   * are logged rather than thrown. The caller is responsible for cache invalidation.
   *
   * @returns the normalized URL, or undefined if the rule was skipped.
   */
  private async removeRule(
    db: Database,
    rule: SafelinkRule,
  ): Promise<string | undefined> {
    const url = SafelinkClient.normalizeRuleUrl(rule)
    if (url === undefined) {
      return undefined
    }

    await db.db
      .deleteFrom('safelink_rule')
      // Match the rule's own pattern; URL rules must be removable too.
      .where('pattern', '=', rule.pattern)
      .where('url', '=', url)
      .execute()
      .catch(err => {
        redirectLogger.error(
          {error: err, rule},
          'failed to remove rule from database',
        )
      })

    return url
  }

  /** Drops the cached lookup result for a (pattern, normalized URL) pair. */
  private invalidateCache(pattern: string, url: string): void {
    if (pattern === 'domain') {
      this.domainCache.delete(url)
    } else {
      this.urlCache.delete(url)
    }
  }

  /**
   * True when `err` signals an expired access token.
   *
   * The imported `ExpiredTokenError` is the class generated for the `confirmEmail` method.
   * Errors from other XRPC calls surface as plain XRPC errors whose `error` field is
   * 'ExpiredToken', so both forms are accepted.
   */
  private static isExpiredTokenError(err: unknown): boolean {
    if (err instanceof ExpiredTokenError) {
      return true
    }
    return (
      typeof err === 'object' &&
      err !== null &&
      'error' in err &&
      (err as {error: unknown}).error === 'ExpiredToken'
    )
  }

  /**
   * Performs one poll of Ozone's safelink event stream, applies the events to the database
   * in a single transaction, then schedules the next poll.
   *
   * The next poll uses SAFELINK_MIN_FETCH_INTERVAL after new events or a token refresh, and
   * SAFELINK_MAX_FETCH_INTERVAL when idle or after an error. Every exit path reschedules,
   * so the polling loop does not stop on its own.
   */
  public async runFetchEvents() {
    let agent: AtpAgent
    try {
      agent = await this.ozoneAgent.getAgent()
    } catch (err) {
      redirectLogger.error({error: err}, 'error getting Ozone agent')
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    let res: ToolsOzoneSafelinkQueryEvents.Response
    try {
      const cursor = await this.getCursor()
      res = await agent.tools.ozone.safelink.queryEvents({
        cursor,
        limit: 100,
        sortDirection: 'asc',
      })
    } catch (err) {
      if (SafelinkClient.isExpiredTokenError(err)) {
        redirectLogger.info('ozone agent had expired session, refreshing...')
        await this.ozoneAgent.refreshSession()
        setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL)
        return
      }

      redirectLogger.error(
        {error: err},
        'error fetching safelink events from Ozone',
      )
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    if (res.data.events.length === 0) {
      redirectLogger.info('received no new safelink events from ozone')
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    // Cache keys touched by this batch. They are invalidated only after the transaction
    // commits, so a concurrent lookup cannot re-cache pre-commit state.
    const touched: {pattern: string; url: string}[] = []
    try {
      await this.db.transaction(async db => {
        for (const rule of res.data.events) {
          let url: string | undefined
          switch (rule.eventType) {
            case 'removeRule':
              url = await this.removeRule(db, rule)
              break
            case 'addRule':
            case 'updateRule':
              url = await this.addRule(db, rule)
              break
            default:
              redirectLogger.warn({rule}, 'received unknown rule event type')
          }
          if (url !== undefined) {
            touched.push({pattern: rule.pattern, url})
          }
        }
      })
    } catch (err) {
      // Don't advance the cursor; the batch is retried on the next poll.
      redirectLogger.error(
        {error: err},
        'failed to apply safelink events to database',
      )
      setTimeout(() => this.runFetchEvents(), SAFELINK_MAX_FETCH_INTERVAL)
      return
    }

    for (const {pattern, url} of touched) {
      this.invalidateCache(pattern, url)
    }

    if (res.data.cursor) {
      redirectLogger.info(
        {cursor: res.data.cursor},
        'received new safelink events from Ozone',
      )
      await this.setCursor(res.data.cursor)
    }
    setTimeout(() => this.runFetchEvents(), SAFELINK_MIN_FETCH_INTERVAL)
  }

  /**
   * Returns the Ozone event cursor to resume from. This is the in-memory value if known,
   * otherwise the value persisted in `safelink_cursor` (row id 1).
   *
   * @returns the cursor, or undefined when none has been stored yet (start from the beginning).
   */
  private async getCursor(): Promise<string | undefined> {
    // The field has no initializer, so undefined means "not loaded yet".
    if (this.cursor === undefined) {
      const res = await this.db.db
        .selectFrom('safelink_cursor')
        .selectAll()
        .where('id', '=', 1)
        .executeTakeFirst()
      this.cursor = res?.cursor
    }
    return this.cursor
  }

  /**
   * Upserts the cursor into `safelink_cursor` (row id 1). The in-memory copy is updated
   * only on success. Failures are logged, not thrown.
   */
  private async setCursor(cursor: string) {
    const updatedAt = new Date()
    try {
      await this.db.db
        .insertInto('safelink_cursor')
        .values({
          id: 1,
          cursor,
          updatedAt,
        })
        .onConflict(oc => oc.column('id').doUpdateSet({cursor, updatedAt}))
        .execute()
      this.cursor = cursor
    } catch (err) {
      redirectLogger.error({error: err}, 'failed to update safelink cursor')
    }
  }

  /**
   * Normalizes a URL for use as a cache and DB key. The steps are:
   * - default the scheme to https;
   * - drop the fragment and the scheme (leaving the `//host/...` form);
   * - lowercase the whole string;
   * - strip one trailing slash.
   *
   * @throws TypeError (from `new URL`) when the input cannot be parsed.
   */
  private static normalizeUrl(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    u.hash = ''
    let normalized = u.href.replace(SCHEME_REGEX, '').toLowerCase()
    if (normalized.endsWith('/')) {
      normalized = normalized.substring(0, normalized.length - 1)
    }
    return normalized
  }

  /**
   * Returns the lowercased host (including any port) of `input`. The scheme defaults to
   * https when absent.
   *
   * @throws TypeError (from `new URL`) when the input cannot be parsed.
   */
  private static normalizeDomain(input: string) {
    if (!SCHEME_REGEX.test(input)) {
      input = `https://${input}`
    }
    const u = new URL(input)
    return u.host.toLowerCase()
  }
}

/**
 * Lazily authenticated AtpAgent for the Ozone service account.
 *
 * Logs in with the configured identifier/password on first use. It then proactively
 * refreshes the session once `REFRESH_INTERVAL` has elapsed since the last login or refresh.
 */
export class OzoneAgent {
  // How long a freshly created or refreshed session is used before refreshing it again.
  private static readonly REFRESH_INTERVAL = 50 * MINUTE

  private identifier: string
  private password: string

  private session: CredentialSession
  private agent: AtpAgent

  // Epoch millis at or after which getAgent() refreshes the session.
  private refreshAt = 0

  /**
   * @param pdsHost URL of the PDS the service account logs in against.
   * @param identifier login identifier for the service account.
   * @param password password for the service account.
   */
  constructor(pdsHost: string, identifier: string, password: string) {
    this.identifier = identifier
    this.password = password

    this.session = new CredentialSession(new URL(pdsHost))
    this.agent = new AtpAgent(this.session)
  }

  /**
   * Returns the shared agent. It logs in first if there is no session, and refreshes the
   * session once `refreshAt` has passed.
   *
   * @throws Error when the identifier or password is missing; login errors propagate.
   */
  public async getAgent() {
    // Login needs both credentials, so either one missing is a configuration error.
    if (!this.identifier || !this.password) {
      throw new Error(
        'OZONE_AGENT_HANDLE and OZONE_AGENT_PASS environment variables must be set',
      )
    }

    if (!this.session.hasSession) {
      redirectLogger.info('creating Ozone session')
      await this.session.login({
        identifier: this.identifier,
        password: this.password,
      })
      redirectLogger.info('ozone session created successfully')
      this.refreshAt = Date.now() + OzoneAgent.REFRESH_INTERVAL
    }

    // Refresh only once the deadline has been reached, not on every call before it.
    if (Date.now() >= this.refreshAt) {
      await this.refreshSession()
    }

    return this.agent
  }

  /**
   * Refreshes the session and pushes `refreshAt` forward. Failures are logged rather than
   * thrown. On failure `refreshAt` is left unchanged, so a later getAgent() call retries.
   */
  public async refreshSession() {
    try {
      await this.session.refreshSession()
      this.refreshAt = Date.now() + OzoneAgent.REFRESH_INTERVAL
    } catch (e) {
      redirectLogger.error({error: e}, 'error refreshing session')
    }
  }
}