about summary refs log blame commit diff
path: root/src/database/file/mod.rs
blob: 4fb7f47b9131ddc810095b7b311fd647eaea68b3 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
                                                                             
                                       
                              
                                            
                                    
                    

                               
               
                     
                              
                               

















                                                             






















































                                                                               
                                                   


                                                                      
                                                                         
                                                         



                                                                                      





                                                                                                   




                                           

                                                               


                                         
                    


                                              






                                                          


                                              





































                                                                      




                                                     





                                                                   
            
 
                
                                                                             



                        
                                                                     




                                                         



                                    
                                                                           




                                   





                                                                                 
                                                     
                                  
                                                 




                                                                                         

                                 
                  
                                    



                                                                                       















                                                                                             
                                                




                                                                              












                                                                                            
                                                        












                                                                         
 

                                                  
                                                               







                                                 


                                                                
                                                                


                                                                                           



                                                                                    

                                                                                        
                                                                                                            

                                                                                                
                                              
                                                    
                         
                            


                 




                                                  










                                                                 
                            















                                                           
                                                          

                                                           

                                                                        
         


                                                                                            






















                                                                      
     
                                                                                            
                                                             
                                    










                                                                                                         
                                      

                                                                                            
             






                                                      







                                            


                                                                                
                                                 




                                                                                     
                             
                                                        
                                                                                    
                                                          
                                                                                        
                                                                                                  


                                                                        
                                                                                          




                                                                                 

                                                      
                               
                         



                                                            

                                                             

                    
         

                                                                 

                                                                

                  

                                                                                        
                                                           













                                                                                      
                         
                                                                                    

                                                                                                    















                                                                              
                        
                                                     
                                                              
                                      
                                        









                                                                           

                                                                    
              
     
use crate::database::{filter_post, ErrorKind, Result, Storage, StorageError};
use async_std::fs::{File, OpenOptions};
use async_std::io::prelude::*;
use async_std::io::ErrorKind as IOErrorKind;
use async_std::task::spawn_blocking;
use async_trait::async_trait;
use fd_lock::RwLock;
use futures::stream;
use futures_util::StreamExt;
use futures_util::TryStreamExt;
use log::debug;
use serde_json::json;
use std::collections::HashMap;
use std::path::{Path, PathBuf};

impl From<std::io::Error> for StorageError {
    /// Wraps an I/O error into a [`StorageError`], keeping the original error as the source.
    ///
    /// A `NotFound` I/O error maps to [`ErrorKind::NotFound`]; every other I/O
    /// error kind is reported as [`ErrorKind::Backend`].
    fn from(source: std::io::Error) -> Self {
        let kind = if source.kind() == IOErrorKind::NotFound {
            ErrorKind::NotFound
        } else {
            ErrorKind::Backend
        };
        Self::with_source(kind, "file I/O error", Box::new(source))
    }
}

/// Wraps an already opened file into an `fd_lock` [`RwLock`].
///
/// The wrapper is constructed through `spawn_blocking`, so the construction
/// itself never runs on the async executor thread.
async fn get_lockable_file(file: File) -> RwLock<File> {
    debug!("Trying to create a file lock");
    let make_lock = move || RwLock::new(file);
    spawn_blocking(make_lock).await
}

// Adapted from https://stackoverflow.com/questions/39340924, which in turn
// is based on the *old* `Path::path_relative_from` from the standard library
// (it behaves differently from the newer `relative_from`).
// Notably, it copes with two absolute unix paths whose only common
// ancestor is the root directory.
/// Computes a path that leads from `base` to `path`.
///
/// Returns `None` when no such path can be expressed: either `path` is
/// relative while `base` is absolute, or `base` contains a `..` component at
/// or after the point where the two paths diverge. When `path` is absolute
/// and `base` is relative, `path` is returned unchanged.
fn path_relative_from(path: &Path, base: &Path) -> Option<PathBuf> {
    use std::path::Component;

    match (path.is_absolute(), base.is_absolute()) {
        (true, false) => return Some(path.to_path_buf()),
        (false, true) => return None,
        _ => {}
    }

    let mut target = path.components();
    let mut origin = base.components();
    let mut parts: Vec<Component> = Vec::new();
    loop {
        match (target.next(), origin.next()) {
            (None, None) => break,
            // `base` is deeper than `path`: climb one level per leftover component.
            (None, Some(_)) => parts.push(Component::ParentDir),
            // `base` is exhausted: the rest of `path` is appended verbatim.
            (Some(seg), None) => {
                parts.push(seg);
                parts.extend(target.by_ref());
                break;
            }
            (Some(seg), Some(base_seg)) => {
                // Still inside the common prefix, nothing to emit yet.
                if parts.is_empty() && seg == base_seg {
                    continue;
                }
                match base_seg {
                    Component::CurDir => parts.push(seg),
                    Component::ParentDir => return None,
                    _ => {
                        // Climb out of the current `base` component and of
                        // every component that is still left in `base`...
                        let levels = 1 + origin.by_ref().count();
                        parts.extend(std::iter::repeat(Component::ParentDir).take(levels));
                        // ...then descend into what remains of `path`.
                        parts.push(seg);
                        parts.extend(target.by_ref());
                        break;
                    }
                }
            }
        }
    }
    Some(parts.into_iter().collect())
}

// Unit tests for the path helpers; only compiled for test builds.
#[cfg(test)]
mod tests {
    use super::path_relative_from;
    use std::path::Path;

    /// Two sibling directories resolve to a single `..` hop followed by the
    /// target's name, and a direct descendant resolves to its own name.
    #[test]
    fn test_relative_path_resolving() {
        let path1 = Path::new("/home/vika/Projects/kittybox");
        let path2 = Path::new("/home/vika/Projects/nixpkgs");
        let relative_path = path_relative_from(path2, path1).unwrap();

        assert_eq!(relative_path, Path::new("../nixpkgs"));

        let parent = Path::new("/home/vika/Projects");
        let relative_path = path_relative_from(path2, parent).unwrap();

        assert_eq!(relative_path, Path::new("nixpkgs"));
    }
}

/// Maps a post URL to the location of its JSON file below `root`.
///
/// # Panics
///
/// Panics if `url` cannot be parsed as a URL (see `url_to_relative_path`).
fn url_to_path(root: &Path, url: &str) -> PathBuf {
    let relative = url_to_relative_path(url);
    relative.to_path(root)
}

/// Builds the storage-relative path of a post: the ASCII serialization of the
/// URL's origin, followed by the URL's path and a `.json` suffix.
///
/// # Panics
///
/// Panics if `url` cannot be parsed as a URL.
fn url_to_relative_path(url: &str) -> relative_path::RelativePathBuf {
    let parsed = http_types::Url::parse(url).expect("Couldn't parse a URL");
    let file_name = format!(
        "{}{}.json",
        parsed.origin().ascii_serialization(),
        parsed.path()
    );

    let mut relative = relative_path::RelativePathBuf::new();
    relative.push(file_name);
    relative
}

/// Applies a Micropub-style update object to a copy of `post` and returns the result.
///
/// `update` may carry any of these keys:
/// * `delete` – either an array of property names to drop entirely, or an
///   object mapping property names to arrays of values to remove;
/// * `add` – an object mapping property names to arrays of values to append;
/// * `replace` – an object mapping property names to arrays of values that
///   take the place of whatever the property held before.
///
/// The key `children` is special-cased for value removal and for addition: it
/// refers to the top-level `children` array of the post rather than to an
/// entry in `properties`, and added children are put in front of the existing
/// ones. Whole-key removal (and therefore the removal half of `replace`) only
/// ever touches `properties`.
///
/// # Errors
///
/// Returns an [`ErrorKind::BadRequest`] error when a value inside `add`,
/// `replace` or the object form of `delete` is not an array.
///
/// # Panics
///
/// Adding goes through `serde_json`'s mutable indexing and therefore panics
/// if `post` (or its `properties` member) is neither a JSON object nor null.
fn modify_post(post: &serde_json::Value, update: &serde_json::Value) -> Result<serde_json::Value> {
    let mut add_keys: HashMap<String, serde_json::Value> = HashMap::new();
    let mut remove_keys: Vec<String> = vec![];
    let mut remove_values: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
    let mut post = post.clone();

    if let Some(delete) = update["delete"].as_array() {
        remove_keys.extend(
            delete
                .iter()
                .filter_map(|v| v.as_str())
                .map(|v| v.to_string()),
        );
    } else if let Some(delete) = update["delete"].as_object() {
        for (k, v) in delete {
            match v.as_array() {
                Some(values) => remove_values
                    .entry(k.to_string())
                    .or_default()
                    .extend(values.iter().cloned()),
                None => {
                    return Err(StorageError::new(
                        ErrorKind::BadRequest,
                        "Malformed update object",
                    ));
                }
            }
        }
    }
    if let Some(add) = update["add"].as_object() {
        for (k, v) in add {
            if !v.is_array() {
                return Err(StorageError::new(
                    ErrorKind::BadRequest,
                    "Malformed update object",
                ));
            }
            add_keys.insert(k.to_string(), v.clone());
        }
    }
    if let Some(replace) = update["replace"].as_object() {
        for (k, v) in replace {
            // Same validation as for `add`: the merge step below relies on
            // every queued value being an array.
            if !v.is_array() {
                return Err(StorageError::new(
                    ErrorKind::BadRequest,
                    "Malformed update object",
                ));
            }
            remove_keys.push(k.to_string());
            add_keys.insert(k.to_string(), v.clone());
        }
    }

    // A post without a `properties` object simply has nothing to remove.
    if let Some(props) = post
        .get_mut("properties")
        .and_then(serde_json::Value::as_object_mut)
    {
        for k in &remove_keys {
            props.remove(k.as_str());
        }
    }
    for (k, values) in &remove_values {
        let container = if k == "children" {
            Some(&mut post)
        } else {
            post.get_mut("properties")
        };
        // `get_mut` is used on purpose: mutable indexing would insert a
        // `null` entry for a property the post never had.
        let existing = container
            .and_then(|c| c.get_mut(k.as_str()))
            .and_then(serde_json::Value::as_array_mut);
        if let Some(existing) = existing {
            for needle in values {
                if let Some(index) = existing.iter().position(|w| w == needle) {
                    existing.remove(index);
                }
            }
        }
    }
    for (k, v) in add_keys {
        let container = if k == "children" {
            &mut post
        } else {
            &mut post["properties"]
        };
        if let Some(existing) = container[k.as_str()].as_array_mut() {
            if let serde_json::Value::Array(mut incoming) = v {
                if k == "children" {
                    // New children go in front of the ones already present.
                    incoming.append(existing);
                    *existing = incoming;
                } else {
                    existing.append(&mut incoming);
                }
            }
        } else {
            // No array at the target location yet: store the new values in
            // the same container that was probed above, so that `children`
            // ends up at the top level rather than inside `properties`.
            container[k.as_str()] = v;
        }
    }
    Ok(post)
}

/// A backend using a folder with JSON files as a backing store.
/// Uses symbolic links to represent a many-to-one mapping of URLs to a post.
#[derive(Clone, Debug)]
pub struct FileStorage {
    /// Directory below which post files, channel lists and settings files are resolved.
    root_dir: PathBuf,
}

impl FileStorage {
    /// Creates a storage backed by the folder `root_dir`.
    ///
    /// The folder is not inspected in any way at this point, so this
    /// constructor currently always succeeds.
    pub async fn new(root_dir: PathBuf) -> Result<Self> {
        // TODO check if the dir is writable
        let storage = Self { root_dir };
        Ok(storage)
    }
}

async fn hydrate_author<S: Storage>(
    feed: &mut serde_json::Value,
    user: &'_ Option<String>,
    storage: &S,
) {
    let url = feed["properties"]["uid"][0].as_str().unwrap();
    if let Some(author) = feed["properties"]["author"].clone().as_array() {
        if !feed["type"]
            .as_array()
            .unwrap()
            .iter()
            .any(|i| i == "h-card")
        {
            let author_list: Vec<serde_json::Value> = stream::iter(author.iter())
                .then(|i| async move {
                    if let Some(i) = i.as_str() {
                        match storage.get_post(i).await {
                            Ok(post) => match post {
                                Some(post) => match filter_post(post, user) {
                                    Some(author) => author,
                                    None => json!(i),
                                },
                                None => json!(i),
                            },
                            Err(e) => {
                                log::error!("Error while hydrating post {}: {}", url, e);
                                json!(i)
                            }
                        }
                    } else {
                        i.clone()
                    }
                })
                .collect::<Vec<_>>()
                .await;
            feed["properties"].as_object_mut().unwrap()["author"] = json!(author_list);
        }
    }
}

#[async_trait]
impl Storage for FileStorage {
    /// Reports whether a regular file exists at the location `url` maps to.
    async fn post_exists(&self, url: &str) -> Result<bool> {
        let path = url_to_path(&self.root_dir, url);
        debug!("Checking if {:?} exists...", path);
        // The filesystem probe is blocking, so it is dispatched via `spawn_blocking`.
        let exists = spawn_blocking(move || path.is_file()).await;
        Ok(exists)
    }

    async fn get_post(&self, url: &str) -> Result<Option<serde_json::Value>> {
        let path = url_to_path(&self.root_dir, url);
        debug!("Opening {:?}", path);
        // We have to special-case in here because the function should return Ok(None) on 404
        match File::open(path).await {
            Ok(f) => {
                let lock = get_lockable_file(f).await;
                let guard = lock.read()?;

                let mut content = String::new();
                // Apparently this typechecks. Somehow.
                // I can take &mut for a &File because File is not a real type
                // The operating system guards it using something
                // that looks like a mutex to Rust's runtime, allowing me
                // to grab a mutable reference from an immutable reference
                (&mut &*guard).read_to_string(&mut content).await?;
                Ok(Some(serde_json::from_str(&content)?))
            }
            Err(err) => {
                if err.kind() == IOErrorKind::NotFound {
                    Ok(None)
                } else {
                    Err(err.into())
                }
            }
        }
    }

    /// Stores a new post and registers its alternative URLs.
    ///
    /// The post is written to the file derived from its first `uid`. Every
    /// other entry of `properties.url` that starts with `user` gets a
    /// relative symbolic link pointing at that file. If the post's `type`
    /// contains `h-feed`, it is also appended to the `channels` file of `user`.
    ///
    /// # Errors
    ///
    /// Fails if the post file already exists (it is opened with
    /// `create_new`), or on any I/O, locking or (de)serialization error.
    ///
    /// # Panics
    ///
    /// Panics if the post has no string at `properties.uid[0]`.
    async fn put_post<'a>(&self, post: &'a serde_json::Value, user: &'a str) -> Result<()> {
        let key = post["properties"]["uid"][0]
            .as_str()
            .expect("Tried to save a post without UID");
        let path = url_to_path(&self.root_dir, key);

        debug!("Creating {:?}", path);

        // `create_dir_all` succeeds on an already existing directory, so no
        // separate existence check is needed.
        if let Some(parent) = path.parent() {
            async_std::fs::create_dir_all(parent).await?;
        }

        let f = OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(&path)
            .await?;

        let mut lock = get_lockable_file(f).await;
        let mut guard = lock.write()?;

        guard.write_all(post.to_string().as_bytes()).await?;
        guard.flush().await?;
        drop(guard);

        if let Some(urls) = post["properties"]["url"].as_array() {
            // Entries that are not strings cannot name a link and are skipped.
            for url in urls.iter().filter_map(|i| i.as_str()) {
                if url != key && url.starts_with(user) {
                    let link = url_to_path(&self.root_dir, url);
                    debug!("Creating a symlink at {:?}", link);
                    let orig = path.clone();
                    spawn_blocking::<_, Result<()>>(move || {
                        // We're supposed to have a parent here.
                        let basedir = link.parent().ok_or_else(|| {
                            StorageError::new(
                                ErrorKind::Backend,
                                "Failed to calculate parent directory when creating a symlink",
                            )
                        })?;
                        let relative = path_relative_from(&orig, basedir).ok_or_else(|| {
                            StorageError::new(
                                ErrorKind::Backend,
                                "Failed to calculate a relative path when creating a symlink",
                            )
                        })?;
                        debug!("{:?} - {:?} = {:?}", &orig, &basedir, &relative);
                        // The link may live in a directory no post was stored in yet.
                        std::fs::create_dir_all(basedir)?;
                        let symlink_result;
                        #[cfg(unix)]
                        {
                            symlink_result = std::os::unix::fs::symlink(relative, link);
                        }
                        // Windows needs a different call to create a file symlink.
                        #[cfg(windows)]
                        {
                            symlink_result = std::os::windows::fs::symlink_file(relative, link);
                        }
                        symlink_result.map_err(StorageError::from)
                    })
                    .await?;
                }
            }
        }

        // A missing or malformed `type` is treated as "not a feed".
        let is_feed = post["type"]
            .as_array()
            .map_or(false, |types| {
                types.iter().any(|s| s.as_str() == Some("h-feed"))
            });
        if is_feed {
            debug!("Adding to channel list...");
            // Add the h-feed to the channel list
            let mut path = relative_path::RelativePathBuf::new();
            path.push(user);
            path.push("channels");

            let path = path.to_path(&self.root_dir);
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .truncate(false)
                .create(true)
                .open(&path)
                .await?;
            let mut lock = get_lockable_file(file).await;
            let mut guard = lock.write()?;

            let mut content = String::new();
            guard.read_to_string(&mut content).await?;
            // A freshly created channel file is empty and holds no channels.
            let mut channels: Vec<super::MicropubChannel> = if content.is_empty() {
                Vec::new()
            } else {
                serde_json::from_str(&content)?
            };

            channels.push(super::MicropubChannel {
                uid: key.to_string(),
                name: post["properties"]["name"][0]
                    .as_str()
                    .map(|s| s.to_string())
                    .unwrap_or_default(),
            });
            guard.seek(std::io::SeekFrom::Start(0)).await?;
            guard.set_len(0).await?;
            guard
                .write_all(serde_json::to_string(&channels)?.as_bytes())
                .await?;
            // Flush explicitly so that a failed write is reported instead of
            // being lost when the file is dropped.
            guard.flush().await?;
        }

        Ok(())
    }

    /// Applies `update` to the post stored for `url` and rewrites its file in place.
    ///
    /// The file is opened without `create`, so a missing post is reported as
    /// an error. The file stays write-locked from before the old content is
    /// read until the new content has been flushed.
    async fn update_post<'a>(&self, url: &'a str, update: serde_json::Value) -> Result<()> {
        let path = url_to_path(&self.root_dir, url);
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .truncate(false)
            .open(&path)
            .await?;

        let mut lock = get_lockable_file(file).await;
        let mut guard = lock.write()?;

        let mut raw = String::new();
        guard.read_to_string(&mut raw).await?;
        let current: serde_json::Value = serde_json::from_str(&raw)?;
        // Apply the editing algorithms
        let updated = modify_post(&current, &update)?;
        let serialized = updated.to_string();

        // Empty the file and rewind before writing the new content.
        guard.set_len(0).await?;
        guard.seek(std::io::SeekFrom::Start(0)).await?;
        guard.write_all(serialized.as_bytes()).await?;
        guard.flush().await?;
        drop(guard);
        // TODO check if URLs changed between old and new JSON
        Ok(())
    }

    async fn get_channels<'a>(&self, user: &'a str) -> Result<Vec<super::MicropubChannel>> {
        let mut path = relative_path::RelativePathBuf::new();
        path.push(user.to_string());
        path.push("channels");

        let path = path.to_path(&self.root_dir);
        match File::open(&path).await {
            Ok(f) => {
                let lock = get_lockable_file(f).await;
                let guard = lock.read()?;

                let mut content = String::new();
                (&mut &*guard).read_to_string(&mut content).await?;
                // This should not happen, but if it does, let's handle it gracefully instead of failing.
                if content.len() == 0 {
                    return Ok(vec![]);
                }
                let channels: Vec<super::MicropubChannel> = serde_json::from_str(&content)?;
                Ok(channels)
            }
            Err(e) => {
                if e.kind() == IOErrorKind::NotFound {
                    Ok(vec![])
                } else {
                    Err(e.into())
                }
            }
        }
    }

    /// Reads the feed at `url` and expands up to `limit` of its children.
    ///
    /// The feed's `children` are expected to be post URLs. Starting right
    /// after the entry equal to `after` (or at the beginning when `after` is
    /// `None`), each URL is loaded, its authors are hydrated and the post is
    /// run through `filter_post` for `user`. Children that cannot be found or
    /// are filtered out are skipped, as are entries that are not strings.
    /// If `after` is not among the children, the resulting list is empty.
    ///
    /// Returns `Ok(None)` if there is no post at `url`.
    ///
    /// # Errors
    ///
    /// Returns [`ErrorKind::PermissionDenied`] when `filter_post` rejects the
    /// feed itself for `user`, and an [`ErrorKind::Other`] error wrapping the
    /// cause when loading a child fails.
    async fn read_feed_with_limit<'a>(
        &self,
        url: &'a str,
        after: &'a Option<String>,
        limit: usize,
        user: &'a Option<String>,
    ) -> Result<Option<serde_json::Value>> {
        let feed = match self.get_post(url).await? {
            Some(feed) => feed,
            None => return Ok(None),
        };
        let mut feed = match filter_post(feed, user) {
            Some(feed) => feed,
            None => {
                return Err(StorageError::new(
                    ErrorKind::PermissionDenied,
                    "specified user cannot access this post",
                ));
            }
        };

        // Only the URLs are needed, so they are copied out as strings
        // instead of cloning the whole array of JSON values.
        let child_urls: Option<Vec<String>> = feed["children"].as_array().map(|children| {
            children
                .iter()
                .filter_map(|child| child.as_str())
                .map(|child| child.to_string())
                .collect()
        });

        if let Some(child_urls) = child_urls {
            let mut posts_iter = child_urls.into_iter();
            if let Some(after_url) = after {
                // Skip everything up to and including the cursor. The loop
                // ends on its own once the iterator is exhausted, so an
                // unknown cursor produces an empty page instead of spinning.
                for candidate in posts_iter.by_ref() {
                    if &candidate == after_url {
                        break;
                    }
                }
            }
            let posts = stream::iter(posts_iter)
                .map(|url: String| async move { self.get_post(&url).await })
                .buffered(std::cmp::min(3, limit))
                // Hack to unwrap the Option and sieve out broken links
                // Broken links return None, and Stream::filter_map skips Nones.
                .try_filter_map(|post: Option<serde_json::Value>| async move { Ok(post) })
                .and_then(|mut post| async move {
                    hydrate_author(&mut post, user, self).await;
                    Ok(post)
                })
                .try_filter_map(|post| async move { Ok(filter_post(post, user)) })
                .take(limit);

            match posts.try_collect::<Vec<serde_json::Value>>().await {
                Ok(posts) => feed["children"] = serde_json::json!(posts),
                Err(err) => {
                    return Err(StorageError::with_source(
                        ErrorKind::Other,
                        "Feed assembly error",
                        Box::new(err),
                    ));
                }
            }
        }
        hydrate_author(&mut feed, user, self).await;
        Ok(Some(feed))
    }

    async fn delete_post<'a>(&self, url: &'a str) -> Result<()> {
        let path = url_to_path(&self.root_dir, url);
        if let Err(e) = async_std::fs::remove_file(path).await {
            Err(e.into())
        } else {
            Ok(())
        }
    }

    /// Looks up `setting` in the `settings` file stored under the origin of `user`.
    ///
    /// # Errors
    ///
    /// Returns an error if the settings file cannot be opened, locked, read
    /// or parsed, and an [`ErrorKind::Backend`] error if the setting is absent.
    ///
    /// # Panics
    ///
    /// Panics if `user` cannot be parsed as a URL.
    async fn get_setting<'a>(&self, setting: &'a str, user: &'a str) -> Result<String> {
        log::debug!("User for getting settings: {}", user);
        let url = http_types::Url::parse(user).expect("Couldn't parse a URL");
        let mut relative = relative_path::RelativePathBuf::new();
        relative.push(url.origin().ascii_serialization());
        relative.push("settings");
        let path = relative.to_path(&self.root_dir);

        let file = File::open(path).await?;
        let lock = get_lockable_file(file).await;
        let guard = lock.read()?;

        let mut content = String::new();
        (&mut &*guard).read_to_string(&mut content).await?;
        // The lock is only needed while reading; parsing happens without it.
        drop(guard);
        let mut settings: HashMap<String, String> = serde_json::from_str(&content)?;
        // The map is local and discarded afterwards, so the value can be
        // moved out of it.
        match settings.remove(setting) {
            Some(value) => Ok(value),
            None => Err(StorageError::new(ErrorKind::Backend, "Setting not set")),
        }
    }

    /// Stores `value` under `setting` in the `settings` file kept under the
    /// origin of `user`, creating the file and its directory when needed.
    ///
    /// The file is write-locked for the whole read-modify-write cycle.
    ///
    /// # Errors
    ///
    /// Returns an error on any I/O or locking failure, or if the existing
    /// file content is not a JSON object of strings.
    ///
    /// # Panics
    ///
    /// Panics if `user` cannot be parsed as a URL.
    async fn set_setting<'a>(&self, setting: &'a str, user: &'a str, value: &'a str) -> Result<()> {
        let url = http_types::Url::parse(user).expect("Couldn't parse a URL");
        let mut path = relative_path::RelativePathBuf::new();
        path.push(url.origin().ascii_serialization());
        path.push("settings");

        let path = path.to_path(&self.root_dir);

        // `create_dir_all` succeeds on an already existing directory, so no
        // separate existence check is needed.
        if let Some(parent) = path.parent() {
            async_std::fs::create_dir_all(parent).await?;
        }

        let file = OpenOptions::new()
            .write(true)
            .read(true)
            .truncate(false)
            .create(true)
            .open(&path)
            .await?;
        let mut lock = get_lockable_file(file).await;
        log::debug!("Created a lock. Locking for writing...");
        let mut guard = lock.write()?;

        log::debug!("Locked. Writing.");
        let mut content = String::new();
        guard.read_to_string(&mut content).await?;
        // A freshly created settings file is empty and holds no settings.
        let mut settings: HashMap<String, String> = if content.is_empty() {
            HashMap::default()
        } else {
            serde_json::from_str(&content)?
        };

        settings.insert(setting.to_string(), value.to_string());
        guard.seek(std::io::SeekFrom::Start(0)).await?;
        guard.set_len(0).await?;
        guard
            .write_all(serde_json::to_string(&settings)?.as_bytes())
            .await?;
        // Flush explicitly so that a failed write is reported instead of
        // being lost when the file is dropped.
        guard.flush().await?;
        Ok(())
    }
}