diff options
Diffstat (limited to 'src/state/messages')
-rw-r--r-- | src/state/messages/__tests__/client.test.ts | 38 | ||||
-rw-r--r-- | src/state/messages/convo.ts | 442 | ||||
-rw-r--r-- | src/state/messages/index.tsx | 57 |
3 files changed, 537 insertions, 0 deletions
diff --git a/src/state/messages/__tests__/client.test.ts b/src/state/messages/__tests__/client.test.ts new file mode 100644 index 000000000..cab1d9021 --- /dev/null +++ b/src/state/messages/__tests__/client.test.ts @@ -0,0 +1,38 @@ +import {describe, it} from '@jest/globals' + +describe(`#/state/dms/client`, () => { + describe(`ChatsService`, () => { + describe(`unread count`, () => { + it.todo(`marks a chat as read, decrements total unread count`) + }) + + describe(`log processing`, () => { + /* + * We receive a new chat log AND messages for it in the same batch. We + * need to first initialize the chat, then process the received logs. + */ + describe(`handles new chats and subsequent messages received in same log batch`, () => { + it.todo(`receives new chat and messages`) + it.todo( + `receives new chat, new messages come in while still initializing new chat`, + ) + }) + }) + + describe(`reset state`, () => { + it.todo(`after period of inactivity, rehydrates entirely fresh state`) + }) + }) + + describe(`ChatService`, () => { + describe(`history fetching`, () => { + it.todo(`fetches initial chat history`) + it.todo(`fetches additional chat history`) + it.todo(`handles history fetch failure`) + }) + + describe(`optimistic updates`, () => { + it.todo(`adds sending messages`) + }) + }) +}) diff --git a/src/state/messages/convo.ts b/src/state/messages/convo.ts new file mode 100644 index 000000000..a1de1dbed --- /dev/null +++ b/src/state/messages/convo.ts @@ -0,0 +1,442 @@ +import { + BskyAgent, + ChatBskyConvoDefs, + ChatBskyConvoSendMessage, +} from '@atproto-labs/api' +import {EventEmitter} from 'eventemitter3' +import {nanoid} from 'nanoid/non-secure' + +export type ConvoParams = { + convoId: string + agent: BskyAgent + __tempFromUserDid: string +} + +export enum ConvoStatus { + Uninitialized = 'uninitialized', + Initializing = 'initializing', + Ready = 'ready', + Error = 'error', + Destroyed = 'destroyed', +} + +export type ConvoItem = + | { + type: 'message' + key: string + message: ChatBskyConvoDefs.MessageView + nextMessage: + | ChatBskyConvoDefs.MessageView + | ChatBskyConvoDefs.DeletedMessageView + | null + } + | { + type: 'deleted-message' + key: string + message: ChatBskyConvoDefs.DeletedMessageView + nextMessage: + | ChatBskyConvoDefs.MessageView + | ChatBskyConvoDefs.DeletedMessageView + | null + } + | { + type: 'pending-message' + key: string + message: ChatBskyConvoSendMessage.InputSchema['message'] + } + +export type ConvoState = + | { + status: ConvoStatus.Uninitialized + } + | { + status: ConvoStatus.Initializing + } + | { + status: ConvoStatus.Ready + items: ConvoItem[] + convo: ChatBskyConvoDefs.ConvoView + isFetchingHistory: boolean + } + | { + status: ConvoStatus.Error + error: any + } + | { + status: ConvoStatus.Destroyed + } + +export class Convo { + private convoId: string + private agent: BskyAgent + private __tempFromUserDid: string + + private status: ConvoStatus = ConvoStatus.Uninitialized + private error: any + private convo: ChatBskyConvoDefs.ConvoView | undefined + private historyCursor: string | undefined | null = undefined + private isFetchingHistory = false + private eventsCursor: string | undefined = undefined + + private pastMessages: Map< + string, + ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView + > = new Map() + private newMessages: Map< + string, + ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView + > = new Map() + private pendingMessages: Map< + string, + {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']} + > = new Map() + + private pendingEventIngestion: Promise<void> | undefined + + constructor(params: ConvoParams) { + this.convoId = params.convoId + this.agent = params.agent + this.__tempFromUserDid = params.__tempFromUserDid + } + + async initialize() { + if (this.status !== 'uninitialized') return + this.status = ConvoStatus.Initializing + + try { + const response = await this.agent.api.chat.bsky.convo.getConvo( + { + convoId: this.convoId, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {convo} = response.data + + this.convo = convo + this.status = ConvoStatus.Ready + + this.commit() + + await this.fetchMessageHistory() + + this.pollEvents() + } catch (e) { + this.status = ConvoStatus.Error + this.error = e + } + } + + private async pollEvents() { + if (this.status === ConvoStatus.Destroyed) return + if (this.pendingEventIngestion) return + setTimeout(async () => { + this.pendingEventIngestion = this.ingestLatestEvents() + await this.pendingEventIngestion + this.pendingEventIngestion = undefined + this.pollEvents() + }, 5e3) + } + + async fetchMessageHistory() { + if (this.status === ConvoStatus.Destroyed) return + // reached end + if (this.historyCursor === null) return + if (this.isFetchingHistory) return + + this.isFetchingHistory = true + this.commit() + + /* + * Delay if paginating while scrolled. + * + * TODO why does the FlatList jump without this delay? + * + * Tbh it feels a little more natural with a slight delay. + */ + if (this.pastMessages.size > 0) { + await new Promise(y => setTimeout(y, 500)) + } + + const response = await this.agent.api.chat.bsky.convo.getMessages( + { + cursor: this.historyCursor, + convoId: this.convoId, + limit: 20, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {cursor, messages} = response.data + + this.historyCursor = cursor || null + + for (const message of messages) { + if ( + ChatBskyConvoDefs.isMessageView(message) || + ChatBskyConvoDefs.isDeletedMessageView(message) + ) { + this.pastMessages.set(message.id, message) + + // set to latest rev + if ( + message.rev > (this.eventsCursor = this.eventsCursor || message.rev) + ) { + this.eventsCursor = message.rev + } + } + } + + this.isFetchingHistory = false + this.commit() + } + + async ingestLatestEvents() { + if (this.status === ConvoStatus.Destroyed) return + + const response = await this.agent.api.chat.bsky.convo.getLog( + { + cursor: this.eventsCursor, + }, + { + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const {logs} = response.data + + for (const log of logs) { + /* + * If there's a rev, we should handle it. If there's not a rev, we don't + * know what it is. + */ + if (typeof log.rev === 'string') { + /* + * We only care about new events + */ + if (log.rev > (this.eventsCursor = this.eventsCursor || log.rev)) { + /* + * Update rev regardless of if it's a log type we care about or not + */ + this.eventsCursor = log.rev + + /* + * This is VERY important. We don't want to insert any messages from + * your other chats. + * + * TODO there may be a better way to handle this + */ + if (log.convoId !== this.convoId) continue + + if ( + ChatBskyConvoDefs.isLogCreateMessage(log) && + ChatBskyConvoDefs.isMessageView(log.message) + ) { + if (this.newMessages.has(log.message.id)) { + // Trust the log as the source of truth on ordering + // TODO test this + this.newMessages.delete(log.message.id) + } + this.newMessages.set(log.message.id, log.message) + } else if ( + ChatBskyConvoDefs.isLogDeleteMessage(log) && + ChatBskyConvoDefs.isDeletedMessageView(log.message) + ) { + /* + * Update if we have this in state. If we don't, don't worry about it. + */ + if (this.pastMessages.has(log.message.id)) { + /* + * For now, we remove deleted messages from the thread, if we receive one. + * + * To support them, it'd look something like this: + * this.pastMessages.set(log.message.id, log.message) + */ + this.pastMessages.delete(log.message.id) + } + } + } + } + } + + this.commit() + } + + async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) { + if (this.status === ConvoStatus.Destroyed) return + // Ignore empty messages for now since they have no other purpose atm + if (!message.text) return + + const tempId = nanoid() + + this.pendingMessages.set(tempId, { + id: tempId, + message, + }) + this.commit() + + await new Promise(y => setTimeout(y, 500)) + const response = await this.agent.api.chat.bsky.convo.sendMessage( + { + convoId: this.convoId, + message, + }, + { + encoding: 'application/json', + headers: { + Authorization: this.__tempFromUserDid, + }, + }, + ) + const res = response.data + + /* + * Insert into `newMessages` as soon as we have a real ID. That way, when + * we get an event log back, we can replace in situ. + */ + this.newMessages.set(res.id, { + ...res, + $type: 'chat.bsky.convo.defs#messageView', + sender: this.convo?.members.find(m => m.did === this.__tempFromUserDid), + }) + this.pendingMessages.delete(tempId) + + this.commit() + } + + /* + * Items in reverse order, since FlatList inverts + */ + get items(): ConvoItem[] { + const items: ConvoItem[] = [] + + // `newMessages` is in insertion order, unshift to reverse + this.newMessages.forEach(m => { + if (ChatBskyConvoDefs.isMessageView(m)) { + items.unshift({ + type: 'message', + key: m.id, + message: m, + nextMessage: null, + }) + } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { + items.unshift({ + type: 'deleted-message', + key: m.id, + message: m, + nextMessage: null, + }) + } + }) + + // `newMessages` is in insertion order, unshift to reverse + this.pendingMessages.forEach(m => { + items.unshift({ + type: 'pending-message', + key: m.id, + message: m.message, + }) + }) + + this.pastMessages.forEach(m => { + if (ChatBskyConvoDefs.isMessageView(m)) { + items.push({ + type: 'message', + key: m.id, + message: m, + nextMessage: null, + }) + } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) { + items.push({ + type: 'deleted-message', + key: m.id, + message: m, + nextMessage: null, + }) + } + }) + + return items.map((item, i) => { + let nextMessage = null + + if ( + ChatBskyConvoDefs.isMessageView(item.message) || + ChatBskyConvoDefs.isDeletedMessageView(item.message) + ) { + const next = items[i - 1] + if ( + next && + (ChatBskyConvoDefs.isMessageView(next.message) || + ChatBskyConvoDefs.isDeletedMessageView(next.message)) + ) { + nextMessage = next.message + } + } + + return { + ...item, + nextMessage, + } + }) + } + + destroy() { + this.status = ConvoStatus.Destroyed + this.commit() + } + + get state(): ConvoState { + switch (this.status) { + case ConvoStatus.Initializing: { + return { + status: ConvoStatus.Initializing, + } + } + case ConvoStatus.Ready: { + return { + status: ConvoStatus.Ready, + items: this.items, + convo: this.convo!, + isFetchingHistory: this.isFetchingHistory, + } + } + case ConvoStatus.Error: { + return { + status: ConvoStatus.Error, + error: this.error, + } + } + case ConvoStatus.Destroyed: { + return { + status: ConvoStatus.Destroyed, + } + } + default: { + return { + status: ConvoStatus.Uninitialized, + } + } + } + } + + private _emitter = new EventEmitter() + + private commit() { + this._emitter.emit('update') + } + + on(event: 'update', cb: () => void) { + this._emitter.on(event, cb) + } + + off(event: 'update', cb: () => void) { + this._emitter.off(event, cb) + } +} diff --git a/src/state/messages/index.tsx b/src/state/messages/index.tsx new file mode 100644 index 000000000..c59915253 --- /dev/null +++ b/src/state/messages/index.tsx @@ -0,0 +1,57 @@ +import React from 'react' +import {BskyAgent} from '@atproto-labs/api' + +import {Convo, ConvoParams} from '#/state/messages/convo' +import {useAgent} from '#/state/session' +import {useDmServiceUrlStorage} from '#/screens/Messages/Temp/useDmServiceUrlStorage' + +const ChatContext = React.createContext<{ + service: Convo + state: Convo['state'] +}>({ + // @ts-ignore + service: null, + // @ts-ignore + state: null, +}) + +export function useChat() { + return React.useContext(ChatContext) +} + +export function ChatProvider({ + children, + convoId, +}: Pick<ConvoParams, 'convoId'> & {children: React.ReactNode}) { + const {serviceUrl} = useDmServiceUrlStorage() + const {getAgent} = useAgent() + const [service] = React.useState( + () => + new Convo({ + convoId, + agent: new BskyAgent({ + service: serviceUrl, + }), + __tempFromUserDid: getAgent().session?.did!, + }), + ) + const [state, setState] = React.useState(service.state) + + React.useEffect(() => { + service.initialize() + }, [service]) + + React.useEffect(() => { + const update = () => setState(service.state) + service.on('update', update) + return () => { + service.destroy() + } + }, [service]) + + return ( + <ChatContext.Provider value={{state, service}}> + {children} + </ChatContext.Provider> + ) +} |