diff options
author | Vika <vika@fireburn.ru> | 2022-10-14 17:54:22 +0300 |
---|---|---|
committer | Vika <vika@fireburn.ru> | 2022-10-14 17:54:22 +0300 |
commit | 3fd63e5e9d6a05e1dcd84a0ca6dd0930f3ef9cf6 (patch) | |
tree | f7c0e2b1c8f23e3bf5ecb385d086f72f73934c19 /kittybox-rs/src/media/storage/mod.rs | |
parent | 9f7b903901acb0cd6ec9cb2146406a92ebf79cab (diff) | |
download | kittybox-3fd63e5e9d6a05e1dcd84a0ca6dd0930f3ef9cf6.tar.zst |
media: implement file range requests for backends
For now it is not yet exposed on the frontend, but that is merely a matter of time. TODO possibly remove the legacy methods, since they're obsoleted
Diffstat (limited to 'kittybox-rs/src/media/storage/mod.rs')
-rw-r--r-- | kittybox-rs/src/media/storage/mod.rs | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs index 4ef7c7a..020999c 100644 --- a/kittybox-rs/src/media/storage/mod.rs +++ b/kittybox-rs/src/media/storage/mod.rs @@ -3,6 +3,7 @@ use axum::extract::multipart::Field; use tokio_stream::Stream; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use std::ops::Bound; use std::pin::Pin; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -100,5 +101,77 @@ pub trait MediaStore: 'static + Send + Sync + Clone { filename: &str, ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>; + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound<u64>, Bound<u64>) + ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> { + use futures::stream::TryStreamExt; + use tracing::debug; + let (metadata, mut stream) = self.read_streaming(domain, filename).await?; + let length = metadata.length.unwrap().get(); + + use Bound::*; + let (start, end): (usize, usize) = match range { + (Unbounded, Unbounded) => return Ok(stream), + (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), + (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), + (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), + (_, _) => unreachable!() + }; + + stream = Box::pin( + stream.map_ok({ + let mut bytes_skipped = 0usize; + let mut bytes_read = 0usize; + + move |chunk| { + debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); + let chunk = if bytes_skipped < start { + let need_to_skip = start - bytes_skipped; + if chunk.len() < need_to_skip { + return None + } + debug!("Skipping {} bytes", need_to_skip); + bytes_skipped += need_to_skip; + + chunk.slice(need_to_skip..) + } else { + chunk + }; + + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + + if bytes_read > length { + if bytes_read - length > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - length); + return Some(chunk.slice(..chunk.len() - (bytes_read - length))) + } + + Some(chunk) + } + }) + .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + .map_ok(|x| x.unwrap()) + ); + + return Ok(stream); + } + + /// Read metadata for a file. + /// + /// The default implementation uses the `read_streaming` method + /// and drops the stream containing file content. + async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> { + self.read_streaming(domain, filename) + .await + .map(|(meta, stream)| meta) + } + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; } |