Rewrite to use S3 instead of local files and RocksDB.
This commit is contained in:
343
src/library.rs
343
src/library.rs
@@ -1,178 +1,102 @@
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
|
||||
use cacher::Cacher;
|
||||
use cacher::s3::S3CacherError;
|
||||
use cacher::S3Cacher;
|
||||
use google_photoslibrary1 as photos;
|
||||
use image::imageops;
|
||||
use imageutils::{load_image, resize, resize_to_fill, save_to_jpeg_bytes, FilterType};
|
||||
use log::error;
|
||||
use log::info;
|
||||
use log::warn;
|
||||
use photos::schemas::Album;
|
||||
use photos::schemas::MediaItem;
|
||||
use rocksdb::Direction;
|
||||
use rocksdb::IteratorMode;
|
||||
use rocksdb::DB;
|
||||
use imageutils::{load_image_buffer, resize, resize_to_fill, save_to_jpeg_bytes, FilterType};
|
||||
use log::{error, info};
|
||||
use photos::schemas::{Album, MediaItem};
|
||||
use rusoto_core::RusotoError;
|
||||
use rusoto_s3::GetObjectError;
|
||||
use thiserror::Error;
|
||||
|
||||
// Used to ensure DB is invalidated after schema changes.
|
||||
const LIBRARY_GENERATION: &'static str = "14";
|
||||
const LIBRARY_GENERATION: &'static str = "16";
|
||||
|
||||
/// Error type returned by `Library` methods such as `create_album_index`,
/// `create_album` and `original_buffer`.
///
/// Each variant wraps an underlying error; the `#[from]` attribute lets `?`
/// convert the source error into the matching variant.
#[derive(Error, Debug)]
|
||||
pub enum LibraryError {
|
||||
/// An underlying `std::io::Error`.
#[error("IO error: {0}")]
|
||||
IoError(#[from] std::io::Error),
|
||||
/// An error reported by the `S3Cacher` backing store.
#[error("s3 error: {0}")]
|
||||
S3CacherError(#[from] S3CacherError),
|
||||
/// A `serde_json` serialization or deserialization failure.
#[error("json error: {0}")]
|
||||
JsonError(#[from] serde_json::Error),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Library {
|
||||
root: PathBuf,
|
||||
originals_dir: PathBuf,
|
||||
cache_db: Arc<DB>,
|
||||
image_cache: Arc<Mutex<Box<dyn Cacher>>>,
|
||||
s3: S3Cacher,
|
||||
}
|
||||
|
||||
impl Library {
|
||||
pub fn new(
|
||||
root: PathBuf,
|
||||
image_cache: Arc<Mutex<Box<dyn Cacher>>>,
|
||||
) -> Result<Library, Box<dyn std::error::Error>> {
|
||||
let db = DB::open_default(root.join("cache"))?;
|
||||
let cache_db = Arc::new(db);
|
||||
let lib = Library {
|
||||
originals_dir: root.join("images").join("originals"),
|
||||
cache_db,
|
||||
root,
|
||||
image_cache,
|
||||
};
|
||||
let cnt = lib.clean_db()?;
|
||||
if cnt != 0 {
|
||||
info!("Deleted {} entries", cnt);
|
||||
}
|
||||
if !lib.originals_dir.exists() {
|
||||
info!(
|
||||
"create originals dir {}",
|
||||
&lib.originals_dir.to_string_lossy()
|
||||
);
|
||||
fs::create_dir_all(&lib.originals_dir)?;
|
||||
}
|
||||
pub fn new(s3: S3Cacher) -> Result<Library, Box<dyn std::error::Error>> {
|
||||
let lib = Library { s3 };
|
||||
Ok(lib)
|
||||
}
|
||||
// Removes all data in the database from older schema.
|
||||
pub fn clean_db(&self) -> Result<usize, rocksdb::Error> {
|
||||
Library::gc(LIBRARY_GENERATION, &self.cache_db)
|
||||
}
|
||||
fn gc(generation: &str, db: &DB) -> Result<usize, rocksdb::Error> {
|
||||
let gen = format!("{}/", generation);
|
||||
// '0' is the next character after '/', so an iterator starting there would be after the
|
||||
// last `gen` entry.
|
||||
let next_gen = format!("{}0", generation);
|
||||
let mut del_cnt = 0;
|
||||
for (k, _v) in db.iterator(IteratorMode::From(gen.as_bytes(), Direction::Reverse)) {
|
||||
if !k.starts_with(gen.as_bytes()) {
|
||||
info!("deleting stale key: {}", String::from_utf8_lossy(&k));
|
||||
db.delete(k)?;
|
||||
del_cnt += 1;
|
||||
}
|
||||
}
|
||||
for (k, _v) in db.iterator(IteratorMode::From(next_gen.as_bytes(), Direction::Forward)) {
|
||||
if !k.starts_with(gen.as_bytes()) {
|
||||
info!("deleting stale key: {}", String::from_utf8_lossy(&k));
|
||||
db.delete(k)?;
|
||||
del_cnt += 1;
|
||||
}
|
||||
}
|
||||
Ok(del_cnt)
|
||||
}
|
||||
pub fn create_album_index(&self, albums: &Vec<Album>) -> io::Result<()> {
|
||||
pub fn create_album_index(&self, albums: &Vec<Album>) -> Result<(), LibraryError> {
|
||||
// Serialize it to a JSON string.
|
||||
let j = serde_json::to_string(albums)?;
|
||||
|
||||
let path = self.root.join("albums.json");
|
||||
info!("saving {}", path.to_string_lossy());
|
||||
fs::write(path, j)
|
||||
let filename = "albums.json";
|
||||
|
||||
self.s3
|
||||
.set(&Library::generational_key(filename), j.as_ref())?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn create_album<P: AsRef<Path>>(
|
||||
pub fn create_album(
|
||||
&self,
|
||||
album_id: P,
|
||||
album_id: &str,
|
||||
media_items: &Vec<MediaItem>,
|
||||
) -> io::Result<()> {
|
||||
let album_dir = self.root.join(album_id);
|
||||
if !album_dir.exists() {
|
||||
info!("making album directory {}", album_dir.to_string_lossy());
|
||||
fs::create_dir_all(&album_dir)?;
|
||||
}
|
||||
) -> Result<(), LibraryError> {
|
||||
let relpath = format!("{}.json", &album_id);
|
||||
let j = serde_json::to_string(&media_items)?;
|
||||
let path = album_dir.join("album.json");
|
||||
info!("saving {}", path.to_string_lossy());
|
||||
fs::write(path, j)
|
||||
|
||||
self.s3
|
||||
.set(&Library::generational_key(&relpath), j.as_ref())?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn albums(&self) -> Result<Vec<Album>, Box<dyn std::error::Error>> {
|
||||
let albums_path = self.root.join("albums.json");
|
||||
info!("loading {}", albums_path.to_string_lossy());
|
||||
let bytes = fs::read(albums_path)?;
|
||||
Ok(serde_json::from_slice(&bytes)?)
|
||||
let filename = "albums.json";
|
||||
|
||||
let bytes = self.s3.get(&Library::generational_key(filename))?;
|
||||
let album: Vec<Album> = serde_json::from_slice(&bytes)?;
|
||||
Ok(album)
|
||||
}
|
||||
pub fn album(&self, album_id: &str) -> Result<Vec<MediaItem>, Box<dyn std::error::Error>> {
|
||||
let album_path = self.root.join(album_id).join("album.json");
|
||||
let bytes = fs::read(album_path)?;
|
||||
Ok(serde_json::from_slice(&bytes)?)
|
||||
let relpath = format!("{}.json", &album_id);
|
||||
let bytes = self.s3.get(&Library::generational_key(&relpath))?;
|
||||
let mis: Vec<MediaItem> = serde_json::from_slice(&bytes)?;
|
||||
Ok(mis)
|
||||
}
|
||||
pub fn download_image(
|
||||
&self,
|
||||
filename: &str,
|
||||
_filename: &str,
|
||||
media_items_id: &str,
|
||||
base_url: &str,
|
||||
) -> Result<PathBuf, Box<dyn std::error::Error>> {
|
||||
// Put images from all albums in common directory.
|
||||
let image_path = self.originals_dir.join(media_items_id);
|
||||
if image_path.exists() {
|
||||
info!(
|
||||
"Skipping already downloaded {} @ {}",
|
||||
&filename,
|
||||
image_path.to_string_lossy()
|
||||
);
|
||||
} else {
|
||||
let download_path = image_path.with_extension("download");
|
||||
let c = Arc::clone(&self.image_cache);
|
||||
let mut c = c.lock().unwrap();
|
||||
match c.get(media_items_id) {
|
||||
Some(bytes) => {
|
||||
info!(
|
||||
"saving local copy of original from cache {}",
|
||||
media_items_id
|
||||
);
|
||||
fs::write(&download_path, bytes)?;
|
||||
}
|
||||
None => {
|
||||
let url = format!("{}=d", base_url);
|
||||
let mut r = reqwest::blocking::get(&url)?;
|
||||
let mut buf = Vec::new();
|
||||
info!("Downloading {}", &url);
|
||||
r.read_to_end(&mut buf)?;
|
||||
fs::write(&download_path, &buf);
|
||||
c.set(media_items_id, &buf);
|
||||
}
|
||||
};
|
||||
info!(
|
||||
"Rename {} -> {}",
|
||||
download_path.to_string_lossy(),
|
||||
image_path.to_string_lossy()
|
||||
);
|
||||
fs::rename(download_path, &image_path)?;
|
||||
let filename = Library::generational_key(&format!("images/originals/{}", media_items_id));
|
||||
if !self.s3.contains_key(&filename) {
|
||||
let url = format!("{}=d", base_url);
|
||||
let mut r = reqwest::blocking::get(&url)?;
|
||||
let mut buf = Vec::new();
|
||||
info!("Downloading {}", &url);
|
||||
r.read_to_end(&mut buf)?;
|
||||
self.s3.set(&filename, &buf)?;
|
||||
}
|
||||
Ok(image_path)
|
||||
Ok(filename.into())
|
||||
}
|
||||
pub fn original(&self, media_items_id: &str) -> Option<PathBuf> {
|
||||
let path = self.originals_dir.join(media_items_id);
|
||||
if path.exists() {
|
||||
Some(path)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
|
||||
/// Returns the bytes of the original image for `media_items_id`.
///
/// The object is read from `self.s3` under the generation-prefixed key
/// `images/originals/<media_items_id>`.
///
/// # Errors
///
/// Propagates any error from `self.s3.get` as a `LibraryError`.
pub fn original_buffer(&self, media_items_id: &str) -> Result<Vec<u8>, LibraryError> {
|
||||
// Key matches the one `download_image` writes originals under.
let filename = Library::generational_key(&format!("images/originals/{}", media_items_id));
|
||||
let bytes = self.s3.get(&filename)?;
|
||||
Ok(bytes)
|
||||
}
|
||||
// TODO(wathiede): make this a macro like format! to skip the second string creation and copy.
|
||||
fn generational_key(generation: &str, key: &str) -> String {
|
||||
format!("{}/{}", generation, key)
|
||||
fn generational_key(key: &str) -> String {
|
||||
format!("{}/{}", LIBRARY_GENERATION, key)
|
||||
}
|
||||
|
||||
pub fn generate_thumbnail(
|
||||
@@ -182,24 +106,22 @@ impl Library {
|
||||
filter: FilterType,
|
||||
fill: bool,
|
||||
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
|
||||
match self.original(&media_items_id) {
|
||||
None => {
|
||||
warn!("Couldn't find original {}", &media_items_id);
|
||||
Err(io::Error::new(io::ErrorKind::NotFound, format!("{}", media_items_id)).into())
|
||||
}
|
||||
Some(path) => {
|
||||
let orig_img = load_image(&path, dimensions.0, dimensions.1)?;
|
||||
//.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
let img = if fill {
|
||||
resize_to_fill(&orig_img, dimensions, filter)
|
||||
} else {
|
||||
resize(&orig_img, dimensions, filter)
|
||||
};
|
||||
let buf = save_to_jpeg_bytes(&img)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
Ok(buf)
|
||||
}
|
||||
}
|
||||
let buf = self.original_buffer(&media_items_id)?;
|
||||
let dimension_hint = match dimensions {
|
||||
(Some(w), Some(h)) => Some((w, h)),
|
||||
// Partial dimensions should be handled by the caller of this function. So all
|
||||
// other options are None.
|
||||
_ => None,
|
||||
};
|
||||
let orig_img = load_image_buffer(buf, dimension_hint)?;
|
||||
//.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
let img = if fill {
|
||||
resize_to_fill(&orig_img, dimensions, filter)
|
||||
} else {
|
||||
resize(&orig_img, dimensions, filter)
|
||||
};
|
||||
let buf = save_to_jpeg_bytes(&img).map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
|
||||
Ok(buf)
|
||||
}
|
||||
pub fn thumbnail(
|
||||
&self,
|
||||
@@ -214,96 +136,33 @@ impl Library {
|
||||
(None, Some(h)) => format!("-h={}", h),
|
||||
(None, None) => "".to_string(),
|
||||
};
|
||||
Library::generational_key(LIBRARY_GENERATION, &format!("{}{}", media_items_id, dim))
|
||||
Library::generational_key(&format!("images/thumbnails/{}-{}", media_items_id, dim))
|
||||
}
|
||||
let key = cache_key(media_items_id, dimensions);
|
||||
let db = self.cache_db.clone();
|
||||
match db.get(key.as_bytes()) {
|
||||
// Cache hit, return bytes as-is.
|
||||
Ok(Some(bytes)) => Some(bytes),
|
||||
// Cache miss, fill cache and return.
|
||||
Ok(None) => {
|
||||
// TODO(wathiede): use cache for thumbnail like download_image does.
|
||||
let c = Arc::clone(&self.image_cache);
|
||||
let mut c = c.lock().unwrap();
|
||||
let bytes = match c.get(&key) {
|
||||
Some(bytes) => {
|
||||
info!(
|
||||
"saving local copy of thumbnail from cache {}",
|
||||
media_items_id
|
||||
);
|
||||
bytes
|
||||
}
|
||||
None => {
|
||||
info!("cache MISS {}", key);
|
||||
let bytes = match self.generate_thumbnail(
|
||||
media_items_id,
|
||||
dimensions,
|
||||
FilterType::Builtin(imageops::FilterType::Lanczos3),
|
||||
fill,
|
||||
) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to generate thumbnail for {}: {}",
|
||||
media_items_id, e
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
c.set(&key, &bytes);
|
||||
bytes
|
||||
}
|
||||
};
|
||||
match db.put(key.as_bytes(), &bytes) {
|
||||
Ok(_) => Some(bytes),
|
||||
Err(e) => {
|
||||
error!("Failed to put bytes to {}: {}", key, e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
// RocksDB error.
|
||||
match self.s3.get(&key) {
|
||||
Ok(bytes) => return Some(bytes),
|
||||
Err(S3CacherError::GetObjectError(RusotoError::Service(
|
||||
GetObjectError::NoSuchKey(msg),
|
||||
))) => info!("Missing thumbnail {} in s3: {}", key, msg),
|
||||
Err(e) => error!("Error fetching thumbnail {} from s3: {}", key, e),
|
||||
};
|
||||
|
||||
info!("cache MISS {}", key);
|
||||
let bytes = match self.generate_thumbnail(
|
||||
media_items_id,
|
||||
dimensions,
|
||||
FilterType::Builtin(imageops::FilterType::Lanczos3),
|
||||
fill,
|
||||
) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
error!("Failed to search DB for {}: {}", key, e);
|
||||
None
|
||||
error!("Failed to generate thumbnail for {}: {}", media_items_id, e);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
if let Err(e) = self.s3.set(&key, &bytes) {
|
||||
error!("Failed to put thumbnail {}: {}", &key, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use tempdir::TempDir;
|
||||
|
||||
#[test]
|
||||
fn clean_db() {
|
||||
let td = TempDir::new("photosync_test").expect("failed to create temporary directory");
|
||||
eprintln!("creating database in {}", td.path().to_string_lossy());
|
||||
let db = DB::open_default(td.path()).expect("failed to open DB");
|
||||
let keys = vec!["one", "two", "three"];
|
||||
|
||||
fn get_keys(db: &DB) -> Vec<String> {
|
||||
db.iterator(rocksdb::IteratorMode::Start)
|
||||
.map(|(k, _v)| String::from_utf8(k.to_vec()).expect("key not utf-8"))
|
||||
.collect()
|
||||
}
|
||||
for k in &keys {
|
||||
for g in vec!["1", "2", "3"] {
|
||||
db.put(Library::generational_key(g, k), k)
|
||||
.expect("failed to put");
|
||||
}
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
get_keys(&db),
|
||||
vec![
|
||||
"1/one", "1/three", "1/two", "2/one", "2/three", "2/two", "3/one", "3/three",
|
||||
"3/two"
|
||||
]
|
||||
);
|
||||
Library::gc("2", &db).expect("failed to GC DB");
|
||||
assert_eq!(get_keys(&db), vec!["2/one", "2/three", "2/two",]);
|
||||
Some(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
33
src/main.rs
33
src/main.rs
@@ -2,11 +2,10 @@ use std::collections::HashMap;
|
||||
use std::error::Error;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time;
|
||||
|
||||
use cacher::{Cacher, S3Cacher};
|
||||
use cacher::S3Cacher;
|
||||
use google_api_auth;
|
||||
use google_photoslibrary1 as photos;
|
||||
use hexihasher;
|
||||
@@ -32,14 +31,13 @@ struct Sync {
|
||||
/// Optional album title to filter. Default will mirror all albums.
|
||||
#[structopt(short, long)]
|
||||
title_filter: Option<Regex>,
|
||||
/// Directory to store sync.
|
||||
root: PathBuf,
|
||||
/// S3 bucket holding metadata and images.
|
||||
#[structopt(long, default_value = "photosync-dev")]
|
||||
s3_bucket: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
struct Serve {
|
||||
/// Directory of data fetched by `sync`.
|
||||
root: PathBuf,
|
||||
/// HTTP address to listen for web requests.
|
||||
#[structopt(long = "addr", default_value = "0.0.0.0:0")]
|
||||
addr: SocketAddr,
|
||||
@@ -65,6 +63,9 @@ enum Command {
|
||||
Serve {
|
||||
#[structopt(flatten)]
|
||||
serve: Serve,
|
||||
/// S3 bucket holding metadata and images.
|
||||
#[structopt(default_value = "photosync-dev")]
|
||||
s3_bucket: String,
|
||||
},
|
||||
ServeAndSync {
|
||||
/// Sync albums at given interval.
|
||||
@@ -322,9 +323,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.init()
|
||||
.unwrap();
|
||||
debug!("opt: {:?}", opt);
|
||||
let image_cache: Mutex<Box<dyn Cacher>> =
|
||||
Mutex::new(Box::new(S3Cacher::new("photosync".to_string())?));
|
||||
let image_cache = Arc::new(image_cache);
|
||||
match opt.cmd {
|
||||
Command::ListAlbums { auth, title_filter } => {
|
||||
let client = new_client(&auth.credentials, &auth.token_cache)?;
|
||||
@@ -341,18 +339,21 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Sync {
|
||||
auth,
|
||||
title_filter,
|
||||
root,
|
||||
s3_bucket,
|
||||
},
|
||||
} => {
|
||||
let s3 = S3Cacher::new(s3_bucket.clone())?;
|
||||
let client = new_client(&auth.credentials, &auth.token_cache)?;
|
||||
let lib = Library::new(root, image_cache)?;
|
||||
let lib = Library::new(s3)?;
|
||||
sync_albums(&client, &title_filter, &lib)?;
|
||||
Ok(())
|
||||
}
|
||||
Command::Serve {
|
||||
serve: Serve { addr, root },
|
||||
serve: Serve { addr },
|
||||
s3_bucket,
|
||||
} => {
|
||||
let lib = Library::new(root, image_cache)?;
|
||||
let s3 = S3Cacher::new(s3_bucket.clone())?;
|
||||
let lib = Library::new(s3)?;
|
||||
serve(addr, lib)
|
||||
}
|
||||
Command::ServeAndSync {
|
||||
@@ -361,12 +362,14 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
Sync {
|
||||
auth,
|
||||
title_filter,
|
||||
root,
|
||||
s3_bucket,
|
||||
},
|
||||
|
||||
addr,
|
||||
} => {
|
||||
let s3 = S3Cacher::new(s3_bucket.clone())?;
|
||||
let client = new_client(&auth.credentials, &auth.token_cache)?;
|
||||
let lib = Library::new(root, image_cache)?;
|
||||
let lib = Library::new(s3)?;
|
||||
background_sync(client, interval, title_filter, lib.clone())?;
|
||||
serve(addr, lib)?;
|
||||
Ok(())
|
||||
|
||||
10
src/rweb.rs
10
src/rweb.rs
@@ -7,6 +7,7 @@ use google_photoslibrary1 as photos;
|
||||
use log::error;
|
||||
use photos::schemas::{Album, MediaItem};
|
||||
use prometheus::Encoder;
|
||||
use rocket::config::{Config, Environment};
|
||||
use rocket::http::ContentType;
|
||||
use rocket::response::status::NotFound;
|
||||
use rocket::response::Content;
|
||||
@@ -116,8 +117,13 @@ fn embedz() -> Content<Vec<u8>> {
|
||||
Content(ContentType::HTML, w)
|
||||
}
|
||||
|
||||
pub fn run(_addr: SocketAddr, lib: Library) -> Result<(), Box<dyn Error>> {
|
||||
let e = rocket::ignite()
|
||||
pub fn run(addr: SocketAddr, lib: Library) -> Result<(), Box<dyn Error>> {
|
||||
let config = Config::build(Environment::Development)
|
||||
.address(addr.ip().to_string())
|
||||
.port(addr.port())
|
||||
.finalize()?;
|
||||
|
||||
let e = rocket::custom(config)
|
||||
.manage(lib)
|
||||
.mount(
|
||||
"/",
|
||||
|
||||
Reference in New Issue
Block a user