diff options
Diffstat (limited to 'src/state/messages/events/agent.ts')
-rw-r--r-- | src/state/messages/events/agent.ts | 103 |
1 files changed, 37 insertions, 66 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts index 061337d3b..68225e595 100644 --- a/src/state/messages/events/agent.ts +++ b/src/state/messages/events/agent.ts @@ -8,9 +8,8 @@ import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const' import { MessagesEventBusDispatch, MessagesEventBusDispatchEvent, - MessagesEventBusError, MessagesEventBusErrorCode, - MessagesEventBusEvents, + MessagesEventBusEvent, MessagesEventBusParams, MessagesEventBusStatus, } from '#/state/messages/events/types' @@ -22,10 +21,9 @@ export class MessagesEventBus { private agent: BskyAgent private __tempFromUserDid: string - private emitter = new EventEmitter<MessagesEventBusEvents>() + private emitter = new EventEmitter<{event: [MessagesEventBusEvent]}>() private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing - private error: MessagesEventBusError | undefined private latestRev: string | undefined = undefined private pollInterval = DEFAULT_POLL_INTERVAL private requestedPollIntervals: Map<string, number> = new Map() @@ -52,65 +50,43 @@ export class MessagesEventBus { } } - trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { - this.emitter.on('events', handler) - return () => { - this.emitter.off('events', handler) - } - } - - trailConvo( - convoId: string, - handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, - ) { - const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { - const convoEvents = events.filter(ev => { - if (typeof ev.convoId === 'string' && ev.convoId === convoId) { - return ev.convoId === convoId - } - return false - }) - - if (convoEvents.length > 0) { - handler(convoEvents) - } - } - - this.emitter.on('events', handle) - return () => { - this.emitter.off('events', handle) - } - } - getLatestRev() { return this.latestRev } - onConnect(handler: () => void) { - this.emitter.on('connect', handler) - - if ( - this.status === MessagesEventBusStatus.Ready || - this.status === MessagesEventBusStatus.Backgrounded || - this.status === MessagesEventBusStatus.Suspended - ) { - handler() - } + on( + handler: (event: MessagesEventBusEvent) => void, + options: { + convoId?: string + }, + ) { + const handle = (event: MessagesEventBusEvent) => { + if (event.type === 'logs' && options.convoId) { + const filteredLogs = event.logs.filter(log => { + if ( + typeof log.convoId === 'string' && + log.convoId === options.convoId + ) { + return log.convoId === options.convoId + } + return false + }) - return () => { - this.emitter.off('connect', handler) + if (filteredLogs.length > 0) { + handler({ + ...event, + logs: filteredLogs, + }) + } + } else { + handler(event) + } } - } - - onError(handler: (payload?: MessagesEventBusError) => void) { - this.emitter.on('error', handler) - if (this.status === MessagesEventBusStatus.Error) { - handler(this.error) - } + this.emitter.on('event', handle) return () => { - this.emitter.off('error', handler) + this.emitter.off('event', handle) } } @@ -138,13 +114,13 @@ export class MessagesEventBus { case MessagesEventBusDispatchEvent.Ready: { this.status = MessagesEventBusStatus.Ready this.resetPoll() - this.emitter.emit('connect') + this.emitter.emit('event', {type: 'connect'}) break } case MessagesEventBusDispatchEvent.Background: { this.status = MessagesEventBusStatus.Backgrounded this.resetPoll() - this.emitter.emit('connect') + this.emitter.emit('event', {type: 'connect'}) break } case MessagesEventBusDispatchEvent.Suspend: { @@ -153,8 +129,7 @@ export class MessagesEventBus { } case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error - this.error = action.payload - this.emitter.emit('error', action.payload) + this.emitter.emit('event', {type: 'error', error: action.payload}) break } } @@ -174,9 +149,8 @@ export class MessagesEventBus { } case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error - this.error = action.payload this.stopPoll() - this.emitter.emit('error', action.payload) + this.emitter.emit('event', {type: 'error', error: action.payload}) break } case MessagesEventBusDispatchEvent.UpdatePoll: { @@ -200,9 +174,8 @@ export class MessagesEventBus { } case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error - this.error = action.payload this.stopPoll() - this.emitter.emit('error', action.payload) + this.emitter.emit('event', {type: 'error', error: action.payload}) break } case MessagesEventBusDispatchEvent.UpdatePoll: { @@ -226,9 +199,8 @@ export class MessagesEventBus { } case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error - this.error = action.payload this.stopPoll() - this.emitter.emit('error', action.payload) + this.emitter.emit('event', {type: 'error', error: action.payload}) break } } @@ -239,7 +211,6 @@ export class MessagesEventBus { case MessagesEventBusDispatchEvent.Resume: { // basically reset this.status = MessagesEventBusStatus.Initializing - this.error = undefined this.latestRev = undefined this.init() break @@ -403,7 +374,7 @@ export class MessagesEventBus { if (needsEmit) { try { - this.emitter.emit('events', batch) + this.emitter.emit('event', {type: 'logs', logs: batch}) } catch (e: any) { logger.error(e, { context: `${LOGGER_CONTEXT}: process latest events`, |