about summary refs log tree commit diff
path: root/src/media/storage/file.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/media/storage/file.rs')
-rw-r--r-- src/media/storage/file.rs 434
1 files changed, 434 insertions, 0 deletions
diff --git a/src/media/storage/file.rs b/src/media/storage/file.rs
new file mode 100644
index 0000000..0aaaa3b
--- /dev/null
+++ b/src/media/storage/file.rs
@@ -0,0 +1,434 @@
+use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
+use async_trait::async_trait;
+use std::{path::PathBuf, fmt::Debug};
+use tokio::fs::OpenOptions;
+use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
+use futures::{StreamExt, TryStreamExt};
+use std::ops::{Bound, RangeBounds, Neg};
+use std::pin::Pin;
+use sha2::Digest;
+use futures::FutureExt;
+use tracing::{debug, error};
+
+const BUF_CAPACITY: usize = 16 * 1024;
+
+/// A media store that keeps uploads as plain files on the local filesystem.
+///
+/// Cloning only copies the base path.
+#[derive(Clone)]
+pub struct FileStore {
+    /// Root directory of the store. Temporary upload files are created
+    /// directly inside it, and stored files live under `base/<domain>/...`.
+    base: PathBuf,
+}
+
+impl From<tokio::io::Error> for MediaStoreError {
+    fn from(source: tokio::io::Error) -> Self {
+        Self {
+            msg: format!("file I/O error: {}", source),
+            kind: match source.kind() {
+                std::io::ErrorKind::NotFound => ErrorKind::NotFound,
+                _ => ErrorKind::Backend
+            },
+            source: Some(Box::new(source)),
+        }
+    }
+}
+
+impl FileStore {
+    /// Creates a store rooted at `base`.
+    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
+        let base = base.into();
+        Self { base }
+    }
+
+    /// Creates a fresh temporary file inside the base directory and returns
+    /// its path together with a buffered writer over the open handle.
+    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
+        match kittybox_util::fs::mktemp(&self.base, "temp", 16).await {
+            Ok((name, file)) => Ok((name, BufWriter::new(file))),
+            Err(err) => Err(err.into()),
+        }
+    }
+}
+
+#[async_trait]
+impl MediaStore for FileStore {
+
+    /// Streams an upload to disk and files it under its SHA-256 digest.
+    ///
+    /// The content is first written to a temporary file in the base directory
+    /// while being hashed chunk by chunk. Once the stream is exhausted, the
+    /// file is moved to `base/<domain>/aa/bb/cc/dd/<remaining hex digits>`
+    /// (the first four digest bytes become directory levels) and a JSON
+    /// metadata sidecar is stored next to it as `<file>.json`.
+    ///
+    /// `metadata.length` and `metadata.etag` are overwritten with the observed
+    /// byte count and the hex-encoded digest before the sidecar is written.
+    ///
+    /// Returns the file name relative to the domain directory.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the incoming stream yields an error, if the metadata cannot
+    /// be serialized, or on any filesystem error. When reading from the
+    /// stream or writing a chunk fails, the temporary file is removed on a
+    /// best-effort basis; failures in later steps may leave it behind.
+    #[tracing::instrument(skip(self, content))]
+    async fn write_streaming<T>(
+        &self,
+        domain: &str,
+        mut metadata: Metadata,
+        mut content: T,
+    ) -> Result<String>
+    where
+        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
+    {
+        let (tempfilepath, mut tempfile) = self.mktemp().await?;
+        debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
+        let mut hasher = sha2::Sha256::new();
+        let mut length: usize = 0;
+
+        while let Some(chunk) = content.next().await {
+            let chunk = match chunk {
+                Ok(chunk) => chunk,
+                Err(err) => {
+                    error!("Error while reading pending upload: {}", err);
+                    drop(tempfile);
+                    // Best-effort cleanup, same as for write failures below:
+                    // an aborted upload should not leave its partial file behind.
+                    let _ = tokio::fs::remove_file(&tempfilepath).await;
+                    return Err(MediaStoreError {
+                        kind: ErrorKind::Backend,
+                        source: Some(Box::new(err)),
+                        msg: "Failed to read a data chunk".to_owned()
+                    });
+                }
+            };
+            debug!("Read {} bytes from the stream", chunk.len());
+            length += chunk.len();
+            // Write the chunk and hash it concurrently; the hasher is moved
+            // into the blocking task and handed back when it is done.
+            let (write_result, _hasher) = tokio::join!(
+                {
+                    let chunk = chunk.clone();
+                    let tempfile = &mut tempfile;
+                    async move {
+                        tempfile.write_all(&*chunk).await
+                    }
+                },
+                {
+                    let chunk = chunk.clone();
+                    tokio::task::spawn_blocking(move || {
+                        hasher.update(&*chunk);
+
+                        hasher
+                    }).map(|r| r.unwrap())
+                }
+            );
+            if let Err(err) = write_result {
+                error!("Error while writing pending upload: {}", err);
+                drop(tempfile);
+                // this is just cleanup, nothing fails if it fails
+                // though temporary files might take up space on the hard drive
+                // We'll clean them when maintenance time comes
+                let _ = tokio::fs::remove_file(&tempfilepath).await;
+                return Err(err.into());
+            }
+            hasher = _hasher;
+        }
+        // Manually flush the buffer and drop the handle to close the file
+        tempfile.flush().await?;
+        tempfile.into_inner().sync_all().await?;
+
+        let hash = hasher.finalize();
+        debug!("Pending upload hash: {}", hex::encode(&hash));
+        let filename = format!(
+            "{}/{}/{}/{}/{}",
+            hex::encode([hash[0]]),
+            hex::encode([hash[1]]),
+            hex::encode([hash[2]]),
+            hex::encode([hash[3]]),
+            hex::encode(&hash[4..32])
+        );
+        let domain_dir = self.base.join(domain);
+        let filepath = domain_dir.join(&filename);
+        let metafilename = filename.clone() + ".json";
+        let metapath = domain_dir.join(&metafilename);
+        let metatemppath = domain_dir.join(metafilename + ".tmp");
+        metadata.length = std::num::NonZeroUsize::new(length);
+        metadata.etag = Some(hex::encode(&hash));
+        debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
+
+        // Serialize before touching the filesystem, so that a serialization
+        // failure cannot leave a stale `.tmp` sidecar behind.
+        let serialized = serde_json::to_vec(&metadata)
+            .map_err(|err| MediaStoreError {
+                kind: ErrorKind::Json,
+                msg: format!("{}", err),
+                source: Some(Box::new(err))
+            })?;
+
+        {
+            let parent = filepath.parent().unwrap();
+            tokio::fs::create_dir_all(parent).await?;
+        }
+        let mut meta = OpenOptions::new()
+            .create_new(true)
+            .write(true)
+            .open(&metatemppath)
+            .await?;
+        meta.write_all(&serialized).await?;
+        // tokio's `File` may complete a write in the background after
+        // `write_all` returns; flushing waits for it and surfaces its error,
+        // so the sidecar is complete before it is renamed into place.
+        meta.flush().await?;
+        meta.sync_all().await?;
+        drop(meta);
+
+        tokio::fs::rename(tempfilepath, filepath).await?;
+        tokio::fs::rename(metatemppath, metapath).await?;
+        Ok(filename)
+    }
+
+    /// Opens a stored file and returns its metadata along with a stream of
+    /// its contents.
+    ///
+    /// The file is opened before the metadata sidecar is read, so a missing
+    /// file is reported by the open call.
+    #[tracing::instrument(skip(self))]
+    async fn read_streaming(
+        &self,
+        domain: &str,
+        filename: &str,
+    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
+        debug!("Domain: {}, filename: {}", domain, filename);
+        let path = self.base.join(domain).join(filename);
+        debug!("Path: {}", path.display());
+
+        let handle = OpenOptions::new().read(true).open(path).await?;
+        let meta = self.metadata(domain, filename).await?;
+
+        // TODO: determine if BufReader provides benefit here
+        // From the logs it looks like we're reading 4KiB at a time
+        // Buffering file contents seems to double download speed
+        // How to benchmark this?
+        let buffered = BufReader::with_capacity(BUF_CAPACITY, handle);
+
+        // Log the size of every chunk as it passes through.
+        let chunks = tokio_util::io::ReaderStream::new(buffered)
+            .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()));
+        let stream: Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>> =
+            Box::pin(chunks);
+
+        Ok((meta, stream))
+    }
+
+    /// Loads and parses the JSON metadata sidecar (`<filename>.json`) of a
+    /// stored file.
+    ///
+    /// I/O failures are converted through the `From<tokio::io::Error>` impl;
+    /// a sidecar that cannot be parsed yields an [`ErrorKind::Json`] error.
+    #[tracing::instrument(skip(self))]
+    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
+        let metapath = self.base.join(domain).join(format!("{}.json", filename));
+        debug!("Metadata path: {}", metapath.display());
+
+        let raw = tokio::fs::read(metapath).await?;
+        match serde_json::from_slice(&raw) {
+            Ok(meta) => Ok(meta),
+            Err(err) => Err(MediaStoreError {
+                kind: ErrorKind::Json,
+                msg: format!("{}", err),
+                source: Some(Box::new(err))
+            }),
+        }
+    }
+    
+    /// Streams a byte range of a stored file.
+    ///
+    /// `range` is interpreted as follows:
+    ///
+    /// - `(Included(first), Included(last))`: bytes `first..=last`;
+    /// - `(Included(first), Unbounded)`: everything from `first` to the end;
+    /// - `(Unbounded, Included(n))`: the last `n` bytes of the file (a suffix
+    ///   range). If `n` exceeds the file size, the whole file is streamed;
+    /// - `(Unbounded, Unbounded)`: the whole file.
+    ///
+    /// A range that extends past the end of the file simply ends at EOF.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened, inspected or seeked.
+    /// Exclusive bounds and ranges whose end precedes their start are
+    /// rejected with an `InvalidInput` I/O error, which the
+    /// `From<tokio::io::Error>` impl reports as `ErrorKind::Backend`.
+    #[tracing::instrument(skip(self))]
+    async fn stream_range(
+        &self,
+        domain: &str,
+        filename: &str,
+        range: (Bound<u64>, Bound<u64>)
+    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> {
+        let path = self.base.join(format!("{}/{}", domain, filename));
+        let metapath = self.base.join(format!("{}/{}.json", domain, filename));
+        debug!("Path: {}, metadata: {}", path.display(), metapath.display());
+
+        let mut file = OpenOptions::new()
+            .read(true)
+            .open(path)
+            .await?;
+
+        // Position the file at the first requested byte and work out how
+        // many bytes may be served from there (`u64::MAX` means "until EOF").
+        let limit: u64 = match range {
+            (Bound::Excluded(_), _) | (_, Bound::Excluded(_)) => {
+                return Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "exclusive range bounds are not supported"
+                ).into());
+            }
+            (Bound::Included(first), Bound::Included(last)) => {
+                if last < first {
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidInput,
+                        "range end precedes range start"
+                    ).into());
+                }
+                debug!("Seeking {} bytes forward...", first);
+                file.seek(std::io::SeekFrom::Start(first)).await?;
+                // Inclusive bounds: `last - first + 1` bytes, without
+                // overflowing on `0..=u64::MAX`.
+                (last - first).saturating_add(1)
+            }
+            (Bound::Included(first), Bound::Unbounded) => {
+                debug!("Seeking {} bytes forward...", first);
+                file.seek(std::io::SeekFrom::Start(first)).await?;
+                u64::MAX
+            }
+            (Bound::Unbounded, Bound::Included(suffix)) => {
+                // Clamp the suffix to the file size: seeking before the start
+                // of the file is an error, and the requested length comes
+                // from the caller, so it must not be able to cause a panic.
+                let size = file.metadata().await?.len();
+                let suffix = suffix.min(size);
+                debug!("Seeking {} bytes back from the end...", suffix);
+                file.seek(std::io::SeekFrom::Start(size - suffix)).await?;
+                suffix
+            }
+            (Bound::Unbounded, Bound::Unbounded) => 0u64.wrapping_sub(1),
+        };
+
+        // `take` stops the reader after `limit` bytes, so the stream ends at
+        // the range boundary without reading the rest of the file.
+        let reader = tokio::io::AsyncReadExt::take(
+            BufReader::with_capacity(BUF_CAPACITY, file),
+            limit
+        );
+        let stream = tokio_util::io::ReaderStream::new(reader)
+            .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()));
+
+        Ok(Box::pin(stream))
+    }
+
+
+    /// Deletes a stored file together with its metadata sidecar.
+    ///
+    /// The data file is removed first; failing to remove it (including when
+    /// it does not exist) is returned as an error. The `<filename>.json`
+    /// sidecar is removed afterwards so that `metadata()` stops reporting a
+    /// file that is gone; a sidecar that is already missing is not an error.
+    #[tracing::instrument(skip(self))]
+    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
+        let path = self.base.join(format!("{}/{}", domain, filename));
+        let metapath = self.base.join(format!("{}/{}.json", domain, filename));
+
+        tokio::fs::remove_file(path).await?;
+
+        match tokio::fs::remove_file(metapath).await {
+            Ok(()) => Ok(()),
+            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
+            Err(err) => Err(err.into()),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{Metadata, FileStore, MediaStore};
+    use std::ops::Bound;
+    use tokio::io::AsyncReadExt;
+
+    /// Writes this source file into a fresh store and checks that
+    /// `stream_range` returns the right bytes for a range at the start, in
+    /// the middle, at the end (suffix range) and for the whole file.
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_ranges() {
+        /// Requests `range` of `filename` and collects the stream into memory.
+        async fn fetch(
+            store: &FileStore,
+            filename: &str,
+            range: (Bound<u64>, Bound<u64>)
+        ) -> Vec<u8> {
+            let stream = store.stream_range("fireburn.ru", filename, range)
+                .await
+                .unwrap();
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut collected = Vec::default();
+            reader.read_to_end(&mut collected).await.unwrap();
+
+            collected
+        }
+
+        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
+        let store = FileStore::new(tempdir.path());
+
+        let file: &[u8] = include_bytes!("./file.rs");
+        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let metadata = Metadata {
+            filename: Some("file.rs".to_string()),
+            content_type: Some("text/plain".to_string()),
+            length: None,
+            etag: None,
+        };
+
+        // write through the interface
+        let filename = store
+            .write_streaming("fireburn.ru", metadata, stream)
+            .await
+            .unwrap();
+
+        tracing::debug!("Writing complete.");
+
+        // Ensure the file is there
+        let stored_path = tempdir.path().join("fireburn.ru").join(&filename);
+        let content = tokio::fs::read(stored_path).await.unwrap();
+        assert_eq!(content, file);
+
+        tracing::debug!("Reading range from the start...");
+        let head = fetch(
+            &store, &filename,
+            (Bound::Included(0), Bound::Included(299))
+        ).await;
+        assert_eq!(head.len(), 300);
+        assert_eq!(head.as_slice(), &file[..=299]);
+
+        tracing::debug!("Reading range from the middle...");
+        let middle = fetch(
+            &store, &filename,
+            (Bound::Included(150), Bound::Included(449))
+        ).await;
+        assert_eq!(middle.len(), 300);
+        assert_eq!(middle.as_slice(), &file[150..=449]);
+
+        tracing::debug!("Reading range from the end...");
+        let tail = fetch(
+            &store, &filename,
+            // Note: the `headers` crate parses bounds in a
+            // non-standard way, where unbounded start actually
+            // means getting things from the end...
+            (Bound::Unbounded, Bound::Included(300))
+        ).await;
+        assert_eq!(tail.len(), 300);
+        assert_eq!(tail.as_slice(), &file[file.len()-300..file.len()]);
+
+        tracing::debug!("Reading the whole file...");
+        // The file name is passed with a leading slash here.
+        let slashed = format!("/{}", filename);
+        let whole = fetch(
+            &store, &slashed,
+            (Bound::Unbounded, Bound::Unbounded)
+        ).await;
+        assert_eq!(whole.len(), file.len());
+        assert_eq!(whole.as_slice(), file);
+    }
+
+    
+    /// Round-trips this source file through `write_streaming` and
+    /// `read_streaming`, checking the stored bytes, the on-disk metadata
+    /// sidecar, and the data and metadata returned by the read interface.
+    #[tokio::test]
+    #[tracing_test::traced_test]
+    async fn test_streaming_read_write() {
+        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
+        let store = FileStore::new(tempdir.path());
+
+        let file: &[u8] = include_bytes!("./file.rs");
+        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
+        let metadata = Metadata {
+            filename: Some("style.css".to_string()),
+            content_type: Some("text/css".to_string()),
+            length: None,
+            etag: None,
+        };
+
+        // write through the interface
+        let filename = store.write_streaming(
+            "fireburn.ru",
+            metadata, stream
+        ).await.unwrap();
+        println!("{}, {}", filename, tempdir.path()
+                 .join("fireburn.ru")
+                 .join(&filename)
+                 .display());
+        let content = tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(&filename)
+        ).await.unwrap();
+        assert_eq!(content, file);
+
+        // check internal metadata format
+        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
+            tempdir.path()
+                .join("fireburn.ru")
+                .join(filename.clone() + ".json")
+        ).await.unwrap()).unwrap();
+        assert_eq!(meta.content_type.as_deref(), Some("text/css"));
+        assert_eq!(meta.filename.as_deref(), Some("style.css"));
+        assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
+        assert!(meta.etag.is_some());
+
+        // read back the data using the interface
+        let (metadata, read_back) = {
+            let (metadata, stream) = store.read_streaming(
+                "fireburn.ru",
+                &filename
+            ).await.unwrap();
+            let mut reader = tokio_util::io::StreamReader::new(stream);
+
+            let mut buf = Vec::default();
+            reader.read_to_end(&mut buf).await.unwrap();
+
+            (metadata, buf)
+        };
+
+        // These assertions must look at the metadata returned by
+        // `read_streaming`, not at `meta`, which was parsed from disk above.
+        assert_eq!(read_back, file);
+        assert_eq!(metadata.content_type.as_deref(), Some("text/css"));
+        assert_eq!(metadata.filename.as_deref(), Some("style.css"));
+        assert_eq!(metadata.length.map(|i| i.get()), Some(file.len()));
+        assert!(metadata.etag.is_some());
+    }
+}