use super::{Metadata, ErrorKind, MediaStore, MediaStoreError, Result};
use async_trait::async_trait;
use std::{path::PathBuf, fmt::Debug};
use tokio::fs::OpenOptions;
use tokio::io::{BufReader, BufWriter, AsyncWriteExt};
use futures::{StreamExt, TryStreamExt};
use std::pin::Pin;
use sha2::Digest;
use futures::FutureExt;
use tracing::{debug, error};

/// Capacity, in bytes (16 KiB), of the in-memory buffers this module wraps
/// around files: the `BufWriter` for pending uploads and the `BufReader`
/// used when streaming stored files back.
const BUF_CAPACITY: usize = 16 * 1024;

/// A [`MediaStore`] implementation that keeps media as plain files under a
/// base directory on the local filesystem.
///
/// Temporary upload files are created directly inside the base directory;
/// finished uploads and their `.json` metadata live under
/// `<base>/<domain>/...`.
#[derive(Clone)]
pub struct FileStore {
    /// Directory under which every path used by this store is resolved.
    base: PathBuf,
}

/// Converts filesystem I/O failures into store errors.
///
/// A "not found" I/O error is reported as [`ErrorKind::NotFound`]; every other
/// I/O error is reported as [`ErrorKind::Backend`]. The original error is kept
/// as the `source` and also rendered into the message.
impl From<tokio::io::Error> for MediaStoreError {
    fn from(source: tokio::io::Error) -> Self {
        // Classify and render the error first, while it can still be
        // borrowed, then move it into the boxed `source`.
        let kind = if source.kind() == std::io::ErrorKind::NotFound {
            ErrorKind::NotFound
        } else {
            ErrorKind::Backend
        };
        let msg = format!("file I/O error: {}", source);

        Self {
            msg,
            kind,
            source: Some(Box::new(source)),
        }
    }
}


impl FileStore {
    /// Creates a store rooted at `base`.
    ///
    /// Nothing is touched on disk here; the directory is created on demand
    /// when the first temporary file is opened.
    pub fn new<T: Into<PathBuf>>(base: T) -> Self {
        let base = base.into();
        Self { base }
    }

    /// Creates a uniquely named, empty temporary file inside the base
    /// directory and returns its path together with a buffered writer for it.
    ///
    /// The file is named `temp.` followed by 16 random alphanumeric
    /// characters and is opened with `create_new`, so an existing file is
    /// never reused: on a name collision a new name is drawn.
    ///
    /// # Errors
    ///
    /// Returns an error if the base directory cannot be created or the file
    /// cannot be opened for any reason other than the name already existing.
    async fn mktemp(&self) -> Result<(PathBuf, BufWriter<tokio::fs::File>)> {
        use rand::{Rng, distributions::Alphanumeric};

        tokio::fs::create_dir_all(self.base.as_path()).await?;

        loop {
            let suffix: String = rand::thread_rng()
                .sample_iter(&Alphanumeric)
                .take(16)
                .map(char::from)
                .collect();
            let candidate = self.base.join(format!("temp.{}", suffix));

            let opened = OpenOptions::new()
                .create_new(true)
                .write(true)
                .open(&candidate)
                .await;

            match opened {
                // TODO: measure whether wrapping the file in a BufWriter
                // actually helps here
                Ok(file) => {
                    let writer = BufWriter::with_capacity(BUF_CAPACITY, file);
                    return Ok((candidate, writer));
                }
                // Somebody else owns this name: draw another one.
                Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => continue,
                Err(err) => return Err(err.into()),
            }
        }
    }
}

#[async_trait]
impl MediaStore for FileStore {

    #[tracing::instrument(skip(self))]
    async fn write_streaming<T>(
        &self,
        domain: &str,
        mut metadata: Metadata,
        mut content: T,
    ) -> Result<String>
    where
        T: tokio_stream::Stream<Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>> + Unpin + Send + Debug
    {
        let (tempfilepath, mut tempfile) = self.mktemp().await?;
        debug!("Temporary file opened for storing pending upload: {}", tempfilepath.display());
        let mut hasher = sha2::Sha256::new();
        let mut length: usize = 0;

        while let Some(chunk) = content.next().await {
            let chunk = std::sync::Arc::new(chunk.map_err(|err| MediaStoreError {
                kind: ErrorKind::Backend,
                source: Some(Box::new(err)),
                msg: "Failed to read a data chunk".to_owned()
            })?);
            debug!("Read {} bytes from the stream", chunk.len());
            length += chunk.len();
            let (write_result, _hasher) = tokio::join!(
                tempfile.write_all(&*chunk),
                {
                    let chunk = chunk.clone();
                    tokio::task::spawn_blocking(move || {
                        hasher.update(&*chunk);

                        hasher
                    }).map(|r| r.unwrap())
                }
            );
            if let Err(err) = write_result {
                error!("Error while writing pending upload: {}", err);
                drop(tempfile);
                // this is just cleanup, nothing fails if it fails
                // though temporary files might take up space on the hard drive
                // We'll clean them when maintenance time comes
                #[allow(unused_must_use)]
                { tokio::fs::remove_file(tempfilepath).await; }
                return Err(err.into());
            }
            hasher = _hasher;
        }
        tempfile.flush().await?;
        tempfile.into_inner().sync_all().await?;
        let hash = hasher.finalize();
        debug!("Pending upload hash: {}", hex::encode(&hash));
        let filename = format!(
            "{}/{}/{}/{}/{}",
            hex::encode([hash[0]]),
            hex::encode([hash[1]]),
            hex::encode([hash[2]]),
            hex::encode([hash[3]]),
            hex::encode(&hash[4..32])
        );
        metadata.length = Some(length);
        let domain_str = domain.to_string();
        let filepath = self.base.join(domain_str.as_str()).join(&filename);
        let metafilename = filename.clone() + ".json";
        let metapath = self.base.join(domain_str.as_str()).join(&metafilename);
        let metatemppath = self.base.join(domain_str.as_str()).join(metafilename + ".tmp");
        debug!("File path: {}, metadata: {}", filepath.display(), metapath.display());
        {
            let parent = filepath.parent().unwrap();
            tokio::fs::create_dir_all(parent).await?;            
        }
        let mut meta = OpenOptions::new()
            .create_new(true)
            .write(true)
            .open(&metatemppath)
            .await?;
        meta.write_all(&serde_json::to_vec(&metadata).unwrap()).await?;
        tokio::fs::rename(tempfilepath, filepath).await?;
        tokio::fs::rename(metatemppath, metapath).await?;
        Ok(filename)
    }

    #[tracing::instrument(skip(self))]
    async fn read_streaming(
        &self,
        domain: &str,
        filename: &str,
    ) -> Result<(Metadata, Pin<Box<dyn tokio_stream::Stream<Item = std::io::Result<bytes::Bytes>> + Send>>)> {
        let path = self.base.join(format!("{}{}", domain, filename));
        let metapath = self.base.join(format!("{}{}.json", domain, filename));
        debug!("Path: {}, metadata: {}", path.display(), metapath.display());

        let file = OpenOptions::new()
            .read(true)
            .open(path)
            .await?;
        let meta = serde_json::from_slice(&tokio::fs::read(metapath).await?)
            .map_err(|err| MediaStoreError {
                kind: ErrorKind::Json,
                msg: format!("{}", err),
                source: Some(Box::new(err))
            })?;

        Ok((meta, Box::pin(
            tokio_util::io::ReaderStream::new(
                // TODO: determine if BufReader provides benefit here
                // From the logs it looks like we're reading 4KiB at a time
                // Buffering file contents seems to double download speed
                // How to benchmark this?
                tokio::io::BufReader::with_capacity(BUF_CAPACITY, file)
            )
            // Sprinkle some salt in form of protective log wrapping
                .inspect_ok(|chunk| debug!("Read {} bytes from file", chunk.len()))
        )))
    }

    /// Deletes a stored file together with its `.json` metadata file.
    ///
    /// # Errors
    ///
    /// Returns an error if the content file cannot be removed (including
    /// when it does not exist). A missing metadata file is not an error;
    /// any other failure to remove it is returned.
    async fn delete(&self, domain: &str, filename: &str) -> Result<()> {
        let path = self.base.join(format!("{}/{}", domain, filename));
        let metapath = self.base.join(format!("{}/{}.json", domain, filename));

        tokio::fs::remove_file(path).await?;

        // The metadata is useless without the content, so remove it as well
        // rather than leaving it orphaned on disk.
        match tokio::fs::remove_file(metapath).await {
            Ok(()) => Ok(()),
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{Metadata, FileStore, MediaStore};
    use tokio::io::AsyncReadExt;

    /// Writes a file through the store, checks the on-disk content and
    /// metadata layout, then reads both back through the store.
    #[tokio::test]
    #[tracing_test::traced_test]
    async fn test_streaming_read_write() {
        // Keep the `TempDir` guard alive until the end of the test so the
        // directory is removed afterwards instead of being left behind.
        let tempdir = tempdir::TempDir::new("file").expect("Failed to create tempdir");
        let store = FileStore::new(tempdir.path());
        let domain_dir = tempdir.path().join("fireburn.ru");

        let file: &[u8] = include_bytes!("../../../companion-lite/style.css");
        let stream = tokio_stream::iter(file.chunks(100).map(|i| Ok(bytes::Bytes::copy_from_slice(i))));
        let metadata = Metadata {
            filename: Some("style.css".to_string()),
            content_type: "text/css".to_string(),
            length: None
        };

        // write through the interface
        let filename = store.write_streaming(
            "fireburn.ru",
            metadata, stream
        ).await.unwrap();
        println!("{}, {}", filename, domain_dir.join(&filename).display());
        let content = tokio::fs::read(domain_dir.join(&filename)).await.unwrap();
        assert_eq!(content, file);

        // check internal metadata format
        let meta: Metadata = serde_json::from_slice(&tokio::fs::read(
            domain_dir.join(filename.clone() + ".json")
        ).await.unwrap()).unwrap();
        assert_eq!(&meta.content_type, "text/css");
        assert_eq!(meta.filename.as_deref(), Some("style.css"));
        assert_eq!(meta.length, Some(file.len()));

        // read back the data using the interface
        let (metadata, read_back) = {
            let (metadata, stream) = store.read_streaming(
                "fireburn.ru",
                &("/".to_string() + &filename)
            ).await.unwrap();
            let mut reader = tokio_util::io::StreamReader::new(stream);

            let mut buf = Vec::default();
            reader.read_to_end(&mut buf).await.unwrap();

            (metadata, buf)
        };

        // These must check the metadata returned by `read_streaming`, not
        // the copy parsed from disk above.
        assert_eq!(read_back, file);
        assert_eq!(&metadata.content_type, "text/css");
        assert_eq!(metadata.filename.as_deref(), Some("style.css"));
        assert_eq!(metadata.length, Some(file.len()));
    }
}