about summary refs log tree commit diff
path: root/src/state/messages/events/agent.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/state/messages/events/agent.ts')
-rw-r--r--src/state/messages/events/agent.ts321
1 files changed, 127 insertions, 194 deletions
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts
index 422672853..f22cff9d9 100644
--- a/src/state/messages/events/agent.ts
+++ b/src/state/messages/events/agent.ts
@@ -3,158 +3,147 @@ import EventEmitter from 'eventemitter3'
 import {nanoid} from 'nanoid/non-secure'
 
 import {logger} from '#/logger'
+import {DEFAULT_POLL_INTERVAL} from '#/state/messages/events/const'
 import {
   MessagesEventBusDispatch,
   MessagesEventBusDispatchEvent,
   MessagesEventBusError,
   MessagesEventBusErrorCode,
+  MessagesEventBusEvents,
   MessagesEventBusParams,
-  MessagesEventBusState,
   MessagesEventBusStatus,
 } from '#/state/messages/events/types'
 
 const LOGGER_CONTEXT = 'MessagesEventBus'
 
-const DEFAULT_POLL_INTERVAL = 60e3
-
 export class MessagesEventBus {
   private id: string
 
   private agent: BskyAgent
   private __tempFromUserDid: string
-  private emitter = new EventEmitter()
+  private emitter = new EventEmitter<MessagesEventBusEvents>()
 
-  private status: MessagesEventBusStatus = MessagesEventBusStatus.Uninitialized
+  private status: MessagesEventBusStatus = MessagesEventBusStatus.Initializing
   private error: MessagesEventBusError | undefined
   private latestRev: string | undefined = undefined
   private pollInterval = DEFAULT_POLL_INTERVAL
   private requestedPollIntervals: Map<string, number> = new Map()
 
-  snapshot: MessagesEventBusState | undefined
-
   constructor(params: MessagesEventBusParams) {
     this.id = nanoid(3)
     this.agent = params.agent
     this.__tempFromUserDid = params.__tempFromUserDid
 
-    this.subscribe = this.subscribe.bind(this)
-    this.getSnapshot = this.getSnapshot.bind(this)
-    this.init = this.init.bind(this)
-    this.suspend = this.suspend.bind(this)
-    this.resume = this.resume.bind(this)
-    this.requestPollInterval = this.requestPollInterval.bind(this)
-    this.trail = this.trail.bind(this)
-    this.trailConvo = this.trailConvo.bind(this)
+    this.init()
   }
 
-  private commit() {
-    this.snapshot = undefined
-    this.subscribers.forEach(subscriber => subscriber())
+  requestPollInterval(interval: number) {
+    const id = nanoid()
+    this.requestedPollIntervals.set(id, interval)
+    this.dispatch({
+      event: MessagesEventBusDispatchEvent.UpdatePoll,
+    })
+    return () => {
+      this.requestedPollIntervals.delete(id)
+      this.dispatch({
+        event: MessagesEventBusDispatchEvent.UpdatePoll,
+      })
+    }
   }
 
-  private subscribers: (() => void)[] = []
+  trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
+    this.emitter.on('events', handler)
+    return () => {
+      this.emitter.off('events', handler)
+    }
+  }
 
-  subscribe(subscriber: () => void) {
-    if (this.subscribers.length === 0) this.init()
+  trailConvo(
+    convoId: string,
+    handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
+  ) {
+    const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
+      const convoEvents = events.filter(ev => {
+        if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
+          return ev.convoId === convoId
+        }
+        return false
+      })
 
-    this.subscribers.push(subscriber)
+      if (convoEvents.length > 0) {
+        handler(convoEvents)
+      }
+    }
 
+    this.emitter.on('events', handle)
     return () => {
-      this.subscribers = this.subscribers.filter(s => s !== subscriber)
-      if (this.subscribers.length === 0) this.suspend()
+      this.emitter.off('events', handle)
     }
   }
 
-  getSnapshot(): MessagesEventBusState {
-    if (!this.snapshot) this.snapshot = this.generateSnapshot()
-    // logger.debug(`${LOGGER_CONTEXT}: snapshotted`, {}, logger.DebugContext.convo)
-    return this.snapshot
+  getLatestRev() {
+    return this.latestRev
   }
 
-  private generateSnapshot(): MessagesEventBusState {
-    switch (this.status) {
-      case MessagesEventBusStatus.Initializing: {
-        return {
-          status: MessagesEventBusStatus.Initializing,
-          rev: undefined,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Ready: {
-        return {
-          status: this.status,
-          rev: this.latestRev!,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Suspended: {
-        return {
-          status: this.status,
-          rev: this.latestRev,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      case MessagesEventBusStatus.Error: {
-        return {
-          status: MessagesEventBusStatus.Error,
-          rev: this.latestRev,
-          error: this.error || {
-            code: MessagesEventBusErrorCode.Unknown,
-            retry: () => {
-              this.init()
-            },
-          },
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
-      default: {
-        return {
-          status: MessagesEventBusStatus.Uninitialized,
-          rev: undefined,
-          error: undefined,
-          requestPollInterval: this.requestPollInterval,
-          trail: this.trail,
-          trailConvo: this.trailConvo,
-        }
-      }
+  onConnect(handler: () => void) {
+    this.emitter.on('connect', handler)
+
+    if (
+      this.status === MessagesEventBusStatus.Ready ||
+      this.status === MessagesEventBusStatus.Backgrounded ||
+      this.status === MessagesEventBusStatus.Suspended
+    ) {
+      handler()
+    }
+
+    return () => {
+      this.emitter.off('connect', handler)
     }
   }
 
-  dispatch(action: MessagesEventBusDispatch) {
+  onError(handler: (payload?: MessagesEventBusError) => void) {
+    this.emitter.on('error', handler)
+
+    if (this.status === MessagesEventBusStatus.Error) {
+      handler(this.error)
+    }
+
+    return () => {
+      this.emitter.off('error', handler)
+    }
+  }
+
+  background() {
+    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
+  }
+
+  suspend() {
+    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
+  }
+
+  resume() {
+    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
+    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
+  }
+
+  private dispatch(action: MessagesEventBusDispatch) {
     const prevStatus = this.status
 
     switch (this.status) {
-      case MessagesEventBusStatus.Uninitialized: {
-        switch (action.event) {
-          case MessagesEventBusDispatchEvent.Init: {
-            this.status = MessagesEventBusStatus.Initializing
-            this.setup()
-            break
-          }
-        }
-        break
-      }
       case MessagesEventBusStatus.Initializing: {
         switch (action.event) {
           case MessagesEventBusDispatchEvent.Ready: {
             this.status = MessagesEventBusStatus.Ready
             this.resetPoll()
+            this.emitter.emit('connect')
             break
           }
           case MessagesEventBusDispatchEvent.Background: {
             this.status = MessagesEventBusStatus.Backgrounded
             this.resetPoll()
+            this.emitter.emit('connect')
             break
           }
           case MessagesEventBusDispatchEvent.Suspend: {
@@ -164,6 +153,7 @@ export class MessagesEventBus {
           case MessagesEventBusDispatchEvent.Error: {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
+            this.emitter.emit('error', action.payload)
             break
           }
         }
@@ -185,6 +175,11 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
+            break
+          }
+          case MessagesEventBusDispatchEvent.UpdatePoll: {
+            this.resetPoll()
             break
           }
         }
@@ -206,6 +201,11 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
+            break
+          }
+          case MessagesEventBusDispatchEvent.UpdatePoll: {
+            this.resetPoll()
             break
           }
         }
@@ -227,6 +227,7 @@ export class MessagesEventBus {
             this.status = MessagesEventBusStatus.Error
             this.error = action.payload
             this.stopPoll()
+            this.emitter.emit('error', action.payload)
             break
           }
         }
@@ -234,12 +235,12 @@ export class MessagesEventBus {
       }
       case MessagesEventBusStatus.Error: {
         switch (action.event) {
-          case MessagesEventBusDispatchEvent.Resume:
-          case MessagesEventBusDispatchEvent.Init: {
+          case MessagesEventBusDispatchEvent.Resume: {
+            // basically reset
             this.status = MessagesEventBusStatus.Initializing
             this.error = undefined
             this.latestRev = undefined
-            this.setup()
+            this.init()
             break
           }
         }
@@ -258,19 +259,36 @@ export class MessagesEventBus {
       },
       logger.DebugContext.convo,
     )
-
-    this.commit()
   }
 
-  private async setup() {
-    logger.debug(`${LOGGER_CONTEXT}: setup`, {}, logger.DebugContext.convo)
+  private async init() {
+    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
 
     try {
-      await this.initializeLatestRev()
+      const response = await this.agent.api.chat.bsky.convo.listConvos(
+        {
+          limit: 1,
+        },
+        {
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')
+
+      const {convos} = response.data
+
+      for (const convo of convos) {
+        if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
+          this.latestRev = convo.rev
+        }
+      }
+
       this.dispatch({event: MessagesEventBusDispatchEvent.Ready})
     } catch (e: any) {
       logger.error(e, {
-        context: `${LOGGER_CONTEXT}: setup failed`,
+        context: `${LOGGER_CONTEXT}: init failed`,
       })
 
       this.dispatch({
@@ -279,100 +297,13 @@ export class MessagesEventBus {
           exception: e,
           code: MessagesEventBusErrorCode.InitFailed,
           retry: () => {
-            this.init()
+            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
           },
         },
       })
     }
   }
 
-  init() {
-    logger.debug(`${LOGGER_CONTEXT}: init`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Init})
-  }
-
-  background() {
-    logger.debug(`${LOGGER_CONTEXT}: background`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Background})
-  }
-
-  suspend() {
-    logger.debug(`${LOGGER_CONTEXT}: suspend`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Suspend})
-  }
-
-  resume() {
-    logger.debug(`${LOGGER_CONTEXT}: resume`, {}, logger.DebugContext.convo)
-    this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
-  }
-
-  requestPollInterval(interval: number) {
-    const id = nanoid()
-    this.requestedPollIntervals.set(id, interval)
-    this.resetPoll()
-    return () => {
-      this.requestedPollIntervals.delete(id)
-      this.resetPoll()
-    }
-  }
-
-  trail(handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void) {
-    this.emitter.on('events', handler)
-    return () => {
-      this.emitter.off('events', handler)
-    }
-  }
-
-  trailConvo(
-    convoId: string,
-    handler: (events: ChatBskyConvoGetLog.OutputSchema['logs']) => void,
-  ) {
-    const handle = (events: ChatBskyConvoGetLog.OutputSchema['logs']) => {
-      const convoEvents = events.filter(ev => {
-        if (typeof ev.convoId === 'string' && ev.convoId === convoId) {
-          return ev.convoId === convoId
-        }
-        return false
-      })
-
-      if (convoEvents.length > 0) {
-        handler(convoEvents)
-      }
-    }
-
-    this.emitter.on('events', handle)
-    return () => {
-      this.emitter.off('events', handle)
-    }
-  }
-
-  private async initializeLatestRev() {
-    logger.debug(
-      `${LOGGER_CONTEXT}: initialize latest rev`,
-      {},
-      logger.DebugContext.convo,
-    )
-
-    const response = await this.agent.api.chat.bsky.convo.listConvos(
-      {
-        limit: 1,
-      },
-      {
-        headers: {
-          Authorization: this.__tempFromUserDid,
-        },
-      },
-    )
-
-    const {convos} = response.data
-
-    for (const convo of convos) {
-      if (convo.rev > (this.latestRev = this.latestRev || convo.rev)) {
-        this.latestRev = convo.rev
-      }
-    }
-  }
-
   /*
    * Polling
    */
@@ -430,6 +361,8 @@ export class MessagesEventBus {
         },
       )
 
+      // throw new Error('UNCOMMENT TO TEST POLL FAILURE')
+
       const {logs: events} = response.data
 
       let needsEmit = false
@@ -473,7 +406,7 @@ export class MessagesEventBus {
           exception: e,
           code: MessagesEventBusErrorCode.PollFailed,
           retry: () => {
-            this.init()
+            this.dispatch({event: MessagesEventBusDispatchEvent.Resume})
           },
         },
       })