about summary refs log tree commit diff
path: root/src/state/messages/convo/agent.ts
diff options
context:
space:
mode:
Diffstat (limited to 'src/state/messages/convo/agent.ts')
-rw-r--r--src/state/messages/convo/agent.ts985
1 files changed, 985 insertions, 0 deletions
diff --git a/src/state/messages/convo/agent.ts b/src/state/messages/convo/agent.ts
new file mode 100644
index 000000000..38a3f5e62
--- /dev/null
+++ b/src/state/messages/convo/agent.ts
@@ -0,0 +1,985 @@
+import {AppBskyActorDefs} from '@atproto/api'
+import {
+  BskyAgent,
+  ChatBskyConvoDefs,
+  ChatBskyConvoGetLog,
+  ChatBskyConvoSendMessage,
+} from '@atproto-labs/api'
+import {nanoid} from 'nanoid/non-secure'
+
+import {logger} from '#/logger'
+import {isNative} from '#/platform/detection'
+import {
+  ConvoDispatch,
+  ConvoDispatchEvent,
+  ConvoErrorCode,
+  ConvoItem,
+  ConvoItemError,
+  ConvoParams,
+  ConvoState,
+  ConvoStatus,
+} from '#/state/messages/convo/types'
+
+const ACTIVE_POLL_INTERVAL = 1e3
+const BACKGROUND_POLL_INTERVAL = 10e3
+
+// TODO temporary
+let DEBUG_ACTIVE_CHAT: string | undefined
+
+export function isConvoItemMessage(
+  item: ConvoItem,
+): item is ConvoItem & {type: 'message'} {
+  if (!item) return false
+  return (
+    item.type === 'message' ||
+    item.type === 'deleted-message' ||
+    item.type === 'pending-message'
+  )
+}
+
+export class Convo {
+  private id: string
+
+  private agent: BskyAgent
+  private __tempFromUserDid: string
+
+  private status: ConvoStatus = ConvoStatus.Uninitialized
+  private pollInterval = ACTIVE_POLL_INTERVAL
+  private error:
+    | {
+        code: ConvoErrorCode
+        exception?: Error
+        retry: () => void
+      }
+    | undefined
+  private historyCursor: string | undefined | null = undefined
+  private isFetchingHistory = false
+  private eventsCursor: string | undefined = undefined
+
+  private pastMessages: Map<
+    string,
+    ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
+  > = new Map()
+  private newMessages: Map<
+    string,
+    ChatBskyConvoDefs.MessageView | ChatBskyConvoDefs.DeletedMessageView
+  > = new Map()
+  private pendingMessages: Map<
+    string,
+    {id: string; message: ChatBskyConvoSendMessage.InputSchema['message']}
+  > = new Map()
+  private deletedMessages: Set<string> = new Set()
+  private footerItems: Map<string, ConvoItem> = new Map()
+  private headerItems: Map<string, ConvoItem> = new Map()
+
+  private isProcessingPendingMessages = false
+  private nextPoll: NodeJS.Timeout | undefined
+
+  convoId: string
+  convo: ChatBskyConvoDefs.ConvoView | undefined
+  sender: AppBskyActorDefs.ProfileViewBasic | undefined
+  recipients: AppBskyActorDefs.ProfileViewBasic[] | undefined = undefined
+  snapshot: ConvoState | undefined
+
+  constructor(params: ConvoParams) {
+    this.id = nanoid(3)
+    this.convoId = params.convoId
+    this.agent = params.agent
+    this.__tempFromUserDid = params.__tempFromUserDid
+
+    this.subscribe = this.subscribe.bind(this)
+    this.getSnapshot = this.getSnapshot.bind(this)
+    this.sendMessage = this.sendMessage.bind(this)
+    this.deleteMessage = this.deleteMessage.bind(this)
+    this.fetchMessageHistory = this.fetchMessageHistory.bind(this)
+
+    if (DEBUG_ACTIVE_CHAT) {
+      logger.error(`Convo: another chat was already active`, {
+        convoId: this.convoId,
+      })
+    } else {
+      DEBUG_ACTIVE_CHAT = this.convoId
+    }
+  }
+
+  private commit() {
+    this.snapshot = undefined
+    this.subscribers.forEach(subscriber => subscriber())
+  }
+
+  private subscribers: (() => void)[] = []
+
+  subscribe(subscriber: () => void) {
+    if (this.subscribers.length === 0) this.init()
+
+    this.subscribers.push(subscriber)
+
+    return () => {
+      this.subscribers = this.subscribers.filter(s => s !== subscriber)
+      if (this.subscribers.length === 0) this.suspend()
+    }
+  }
+
+  getSnapshot(): ConvoState {
+    if (!this.snapshot) this.snapshot = this.generateSnapshot()
+    // logger.debug('Convo: snapshotted', {}, logger.DebugContext.convo)
+    return this.snapshot
+  }
+
+  private generateSnapshot(): ConvoState {
+    switch (this.status) {
+      case ConvoStatus.Initializing: {
+        return {
+          status: ConvoStatus.Initializing,
+          items: [],
+          convo: undefined,
+          error: undefined,
+          sender: undefined,
+          recipients: undefined,
+          isFetchingHistory: this.isFetchingHistory,
+          deleteMessage: undefined,
+          sendMessage: undefined,
+          fetchMessageHistory: undefined,
+        }
+      }
+      case ConvoStatus.Suspended:
+      case ConvoStatus.Backgrounded:
+      case ConvoStatus.Ready: {
+        return {
+          status: this.status,
+          items: this.getItems(),
+          convo: this.convo!,
+          error: undefined,
+          sender: this.sender!,
+          recipients: this.recipients!,
+          isFetchingHistory: this.isFetchingHistory,
+          deleteMessage: this.deleteMessage,
+          sendMessage: this.sendMessage,
+          fetchMessageHistory: this.fetchMessageHistory,
+        }
+      }
+      case ConvoStatus.Error: {
+        return {
+          status: ConvoStatus.Error,
+          items: [],
+          convo: undefined,
+          error: this.error,
+          sender: undefined,
+          recipients: undefined,
+          isFetchingHistory: false,
+          deleteMessage: undefined,
+          sendMessage: undefined,
+          fetchMessageHistory: undefined,
+        }
+      }
+      default: {
+        return {
+          status: ConvoStatus.Uninitialized,
+          items: [],
+          convo: undefined,
+          error: undefined,
+          sender: undefined,
+          recipients: undefined,
+          isFetchingHistory: false,
+          deleteMessage: undefined,
+          sendMessage: undefined,
+          fetchMessageHistory: undefined,
+        }
+      }
+    }
+  }
+
+  dispatch(action: ConvoDispatch) {
+    const prevStatus = this.status
+
+    switch (this.status) {
+      case ConvoStatus.Uninitialized: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Init: {
+            this.status = ConvoStatus.Initializing
+            this.setup()
+            break
+          }
+        }
+        break
+      }
+      case ConvoStatus.Initializing: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Ready: {
+            this.status = ConvoStatus.Ready
+            this.pollInterval = ACTIVE_POLL_INTERVAL
+            this.fetchMessageHistory().then(() => {
+              this.restartPoll()
+            })
+            break
+          }
+          case ConvoDispatchEvent.Background: {
+            this.status = ConvoStatus.Backgrounded
+            this.pollInterval = BACKGROUND_POLL_INTERVAL
+            this.fetchMessageHistory().then(() => {
+              this.restartPoll()
+            })
+            break
+          }
+          case ConvoDispatchEvent.Suspend: {
+            this.status = ConvoStatus.Suspended
+            break
+          }
+          case ConvoDispatchEvent.Error: {
+            this.status = ConvoStatus.Error
+            this.error = action.payload
+            break
+          }
+        }
+        break
+      }
+      case ConvoStatus.Ready: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Resume: {
+            this.refreshConvo()
+            this.restartPoll()
+            break
+          }
+          case ConvoDispatchEvent.Background: {
+            this.status = ConvoStatus.Backgrounded
+            this.pollInterval = BACKGROUND_POLL_INTERVAL
+            this.restartPoll()
+            break
+          }
+          case ConvoDispatchEvent.Suspend: {
+            this.status = ConvoStatus.Suspended
+            this.cancelNextPoll()
+            break
+          }
+          case ConvoDispatchEvent.Error: {
+            this.status = ConvoStatus.Error
+            this.error = action.payload
+            this.cancelNextPoll()
+            break
+          }
+        }
+        break
+      }
+      case ConvoStatus.Backgrounded: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Resume: {
+            this.status = ConvoStatus.Ready
+            this.pollInterval = ACTIVE_POLL_INTERVAL
+            this.refreshConvo()
+            // TODO truncate history if needed
+            this.restartPoll()
+            break
+          }
+          case ConvoDispatchEvent.Suspend: {
+            this.status = ConvoStatus.Suspended
+            this.cancelNextPoll()
+            break
+          }
+          case ConvoDispatchEvent.Error: {
+            this.status = ConvoStatus.Error
+            this.error = action.payload
+            this.cancelNextPoll()
+            break
+          }
+        }
+        break
+      }
+      case ConvoStatus.Suspended: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Init: {
+            this.status = ConvoStatus.Ready
+            this.pollInterval = ACTIVE_POLL_INTERVAL
+            this.refreshConvo()
+            // TODO truncate history if needed
+            this.restartPoll()
+            break
+          }
+          case ConvoDispatchEvent.Resume: {
+            this.status = ConvoStatus.Ready
+            this.pollInterval = ACTIVE_POLL_INTERVAL
+            this.refreshConvo()
+            this.restartPoll()
+            break
+          }
+          case ConvoDispatchEvent.Error: {
+            this.status = ConvoStatus.Error
+            this.error = action.payload
+            break
+          }
+        }
+        break
+      }
+      case ConvoStatus.Error: {
+        switch (action.event) {
+          case ConvoDispatchEvent.Init: {
+            this.reset()
+            break
+          }
+          case ConvoDispatchEvent.Resume: {
+            this.reset()
+            break
+          }
+          case ConvoDispatchEvent.Suspend: {
+            this.status = ConvoStatus.Suspended
+            break
+          }
+          case ConvoDispatchEvent.Error: {
+            this.status = ConvoStatus.Error
+            this.error = action.payload
+            break
+          }
+        }
+        break
+      }
+      default:
+        break
+    }
+
+    logger.debug(
+      `Convo: dispatch '${action.event}'`,
+      {
+        id: this.id,
+        prev: prevStatus,
+        next: this.status,
+      },
+      logger.DebugContext.convo,
+    )
+
+    this.commit()
+  }
+
+  private reset() {
+    this.convo = undefined
+    this.sender = undefined
+    this.recipients = undefined
+    this.snapshot = undefined
+
+    this.status = ConvoStatus.Uninitialized
+    this.error = undefined
+    this.historyCursor = undefined
+    this.eventsCursor = undefined
+
+    this.pastMessages = new Map()
+    this.newMessages = new Map()
+    this.pendingMessages = new Map()
+    this.deletedMessages = new Set()
+    this.footerItems = new Map()
+    this.headerItems = new Map()
+
+    this.dispatch({event: ConvoDispatchEvent.Init})
+  }
+
+  private async setup() {
+    try {
+      const {convo, sender, recipients} = await this.fetchConvo()
+
+      this.convo = convo
+      this.sender = sender
+      this.recipients = recipients
+
+      /*
+       * Some validation prior to `Ready` status
+       */
+      if (!this.convo) {
+        throw new Error('Convo: could not find convo')
+      }
+      if (!this.sender) {
+        throw new Error('Convo: could not find sender in convo')
+      }
+      if (!this.recipients) {
+        throw new Error('Convo: could not find recipients in convo')
+      }
+
+      // await new Promise(y => setTimeout(y, 2000))
+      // throw new Error('UNCOMMENT TO TEST INIT FAILURE')
+      this.dispatch({event: ConvoDispatchEvent.Ready})
+    } catch (e: any) {
+      logger.error('Convo: setup() failed')
+
+      this.dispatch({
+        event: ConvoDispatchEvent.Error,
+        payload: {
+          exception: e,
+          code: ConvoErrorCode.InitFailed,
+          retry: () => {
+            this.reset()
+          },
+        },
+      })
+    }
+  }
+
+  init() {
+    this.dispatch({event: ConvoDispatchEvent.Init})
+  }
+
+  resume() {
+    this.dispatch({event: ConvoDispatchEvent.Resume})
+  }
+
+  background() {
+    this.dispatch({event: ConvoDispatchEvent.Background})
+  }
+
+  suspend() {
+    this.dispatch({event: ConvoDispatchEvent.Suspend})
+    DEBUG_ACTIVE_CHAT = undefined
+  }
+
+  private pendingFetchConvo:
+    | Promise<{
+        convo: ChatBskyConvoDefs.ConvoView
+        sender: AppBskyActorDefs.ProfileViewBasic | undefined
+        recipients: AppBskyActorDefs.ProfileViewBasic[]
+      }>
+    | undefined
+  async fetchConvo() {
+    if (this.pendingFetchConvo) return this.pendingFetchConvo
+
+    this.pendingFetchConvo = new Promise<{
+      convo: ChatBskyConvoDefs.ConvoView
+      sender: AppBskyActorDefs.ProfileViewBasic | undefined
+      recipients: AppBskyActorDefs.ProfileViewBasic[]
+    }>(async (resolve, reject) => {
+      try {
+        const response = await this.agent.api.chat.bsky.convo.getConvo(
+          {
+            convoId: this.convoId,
+          },
+          {
+            headers: {
+              Authorization: this.__tempFromUserDid,
+            },
+          },
+        )
+
+        const convo = response.data.convo
+
+        resolve({
+          convo,
+          sender: convo.members.find(m => m.did === this.__tempFromUserDid),
+          recipients: convo.members.filter(
+            m => m.did !== this.__tempFromUserDid,
+          ),
+        })
+      } catch (e) {
+        reject(e)
+      } finally {
+        this.pendingFetchConvo = undefined
+      }
+    })
+
+    return this.pendingFetchConvo
+  }
+
+  async refreshConvo() {
+    try {
+      const {convo, sender, recipients} = await this.fetchConvo()
+      // throw new Error('UNCOMMENT TO TEST REFRESH FAILURE')
+      this.convo = convo || this.convo
+      this.sender = sender || this.sender
+      this.recipients = recipients || this.recipients
+    } catch (e: any) {
+      logger.error(`Convo: failed to refresh convo`)
+
+      this.footerItems.set(ConvoItemError.Network, {
+        type: 'error-recoverable',
+        key: ConvoItemError.Network,
+        code: ConvoItemError.Network,
+        retry: () => {
+          this.footerItems.delete(ConvoItemError.Network)
+          this.resume()
+        },
+      })
+      this.commit()
+    }
+  }
+
+  async fetchMessageHistory() {
+    logger.debug('Convo: fetch message history', {}, logger.DebugContext.convo)
+
+    /*
+     * If historyCursor is null, we've fetched all history.
+     */
+    if (this.historyCursor === null) return
+
+    /*
+     * Don't fetch again if a fetch is already in progress
+     */
+    if (this.isFetchingHistory) return
+
+    /*
+     * If we've rendered a retry state for history fetching, exit. Upon retry,
+     * this will be removed and we'll try again.
+     */
+    if (this.headerItems.has(ConvoItemError.HistoryFailed)) return
+
+    try {
+      this.isFetchingHistory = true
+      this.commit()
+
+      /*
+       * Delay if paginating while scrolled to prevent momentum scrolling from
+       * jerking the list around, plus makes it feel a little more human.
+       */
+      if (this.pastMessages.size > 0) {
+        await new Promise(y => setTimeout(y, 500))
+        // throw new Error('UNCOMMENT TO TEST RETRY')
+      }
+
+      const response = await this.agent.api.chat.bsky.convo.getMessages(
+        {
+          cursor: this.historyCursor,
+          convoId: this.convoId,
+          limit: isNative ? 25 : 50,
+        },
+        {
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+      const {cursor, messages} = response.data
+
+      this.historyCursor = cursor ?? null
+
+      for (const message of messages) {
+        if (
+          ChatBskyConvoDefs.isMessageView(message) ||
+          ChatBskyConvoDefs.isDeletedMessageView(message)
+        ) {
+          this.pastMessages.set(message.id, message)
+
+          // set to latest rev
+          if (
+            message.rev > (this.eventsCursor = this.eventsCursor || message.rev)
+          ) {
+            this.eventsCursor = message.rev
+          }
+        }
+      }
+    } catch (e: any) {
+      logger.error('Convo: failed to fetch message history')
+
+      this.headerItems.set(ConvoItemError.HistoryFailed, {
+        type: 'error-recoverable',
+        key: ConvoItemError.HistoryFailed,
+        code: ConvoItemError.HistoryFailed,
+        retry: () => {
+          this.headerItems.delete(ConvoItemError.HistoryFailed)
+          this.fetchMessageHistory()
+        },
+      })
+    } finally {
+      this.isFetchingHistory = false
+      this.commit()
+    }
+  }
+
+  private restartPoll() {
+    this.cancelNextPoll()
+    this.pollLatestEvents()
+  }
+
+  private cancelNextPoll() {
+    if (this.nextPoll) clearTimeout(this.nextPoll)
+  }
+
+  private pollLatestEvents() {
+    /*
+     * Uncomment to view poll events
+     */
+    logger.debug('Convo: poll events', {id: this.id}, logger.DebugContext.convo)
+
+    try {
+      this.fetchLatestEvents().then(({events}) => {
+        this.applyLatestEvents(events)
+      })
+      this.nextPoll = setTimeout(() => {
+        this.pollLatestEvents()
+      }, this.pollInterval)
+    } catch (e: any) {
+      logger.error('Convo: poll events failed')
+
+      this.cancelNextPoll()
+
+      this.footerItems.set(ConvoItemError.PollFailed, {
+        type: 'error-recoverable',
+        key: ConvoItemError.PollFailed,
+        code: ConvoItemError.PollFailed,
+        retry: () => {
+          this.footerItems.delete(ConvoItemError.PollFailed)
+          this.commit()
+          this.pollLatestEvents()
+        },
+      })
+
+      this.commit()
+    }
+  }
+
+  private pendingFetchLatestEvents:
+    | Promise<{
+        events: ChatBskyConvoGetLog.OutputSchema['logs']
+      }>
+    | undefined
+  async fetchLatestEvents() {
+    if (this.pendingFetchLatestEvents) return this.pendingFetchLatestEvents
+
+    this.pendingFetchLatestEvents = new Promise<{
+      events: ChatBskyConvoGetLog.OutputSchema['logs']
+    }>(async (resolve, reject) => {
+      try {
+        // throw new Error('UNCOMMENT TO TEST POLL FAILURE')
+        const response = await this.agent.api.chat.bsky.convo.getLog(
+          {
+            cursor: this.eventsCursor,
+          },
+          {
+            headers: {
+              Authorization: this.__tempFromUserDid,
+            },
+          },
+        )
+        const {logs} = response.data
+        resolve({events: logs})
+      } catch (e) {
+        reject(e)
+      } finally {
+        this.pendingFetchLatestEvents = undefined
+      }
+    })
+
+    return this.pendingFetchLatestEvents
+  }
+
+  private applyLatestEvents(events: ChatBskyConvoGetLog.OutputSchema['logs']) {
+    let needsCommit = false
+
+    for (const ev of events) {
+      /*
+       * If there's a rev, we should handle it. If there's not a rev, we don't
+       * know what it is.
+       */
+      if (typeof ev.rev === 'string') {
+        /*
+         * We only care about new events
+         */
+        if (ev.rev > (this.eventsCursor = this.eventsCursor || ev.rev)) {
+          /*
+           * Update rev regardless of if it's a ev type we care about or not
+           */
+          this.eventsCursor = ev.rev
+
+          /*
+           * This is VERY important. We don't want to insert any messages from
+           * your other chats.
+           */
+          if (ev.convoId !== this.convoId) continue
+
+          if (
+            ChatBskyConvoDefs.isLogCreateMessage(ev) &&
+            ChatBskyConvoDefs.isMessageView(ev.message)
+          ) {
+            if (this.newMessages.has(ev.message.id)) {
+              // Trust the ev as the source of truth on ordering
+              this.newMessages.delete(ev.message.id)
+            }
+            this.newMessages.set(ev.message.id, ev.message)
+            needsCommit = true
+          } else if (
+            ChatBskyConvoDefs.isLogDeleteMessage(ev) &&
+            ChatBskyConvoDefs.isDeletedMessageView(ev.message)
+          ) {
+            /*
+             * Update if we have this in state. If we don't, don't worry about it.
+             */
+            if (this.pastMessages.has(ev.message.id)) {
+              /*
+               * For now, we remove deleted messages from the thread, if we receive one.
+               *
+               * To support them, it'd look something like this:
+               *   this.pastMessages.set(ev.message.id, ev.message)
+               */
+              this.pastMessages.delete(ev.message.id)
+              this.newMessages.delete(ev.message.id)
+              this.deletedMessages.delete(ev.message.id)
+              needsCommit = true
+            }
+          }
+        }
+      }
+    }
+
+    if (needsCommit) {
+      this.commit()
+    }
+  }
+
+  async sendMessage(message: ChatBskyConvoSendMessage.InputSchema['message']) {
+    // Ignore empty messages for now since they have no other purpose atm
+    if (!message.text.trim()) return
+
+    logger.debug('Convo: send message', {}, logger.DebugContext.convo)
+
+    const tempId = nanoid()
+
+    this.pendingMessages.set(tempId, {
+      id: tempId,
+      message,
+    })
+    this.commit()
+
+    if (!this.isProcessingPendingMessages) {
+      this.processPendingMessages()
+    }
+  }
+
+  async processPendingMessages() {
+    logger.debug(
+      `Convo: processing messages (${this.pendingMessages.size} remaining)`,
+      {},
+      logger.DebugContext.convo,
+    )
+
+    const pendingMessage = Array.from(this.pendingMessages.values()).shift()
+
+    /*
+     * If there are no pending messages, we're done.
+     */
+    if (!pendingMessage) {
+      this.isProcessingPendingMessages = false
+      return
+    }
+
+    try {
+      this.isProcessingPendingMessages = true
+
+      // throw new Error('UNCOMMENT TO TEST RETRY')
+      const {id, message} = pendingMessage
+
+      const response = await this.agent.api.chat.bsky.convo.sendMessage(
+        {
+          convoId: this.convoId,
+          message,
+        },
+        {
+          encoding: 'application/json',
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+      const res = response.data
+
+      /*
+       * Insert into `newMessages` as soon as we have a real ID. That way, when
+       * we get an event log back, we can replace in situ.
+       */
+      this.newMessages.set(res.id, {
+        ...res,
+        $type: 'chat.bsky.convo.defs#messageView',
+        sender: this.sender,
+      })
+      this.pendingMessages.delete(id)
+
+      await this.processPendingMessages()
+
+      this.commit()
+    } catch (e) {
+      this.footerItems.set('pending-retry', {
+        type: 'pending-retry',
+        key: 'pending-retry',
+        retry: this.batchRetryPendingMessages.bind(this),
+      })
+      this.commit()
+    }
+  }
+
+  async batchRetryPendingMessages() {
+    logger.debug(
+      `Convo: retrying ${this.pendingMessages.size} pending messages`,
+      {},
+      logger.DebugContext.convo,
+    )
+
+    this.footerItems.delete('pending-retry')
+    this.commit()
+
+    try {
+      const messageArray = Array.from(this.pendingMessages.values())
+      const {data} = await this.agent.api.chat.bsky.convo.sendMessageBatch(
+        {
+          items: messageArray.map(({message}) => ({
+            convoId: this.convoId,
+            message,
+          })),
+        },
+        {
+          encoding: 'application/json',
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+      const {items} = data
+
+      /*
+       * Insert into `newMessages` as soon as we have a real ID. That way, when
+       * we get an event log back, we can replace in situ.
+       */
+      for (const item of items) {
+        this.newMessages.set(item.id, {
+          ...item,
+          $type: 'chat.bsky.convo.defs#messageView',
+          sender: this.convo?.members.find(
+            m => m.did === this.__tempFromUserDid,
+          ),
+        })
+      }
+
+      for (const pendingMessage of messageArray) {
+        this.pendingMessages.delete(pendingMessage.id)
+      }
+
+      this.commit()
+    } catch (e) {
+      this.footerItems.set('pending-retry', {
+        type: 'pending-retry',
+        key: 'pending-retry',
+        retry: this.batchRetryPendingMessages.bind(this),
+      })
+      this.commit()
+    }
+  }
+
+  async deleteMessage(messageId: string) {
+    logger.debug('Convo: delete message', {}, logger.DebugContext.convo)
+
+    this.deletedMessages.add(messageId)
+    this.commit()
+
+    try {
+      await this.agent.api.chat.bsky.convo.deleteMessageForSelf(
+        {
+          convoId: this.convoId,
+          messageId,
+        },
+        {
+          encoding: 'application/json',
+          headers: {
+            Authorization: this.__tempFromUserDid,
+          },
+        },
+      )
+    } catch (e) {
+      this.deletedMessages.delete(messageId)
+      this.commit()
+      throw e
+    }
+  }
+
+  /*
+   * Items in reverse order, since FlatList inverts
+   */
+  getItems(): ConvoItem[] {
+    const items: ConvoItem[] = []
+
+    this.headerItems.forEach(item => {
+      items.push(item)
+    })
+
+    this.pastMessages.forEach(m => {
+      if (ChatBskyConvoDefs.isMessageView(m)) {
+        items.unshift({
+          type: 'message',
+          key: m.id,
+          message: m,
+          nextMessage: null,
+        })
+      } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
+        items.unshift({
+          type: 'deleted-message',
+          key: m.id,
+          message: m,
+          nextMessage: null,
+        })
+      }
+    })
+
+    this.newMessages.forEach(m => {
+      if (ChatBskyConvoDefs.isMessageView(m)) {
+        items.push({
+          type: 'message',
+          key: m.id,
+          message: m,
+          nextMessage: null,
+        })
+      } else if (ChatBskyConvoDefs.isDeletedMessageView(m)) {
+        items.push({
+          type: 'deleted-message',
+          key: m.id,
+          message: m,
+          nextMessage: null,
+        })
+      }
+    })
+
+    this.pendingMessages.forEach(m => {
+      items.push({
+        type: 'pending-message',
+        key: m.id,
+        message: {
+          ...m.message,
+          id: nanoid(),
+          rev: '__fake__',
+          sentAt: new Date().toISOString(),
+          sender: this.sender,
+        },
+        nextMessage: null,
+      })
+    })
+
+    this.footerItems.forEach(item => {
+      items.push(item)
+    })
+
+    return items
+      .filter(item => {
+        if (isConvoItemMessage(item)) {
+          return !this.deletedMessages.has(item.message.id)
+        }
+        return true
+      })
+      .map((item, i, arr) => {
+        let nextMessage = null
+        const isMessage = isConvoItemMessage(item)
+
+        if (isMessage) {
+          if (
+            isMessage &&
+            (ChatBskyConvoDefs.isMessageView(item.message) ||
+              ChatBskyConvoDefs.isDeletedMessageView(item.message))
+          ) {
+            const next = arr[i + 1]
+
+            if (
+              isConvoItemMessage(next) &&
+              next &&
+              (ChatBskyConvoDefs.isMessageView(next.message) ||
+                ChatBskyConvoDefs.isDeletedMessageView(next.message))
+            ) {
+              nextMessage = next.message
+            }
+          }
+
+          return {
+            ...item,
+            nextMessage,
+          }
+        }
+
+        return item
+      })
+  }
+}