about summary refs log tree commit diff
path: root/src/state/messages
diff options
context:
space:
mode:
authorEric Bailey <git@esb.lol>2024-05-08 18:01:07 -0500
committerGitHub <noreply@github.com>2024-05-08 18:01:07 -0500
commit3bac0182b5e93c9109bf00185c6f6d1dbf375f21 (patch)
treec71387c041944ece554aafbd181b33d46d5fe365 /src/state/messages
parentce2eddca8e95113146e18281415365806088fd5d (diff)
downloadvoidsky-3bac0182b5e93c9109bf00185c6f6d1dbf375f21.tar.zst
[🐴] Integrate event bus (#3915)
* Integrate event bus

* Fixes

* Move events mgmt into Convo class

* Clean up poll interval updates

* Remove unused

* Remove annoying log
Diffstat (limited to 'src/state/messages')
-rw-r--r--src/state/messages/convo/agent.ts228
-rw-r--r--src/state/messages/convo/const.ts2
-rw-r--r--src/state/messages/convo/index.tsx3
-rw-r--r--src/state/messages/convo/types.ts3
-rw-r--r--src/state/messages/events/agent.ts10
5 files changed, 120 insertions, 126 deletions
diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts
index 38a3f5e62..a85293494 100644
--- a/src/state/messages/convo/agent.ts
+++ b/src/state/messages/convo/agent.ts
@@ -10,6 +10,10 @@ import {nanoid} from 'nanoid/non-secure'
 import {logger} from '#/logger'
 import {isNative} from '#/platform/detection'
 import {
+  ACTIVE_POLL_INTERVAL,
+  BACKGROUND_POLL_INTERVAL,
+} from '#/state/messages/convo/const'
+import {
   ConvoDispatch,
   ConvoDispatchEvent,
   ConvoErrorCode,
@@ -19,9 +23,8 @@ import {
   ConvoState,
   ConvoStatus,
 } from '#/state/messages/convo/types'
-
-const ACTIVE_POLL_INTERVAL = 1e3
-const BACKGROUND_POLL_INTERVAL = 10e3
+import {MessagesEventBus} from '#/state/messages/events/agent'
+import {MessagesEventBusError} from '#/state/messages/events/types'
 
 // TODO temporary
 let DEBUG_ACTIVE_CHAT: string | undefined
@@ -41,10 +44,10 @@ export class Convo {
   private id: string
 
   private agent: BskyAgent
+  private events: MessagesEventBus
   private __tempFromUserDid: string
 
   private status: ConvoStatus = ConvoStatus.Uninitialized
-  private pollInterval = ACTIVE_POLL_INTERVAL
   private error:
     | {
         code: ConvoErrorCode
@@ -52,9 +55,9 @@ export class Convo {
         retry: () => void
       }
     | undefined
-  private historyCursor: string | undefined | null = undefined
+  private oldestRev: string | undefined | null = undefined
   private isFetchingHistory = false
-  private eventsCursor: string | undefined = undefined
+  private latestRev: string | undefined = undefined
 
   private pastMessages: Map<
     string,
@@ -73,7 +76,6 @@ export class Convo {
   private headerItems: Map<string, ConvoItem> = new Map()
 
   private isProcessingPendingMessages = false
-  private nextPoll: NodeJS.Timeout | undefined
 
   convoId: string
   convo: ChatBskyConvoDefs.ConvoView | undefined
@@ -85,6 +87,7 @@ export class Convo {
     this.id = nanoid(3)
     this.convoId = params.convoId
     this.agent = params.agent
+    this.events = params.events
     this.__tempFromUserDid = params.__tempFromUserDid
 
     this.subscribe = this.subscribe.bind(this)
@@ -92,6 +95,9 @@ export class Convo {
     this.sendMessage = this.sendMessage.bind(this)
     this.deleteMessage = this.deleteMessage.bind(this)
     this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
+    this.ingestFirehose = this.ingestFirehose.bind(this)
+    this.onFirehoseConnect = this.onFirehoseConnect.bind(this)
+    this.onFirehoseError = this.onFirehoseError.bind(this)
 
     if (DEBUG_ACTIVE_CHAT) {
       logger.error(`Convo: another chat was already active`, {
@@ -100,6 +106,12 @@ export class Convo {
     } else {
       DEBUG_ACTIVE_CHAT = this.convoId
     }
+
+    this.events.trailConvo(this.convoId, events => {
+      this.ingestFirehose(events)
+    })
+    this.events.onConnect(this.onFirehoseConnect)
+    this.events.onError(this.onFirehoseError)
   }
 
   private commit() {
@@ -198,6 +210,7 @@ export class Convo {
           case ConvoDispatchEvent.Init: {
             this.status = ConvoStatus.Initializing
             this.setup()
+            this.requestPollInterval(ACTIVE_POLL_INTERVAL)
             break
           }
         }
@@ -207,27 +220,24 @@ export class Convo {
         switch (action.event) {
           case ConvoDispatchEvent.Ready: {
             this.status = ConvoStatus.Ready
-            this.pollInterval = ACTIVE_POLL_INTERVAL
-            this.fetchMessageHistory().then(() => {
-              this.restartPoll()
-            })
+            this.fetchMessageHistory()
             break
           }
           case ConvoDispatchEvent.Background: {
             this.status = ConvoStatus.Backgrounded
-            this.pollInterval = BACKGROUND_POLL_INTERVAL
-            this.fetchMessageHistory().then(() => {
-              this.restartPoll()
-            })
+            this.fetchMessageHistory()
+            this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
             break
           }
           case ConvoDispatchEvent.Suspend: {
             this.status = ConvoStatus.Suspended
+            this.withdrawRequestedPollInterval()
             break
           }
           case ConvoDispatchEvent.Error: {
             this.status = ConvoStatus.Error
             this.error = action.payload
+            this.withdrawRequestedPollInterval()
             break
           }
         }
@@ -237,24 +247,23 @@ export class Convo {
         switch (action.event) {
           case ConvoDispatchEvent.Resume: {
             this.refreshConvo()
-            this.restartPoll()
+            this.requestPollInterval(ACTIVE_POLL_INTERVAL)
             break
           }
           case ConvoDispatchEvent.Background: {
             this.status = ConvoStatus.Backgrounded
-            this.pollInterval = BACKGROUND_POLL_INTERVAL
-            this.restartPoll()
+            this.requestPollInterval(BACKGROUND_POLL_INTERVAL)
             break
           }
           case ConvoDispatchEvent.Suspend: {
             this.status = ConvoStatus.Suspended
-            this.cancelNextPoll()
+            this.withdrawRequestedPollInterval()
             break
           }
           case ConvoDispatchEvent.Error: {
             this.status = ConvoStatus.Error
             this.error = action.payload
-            this.cancelNextPoll()
+            this.withdrawRequestedPollInterval()
             break
           }
         }
@@ -262,23 +271,27 @@ export class Convo {
       }
       case ConvoStatus.Backgrounded: {
         switch (action.event) {
+          // TODO truncate history if needed
           case ConvoDispatchEvent.Resume: {
-            this.status = ConvoStatus.Ready
-            this.pollInterval = ACTIVE_POLL_INTERVAL
-            this.refreshConvo()
-            // TODO truncate history if needed
-            this.restartPoll()
+            if (this.convo) {
+              this.status = ConvoStatus.Ready
+              this.refreshConvo()
+            } else {
+              this.status = ConvoStatus.Initializing
+              this.setup()
+            }
+            this.requestPollInterval(ACTIVE_POLL_INTERVAL)
             break
           }
           case ConvoDispatchEvent.Suspend: {
             this.status = ConvoStatus.Suspended
-            this.cancelNextPoll()
+            this.withdrawRequestedPollInterval()
             break
           }
           case ConvoDispatchEvent.Error: {
             this.status = ConvoStatus.Error
             this.error = action.payload
-            this.cancelNextPoll()
+            this.withdrawRequestedPollInterval()
             break
           }
         }
@@ -287,18 +300,11 @@ export class Convo {
       case ConvoStatus.Suspended: {
         switch (action.event) {
           case ConvoDispatchEvent.Init: {
-            this.status = ConvoStatus.Ready
-            this.pollInterval = ACTIVE_POLL_INTERVAL
-            this.refreshConvo()
-            // TODO truncate history if needed
-            this.restartPoll()
+            this.reset()
             break
           }
           case ConvoDispatchEvent.Resume: {
-            this.status = ConvoStatus.Ready
-            this.pollInterval = ACTIVE_POLL_INTERVAL
-            this.refreshConvo()
-            this.restartPoll()
+            this.reset()
             break
           }
           case ConvoDispatchEvent.Error: {
@@ -356,8 +362,8 @@ export class Convo {
 
     this.status = ConvoStatus.Uninitialized
     this.error = undefined
-    this.historyCursor = undefined
-    this.eventsCursor = undefined
+    this.oldestRev = undefined
+    this.latestRev = undefined
 
     this.pastMessages = new Map()
     this.newMessages = new Map()
@@ -426,6 +432,17 @@ export class Convo {
     DEBUG_ACTIVE_CHAT = undefined
   }
 
+  private requestedPollInterval: (() => void) | undefined
+  private requestPollInterval(interval: number) {
+    this.withdrawRequestedPollInterval()
+    this.requestedPollInterval = this.events.requestPollInterval(interval)
+  }
+  private withdrawRequestedPollInterval() {
+    if (this.requestedPollInterval) {
+      this.requestedPollInterval()
+    }
+  }
+
   private pendingFetchConvo:
     | Promise<{
         convo: ChatBskyConvoDefs.ConvoView
@@ -499,9 +516,9 @@ export class Convo {
     logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
 
     /*
-     * If historyCursor is null, we've fetched all history.
+     * If oldestRev is null, we've fetched all history.
      */
-    if (this.historyCursor === null) return
+    if (this.oldestRev === null) return
 
     /*
      * Don't fetch again if a fetch is already in progress
@@ -529,7 +546,7 @@ export class Convo {
 
       const response = await this.agent.api.chat.bsky.convo.getMessages(
         {
-          cursor: this.historyCursor,
+          cursor: this.oldestRev,
           convoId: this.convoId,
           limit: isNative ? 25 : 50,
         },
@@ -541,21 +558,22 @@ export class Convo {
       )
       const {cursor, messages} = response.data
 
-      this.historyCursor = cursor ?? null
+      this.oldestRev = cursor ?? null
 
       for (const message of messages) {
         if (
           ChatBskyConvoDefs.isMessageView(message) ||
           ChatBskyConvoDefs.isDeletedMessageView(message)
         ) {
-          this.pastMessages.set(message.id, message)
-
-          // set to latest rev
-          if (
-            message.rev > (this.eventsCursor = this.eventsCursor || message.rev)
-          ) {
-            this.eventsCursor = message.rev
+          /*
+           * If this message is already in new messages, it was added by the
+           * firehose ingestion, and we can safely overwrite it. This trusts
+           * the server on ordering, and keeps it in sync.
+           */
+          if (this.newMessages.has(message.id)) {
+            this.newMessages.delete(message.id)
           }
+          this.pastMessages.set(message.id, message)
         }
       }
     } catch (e: any) {
@@ -576,84 +594,26 @@ export class Convo {
     }
   }
 
-  private restartPoll() {
-    this.cancelNextPoll()
-    this.pollLatestEvents()
-  }
-
-  private cancelNextPoll() {
-    if (this.nextPoll) clearTimeout(this.nextPoll)
-  }
-
-  private pollLatestEvents() {
-    /*
-     * Uncomment to view poll events
-     */
-    logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo)
-
-    try {
-      this.fetchLatestEvents().then(({events}) => {
-        this.applyLatestEvents(events)
-      })
-      this.nextPoll = setTimeout(() => {
-        this.pollLatestEvents()
-      }, this.pollInterval)
-    } catch (e: any) {
-      logger.error('Convo: poll events failed')
-
-      this.cancelNextPoll()
-
-      this.footerItems.set(ConvoItemError.PollFailed, {
-        type: 'error-recoverable',
-        key: ConvoItemError.PollFailed,
-        code: ConvoItemError.PollFailed,
-        retry: () => {
-          this.footerItems.delete(ConvoItemError.PollFailed)
-          this.commit()
-          this.pollLatestEvents()
-        },
-      })
-
-      this.commit()
-    }
+  onFirehoseConnect() {
+    this.footerItems.delete(ConvoItemError.PollFailed)
+    this.commit()
   }
 
-  private pendingFetchLatestEvents:
-    | Promise<{
-        events: ChatBskyConvoGetLog.OutputSchema['logs']
-      }>
-    | undefined
-  async fetchLatestEvents() {
-    if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents
-
-    this.pendingFetchLatestEvents = new Promise<{
-      events: ChatBskyConvoGetLog.OutputSchema['logs']
-    }>(async (resolve, reject) => {
-      try {
-        // throw new Error('UNCOMMENT TO TEST POLL FAILURE')
-        const response = await this.agent.api.chat.bsky.convo.getLog(
-          {
-            cursor: this.eventsCursor,
-          },
-          {
-            headers: {
-              Authorization: this.__tempFromUserDid,
-            },
-          },
-        )
-        const {logs} = response.data
-        resolve({events: logs})
-      } catch (e) {
-        reject(e)
-      } finally {
-        this.pendingFetchLatestEvents = undefined
-      }
+  onFirehoseError(error?: MessagesEventBusError) {
+    this.footerItems.set(ConvoItemError.PollFailed, {
+      type: 'error-recoverable',
+      key: ConvoItemError.PollFailed,
+      code: ConvoItemError.PollFailed,
+      retry: () => {
+        this.footerItems.delete(ConvoItemError.PollFailed)
+        this.commit()
+        error?.retry()
+      },
     })
-
-    return this.pendingFetchLatestEvents
+    this.commit()
   }
 
-  private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
+  ingestFirehose(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
     let needsCommit = false
 
     for (const ev of events) {
@@ -662,14 +622,25 @@ export class Convo {
        * know what it is.
        */
       if (typeof ev.rev === 'string') {
+        const isUninitialized = !this.latestRev
+        const isNewEvent = this.latestRev && ev.rev > this.latestRev
+
+        /*
+         * We received an event prior to fetching any history, so we can safely
+         * use this as the initial history cursor
+         */
+        if (this.oldestRev === undefined && isUninitialized) {
+          this.oldestRev = ev.rev
+        }
+
         /*
          * We only care about new events
          */
-        if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) {
+        if (isNewEvent || isUninitialized) {
           /*
            * Update rev regardless of if it's a ev type we care about or not
            */
-          this.eventsCursor = ev.rev
+          this.latestRev = ev.rev
 
           /*
            * This is VERY important. We don't want to insert any messages from
@@ -681,8 +652,14 @@ export class Convo {
             ChatBskyConvoDefs.isLogCreateMessage(ev) &&
             ChatBskyConvoDefs.isMessageView(ev.message)
           ) {
+            /**
+             * If this message is already in new messages, it was added by our
+             * sending logic, and is based on client-ordering. When we receive
+             * the "commited" event from the log, we should replace this
+             * reference and re-insert in order to respect the order we receied
+             * from the log.
+             */
             if (this.newMessages.has(ev.message.id)) {
-              // Trust the ev as the source of truth on ordering
               this.newMessages.delete(ev.message.id)
             }
             this.newMessages.set(ev.message.id, ev.message)
@@ -694,6 +671,7 @@ export class Convo {
             /*
              * Update if we have this in state. If we don't, don't worry about it.
              */
+            // TODO check for other storage spots
             if (this.pastMessages.has(ev.message.id)) {
               /*
                * For now, we remove deleted messages from the thread, if we receive one.
diff --git a/src/state/messages/convo/const.ts b/src/state/messages/convo/const.ts
new file mode 100644
index 000000000..0b8873341
--- /dev/null
+++ b/src/state/messages/convo/const.ts
@@ -0,0 +1,2 @@
+export const ACTIVE_POLL_INTERVAL = 1e3
+export const BACKGROUND_POLL_INTERVAL = 5e3
diff --git a/src/state/messages/convo/index.tsx b/src/state/messages/convo/index.tsx
index c4fe71d30..311e8ce05 100644
--- a/src/state/messages/convo/index.tsx
+++ b/src/state/messages/convo/index.tsx
@@ -5,6 +5,7 @@ import {useFocusEffect, useIsFocused} from '@react-navigation/native'
 
 import {Convo} from '#/state/messages/convo/agent'
 import {ConvoParams, ConvoState} from '#/state/messages/convo/types'
+import {useMessagesEventBus} from '#/state/messages/events'
 import {useMarkAsReadMutation} from '#/state/queries/messages/conversation'
 import {useAgent} from '#/state/session'
 import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage'
@@ -26,6 +27,7 @@ export function ConvoProvider({
   const isScreenFocused = useIsFocused()
   const {serviceUrl} = useDmServiceUrlStorage()
   const {getAgent} = useAgent()
+  const events = useMessagesEventBus()
   const [convo] = useState(
     () =>
       new Convo({
@@ -33,6 +35,7 @@ export function ConvoProvider({
         agent: new BskyAgent({
           service: serviceUrl,
         }),
+        events,
         __tempFromUserDid: getAgent().session?.did!,
       }),
   )
diff --git a/src/state/messages/convo/types.ts b/src/state/messages/convo/types.ts
index cfbde6d7e..2ed2eeaff 100644
--- a/src/state/messages/convo/types.ts
+++ b/src/state/messages/convo/types.ts
@@ -5,9 +5,12 @@ import {
   ChatBskyConvoSendMessage,
 } from '@atproto-labs/api'
 
+import {MessagesEventBus} from '#/state/messages/events/agent'
+
 export type ConvoParams = {
   convoId: string
   agent: BskyAgent
+  events: MessagesEventBus
   __tempFromUserDid: string
 }
 
diff --git a/src/state/messages/events/agent.ts b/src/state/messages/events/agent.ts
index f22cff9d9..eea61a61b 100644
--- a/src/state/messages/events/agent.ts
+++ b/src/state/messages/events/agent.ts
@@ -347,7 +347,15 @@ export class MessagesEventBus {
 
     this.isPolling = true
 
-    logger.debug(`${LOGGER_CONTEXT}: poll`, {}, logger.DebugContext.convo)
+    // logger.debug(
+    //   `${LOGGER_CONTEXT}: poll`,
+    //   {
+    //     requestedPollIntervals: Array.from(
+    //       this.requestedPollIntervals.values(),
+    //     ),
+    //   },
+    //   logger.DebugContext.convo,
+    // )
 
     try {
       const response = await this.agent.api.chat.bsky.convo.getLog(