about summary refs log tree commit diff
path: root/src/state/messages/events/agent.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/state/messages/events/agent.ts')
-rw-r--r--src/state/messages/events/agent.ts103
1 files changed, 37 insertions, 66 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts
index 061337d3b..68225e595 100644
--- a/src/state/messages/events/agent.ts
+++ b/src/state/messages/events/agent.ts
@@ -8,9 +8,8 @@ import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
 import {
   MessagesEventBusDispatch,
   MessagesEventBusDispatchEvent,
-  MessagesEventBusError,
   MessagesEventBusErrorCode,
-  MessagesEventBusEvents,
+  MessagesEventBusEvent,
   MessagesEventBusParams,
   MessagesEventBusStatus,
 } from '#/state/messages/events/types'
@@ -22,10 +21,9 @@ export class MessagesEventBus {
 
   private agent: BskyAgent
   private __tempFromUserDid: string
-  private emitter = new EventEmitter<MessagesEventBusEvents>()
+  private emitter = new EventEmitter<{event: [MessagesEventBusEvent]}>()
 
   private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
-  private error: MessagesEventBusError | undefined
   private latestRev: string | undefined = undefined
   private pollInterval = DEFAULT_POLL_INTERVAL
   private requestedPollIntervals: Map<string, number> = new Map()
@@ -52,65 +50,43 @@ export class MessagesEventBus {
     }
   }
 
-  trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
-    this.emitter.on('events', handler)
-    return () => {
-      this.emitter.off('events', handler)
-    }
-  }
-
-  trailConvo(
-    convoId: string,
-    handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
-  ) {
-    const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
-      const convoEvents = events.filter(ev => {
-        if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
-          return ev.convoId === convoId
-        }
-        return false
-      })
-
-      if (convoEvents.length > 0) {
-        handler(convoEvents)
-      }
-    }
-
-    this.emitter.on('events', handle)
-    return () => {
-      this.emitter.off('events', handle)
-    }
-  }
-
   getLatestRev() {
     return this.latestRev
   }
 
-  onConnect(handler: () => void) {
-    this.emitter.on('connect', handler)
-
-    if (
-      this.status === MessagesEventBusStatus.Ready ||
-      this.status === MessagesEventBusStatus.Backgrounded ||
-      this.status === MessagesEventBusStatus.Suspended
-    ) {
-      handler()
-    }
+  on(
+    handler: (event: MessagesEventBusEvent) => void,
+    options: {
+      convoId?: string
+    },
+  ) {
+    const handle = (event: MessagesEventBusEvent) => {
+      if (event.type === 'logs' && options.convoId) {
+        const filteredLogs = event.logs.filter(log => {
+          if (
+            typeof log.convoId === 'string' &&
+            log.convoId === options.convoId
+          ) {
+            return log.convoId === options.convoId
+          }
+          return false
+        })
 
-    return () => {
-      this.emitter.off('connect', handler)
+        if (filteredLogs.length > 0) {
+          handler({
+            ...event,
+            logs: filteredLogs,
+          })
+        }
+      } else {
+        handler(event)
+      }
     }
-  }
-
-  onError(handler: (payload?: MessagesEventBusError) => void) {
-    this.emitter.on('error', handler)
 
-    if (this.status === MessagesEventBusStatus.Error) {
-      handler(this.error)
-    }
+    this.emitter.on('event', handle)
 
     return () => {
-      this.emitter.off('error', handler)
+      this.emitter.off('event', handle)
     }
   }
 
@@ -138,13 +114,13 @@ export class MessagesEventBus {
           case MessagesEventBusDispatchEvent.Ready: {
             this.status = MessagesEventBusStatus.Ready
             this.resetPoll()
-            this.emitter.emit('connect')
+            this.emitter.emit('event', {type: 'connect'})
             break
           }
           case MessagesEventBusDispatchEvent.Background: {
             this.status = MessagesEventBusStatus.Backgrounded
             this.resetPoll()
-            this.emitter.emit('connect')
+            this.emitter.emit('event', {type: 'connect'})
             break
           }
           case MessagesEventBusDispatchEvent.Suspend: {
@@ -153,8 +129,7 @@ export class MessagesEventBus {
           }
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
-            this.error = action.payload
-            this.emitter.emit('error', action.payload)
+            this.emitter.emit('event', {type: 'error', error: action.payload})
             break
           }
         }
@@ -174,9 +149,8 @@ export class MessagesEventBus {
           }
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
-            this.error = action.payload
             this.stopPoll()
-            this.emitter.emit('error', action.payload)
+            this.emitter.emit('event', {type: 'error', error: action.payload})
             break
           }
           case MessagesEventBusDispatchEvent.UpdatePoll: {
@@ -200,9 +174,8 @@ export class MessagesEventBus {
           }
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
-            this.error = action.payload
             this.stopPoll()
-            this.emitter.emit('error', action.payload)
+            this.emitter.emit('event', {type: 'error', error: action.payload})
             break
           }
           case MessagesEventBusDispatchEvent.UpdatePoll: {
@@ -226,9 +199,8 @@ export class MessagesEventBus {
           }
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
-            this.error = action.payload
             this.stopPoll()
-            this.emitter.emit('error', action.payload)
+            this.emitter.emit('event', {type: 'error', error: action.payload})
             break
           }
         }
@@ -239,7 +211,6 @@ export class MessagesEventBus {
           case MessagesEventBusDispatchEvent.Resume: {
             // basically reset
             this.status = MessagesEventBusStatus.Initializing
-            this.error = undefined
             this.latestRev = undefined
             this.init()
             break
@@ -403,7 +374,7 @@ export class MessagesEventBus {
 
       if (needsEmit) {
         try {
-          this.emitter.emit('events', batch)
+          this.emitter.emit('event', {type: 'logs', logs: batch})
         } catch (e: any) {
           logger.error(e, {
             context: `${LOGGER_CONTEXT}: process latest events`,