about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorEric Bailey <git@esb.lol>2024-05-08 17:41:10 -0500
committerGitHub <noreply@github.com>2024-05-08 17:41:10 -0500
commitce2eddca8e95113146e18281415365806088fd5d (patch)
treeea2b6a5c6f764944f18448678a3d4047557ba714 /src
parent0c6bf276dd43762cb36541d278f95dceb1c35896 (diff)
downloadvoidsky-ce2eddca8e95113146e18281415365806088fd5d.tar.zst
[🐴] Refactor event bus (#3919)
* Refactor to singleton class outside react

* Fix retry, remove debug logs
Diffstat (limited to 'src')
-rw-r--r--src/state/messages/events/agent.ts321
-rw-r--r--src/state/messages/events/const.ts1
-rw-r--r--src/state/messages/events/index.tsx19
-rw-r--r--src/state/messages/events/types.ts67
4 files changed, 151 insertions, 257 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts
index 422672853..f22cff9d9 100644
--- a/src/state/messages/events/agent.ts
+++ b/src/state/messages/events/agent.ts
@@ -3,158 +3,147 @@ import EventEmitter from 'eventemitter3'
 import {nanoid} from 'nanoid/non-secure'
 
 import {logger} from '#/logger'
+import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
 import {
   MessagesEventBusDispatch,
   MessagesEventBusDispatchEvent,
   MessagesEventBusError,
   MessagesEventBusErrorCode,
+  MessagesEventBusEvents,
   MessagesEventBusParams,
-  MessagesEventBusState,
   MessagesEventBusStatus,
 } from '#/state/messages/events/types'
 
 const LOGGER_CONTEXT = 'MessagesEventBus'
 
-const DEFAULT_POLL_INTERVAL = 60e3
-
 export class MessagesEventBus {
   private id: string
 
   private agent: BskyAgent
   private __tempFromUserDid: string
-  private emitter = new EventEmitter()
+  private emitter = new EventEmitter<MessagesEventBusEvents>()
 
-  private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized
+  private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
   private error: MessagesEventBusError | undefined
   private latestRev: string | undefined = undefined
   private pollInterval = DEFAULT_POLL_INTERVAL
   private requestedPollIntervals: Map<string, number> = new Map()
 
-  snapshot: MessagesEventBusState | undefined
-
   constructor(params: MessagesEventBusParams) {
     this.id = nanoid(3)
     this.agent = params.agent
     this.__tempFromUserDid = params.__tempFromUserDid
 
-    this.subscribe = this.subscribe.bind(this)
-    this.getSnapshot = this.getSnapshot.bind(this)
-    this.init = this.init.bind(this)
-    this.suspend = this.suspend.bind(this)
-    this.resume = this.resume.bind(this)
-    this.requestPollInterval = this.requestPollInterval.bind(this)
-    this.trail = this.trail.bind(this)
-    this.trailConvo = this.trailConvo.bind(this)
+    this.init()
   }
 
-  private commit() {
-    this.snapshot = undefined
-    this.subscribers.forEach(subscriber => subscriber())
+  requestPollInterval(interval: number) {
+    const id = nanoid()
+    this.requestedPollIntervals.set(id, interval)
+    this.dispatch({
+      event: MessagesEventBusDispatchEvent.UpdatePoll,
+    })
+    return () => {
+      this.requestedPollIntervals.delete(id)
+      this.dispatch({
+        event: MessagesEventBusDispatchEvent.UpdatePoll,
+      })
+    }
   }
 
-  private subscribers: (() => void)[] = []
+  trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
+    this.emitter.on('events', handler)
+    return () => {
+      this.emitter.off('events', handler)
+    }
+  }
 
-  subscribe(subscriber: () => void) {
-    if (this.subscribers.length === 0) this.init()
+  trailConvo(
+    convoId: string,
+    handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
+  ) {
+    const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
+      const convoEvents = events.filter(ev => {
+        if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
+          return ev.convoId === convoId
+        }
+        return false
+      })
 
-    this.subscribers.push(subscriber)
+      if (convoEvents.length > 0) {
+        handler(convoEvents)
+      }
+    }
 
+    this.emitter.on('events', handle)
     return () => {
-      this.subscribers = this.subscribers.filter(s => s !== subscriber)
-      if (this.subscribers.length === 0) this.suspend()
+      this.emitter.off('events', handle)
     }
   }
 
-  getSnapshot(): MessagesEventBusState {
-    if (!this.snapshot) this.snapshot = this.generateSnapshot()
-    // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo)
-    return this.snapshot
+  getLatestRev() {
+    return this.latestRev
   }
 
-  private generateSnapshot(): MessagesEventBusState {
-    switch (this.status) {
-      case MessagesEventBusStatus.Initializing: {
-        return {
-          status: MessagesEventBusStatus.Initializing,
-          rev: undefined,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Ready: {
-        return {
-          status: this.status,
-          rev: this.latestRev!,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Suspended: {
-        return {
-          status: this.status,
-          rev: this.latestRev,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Error: {
-        return {
-          status: MessagesEventBusStatus.Error,
-          rev: this.latestRev,
-          error: this.error || {
-            code: MessagesEventBusErrorCode.Unknown,
-            retry: () => {
-              this.init()
-            },
-          },
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      default: {
-        return {
-          status: MessagesEventBusStatus.Uninitialized,
-          rev: undefined,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
+  onConnect(handler: () => void) {
+    this.emitter.on('connect', handler)
+
+    if (
+      this.status === MessagesEventBusStatus.Ready ||
+      this.status === MessagesEventBusStatus.Backgrounded ||
+      this.status === MessagesEventBusStatus.Suspended
+    ) {
+      handler()
+    }
+
+    return () => {
+      this.emitter.off('connect', handler)
     }
   }
 
-  dispatch(action: MessagesEventBusDispatch) {
+  onError(handler: (payload?: MessagesEventBusError) => void) {
+    this.emitter.on('error', handler)
+
+    if (this.status === MessagesEventBusStatus.Error) {
+      handler(this.error)
+    }
+
+    return () => {
+      this.emitter.off('error', handler)
+    }
+  }
+
+  background() {
+    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
+  }
+
+  suspend() {
+    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
+  }
+
+  resume() {
+    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
+  }
+
+  private dispatch(action: MessagesEventBusDispatch) {
     const prevStatus = this.status
 
     switch (this.status) {
-      case MessagesEventBusStatus.Uninitialized: {
-        switch (action.event) {
-          case MessagesEventBusDispatchEvent.Init: {
-            this.status = MessagesEventBusStatus.Initializing
-            this.setup()
-            break
-          }
-        }
-        break
-      }
       case MessagesEventBusStatus.Initializing: {
         switch (action.event) {
           case MessagesEventBusDispatchEvent.Ready: {
             this.status = MessagesEventBusStatus.Ready
             this.resetPoll()
+            this.emitter.emit('connect')
             break
           }
           case MessagesEventBusDispatchEvent.Background: {
             this.status = MessagesEventBusStatus.Backgrounded
             this.resetPoll()
+            this.emitter.emit('connect')
             break
           }
           case MessagesEventBusDispatchEvent.Suspend: {
@@ -164,6 +153,7 @@ export class MessagesEventBus {
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
+            this.emitter.emit('error', action.payload)
             break
           }
         }
@@ -185,6 +175,11 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
+            break
+          }
+          case MessagesEventBusDispatchEvent.UpdatePoll: {
+            this.resetPoll()
             break
           }
         }
@@ -206,6 +201,11 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
+            break
+          }
+          case MessagesEventBusDispatchEvent.UpdatePoll: {
+            this.resetPoll()
             break
           }
         }
@@ -227,6 +227,7 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
             break
           }
         }
@@ -234,12 +235,12 @@ export class MessagesEventBus {
       }
       case MessagesEventBusStatus.Error: {
         switch (action.event) {
-          case MessagesEventBusDispatchEvent.Resume:
-          case MessagesEventBusDispatchEvent.Init: {
+          case MessagesEventBusDispatchEvent.Resume: {
+            // basically reset
             this.status = MessagesEventBusStatus.Initializing
             this.error = undefined
             this.latestRev = undefined
-            this.setup()
+            this.init()
             break
           }
         }
@@ -258,19 +259,36 @@ export class MessagesEventBus {
       },
       logger.DebugContext.convo,
     )
-
-    this.commit()
   }
 
-  private async setup() {
-    logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo)
+  private async init() {
+    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
 
     try {
-      await this.initializeLatestRev()
+      const response = await this.agent.api.chat.bsky.convo.listConvos(
+        {
+          limit: 1,
+        },
+        {
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')
+
+      const {convos} = response.data
+
+      for (const convo of convos) {
+        if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
+          this.latestRev = convo.rev
+        }
+      }
+
       this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
     } catch (e: any) {
       logger.error(e, {
-        context: `${LOGGER_CONTEXT}: setup failed`,
+        context: `${LOGGER_CONTEXT}: init failed`,
       })
 
       this.dispatch({
@@ -279,100 +297,13 @@ export class MessagesEventBus {
           exception: e,
           code: MessagesEventBusErrorCode.InitFailed,
           retry: () => {
-            this.init()
+            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
           },
         },
       })
     }
   }
 
-  init() {
-    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Init})
-  }
-
-  background() {
-    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
-  }
-
-  suspend() {
-    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
-  }
-
-  resume() {
-    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
-  }
-
-  requestPollInterval(interval: number) {
-    const id = nanoid()
-    this.requestedPollIntervals.set(id, interval)
-    this.resetPoll()
-    return () => {
-      this.requestedPollIntervals.delete(id)
-      this.resetPoll()
-    }
-  }
-
-  trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
-    this.emitter.on('events', handler)
-    return () => {
-      this.emitter.off('events', handler)
-    }
-  }
-
-  trailConvo(
-    convoId: string,
-    handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
-  ) {
-    const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
-      const convoEvents = events.filter(ev => {
-        if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
-          return ev.convoId === convoId
-        }
-        return false
-      })
-
-      if (convoEvents.length > 0) {
-        handler(convoEvents)
-      }
-    }
-
-    this.emitter.on('events', handle)
-    return () => {
-      this.emitter.off('events', handle)
-    }
-  }
-
-  private async initializeLatestRev() {
-    logger.debug(
-      `${LOGGER_CONTEXT}: initialize latest rev`,
-      {},
-      logger.DebugContext.convo,
-    )
-
-    const response = await this.agent.api.chat.bsky.convo.listConvos(
-      {
-        limit: 1,
-      },
-      {
-        headers: {
-          Authorization: this.__tempFromUserDid,
-        },
-      },
-    )
-
-    const {convos} = response.data
-
-    for (const convo of convos) {
-      if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
-        this.latestRev = convo.rev
-      }
-    }
-  }
-
   /*
    * Polling
    */
@@ -430,6 +361,8 @@ export class MessagesEventBus {
         },
       )
 
+      // throw new Error('UNCOMMENT TO TEST POLL FAILURE')
+
       const {logs: events} = response.data
 
       let needsEmit = false
@@ -473,7 +406,7 @@ export class MessagesEventBus {
           exception: e,
           code: MessagesEventBusErrorCode.PollFailed,
           retry: () => {
-            this.init()
+            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
           },
         },
       })
diff --git a/src/state/messages/events/const.ts b/src/state/messages/events/const.ts
new file mode 100644
index 000000000..921557ce5
--- /dev/null
+++ b/src/state/messages/events/const.ts
@@ -0,0 +1 @@
+export const DEFAULT_POLL_INTERVAL = 20e3
diff --git a/src/state/messages/events/index.tsx b/src/state/messages/events/index.tsx
index 2de6286e7..08ec77503 100644
--- a/src/state/messages/events/index.tsx
+++ b/src/state/messages/events/index.tsx
@@ -5,13 +5,13 @@ import {BskyAgent} from '@atproto-labs/api'
 import {useGate} from '#/lib/statsig/statsig'
 import {isWeb} from '#/platform/detection'
 import {MessagesEventBus} from '#/state/messages/events/agent'
-import {MessagesEventBusState} from '#/state/messages/events/types'
 import {useAgent} from '#/state/session'
 import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
 import {IS_DEV} from '#/env'
 
-const MessagesEventBusContext =
-  React.createContext<MessagesEventBusState | null>(null)
+const MessagesEventBusContext = React.createContext<MessagesEventBus | null>(
+  null,
+)
 
 export function useMessagesEventBus() {
   const ctx = React.useContext(MessagesEventBusContext)
@@ -37,12 +37,13 @@ export function Temp_MessagesEventBusProvider({
         __tempFromUserDid: getAgent().session?.did!,
       }),
   )
-  const service = React.useSyncExternalStore(bus.subscribe, bus.getSnapshot)
 
-  if (isWeb && IS_DEV) {
-    // @ts-ignore
-    window.messagesEventBus = service
-  }
+  React.useEffect(() => {
+    if (isWeb && IS_DEV) {
+      // @ts-ignore
+      window.bus = bus
+    }
+  }, [bus])
 
   React.useEffect(() => {
     const handleAppStateChange = (nextAppState: string) => {
@@ -61,7 +62,7 @@ export function Temp_MessagesEventBusProvider({
   }, [bus])
 
   return (
-    <MessagesEventBusContext.Provider value={service}>
+    <MessagesEventBusContext.Provider value={bus}>
       {children}
     </MessagesEventBusContext.Provider>
   )
diff --git a/src/state/messages/events/types.ts b/src/state/messages/events/types.ts
index 6959b4f06..c6be522ae 100644
--- a/src/state/messages/events/types.ts
+++ b/src/state/messages/events/types.ts
@@ -6,7 +6,6 @@ export type MessagesEventBusParams = {
 }
 
 export enum MessagesEventBusStatus {
-  Uninitialized = 'uninitialized',
   Initializing = 'initializing',
   Ready = 'ready',
   Error = 'error',
@@ -15,12 +14,12 @@ export enum MessagesEventBusStatus {
 }
 
 export enum MessagesEventBusDispatchEvent {
-  Init = 'init',
   Ready = 'ready',
   Error = 'error',
   Background = 'background',
   Suspend = 'suspend',
   Resume = 'resume',
+  UpdatePoll = 'updatePoll',
 }
 
 export enum MessagesEventBusErrorCode {
@@ -37,9 +36,6 @@ export type MessagesEventBusError = {
 
 export type MessagesEventBusDispatch =
   | {
-      event: MessagesEventBusDispatchEvent.Init
-    }
-  | {
       event: MessagesEventBusDispatchEvent.Ready
     }
   | {
@@ -55,59 +51,22 @@ export type MessagesEventBusDispatch =
       event: MessagesEventBusDispatchEvent.Error
       payload: MessagesEventBusError
     }
+  | {
+      event: MessagesEventBusDispatchEvent.UpdatePoll
+    }
 
 export type TrailHandler = (
   events: ChatBskyConvoGetLog.OutputSchema['logs'],
 ) => void
 
 export type RequestPollIntervalHandler = (interval: number) => () => void
+export type OnConnectHandler = (handler: () => void) => () => void
+export type OnDisconnectHandler = (
+  handler: (error?: MessagesEventBusError) => void,
+) => () => void
 
-export type MessagesEventBusState =
-  | {
-      status: MessagesEventBusStatus.Uninitialized
-      rev: undefined
-      error: undefined
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
-  | {
-      status: MessagesEventBusStatus.Initializing
-      rev: undefined
-      error: undefined
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
-  | {
-      status: MessagesEventBusStatus.Ready
-      rev: string
-      error: undefined
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
-  | {
-      status: MessagesEventBusStatus.Backgrounded
-      rev: string | undefined
-      error: undefined
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
-  | {
-      status: MessagesEventBusStatus.Suspended
-      rev: string | undefined
-      error: undefined
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
-  | {
-      status: MessagesEventBusStatus.Error
-      rev: string | undefined
-      error: MessagesEventBusError
-      requestPollInterval: RequestPollIntervalHandler
-      trail: (handler: TrailHandler) => () => void
-      trailConvo: (convoId: string, handler: TrailHandler) => () => void
-    }
+export type MessagesEventBusEvents = {
+  events: [ChatBskyConvoGetLog.OutputSchema['logs']]
+  connect: undefined
+  error: [MessagesEventBusError] | undefined
+}