From 5fd3188a2cf08b45f883cffaf352eb9d53a0d45d Mon Sep 17 00:00:00 2001 From: snow flurry Date: Wed, 24 Apr 2024 01:21:08 -0700 Subject: [PATCH] unnecessary but acceptable changes the bug was in doppler. oops. --- Cargo.lock | 1 + Cargo.toml | 3 ++- src/local.rs | 68 +++++++++++++++++++++++++++++++++++++++++----------- src/main.rs | 2 +- src/web.rs | 2 +- 5 files changed, 59 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b47fd1..108b37b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1824,6 +1824,7 @@ dependencies = [ "futures-util", "http", "hyper-rustls 0.27.1", + "mime_guess", "qrencode", "reqwest", "serde", diff --git a/Cargo.toml b/Cargo.toml index a285b04..7d6ebe9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,4 +22,5 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "mul futures = "0.3" sled = "0.34.7" tracing = "0.1.40" -tracing-subscriber = "0.3.18" \ No newline at end of file +tracing-subscriber = "0.3.18" +mime_guess = "2" \ No newline at end of file diff --git a/src/local.rs b/src/local.rs index d31e746..94aad1b 100644 --- a/src/local.rs +++ b/src/local.rs @@ -1,5 +1,6 @@ use std::{os::unix::ffi::OsStrExt, path::Path, sync::Arc}; +use mime_guess::Mime; use reqwest::{multipart, Url}; use serde::{Deserialize, Serialize}; use tokio::{sync::Semaphore, task::JoinSet}; @@ -77,27 +78,60 @@ impl LocalServer { base_uri, info, cache, - client: reqwest::Client::new(), + client: reqwest::Client::builder() + .http1_title_case_headers() + .build() + .unwrap(), tasks: JoinSet::new(), - semaphore: Arc::new(Semaphore::new(10)), + semaphore: Arc::new(Semaphore::new(1)), }) } - pub fn should_upload(&self, path: impl AsRef) -> bool { + fn fuzzy_mime(&self, mime: Mime) -> Option { + let mime_str = mime.essence_str(); + + if self + .info + .supported_mimetypes + .iter() + .any(|mt| mt == mime_str) + { + Some(mime_str.to_owned()) + } else { + let x_mime = format!("{}/x-{}", mime.type_(), mime.subtype()); + self.info + .supported_mimetypes + .iter() + .find(|mt| *mt == &x_mime) + .map(|_| x_mime) + } + } + + pub fn should_upload(&self, path: impl AsRef) -> Option { + // We need to confirm a few things: + // First, do we have a file extension? if let Some(extension) = path.as_ref().extension() { let extension = extension.as_bytes(); + // Is that file extension in our list of "known" extensions? if self .info .known_file_extensions .iter() .any(|ex| ex.as_bytes() == extension) { - if let Ok(false) = self.cache.has_path(path.as_ref()) { - return true; + // We also have to check the mime type... + let mime_type = mime_guess::from_path(&path); + if let Some(mime) = mime_type.iter().find(|m| m.type_() == "audio") { + // ... is known to the device + if let Some(mime) = self.fuzzy_mime(mime) { + if let Ok(false) = self.cache.has_path(path.as_ref()) { + return Some(mime); + } + } } } } - false + None } /// Adds an upload task to the queue. The task will be spawned immediately, @@ -109,12 +143,13 @@ impl LocalServer { path: impl AsRef, ) -> Result<(), Box> { let path = path.as_ref(); - if !self.should_upload(path) { + let Some(mime) = self.should_upload(path) else { return Err(Box::new(std::io::Error::new( std::io::ErrorKind::InvalidInput, "Invalid file type", ))); - } + }; + let filename = path.file_name().unwrap().to_string_lossy().to_string(); let client = self.client.clone(); let base_uri = self.base_uri.clone(); @@ -125,21 +160,26 @@ impl LocalServer { // The actual upload task self.tasks.spawn(async move { let _permit = semaphore.acquire_owned().await.unwrap(); - let fd = tokio::fs::OpenOptions::new() - .read(true) - .open(path.to_owned()) - .await?; + let fd = tokio::fs::OpenOptions::new().read(true).open(&path).await?; + + let flen = fd.metadata().await.unwrap().len(); let form = multipart::Form::new() .part("filename", multipart::Part::text(filename.to_string())) - .part("file", multipart::Part::stream(fd).file_name(filename)); + .part( + "file", + multipart::Part::stream_with_length(fd, flen) + .file_name(filename) + .mime_str(mime.as_str()) + .unwrap(), + ); let response = client .post(base_uri.join("upload").unwrap()) .multipart(form) .send() .await?; - response.error_for_status()?; + let _bytes = response.bytes().await?; if let Err(err) = cache.store(&path).await { error!("unable to cache {}: {}", path.display(), err); diff --git a/src/main.rs b/src/main.rs index 40f6904..17ea781 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,7 +109,7 @@ async fn sync_dir(dir: impl AsRef, server: &mut LocalServer) -> bool { continue; } }; - } else if path.exists() && server.should_upload(&path) { + } else if path.exists() && server.should_upload(&path).is_some() { debug!("Adding {} to queue", path.display()); // Add the download to the queue if let Err(err) = server.queue_upload(&path).await { diff --git a/src/web.rs b/src/web.rs index fde5ef6..e7c1dd5 100644 --- a/src/web.rs +++ b/src/web.rs @@ -102,7 +102,7 @@ impl DopplerWebClient { &self.code } - pub async fn wait_for_device(&mut self, list: &mut DeviceList) -> Result<(String, Uri), Error> { + pub async fn wait_for_device(mut self, list: &mut DeviceList) -> Result<(String, Uri), Error> { let mut device: Option = None; while let Some(msg) = self.ws.next().await { if let Some(msg) = msg?.as_text() {