From 3fd63e5e9d6a05e1dcd84a0ca6dd0930f3ef9cf6 Mon Sep 17 00:00:00 2001 From: Vika Date: Fri, 14 Oct 2022 17:54:22 +0300 Subject: media: implement file range requests for backends For now it is not yet exposed on the frontend, but that is merely a matter of time. TODO possibly remove the legacy methods, since they're obsoleted --- kittybox-rs/src/media/storage/file.rs | 217 +++++++++++++++++++++++++++++++--- kittybox-rs/src/media/storage/mod.rs | 73 ++++++++++++ 2 files changed, 274 insertions(+), 16 deletions(-) diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs index 42149aa..824f326 100644 --- a/kittybox-rs/src/media/storage/file.rs +++ b/kittybox-rs/src/media/storage/file.rs @@ -2,8 +2,9 @@ use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result}; use async_trait::async_trait; use std::{path::PathBuf, fmt::Debug}; use tokio::fs::OpenOptions; -use tokio::io::{BufReader, BufWriter, AsyncWriteExt}; +use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt}; use futures::{StreamExt, TryStreamExt}; +use std::ops::{Bound, RangeBounds, Neg}; use std::pin::Pin; use sha2::Digest; use futures::FutureExt; @@ -45,7 +46,7 @@ impl FileStore { #[async_trait] impl MediaStore for FileStore { - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip(self, content))] async fn write_streaming( &self, domain: &str, @@ -137,19 +138,13 @@ impl MediaStore for FileStore { filename: &str, ) -> Result<(Metadata, Pin> + Send>>)> { let path = self.base.join(format!("{}{}", domain, filename)); - let metapath = self.base.join(format!("{}{}.json", domain, filename)); - debug!("Path: {}, metadata: {}", path.display(), metapath.display()); + debug!("Path: {}", path.display()); let file = OpenOptions::new() .read(true) .open(path) .await?; - let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) - .map_err(|err| MediaStoreError { - kind: ErrorKind::Json, - msg: format!("{}", err), - source: Some(Box::new(err)) - })?; + let meta = self.metadata(domain, filename).await?; Ok((meta, Box::pin( tokio_util::io::ReaderStream::new( @@ -164,6 +159,86 @@ impl MediaStore for FileStore { ))) } + #[tracing::instrument(skip(self))] + async fn metadata(&self, domain: &str, filename: &str) -> Result { + let metapath = self.base.join(format!("{}{}.json", domain, filename)); + debug!("Metadata path: {}", metapath.display()); + + let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?) + .map_err(|err| MediaStoreError { + kind: ErrorKind::Json, + msg: format!("{}", err), + source: Some(Box::new(err)) + })?; + + Ok(meta) + } + + #[tracing::instrument(skip(self))] + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound, Bound) + ) -> Result> + Send>>> { + let path = self.base.join(format!("{}{}", domain, filename)); + let metapath = self.base.join(format!("{}{}.json", domain, filename)); + debug!("Path: {}, metadata: {}", path.display(), metapath.display()); + + let mut file = OpenOptions::new() + .read(true) + .open(path) + .await?; + + let start = match range { + (Bound::Included(bound), _) => { + debug!("Seeking {} bytes forward...", bound); + file.seek(std::io::SeekFrom::Start(bound)).await? + } + (Bound::Excluded(_), _) => unreachable!(), + (Bound::Unbounded, Bound::Included(bound)) => { + // Seek to the end minus the bounded bytes + debug!("Seeking {} bytes back from the end...", bound); + file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await? + }, + (Bound::Unbounded, Bound::Unbounded) => 0, + (_, Bound::Excluded(_)) => unreachable!() + }; + + let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file))) + .map_ok({ + let mut bytes_read = 0usize; + let len = match range { + (_, Bound::Unbounded) => None, + (Bound::Unbounded, Bound::Included(bound)) => Some(bound), + (_, Bound::Included(bound)) => Some(bound + 1 - start), + (_, Bound::Excluded(_)) => unreachable!() + }; + move |chunk| { + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + if let Some(len) = len.map(|len| len.try_into().unwrap()) { + if bytes_read > len { + if bytes_read - len > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - len); + return Some(chunk.slice(..chunk.len() - (bytes_read - len))) + } + } + + Some(chunk) + } + }) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + // Will never panic, because the moment the stream yields + // a None, it is considered exhausted. + .map_ok(|x| x.unwrap()); + + return Ok(Box::pin(stream)) + } + + async fn delete(&self, domain: &str, filename: &str) -> Result<()> { let path = self.base.join(format!("{}/{}", domain, filename)); @@ -174,17 +249,127 @@ impl MediaStore for FileStore { #[cfg(test)] mod tests { use super::{Metadata, FileStore, MediaStore}; + use std::ops::Bound; use tokio::io::AsyncReadExt; #[tokio::test] #[tracing_test::traced_test] - async fn test_streaming_read_write() { + async fn test_ranges() { let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); let store = FileStore::new(tempdir.path()); - let tempdir_path = tempdir.into_path(); + let file: &[u8] = include_bytes!("./file.rs"); + let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); + let metadata = Metadata { + filename: Some("file.rs".to_string()), + content_type: Some("text/plain".to_string()), + length: None, + etag: None, + }; + + // write through the interface + let filename = store.write_streaming( + "fireburn.ru", + metadata, stream + ).await.unwrap(); + + tracing::debug!("Writing complete."); + + // Ensure the file is there + let content = tokio::fs::read( + tempdir.path() + .join("fireburn.ru") + .join(&filename) + ).await.unwrap(); + assert_eq!(content, file); + + tracing::debug!("Reading range from the start..."); + // try to read range + let range = { + let stream = store.stream_range( + "fireburn.ru", &("/".to_string() + &filename), + (Bound::Included(0), Bound::Included(299)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[..=299]); + + tracing::debug!("Reading range from the middle..."); + + let range = { + let stream = store.stream_range( + "fireburn.ru", &("/".to_string() + &filename), + (Bound::Included(150), Bound::Included(449)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[150..=449]); + + tracing::debug!("Reading range from the end..."); + let range = { + let stream = store.stream_range( + "fireburn.ru", &("/".to_string() + &filename), + // Note: the `headers` crate parses bounds in a + // non-standard way, where unbounded start actually + // means getting things from the end... + (Bound::Unbounded, Bound::Included(300)) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), 300); + assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]); + + tracing::debug!("Reading the whole file..."); + // try to read range + let range = { + let stream = store.stream_range( + "fireburn.ru", &("/".to_string() + &filename), + (Bound::Unbounded, Bound::Unbounded) + ).await.unwrap(); + + let mut reader = tokio_util::io::StreamReader::new(stream); + + let mut buf = Vec::default(); + reader.read_to_end(&mut buf).await.unwrap(); + + buf + }; + + assert_eq!(range.len(), file.len()); + assert_eq!(range.as_slice(), file); + } + + + #[tokio::test] + #[tracing_test::traced_test] + async fn test_streaming_read_write() { + let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir"); + let store = FileStore::new(tempdir.path()); - let file: &[u8] = include_bytes!("../../../companion-lite/style.css"); + let file: &[u8] = include_bytes!("./file.rs"); let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i)))); let metadata = Metadata { filename: Some("style.css".to_string()), @@ -198,12 +383,12 @@ mod tests { "fireburn.ru", metadata, stream ).await.unwrap(); - println!("{}, {}", filename, tempdir_path + println!("{}, {}", filename, tempdir.path() .join("fireburn.ru") .join(&filename) .display()); let content = tokio::fs::read( - tempdir_path + tempdir.path() .join("fireburn.ru") .join(&filename) ).await.unwrap(); @@ -211,7 +396,7 @@ mod tests { // check internal metadata format let meta: Metadata = serde_json::from_slice(&tokio::fs::read( - tempdir_path + tempdir.path() .join("fireburn.ru") .join(filename.clone() + ".json") ).await.unwrap()).unwrap(); diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs index 4ef7c7a..020999c 100644 --- a/kittybox-rs/src/media/storage/mod.rs +++ b/kittybox-rs/src/media/storage/mod.rs @@ -3,6 +3,7 @@ use axum::extract::multipart::Field; use tokio_stream::Stream; use bytes::Bytes; use serde::{Deserialize, Serialize}; +use std::ops::Bound; use std::pin::Pin; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -100,5 +101,77 @@ pub trait MediaStore: 'static + Send + Sync + Clone { filename: &str, ) -> Result<(Metadata, Pin> + Send>>)>; + async fn stream_range( + &self, + domain: &str, + filename: &str, + range: (Bound, Bound) + ) -> Result> + Send>>> { + use futures::stream::TryStreamExt; + use tracing::debug; + let (metadata, mut stream) = self.read_streaming(domain, filename).await?; + let length = metadata.length.unwrap().get(); + + use Bound::*; + let (start, end): (usize, usize) = match range { + (Unbounded, Unbounded) => return Ok(stream), + (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1), + (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1), + (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()), + (_, _) => unreachable!() + }; + + stream = Box::pin( + stream.map_ok({ + let mut bytes_skipped = 0usize; + let mut bytes_read = 0usize; + + move |chunk| { + debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len()); + let chunk = if bytes_skipped < start { + let need_to_skip = start - bytes_skipped; + if chunk.len() < need_to_skip { + return None + } + debug!("Skipping {} bytes", need_to_skip); + bytes_skipped += need_to_skip; + + chunk.slice(need_to_skip..) + } else { + chunk + }; + + debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len()); + bytes_read += chunk.len(); + + if bytes_read > length { + if bytes_read - length > chunk.len() { + return None + } + debug!("Truncating last {} bytes", bytes_read - length); + return Some(chunk.slice(..chunk.len() - (bytes_read - length))) + } + + Some(chunk) + } + }) + .try_skip_while(|x| std::future::ready(Ok(x.is_none()))) + .try_take_while(|x| std::future::ready(Ok(x.is_some()))) + .map_ok(|x| x.unwrap()) + ); + + return Ok(stream); + } + + /// Read metadata for a file. + /// + /// The default implementation uses the `read_streaming` method + /// and drops the stream containing file content. + async fn metadata(&self, domain: &str, filename: &str) -> Result { + self.read_streaming(domain, filename) + .await + .map(|(meta, stream)| meta) + } + async fn delete(&self, domain: &str, filename: &str) -> Result<()>; } -- cgit 1.4.1