use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
use std::{path::PathBuf, fmt::Debug};
use tokio::fs::OpenOptions;
use tokio::io::{BufReader, BufWriter, AsyncWriteExt, AsyncSeekExt};
use futures::{StreamExt, TryStreamExt};
use std::ops::{Bound, Neg};
use std::pin::Pin;
use sha2::Digest;
use futures::FutureExt;
use tracing::{debug, error};
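/// Capacity of the buffers wrapped around file reads and writes (16 KiB).
/// Per the TODO in `read_streaming`, buffering appears to noticeably speed
/// up downloads compared to unbuffered 4 KiB reads.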
const BUF_CAPACITY: usize = 16 * 1024;
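/// A filesystem-backed [`MediaStore`] implementation.
///
/// Uploads are content-addressed: each file is stored under the SHA-256
/// hash of its contents, fanned out across subdirectories, with a `.json`
/// metadata sidecar next to it.
///
/// A minimal usage sketch (the `file:///srv/media` URL is only an example;
/// `new` uses just the URL's path component as the base directory):
///
/// ```ignore
/// let url = url::Url::parse("file:///srv/media")?;
/// let store = FileStore::new(&url).await?;
/// ```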
#[derive(Clone)]
pub struct FileStore {
base: PathBuf,
}
impl From<tokio::io::Error> for MediaStoreError {
fn from(source: tokio::io::Error) -> Self {
Self {
msg: format!("file I/O error: {}", source),
kind: match source.kind() {
std::io::ErrorKind::NotFound => ErrorKind::NotFound,
_ => ErrorKind::Backend
},
source: Some(Box::new(source)),
}
}
}
impl FileStore {
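/// Create a randomly named temporary file in the base directory and wrap
/// it in a [`BufWriter`] for efficient chunked writes.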
async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
kittybox_util::fs::mktemp(&self.base, "temp", 16)
.await
.map(|(name, file)| (name, BufWriter::new(file)))
.map_err(Into::into)
}
}
impl MediaStore for FileStore {
async fn new(url: &url::Url) -> Result<Self> {
Ok(Self { base: url.path().into() })
}
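/// Store an upload, streaming it to disk while hashing it.
///
/// The body is written to a temporary file and hashed with SHA-256 as it
/// arrives; the hex digest then becomes the file's content-addressed name.
/// The metadata, with the length and hash filled in, is staged in a
/// sidecar file, and both are renamed into their final locations.
/// Returns the content-addressed filename.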
#[tracing::instrument(skip(self, content))]
async fn write_streaming<T>(
&self,
domain: &str,
mut metadata: Metadata,
mut content: T,
) -> Result<String>
where
T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
{
let (tempfilepath, mut tempfile) = self.mktemp().await?;
debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
let mut hasher = sha2::Sha256::new();
let mut length: usize = 0;
while let Some(chunk) = content.next().await {
let chunk = chunk.map_err(|err| MediaStoreError {
kind: ErrorKind::Backend,
source: Some(Box::new(err)),
msg: "Failed to read a data chunk".to_owned()
})?;
debug!("Read {} bytes from the stream", chunk.len());
length += chunk.len();
// Write the chunk to the temporary file and hash it on the blocking
// thread pool at the same time. `bytes::Bytes` is reference-counted,
// so cloning a chunk is cheap.
let (write_result, updated_hasher) = tokio::join!(
{
let chunk = chunk.clone();
let tempfile = &mut tempfile;
async move {
tempfile.write_all(&chunk).await
}
},
tokio::task::spawn_blocking(move || {
hasher.update(&*chunk);
hasher
}).map(|r| r.unwrap())
);
if let Err(err) = write_result {
error!("Error while writing pending upload: {}", err);
drop(tempfile);
// This is just cleanup; nothing breaks if it fails,
// though stale temporary files do take up disk space.
// We'll clean them up when maintenance time comes.
let _ = tokio::fs::remove_file(tempfilepath).await;
return Err(err.into());
}
hasher = updated_hasher;
}
// Flush the write buffer, then unwrap the BufWriter and fsync the
// underlying file; dropping the handle closes it.
tempfile.flush().await?;
tempfile.into_inner().sync_all().await?;
let hash = data_encoding::HEXLOWER.encode(&hasher.finalize());
debug!("Pending upload hash: {}", hash);
let filename = format!(
"{}/{}/{}/{}/{}",
&hash[0..2],
&hash[2..4],
&hash[4..6],
&hash[6..8],
&hash[8..]
);
let filepath = self.base.join(domain).join(&filename);
let metafilename = filename.clone() + ".json";
let metapath = self.base.join(domain).join(&metafilename);
let metatemppath = self.base.join(domain).join(metafilename + ".tmp");
metadata.length = std::num::NonZeroUsize::new(length);
metadata.etag = Some(hash);
debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
{
let parent = filepath.parent().unwrap();
tokio::fs::create_dir_all(parent).await?;
}
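// Stage the metadata in a `.tmp` file so a crash mid-write can't leave
// a truncated sidecar at the final path.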
let mut meta = OpenOptions::new()
.create_new(true)
.write(true)
.open(&metatemppath)
.await?;
meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
// fsync the metadata like the data file before renaming it into place
meta.sync_all().await?;
tokio::fs::rename(tempfilepath, filepath).await?;
tokio::fs::rename(metatemppath, metapath).await?;
Ok(filename)
}
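/// Open a stored file, returning its metadata together with a buffered
/// stream of its contents.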
#[tracing::instrument(skip(self))]
async fn read_streaming(
&self,
domain: &str,
filename: &str,
) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
debug!("Domain: {}, filename: {}", domain, filename);
let path = self.base.join(domain).join(filename);
debug!("Path: {}", path.display());
let file = OpenOptions::new()
.read(true)
.open(path)
.await?;
let meta = self.metadata(domain, filename).await?;
Ok((meta, Box::pin(
tokio_util::io::ReaderStream::new(
// TODO: determine if BufReader provides benefit here
// From the logs it looks like we're reading 4KiB at a time
// Buffering file contents seems to double download speed
// How to benchmark this?
BufReader::with_capacity(BUF_CAPACITY, file)
)
// Sprinkle some salt in the form of protective log wrapping
.inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
)))
}
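/// Read and deserialize the `.json` metadata sidecar of a stored file.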
#[tracing::instrument(skip(self))]
async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
let metapath = self.base.join(domain).join(format!("{}.json", filename));
debug!("Metadata path: {}", metapath.display());
let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
.map_err(|err| MediaStoreError {
kind: ErrorKind::Json,
msg: format!("{}", err),
source: Some(Box::new(err))
})?;
Ok(meta)
}
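/// Stream a byte range of a stored file.
///
/// Note the non-standard interpretation inherited from the `headers`
/// crate (see the range test below): an unbounded start with an included
/// end means "the last N bytes", not a prefix. Excluded bounds never
/// occur in parsed HTTP ranges, hence the `unreachable!()` arms.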
#[tracing::instrument(skip(self))]
async fn stream_range(
&self,
domain: &str,
filename: &str,
range: (Bound<u64>, Bound<u64>)
) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>> {
let path = self.base.join(domain).join(filename);
let metapath = self.base.join(domain).join(format!("{}.json", filename));
debug!("Path: {}, metadata: {}", path.display(), metapath.display());
let mut file = OpenOptions::new()
.read(true)
.open(path)
.await?;
let start = match range {
(Bound::Included(bound), _) => {
debug!("Seeking {} bytes forward...", bound);
file.seek(std::io::SeekFrom::Start(bound)).await?
}
(Bound::Excluded(_), _) => unreachable!(),
(Bound::Unbounded, Bound::Included(bound)) => {
// Seek to the end minus the bounded bytes
debug!("Seeking {} bytes back from the end...", bound);
file.seek(std::io::SeekFrom::End(i64::try_from(bound).unwrap().neg())).await?
},
(Bound::Unbounded, Bound::Unbounded) => 0,
(_, Bound::Excluded(_)) => unreachable!()
};
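// Determine how many bytes this range should yield (`None` means
// "read until EOF"), so the final chunk can be truncated once the
// reader overshoots that length.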
let stream = Box::pin(tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file)))
.map_ok({
let mut bytes_read = 0usize;
let len = match range {
(_, Bound::Unbounded) => None,
(Bound::Unbounded, Bound::Included(bound)) => Some(bound),
(_, Bound::Included(bound)) => Some(bound + 1 - start),
(_, Bound::Excluded(_)) => unreachable!()
};
move |chunk| {
bytes_read += chunk.len();
debug!("Read {} bytes from file so far, {} in this chunk", bytes_read, chunk.len());
if let Some(len) = len.map(|len| len.try_into().unwrap()) {
if bytes_read > len {
if bytes_read - len > chunk.len() {
return None
}
debug!("Truncating last {} bytes", bytes_read - len);
return Some(chunk.slice(..chunk.len() - (bytes_read - len)))
}
}
Some(chunk)
}
})
.try_take_while(|x| std::future::ready(Ok(x.is_some())))
// Will never panic, because the moment the stream yields
// a None, it is considered exhausted.
.map_ok(|x| x.unwrap());
Ok(Box::pin(stream))
}
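/// Delete a stored file.
///
/// Note that the `.json` metadata sidecar is left behind; presumably,
/// like stale temporary files, orphaned sidecars are dealt with at
/// maintenance time.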
async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
let path = self.base.join(format!("{}/{}", domain, filename));
Ok(tokio::fs::remove_file(path).await?)
}
}
#[cfg(test)]
mod tests {
use super::{Metadata, FileStore, MediaStore};
use std::ops::Bound;
use tokio::io::AsyncReadExt;
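// Both tests use this module's own source file as the upload payload.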
#[tokio::test]
#[tracing_test::traced_test]
async fn test_ranges() {
let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
let store = FileStore { base: tempdir.path().to_path_buf() };
let file: &[u8] = include_bytes!("./file.rs");
let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
let metadata = Metadata {
filename: Some("file.rs".to_string()),
content_type: Some("text/plain".to_string()),
length: None,
etag: None,
};
// write through the interface
let filename = store.write_streaming(
"fireburn.ru",
metadata, stream
).await.unwrap();
tracing::debug!("Writing complete.");
// Ensure the file is there
let content = tokio::fs::read(
tempdir.path()
.join("fireburn.ru")
.join(&filename)
).await.unwrap();
assert_eq!(content, file);
tracing::debug!("Reading range from the start...");
// try to read range
let range = {
let stream = store.stream_range(
"fireburn.ru", &filename,
(Bound::Included(0), Bound::Included(299))
).await.unwrap();
let mut reader = tokio_util::io::StreamReader::new(stream);
let mut buf = Vec::default();
reader.read_to_end(&mut buf).await.unwrap();
buf
};
assert_eq!(range.len(), 300);
assert_eq!(range.as_slice(), &file[..=299]);
tracing::debug!("Reading range from the middle...");
let range = {
let stream = store.stream_range(
"fireburn.ru", &filename,
(Bound::Included(150), Bound::Included(449))
).await.unwrap();
let mut reader = tokio_util::io::StreamReader::new(stream);
let mut buf = Vec::default();
reader.read_to_end(&mut buf).await.unwrap();
buf
};
assert_eq!(range.len(), 300);
assert_eq!(range.as_slice(), &file[150..=449]);
tracing::debug!("Reading range from the end...");
let range = {
let stream = store.stream_range(
"fireburn.ru", &filename,
// Note: the `headers` crate parses bounds in a
// non-standard way, where unbounded start actually
// means getting things from the end...
(Bound::Unbounded, Bound::Included(300))
).await.unwrap();
let mut reader = tokio_util::io::StreamReader::new(stream);
let mut buf = Vec::default();
reader.read_to_end(&mut buf).await.unwrap();
buf
};
assert_eq!(range.len(), 300);
assert_eq!(range.as_slice(), &file[file.len()-300..file.len()]);
tracing::debug!("Reading the whole file...");
// try to read range
let range = {
let stream = store.stream_range(
"fireburn.ru", &("/".to_string() + &filename),
(Bound::Unbounded, Bound::Unbounded)
).await.unwrap();
let mut reader = tokio_util::io::StreamReader::new(stream);
let mut buf = Vec::default();
reader.read_to_end(&mut buf).await.unwrap();
buf
};
assert_eq!(range.len(), file.len());
assert_eq!(range.as_slice(), file);
}
#[tokio::test]
#[tracing_test::traced_test]
async fn test_streaming_read_write() {
let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
let store = FileStore { base: tempdir.path().to_path_buf() };
let file: &[u8] = include_bytes!("./file.rs");
let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
let metadata = Metadata {
filename: Some("style.css".to_string()),
content_type: Some("text/css".to_string()),
length: None,
etag: None,
};
// write through the interface
let filename = store.write_streaming(
"fireburn.ru",
metadata, stream
).await.unwrap();
println!("{}, {}", filename, tempdir.path()
.join("fireburn.ru")
.join(&filename)
.display());
let content = tokio::fs::read(
tempdir.path()
.join("fireburn.ru")
.join(&filename)
).await.unwrap();
assert_eq!(content, file);
// check internal metadata format
let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
tempdir.path()
.join("fireburn.ru")
.join(filename.clone() + ".json")
).await.unwrap()).unwrap();
assert_eq!(meta.content_type.as_deref(), Some("text/css"));
assert_eq!(meta.filename.as_deref(), Some("style.css"));
assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
assert!(meta.etag.is_some());
// read back the data using the interface
let (metadata, read_back) = {
let (metadata, stream) = store.read_streaming(
"fireburn.ru",
&filename
).await.unwrap();
let mut reader = tokio_util::io::StreamReader::new(stream);
let mut buf = Vec::default();
reader.read_to_end(&mut buf).await.unwrap();
(metadata, buf)
};
assert_eq!(read_back, file);
assert_eq!(metadata.content_type.as_deref(), Some("text/css"));
assert_eq!(metadata.filename.as_deref(), Some("style.css"));
assert_eq!(metadata.length.map(|i| i.get()), Some(file.len()));
assert!(metadata.etag.is_some());
}
}