use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
use async_trait::async_trait;
use std::{path::PathBuf, fmt::Debug};
use tokio::fs::OpenOptions;
use tokio::io::{BufReader, BufWriter, AsyncWriteExt};
use futures::{StreamExt, TryStreamExt};
use std::pin::Pin;
use sha2::Digest;
use futures::FutureExt;
use tracing::{debug, error};
/// Size in bytes (16 KiB) of the buffers wrapped around stored files, both
/// when spooling uploads to disk and when streaming them back out.
const BUF_CAPACITY: usize = 16_384;
/// A [`MediaStore`] that keeps uploads as plain files below a base directory,
/// one subdirectory per domain.
#[derive(Clone, Debug)]
pub struct FileStore {
    /// Root directory of the store; temporary upload files are created here.
    base: PathBuf,
}
impl From<tokio::io::Error> for MediaStoreError {
    /// Wrap an I/O error, keeping it as the source.
    ///
    /// A missing file maps to `ErrorKind::NotFound`; every other I/O failure
    /// is reported as `ErrorKind::Backend`.
    fn from(source: tokio::io::Error) -> Self {
        let kind = if source.kind() == std::io::ErrorKind::NotFound {
            ErrorKind::NotFound
        } else {
            ErrorKind::Backend
        };
        let msg = format!("file I/O error: {}", source);
        Self {
            msg,
            kind,
            source: Some(Box::new(source)),
        }
    }
}
impl FileStore {
    /// Create a store rooted at `base`. Does not touch the filesystem.
    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
        let base = base.into();
        Self { base }
    }

    /// Create a new, uniquely named `temp.<16 alphanumerics>` file directly
    /// under the store root, creating the root directory first if needed.
    ///
    /// Random names are retried until `create_new` succeeds, so an existing
    /// file is never reused or truncated. Returns the file's path together
    /// with a buffered writer for it.
    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
        use rand::{Rng, distributions::Alphanumeric};
        tokio::fs::create_dir_all(&self.base).await?;
        loop {
            // Alphanumeric yields ASCII bytes, so the byte -> char mapping is lossless.
            let suffix: String = rand::thread_rng()
                .sample_iter(&Alphanumeric)
                .take(16)
                .map(char::from)
                .collect();
            let candidate = self.base.join(format!("temp.{}", suffix));
            let opened = OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&candidate)
                .await;
            match opened {
                // TODO: determine if BufWriter provides benefit here
                Ok(file) => return Ok((candidate, BufWriter::with_capacity(BUF_CAPACITY, file))),
                // Name collision: draw another random name.
                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => {}
                Err(err) => return Err(err.into()),
            }
        }
    }
}
#[async_trait]
impl MediaStore for FileStore {
#[tracing::instrument(skip(self))]
async fn write_streaming<T>(
&self,
domain: &str,
mut metadata: Metadata,
mut content: T,
) -> Result<String>
where
T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
{
let (tempfilepath, mut tempfile) = self.mktemp().await?;
debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
let mut hasher = sha2::Sha256::new();
let mut length: usize = 0;
while let Some(chunk) = content.next().await {
let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError {
kind: ErrorKind::Backend,
source: Some(Box::new(err)),
msg: "Failed to read a data chunk".to_owned()
})?);
debug!("Read {} bytes from the stream", chunk.len());
length += chunk.len();
let (write_result, _hasher) = tokio::join!(
tempfile.write_all(&*chunk),
{
let chunk = chunk.clone();
tokio::task::spawn_blocking(move || {
hasher.update(&*chunk);
hasher
}).map(|r| r.unwrap())
}
);
if let Err(err) = write_result {
error!("Error while writing pending upload: {}", err);
drop(tempfile);
// this is just cleanup, nothing fails if it fails
// though temporary files might take up space on the hard drive
// We'll clean them when maintenance time comes
#[allow(unused_must_use)]
{ tokio::fs::remove_file(tempfilepath).await; }
return Err(err.into());
}
hasher = _hasher;
}
let hash = hasher.finalize();
debug!("Pending upload hash: {}", hex::encode(&hash));
let filename = format!(
"{}/{}/{}/{}/{}",
hex::encode([hash[0]]),
hex::encode([hash[1]]),
hex::encode([hash[2]]),
hex::encode([hash[3]]),
hex::encode(&hash[4..32])
);
metadata.length = Some(length);
let domain_str = domain.to_string();
let filepath = self.base.join(domain_str.as_str()).join(&filename);
let metafilename = filename.clone() + ".json";
let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp");
debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
{
let parent = filepath.parent().unwrap();
tokio::fs::create_dir_all(parent).await?;
}
let mut meta = OpenOptions::new()
.create_new(true)
.write(true)
.open(&metatemppath)
.await?;
meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
tokio::fs::rename(tempfilepath, filepath).await?;
tokio::fs::rename(metatemppath, metapath).await?;
Ok(filename)
}
#[tracing::instrument(skip(self))]
async fn read_streaming(
&self,
domain: &str,
filename: &str,
) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
let path = self.base.join(format!("{}{}", domain, filename));
let metapath = self.base.join(format!("{}{}.json", domain, filename));
debug!("Path: {}, metadata: {}", path.display(), metapath.display());
let file = OpenOptions::new()
.read(true)
.open(path)
.await?;
let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
.map_err(|err| MediaStoreError {
kind: ErrorKind::Json,
msg: format!("{}", err),
source: Some(Box::new(err))
})?;
Ok((meta, Box::pin(
tokio_util::io::ReaderStream::new(
// TODO: determine if BufReader provides benefit here
// From the logs it looks like we're reading 4KiB at a time
// Buffering file contents seems to double download speed
// How to benchmark this?
tokio::io::BufReader::with_capacity(BUF_CAPACITY, file)
)
// Sprinkle some salt in form of protective log wrapping
.inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
)))
}
async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
let path = self.base.join(format!("{}/{}", domain, filename));
Ok(tokio::fs::remove_file(path).await?)
}
}
#[cfg(test)]
mod tests {
    use super::{Metadata, FileStore, MediaStore};
    use tokio::io::AsyncReadExt;

    /// Round-trip a file through `write_streaming` and `read_streaming`.
    ///
    /// Checks the on-disk content and metadata written by the store, as well
    /// as the content and metadata handed back when reading.
    #[tokio::test]
    async fn test_streaming_read_write() {
        let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
        let store = FileStore::new(tempdir.path());

        let file: &[u8] = include_bytes!("../../../../README.md");
        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
        let metadata = Metadata {
            filename: Some("README.md".to_string()),
            content_type: "text/markdown".to_string(),
            length: None
        };

        let filename = store.write_streaming(
            "fireburn.ru",
            metadata, stream
        ).await.unwrap();

        // What landed on disk.
        let content = tokio::fs::read(
            tempdir.path()
                .join("fireburn.ru")
                .join(&filename)
        ).await.unwrap();
        assert_eq!(content, file);

        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
            tempdir.path()
                .join("fireburn.ru")
                .join(filename.clone() + ".json")
        ).await.unwrap()).unwrap();
        assert_eq!(&meta.content_type, "text/markdown");
        assert_eq!(meta.filename.as_deref(), Some("README.md"));
        assert_eq!(meta.length, Some(file.len()));

        // What the store hands back.
        let (read_meta, read_back) = {
            let (read_meta, stream) = store.read_streaming(
                "fireburn.ru",
                &("/".to_string() + &filename)
            ).await.unwrap();
            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            (read_meta, buf)
        };

        assert_eq!(read_back, file);
        // Check the metadata returned by read_streaming, not the copy parsed
        // from disk above.
        assert_eq!(&read_meta.content_type, "text/markdown");
        assert_eq!(read_meta.filename.as_deref(), Some("README.md"));
        assert_eq!(read_meta.length, Some(file.len()));
    }
}