diff options
Diffstat (limited to 'src/state/messages/events/agent.ts')
-rw-r--r-- | src/state/messages/events/agent.ts | 321 |
1 files changed, 127 insertions, 194 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts index 422672853..f22cff9d9 100644 --- a/src/state/messages/events/agent.ts +++ b/src/state/messages/events/agent.ts @@ -3,158 +3,147 @@ import EventEmitter from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' import {logger} from '#/logger' +import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const' import { MessagesEventBusDispatch, MessagesEventBusDispatchEvent, MessagesEventBusError, MessagesEventBusErrorCode, + MessagesEventBusEvents, MessagesEventBusParams, - MessagesEventBusState, MessagesEventBusStatus, } from '#/state/messages/events/types' const LOGGER_CONTEXT = 'MessagesEventBus' -const DEFAULT_POLL_INTERVAL = 60e3 - export class MessagesEventBus { private id: string private agent: BskyAgent private __tempFromUserDid: string - private emitter = new EventEmitter() + private emitter = new EventEmitter<MessagesEventBusEvents>() - private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized + private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing private error: MessagesEventBusError | undefined private latestRev: string | undefined = undefined private pollInterval = DEFAULT_POLL_INTERVAL private requestedPollIntervals: Map<string, number> = new Map() - snapshot: MessagesEventBusState | undefined - constructor(params: MessagesEventBusParams) { this.id = nanoid(3) this.agent = params.agent this.__tempFromUserDid = params.__tempFromUserDid - this.subscribe = this.subscribe.bind(this) - this.getSnapshot = this.getSnapshot.bind(this) - this.init = this.init.bind(this) - this.suspend = this.suspend.bind(this) - this.resume = this.resume.bind(this) - this.requestPollInterval = this.requestPollInterval.bind(this) - this.trail = this.trail.bind(this) - this.trailConvo = this.trailConvo.bind(this) + this.init() } - private commit() { - this.snapshot = undefined - this.subscribers.forEach(subscriber => subscriber()) + requestPollInterval(interval: number) { + const id = nanoid() + this.requestedPollIntervals.set(id, interval) + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) + return () => { + this.requestedPollIntervals.delete(id) + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) + } } - private subscribers: (() => void)[] = [] + trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { + this.emitter.on('events', handler) + return () => { + this.emitter.off('events', handler) + } + } - subscribe(subscriber: () => void) { - if (this.subscribers.length === 0) this.init() + trailConvo( + convoId: string, + handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, + ) { + const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { + const convoEvents = events.filter(ev => { + if (typeof ev.convoId === 'string' && ev.convoId === convoId) { + return ev.convoId === convoId + } + return false + }) - this.subscribers.push(subscriber) + if (convoEvents.length > 0) { + handler(convoEvents) + } + } + this.emitter.on('events', handle) return () => { - this.subscribers = this.subscribers.filter(s => s !== subscriber) - if (this.subscribers.length === 0) this.suspend() + this.emitter.off('events', handle) } } - getSnapshot(): MessagesEventBusState { - if (!this.snapshot) this.snapshot = this.generateSnapshot() - // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo) - return this.snapshot + getLatestRev() { + return this.latestRev } - private generateSnapshot(): MessagesEventBusState { - switch (this.status) { - case MessagesEventBusStatus.Initializing: { - return { - status: MessagesEventBusStatus.Initializing, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Ready: { - return { - status: this.status, - rev: this.latestRev!, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Suspended: { - return { - status: this.status, - rev: this.latestRev, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Error: { - return { - status: MessagesEventBusStatus.Error, - rev: this.latestRev, - error: this.error || { - code: MessagesEventBusErrorCode.Unknown, - retry: () => { - this.init() - }, - }, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - default: { - return { - status: MessagesEventBusStatus.Uninitialized, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } + onConnect(handler: () => void) { + this.emitter.on('connect', handler) + + if ( + this.status === MessagesEventBusStatus.Ready || + this.status === MessagesEventBusStatus.Backgrounded || + this.status === MessagesEventBusStatus.Suspended + ) { + handler() + } + + return () => { + this.emitter.off('connect', handler) } } - dispatch(action: MessagesEventBusDispatch) { + onError(handler: (payload?: MessagesEventBusError) => void) { + this.emitter.on('error', handler) + + if (this.status === MessagesEventBusStatus.Error) { + handler(this.error) + } + + return () => { + this.emitter.off('error', handler) + } + } + + background() { + logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Background}) + } + + suspend() { + logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) + } + + resume() { + logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + } + + private dispatch(action: MessagesEventBusDispatch) { const prevStatus = this.status switch (this.status) { - case MessagesEventBusStatus.Uninitialized: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Init: { - this.status = MessagesEventBusStatus.Initializing - this.setup() - break - } - } - break - } case MessagesEventBusStatus.Initializing: { switch (action.event) { case MessagesEventBusDispatchEvent.Ready: { this.status = MessagesEventBusStatus.Ready this.resetPoll() + this.emitter.emit('connect') break } case MessagesEventBusDispatchEvent.Background: { this.status = MessagesEventBusStatus.Backgrounded this.resetPoll() + this.emitter.emit('connect') break } case MessagesEventBusDispatchEvent.Suspend: { @@ -164,6 +153,7 @@ export class MessagesEventBus { case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error this.error = action.payload + this.emitter.emit('error', action.payload) break } } @@ -185,6 +175,11 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() break } } @@ -206,6 +201,11 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() break } } @@ -227,6 +227,7 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) break } } @@ -234,12 +235,12 @@ export class MessagesEventBus { } case MessagesEventBusStatus.Error: { switch (action.event) { - case MessagesEventBusDispatchEvent.Resume: - case MessagesEventBusDispatchEvent.Init: { + case MessagesEventBusDispatchEvent.Resume: { + // basically reset this.status = MessagesEventBusStatus.Initializing this.error = undefined this.latestRev = undefined - this.setup() + this.init() break } } @@ -258,19 +259,36 @@ export class MessagesEventBus { }, logger.DebugContext.convo, ) - - this.commit() } - private async setup() { - logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo) + private async init() { + logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) try { - await this.initializeLatestRev() + const response = await this.agent.api.chat.bsky.convo.listConvos( + { + limit: 1, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + // throw new Error('UNCOMMENT TO TEST INIT FAILURE') + + const {convos} = response.data + + for (const convo of convos) { + if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { + this.latestRev = convo.rev + } + } + this.dispatch({event: MessagesEventBusDispatchEvent.Ready}) } catch (e: any) { logger.error(e, { - context: `${LOGGER_CONTEXT}: setup failed`, + context: `${LOGGER_CONTEXT}: init failed`, }) this.dispatch({ @@ -279,100 +297,13 @@ export class MessagesEventBus { exception: e, code: MessagesEventBusErrorCode.InitFailed, retry: () => { - this.init() + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) }, }, }) } } - init() { - logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Init}) - } - - background() { - logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Background}) - } - - suspend() { - logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) - } - - resume() { - logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) - } - - requestPollInterval(interval: number) { - const id = nanoid() - this.requestedPollIntervals.set(id, interval) - this.resetPoll() - return () => { - this.requestedPollIntervals.delete(id) - this.resetPoll() - } - } - - trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { - this.emitter.on('events', handler) - return () => { - this.emitter.off('events', handler) - } - } - - trailConvo( - convoId: string, - handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, - ) { - const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { - const convoEvents = events.filter(ev => { - if (typeof ev.convoId === 'string' && ev.convoId === convoId) { - return ev.convoId === convoId - } - return false - }) - - if (convoEvents.length > 0) { - handler(convoEvents) - } - } - - this.emitter.on('events', handle) - return () => { - this.emitter.off('events', handle) - } - } - - private async initializeLatestRev() { - logger.debug( - `${LOGGER_CONTEXT}: initialize latest rev`, - {}, - logger.DebugContext.convo, - ) - - const response = await this.agent.api.chat.bsky.convo.listConvos( - { - limit: 1, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - - const {convos} = response.data - - for (const convo of convos) { - if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { - this.latestRev = convo.rev - } - } - } - /* * Polling */ @@ -430,6 +361,8 @@ export class MessagesEventBus { }, ) + // throw new Error('UNCOMMENT TO TEST POLL FAILURE') + const {logs: events} = response.data let needsEmit = false @@ -473,7 +406,7 @@ export class MessagesEventBus { exception: e, code: MessagesEventBusErrorCode.PollFailed, retry: () => { - this.init() + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) }, }, }) |