diff options
-rw-r--r-- | src/state/messages/events/agent.ts | 321 | ||||
-rw-r--r-- | src/state/messages/events/const.ts | 1 | ||||
-rw-r--r-- | src/state/messages/events/index.tsx | 19 | ||||
-rw-r--r-- | src/state/messages/events/types.ts | 67 |
4 files changed, 151 insertions, 257 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts index 422672853..f22cff9d9 100644 --- a/src/state/messages/events/agent.ts +++ b/src/state/messages/events/agent.ts @@ -3,158 +3,147 @@ import EventEmitter from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' import {logger} from '#/logger' +import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const' import { MessagesEventBusDispatch, MessagesEventBusDispatchEvent, MessagesEventBusError, MessagesEventBusErrorCode, + MessagesEventBusEvents, MessagesEventBusParams, - MessagesEventBusState, MessagesEventBusStatus, } from '#/state/messages/events/types' const LOGGER_CONTEXT = 'MessagesEventBus' -const DEFAULT_POLL_INTERVAL = 60e3 - export class MessagesEventBus { private id: string private agent: BskyAgent private __tempFromUserDid: string - private emitter = new EventEmitter() + private emitter = new EventEmitter<MessagesEventBusEvents>() - private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized + private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing private error: MessagesEventBusError | undefined private latestRev: string | undefined = undefined private pollInterval = DEFAULT_POLL_INTERVAL private requestedPollIntervals: Map<string, number> = new Map() - snapshot: MessagesEventBusState | undefined - constructor(params: MessagesEventBusParams) { this.id = nanoid(3) this.agent = params.agent this.__tempFromUserDid = params.__tempFromUserDid - this.subscribe = this.subscribe.bind(this) - this.getSnapshot = this.getSnapshot.bind(this) - this.init = this.init.bind(this) - this.suspend = this.suspend.bind(this) - this.resume = this.resume.bind(this) - this.requestPollInterval = this.requestPollInterval.bind(this) - this.trail = this.trail.bind(this) - this.trailConvo = this.trailConvo.bind(this) + this.init() } - private commit() { - this.snapshot = undefined - this.subscribers.forEach(subscriber => subscriber()) + requestPollInterval(interval: number) { + const id = nanoid() + this.requestedPollIntervals.set(id, interval) + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) + return () => { + this.requestedPollIntervals.delete(id) + this.dispatch({ + event: MessagesEventBusDispatchEvent.UpdatePoll, + }) + } } - private subscribers: (() => void)[] = [] + trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { + this.emitter.on('events', handler) + return () => { + this.emitter.off('events', handler) + } + } - subscribe(subscriber: () => void) { - if (this.subscribers.length === 0) this.init() + trailConvo( + convoId: string, + handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, + ) { + const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { + const convoEvents = events.filter(ev => { + if (typeof ev.convoId === 'string' && ev.convoId === convoId) { + return ev.convoId === convoId + } + return false + }) - this.subscribers.push(subscriber) + if (convoEvents.length > 0) { + handler(convoEvents) + } + } + this.emitter.on('events', handle) return () => { - this.subscribers = this.subscribers.filter(s => s !== subscriber) - if (this.subscribers.length === 0) this.suspend() + this.emitter.off('events', handle) } } - getSnapshot(): MessagesEventBusState { - if (!this.snapshot) this.snapshot = this.generateSnapshot() - // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo) - return this.snapshot + getLatestRev() { + return this.latestRev } - private generateSnapshot(): MessagesEventBusState { - switch (this.status) { - case MessagesEventBusStatus.Initializing: { - return { - status: MessagesEventBusStatus.Initializing, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Ready: { - return { - status: this.status, - rev: this.latestRev!, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Suspended: { - return { - status: this.status, - rev: this.latestRev, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - case MessagesEventBusStatus.Error: { - return { - status: MessagesEventBusStatus.Error, - rev: this.latestRev, - error: this.error || { - code: MessagesEventBusErrorCode.Unknown, - retry: () => { - this.init() - }, - }, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } - default: { - return { - status: MessagesEventBusStatus.Uninitialized, - rev: undefined, - error: undefined, - requestPollInterval: this.requestPollInterval, - trail: this.trail, - trailConvo: this.trailConvo, - } - } + onConnect(handler: () => void) { + this.emitter.on('connect', handler) + + if ( + this.status === MessagesEventBusStatus.Ready || + this.status === MessagesEventBusStatus.Backgrounded || + this.status === MessagesEventBusStatus.Suspended + ) { + handler() + } + + return () => { + this.emitter.off('connect', handler) } } - dispatch(action: MessagesEventBusDispatch) { + onError(handler: (payload?: MessagesEventBusError) => void) { + this.emitter.on('error', handler) + + if (this.status === MessagesEventBusStatus.Error) { + handler(this.error) + } + + return () => { + this.emitter.off('error', handler) + } + } + + background() { + logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Background}) + } + + suspend() { + logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) + } + + resume() { + logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) + } + + private dispatch(action: MessagesEventBusDispatch) { const prevStatus = this.status switch (this.status) { - case MessagesEventBusStatus.Uninitialized: { - switch (action.event) { - case MessagesEventBusDispatchEvent.Init: { - this.status = MessagesEventBusStatus.Initializing - this.setup() - break - } - } - break - } case MessagesEventBusStatus.Initializing: { switch (action.event) { case MessagesEventBusDispatchEvent.Ready: { this.status = MessagesEventBusStatus.Ready this.resetPoll() + this.emitter.emit('connect') break } case MessagesEventBusDispatchEvent.Background: { this.status = MessagesEventBusStatus.Backgrounded this.resetPoll() + this.emitter.emit('connect') break } case MessagesEventBusDispatchEvent.Suspend: { @@ -164,6 +153,7 @@ export class MessagesEventBus { case MessagesEventBusDispatchEvent.Error: { this.status = MessagesEventBusStatus.Error this.error = action.payload + this.emitter.emit('error', action.payload) break } } @@ -185,6 +175,11 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() break } } @@ -206,6 +201,11 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) + break + } + case MessagesEventBusDispatchEvent.UpdatePoll: { + this.resetPoll() break } } @@ -227,6 +227,7 @@ export class MessagesEventBus { this.status = MessagesEventBusStatus.Error this.error = action.payload this.stopPoll() + this.emitter.emit('error', action.payload) break } } @@ -234,12 +235,12 @@ export class MessagesEventBus { } case MessagesEventBusStatus.Error: { switch (action.event) { - case MessagesEventBusDispatchEvent.Resume: - case MessagesEventBusDispatchEvent.Init: { + case MessagesEventBusDispatchEvent.Resume: { + // basically reset this.status = MessagesEventBusStatus.Initializing this.error = undefined this.latestRev = undefined - this.setup() + this.init() break } } @@ -258,19 +259,36 @@ export class MessagesEventBus { }, logger.DebugContext.convo, ) - - this.commit() } - private async setup() { - logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo) + private async init() { + logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) try { - await this.initializeLatestRev() + const response = await this.agent.api.chat.bsky.convo.listConvos( + { + limit: 1, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + // throw new Error('UNCOMMENT TO TEST INIT FAILURE') + + const {convos} = response.data + + for (const convo of convos) { + if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { + this.latestRev = convo.rev + } + } + this.dispatch({event: MessagesEventBusDispatchEvent.Ready}) } catch (e: any) { logger.error(e, { - context: `${LOGGER_CONTEXT}: setup failed`, + context: `${LOGGER_CONTEXT}: init failed`, }) this.dispatch({ @@ -279,100 +297,13 @@ export class MessagesEventBus { exception: e, code: MessagesEventBusErrorCode.InitFailed, retry: () => { - this.init() + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) }, }, }) } } - init() { - logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Init}) - } - - background() { - logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Background}) - } - - suspend() { - logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Suspend}) - } - - resume() { - logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo) - this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) - } - - requestPollInterval(interval: number) { - const id = nanoid() - this.requestedPollIntervals.set(id, interval) - this.resetPoll() - return () => { - this.requestedPollIntervals.delete(id) - this.resetPoll() - } - } - - trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) { - this.emitter.on('events', handler) - return () => { - this.emitter.off('events', handler) - } - } - - trailConvo( - convoId: string, - handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void, - ) { - const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => { - const convoEvents = events.filter(ev => { - if (typeof ev.convoId === 'string' && ev.convoId === convoId) { - return ev.convoId === convoId - } - return false - }) - - if (convoEvents.length > 0) { - handler(convoEvents) - } - } - - this.emitter.on('events', handle) - return () => { - this.emitter.off('events', handle) - } - } - - private async initializeLatestRev() { - logger.debug( - `${LOGGER_CONTEXT}: initialize latest rev`, - {}, - logger.DebugContext.convo, - ) - - const response = await this.agent.api.chat.bsky.convo.listConvos( - { - limit: 1, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - - const {convos} = response.data - - for (const convo of convos) { - if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) { - this.latestRev = convo.rev - } - } - } - /* * Polling */ @@ -430,6 +361,8 @@ export class MessagesEventBus { }, ) + // throw new Error('UNCOMMENT TO TEST POLL FAILURE') + const {logs: events} = response.data let needsEmit = false @@ -473,7 +406,7 @@ export class MessagesEventBus { exception: e, code: MessagesEventBusErrorCode.PollFailed, retry: () => { - this.init() + this.dispatch({event: MessagesEventBusDispatchEvent.Resume}) }, }, }) diff --git a/src/state/messages/events/const.ts b/src/state/messages/events/const.ts new file mode 100644 index 000000000..921557ce5 --- /dev/null +++ b/src/state/messages/events/const.ts @@ -0,0 +1 @@ +export const DEFAULT_POLL_INTERVAL = 20e3 diff --git a/src/state/messages/events/index.tsx b/src/state/messages/events/index.tsx index 2de6286e7..08ec77503 100644 --- a/src/state/messages/events/index.tsx +++ b/src/state/messages/events/index.tsx @@ -5,13 +5,13 @@ import {BskyAgent} from '@atproto-labs/api' import {useGate} from '#/lib/statsig/statsig' import {isWeb} from '#/platform/detection' import {MessagesEventBus} from '#/state/messages/events/agent' -import {MessagesEventBusState} from '#/state/messages/events/types' import {useAgent} from '#/state/session' import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' import {IS_DEV} from '#/env' -const MessagesEventBusContext = - React.createContext<MessagesEventBusState | null>(null) +const MessagesEventBusContext = React.createContext<MessagesEventBus | null>( + null, +) export function useMessagesEventBus() { const ctx = React.useContext(MessagesEventBusContext) @@ -37,12 +37,13 @@ export function Temp_MessagesEventBusProvider({ __tempFromUserDid: getAgent().session?.did!, }), ) - const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot) - if (isWeb && IS_DEV) { - // @ts-ignore - window.messagesEventBus = service - } + React.useEffect(() => { + if (isWeb && IS_DEV) { + // @ts-ignore + window.bus = bus + } + }, [bus]) React.useEffect(() => { const handleAppStateChange = (nextAppState: string) => { @@ -61,7 +62,7 @@ export function Temp_MessagesEventBusProvider({ }, [bus]) return ( - <MessagesEventBusContext.Provider value={service}> + <MessagesEventBusContext.Provider value={bus}> {children} </MessagesEventBusContext.Provider> ) diff --git a/src/state/messages/events/types.ts b/src/state/messages/events/types.ts index 6959b4f06..c6be522ae 100644 --- a/src/state/messages/events/types.ts +++ b/src/state/messages/events/types.ts @@ -6,7 +6,6 @@ export type MessagesEventBusParams = { } export enum MessagesEventBusStatus { - Uninitialized = 'uninitialized', Initializing = 'initializing', Ready = 'ready', Error = 'error', @@ -15,12 +14,12 @@ export enum MessagesEventBusStatus { } export enum MessagesEventBusDispatchEvent { - Init = 'init', Ready = 'ready', Error = 'error', Background = 'background', Suspend = 'suspend', Resume = 'resume', + UpdatePoll = 'updatePoll', } export enum MessagesEventBusErrorCode { @@ -37,9 +36,6 @@ export type MessagesEventBusError = { export type MessagesEventBusDispatch = | { - event: MessagesEventBusDispatchEvent.Init - } - | { event: MessagesEventBusDispatchEvent.Ready } | { @@ -55,59 +51,22 @@ export type MessagesEventBusDispatch = event: MessagesEventBusDispatchEvent.Error payload: MessagesEventBusError } + | { + event: MessagesEventBusDispatchEvent.UpdatePoll + } export type TrailHandler = ( events: ChatBskyConvoGetLog.OutputSchema['logs'], ) => void export type RequestPollIntervalHandler = (interval: number) => () => void +export type OnConnectHandler = (handler: () => void) => () => void +export type OnDisconnectHandler = ( + handler: (error?: MessagesEventBusError) => void, +) => () => void -export type MessagesEventBusState = - | { - status: MessagesEventBusStatus.Uninitialized - rev: undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Initializing - rev: undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Ready - rev: string - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Backgrounded - rev: string | undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Suspended - rev: string | undefined - error: undefined - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } - | { - status: MessagesEventBusStatus.Error - rev: string | undefined - error: MessagesEventBusError - requestPollInterval: RequestPollIntervalHandler - trail: (handler: TrailHandler) => () => void - trailConvo: (convoId: string, handler: TrailHandler) => () => void - } +export type MessagesEventBusEvents = { + events: [ChatBskyConvoGetLog.OutputSchema['logs']] + connect: undefined + error: [MessagesEventBusError] | undefined +} |