use axum::extract::multipart::Field;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZeroUsize;
use std::ops::Bound;
use std::pin::Pin;
use tokio_stream::Stream;

pub mod file;

/// Descriptive information recorded alongside a stored media file.
///
/// Every field is optional. The struct derives `Serialize` and `Deserialize`,
/// so it can be persisted and read back by a storage backend.
#[derive(Debug, Deserialize, Serialize)]
pub struct Metadata {
    /// Content type of the file. If None, the content-type is considered undefined.
    pub content_type: Option<String>,
    /// The original filename that was passed.
    pub filename: Option<String>,
    /// The recorded length of the file.
    ///
    /// `None` means no length was recorded; a recorded length is never zero
    /// because of the `NonZeroUsize` type.
    pub length: Option<NonZeroUsize>,
    /// The e-tag of a file. Note: it must be a strong e-tag, for example, a hash.
    pub etag: Option<String>,
}
impl From<&Field<'_>> for Metadata {
    fn from(field: &Field<'_>) -> Self {
        Self {
            content_type: field.content_type().map(|i| i.to_owned()),
            filename: field.file_name().map(|i| i.to_owned()),
            length: None,
            etag: None,
        }
    }
}

/// Broad category of a [`MediaStoreError`].
///
/// The category selects the prefix used when the error is displayed.
#[derive(Debug, Clone, Copy)]
pub enum ErrorKind {
    /// The media storage backend reported an error.
    Backend,
    /// Permission was denied.
    Permission,
    /// JSON could not be parsed.
    Json,
    /// The requested blob was not found.
    NotFound,
    /// Any other, unclassified media storage error.
    Other,
}

/// Error type returned by media storage operations.
///
/// All fields are private to this module; callers inspect the error through
/// `kind()`, its `Display` output and `std::error::Error::source`.
#[derive(Debug)]
pub struct MediaStoreError {
    /// Category of the failure, exposed through `kind()`.
    kind: ErrorKind,
    /// Underlying error that caused this one, if any; exposed through `source()`.
    source: Option<Box<dyn std::error::Error + Send + Sync>>,
    /// Free-form detail appended after the kind description when displayed.
    msg: String,
}

impl MediaStoreError {
    /// Reports which category of failure this error belongs to.
    pub fn kind(&self) -> ErrorKind {
        // `ErrorKind` is `Copy`, so the borrowed field can be returned by value.
        let Self { kind, .. } = self;
        *kind
    }
}

impl std::error::Error for MediaStoreError {
    /// Exposes the wrapped underlying error, if one was recorded.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match &self.source {
            Some(boxed) => {
                // Reborrow through the box; the binding's type drops the
                // `Send + Sync` markers that the return type does not carry.
                let cause: &(dyn std::error::Error + 'static) = &**boxed;
                Some(cause)
            }
            None => None,
        }
    }
}

impl std::fmt::Display for MediaStoreError {
    /// Writes the error as `<kind description>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Fixed, human-readable prefix for each error category.
        let description = match self.kind {
            ErrorKind::Backend => "media storage backend error",
            ErrorKind::Permission => "permission denied",
            ErrorKind::Json => "failed to parse json",
            ErrorKind::NotFound => "blob not found",
            ErrorKind::Other => "unknown media storage error",
        };

        write!(f, "{}: {}", description, self.msg)
    }
}

/// Shorthand for a `std::result::Result` whose error type is [`MediaStoreError`].
pub type Result<T> = std::result::Result<T, MediaStoreError>;

pub trait MediaStore: 'static + Send + Sync + Clone {
    // Initialize self from a URL, possibly performing asynchronous initialization.
    fn new(url: &'_ url::Url) -> impl Future<Output = Result<Self>> + Send;

    fn write_streaming<T>(
        &self,
        domain: &str,
        metadata: Metadata,
        content: T,
    ) -> impl Future<Output = Result<String>> + Send
    where
        T: tokio_stream::Stream<
                Item = std::result::Result<bytes::Bytes, axum::extract::multipart::MultipartError>,
            > + Unpin
            + Send
            + Debug;

    #[allow(clippy::type_complexity)]
    fn read_streaming(
        &self,
        domain: &str,
        filename: &str,
    ) -> impl Future<
        Output = Result<(
            Metadata,
            Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>,
        )>,
    > + Send;

    fn stream_range(
        &self,
        domain: &str,
        filename: &str,
        range: (Bound<u64>, Bound<u64>),
    ) -> impl Future<Output = Result<Pin<Box<dyn Stream<Item = std::io::Result<Bytes>> + Send>>>> + Send
    {
        async move {
            use futures::stream::TryStreamExt;
            use tracing::debug;
            let (metadata, mut stream) = self.read_streaming(domain, filename).await?;
            let length = metadata.length.unwrap().get();

            use Bound::*;
            let (start, end): (usize, usize) = match range {
                (Unbounded, Unbounded) => return Ok(stream),
                (Included(start), Unbounded) => (start.try_into().unwrap(), length - 1),
                (Unbounded, Included(end)) => (length - usize::try_from(end).unwrap(), length - 1),
                (Included(start), Included(end)) => {
                    (start.try_into().unwrap(), end.try_into().unwrap())
                }
                (_, _) => unreachable!(),
            };

            stream = Box::pin(
                stream
                    .map_ok({
                        let mut bytes_skipped = 0usize;
                        let mut bytes_read = 0usize;

                        move |chunk| {
                            debug!(
                                "Skipped {}/{} bytes, chunk len {}",
                                bytes_skipped,
                                start,
                                chunk.len()
                            );
                            let chunk = if bytes_skipped < start {
                                let need_to_skip = start - bytes_skipped;
                                if chunk.len() < need_to_skip {
                                    return None;
                                }
                                debug!("Skipping {} bytes", need_to_skip);
                                bytes_skipped += need_to_skip;

                                chunk.slice(need_to_skip..)
                            } else {
                                chunk
                            };

                            debug!(
                                "Read {} bytes from file, {} in this chunk",
                                bytes_read,
                                chunk.len()
                            );
                            bytes_read += chunk.len();

                            if bytes_read > length {
                                if bytes_read - length > chunk.len() {
                                    return None;
                                }
                                debug!("Truncating last {} bytes", bytes_read - length);
                                return Some(chunk.slice(..chunk.len() - (bytes_read - length)));
                            }

                            Some(chunk)
                        }
                    })
                    .try_skip_while(|x| std::future::ready(Ok(x.is_none())))
                    .try_take_while(|x| std::future::ready(Ok(x.is_some())))
                    .map_ok(|x| x.unwrap()),
            );

            Ok(stream)
        }
    }

    /// Read metadata for a file.
    ///
    /// The default implementation uses the `read_streaming` method
    /// and drops the stream containing file content.
    fn metadata(
        &self,
        domain: &str,
        filename: &str,
    ) -> impl Future<Output = Result<Metadata>> + Send {
        async move {
            self.read_streaming(domain, filename)
                .await
                .map(|(meta, _)| meta)
        }
    }

    fn delete(&self, domain: &str, filename: &str) -> impl Future<Output = Result<()>> + Send;
}