use super::{ErrorKind, MediaStore, MediaStoreError, Metadata, Result};
use futures::FutureExt;
use futures::{StreamExt, TryStreamExt};
use sha2::Digest;
use std::ops::{Bound, Neg};
use std::pin::Pin;
use std::{fmt::Debug, path::PathBuf};
use tokio::fs::OpenOptions;
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufReader, BufWriter};
use tracing::{debug, error};

/// Capacity, in bytes (16 KiB), of the `BufReader` wrapped around files that
/// are streamed back to callers.
const BUF_CAPACITY: usize = 16 * 1024;

/// Media store that keeps uploads as plain files on the local filesystem.
#[derive(Clone)]
pub struct FileStore {
    /// Root directory; every stored path is built by joining onto it.
    base: PathBuf,
}

/// Converts file I/O failures into store errors, keeping the original error
/// as the source.
impl From<tokio::io::Error> for MediaStoreError {
    fn from(source: tokio::io::Error) -> Self {
        let msg = format!("file I/O error: {}", source);
        // Only a missing file gets its own kind; everything else is reported
        // as a generic backend failure.
        let kind = if source.kind() == std::io::ErrorKind::NotFound {
            ErrorKind::NotFound
        } else {
            ErrorKind::Backend
        };

        Self {
            msg,
            kind,
            source: Some(Box::new(source)),
        }
    }
}

impl FileStore {
    /// Creates a temporary file inside the base directory and returns its
    /// path together with a buffered writer over it.
    ///
    /// `"temp"` and `16` are passed straight to `kittybox_util::fs::mktemp`
    /// (presumably the name prefix and random suffix length; confirm against
    /// that helper).
    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
        let (name, file) = kittybox_util::fs::mktemp(&self.base, "temp", 16).await?;

        Ok((name, BufWriter::new(file)))
    }
}

impl MediaStore for FileStore {
    async fn new(url: &'_ url::Url) -> Result<Self> {
        Ok(Self {
            base: url.path().into(),
        })
    }

    #[tracing::instrument(skip(self, content))]
    async fn write_streaming<T>(
        &self,
        domain: &str,
        mut metadata: Metadata,
        mut content: T,
    ) -> Result<String>
    where
        T: tokio_stream::Stream<
                Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>,
            > + Unpin
            + Send
            + Debug,
    {
        let (tempfilepath, mut tempfile) = self.mktemp().await?;
        debug!(
            "Temporary file opened for storing pending upload: {}",
            tempfilepath.display()
        );
        let mut hasher = sha2::Sha256::new();
        let mut length: usize = 0;

        while let Some(chunk) = content.next().await {
            let chunk = chunk.map_err(|err| MediaStoreError {
                kind: ErrorKind::Backend,
                source: Some(Box::new(err)),
                msg: "Failed to read a data chunk".to_owned(),
            })?;
            debug!("Read {} bytes from the stream", chunk.len());
            length += chunk.len();
            let (write_result, _hasher) = tokio::join!(
                {
                    let chunk = chunk.clone();
                    let tempfile = &mut tempfile;
                    async move { tempfile.write_all(&chunk).await }
                },
                {
                    let chunk = chunk.clone();
                    tokio::task::spawn_blocking(move || {
                        hasher.update(&*chunk);

                        hasher
                    })
                    .map(|r| r.unwrap())
                }
            );
            if let Err(err) = write_result {
                error!("Error while writing pending upload: {}", err);
                drop(tempfile);
                // this is just cleanup, nothing fails if it fails
                // though temporary files might take up space on the hard drive
                // We'll clean them when maintenance time comes
                #[allow(unused_must_use)]
                {
                    tokio::fs::remove_file(tempfilepath).await;
                }
                return Err(err.into());
            }
            hasher = _hasher;
        }
        // Manually flush the buffer and drop the handle to close the file
        tempfile.flush().await?;
        tempfile.into_inner().sync_all().await?;

        let hash = data_encoding::HEXLOWER.encode(&hasher.finalize());
        debug!("Pending upload hash: {}", hash);
        let filename = format!(
            "{}/{}/{}/{}/{}",
            &hash[0..2],
            &hash[2..4],
            &hash[4..6],
            &hash[6..8],
            &hash[8..]
        );
        let domain_str = domain.to_string();
        let filepath = self.base.join(domain_str.as_str()).join(&filename);
        let metafilename = filename.clone() + ".json";
        let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
        let metatemppath = self
            .base
            .join(domain_str.as_str())
            .join(metafilename + ".tmp");
        metadata.length = std::num::NonZeroUsize::new(length);
        metadata.etag = Some(hash);
        debug!(
            "File path: {}, metadata: {}",
            filepath.display(),
            metapath.display()
        );
        {
            let parent = filepath.parent().unwrap();
            tokio::fs::create_dir_all(parent).await?;
        }
        let mut meta = OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&metatemppath)
            .await?;
        meta.write_all(&serde_json::to_vec(&metadata).unwrap())
            .await?;
        tokio::fs::rename(tempfilepath, filepath).await?;
        tokio::fs::rename(metatemppath, metapath).await?;
        Ok(filename)
    }

    /// Opens a stored file and returns its metadata together with a stream
    /// of its contents.
    ///
    /// Leading slashes in `filename` are ignored, matching how `stream_range`
    /// and `delete` resolve paths. Without that, `Path::join` would treat the
    /// name as absolute and escape the store's base directory.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened or if its metadata cannot be read
    /// or parsed.
    #[tracing::instrument(skip(self))]
    #[allow(clippy::type_complexity)]
    async fn read_streaming(
        &self,
        domain: &str,
        filename: &str,
    ) -> Result<(
        Metadata,
        Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>,
    )> {
        // Joining an absolute path replaces the whole base, so strip leading
        // slashes to keep the lookup inside `base/domain`.
        let filename = filename.trim_start_matches('/');
        debug!("Domain: {}, filename: {}", domain, filename);
        let path = self.base.join(domain).join(filename);
        debug!("Path: {}", path.display());

        let file = OpenOptions::new().read(true).open(path).await?;
        let meta = self.metadata(domain, filename).await?;

        // TODO: determine if BufReader provides benefit here
        // From the logs it looks like we're reading 4KiB at a time
        // Buffering file contents seems to double download speed
        // How to benchmark this?
        let chunks =
            tokio_util::io::ReaderStream::new(BufReader::with_capacity(BUF_CAPACITY, file))
                // Log every chunk as it is read from the file.
                .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()));

        Ok((meta, Box::pin(chunks)))
    }

    /// Reads and parses the JSON metadata stored next to a file
    /// (`<filename>.json`).
    ///
    /// Leading slashes in `filename` are ignored, matching how `stream_range`
    /// and `delete` resolve paths. Without that, `Path::join` would treat the
    /// name as absolute and escape the store's base directory.
    ///
    /// # Errors
    ///
    /// Fails with an I/O-derived error if the metadata file cannot be read and
    /// with `ErrorKind::Json` if its contents cannot be deserialized.
    #[tracing::instrument(skip(self))]
    async fn metadata(&self, domain: &str, filename: &str) -> Result<Metadata> {
        // Joining an absolute path replaces the whole base, so strip leading
        // slashes to keep the lookup inside `base/domain`.
        let filename = filename.trim_start_matches('/');
        let metapath = self.base.join(domain).join(format!("{}.json", filename));
        debug!("Metadata path: {}", metapath.display());

        let raw = tokio::fs::read(metapath).await?;
        let meta: Metadata = serde_json::from_slice(&raw).map_err(|err| MediaStoreError {
            kind: ErrorKind::Json,
            msg: err.to_string(),
            source: Some(Box::new(err)),
        })?;

        Ok(meta)
    }

    /// Streams a byte range of a stored file.
    ///
    /// `range` is interpreted as follows:
    ///
    /// * `(Included(a), Included(b))`: bytes `a..=b`;
    /// * `(Included(a), Unbounded)`: everything from byte `a` onwards;
    /// * `(Unbounded, Included(n))`: the last `n` bytes of the file, or the
    ///   whole file when it is shorter than `n`;
    /// * `(Unbounded, Unbounded)`: the whole file.
    ///
    /// A range whose end lies before its start yields an empty stream.
    ///
    /// # Errors
    ///
    /// Fails if the file cannot be opened, inspected or seeked.
    ///
    /// # Panics
    ///
    /// Panics if either bound is `Bound::Excluded`.
    #[tracing::instrument(skip(self))]
    async fn stream_range(
        &self,
        domain: &str,
        filename: &str,
        range: (Bound<u64>, Bound<u64>),
    ) -> Result<Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>>
    {
        let path = self.base.join(format!("{}/{}", domain, filename));
        let metapath = self.base.join(format!("{}/{}.json", domain, filename));
        debug!("Path: {}, metadata: {}", path.display(), metapath.display());

        let mut file = OpenOptions::new().read(true).open(path).await?;

        let start = match range {
            (Bound::Included(bound), _) => {
                debug!("Seeking {} bytes forward...", bound);
                file.seek(std::io::SeekFrom::Start(bound)).await?
            }
            (Bound::Excluded(_), _) => unreachable!(),
            (Bound::Unbounded, Bound::Included(bound)) => {
                // Seek to the end minus the requested suffix. The suffix is
                // clamped to the file length: seeking before byte 0 is an
                // error, while a suffix longer than the file simply means
                // "the whole file".
                let file_len = file.metadata().await?.len();
                let suffix = i64::try_from(bound.min(file_len)).unwrap_or(i64::MAX);
                debug!("Seeking {} bytes back from the end...", suffix);
                file.seek(std::io::SeekFrom::End(suffix.neg())).await?
            }
            (Bound::Unbounded, Bound::Unbounded) => 0,
            (_, Bound::Excluded(_)) => unreachable!(),
        };

        // Number of bytes to deliver, or `None` to read until end of file.
        let len: Option<u64> = match range {
            (_, Bound::Unbounded) => None,
            (Bound::Unbounded, Bound::Included(bound)) => Some(bound),
            // Saturating arithmetic: an end before the start gives an empty
            // range instead of an arithmetic overflow.
            (_, Bound::Included(bound)) => Some(bound.saturating_add(1).saturating_sub(start)),
            (_, Bound::Excluded(_)) => unreachable!(),
        };

        let stream = tokio_util::io::ReaderStream::new(BufReader::with_capacity(
            BUF_CAPACITY,
            file,
        ))
        .map_ok({
            let mut bytes_read: u64 = 0;
            move |chunk: bytes::Bytes| {
                debug!(
                    "Read {} bytes from file, {} in this chunk",
                    bytes_read,
                    chunk.len()
                );
                let already_read = bytes_read;
                // `usize` to `u64` is lossless on every supported target.
                let chunk_len = chunk.len() as u64;
                bytes_read = bytes_read.saturating_add(chunk_len);

                match len {
                    None => Some(chunk),
                    // Everything requested was already delivered: signal the
                    // end of the stream.
                    Some(len) if already_read >= len => None,
                    Some(len) => {
                        let wanted = len - already_read;
                        if chunk_len > wanted {
                            debug!("Truncating last {} bytes", chunk_len - wanted);
                            // `wanted` is smaller than the chunk length here,
                            // so it always fits in `usize`.
                            Some(chunk.slice(..wanted as usize))
                        } else {
                            Some(chunk)
                        }
                    }
                }
            }
        })
        .try_take_while(|chunk| std::future::ready(Ok(chunk.is_some())))
        // Will never panic, because the moment the stream yields
        // a None, it is considered exhausted.
        .map_ok(|chunk| chunk.unwrap());

        Ok(Box::pin(stream))
    }

    /// Deletes a stored file together with its `.json` metadata sidecar.
    ///
    /// The metadata is removed first, and a missing sidecar is not an error,
    /// so a delete that failed half-way can simply be retried.
    ///
    /// # Errors
    ///
    /// Fails if the data file cannot be removed (including when it does not
    /// exist) or if the sidecar exists but cannot be removed.
    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
        let path = self.base.join(format!("{}/{}", domain, filename));
        let metapath = self.base.join(format!("{}/{}.json", domain, filename));

        // Without this the sidecar would be orphaned and `metadata()` would
        // keep succeeding for a file that no longer exists.
        match tokio::fs::remove_file(metapath).await {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err.into()),
        }

        Ok(tokio::fs::remove_file(path).await?)
    }
}

#[cfg(test)]
mod tests {
    use super::{FileStore, MediaStore, Metadata};
    use std::ops::Bound;
    use tokio::io::AsyncReadExt;

    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_ranges() {
        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
        let store = FileStore {
            base: tempdir.path().to_path_buf(),
        };

        let file: &[u8] = include_bytes!("./file.rs");
        let stream = tokio_stream::iter(
            file.chunks(100)
                .map(|i| Ok(bytes::Bytes::copy_from_slice(i))),
        );
        let metadata = Metadata {
            filename: Some("file.rs".to_string()),
            content_type: Some("text/plain".to_string()),
            length: None,
            etag: None,
        };

        // write through the interface
        let filename = store
            .write_streaming("fireburn.ru", metadata, stream)
            .await
            .unwrap();

        tracing::debug!("Writing complete.");

        // Ensure the file is there
        let content = tokio::fs::read(tempdir.path().join("fireburn.ru").join(&filename))
            .await
            .unwrap();
        assert_eq!(content, file);

        tracing::debug!("Reading range from the start...");
        // try to read range
        let range = {
            let stream = store
                .stream_range(
                    "fireburn.ru",
                    &filename,
                    (Bound::Included(0), Bound::Included(299)),
                )
                .await
                .unwrap();

            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            buf
        };

        assert_eq!(range.len(), 300);
        assert_eq!(range.as_slice(), &file[..=299]);

        tracing::debug!("Reading range from the middle...");

        let range = {
            let stream = store
                .stream_range(
                    "fireburn.ru",
                    &filename,
                    (Bound::Included(150), Bound::Included(449)),
                )
                .await
                .unwrap();

            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            buf
        };

        assert_eq!(range.len(), 300);
        assert_eq!(range.as_slice(), &file[150..=449]);

        tracing::debug!("Reading range from the end...");
        let range = {
            let stream = store
                .stream_range(
                    "fireburn.ru",
                    &filename,
                    // Note: the `headers` crate parses bounds in a
                    // non-standard way, where unbounded start actually
                    // means getting things from the end...
                    (Bound::Unbounded, Bound::Included(300)),
                )
                .await
                .unwrap();

            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            buf
        };

        assert_eq!(range.len(), 300);
        assert_eq!(range.as_slice(), &file[file.len() - 300..file.len()]);

        tracing::debug!("Reading the whole file...");
        // try to read range
        let range = {
            let stream = store
                .stream_range(
                    "fireburn.ru",
                    &("/".to_string() + &filename),
                    (Bound::Unbounded, Bound::Unbounded),
                )
                .await
                .unwrap();

            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            buf
        };

        assert_eq!(range.len(), file.len());
        assert_eq!(range.as_slice(), file);
    }

    /// Round-trips this source file through `write_streaming` and
    /// `read_streaming`, checking the stored bytes, the on-disk metadata
    /// format, and the metadata returned by the read.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_streaming_read_write() {
        let tempdir = tempfile::tempdir().expect("Failed to create tempdir");
        let store = FileStore {
            base: tempdir.path().to_path_buf(),
        };

        let file: &[u8] = include_bytes!("./file.rs");
        let stream = tokio_stream::iter(
            file.chunks(100)
                .map(|i| Ok(bytes::Bytes::copy_from_slice(i))),
        );
        let metadata = Metadata {
            filename: Some("style.css".to_string()),
            content_type: Some("text/css".to_string()),
            length: None,
            etag: None,
        };

        // write through the interface
        let filename = store
            .write_streaming("fireburn.ru", metadata, stream)
            .await
            .unwrap();
        println!(
            "{}, {}",
            filename,
            tempdir.path().join("fireburn.ru").join(&filename).display()
        );
        let content = tokio::fs::read(tempdir.path().join("fireburn.ru").join(&filename))
            .await
            .unwrap();
        assert_eq!(content, file);

        // check internal metadata format
        let meta: Metadata = serde_json::from_slice(
            &tokio::fs::read(
                tempdir
                    .path()
                    .join("fireburn.ru")
                    .join(filename.clone() + ".json"),
            )
            .await
            .unwrap(),
        )
        .unwrap();
        assert_eq!(meta.content_type.as_deref(), Some("text/css"));
        assert_eq!(meta.filename.as_deref(), Some("style.css"));
        assert_eq!(meta.length.map(|i| i.get()), Some(file.len()));
        assert!(meta.etag.is_some());

        // read back the data using the interface
        let (metadata, read_back) = {
            let (metadata, stream) = store
                .read_streaming("fireburn.ru", &filename)
                .await
                .unwrap();
            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            (metadata, buf)
        };

        assert_eq!(read_back, file);
        // These assertions must look at the metadata returned by
        // `read_streaming`, not at `meta` parsed from disk above.
        assert_eq!(metadata.content_type.as_deref(), Some("text/css"));
        assert_eq!(metadata.filename.as_deref(), Some("style.css"));
        assert_eq!(metadata.length.map(|i| i.get()), Some(file.len()));
        assert!(metadata.etag.is_some());
    }
}