diff options
author | Eric Bailey <git@esb.lol> | 2024-05-01 15:24:56 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-01 15:24:56 -0500 |
commit | fc0eab2d03bfbd2ddb2a39b77b83811ae9b4da2d (patch) | |
tree | d3878ce501a65bb25e89677a0c1940a411000368 /src/state/messages/convo.ts | |
parent | 333ccdad39fd2013615d9f53645763fe85c3e568 (diff) | |
download | voidsky-fc0eab2d03bfbd2ddb2a39b77b83811ae9b4da2d.tar.zst |
Retry clops (#3800)
* Add convo retries, sketch out tests * Only append nextMessage to messages * Remove debug code
Diffstat (limited to 'src/state/messages/convo.ts')
-rw-r--r-- | src/state/messages/convo.ts | 201 |
1 files changed, 156 insertions, 45 deletions
diff --git a/src/state/messages/convo.ts b/src/state/messages/convo.ts index a1de1dbed..73ef8d73e 100644 --- a/src/state/messages/convo.ts +++ b/src/state/messages/convo.ts @@ -6,6 +6,8 @@ import { import {EventEmitter} from 'eventemitter3' import {nanoid} from 'nanoid/non-secure' +import {isNative} from '#/platform/detection' + export type ConvoParams = { convoId: string agent: BskyAgent @@ -44,6 +46,11 @@ export type ConvoItem = key: string message: ChatBskyConvoSendMessage.InputSchema['message'] } + | { + type: 'pending-retry' + key: string + retry: () => void + } export type ConvoState = | { @@ -66,6 +73,17 @@ export type ConvoState = status: ConvoStatus.Destroyed } +export function isConvoItemMessage( + item: ConvoItem, +): item is ConvoItem & {type: 'message'} { + if (!item) return false + return ( + item.type === 'message' || + item.type === 'deleted-message' || + item.type === 'pending-message' + ) +} + export class Convo { private convoId: string private agent: BskyAgent @@ -90,8 +108,10 @@ export class Convo { string, {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} > = new Map() + private footerItems: Map<string, ConvoItem> = new Map() private pendingEventIngestion: Promise<void> | undefined + private isProcessingPendingMessages = false constructor(params: ConvoParams) { this.convoId = params.convoId @@ -165,7 +185,7 @@ export class Convo { { cursor: this.historyCursor, convoId: this.convoId, - limit: 20, + limit: isNative ? 25 : 50, }, { headers: { @@ -230,8 +250,6 @@ export class Convo { /* * This is VERY important. We don't want to insert any messages from * your other chats. - * - * TODO there may be a better way to handle this */ if (log.convoId !== this.convoId) continue @@ -241,7 +259,6 @@ export class Convo { ) { if (this.newMessages.has(log.message.id)) { // Trust the log as the source of truth on ordering - // TODO test this this.newMessages.delete(log.message.id) } this.newMessages.set(log.message.id, log.message) @@ -269,10 +286,116 @@ export class Convo { this.commit() } + async processPendingMessages() { + const pendingMessage = Array.from(this.pendingMessages.values()).shift() + + /* + * If there are no pending messages, we're done. + */ + if (!pendingMessage) { + this.isProcessingPendingMessages = false + return + } + + try { + this.isProcessingPendingMessages = true + + // throw new Error('UNCOMMENT TO TEST RETRY') + const {id, message} = pendingMessage + + const response = await this.agent.api.chat.bsky.convo.sendMessage( + { + convoId: this.convoId, + message, + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const res = response.data + + /* + * Insert into `newMessages` as soon as we have a real ID. That way, when + * we get an event log back, we can replace in situ. + */ + this.newMessages.set(res.id, { + ...res, + $type: 'chat.bsky.convo.defs#messageView', + sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid), + }) + this.pendingMessages.delete(id) + + await this.processPendingMessages() + + this.commit() + } catch (e) { + this.footerItems.set('pending-retry', { + type: 'pending-retry', + key: 'pending-retry', + retry: this.batchRetryPendingMessages.bind(this), + }) + this.commit() + } + } + + async batchRetryPendingMessages() { + this.footerItems.delete('pending-retry') + this.commit() + + try { + const messageArray = Array.from(this.pendingMessages.values()) + const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( + { + items: messageArray.map(({message}) => ({ + convoId: this.convoId, + message, + })), + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {items} = data + + /* + * Insert into `newMessages` as soon as we have a real ID. That way, when + * we get an event log back, we can replace in situ. + */ + for (const item of items) { + this.newMessages.set(item.id, { + ...item, + $type: 'chat.bsky.convo.defs#messageView', + sender: this.convo?.members.find( + m => m.did === this.__tempFromUserDid, + ), + }) + } + + for (const pendingMessage of messageArray) { + this.pendingMessages.delete(pendingMessage.id) + } + + this.commit() + } catch (e) { + this.footerItems.set('pending-retry', { + type: 'pending-retry', + key: 'pending-retry', + retry: this.batchRetryPendingMessages.bind(this), + }) + this.commit() + } + } + async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { if (this.status === ConvoStatus.Destroyed) return // Ignore empty messages for now since they have no other purpose atm - if (!message.text) return + if (!message.text.trim()) return const tempId = nanoid() @@ -282,33 +405,9 @@ export class Convo { }) this.commit() - await new Promise(y => setTimeout(y, 500)) - const response = await this.agent.api.chat.bsky.convo.sendMessage( - { - convoId: this.convoId, - message, - }, - { - encoding: 'application/json', - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const res = response.data - - /* - * Insert into `newMessages` as soon as we have a real ID. That way, when - * we get an event log back, we can replace in situ. - */ - this.newMessages.set(res.id, { - ...res, - $type: 'chat.bsky.convo.defs#messageView', - sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid), - }) - this.pendingMessages.delete(tempId) - - this.commit() + if (!this.isProcessingPendingMessages) { + this.processPendingMessages() + } } /* @@ -345,6 +444,10 @@ export class Convo { }) }) + this.footerItems.forEach(item => { + items.unshift(item) + }) + this.pastMessages.forEach(m => { if (ChatBskyConvoDefs.isMessageView(m)) { items.push({ @@ -365,25 +468,33 @@ export class Convo { return items.map((item, i) => { let nextMessage = null + const isMessage = isConvoItemMessage(item) - if ( - ChatBskyConvoDefs.isMessageView(item.message) || - ChatBskyConvoDefs.isDeletedMessageView(item.message) - ) { - const next = items[i - 1] + if (isMessage) { if ( - next && - (ChatBskyConvoDefs.isMessageView(next.message) || - ChatBskyConvoDefs.isDeletedMessageView(next.message)) + isMessage && + (ChatBskyConvoDefs.isMessageView(item.message) || + ChatBskyConvoDefs.isDeletedMessageView(item.message)) ) { - nextMessage = next.message + const next = items[i - 1] + + if ( + isConvoItemMessage(next) && + next && + (ChatBskyConvoDefs.isMessageView(next.message) || + ChatBskyConvoDefs.isDeletedMessageView(next.message)) + ) { + nextMessage = next.message + } } - } - return { - ...item, - nextMessage, + return { + ...item, + nextMessage, + } } + + return item }) } |