about summary refs log tree commit diff
path: root/src/state/messages/convo/agent.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/state/messages/convo/agent.ts')
-rw-r--r--src/state/messages/convo/agent.ts180
1 files changed, 128 insertions, 52 deletions
diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts
index 94bb8dda4..8673c70ad 100644
--- a/src/state/messages/convo/agent.ts
+++ b/src/state/messages/convo/agent.ts
@@ -5,6 +5,8 @@ import {
   ChatBskyConvoGetLog,
   ChatBskyConvoSendMessage,
 } from '@atproto/api'
+import {XRPCError} from '@atproto/xrpc'
+import EventEmitter from 'eventemitter3'
 import {nanoid} from 'nanoid/non-secure'
 
 import {networkRetry} from '#/lib/async/retry'
@@ -14,11 +16,14 @@ import {
   ACTIVE_POLL_INTERVAL,
   BACKGROUND_POLL_INTERVAL,
   INACTIVE_TIMEOUT,
+  NETWORK_FAILURE_STATUSES,
 } from '#/state/messages/convo/const'
 import {
   ConvoDispatch,
   ConvoDispatchEvent,
+  ConvoError,
   ConvoErrorCode,
+  ConvoEvent,
   ConvoItem,
   ConvoItemError,
   ConvoParams,
@@ -51,13 +56,7 @@ export class Convo {
   private senderUserDid: string
 
   private status: ConvoStatus = ConvoStatus.Uninitialized
-  private error:
-    | {
-        code: ConvoErrorCode
-        exception?: Error
-        retry: () => void
-      }
-    | undefined
+  private error: ConvoError | undefined
   private oldestRev: string | undefined | null = undefined
   private isFetchingHistory = false
   private latestRev: string | undefined = undefined
@@ -75,13 +74,13 @@ export class Convo {
     {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
   > = new Map()
   private deletedMessages: Set<string> = new Set()
-  private footerItems: Map<string, ConvoItem> = new Map()
-  private headerItems: Map<string, ConvoItem> = new Map()
 
   private isProcessingPendingMessages = false
 
   private lastActiveTimestamp: number | undefined
 
+  private emitter = new EventEmitter<{event: [ConvoEvent]}>()
+
   convoId: string
   convo: ChatBskyConvoDefs.ConvoView | undefined
   sender: AppBskyActorDefs.ProfileViewBasic | undefined
@@ -174,7 +173,7 @@ export class Convo {
           status: ConvoStatus.Error,
           items: [],
           convo: undefined,
-          error: this.error,
+          error: this.error!,
           sender: undefined,
           recipients: undefined,
           isFetchingHistory: false,
@@ -282,6 +281,7 @@ export class Convo {
               if (this.convo) {
                 this.status = ConvoStatus.Ready
                 this.refreshConvo()
+                this.maybeRecoverFromNetworkError()
               } else {
                 this.status = ConvoStatus.Initializing
                 this.setup()
@@ -379,12 +379,30 @@ export class Convo {
     this.newMessages = new Map()
     this.pendingMessages = new Map()
     this.deletedMessages = new Set()
-    this.footerItems = new Map()
-    this.headerItems = new Map()
+
+    this.pendingMessageFailure = null
+    this.fetchMessageHistoryError = undefined
+    this.firehoseError = undefined
 
     this.dispatch({event: ConvoDispatchEvent.Init})
   }
 
+  maybeRecoverFromNetworkError() {
+    if (this.firehoseError) {
+      this.firehoseError.retry()
+      this.firehoseError = undefined
+      this.commit()
+    } else {
+      this.batchRetryPendingMessages()
+    }
+
+    if (this.fetchMessageHistoryError) {
+      this.fetchMessageHistoryError.retry()
+      this.fetchMessageHistoryError = undefined
+      this.commit()
+    }
+  }
+
   private async setup() {
     try {
       const {convo, sender, recipients} = await this.fetchConvo()
@@ -520,6 +538,11 @@ export class Convo {
     }
   }
 
+  private fetchMessageHistoryError:
+    | {
+        retry: () => void
+      }
+    | undefined
   async fetchMessageHistory() {
     logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
 
@@ -537,7 +560,7 @@ export class Convo {
      * If we've rendered a retry state for history fetching, exit. Upon retry,
      * this will be removed and we'll try again.
      */
-    if (this.headerItems.has(ConvoItemError.HistoryFailed)) return
+    if (this.fetchMessageHistoryError) return
 
     try {
       this.isFetchingHistory = true
@@ -586,15 +609,11 @@ export class Convo {
     } catch (e: any) {
       logger.error('Convo: failed to fetch message history')
 
-      this.headerItems.set(ConvoItemError.HistoryFailed, {
-        type: 'error-recoverable',
-        key: ConvoItemError.HistoryFailed,
-        code: ConvoItemError.HistoryFailed,
+      this.fetchMessageHistoryError = {
         retry: () => {
-          this.headerItems.delete(ConvoItemError.HistoryFailed)
           this.fetchMessageHistory()
         },
-      })
+      }
     } finally {
       this.isFetchingHistory = false
       this.commit()
@@ -628,22 +647,16 @@ export class Convo {
     )
   }
 
+  private firehoseError: MessagesEventBusError | undefined
+
   onFirehoseConnect() {
-    this.footerItems.delete(ConvoItemError.FirehoseFailed)
+    this.firehoseError = undefined
+    this.batchRetryPendingMessages()
     this.commit()
   }
 
   onFirehoseError(error?: MessagesEventBusError) {
-    this.footerItems.set(ConvoItemError.FirehoseFailed, {
-      type: 'error-recoverable',
-      key: ConvoItemError.FirehoseFailed,
-      code: ConvoItemError.FirehoseFailed,
-      retry: () => {
-        this.footerItems.delete(ConvoItemError.FirehoseFailed)
-        this.commit()
-        error?.retry()
-      },
-    })
+    this.firehoseError = error
     this.commit()
   }
 
@@ -724,7 +737,7 @@ export class Convo {
     }
   }
 
-  private pendingFailed = false
+  private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null
 
   async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
     // Ignore empty messages for now since they have no other purpose atm
@@ -734,13 +747,14 @@ export class Convo {
 
     const tempId = nanoid()
 
+    this.pendingMessageFailure = null
     this.pendingMessages.set(tempId, {
       id: tempId,
       message,
     })
     this.commit()
 
-    if (!this.isProcessingPendingMessages && !this.pendingFailed) {
+    if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) {
       this.processPendingMessages()
     }
   }
@@ -765,7 +779,6 @@ export class Convo {
     try {
       this.isProcessingPendingMessages = true
 
-      // throw new Error('UNCOMMENT TO TEST RETRY')
       const {id, message} = pendingMessage
 
       const response = await networkRetry(2, () => {
@@ -794,23 +807,65 @@ export class Convo {
       this.commit()
     } catch (e: any) {
       logger.error(e, {context: `Convo: failed to send message`})
-      this.pendingFailed = true
-      this.commit()
+      this.handleSendMessageFailure(e)
     } finally {
       this.isProcessingPendingMessages = false
     }
   }
 
+  private handleSendMessageFailure(e: any) {
+    if (e instanceof XRPCError) {
+      if (NETWORK_FAILURE_STATUSES.includes(e.status)) {
+        this.pendingMessageFailure = 'recoverable'
+      } else {
+        switch (e.message) {
+          case 'block between recipient and sender':
+            this.pendingMessageFailure = 'unrecoverable'
+            this.emitter.emit('event', {
+              type: 'invalidate-block-state',
+              accountDids: [
+                this.sender!.did,
+                ...this.recipients!.map(r => r.did),
+              ],
+            })
+            break
+          default:
+            logger.warn(
+              `Convo handleSendMessageFailure could not handle error`,
+              {
+                status: e.status,
+                message: e.message,
+              },
+            )
+            break
+        }
+      }
+    } else {
+      logger.error(e, {
+        context: `Convo handleSendMessageFailure received unknown error`,
+      })
+    }
+
+    this.commit()
+  }
+
   async batchRetryPendingMessages() {
+    if (this.pendingMessageFailure === null) return
+
+    const messageArray = Array.from(this.pendingMessages.values())
+    if (messageArray.length === 0) return
+
+    this.pendingMessageFailure = null
+    this.commit()
+
     logger.debug(
-      `Convo: retrying ${this.pendingMessages.size} pending messages`,
+      `Convo: batch retrying ${this.pendingMessages.size} pending messages`,
       {},
       logger.DebugContext.convo,
     )
 
     try {
       // throw new Error('UNCOMMENT TO TEST RETRY')
-      const messageArray = Array.from(this.pendingMessages.values())
       const {data} = await networkRetry(2, () => {
         return this.agent.api.chat.bsky.convo.sendMessageBatch(
           {
@@ -848,8 +903,7 @@ export class Convo {
       )
     } catch (e: any) {
       logger.error(e, {context: `Convo: failed to batch retry messages`})
-      this.pendingFailed = true
-      this.commit()
+      this.handleSendMessageFailure(e)
     }
   }
 
@@ -877,6 +931,14 @@ export class Convo {
     }
   }
 
+  on(handler: (event: ConvoEvent) => void) {
+    this.emitter.on('event', handler)
+
+    return () => {
+      this.emitter.off('event', handler)
+    }
+  }
+
   /*
    * Items in reverse order, since FlatList inverts
    */
@@ -901,9 +963,16 @@ export class Convo {
       }
     })
 
-    this.headerItems.forEach(item => {
-      items.unshift(item)
-    })
+    if (this.fetchMessageHistoryError) {
+      items.unshift({
+        type: 'error',
+        code: ConvoItemError.HistoryFailed,
+        key: ConvoItemError.HistoryFailed,
+        retry: () => {
+          this.maybeRecoverFromNetworkError()
+        },
+      })
+    }
 
     this.newMessages.forEach(m => {
       if (ChatBskyConvoDefs.isMessageView(m)) {
@@ -940,19 +1009,26 @@ export class Convo {
           sender: this.sender!,
         },
         nextMessage: null,
-        retry: this.pendingFailed
-          ? () => {
-              this.pendingFailed = false
-              this.commit()
-              this.batchRetryPendingMessages()
-            }
-          : undefined,
+        failed: this.pendingMessageFailure !== null,
+        retry:
+          this.pendingMessageFailure === 'recoverable'
+            ? () => {
+                this.maybeRecoverFromNetworkError()
+              }
+            : undefined,
       })
     })
 
-    this.footerItems.forEach(item => {
-      items.push(item)
-    })
+    if (this.firehoseError) {
+      items.push({
+        type: 'error',
+        code: ConvoItemError.FirehoseFailed,
+        key: ConvoItemError.FirehoseFailed,
+        retry: () => {
+          this.firehoseError?.retry()
+        },
+      })
+    }
 
     return items
       .filter(item => {