about summary refs log tree commit diff
path: root/kittybox-rs
diff options
context:
space:
mode:
authorVika <vika@fireburn.ru>2022-10-14 17:54:22 +0300
committerVika <vika@fireburn.ru>2022-10-14 17:54:22 +0300
commit3fd63e5e9d6a05e1dcd84a0ca6dd0930f3ef9cf6 (patch)
treef7c0e2b1c8f23e3bf5ecb385d086f72f73934c19 /kittybox-rs
parent9f7b903901acb0cd6ec9cb2146406a92ebf79cab (diff)
media: implement file range requests for backends
For now it is not yet exposed on the frontend, but that is merely a
matter of time.

TODO: possibly remove the legacy methods, since they are now obsolete
Diffstat (limited to 'kittybox-rs')
-rw-r--r--kittybox-rs/src/media/storage/file.rs217
-rw-r--r--kittybox-rs/src/media/storage/mod.rs73
2 files changed, 274 insertions, 16 deletions
diff --git a/kittybox-rs/src/media/storage/file.rs b/kittybox-rs/src/media/storage/file.rs
index 42149aa..824f326 100644
--- a/kittybox-rs/src/media/storage/file.rs
+++ b/kittybox-rs/src/media/storage/file.rs
@@ -2,8 +2,9 @@ use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
 use async_trait::async_trait;
 use std::{path::PathBuf, fmt::Debug};
 use tokio::fs::OpenOptions;
-use tokio::io::{BufReader, BufWriter, AsyncWriteExt};
+use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
 use futures::{StreamExt, TryStreamExt};
+use std::ops::{Bound, RangeBounds, Neg};
 use std::pin::Pin;
 use sha2::Digest;
 use futures::FutureExt;
@@ -45,7 +46,7 @@ impl FileStore {
 #[async_trait]
 impl MediaStore for FileStore {
 
-    #[tracing::instrument(skip(self))]
+    #[tracing::instrument(skip(self, content))]
     async fn write_streaming<T>(
         &self,
         domain: &str,
@@ -137,19 +138,13 @@ impl MediaStore for FileStore {
         filename: &str,
     ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
         let path = self.base.join(format!("{}{}", domain, filename));
-        let metapath = self.base.join(format!("{}{}.json", domain, filename));
-        debug!("Path: {}, metadata: {}", path.display(), metapath.display());
+        debug!("Path: {}", path.display());
 
         let file = OpenOptions::new()
             .read(true)
             .open(path)
             .await?;
-        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
-            .map_err(|err| MediaStoreError {
-                kind: ErrorKind::Json,
-                msg: format!("{}", err),
-                source: Some(Box::new(err))
-            })?;
+        let meta = self.metadata(domain, filename).await?;
 
         Ok((meta, Box::pin(
             tokio_util::io::ReaderStream::new(
@@ -164,6 +159,86 @@ impl MediaStore for FileStore {
         )))
     }
 
+    #[tracing::instrument(skip(self))]
+    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+        let metapath = self.base.join(format!("{}{}.json", domain, filename));
+        debug!("Metadata path: {}", metapath.display());
+
+        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
+            .map_err(|err| MediaStoreError {
+                kind: ErrorKind::Json,
+                msg: format!("{}", err),
+                source: Some(Box::new(err))
+            })?;
+
+        Ok(meta)
+    }
+    
+    #[tracing::instrument(skip(self))]
+    async fn stream_range(
+        &self,
+        domain: &str,
+        filename: &str,
+        range: (Bound<u64>, Bound<u64>)
+    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> {
+        let path = self.base.join(format!("{}{}", domain, filename));
+        let metapath = self.base.join(format!("{}{}.json", domain, filename));
+        debug!("Path: {}, metadata: {}", path.display(), metapath.display());
+
+        let mut file = OpenOptions::new()
+            .read(true)
+            .open(path)
+            .await?;
+
+        let start = match range {
+            (Bound::Included(bound), _) => {
+                debug!("Seeking {} bytes forward...", bound);
+                file.seek(std::io::SeekFrom::Start(bound)).await?
+            }
+            (Bound::Excluded(_), _) => unreachable!(),
+            (Bound::Unbounded, Bound::Included(bound)) => {
+            // Suffix range: seek so that exactly `bound` bytes remain before EOF
+                debug!("Seeking {} bytes back from the end...", bound);
+                file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await?
+            },
+            (Bound::Unbounded, Bound::Unbounded) => 0,
+            (_, Bound::Excluded(_)) => unreachable!()
+        };
+
+        let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file)))
+            .map_ok({
+                let mut bytes_read = 0usize;
+                let len = match range {
+                    (_, Bound::Unbounded) => None,
+                    (Bound::Unbounded, Bound::Included(bound)) => Some(bound),
+                    (_, Bound::Included(bound)) => Some(bound + 1 - start),
+                    (_, Bound::Excluded(_)) => unreachable!()
+                };
+                move |chunk| {
+                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
+                    bytes_read += chunk.len();
+                    if let Some(len) = len.map(|len| len.try_into().unwrap()) {
+                        if bytes_read > len {
+                            if bytes_read - len > chunk.len() {
+                                return None
+                            }
+                            debug!("Truncating last {} bytes", bytes_read - len);
+                            return Some(chunk.slice(..chunk.len() - (bytes_read - len)))
+                        }
+                    }
+
+                    Some(chunk)
+                }
+            })
+            .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+            // The unwrap() cannot panic: try_take_while above stops the
+            // stream at the first None, so only Some values reach this map.
+            .map_ok(|x| x.unwrap());
+
+        return Ok(Box::pin(stream))
+    }
+
+
     async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
         let path = self.base.join(format!("{}/{}", domain, filename));
 
@@ -174,17 +249,127 @@ impl MediaStore for FileStore {
 #[cfg(test)]
 mod tests {
     use super::{Metadata, FileStore, MediaStore};
+    use std::ops::Bound;
     use tokio::io::AsyncReadExt;
 
     #[tokio::test]
     #[tracing_test::traced_test]
-    async fn test_streaming_read_write() {
+    async fn test_ranges() {
         let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
         let store = FileStore::new(tempdir.path());
-        let tempdir_path = tempdir.into_path();
 
+        let file: &[u8] = include_bytes!("./file.rs");
+        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let metadata = Metadata {
+            filename: Some("file.rs".to_string()),
+            content_type: Some("text/plain".to_string()),
+            length: None,
+            etag: None,
+        };
+
+        // write through the interface
+        let filename = store.write_streaming(
+            "fireburn.ru",
+            metadata, stream
+        ).await.unwrap();
+
+        tracing::debug!("Writing complete.");
+
+        // Ensure the file is there
+        let content = tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(&filename)
+        ).await.unwrap();
+        assert_eq!(content, file);
+
+        tracing::debug!("Reading range from the start...");
+        // try to read range
+        let range = {
+            let stream = store.stream_range(
+                "fireburn.ru", &("/".to_string() + &filename),
+                (Bound::Included(0), Bound::Included(299))
+            ).await.unwrap();
+
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            buf
+        };
+
+        assert_eq!(range.len(), 300);
+        assert_eq!(range.as_slice(), &file[..=299]);
+
+        tracing::debug!("Reading range from the middle...");
+
+        let range = {
+            let stream = store.stream_range(
+                "fireburn.ru", &("/".to_string() + &filename),
+                (Bound::Included(150), Bound::Included(449))
+            ).await.unwrap();
+
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            buf
+        };
+
+        assert_eq!(range.len(), 300);
+        assert_eq!(range.as_slice(), &file[150..=449]);
+
+        tracing::debug!("Reading range from the end...");
+        let range = {
+            let stream = store.stream_range(
+                "fireburn.ru", &("/".to_string() + &filename),
+                // Note: the `headers` crate treats a range with an
+                // unbounded start as an RFC 7233 suffix range, i.e.
+                // "return the last N bytes of the file".
+                (Bound::Unbounded, Bound::Included(300))
+            ).await.unwrap();
+
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            buf
+        };
+
+        assert_eq!(range.len(), 300);
+        assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]);
+
+        tracing::debug!("Reading the whole file...");
+        // try to read range
+        let range = {
+            let stream = store.stream_range(
+                "fireburn.ru", &("/".to_string() + &filename),
+                (Bound::Unbounded, Bound::Unbounded)
+            ).await.unwrap();
+
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            buf
+        };
+
+        assert_eq!(range.len(), file.len());
+        assert_eq!(range.as_slice(), file);
+    }
+
+    
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_streaming_read_write() {
+        let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
+        let store = FileStore::new(tempdir.path());
 
-        let file: &[u8] = include_bytes!("../../../companion-lite/style.css");
+        let file: &[u8] = include_bytes!("./file.rs");
         let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
         let metadata = Metadata {
             filename: Some("style.css".to_string()),
@@ -198,12 +383,12 @@ mod tests {
             "fireburn.ru",
             metadata, stream
         ).await.unwrap();
-        println!("{}, {}", filename, tempdir_path
+        println!("{}, {}", filename, tempdir.path()
                  .join("fireburn.ru")
                  .join(&filename)
                  .display());
         let content = tokio::fs::read(
-            tempdir_path
+            tempdir.path()
                 .join("fireburn.ru")
                 .join(&filename)
         ).await.unwrap();
@@ -211,7 +396,7 @@ mod tests {
 
         // check internal metadata format
         let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
-            tempdir_path
+            tempdir.path()
                 .join("fireburn.ru")
                 .join(filename.clone() + ".json")
         ).await.unwrap()).unwrap();
diff --git a/kittybox-rs/src/media/storage/mod.rs b/kittybox-rs/src/media/storage/mod.rs
index 4ef7c7a..020999c 100644
--- a/kittybox-rs/src/media/storage/mod.rs
+++ b/kittybox-rs/src/media/storage/mod.rs
@@ -3,6 +3,7 @@ use axum::extract::multipart::Field;
 use tokio_stream::Stream;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
+use std::ops::Bound;
 use std::pin::Pin;
 use std::fmt::Debug;
 use std::num::NonZeroUsize;
@@ -100,5 +101,77 @@ pub trait MediaStore: 'static + Send + Sync + Clone {
         filename: &str,
     ) -> Result<(Metadata, Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>)>;
 
+    async fn stream_range(
+        &self,
+        domain: &str,
+        filename: &str,
+        range: (Bound<u64>, Bound<u64>)
+    ) -> Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>> {
+        use futures::stream::TryStreamExt;
+        use tracing::debug;
+        let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
+        let length = metadata.length.unwrap().get();
+
+        use Bound::*;
+        let (start, end): (usize, usize) = match range {
+            (Unbounded, Unbounded) => return Ok(stream),
+            (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
+            (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
+            (Included(start), Included(end)) => (start.try_into().unwrap(), end.try_into().unwrap()),
+            (_, _) => unreachable!()
+        };
+
+        stream = Box::pin(
+            stream.map_ok({
+                let mut bytes_skipped = 0usize;
+                let mut bytes_read = 0usize;
+
+                move |chunk| {
+                    debug!("Skipped {}/{} bytes, chunk len {}", bytes_skipped, start, chunk.len());
+                    let chunk = if bytes_skipped < start {
+                        let need_to_skip = start - bytes_skipped;
+                        if chunk.len() < need_to_skip {
+                            return None
+                        }
+                        debug!("Skipping {} bytes", need_to_skip);
+                        bytes_skipped += need_to_skip;
+
+                        chunk.slice(need_to_skip..)
+                    } else {
+                        chunk
+                    };
+
+                    debug!("Read {} bytes from file, {} in this chunk", bytes_read, chunk.len());
+                    bytes_read += chunk.len();
+
+                    if bytes_read > length {
+                        if bytes_read - length > chunk.len() {
+                            return None
+                        }
+                        debug!("Truncating last {} bytes", bytes_read - length);
+                        return Some(chunk.slice(..chunk.len() - (bytes_read - length)))
+                    }
+
+                    Some(chunk)
+                }
+            })
+                .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
+                .try_take_while(|x| std::future::ready(Ok(x.is_some())))
+                .map_ok(|x| x.unwrap())
+        );
+
+        return Ok(stream);
+    }
+
+    /// Read metadata for a file.
+    ///
+    /// The default implementation uses the `read_streaming` method
+    /// and drops the stream containing file content.
+    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+        self.read_streaming(domain, filename)
+            .await
+            .map(|(meta, stream)| meta)
+    }
+
     async fn delete(&self, domain: &str, filename: &str) -> Result<()>;
 }