diff options
author | Eric Bailey <git@esb.lol> | 2024-05-16 14:01:39 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-16 14:01:39 -0500 |
commit | 4bceabc21cacd865f5b10684142485faca2c9bb4 (patch) | |
tree | 98201e8fc87f5f2a6e880c2bf8f2da42af5546e0 /src/state/messages/convo/agent.ts | |
parent | dff6bd7c6542b62f1ba8325d2c0520b1665d412b (diff) | |
download | voidsky-4bceabc21cacd865f5b10684142485faca2c9bb4.tar.zst |
[🐴] Error recovery (#4036)
* Handle block state when sending messages * Handle different pending failures * Use existing profile data to handle blocks * Better cleanup, leave room for more * Attempt recover upon next send * Reset pending failure * Capture unexpected error * Gracefully handle network errors and recovery * Re-align error components and types * Include history fetching in recoverable states
Diffstat (limited to 'src/state/messages/convo/agent.ts')
-rw-r--r-- | src/state/messages/convo/agent.ts | 180 |
1 files changed, 128 insertions, 52 deletions
diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts index 94bb8dda4..8673c70ad 100644 --- a/src/state/messages/convo/agent.ts +++ b/src/state/messages/convo/agent.ts @@ -5,6 +5,8 @@ import { ChatBskyConvoGetLog, ChatBskyConvoSendMessage, } from '@atproto/api' +import {XRPCError} from '@atproto/xrpc' +import EventEmitter from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' import {networkRetry} from '#/lib/async/retry' @@ -14,11 +16,14 @@ import { ACTIVE_POLL_INTERVAL, BACKGROUND_POLL_INTERVAL, INACTIVE_TIMEOUT, + NETWORK_FAILURE_STATUSES, } from '#/state/messages/convo/const' import { ConvoDispatch, ConvoDispatchEvent, + ConvoError, ConvoErrorCode, + ConvoEvent, ConvoItem, ConvoItemError, ConvoParams, @@ -51,13 +56,7 @@ export class Convo { private senderUserDid: string private status: ConvoStatus = ConvoStatus.Uninitialized - private error: - | { - code: ConvoErrorCode - exception?: Error - retry: () => void - } - | undefined + private error: ConvoError | undefined private oldestRev: string | undefined | null = undefined private isFetchingHistory = false private latestRev: string | undefined = undefined @@ -75,13 +74,13 @@ export class Convo { {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} > = new Map() private deletedMessages: Set<string> = new Set() - private footerItems: Map<string, ConvoItem> = new Map() - private headerItems: Map<string, ConvoItem> = new Map() private isProcessingPendingMessages = false private lastActiveTimestamp: number | undefined + private emitter = new EventEmitter<{event: [ConvoEvent]}>() + convoId: string convo: ChatBskyConvoDefs.ConvoView | undefined sender: AppBskyActorDefs.ProfileViewBasic | undefined @@ -174,7 +173,7 @@ export class Convo { status: ConvoStatus.Error, items: [], convo: undefined, - error: this.error, + error: this.error!, sender: undefined, recipients: undefined, isFetchingHistory: false, @@ -282,6 +281,7 @@ export class Convo { if (this.convo) { this.status = ConvoStatus.Ready this.refreshConvo() + this.maybeRecoverFromNetworkError() } else { this.status = ConvoStatus.Initializing this.setup() @@ -379,12 +379,30 @@ export class Convo { this.newMessages = new Map() this.pendingMessages = new Map() this.deletedMessages = new Set() - this.footerItems = new Map() - this.headerItems = new Map() + + this.pendingMessageFailure = null + this.fetchMessageHistoryError = undefined + this.firehoseError = undefined this.dispatch({event: ConvoDispatchEvent.Init}) } + maybeRecoverFromNetworkError() { + if (this.firehoseError) { + this.firehoseError.retry() + this.firehoseError = undefined + this.commit() + } else { + this.batchRetryPendingMessages() + } + + if (this.fetchMessageHistoryError) { + this.fetchMessageHistoryError.retry() + this.fetchMessageHistoryError = undefined + this.commit() + } + } + private async setup() { try { const {convo, sender, recipients} = await this.fetchConvo() @@ -520,6 +538,11 @@ export class Convo { } } + private fetchMessageHistoryError: + | { + retry: () => void + } + | undefined async fetchMessageHistory() { logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) @@ -537,7 +560,7 @@ export class Convo { * If we've rendered a retry state for history fetching, exit. Upon retry, * this will be removed and we'll try again. */ - if (this.headerItems.has(ConvoItemError.HistoryFailed)) return + if (this.fetchMessageHistoryError) return try { this.isFetchingHistory = true @@ -586,15 +609,11 @@ export class Convo { } catch (e: any) { logger.error('Convo: failed to fetch message history') - this.headerItems.set(ConvoItemError.HistoryFailed, { - type: 'error-recoverable', - key: ConvoItemError.HistoryFailed, - code: ConvoItemError.HistoryFailed, + this.fetchMessageHistoryError = { retry: () => { - this.headerItems.delete(ConvoItemError.HistoryFailed) this.fetchMessageHistory() }, - }) + } } finally { this.isFetchingHistory = false this.commit() @@ -628,22 +647,16 @@ export class Convo { ) } + private firehoseError: MessagesEventBusError | undefined + onFirehoseConnect() { - this.footerItems.delete(ConvoItemError.FirehoseFailed) + this.firehoseError = undefined + this.batchRetryPendingMessages() this.commit() } onFirehoseError(error?: MessagesEventBusError) { - this.footerItems.set(ConvoItemError.FirehoseFailed, { - type: 'error-recoverable', - key: ConvoItemError.FirehoseFailed, - code: ConvoItemError.FirehoseFailed, - retry: () => { - this.footerItems.delete(ConvoItemError.FirehoseFailed) - this.commit() - error?.retry() - }, - }) + this.firehoseError = error this.commit() } @@ -724,7 +737,7 @@ export class Convo { } } - private pendingFailed = false + private pendingMessageFailure: 'recoverable' | 'unrecoverable' | null = null async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { // Ignore empty messages for now since they have no other purpose atm @@ -734,13 +747,14 @@ export class Convo { const tempId = nanoid() + this.pendingMessageFailure = null this.pendingMessages.set(tempId, { id: tempId, message, }) this.commit() - if (!this.isProcessingPendingMessages && !this.pendingFailed) { + if (!this.isProcessingPendingMessages && !this.pendingMessageFailure) { this.processPendingMessages() } } @@ -765,7 +779,6 @@ export class Convo { try { this.isProcessingPendingMessages = true - // throw new Error('UNCOMMENT TO TEST RETRY') const {id, message} = pendingMessage const response = await networkRetry(2, () => { @@ -794,23 +807,65 @@ export class Convo { this.commit() } catch (e: any) { logger.error(e, {context: `Convo: failed to send message`}) - this.pendingFailed = true - this.commit() + this.handleSendMessageFailure(e) } finally { this.isProcessingPendingMessages = false } } + private handleSendMessageFailure(e: any) { + if (e instanceof XRPCError) { + if (NETWORK_FAILURE_STATUSES.includes(e.status)) { + this.pendingMessageFailure = 'recoverable' + } else { + switch (e.message) { + case 'block between recipient and sender': + this.pendingMessageFailure = 'unrecoverable' + this.emitter.emit('event', { + type: 'invalidate-block-state', + accountDids: [ + this.sender!.did, + ...this.recipients!.map(r => r.did), + ], + }) + break + default: + logger.warn( + `Convo handleSendMessageFailure could not handle error`, + { + status: e.status, + message: e.message, + }, + ) + break + } + } + } else { + logger.error(e, { + context: `Convo handleSendMessageFailure received unknown error`, + }) + } + + this.commit() + } + async batchRetryPendingMessages() { + if (this.pendingMessageFailure === null) return + + const messageArray = Array.from(this.pendingMessages.values()) + if (messageArray.length === 0) return + + this.pendingMessageFailure = null + this.commit() + logger.debug( - `Convo: retrying ${this.pendingMessages.size} pending messages`, + `Convo: batch retrying ${this.pendingMessages.size} pending messages`, {}, logger.DebugContext.convo, ) try { // throw new Error('UNCOMMENT TO TEST RETRY') - const messageArray = Array.from(this.pendingMessages.values()) const {data} = await networkRetry(2, () => { return this.agent.api.chat.bsky.convo.sendMessageBatch( { @@ -848,8 +903,7 @@ export class Convo { ) } catch (e: any) { logger.error(e, {context: `Convo: failed to batch retry messages`}) - this.pendingFailed = true - this.commit() + this.handleSendMessageFailure(e) } } @@ -877,6 +931,14 @@ export class Convo { } } + on(handler: (event: ConvoEvent) => void) { + this.emitter.on('event', handler) + + return () => { + this.emitter.off('event', handler) + } + } + /* * Items in reverse order, since FlatList inverts */ @@ -901,9 +963,16 @@ export class Convo { } }) - this.headerItems.forEach(item => { - items.unshift(item) - }) + if (this.fetchMessageHistoryError) { + items.unshift({ + type: 'error', + code: ConvoItemError.HistoryFailed, + key: ConvoItemError.HistoryFailed, + retry: () => { + this.maybeRecoverFromNetworkError() + }, + }) + } this.newMessages.forEach(m => { if (ChatBskyConvoDefs.isMessageView(m)) { @@ -940,19 +1009,26 @@ export class Convo { sender: this.sender!, }, nextMessage: null, - retry: this.pendingFailed - ? () => { - this.pendingFailed = false - this.commit() - this.batchRetryPendingMessages() - } - : undefined, + failed: this.pendingMessageFailure !== null, + retry: + this.pendingMessageFailure === 'recoverable' + ? () => { + this.maybeRecoverFromNetworkError() + } + : undefined, }) }) - this.footerItems.forEach(item => { - items.push(item) - }) + if (this.firehoseError) { + items.push({ + type: 'error', + code: ConvoItemError.FirehoseFailed, + key: ConvoItemError.FirehoseFailed, + retry: () => { + this.firehoseError?.retry() + }, + }) + } return items .filter(item => { |