From 56f713077f6e663ac77df20f1d67b55fd1da153a Mon Sep 17 00:00:00 2001 From: Eric Bailey Date: Tue, 7 May 2024 21:46:59 -0500 Subject: [🐴] Reorg convo files (#3909) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Remove unused prop * Reorganize --- src/components/dms/MessageMenu.tsx | 12 +- .../Messages/Conversation/MessageListError.tsx | 2 +- src/screens/Messages/Conversation/MessagesList.tsx | 22 +- src/screens/Messages/Conversation/index.tsx | 28 +- src/state/messages/convo.ts | 1148 -------------------- src/state/messages/convo/agent.ts | 985 +++++++++++++++++ src/state/messages/convo/index.tsx | 75 ++ src/state/messages/convo/types.ts | 178 +++ src/state/messages/index.tsx | 74 +- 9 files changed, 1271 insertions(+), 1253 deletions(-) delete mode 100644 src/state/messages/convo.ts create mode 100644 src/state/messages/convo/agent.ts create mode 100644 src/state/messages/convo/index.tsx create mode 100644 src/state/messages/convo/types.ts (limited to 'src') diff --git a/src/components/dms/MessageMenu.tsx b/src/components/dms/MessageMenu.tsx index d2a7d147d..3a5fa54d8 100644 --- a/src/components/dms/MessageMenu.tsx +++ b/src/components/dms/MessageMenu.tsx @@ -5,8 +5,8 @@ import {ChatBskyConvoDefs} from '@atproto-labs/api' import {msg} from '@lingui/macro' import {useLingui} from '@lingui/react' -import {useChat} from 'state/messages' -import {ConvoStatus} from 'state/messages/convo' +import {useConvo} from 'state/messages/convo' +import {ConvoStatus} from 'state/messages/convo/types' import {useSession} from 'state/session' import * as Toast from '#/view/com/util/Toast' import {atoms as a, useTheme} from '#/alf' @@ -33,7 +33,7 @@ export let MessageMenu = ({ const {_} = useLingui() const t = useTheme() const {currentAccount} = useSession() - const chat = useChat() + const convo = useConvo() const deleteControl = usePromptControl() const retryDeleteControl = usePromptControl() @@ -48,14 +48,14 @@ export let MessageMenu = ({ }, [_, message.text]) const onDelete = React.useCallback(() => { - if (chat.status !== ConvoStatus.Ready) return + if (convo.status !== ConvoStatus.Ready) return LayoutAnimation.configureNext(LayoutAnimation.Presets.easeInEaseOut) - chat + convo .deleteMessage(message.id) .then(() => Toast.show(_(msg`Message deleted`))) .catch(() => retryDeleteControl.open()) - }, [_, chat, message.id, retryDeleteControl]) + }, [_, convo, message.id, retryDeleteControl]) const onReport = React.useCallback(() => { // TODO report the message diff --git a/src/screens/Messages/Conversation/MessageListError.tsx b/src/screens/Messages/Conversation/MessageListError.tsx index 7c6fd02c1..5f5df4fc9 100644 --- a/src/screens/Messages/Conversation/MessageListError.tsx +++ b/src/screens/Messages/Conversation/MessageListError.tsx @@ -3,7 +3,7 @@ import {View} from 'react-native' import {msg} from '@lingui/macro' import {useLingui} from '@lingui/react' -import {ConvoItem, ConvoItemError} from '#/state/messages/convo' +import {ConvoItem, ConvoItemError} from '#/state/messages/convo/types' import {atoms as a, useTheme} from '#/alf' import {CircleInfo_Stroke2_Corner0_Rounded as CircleInfo} from '#/components/icons/CircleInfo' import {InlineLinkText} from '#/components/Link' diff --git a/src/screens/Messages/Conversation/MessagesList.tsx b/src/screens/Messages/Conversation/MessagesList.tsx index 1dc26d6c3..49e780aa2 100644 --- a/src/screens/Messages/Conversation/MessagesList.tsx +++ b/src/screens/Messages/Conversation/MessagesList.tsx @@ -11,8 +11,8 @@ import {msg, Trans} from '@lingui/macro' import {useLingui} from '@lingui/react' import {isIOS} from '#/platform/detection' -import {useChat} from '#/state/messages' -import {ConvoItem, ConvoStatus} from '#/state/messages/convo' +import {useConvo} from '#/state/messages/convo' +import {ConvoItem, ConvoStatus} from '#/state/messages/convo/types' import {ScrollProvider} from 'lib/ScrollContext' import {isWeb} from 'platform/detection' import {List} from 'view/com/util/List' @@ -86,7 +86,7 @@ function onScrollToIndexFailed() { } export function MessagesList() { - const chat = useChat() + const convo = useConvo() const flatListRef = useRef(null) // We need to keep track of when the scroll offset is at the bottom of the list to know when to scroll as new items @@ -153,20 +153,20 @@ export function MessagesList() { // The check for `hasInitiallyScrolled` prevents an initial fetch on mount. FlatList triggers `onStartReached` // immediately on mount, since we are in fact at an offset of zero, so we have to ignore those initial calls. const onStartReached = useCallback(() => { - if (chat.status === ConvoStatus.Ready && hasInitiallyScrolled) { - chat.fetchMessageHistory() + if (convo.status === ConvoStatus.Ready && hasInitiallyScrolled) { + convo.fetchMessageHistory() } - }, [chat, hasInitiallyScrolled]) + }, [convo, hasInitiallyScrolled]) const onSendMessage = useCallback( (text: string) => { - if (chat.status === ConvoStatus.Ready) { - chat.sendMessage({ + if (convo.status === ConvoStatus.Ready) { + convo.sendMessage({ text, }) } }, - [chat], + [convo], ) const onScroll = React.useCallback( @@ -229,7 +229,7 @@ export function MessagesList() { + } /> diff --git a/src/screens/Messages/Conversation/index.tsx b/src/screens/Messages/Conversation/index.tsx index 7b78e7e61..db22f9e31 100644 --- a/src/screens/Messages/Conversation/index.tsx +++ b/src/screens/Messages/Conversation/index.tsx @@ -13,8 +13,8 @@ import {useGate} from '#/lib/statsig/statsig' import {useCurrentConvoId} from '#/state/messages/current-convo-id' import {BACK_HITSLOP} from 'lib/constants' import {isWeb} from 'platform/detection' -import {ChatProvider, useChat} from 'state/messages' -import {ConvoStatus} from 'state/messages/convo' +import {ConvoProvider, useConvo} from 'state/messages/convo' +import {ConvoStatus} from 'state/messages/convo/types' import {PreviewableUserAvatar} from 'view/com/util/UserAvatar' import {CenteredView} from 'view/com/util/Views' import {MessagesList} from '#/screens/Messages/Conversation/MessagesList' @@ -46,23 +46,23 @@ export function MessagesConversationScreen({route}: Props) { if (!gate('dms')) return return ( - + - + ) } function Inner() { - const chat = useChat() + const convo = useConvo() if ( - chat.status === ConvoStatus.Uninitialized || - chat.status === ConvoStatus.Initializing + convo.status === ConvoStatus.Uninitialized || + convo.status === ConvoStatus.Initializing ) { return } - if (chat.status === ConvoStatus.Error) { + if (convo.status === ConvoStatus.Error) { // TODO return ( @@ -71,7 +71,7 @@ function Inner() { @@ -81,13 +81,13 @@ function Inner() { } /* - * Any other chat states (atm) are "ready" states + * Any other convo states (atm) are "ready" states */ return ( -
+
@@ -103,7 +103,7 @@ let Header = ({ const {_} = useLingui() const {gtTablet} = useBreakpoints() const navigation = useNavigation() - const chat = useChat() + const convo = useConvo() const onPressBack = useCallback(() => { if (isWeb) { @@ -157,9 +157,9 @@ let Header = ({ {profile.displayName} - {chat.status === ConvoStatus.Ready ? ( + {convo.status === ConvoStatus.Ready ? ( void -} - -export enum ConvoDispatchEvent { - Init = 'init', - Ready = 'ready', - Resume = 'resume', - Background = 'background', - Suspend = 'suspend', - Error = 'error', -} - -export type ConvoDispatch = - | { - event: ConvoDispatchEvent.Init - } - | { - event: ConvoDispatchEvent.Ready - } - | { - event: ConvoDispatchEvent.Resume - } - | { - event: ConvoDispatchEvent.Background - } - | { - event: ConvoDispatchEvent.Suspend - } - | { - event: ConvoDispatchEvent.Error - payload: ConvoError - } - -export type ConvoItem = - | { - type: 'message' | 'pending-message' - key: string - message: ChatBskyConvoDefs.MessageView - nextMessage: - | ChatBskyConvoDefs.MessageView - | ChatBskyConvoDefs.DeletedMessageView - | null - } - | { - type: 'deleted-message' - key: string - message: ChatBskyConvoDefs.DeletedMessageView - nextMessage: - | ChatBskyConvoDefs.MessageView - | ChatBskyConvoDefs.DeletedMessageView - | null - } - | { - type: 'pending-retry' - key: string - retry: () => void - } - | { - type: 'error-recoverable' - key: string - code: ConvoItemError - retry: () => void - } - -export type ConvoState = - | { - status: ConvoStatus.Uninitialized - items: [] - convo: undefined - error: undefined - sender: undefined - recipients: undefined - isFetchingHistory: false - deleteMessage: undefined - sendMessage: undefined - fetchMessageHistory: undefined - } - | { - status: ConvoStatus.Initializing - items: [] - convo: undefined - error: undefined - sender: undefined - recipients: undefined - isFetchingHistory: boolean - deleteMessage: undefined - sendMessage: undefined - fetchMessageHistory: undefined - } - | { - status: ConvoStatus.Ready - items: ConvoItem[] - convo: ChatBskyConvoDefs.ConvoView - error: undefined - sender: AppBskyActorDefs.ProfileViewBasic - recipients: AppBskyActorDefs.ProfileViewBasic[] - isFetchingHistory: boolean - deleteMessage: (messageId: string) => Promise - sendMessage: ( - message: ChatBskyConvoSendMessage.InputSchema['message'], - ) => void - fetchMessageHistory: () => void - } - | { - status: ConvoStatus.Suspended - items: ConvoItem[] - convo: ChatBskyConvoDefs.ConvoView - error: undefined - sender: AppBskyActorDefs.ProfileViewBasic - recipients: AppBskyActorDefs.ProfileViewBasic[] - isFetchingHistory: boolean - deleteMessage: (messageId: string) => Promise - sendMessage: ( - message: ChatBskyConvoSendMessage.InputSchema['message'], - ) => Promise - fetchMessageHistory: () => Promise - } - | { - status: ConvoStatus.Backgrounded - items: ConvoItem[] - convo: ChatBskyConvoDefs.ConvoView - error: undefined - sender: AppBskyActorDefs.ProfileViewBasic - recipients: AppBskyActorDefs.ProfileViewBasic[] - isFetchingHistory: boolean - deleteMessage: (messageId: string) => Promise - sendMessage: ( - message: ChatBskyConvoSendMessage.InputSchema['message'], - ) => Promise - fetchMessageHistory: () => Promise - } - | { - status: ConvoStatus.Error - items: [] - convo: undefined - error: any - sender: undefined - recipients: undefined - isFetchingHistory: false - deleteMessage: undefined - sendMessage: undefined - fetchMessageHistory: undefined - } - -const ACTIVE_POLL_INTERVAL = 1e3 -const BACKGROUND_POLL_INTERVAL = 10e3 - -// TODO temporary -let DEBUG_ACTIVE_CHAT: string | undefined - -export function isConvoItemMessage( - item: ConvoItem, -): item is ConvoItem & {type: 'message'} { - if (!item) return false - return ( - item.type === 'message' || - item.type === 'deleted-message' || - item.type === 'pending-message' - ) -} - -export class Convo { - private id: string - - private agent: BskyAgent - private __tempFromUserDid: string - - private status: ConvoStatus = ConvoStatus.Uninitialized - private pollInterval = ACTIVE_POLL_INTERVAL - private error: - | { - code: ConvoErrorCode - exception?: Error - retry: () => void - } - | undefined - private historyCursor: string | undefined | null = undefined - private isFetchingHistory = false - private eventsCursor: string | undefined = undefined - - private pastMessages: Map< - string, - ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView - > = new Map() - private newMessages: Map< - string, - ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView - > = new Map() - private pendingMessages: Map< - string, - {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} - > = new Map() - private deletedMessages: Set = new Set() - private footerItems: Map = new Map() - private headerItems: Map = new Map() - - private isProcessingPendingMessages = false - private pendingPoll: Promise | undefined - private nextPoll: NodeJS.Timeout | undefined - - convoId: string - convo: ChatBskyConvoDefs.ConvoView | undefined - sender: AppBskyActorDefs.ProfileViewBasic | undefined - recipients: AppBskyActorDefs.ProfileViewBasic[] | undefined = undefined - snapshot: ConvoState | undefined - - constructor(params: ConvoParams) { - this.id = nanoid(3) - this.convoId = params.convoId - this.agent = params.agent - this.__tempFromUserDid = params.__tempFromUserDid - - this.subscribe = this.subscribe.bind(this) - this.getSnapshot = this.getSnapshot.bind(this) - this.sendMessage = this.sendMessage.bind(this) - this.deleteMessage = this.deleteMessage.bind(this) - this.fetchMessageHistory = this.fetchMessageHistory.bind(this) - - if (DEBUG_ACTIVE_CHAT) { - logger.error(`Convo: another chat was already active`, { - convoId: this.convoId, - }) - } else { - DEBUG_ACTIVE_CHAT = this.convoId - } - } - - private commit() { - this.snapshot = undefined - this.subscribers.forEach(subscriber => subscriber()) - } - - private subscribers: (() => void)[] = [] - - subscribe(subscriber: () => void) { - if (this.subscribers.length === 0) this.init() - - this.subscribers.push(subscriber) - - return () => { - this.subscribers = this.subscribers.filter(s => s !== subscriber) - if (this.subscribers.length === 0) this.suspend() - } - } - - getSnapshot(): ConvoState { - if (!this.snapshot) this.snapshot = this.generateSnapshot() - // logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo) - return this.snapshot - } - - private generateSnapshot(): ConvoState { - switch (this.status) { - case ConvoStatus.Initializing: { - return { - status: ConvoStatus.Initializing, - items: [], - convo: undefined, - error: undefined, - sender: undefined, - recipients: undefined, - isFetchingHistory: this.isFetchingHistory, - deleteMessage: undefined, - sendMessage: undefined, - fetchMessageHistory: undefined, - } - } - case ConvoStatus.Suspended: - case ConvoStatus.Backgrounded: - case ConvoStatus.Ready: { - return { - status: this.status, - items: this.getItems(), - convo: this.convo!, - error: undefined, - sender: this.sender!, - recipients: this.recipients!, - isFetchingHistory: this.isFetchingHistory, - deleteMessage: this.deleteMessage, - sendMessage: this.sendMessage, - fetchMessageHistory: this.fetchMessageHistory, - } - } - case ConvoStatus.Error: { - return { - status: ConvoStatus.Error, - items: [], - convo: undefined, - error: this.error, - sender: undefined, - recipients: undefined, - isFetchingHistory: false, - deleteMessage: undefined, - sendMessage: undefined, - fetchMessageHistory: undefined, - } - } - default: { - return { - status: ConvoStatus.Uninitialized, - items: [], - convo: undefined, - error: undefined, - sender: undefined, - recipients: undefined, - isFetchingHistory: false, - deleteMessage: undefined, - sendMessage: undefined, - fetchMessageHistory: undefined, - } - } - } - } - - dispatch(action: ConvoDispatch) { - const prevStatus = this.status - - switch (this.status) { - case ConvoStatus.Uninitialized: { - switch (action.event) { - case ConvoDispatchEvent.Init: { - this.status = ConvoStatus.Initializing - this.setup() - break - } - } - break - } - case ConvoStatus.Initializing: { - switch (action.event) { - case ConvoDispatchEvent.Ready: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.fetchMessageHistory().then(() => { - this.restartPoll() - }) - break - } - case ConvoDispatchEvent.Background: { - this.status = ConvoStatus.Backgrounded - this.pollInterval = BACKGROUND_POLL_INTERVAL - this.fetchMessageHistory().then(() => { - this.restartPoll() - }) - break - } - case ConvoDispatchEvent.Suspend: { - this.status = ConvoStatus.Suspended - break - } - case ConvoDispatchEvent.Error: { - this.status = ConvoStatus.Error - this.error = action.payload - break - } - } - break - } - case ConvoStatus.Ready: { - switch (action.event) { - case ConvoDispatchEvent.Resume: { - this.refreshConvo() - this.restartPoll() - break - } - case ConvoDispatchEvent.Background: { - this.status = ConvoStatus.Backgrounded - this.pollInterval = BACKGROUND_POLL_INTERVAL - this.restartPoll() - break - } - case ConvoDispatchEvent.Suspend: { - this.status = ConvoStatus.Suspended - this.cancelNextPoll() - break - } - case ConvoDispatchEvent.Error: { - this.status = ConvoStatus.Error - this.error = action.payload - this.cancelNextPoll() - break - } - } - break - } - case ConvoStatus.Backgrounded: { - switch (action.event) { - case ConvoDispatchEvent.Resume: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - // TODO truncate history if needed - this.restartPoll() - break - } - case ConvoDispatchEvent.Suspend: { - this.status = ConvoStatus.Suspended - this.cancelNextPoll() - break - } - case ConvoDispatchEvent.Error: { - this.status = ConvoStatus.Error - this.error = action.payload - this.cancelNextPoll() - break - } - } - break - } - case ConvoStatus.Suspended: { - switch (action.event) { - case ConvoDispatchEvent.Init: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - // TODO truncate history if needed - this.restartPoll() - break - } - case ConvoDispatchEvent.Resume: { - this.status = ConvoStatus.Ready - this.pollInterval = ACTIVE_POLL_INTERVAL - this.refreshConvo() - this.restartPoll() - break - } - case ConvoDispatchEvent.Error: { - this.status = ConvoStatus.Error - this.error = action.payload - break - } - } - break - } - case ConvoStatus.Error: { - switch (action.event) { - case ConvoDispatchEvent.Init: { - this.reset() - break - } - case ConvoDispatchEvent.Resume: { - this.reset() - break - } - case ConvoDispatchEvent.Suspend: { - this.status = ConvoStatus.Suspended - break - } - case ConvoDispatchEvent.Error: { - this.status = ConvoStatus.Error - this.error = action.payload - break - } - } - break - } - default: - break - } - - logger.debug( - `Convo: dispatch '${action.event}'`, - { - id: this.id, - prev: prevStatus, - next: this.status, - }, - logger.DebugContext.convo, - ) - - this.commit() - } - - private reset() { - this.convo = undefined - this.sender = undefined - this.recipients = undefined - this.snapshot = undefined - - this.status = ConvoStatus.Uninitialized - this.error = undefined - this.historyCursor = undefined - this.eventsCursor = undefined - - this.pastMessages = new Map() - this.newMessages = new Map() - this.pendingMessages = new Map() - this.deletedMessages = new Set() - this.footerItems = new Map() - this.headerItems = new Map() - - this.dispatch({event: ConvoDispatchEvent.Init}) - } - - private async setup() { - try { - const {convo, sender, recipients} = await this.fetchConvo() - - this.convo = convo - this.sender = sender - this.recipients = recipients - - /* - * Some validation prior to `Ready` status - */ - if (!this.convo) { - throw new Error('Convo: could not find convo') - } - if (!this.sender) { - throw new Error('Convo: could not find sender in convo') - } - if (!this.recipients) { - throw new Error('Convo: could not find recipients in convo') - } - - // await new Promise(y => setTimeout(y, 2000)) - // throw new Error('UNCOMMENT TO TEST INIT FAILURE') - this.dispatch({event: ConvoDispatchEvent.Ready}) - } catch (e: any) { - logger.error('Convo: setup() failed') - - this.dispatch({ - event: ConvoDispatchEvent.Error, - payload: { - exception: e, - code: ConvoErrorCode.InitFailed, - retry: () => { - this.reset() - }, - }, - }) - } - } - - init() { - this.dispatch({event: ConvoDispatchEvent.Init}) - } - - resume() { - this.dispatch({event: ConvoDispatchEvent.Resume}) - } - - background() { - this.dispatch({event: ConvoDispatchEvent.Background}) - } - - suspend() { - this.dispatch({event: ConvoDispatchEvent.Suspend}) - DEBUG_ACTIVE_CHAT = undefined - } - - private pendingFetchConvo: - | Promise<{ - convo: ChatBskyConvoDefs.ConvoView - sender: AppBskyActorDefs.ProfileViewBasic | undefined - recipients: AppBskyActorDefs.ProfileViewBasic[] - }> - | undefined - async fetchConvo() { - if (this.pendingFetchConvo) return this.pendingFetchConvo - - this.pendingFetchConvo = new Promise<{ - convo: ChatBskyConvoDefs.ConvoView - sender: AppBskyActorDefs.ProfileViewBasic | undefined - recipients: AppBskyActorDefs.ProfileViewBasic[] - }>(async (resolve, reject) => { - try { - const response = await this.agent.api.chat.bsky.convo.getConvo( - { - convoId: this.convoId, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - - const convo = response.data.convo - - resolve({ - convo, - sender: convo.members.find(m => m.did === this.__tempFromUserDid), - recipients: convo.members.filter( - m => m.did !== this.__tempFromUserDid, - ), - }) - } catch (e) { - reject(e) - } finally { - this.pendingFetchConvo = undefined - } - }) - - return this.pendingFetchConvo - } - - async refreshConvo() { - try { - const {convo, sender, recipients} = await this.fetchConvo() - // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE') - this.convo = convo || this.convo - this.sender = sender || this.sender - this.recipients = recipients || this.recipients - } catch (e: any) { - logger.error(`Convo: failed to refresh convo`) - - this.footerItems.set(ConvoItemError.Network, { - type: 'error-recoverable', - key: ConvoItemError.Network, - code: ConvoItemError.Network, - retry: () => { - this.footerItems.delete(ConvoItemError.Network) - this.resume() - }, - }) - this.commit() - } - } - - async fetchMessageHistory() { - logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) - - /* - * If historyCursor is null, we've fetched all history. - */ - if (this.historyCursor === null) return - - /* - * Don't fetch again if a fetch is already in progress - */ - if (this.isFetchingHistory) return - - /* - * If we've rendered a retry state for history fetching, exit. Upon retry, - * this will be removed and we'll try again. - */ - if (this.headerItems.has(ConvoItemError.HistoryFailed)) return - - try { - this.isFetchingHistory = true - this.commit() - - /* - * Delay if paginating while scrolled to prevent momentum scrolling from - * jerking the list around, plus makes it feel a little more human. - */ - if (this.pastMessages.size > 0) { - await new Promise(y => setTimeout(y, 500)) - // throw new Error('UNCOMMENT TO TEST RETRY') - } - - const response = await this.agent.api.chat.bsky.convo.getMessages( - { - cursor: this.historyCursor, - convoId: this.convoId, - limit: isNative ? 25 : 50, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {cursor, messages} = response.data - - this.historyCursor = cursor ?? null - - for (const message of messages) { - if ( - ChatBskyConvoDefs.isMessageView(message) || - ChatBskyConvoDefs.isDeletedMessageView(message) - ) { - this.pastMessages.set(message.id, message) - - // set to latest rev - if ( - message.rev > (this.eventsCursor = this.eventsCursor || message.rev) - ) { - this.eventsCursor = message.rev - } - } - } - } catch (e: any) { - logger.error('Convo: failed to fetch message history') - - this.headerItems.set(ConvoItemError.HistoryFailed, { - type: 'error-recoverable', - key: ConvoItemError.HistoryFailed, - code: ConvoItemError.HistoryFailed, - retry: () => { - this.headerItems.delete(ConvoItemError.HistoryFailed) - this.fetchMessageHistory() - }, - }) - } finally { - this.isFetchingHistory = false - this.commit() - } - } - - private restartPoll() { - this.cancelNextPoll() - this.pollLatestEvents() - } - - private cancelNextPoll() { - if (this.nextPoll) clearTimeout(this.nextPoll) - } - - private pollLatestEvents() { - /* - * Uncomment to view poll events - */ - logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo) - - try { - this.fetchLatestEvents().then(({events}) => { - this.applyLatestEvents(events) - }) - this.nextPoll = setTimeout(() => { - this.pollLatestEvents() - }, this.pollInterval) - } catch (e: any) { - logger.error('Convo: poll events failed') - - this.cancelNextPoll() - - this.footerItems.set(ConvoItemError.PollFailed, { - type: 'error-recoverable', - key: ConvoItemError.PollFailed, - code: ConvoItemError.PollFailed, - retry: () => { - this.footerItems.delete(ConvoItemError.PollFailed) - this.commit() - this.pollLatestEvents() - }, - }) - - this.commit() - } - } - - private pendingFetchLatestEvents: - | Promise<{ - events: ChatBskyConvoGetLog.OutputSchema['logs'] - }> - | undefined - async fetchLatestEvents() { - if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents - - this.pendingFetchLatestEvents = new Promise<{ - events: ChatBskyConvoGetLog.OutputSchema['logs'] - }>(async (resolve, reject) => { - try { - // throw new Error('UNCOMMENT TO TEST POLL FAILURE') - const response = await this.agent.api.chat.bsky.convo.getLog( - { - cursor: this.eventsCursor, - }, - { - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {logs} = response.data - resolve({events: logs}) - } catch (e) { - reject(e) - } finally { - this.pendingFetchLatestEvents = undefined - } - }) - - return this.pendingFetchLatestEvents - } - - private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) { - let needsCommit = false - - for (const ev of events) { - /* - * If there's a rev, we should handle it. If there's not a rev, we don't - * know what it is. - */ - if (typeof ev.rev === 'string') { - /* - * We only care about new events - */ - if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) { - /* - * Update rev regardless of if it's a ev type we care about or not - */ - this.eventsCursor = ev.rev - - /* - * This is VERY important. We don't want to insert any messages from - * your other chats. - */ - if (ev.convoId !== this.convoId) continue - - if ( - ChatBskyConvoDefs.isLogCreateMessage(ev) && - ChatBskyConvoDefs.isMessageView(ev.message) - ) { - if (this.newMessages.has(ev.message.id)) { - // Trust the ev as the source of truth on ordering - this.newMessages.delete(ev.message.id) - } - this.newMessages.set(ev.message.id, ev.message) - needsCommit = true - } else if ( - ChatBskyConvoDefs.isLogDeleteMessage(ev) && - ChatBskyConvoDefs.isDeletedMessageView(ev.message) - ) { - /* - * Update if we have this in state. If we don't, don't worry about it. - */ - if (this.pastMessages.has(ev.message.id)) { - /* - * For now, we remove deleted messages from the thread, if we receive one. - * - * To support them, it'd look something like this: - * this.pastMessages.set(ev.message.id, ev.message) - */ - this.pastMessages.delete(ev.message.id) - this.newMessages.delete(ev.message.id) - this.deletedMessages.delete(ev.message.id) - needsCommit = true - } - } - } - } - } - - if (needsCommit) { - this.commit() - } - } - - async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { - // Ignore empty messages for now since they have no other purpose atm - if (!message.text.trim()) return - - logger.debug('Convo: send message', {}, logger.DebugContext.convo) - - const tempId = nanoid() - - this.pendingMessages.set(tempId, { - id: tempId, - message, - }) - this.commit() - - if (!this.isProcessingPendingMessages) { - this.processPendingMessages() - } - } - - async processPendingMessages() { - logger.debug( - `Convo: processing messages (${this.pendingMessages.size} remaining)`, - {}, - logger.DebugContext.convo, - ) - - const pendingMessage = Array.from(this.pendingMessages.values()).shift() - - /* - * If there are no pending messages, we're done. - */ - if (!pendingMessage) { - this.isProcessingPendingMessages = false - return - } - - try { - this.isProcessingPendingMessages = true - - // throw new Error('UNCOMMENT TO TEST RETRY') - const {id, message} = pendingMessage - - const response = await this.agent.api.chat.bsky.convo.sendMessage( - { - convoId: this.convoId, - message, - }, - { - encoding: 'application/json', - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const res = response.data - - /* - * Insert into `newMessages` as soon as we have a real ID. That way, when - * we get an event log back, we can replace in situ. - */ - this.newMessages.set(res.id, { - ...res, - $type: 'chat.bsky.convo.defs#messageView', - sender: this.sender, - }) - this.pendingMessages.delete(id) - - await this.processPendingMessages() - - this.commit() - } catch (e) { - this.footerItems.set('pending-retry', { - type: 'pending-retry', - key: 'pending-retry', - retry: this.batchRetryPendingMessages.bind(this), - }) - this.commit() - } - } - - async batchRetryPendingMessages() { - logger.debug( - `Convo: retrying ${this.pendingMessages.size} pending messages`, - {}, - logger.DebugContext.convo, - ) - - this.footerItems.delete('pending-retry') - this.commit() - - try { - const messageArray = Array.from(this.pendingMessages.values()) - const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( - { - items: messageArray.map(({message}) => ({ - convoId: this.convoId, - message, - })), - }, - { - encoding: 'application/json', - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - const {items} = data - - /* - * Insert into `newMessages` as soon as we have a real ID. That way, when - * we get an event log back, we can replace in situ. - */ - for (const item of items) { - this.newMessages.set(item.id, { - ...item, - $type: 'chat.bsky.convo.defs#messageView', - sender: this.convo?.members.find( - m => m.did === this.__tempFromUserDid, - ), - }) - } - - for (const pendingMessage of messageArray) { - this.pendingMessages.delete(pendingMessage.id) - } - - this.commit() - } catch (e) { - this.footerItems.set('pending-retry', { - type: 'pending-retry', - key: 'pending-retry', - retry: this.batchRetryPendingMessages.bind(this), - }) - this.commit() - } - } - - async deleteMessage(messageId: string) { - logger.debug('Convo: delete message', {}, logger.DebugContext.convo) - - this.deletedMessages.add(messageId) - this.commit() - - try { - await this.agent.api.chat.bsky.convo.deleteMessageForSelf( - { - convoId: this.convoId, - messageId, - }, - { - encoding: 'application/json', - headers: { - Authorization: this.__tempFromUserDid, - }, - }, - ) - } catch (e) { - this.deletedMessages.delete(messageId) - this.commit() - throw e - } - } - - /* - * Items in reverse order, since FlatList inverts - */ - getItems(): ConvoItem[] { - const items: ConvoItem[] = [] - - this.headerItems.forEach(item => { - items.push(item) - }) - - this.pastMessages.forEach(m => { - if (ChatBskyConvoDefs.isMessageView(m)) { - items.unshift({ - type: 'message', - key: m.id, - message: m, - nextMessage: null, - }) - } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { - items.unshift({ - type: 'deleted-message', - key: m.id, - message: m, - nextMessage: null, - }) - } - }) - - this.newMessages.forEach(m => { - if (ChatBskyConvoDefs.isMessageView(m)) { - items.push({ - type: 'message', - key: m.id, - message: m, - nextMessage: null, - }) - } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { - items.push({ - type: 'deleted-message', - key: m.id, - message: m, - nextMessage: null, - }) - } - }) - - this.pendingMessages.forEach(m => { - items.push({ - type: 'pending-message', - key: m.id, - message: { - ...m.message, - id: nanoid(), - rev: '__fake__', - sentAt: new Date().toISOString(), - sender: this.sender, - }, - nextMessage: null, - }) - }) - - this.footerItems.forEach(item => { - items.push(item) - }) - - return items - .filter(item => { - if (isConvoItemMessage(item)) { - return !this.deletedMessages.has(item.message.id) - } - return true - }) - .map((item, i, arr) => { - let nextMessage = null - const isMessage = isConvoItemMessage(item) - - if (isMessage) { - if ( - isMessage && - (ChatBskyConvoDefs.isMessageView(item.message) || - ChatBskyConvoDefs.isDeletedMessageView(item.message)) - ) { - const next = arr[i + 1] - - if ( - isConvoItemMessage(next) && - next && - (ChatBskyConvoDefs.isMessageView(next.message) || - ChatBskyConvoDefs.isDeletedMessageView(next.message)) - ) { - nextMessage = next.message - } - } - - return { - ...item, - nextMessage, - } - } - - return item - }) - } -} diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts new file mode 100644 index 000000000..38a3f5e62 --- /dev/null +++ b/src/state/messages/convo/agent.ts @@ -0,0 +1,985 @@ +import {AppBskyActorDefs} from '@atproto/api' +import { + BskyAgent, + ChatBskyConvoDefs, + ChatBskyConvoGetLog, + ChatBskyConvoSendMessage, +} from '@atproto-labs/api' +import {nanoid} from 'nanoid/non-secure' + +import {logger} from '#/logger' +import {isNative} from '#/platform/detection' +import { + ConvoDispatch, + ConvoDispatchEvent, + ConvoErrorCode, + ConvoItem, + ConvoItemError, + ConvoParams, + ConvoState, + ConvoStatus, +} from '#/state/messages/convo/types' + +const ACTIVE_POLL_INTERVAL = 1e3 +const BACKGROUND_POLL_INTERVAL = 10e3 + +// TODO temporary +let DEBUG_ACTIVE_CHAT: string | undefined + +export function isConvoItemMessage( + item: ConvoItem, +): item is ConvoItem & {type: 'message'} { + if (!item) return false + return ( + item.type === 'message' || + item.type === 'deleted-message' || + item.type === 'pending-message' + ) +} + +export class Convo { + private id: string + + private agent: BskyAgent + private __tempFromUserDid: string + + private status: ConvoStatus = ConvoStatus.Uninitialized + private pollInterval = ACTIVE_POLL_INTERVAL + private error: + | { + code: ConvoErrorCode + exception?: Error + retry: () => void + } + | undefined + private historyCursor: string | undefined | null = undefined + private isFetchingHistory = false + private eventsCursor: string | undefined = undefined + + private pastMessages: Map< + string, + ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView + > = new Map() + private newMessages: Map< + string, + ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView + > = new Map() + private pendingMessages: Map< + string, + {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} + > = new Map() + private deletedMessages: Set = new Set() + private footerItems: Map = new Map() + private headerItems: Map = new Map() + + private isProcessingPendingMessages = false + private nextPoll: NodeJS.Timeout | undefined + + convoId: string + convo: ChatBskyConvoDefs.ConvoView | undefined + sender: AppBskyActorDefs.ProfileViewBasic | undefined + recipients: AppBskyActorDefs.ProfileViewBasic[] | undefined = undefined + snapshot: ConvoState | undefined + + constructor(params: ConvoParams) { + this.id = nanoid(3) + this.convoId = params.convoId + this.agent = params.agent + this.__tempFromUserDid = params.__tempFromUserDid + + this.subscribe = this.subscribe.bind(this) + this.getSnapshot = this.getSnapshot.bind(this) + this.sendMessage = this.sendMessage.bind(this) + this.deleteMessage = this.deleteMessage.bind(this) + this.fetchMessageHistory = this.fetchMessageHistory.bind(this) + + if (DEBUG_ACTIVE_CHAT) { + logger.error(`Convo: another chat was already active`, { + convoId: this.convoId, + }) + } else { + DEBUG_ACTIVE_CHAT = this.convoId + } + } + + private commit() { + this.snapshot = undefined + this.subscribers.forEach(subscriber => subscriber()) + } + + private subscribers: (() => void)[] = [] + + subscribe(subscriber: () => void) { + if (this.subscribers.length === 0) this.init() + + this.subscribers.push(subscriber) + + return () => { + this.subscribers = this.subscribers.filter(s => s !== subscriber) + if (this.subscribers.length === 0) this.suspend() + } + } + + getSnapshot(): ConvoState { + if (!this.snapshot) this.snapshot = this.generateSnapshot() + // logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo) + return this.snapshot + } + + private generateSnapshot(): ConvoState { + switch (this.status) { + case ConvoStatus.Initializing: { + return { + status: ConvoStatus.Initializing, + items: [], + convo: undefined, + error: undefined, + sender: undefined, + recipients: undefined, + isFetchingHistory: this.isFetchingHistory, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + case ConvoStatus.Suspended: + case ConvoStatus.Backgrounded: + case ConvoStatus.Ready: { + return { + status: this.status, + items: this.getItems(), + convo: this.convo!, + error: undefined, + sender: this.sender!, + recipients: this.recipients!, + isFetchingHistory: this.isFetchingHistory, + deleteMessage: this.deleteMessage, + sendMessage: this.sendMessage, + fetchMessageHistory: this.fetchMessageHistory, + } + } + case ConvoStatus.Error: { + return { + status: ConvoStatus.Error, + items: [], + convo: undefined, + error: this.error, + sender: undefined, + recipients: undefined, + isFetchingHistory: false, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + default: { + return { + status: ConvoStatus.Uninitialized, + items: [], + convo: undefined, + error: undefined, + sender: undefined, + recipients: undefined, + isFetchingHistory: false, + deleteMessage: undefined, + sendMessage: undefined, + fetchMessageHistory: undefined, + } + } + } + } + + dispatch(action: ConvoDispatch) { + const prevStatus = this.status + + switch (this.status) { + case ConvoStatus.Uninitialized: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.status = ConvoStatus.Initializing + this.setup() + break + } + } + break + } + case ConvoStatus.Initializing: { + switch (action.event) { + case ConvoDispatchEvent.Ready: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.fetchMessageHistory().then(() => { + this.restartPoll() + }) + break + } + case ConvoDispatchEvent.Background: { + this.status = ConvoStatus.Backgrounded + this.pollInterval = BACKGROUND_POLL_INTERVAL + this.fetchMessageHistory().then(() => { + this.restartPoll() + }) + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + case ConvoStatus.Ready: { + switch (action.event) { + case ConvoDispatchEvent.Resume: { + this.refreshConvo() + this.restartPoll() + break + } + case ConvoDispatchEvent.Background: { + this.status = ConvoStatus.Backgrounded + this.pollInterval = BACKGROUND_POLL_INTERVAL + this.restartPoll() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + this.cancelNextPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + this.cancelNextPoll() + break + } + } + break + } + case ConvoStatus.Backgrounded: { + switch (action.event) { + case ConvoDispatchEvent.Resume: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + // TODO truncate history if needed + this.restartPoll() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + this.cancelNextPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + this.cancelNextPoll() + break + } + } + break + } + case ConvoStatus.Suspended: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + // TODO truncate history if needed + this.restartPoll() + break + } + case ConvoDispatchEvent.Resume: { + this.status = ConvoStatus.Ready + this.pollInterval = ACTIVE_POLL_INTERVAL + this.refreshConvo() + this.restartPoll() + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + case ConvoStatus.Error: { + switch (action.event) { + case ConvoDispatchEvent.Init: { + this.reset() + break + } + case ConvoDispatchEvent.Resume: { + this.reset() + break + } + case ConvoDispatchEvent.Suspend: { + this.status = ConvoStatus.Suspended + break + } + case ConvoDispatchEvent.Error: { + this.status = ConvoStatus.Error + this.error = action.payload + break + } + } + break + } + default: + break + } + + logger.debug( + `Convo: dispatch '${action.event}'`, + { + id: this.id, + prev: prevStatus, + next: this.status, + }, + logger.DebugContext.convo, + ) + + this.commit() + } + + private reset() { + this.convo = undefined + this.sender = undefined + this.recipients = undefined + this.snapshot = undefined + + this.status = ConvoStatus.Uninitialized + this.error = undefined + this.historyCursor = undefined + this.eventsCursor = undefined + + this.pastMessages = new Map() + this.newMessages = new Map() + this.pendingMessages = new Map() + this.deletedMessages = new Set() + this.footerItems = new Map() + this.headerItems = new Map() + + this.dispatch({event: ConvoDispatchEvent.Init}) + } + + private async setup() { + try { + const {convo, sender, recipients} = await this.fetchConvo() + + this.convo = convo + this.sender = sender + this.recipients = recipients + + /* + * Some validation prior to `Ready` status + */ + if (!this.convo) { + throw new Error('Convo: could not find convo') + } + if (!this.sender) { + throw new Error('Convo: could not find sender in convo') + } + if (!this.recipients) { + throw new Error('Convo: could not find recipients in convo') + } + + // await new Promise(y => setTimeout(y, 2000)) + // throw new Error('UNCOMMENT TO TEST INIT FAILURE') + this.dispatch({event: ConvoDispatchEvent.Ready}) + } catch (e: any) { + logger.error('Convo: setup() failed') + + this.dispatch({ + event: ConvoDispatchEvent.Error, + payload: { + exception: e, + code: ConvoErrorCode.InitFailed, + retry: () => { + this.reset() + }, + }, + }) + } + } + + init() { + this.dispatch({event: ConvoDispatchEvent.Init}) + } + + resume() { + this.dispatch({event: ConvoDispatchEvent.Resume}) + } + + background() { + this.dispatch({event: ConvoDispatchEvent.Background}) + } + + suspend() { + this.dispatch({event: ConvoDispatchEvent.Suspend}) + DEBUG_ACTIVE_CHAT = undefined + } + + private pendingFetchConvo: + | Promise<{ + convo: ChatBskyConvoDefs.ConvoView + sender: AppBskyActorDefs.ProfileViewBasic | undefined + recipients: AppBskyActorDefs.ProfileViewBasic[] + }> + | undefined + async fetchConvo() { + if (this.pendingFetchConvo) return this.pendingFetchConvo + + this.pendingFetchConvo = new Promise<{ + convo: ChatBskyConvoDefs.ConvoView + sender: AppBskyActorDefs.ProfileViewBasic | undefined + recipients: AppBskyActorDefs.ProfileViewBasic[] + }>(async (resolve, reject) => { + try { + const response = await this.agent.api.chat.bsky.convo.getConvo( + { + convoId: this.convoId, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + + const convo = response.data.convo + + resolve({ + convo, + sender: convo.members.find(m => m.did === this.__tempFromUserDid), + recipients: convo.members.filter( + m => m.did !== this.__tempFromUserDid, + ), + }) + } catch (e) { + reject(e) + } finally { + this.pendingFetchConvo = undefined + } + }) + + return this.pendingFetchConvo + } + + async refreshConvo() { + try { + const {convo, sender, recipients} = await this.fetchConvo() + // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE') + this.convo = convo || this.convo + this.sender = sender || this.sender + this.recipients = recipients || this.recipients + } catch (e: any) { + logger.error(`Convo: failed to refresh convo`) + + this.footerItems.set(ConvoItemError.Network, { + type: 'error-recoverable', + key: ConvoItemError.Network, + code: ConvoItemError.Network, + retry: () => { + this.footerItems.delete(ConvoItemError.Network) + this.resume() + }, + }) + this.commit() + } + } + + async fetchMessageHistory() { + logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo) + + /* + * If historyCursor is null, we've fetched all history. + */ + if (this.historyCursor === null) return + + /* + * Don't fetch again if a fetch is already in progress + */ + if (this.isFetchingHistory) return + + /* + * If we've rendered a retry state for history fetching, exit. Upon retry, + * this will be removed and we'll try again. + */ + if (this.headerItems.has(ConvoItemError.HistoryFailed)) return + + try { + this.isFetchingHistory = true + this.commit() + + /* + * Delay if paginating while scrolled to prevent momentum scrolling from + * jerking the list around, plus makes it feel a little more human. + */ + if (this.pastMessages.size > 0) { + await new Promise(y => setTimeout(y, 500)) + // throw new Error('UNCOMMENT TO TEST RETRY') + } + + const response = await this.agent.api.chat.bsky.convo.getMessages( + { + cursor: this.historyCursor, + convoId: this.convoId, + limit: isNative ? 25 : 50, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {cursor, messages} = response.data + + this.historyCursor = cursor ?? null + + for (const message of messages) { + if ( + ChatBskyConvoDefs.isMessageView(message) || + ChatBskyConvoDefs.isDeletedMessageView(message) + ) { + this.pastMessages.set(message.id, message) + + // set to latest rev + if ( + message.rev > (this.eventsCursor = this.eventsCursor || message.rev) + ) { + this.eventsCursor = message.rev + } + } + } + } catch (e: any) { + logger.error('Convo: failed to fetch message history') + + this.headerItems.set(ConvoItemError.HistoryFailed, { + type: 'error-recoverable', + key: ConvoItemError.HistoryFailed, + code: ConvoItemError.HistoryFailed, + retry: () => { + this.headerItems.delete(ConvoItemError.HistoryFailed) + this.fetchMessageHistory() + }, + }) + } finally { + this.isFetchingHistory = false + this.commit() + } + } + + private restartPoll() { + this.cancelNextPoll() + this.pollLatestEvents() + } + + private cancelNextPoll() { + if (this.nextPoll) clearTimeout(this.nextPoll) + } + + private pollLatestEvents() { + /* + * Uncomment to view poll events + */ + logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo) + + try { + this.fetchLatestEvents().then(({events}) => { + this.applyLatestEvents(events) + }) + this.nextPoll = setTimeout(() => { + this.pollLatestEvents() + }, this.pollInterval) + } catch (e: any) { + logger.error('Convo: poll events failed') + + this.cancelNextPoll() + + this.footerItems.set(ConvoItemError.PollFailed, { + type: 'error-recoverable', + key: ConvoItemError.PollFailed, + code: ConvoItemError.PollFailed, + retry: () => { + this.footerItems.delete(ConvoItemError.PollFailed) + this.commit() + this.pollLatestEvents() + }, + }) + + this.commit() + } + } + + private pendingFetchLatestEvents: + | Promise<{ + events: ChatBskyConvoGetLog.OutputSchema['logs'] + }> + | undefined + async fetchLatestEvents() { + if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents + + this.pendingFetchLatestEvents = new Promise<{ + events: ChatBskyConvoGetLog.OutputSchema['logs'] + }>(async (resolve, reject) => { + try { + // throw new Error('UNCOMMENT TO TEST POLL FAILURE') + const response = await this.agent.api.chat.bsky.convo.getLog( + { + cursor: this.eventsCursor, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {logs} = response.data + resolve({events: logs}) + } catch (e) { + reject(e) + } finally { + this.pendingFetchLatestEvents = undefined + } + }) + + return this.pendingFetchLatestEvents + } + + private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) { + let needsCommit = false + + for (const ev of events) { + /* + * If there's a rev, we should handle it. If there's not a rev, we don't + * know what it is. + */ + if (typeof ev.rev === 'string') { + /* + * We only care about new events + */ + if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) { + /* + * Update rev regardless of if it's a ev type we care about or not + */ + this.eventsCursor = ev.rev + + /* + * This is VERY important. We don't want to insert any messages from + * your other chats. + */ + if (ev.convoId !== this.convoId) continue + + if ( + ChatBskyConvoDefs.isLogCreateMessage(ev) && + ChatBskyConvoDefs.isMessageView(ev.message) + ) { + if (this.newMessages.has(ev.message.id)) { + // Trust the ev as the source of truth on ordering + this.newMessages.delete(ev.message.id) + } + this.newMessages.set(ev.message.id, ev.message) + needsCommit = true + } else if ( + ChatBskyConvoDefs.isLogDeleteMessage(ev) && + ChatBskyConvoDefs.isDeletedMessageView(ev.message) + ) { + /* + * Update if we have this in state. If we don't, don't worry about it. + */ + if (this.pastMessages.has(ev.message.id)) { + /* + * For now, we remove deleted messages from the thread, if we receive one. + * + * To support them, it'd look something like this: + * this.pastMessages.set(ev.message.id, ev.message) + */ + this.pastMessages.delete(ev.message.id) + this.newMessages.delete(ev.message.id) + this.deletedMessages.delete(ev.message.id) + needsCommit = true + } + } + } + } + } + + if (needsCommit) { + this.commit() + } + } + + async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { + // Ignore empty messages for now since they have no other purpose atm + if (!message.text.trim()) return + + logger.debug('Convo: send message', {}, logger.DebugContext.convo) + + const tempId = nanoid() + + this.pendingMessages.set(tempId, { + id: tempId, + message, + }) + this.commit() + + if (!this.isProcessingPendingMessages) { + this.processPendingMessages() + } + } + + async processPendingMessages() { + logger.debug( + `Convo: processing messages (${this.pendingMessages.size} remaining)`, + {}, + logger.DebugContext.convo, + ) + + const pendingMessage = Array.from(this.pendingMessages.values()).shift() + + /* + * If there are no pending messages, we're done. + */ + if (!pendingMessage) { + this.isProcessingPendingMessages = false + return + } + + try { + this.isProcessingPendingMessages = true + + // throw new Error('UNCOMMENT TO TEST RETRY') + const {id, message} = pendingMessage + + const response = await this.agent.api.chat.bsky.convo.sendMessage( + { + convoId: this.convoId, + message, + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const res = response.data + + /* + * Insert into `newMessages` as soon as we have a real ID. That way, when + * we get an event log back, we can replace in situ. + */ + this.newMessages.set(res.id, { + ...res, + $type: 'chat.bsky.convo.defs#messageView', + sender: this.sender, + }) + this.pendingMessages.delete(id) + + await this.processPendingMessages() + + this.commit() + } catch (e) { + this.footerItems.set('pending-retry', { + type: 'pending-retry', + key: 'pending-retry', + retry: this.batchRetryPendingMessages.bind(this), + }) + this.commit() + } + } + + async batchRetryPendingMessages() { + logger.debug( + `Convo: retrying ${this.pendingMessages.size} pending messages`, + {}, + logger.DebugContext.convo, + ) + + this.footerItems.delete('pending-retry') + this.commit() + + try { + const messageArray = Array.from(this.pendingMessages.values()) + const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch( + { + items: messageArray.map(({message}) => ({ + convoId: this.convoId, + message, + })), + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {items} = data + + /* + * Insert into `newMessages` as soon as we have a real ID. That way, when + * we get an event log back, we can replace in situ. + */ + for (const item of items) { + this.newMessages.set(item.id, { + ...item, + $type: 'chat.bsky.convo.defs#messageView', + sender: this.convo?.members.find( + m => m.did === this.__tempFromUserDid, + ), + }) + } + + for (const pendingMessage of messageArray) { + this.pendingMessages.delete(pendingMessage.id) + } + + this.commit() + } catch (e) { + this.footerItems.set('pending-retry', { + type: 'pending-retry', + key: 'pending-retry', + retry: this.batchRetryPendingMessages.bind(this), + }) + this.commit() + } + } + + async deleteMessage(messageId: string) { + logger.debug('Convo: delete message', {}, logger.DebugContext.convo) + + this.deletedMessages.add(messageId) + this.commit() + + try { + await this.agent.api.chat.bsky.convo.deleteMessageForSelf( + { + convoId: this.convoId, + messageId, + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + } catch (e) { + this.deletedMessages.delete(messageId) + this.commit() + throw e + } + } + + /* + * Items in reverse order, since FlatList inverts + */ + getItems(): ConvoItem[] { + const items: ConvoItem[] = [] + + this.headerItems.forEach(item => { + items.push(item) + }) + + this.pastMessages.forEach(m => { + if (ChatBskyConvoDefs.isMessageView(m)) { + items.unshift({ + type: 'message', + key: m.id, + message: m, + nextMessage: null, + }) + } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { + items.unshift({ + type: 'deleted-message', + key: m.id, + message: m, + nextMessage: null, + }) + } + }) + + this.newMessages.forEach(m => { + if (ChatBskyConvoDefs.isMessageView(m)) { + items.push({ + type: 'message', + key: m.id, + message: m, + nextMessage: null, + }) + } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { + items.push({ + type: 'deleted-message', + key: m.id, + message: m, + nextMessage: null, + }) + } + }) + + this.pendingMessages.forEach(m => { + items.push({ + type: 'pending-message', + key: m.id, + message: { + ...m.message, + id: nanoid(), + rev: '__fake__', + sentAt: new Date().toISOString(), + sender: this.sender, + }, + nextMessage: null, + }) + }) + + this.footerItems.forEach(item => { + items.push(item) + }) + + return items + .filter(item => { + if (isConvoItemMessage(item)) { + return !this.deletedMessages.has(item.message.id) + } + return true + }) + .map((item, i, arr) => { + let nextMessage = null + const isMessage = isConvoItemMessage(item) + + if (isMessage) { + if ( + isMessage && + (ChatBskyConvoDefs.isMessageView(item.message) || + ChatBskyConvoDefs.isDeletedMessageView(item.message)) + ) { + const next = arr[i + 1] + + if ( + isConvoItemMessage(next) && + next && + (ChatBskyConvoDefs.isMessageView(next.message) || + ChatBskyConvoDefs.isDeletedMessageView(next.message)) + ) { + nextMessage = next.message + } + } + + return { + ...item, + nextMessage, + } + } + + return item + }) + } +} diff --git a/src/state/messages/convo/index.tsx b/src/state/messages/convo/index.tsx new file mode 100644 index 000000000..c4fe71d30 --- /dev/null +++ b/src/state/messages/convo/index.tsx @@ -0,0 +1,75 @@ +import React, {useContext, useState, useSyncExternalStore} from 'react' +import {AppState} from 'react-native' +import {BskyAgent} from '@atproto-labs/api' +import {useFocusEffect, useIsFocused} from '@react-navigation/native' + +import {Convo} from '#/state/messages/convo/agent' +import {ConvoParams, ConvoState} from '#/state/messages/convo/types' +import {useMarkAsReadMutation} from '#/state/queries/messages/conversation' +import {useAgent} from '#/state/session' +import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' + +const ChatContext = React.createContext(null) + +export function useConvo() { + const ctx = useContext(ChatContext) + if (!ctx) { + throw new Error('useConvo must be used within a ConvoProvider') + } + return ctx +} + +export function ConvoProvider({ + children, + convoId, +}: Pick & {children: React.ReactNode}) { + const isScreenFocused = useIsFocused() + const {serviceUrl} = useDmServiceUrlStorage() + const {getAgent} = useAgent() + const [convo] = useState( + () => + new Convo({ + convoId, + agent: new BskyAgent({ + service: serviceUrl, + }), + __tempFromUserDid: getAgent().session?.did!, + }), + ) + const service = useSyncExternalStore(convo.subscribe, convo.getSnapshot) + const {mutate: markAsRead} = useMarkAsReadMutation() + + useFocusEffect( + React.useCallback(() => { + convo.resume() + markAsRead({convoId}) + + return () => { + convo.background() + markAsRead({convoId}) + } + }, [convo, convoId, markAsRead]), + ) + + React.useEffect(() => { + const handleAppStateChange = (nextAppState: string) => { + if (isScreenFocused) { + if (nextAppState === 'active') { + convo.resume() + } else { + convo.background() + } + + markAsRead({convoId}) + } + } + + const sub = AppState.addEventListener('change', handleAppStateChange) + + return () => { + sub.remove() + } + }, [convoId, convo, isScreenFocused, markAsRead]) + + return {children} +} diff --git a/src/state/messages/convo/types.ts b/src/state/messages/convo/types.ts new file mode 100644 index 000000000..cfbde6d7e --- /dev/null +++ b/src/state/messages/convo/types.ts @@ -0,0 +1,178 @@ +import {AppBskyActorDefs} from '@atproto/api' +import { + BskyAgent, + ChatBskyConvoDefs, + ChatBskyConvoSendMessage, +} from '@atproto-labs/api' + +export type ConvoParams = { + convoId: string + agent: BskyAgent + __tempFromUserDid: string +} + +export enum ConvoStatus { + Uninitialized = 'uninitialized', + Initializing = 'initializing', + Ready = 'ready', + Error = 'error', + Backgrounded = 'backgrounded', + Suspended = 'suspended', +} + +export enum ConvoItemError { + HistoryFailed = 'historyFailed', + PollFailed = 'pollFailed', + Network = 'network', +} + +export enum ConvoErrorCode { + InitFailed = 'initFailed', +} + +export type ConvoError = { + code: ConvoErrorCode + exception?: Error + retry: () => void +} + +export enum ConvoDispatchEvent { + Init = 'init', + Ready = 'ready', + Resume = 'resume', + Background = 'background', + Suspend = 'suspend', + Error = 'error', +} + +export type ConvoDispatch = + | { + event: ConvoDispatchEvent.Init + } + | { + event: ConvoDispatchEvent.Ready + } + | { + event: ConvoDispatchEvent.Resume + } + | { + event: ConvoDispatchEvent.Background + } + | { + event: ConvoDispatchEvent.Suspend + } + | { + event: ConvoDispatchEvent.Error + payload: ConvoError + } + +export type ConvoItem = + | { + type: 'message' | 'pending-message' + key: string + message: ChatBskyConvoDefs.MessageView + nextMessage: + | ChatBskyConvoDefs.MessageView + | ChatBskyConvoDefs.DeletedMessageView + | null + } + | { + type: 'deleted-message' + key: string + message: ChatBskyConvoDefs.DeletedMessageView + nextMessage: + | ChatBskyConvoDefs.MessageView + | ChatBskyConvoDefs.DeletedMessageView + | null + } + | { + type: 'pending-retry' + key: string + retry: () => void + } + | { + type: 'error-recoverable' + key: string + code: ConvoItemError + retry: () => void + } + +export type ConvoState = + | { + status: ConvoStatus.Uninitialized + items: [] + convo: undefined + error: undefined + sender: undefined + recipients: undefined + isFetchingHistory: false + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined + } + | { + status: ConvoStatus.Initializing + items: [] + convo: undefined + error: undefined + sender: undefined + recipients: undefined + isFetchingHistory: boolean + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined + } + | { + status: ConvoStatus.Ready + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + sender: AppBskyActorDefs.ProfileViewBasic + recipients: AppBskyActorDefs.ProfileViewBasic[] + isFetchingHistory: boolean + deleteMessage: (messageId: string) => Promise + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => void + fetchMessageHistory: () => void + } + | { + status: ConvoStatus.Suspended + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + sender: AppBskyActorDefs.ProfileViewBasic + recipients: AppBskyActorDefs.ProfileViewBasic[] + isFetchingHistory: boolean + deleteMessage: (messageId: string) => Promise + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => Promise + fetchMessageHistory: () => Promise + } + | { + status: ConvoStatus.Backgrounded + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + error: undefined + sender: AppBskyActorDefs.ProfileViewBasic + recipients: AppBskyActorDefs.ProfileViewBasic[] + isFetchingHistory: boolean + deleteMessage: (messageId: string) => Promise + sendMessage: ( + message: ChatBskyConvoSendMessage.InputSchema['message'], + ) => Promise + fetchMessageHistory: () => Promise + } + | { + status: ConvoStatus.Error + items: [] + convo: undefined + error: any + sender: undefined + recipients: undefined + isFetchingHistory: false + deleteMessage: undefined + sendMessage: undefined + fetchMessageHistory: undefined + } diff --git a/src/state/messages/index.tsx b/src/state/messages/index.tsx index 60538615a..205d17e8c 100644 --- a/src/state/messages/index.tsx +++ b/src/state/messages/index.tsx @@ -1,79 +1,7 @@ -import React, {useContext, useState, useSyncExternalStore} from 'react' -import {AppState} from 'react-native' -import {BskyAgent} from '@atproto-labs/api' -import {useFocusEffect, useIsFocused} from '@react-navigation/native' +import React from 'react' -import {Convo, ConvoParams, ConvoState} from '#/state/messages/convo' import {CurrentConvoIdProvider} from '#/state/messages/current-convo-id' import {MessagesEventBusProvider} from '#/state/messages/events' -import {useMarkAsReadMutation} from '#/state/queries/messages/conversation' -import {useAgent} from '#/state/session' -import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' - -const ChatContext = React.createContext(null) - -export function useChat() { - const ctx = useContext(ChatContext) - if (!ctx) { - throw new Error('useChat must be used within a ChatProvider') - } - return ctx -} - -export function ChatProvider({ - children, - convoId, -}: Pick & {children: React.ReactNode}) { - const isScreenFocused = useIsFocused() - const {serviceUrl} = useDmServiceUrlStorage() - const {getAgent} = useAgent() - const [convo] = useState( - () => - new Convo({ - convoId, - agent: new BskyAgent({ - service: serviceUrl, - }), - __tempFromUserDid: getAgent().session?.did!, - }), - ) - const service = useSyncExternalStore(convo.subscribe, convo.getSnapshot) - const {mutate: markAsRead} = useMarkAsReadMutation() - - useFocusEffect( - React.useCallback(() => { - convo.resume() - markAsRead({convoId}) - - return () => { - convo.background() - markAsRead({convoId}) - } - }, [convo, convoId, markAsRead]), - ) - - React.useEffect(() => { - const handleAppStateChange = (nextAppState: string) => { - if (isScreenFocused) { - if (nextAppState === 'active') { - convo.resume() - } else { - convo.background() - } - - markAsRead({convoId}) - } - } - - const sub = AppState.addEventListener('change', handleAppStateChange) - - return () => { - sub.remove() - } - }, [convoId, convo, isScreenFocused, markAsRead]) - - return {children} -} export function MessagesProvider({children}: {children: React.ReactNode}) { return ( -- cgit 1.4.1