From e205dd4f6e781d9232404ebbfd3a624b260a4289 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Mon, 29 Dec 2025 18:24:18 -0800 Subject: [PATCH 01/12] Iffy try at using new API --- src/api/user_upload.rs | 98 ++++++++++++++-- src/app_state.rs | 18 ++- src/tokio_thread.rs | 48 ++++++-- src/ui/views/main/upload_manager.rs | 172 +++++++++++++++++----------- src/ui/views/mod.rs | 13 ++- src/upload/mod.rs | 6 +- 6 files changed, 267 insertions(+), 88 deletions(-) diff --git a/src/api/user_upload.rs b/src/api/user_upload.rs index ac21db69..977c4866 100644 --- a/src/api/user_upload.rs +++ b/src/api/user_upload.rs @@ -7,6 +7,8 @@ use crate::api::{API_BASE_URL, ApiClient, ApiError, check_for_response_success}; pub struct UserUploads { pub statistics: UserUploadStatistics, pub uploads: Vec, + pub limit: u32, + pub offset: u32, } #[derive(Deserialize, Debug, Clone)] @@ -51,37 +53,115 @@ pub struct UserUpload { } impl ApiClient { - pub async fn get_user_upload_stats( + pub async fn get_user_upload_statistics( &self, api_key: &str, user_id: &str, - ) -> Result { - // Response structs for the user info endpoint + start_date: Option, + end_date: Option, + ) -> Result { #[derive(Deserialize, Debug)] #[allow(unused)] - struct UserStatsResponse { + struct UserStatisticsResponse { success: bool, user_id: String, statistics: UserUploadStatistics, + } + + let mut url = format!("{API_BASE_URL}/tracker/v2/uploads/user/{user_id}/stats"); + let mut query_params = Vec::new(); + if let Some(start) = start_date { + query_params.push(format!("start_date={}", start.format("%Y-%m-%d"))); + } + if let Some(end) = end_date { + query_params.push(format!("end_date={}", end.format("%Y-%m-%d"))); + } + if !query_params.is_empty() { + url.push('?'); + url.push_str(&query_params.join("&")); + } + + let response = self + .client + .get(url) + .header("Content-Type", "application/json") + .header("X-API-Key", api_key) + .send() + .await?; + + let response = + check_for_response_success(response, "User 
upload statistics unavailable").await?; + + let server_stats = response.json::().await?; + + Ok(server_stats.statistics) + } + + pub async fn get_user_upload_list( + &self, + api_key: &str, + user_id: &str, + limit: u32, + offset: u32, + start_date: Option, + end_date: Option, + ) -> Result<(Vec, u32, u32), ApiError> { + #[derive(Deserialize, Debug)] + #[allow(unused)] + struct UserUploadListResponse { + success: bool, + user_id: String, uploads: Vec, + limit: u32, + offset: u32, + } + + let mut url = format!( + "{API_BASE_URL}/tracker/v2/uploads/user/{user_id}/list?limit={limit}&offset={offset}" + ); + if let Some(start) = start_date { + url.push_str(&format!("&start_date={}", start.format("%Y-%m-%d"))); + } + if let Some(end) = end_date { + url.push_str(&format!("&end_date={}", end.format("%Y-%m-%d"))); } let response = self .client - .get(format!("{API_BASE_URL}/tracker/uploads/user/{user_id}")) + .get(url) .header("Content-Type", "application/json") .header("X-API-Key", api_key) .send() .await?; let response = - check_for_response_success(response, "User upload stats unavailable").await?; + check_for_response_success(response, "User upload list unavailable").await?; - let server_stats = response.json::().await?; + let server_list = response.json::().await?; + + Ok((server_list.uploads, server_list.limit, server_list.offset)) + } + + /// Legacy method for backward compatibility if needed, though it's better to use the split methods. 
+ pub async fn get_user_upload_stats( + &self, + api_key: &str, + user_id: &str, + limit: u32, + offset: u32, + ) -> Result { + let statistics = self + .get_user_upload_statistics(api_key, user_id, None, None) + .await?; + let (uploads, limit, offset) = self + .get_user_upload_list(api_key, user_id, limit, offset, None, None) + .await?; Ok(UserUploads { - statistics: server_stats.statistics, - uploads: server_stats.uploads, + statistics, + uploads, + limit, + offset, }) } } diff --git a/src/app_state.rs b/src/app_state.rs index 9a5ed746..160cf544 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -36,6 +36,7 @@ pub struct AppState { pub supported_games: RwLock, /// Flag for offline mode - skips API server calls when enabled pub offline_mode: AtomicBool, + pub upload_filters: RwLock, } impl AppState { pub fn new( @@ -62,12 +63,19 @@ impl AppState { last_foregrounded_game: RwLock::new(None), supported_games: RwLock::new(SupportedGames::load_from_embedded()), offline_mode: AtomicBool::new(false), + upload_filters: RwLock::new(UploadFilters::default()), }; tracing::debug!("AppState::new() complete"); state } } +#[derive(Default, Clone, Copy, Debug)] +pub struct UploadFilters { + pub start_date: Option, + pub end_date: Option, +} + #[derive(Clone, PartialEq)] pub struct ForegroundedGame { pub exe_name: Option, @@ -144,7 +152,8 @@ pub enum AsyncRequest { OpenDataDump, OpenLog, UpdateSupportedGames(SupportedGames), - LoadUploadStats, + LoadUploadStatistics, + LoadUploadList { limit: u32, offset: u32 }, LoadLocalRecordings, DeleteAllInvalidRecordings, DeleteAllUploadedLocalRecordings, @@ -182,7 +191,12 @@ pub enum UiUpdate { UploadFailed(String), UpdateRecordingState(bool), UpdateNewerReleaseAvailable(GitHubRelease), - UpdateUserUploads(UserUploads), + UpdateUserUploadStatistics(crate::api::user_upload::UserUploadStatistics), + UpdateUserUploadList { + uploads: Vec, + limit: u32, + offset: u32, + }, UpdateLocalRecordings(Vec), FolderPickerResult { old_path: PathBuf, 
diff --git a/src/tokio_thread.rs b/src/tokio_thread.rs index f93dcdb5..44ea3e02 100644 --- a/src/tokio_thread.rs +++ b/src/tokio_thread.rs @@ -213,7 +213,8 @@ async fn main( .send(UiUpdate::UpdateUserId(Ok(user_id))) .ok(); - app_state.async_request_tx.send(AsyncRequest::LoadUploadStats).await.ok(); + app_state.async_request_tx.send(AsyncRequest::LoadUploadStatistics).await.ok(); + app_state.async_request_tx.send(AsyncRequest::LoadUploadList { limit: 100, offset: 0 }).await.ok(); } } // no matter if offline or online, local recordings should be loaded @@ -266,31 +267,60 @@ async fn main( supported_games.installed().count() ); } - AsyncRequest::LoadUploadStats => { + AsyncRequest::LoadUploadStatistics => { if app_state.offline_mode.load(std::sync::atomic::Ordering::SeqCst) { - tracing::info!("Offline mode enabled, skipping upload stats load"); - // Don't send any update - UI will show no upload stats + tracing::info!("Offline mode enabled, skipping upload statistics load"); } else { match valid_api_key_and_user_id.clone() { Some((api_key, user_id)) => { + let filters = *app_state.upload_filters.read().unwrap(); tokio::spawn({ let app_state = app_state.clone(); let api_client = api_client.clone(); async move { - let stats = match api_client.get_user_upload_stats(&api_key, &user_id).await { + let stats = match api_client.get_user_upload_statistics(&api_key, &user_id, filters.start_date, filters.end_date).await { Ok(stats) => stats, Err(e) => { - tracing::error!(e=?e, "Failed to get user upload stats"); + tracing::error!(e=?e, "Failed to get user upload statistics"); return; } }; - tracing::info!(stats=?stats.statistics, "Loaded upload stats"); - app_state.ui_update_tx.send(UiUpdate::UpdateUserUploads(stats)).ok(); + tracing::info!(stats=?stats, "Loaded upload statistics"); + app_state.ui_update_tx.send(UiUpdate::UpdateUserUploadStatistics(stats)).ok(); } }); } None => { - tracing::error!("API key and user ID not found, skipping upload stats load"); + 
tracing::error!("API key and user ID not found, skipping upload statistics load"); + } + } + } + } + AsyncRequest::LoadUploadList { limit, offset } => { + if app_state.offline_mode.load(std::sync::atomic::Ordering::SeqCst) { + tracing::info!("Offline mode enabled, skipping upload list load"); + } else { + match valid_api_key_and_user_id.clone() { + Some((api_key, user_id)) => { + let filters = *app_state.upload_filters.read().unwrap(); + tokio::spawn({ + let app_state = app_state.clone(); + let api_client = api_client.clone(); + async move { + let (uploads, limit, offset) = match api_client.get_user_upload_list(&api_key, &user_id, limit, offset, filters.start_date, filters.end_date).await { + Ok(res) => res, + Err(e) => { + tracing::error!(e=?e, "Failed to get user upload list"); + return; + } + }; + tracing::info!(count=uploads.len(), "Loaded upload list"); + app_state.ui_update_tx.send(UiUpdate::UpdateUserUploadList { uploads, limit, offset }).ok(); + } + }); + } + None => { + tracing::error!("API key and user ID not found, skipping upload list load"); } } } diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs index f26784b2..12faa5b3 100644 --- a/src/ui/views/main/upload_manager.rs +++ b/src/ui/views/main/upload_manager.rs @@ -5,7 +5,7 @@ use egui::{ }; use crate::{ - api::UserUpload, + api::{UserUpload, UserUploads, UserUploadStatistics}, app_state::{AppState, AsyncRequest}, config::Preferences, output_types::Metadata, @@ -26,8 +26,12 @@ pub struct UploadManager { prev_auto_upload_enabled: bool, } impl UploadManager { - pub fn update_user_uploads(&mut self, user_uploads: Vec) { - self.recordings.update_user_uploads(user_uploads); + pub fn update_user_upload_statistics(&mut self, statistics: UserUploadStatistics) { + self.recordings.update_user_upload_statistics(statistics); + } + + pub fn update_user_upload_list(&mut self, uploads: Vec, limit: u32, offset: u32) { + self.recordings.update_user_upload_list(uploads, limit, offset); 
self.virtual_list.reset(); } @@ -66,11 +70,22 @@ pub struct Recordings { filtered_excluding_local_uploaded: Vec, latest_upload_timestamp: Option>, invalid_count_filtered: usize, + + statistics: Option, + limit: u32, + offset: u32, } impl Recordings { - pub fn update_user_uploads(&mut self, user_uploads: Vec) { - self.storage.uploaded = user_uploads; + pub fn update_user_upload_statistics(&mut self, statistics: UserUploadStatistics) { + self.statistics = Some(statistics); + self.update_calculated_state(); + } + + pub fn update_user_upload_list(&mut self, uploads: Vec, limit: u32, offset: u32) { + self.storage.uploaded = uploads; self.storage.uploads_available = true; + self.limit = limit; + self.offset = offset; self.update_calculated_state(); } @@ -616,39 +631,20 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) { let available_width = ui.available_width() - (cell_count as f32 * 10.0); let cell_width = available_width / cell_count as f32; - // Calculate stats for each of our categories. The endpoint that we use to get - // the uploaded recordings tells us this, but over the entire range: we'd like to - // cover just the user-filtered range. - let mut total_duration: f64 = 0.0; - let mut total_count: usize = 0; - let mut total_size: u64 = 0; - let mut last_upload: Option> = None; - + // Calculate pending stats let mut unuploaded_duration: f64 = 0.0; let mut unuploaded_count: usize = 0; let mut unuploaded_size: u64 = 0; for recording in recordings.iter_filtered() { - match recording { - Recording::Uploaded(recording) => { - total_duration += recording.video_duration_seconds.unwrap_or(0.0); - total_count += 1; - total_size += recording.file_size_bytes; - if last_upload.is_none() || recording.created_at > last_upload.unwrap() { - last_upload = Some(recording.created_at); - } - } - Recording::Local( - LocalRecording::Unuploaded { info, metadata } - | LocalRecording::Paused(LocalRecordingPaused { metadata, info, .. 
}), - ) => { - unuploaded_duration += metadata.as_ref().map(|m| m.duration).unwrap_or(0.0); - unuploaded_count += 1; - unuploaded_size += info.folder_size; - } - Recording::Local(LocalRecording::Invalid { .. } | LocalRecording::Uploaded { .. }) => { - // We don't count these in our stats - } + if let Recording::Local( + LocalRecording::Unuploaded { info, metadata } + | LocalRecording::Paused(LocalRecordingPaused { metadata, info, .. }), + ) = recording + { + unuploaded_duration += metadata.as_ref().map(|m| m.duration).unwrap_or(0.0); + unuploaded_count += 1; + unuploaded_size += info.folder_size; } } @@ -673,16 +669,14 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) { vec2(cell_width, ui.available_height()), Layout::top_down(Align::Center), |ui| { - create_upload_cell( - ui, - "📊", // Icon - "Total Uploaded", - &if recordings.uploads_available() { - util::format_seconds(total_duration as u64) - } else { - "Loading...".to_string() - }, - ); + let val = if let Some(stats) = &recordings.statistics { + util::format_seconds(stats.total_video_time.seconds as u64) + } else if recordings.uploads_available() { + "0s".to_string() + } else { + "Loading...".to_string() + }; + create_upload_cell(ui, "📊", "Total Uploaded", &val); }, ); @@ -691,16 +685,14 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) { vec2(cell_width, ui.available_height()), Layout::top_down(Align::Center), |ui| { - create_upload_cell( - ui, - "📁", // Icon - "Files Uploaded", - &if recordings.uploads_available() { - total_count.to_string() - } else { - "Loading...".to_string() - }, - ); + let val = if let Some(stats) = &recordings.statistics { + stats.total_uploads.to_string() + } else if recordings.uploads_available() { + "0".to_string() + } else { + "Loading...".to_string() + }; + create_upload_cell(ui, "📁", "Files Uploaded", &val); }, ); @@ -709,16 +701,14 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) { vec2(cell_width, ui.available_height()), 
Layout::top_down(Align::Center), |ui| { - create_upload_cell( - ui, - "💾", // Icon - "Volume Uploaded", - &if recordings.uploads_available() { - util::format_bytes(total_size) - } else { - "Loading...".to_string() - }, - ); + let val = if let Some(stats) = &recordings.statistics { + util::format_bytes(stats.total_data.bytes) + } else if recordings.uploads_available() { + "0 B".to_string() + } else { + "Loading...".to_string() + }; + create_upload_cell(ui, "💾", "Volume Uploaded", &val); }, ); @@ -782,6 +772,7 @@ fn recordings_view( }) .show(ui, |ui| { let button_height = 28.0; + let pagination_height = 30.0; let height = (ui.available_height() - FOOTER_HEIGHT).max(button_height); // Show spinner if still loading @@ -814,8 +805,61 @@ fn recordings_view( .ok(); } + // Pagination Controls + if let Some(stats) = &recordings.statistics { + if stats.total_uploads > 0 { + ui.horizontal(|ui| { + let total_pages = + (stats.total_uploads as f32 / recordings.limit as f32).ceil() as u32; + let current_page = (recordings.offset / recordings.limit) + 1; + + ui.add_enabled_ui(recordings.offset > 0, |ui| { + if ui.button("Previous").clicked() { + let new_offset = + recordings.offset.saturating_sub(recordings.limit); + app_state + .async_request_tx + .blocking_send(AsyncRequest::LoadUploadList { + limit: recordings.limit, + offset: new_offset, + }) + .ok(); + } + }); + + ui.label(format!("Page {current_page} of {total_pages}")); + + ui.add_enabled_ui(current_page < total_pages, |ui| { + if ui.button("Next").clicked() { + let new_offset = recordings.offset + recordings.limit; + app_state + .async_request_tx + .blocking_send(AsyncRequest::LoadUploadList { + limit: recordings.limit, + offset: new_offset, + }) + .ok(); + } + }); + + ui.with_layout(Layout::right_to_left(Align::Center), |ui| { + ui.label(format!("Total records: {}", stats.total_uploads)); + }); + }); + ui.separator(); + } + } + + ScrollArea::vertical() - .max_height(height - if any_invalid { button_height } else { 0.0 }) 
+ .max_height( + height + - (if any_invalid { button_height } else { 0.0 }) + - (if recordings.statistics.is_some() { + pagination_height + } else { + 0.0 + }), + ) .auto_shrink([false, false]) .show(ui, |ui| { if recordings.is_empty_filtered() { diff --git a/src/ui/views/mod.rs b/src/ui/views/mod.rs index a0249783..8144f055 100644 --- a/src/ui/views/mod.rs +++ b/src/ui/views/mod.rs @@ -164,10 +164,19 @@ impl App { UiUpdate::UpdateNewerReleaseAvailable(release) => { self.newer_release_available = Some(release); } - UiUpdate::UpdateUserUploads(uploads) => { + UiUpdate::UpdateUserUploadStatistics(statistics) => { self.main_view_state .upload_manager - .update_user_uploads(uploads.uploads); + .update_user_upload_statistics(statistics); + } + UiUpdate::UpdateUserUploadList { + uploads, + limit, + offset, + } => { + self.main_view_state + .upload_manager + .update_user_upload_list(uploads, limit, offset); } UiUpdate::UpdateLocalRecordings(recordings) => { self.main_view_state diff --git a/src/upload/mod.rs b/src/upload/mod.rs index 6ede1f73..bafa4df9 100644 --- a/src/upload/mod.rs +++ b/src/upload/mod.rs @@ -80,7 +80,8 @@ pub async fn start( .store(false, std::sync::atomic::Ordering::SeqCst); for req in [ - AsyncRequest::LoadUploadStats, + AsyncRequest::LoadUploadStatistics, + AsyncRequest::LoadUploadList { limit: 100, offset: 0 }, AsyncRequest::LoadLocalRecordings, ] { app_state.async_request_tx.send(req).await.ok(); @@ -220,7 +221,8 @@ async fn run( if should_reload { for req in [ - AsyncRequest::LoadUploadStats, + AsyncRequest::LoadUploadStatistics, + AsyncRequest::LoadUploadList { limit: 100, offset: 0 }, AsyncRequest::LoadLocalRecordings, ] { async_req_tx.send(req).await.ok(); From df00566eb8eea555b13f2a21b9ee97b853206ce8 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Mon, 5 Jan 2026 11:28:27 -0800 Subject: [PATCH 02/12] Prototype cleaner upload flow --- src/record/local_recording.rs | 86 +++++++++- src/upload/upload_tar.rs | 303 
++++++++++++++++++++++++---------- 2 files changed, 298 insertions(+), 91 deletions(-) diff --git a/src/record/local_recording.rs b/src/record/local_recording.rs index 7bcdb704..ce25e046 100644 --- a/src/record/local_recording.rs +++ b/src/record/local_recording.rs @@ -67,15 +67,67 @@ impl UploadProgressState { /// Load progress state from a file pub fn load_from_file(path: &Path) -> eyre::Result { - let content = std::fs::read_to_string(path)?; - let state: Self = serde_json::from_str(&content)?; + let file = std::fs::File::open(path)?; + let reader = std::io::BufReader::new(file); + let mut stream = serde_json::Deserializer::from_reader(reader).into_iter::(); + + // Read the first object which should be the UploadProgressState + let first_value = stream.next().ok_or_else(|| eyre::eyre!("Empty progress file"))??; + let mut state: Self = serde_json::from_value(first_value)?; + + // If the state was saved in the old format (single JSON object with populated etags), + // we're done (the etags are already in state.chunk_etags). + // If it was saved in the new format (header + log lines), state.chunk_etags might be empty, + // and we need to read the rest of the file. + + // Read subsequent objects as CompleteMultipartUploadChunk + for value in stream { + let chunk: CompleteMultipartUploadChunk = serde_json::from_value(value?)?; + // Avoid duplicates if we're migrating or recovering from a weird state + if !state.chunk_etags.iter().any(|c| c.chunk_number == chunk.chunk_number) { + state.chunk_etags.push(chunk); + } + } + Ok(state) } - /// Save progress state to a file + /// Save progress state to a file (Snapshot + Log format) pub fn save_to_file(&self, path: &Path) -> eyre::Result<()> { - let content = serde_json::to_string_pretty(self)?; - std::fs::write(path, content)?; + let file = std::fs::File::create(path)?; + let mut writer = std::io::BufWriter::new(file); + + // 1. Write the base state with EMPTY chunk_etags to the first line. 
+ // We clone to clear the vector without modifying self. + let mut header_state = self.clone(); + header_state.chunk_etags.clear(); + serde_json::to_writer(&mut writer, &header_state)?; + use std::io::Write; + writeln!(&mut writer)?; + + // 2. Write all existing etags as subsequent lines + for chunk in &self.chunk_etags { + serde_json::to_writer(&mut writer, chunk)?; + writeln!(&mut writer)?; + } + + writer.flush()?; + Ok(()) + } + + /// Append a single chunk completion to the log file + pub fn append_chunk_to_file(&self, chunk: &CompleteMultipartUploadChunk) -> eyre::Result<()> { + let mut file = std::fs::OpenOptions::new() + .write(true) + .append(true) + .open(&self.tar_path.parent().unwrap().join(constants::filename::recording::UPLOAD_PROGRESS))?; + + // Ensure we start on a new line (though save_to_file guarantees ending with newline) + // serde_json::to_writer doesn't add a newline + serde_json::to_writer(&mut file, chunk)?; + use std::io::Write; + writeln!(&mut file)?; + Ok(()) } @@ -160,6 +212,30 @@ impl LocalRecordingPaused { self.save_upload_progress().ok(); r } + + /// Records a successful chunk upload: updates in-memory state and appends to the log file. + pub fn record_chunk_completion(&mut self, chunk: CompleteMultipartUploadChunk) -> eyre::Result<()> { + // Update in-memory + self.upload_progress.chunk_etags.push(chunk.clone()); + + // Append to disk (efficient log append) + // We construct the path manually here or use the one from info, + // but UploadProgressState doesn't store the full progress file path, only tar path. + // We can use the helper method on self. + + let path = self.upload_progress_path(); + let mut file = std::fs::OpenOptions::new() + .write(true) + .append(true) + .create(false) // Should already exist + .open(path)?; + + serde_json::to_writer(&mut file, &chunk)?; + use std::io::Write; + writeln!(&mut file)?; + + Ok(()) + } /// Save upload progress state to .upload-progress file. 
pub fn save_upload_progress(&self) -> eyre::Result<()> { diff --git a/src/upload/upload_tar.rs b/src/upload/upload_tar.rs index 5000f31a..f2bbf7b4 100644 --- a/src/upload/upload_tar.rs +++ b/src/upload/upload_tar.rs @@ -198,27 +198,138 @@ pub async fn run( let mut guard = AbortUploadOnDrop::new(api_client.clone(), api_token.to_string(), paused); { - let mut file = tokio::fs::File::open(tar_path).await?; - - // If resuming, seek to the correct position in the file - if start_chunk > 1 { - let bytes_to_skip = (start_chunk - 1) * chunk_size_bytes; - file.seek(std::io::SeekFrom::Start(bytes_to_skip)) - .await - .map_err(UploadTarError::Io)?; - tracing::info!( - "Seeking to byte {} to resume from chunk {}", - bytes_to_skip, - start_chunk - ); - } + let file = tokio::fs::File::open(tar_path.clone()).await?; + + // Pipeline Channels + // Channel 1: Producer -> Signer + // Payload: (Chunk Data, Chunk Hash, Chunk Number) + let (tx_hashed, mut rx_hashed) = tokio::sync::mpsc::channel(2); + + // Channel 2: Signer -> Uploader + // Payload: (Chunk Data, Upload URL, Chunk Number) + let (tx_signed, mut rx_signed) = tokio::sync::mpsc::channel(2); + + // --- STAGE 1: PRODUCER (Read & Hash) --- + let producer_handle = tokio::spawn({ + let mut file = file; + let pause_flag = pause_flag.clone(); + async move { + // If resuming, seek to the correct position in the file + if start_chunk > 1 { + let bytes_to_skip = (start_chunk - 1) * chunk_size_bytes; + if let Err(e) = file.seek(std::io::SeekFrom::Start(bytes_to_skip)).await { + return Err(UploadTarError::Io(e)); + } + tracing::info!( + "Seeking to byte {} to resume from chunk {}", + bytes_to_skip, + start_chunk + ); + } + + let mut buffer = vec![0u8; chunk_size_bytes as usize]; + + for chunk_number in start_chunk..=total_chunks { + // Check pause + if pause_flag.load(std::sync::atomic::Ordering::SeqCst) { + break; + } + + // Read chunk + let mut buffer_start = 0; + loop { + match file.read(&mut buffer[buffer_start..]).await { + Ok(0) => 
break, // EOF + Ok(n) => buffer_start += n, + Err(e) => return Err(UploadTarError::Io(e)), + } + } + + let chunk_size = buffer_start; + if chunk_size == 0 { + break; + } + + let chunk_data = buffer[..chunk_size].to_vec(); + + // Offload Hashing to blocking thread + let hash_result = tokio::task::spawn_blocking({ + let data = chunk_data.clone(); + move || sha256::digest(&data) + }).await; + + let chunk_hash = match hash_result { + Ok(hash) => hash, + Err(join_err) => return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, join_err))), + }; + + if tx_hashed.send(Ok((chunk_data, chunk_hash, chunk_number))).await.is_err() { + break; // Receiver dropped + } + } + Ok(()) + } + }); - // TODO: make this less sloppy. - // Instead of allocating a chunk-sized buffer, and then allocating that buffer - // again for each chunk's stream, figure out a way to stream each chunk from the file - // directly into the hasher, and then stream each chunk directly into the uploader - let mut buffer = vec![0u8; chunk_size_bytes as usize]; - let client = reqwest::Client::new(); + // --- STAGE 2: SIGNER (Get Upload URL) --- + let signer_handle = tokio::spawn({ + let api_client = api_client.clone(); + let api_token = api_token.to_string(); + let upload_id = upload_id.clone(); + let pause_flag = pause_flag.clone(); + async move { + while let Some(msg) = rx_hashed.recv().await { + if pause_flag.load(std::sync::atomic::Ordering::SeqCst) { + break; + } + + let (chunk_data, chunk_hash, chunk_number) = match msg { + Ok(val) => val, + Err(e) => { + let _ = tx_signed.send(Err(e)).await; + break; + } + }; + + // Retry loop for Getting Signed URL + const MAX_RETRIES: u32 = 5; + let mut upload_url_opt = None; + let mut last_error = None; + + for attempt in 1..=MAX_RETRIES { + match api_client.upload_multipart_chunk(&api_token, &upload_id, chunk_number, &chunk_hash).await { + Ok(resp) => { + upload_url_opt = Some(resp.upload_url); + break; + }, + Err(e) => { + tracing::warn!("Failed 
to get signed URL for chunk {} (attempt {}/{}): {:?}", chunk_number, attempt, MAX_RETRIES, e); + last_error = Some(e); + if attempt < MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)).await; + } + } + } + } + + match upload_url_opt { + Some(url) => { + if tx_signed.send(Ok((chunk_data, url, chunk_number))).await.is_err() { + break; + } + }, + None => { + let err = UploadTarError::Api { + api_request: "upload_multipart_chunk", + error: last_error.unwrap_or(ApiError::ServerInvalidation("Unknown error getting signed URL".into())) + }; + let _ = tx_signed.send(Err(err)).await; + break; + } + } + } + } + }); // Initialize progress sender with bytes already uploaded let bytes_already_uploaded = (start_chunk - 1) * chunk_size_bytes; @@ -228,7 +339,16 @@ pub async fn run( sender })); - for chunk_number in start_chunk..=total_chunks { + let client = reqwest::Client::new(); + + // --- STAGE 3: UPLOADER (PUT Data) --- + while let Some(msg) = rx_signed.recv().await { + // Check for error from previous stages + let (chunk_data, upload_url, chunk_number) = match msg { + Ok(val) => val, + Err(e) => return Err(e), + }; + // Check if upload has been paused (user-initiated pause) if pause_flag.load(std::sync::atomic::Ordering::SeqCst) { // Ensure the latest progress is saved for resume @@ -268,83 +388,94 @@ pub async fn run( "Uploading chunk {chunk_number}/{total_chunks} for upload_id {upload_id}" ); - // Read chunk data from file (only once per chunk, not per retry) - let mut buffer_start = 0; - loop { - let bytes_read = file.read(&mut buffer[buffer_start..]).await?; - if bytes_read == 0 { - break; - } - buffer_start += bytes_read; - } - let chunk_size = buffer_start; - let chunk_data = buffer[..chunk_size].to_vec(); - let chunk_hash = sha256::digest(&chunk_data); - const MAX_RETRIES: u32 = 5; + let mut etag_opt = None; + let mut last_error = None; for attempt in 1..=MAX_RETRIES { // Store bytes_uploaded before attempting the chunk let 
bytes_before_chunk = progress_sender.lock().unwrap().bytes_uploaded(); - let chunk = Chunk { - data: &chunk_data, - hash: &chunk_hash, - number: chunk_number, - }; - - match upload_single_chunk( - chunk, - &api_client, - api_token, - &upload_id, - progress_sender.clone(), - &client, - ) - .await - { - Ok(etag) => { - progress_sender.lock().unwrap().send(); - - // Update progress state with new chunk and save to file - guard.paused_mut().mutate_upload_progress(|progress| { - progress - .chunk_etags - .push(CompleteMultipartUploadChunk { chunk_number, etag }); - }); - - tracing::info!( - "Uploaded chunk {chunk_number}/{total_chunks} for upload_id {upload_id}" - ); - break; // Success, move to next chunk - } - Err(error) => { - // Reset bytes_uploaded to what it was before the chunk attempt - { - let mut progress_sender = progress_sender.lock().unwrap(); - progress_sender.set_bytes_uploaded(bytes_before_chunk); - } - - tracing::warn!( - "Failed to upload chunk {chunk_number}/{total_chunks} (attempt {attempt}/{MAX_RETRIES}): {error:?}" - ); - - if attempt == MAX_RETRIES { - return Err(UploadTarError::FailedToUploadChunk { - chunk_number, - total_chunks, - max_retries: MAX_RETRIES, - error, - }); + // Create a stream that wraps chunk_data and tracks upload progress + let progress_stream = tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk_data.clone())) + .inspect_ok({ + let progress_sender = progress_sender.clone(); + move |bytes| { + progress_sender + .lock() + .unwrap() + .increment_bytes_uploaded(bytes.len() as u64); } + }); + + let res = client + .put(&upload_url) + .header("Content-Type", "application/octet-stream") + .header("Content-Length", chunk_data.len()) + .body(reqwest::Body::wrap_stream(progress_stream)) + .send() + .await; + + match res { + Ok(response) => { + if response.status().is_success() { + if let Some(etag) = response.headers().get("etag").and_then(|h| h.to_str().ok()) { + etag_opt = Some(etag.trim_matches('"').to_owned()); + break; // 
Success + } else { + last_error = Some(UploadSingleChunkError::NoEtagHeaderFound); + } + } else { + last_error = Some(UploadSingleChunkError::ChunkUploadFailed(response.status())); + } + }, + Err(e) => { + last_error = Some(UploadSingleChunkError::Reqwest(e)); + } + } - // Optional: add a small delay before retrying - tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)) - .await; + // Reset bytes on failure + { + let mut progress_sender = progress_sender.lock().unwrap(); + progress_sender.set_bytes_uploaded(bytes_before_chunk); + } + + tracing::warn!("Failed to upload chunk data {} (attempt {}/{}): {:?}", chunk_number, attempt, MAX_RETRIES, last_error); + if attempt < MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)).await; + } + } + + match etag_opt { + Some(etag) => { + progress_sender.lock().unwrap().send(); + + // Update progress state with new chunk and save to file (APPEND ONLY) + if let Err(e) = guard.paused_mut().record_chunk_completion(CompleteMultipartUploadChunk { chunk_number, etag }) { + tracing::error!("Failed to append chunk completion to log: {:?}", e); } + tracing::info!("Uploaded chunk {chunk_number}/{total_chunks} for upload_id {upload_id}"); + }, + None => { + return Err(UploadTarError::FailedToUploadChunk { + chunk_number, + total_chunks, + max_retries: MAX_RETRIES, + error: last_error.unwrap_or(UploadSingleChunkError::NoEtagHeaderFound) + }); } } } + + // Ensure producer and signer tasks didn't crash + if let Err(e) = producer_handle.await { + tracing::error!("Producer task failed: {:?}", e); + return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, "Producer task failed"))); + } + if let Err(e) = signer_handle.await { + tracing::error!("Signer task failed: {:?}", e); + return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, "Signer task failed"))); + } } let completion_result = match api_client .complete_multipart_upload( From 
3fbcfb4e9247805defb12327474db0d6a9d6c24f Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Mon, 5 Jan 2026 16:20:45 -0800 Subject: [PATCH 03/12] cargo fmt --- src/api/user_upload.rs | 3 +-- src/app_state.rs | 5 ++++- src/ui/views/main/upload_manager.rs | 8 ++++---- src/upload/mod.rs | 10 ++++++++-- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/api/user_upload.rs b/src/api/user_upload.rs index 977c4866..0ee202f4 100644 --- a/src/api/user_upload.rs +++ b/src/api/user_upload.rs @@ -134,8 +134,7 @@ impl ApiClient { .send() .await?; - let response = - check_for_response_success(response, "User upload list unavailable").await?; + let response = check_for_response_success(response, "User upload list unavailable").await?; let server_list = response.json::().await?; diff --git a/src/app_state.rs b/src/app_state.rs index 160cf544..c19a7f1b 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -153,7 +153,10 @@ pub enum AsyncRequest { OpenLog, UpdateSupportedGames(SupportedGames), LoadUploadStatistics, - LoadUploadList { limit: u32, offset: u32 }, + LoadUploadList { + limit: u32, + offset: u32, + }, LoadLocalRecordings, DeleteAllInvalidRecordings, DeleteAllUploadedLocalRecordings, diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs index 12faa5b3..a5b3665f 100644 --- a/src/ui/views/main/upload_manager.rs +++ b/src/ui/views/main/upload_manager.rs @@ -5,7 +5,7 @@ use egui::{ }; use crate::{ - api::{UserUpload, UserUploads, UserUploadStatistics}, + api::{UserUpload, UserUploadStatistics, UserUploads}, app_state::{AppState, AsyncRequest}, config::Preferences, output_types::Metadata, @@ -31,7 +31,8 @@ impl UploadManager { } pub fn update_user_upload_list(&mut self, uploads: Vec, limit: u32, offset: u32) { - self.recordings.update_user_upload_list(uploads, limit, offset); + self.recordings + .update_user_upload_list(uploads, limit, offset); self.virtual_list.reset(); } @@ -815,8 +816,7 @@ fn recordings_view( 
ui.add_enabled_ui(recordings.offset > 0, |ui| { if ui.button("Previous").clicked() { - let new_offset = - recordings.offset.saturating_sub(recordings.limit); + let new_offset = recordings.offset.saturating_sub(recordings.limit); app_state .async_request_tx .blocking_send(AsyncRequest::LoadUploadStats { diff --git a/src/upload/mod.rs b/src/upload/mod.rs index bafa4df9..2aca081d 100644 --- a/src/upload/mod.rs +++ b/src/upload/mod.rs @@ -81,7 +81,10 @@ pub async fn start( for req in [ AsyncRequest::LoadUploadStatistics, - AsyncRequest::LoadUploadList { limit: 100, offset: 0 }, + AsyncRequest::LoadUploadList { + limit: 100, + offset: 0, + }, AsyncRequest::LoadLocalRecordings, ] { app_state.async_request_tx.send(req).await.ok(); @@ -222,7 +225,10 @@ async fn run( if should_reload { for req in [ AsyncRequest::LoadUploadStatistics, - AsyncRequest::LoadUploadList { limit: 100, offset: 0 }, + AsyncRequest::LoadUploadList { + limit: 100, + offset: 0, + }, AsyncRequest::LoadLocalRecordings, ] { async_req_tx.send(req).await.ok(); From ebddef56bfdb2e972f3cf1776ac7f396ba1df468 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Mon, 5 Jan 2026 16:21:10 -0800 Subject: [PATCH 04/12] cargo fmt --- src/record/local_recording.rs | 47 ++++--- src/upload/upload_tar.rs | 225 +++++++++++++++++++++------------- 2 files changed, 172 insertions(+), 100 deletions(-) diff --git a/src/record/local_recording.rs b/src/record/local_recording.rs index ce25e046..343bdd9f 100644 --- a/src/record/local_recording.rs +++ b/src/record/local_recording.rs @@ -69,10 +69,13 @@ impl UploadProgressState { pub fn load_from_file(path: &Path) -> eyre::Result { let file = std::fs::File::open(path)?; let reader = std::io::BufReader::new(file); - let mut stream = serde_json::Deserializer::from_reader(reader).into_iter::(); + let mut stream = + serde_json::Deserializer::from_reader(reader).into_iter::(); // Read the first object which should be the UploadProgressState - let first_value = stream.next().ok_or_else(|| 
eyre::eyre!("Empty progress file"))??; + let first_value = stream + .next() + .ok_or_else(|| eyre::eyre!("Empty progress file"))??; let mut state: Self = serde_json::from_value(first_value)?; // If the state was saved in the old format (single JSON object with populated etags), @@ -84,7 +87,11 @@ impl UploadProgressState { for value in stream { let chunk: CompleteMultipartUploadChunk = serde_json::from_value(value?)?; // Avoid duplicates if we're migrating or recovering from a weird state - if !state.chunk_etags.iter().any(|c| c.chunk_number == chunk.chunk_number) { + if !state + .chunk_etags + .iter() + .any(|c| c.chunk_number == chunk.chunk_number) + { state.chunk_etags.push(chunk); } } @@ -110,24 +117,27 @@ impl UploadProgressState { serde_json::to_writer(&mut writer, chunk)?; writeln!(&mut writer)?; } - + writer.flush()?; Ok(()) } /// Append a single chunk completion to the log file pub fn append_chunk_to_file(&self, chunk: &CompleteMultipartUploadChunk) -> eyre::Result<()> { - let mut file = std::fs::OpenOptions::new() - .write(true) - .append(true) - .open(&self.tar_path.parent().unwrap().join(constants::filename::recording::UPLOAD_PROGRESS))?; - + let mut file = std::fs::OpenOptions::new().write(true).append(true).open( + &self + .tar_path + .parent() + .unwrap() + .join(constants::filename::recording::UPLOAD_PROGRESS), + )?; + // Ensure we start on a new line (though save_to_file guarantees ending with newline) // serde_json::to_writer doesn't add a newline serde_json::to_writer(&mut file, chunk)?; use std::io::Write; writeln!(&mut file)?; - + Ok(()) } @@ -212,28 +222,31 @@ impl LocalRecordingPaused { self.save_upload_progress().ok(); r } - + /// Records a successful chunk upload: updates in-memory state and appends to the log file. 
- pub fn record_chunk_completion(&mut self, chunk: CompleteMultipartUploadChunk) -> eyre::Result<()> { + pub fn record_chunk_completion( + &mut self, + chunk: CompleteMultipartUploadChunk, + ) -> eyre::Result<()> { // Update in-memory self.upload_progress.chunk_etags.push(chunk.clone()); - + // Append to disk (efficient log append) - // We construct the path manually here or use the one from info, + // We construct the path manually here or use the one from info, // but UploadProgressState doesn't store the full progress file path, only tar path. // We can use the helper method on self. - + let path = self.upload_progress_path(); let mut file = std::fs::OpenOptions::new() .write(true) .append(true) .create(false) // Should already exist .open(path)?; - + serde_json::to_writer(&mut file, &chunk)?; use std::io::Write; writeln!(&mut file)?; - + Ok(()) } diff --git a/src/upload/upload_tar.rs b/src/upload/upload_tar.rs index f2bbf7b4..3c19ace1 100644 --- a/src/upload/upload_tar.rs +++ b/src/upload/upload_tar.rs @@ -204,11 +204,11 @@ pub async fn run( // Channel 1: Producer -> Signer // Payload: (Chunk Data, Chunk Hash, Chunk Number) let (tx_hashed, mut rx_hashed) = tokio::sync::mpsc::channel(2); - + // Channel 2: Signer -> Uploader // Payload: (Chunk Data, Upload URL, Chunk Number) let (tx_signed, mut rx_signed) = tokio::sync::mpsc::channel(2); - + // --- STAGE 1: PRODUCER (Read & Hash) --- let producer_handle = tokio::spawn({ let mut file = file; @@ -226,15 +226,15 @@ pub async fn run( start_chunk ); } - + let mut buffer = vec![0u8; chunk_size_bytes as usize]; - + for chunk_number in start_chunk..=total_chunks { // Check pause if pause_flag.load(std::sync::atomic::Ordering::SeqCst) { break; } - + // Read chunk let mut buffer_start = 0; loop { @@ -244,26 +244,36 @@ pub async fn run( Err(e) => return Err(UploadTarError::Io(e)), } } - + let chunk_size = buffer_start; if chunk_size == 0 { break; } - + let chunk_data = buffer[..chunk_size].to_vec(); - + // Offload Hashing 
to blocking thread let hash_result = tokio::task::spawn_blocking({ let data = chunk_data.clone(); move || sha256::digest(&data) - }).await; - + }) + .await; + let chunk_hash = match hash_result { Ok(hash) => hash, - Err(join_err) => return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, join_err))), + Err(join_err) => { + return Err(UploadTarError::from(std::io::Error::new( + std::io::ErrorKind::Other, + join_err, + ))); + } }; - - if tx_hashed.send(Ok((chunk_data, chunk_hash, chunk_number))).await.is_err() { + + if tx_hashed + .send(Ok((chunk_data, chunk_hash, chunk_number))) + .await + .is_err() + { break; // Receiver dropped } } @@ -297,31 +307,54 @@ pub async fn run( let mut last_error = None; for attempt in 1..=MAX_RETRIES { - match api_client.upload_multipart_chunk(&api_token, &upload_id, chunk_number, &chunk_hash).await { - Ok(resp) => { - upload_url_opt = Some(resp.upload_url); - break; - }, - Err(e) => { - tracing::warn!("Failed to get signed URL for chunk {} (attempt {}/{}): {:?}", chunk_number, attempt, MAX_RETRIES, e); - last_error = Some(e); - if attempt < MAX_RETRIES { - tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)).await; - } - } - } + match api_client + .upload_multipart_chunk( + &api_token, + &upload_id, + chunk_number, + &chunk_hash, + ) + .await + { + Ok(resp) => { + upload_url_opt = Some(resp.upload_url); + break; + } + Err(e) => { + tracing::warn!( + "Failed to get signed URL for chunk {} (attempt {}/{}): {:?}", + chunk_number, + attempt, + MAX_RETRIES, + e + ); + last_error = Some(e); + if attempt < MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_millis( + 500 * attempt as u64, + )) + .await; + } + } + } } match upload_url_opt { Some(url) => { - if tx_signed.send(Ok((chunk_data, url, chunk_number))).await.is_err() { - break; - } - }, + if tx_signed + .send(Ok((chunk_data, url, chunk_number))) + .await + .is_err() + { + break; + } + } None => { - let err = UploadTarError::Api { - 
api_request: "upload_multipart_chunk", - error: last_error.unwrap_or(ApiError::ServerInvalidation("Unknown error getting signed URL".into())) + let err = UploadTarError::Api { + api_request: "upload_multipart_chunk", + error: last_error.unwrap_or(ApiError::ServerInvalidation( + "Unknown error getting signed URL".into(), + )), }; let _ = tx_signed.send(Err(err)).await; break; @@ -340,14 +373,14 @@ pub async fn run( })); let client = reqwest::Client::new(); - + // --- STAGE 3: UPLOADER (PUT Data) --- while let Some(msg) = rx_signed.recv().await { - // Check for error from previous stages - let (chunk_data, upload_url, chunk_number) = match msg { - Ok(val) => val, - Err(e) => return Err(e), - }; + // Check for error from previous stages + let (chunk_data, upload_url, chunk_number) = match msg { + Ok(val) => val, + Err(e) => return Err(e), + }; // Check if upload has been paused (user-initiated pause) if pause_flag.load(std::sync::atomic::Ordering::SeqCst) { @@ -397,16 +430,17 @@ pub async fn run( let bytes_before_chunk = progress_sender.lock().unwrap().bytes_uploaded(); // Create a stream that wraps chunk_data and tracks upload progress - let progress_stream = tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk_data.clone())) - .inspect_ok({ - let progress_sender = progress_sender.clone(); - move |bytes| { - progress_sender - .lock() - .unwrap() - .increment_bytes_uploaded(bytes.len() as u64); - } - }); + let progress_stream = + tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk_data.clone())) + .inspect_ok({ + let progress_sender = progress_sender.clone(); + move |bytes| { + progress_sender + .lock() + .unwrap() + .increment_bytes_uploaded(bytes.len() as u64); + } + }); let res = client .put(&upload_url) @@ -415,20 +449,23 @@ pub async fn run( .body(reqwest::Body::wrap_stream(progress_stream)) .send() .await; - + match res { Ok(response) => { - if response.status().is_success() { - if let Some(etag) = response.headers().get("etag").and_then(|h| 
h.to_str().ok()) { - etag_opt = Some(etag.trim_matches('"').to_owned()); - break; // Success - } else { - last_error = Some(UploadSingleChunkError::NoEtagHeaderFound); - } - } else { - last_error = Some(UploadSingleChunkError::ChunkUploadFailed(response.status())); - } - }, + if response.status().is_success() { + if let Some(etag) = + response.headers().get("etag").and_then(|h| h.to_str().ok()) + { + etag_opt = Some(etag.trim_matches('"').to_owned()); + break; // Success + } else { + last_error = Some(UploadSingleChunkError::NoEtagHeaderFound); + } + } else { + last_error = + Some(UploadSingleChunkError::ChunkUploadFailed(response.status())); + } + } Err(e) => { last_error = Some(UploadSingleChunkError::Reqwest(e)); } @@ -439,42 +476,64 @@ pub async fn run( let mut progress_sender = progress_sender.lock().unwrap(); progress_sender.set_bytes_uploaded(bytes_before_chunk); } - - tracing::warn!("Failed to upload chunk data {} (attempt {}/{}): {:?}", chunk_number, attempt, MAX_RETRIES, last_error); - if attempt < MAX_RETRIES { - tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)).await; - } + + tracing::warn!( + "Failed to upload chunk data {} (attempt {}/{}): {:?}", + chunk_number, + attempt, + MAX_RETRIES, + last_error + ); + if attempt < MAX_RETRIES { + tokio::time::sleep(std::time::Duration::from_millis(500 * attempt as u64)) + .await; + } } - + match etag_opt { Some(etag) => { - progress_sender.lock().unwrap().send(); + progress_sender.lock().unwrap().send(); // Update progress state with new chunk and save to file (APPEND ONLY) - if let Err(e) = guard.paused_mut().record_chunk_completion(CompleteMultipartUploadChunk { chunk_number, etag }) { - tracing::error!("Failed to append chunk completion to log: {:?}", e); + if let Err(e) = + guard + .paused_mut() + .record_chunk_completion(CompleteMultipartUploadChunk { + chunk_number, + etag, + }) + { + tracing::error!("Failed to append chunk completion to log: {:?}", e); } - tracing::info!("Uploaded 
chunk {chunk_number}/{total_chunks} for upload_id {upload_id}"); - }, + tracing::info!( + "Uploaded chunk {chunk_number}/{total_chunks} for upload_id {upload_id}" + ); + } None => { - return Err(UploadTarError::FailedToUploadChunk { - chunk_number, - total_chunks, - max_retries: MAX_RETRIES, - error: last_error.unwrap_or(UploadSingleChunkError::NoEtagHeaderFound) - }); + return Err(UploadTarError::FailedToUploadChunk { + chunk_number, + total_chunks, + max_retries: MAX_RETRIES, + error: last_error.unwrap_or(UploadSingleChunkError::NoEtagHeaderFound), + }); } } } - + // Ensure producer and signer tasks didn't crash if let Err(e) = producer_handle.await { - tracing::error!("Producer task failed: {:?}", e); - return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, "Producer task failed"))); + tracing::error!("Producer task failed: {:?}", e); + return Err(UploadTarError::from(std::io::Error::new( + std::io::ErrorKind::Other, + "Producer task failed", + ))); } if let Err(e) = signer_handle.await { - tracing::error!("Signer task failed: {:?}", e); - return Err(UploadTarError::from(std::io::Error::new(std::io::ErrorKind::Other, "Signer task failed"))); + tracing::error!("Signer task failed: {:?}", e); + return Err(UploadTarError::from(std::io::Error::new( + std::io::ErrorKind::Other, + "Signer task failed", + ))); } } let completion_result = match api_client From a54bcc507719903dc2318a3c0d8c7536224c5f52 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Mon, 5 Jan 2026 22:06:41 -0800 Subject: [PATCH 05/12] Fix compilation errors --- src/record/local_recording.rs | 30 -------------- src/upload/upload_tar.rs | 76 ++--------------------------------- 2 files changed, 3 insertions(+), 103 deletions(-) diff --git a/src/record/local_recording.rs b/src/record/local_recording.rs index 343bdd9f..eb0710db 100644 --- a/src/record/local_recording.rs +++ b/src/record/local_recording.rs @@ -122,25 +122,6 @@ impl UploadProgressState { Ok(()) } - /// Append a single 
chunk completion to the log file - pub fn append_chunk_to_file(&self, chunk: &CompleteMultipartUploadChunk) -> eyre::Result<()> { - let mut file = std::fs::OpenOptions::new().write(true).append(true).open( - &self - .tar_path - .parent() - .unwrap() - .join(constants::filename::recording::UPLOAD_PROGRESS), - )?; - - // Ensure we start on a new line (though save_to_file guarantees ending with newline) - // serde_json::to_writer doesn't add a newline - serde_json::to_writer(&mut file, chunk)?; - use std::io::Write; - writeln!(&mut file)?; - - Ok(()) - } - /// Get the next chunk number to upload (after the last completed chunk) pub fn next_chunk_number(&self) -> u64 { self.chunk_etags @@ -213,16 +194,6 @@ impl LocalRecordingPaused { &self.upload_progress } - /// Mutate the upload progress state and save to file. - pub fn mutate_upload_progress( - &mut self, - f: impl FnOnce(&mut UploadProgressState) -> R, - ) -> R { - let r = f(&mut self.upload_progress); - self.save_upload_progress().ok(); - r - } - /// Records a successful chunk upload: updates in-memory state and appends to the log file. 
pub fn record_chunk_completion( &mut self, @@ -238,7 +209,6 @@ impl LocalRecordingPaused { let path = self.upload_progress_path(); let mut file = std::fs::OpenOptions::new() - .write(true) .append(true) .create(false) // Should already exist .open(path)?; diff --git a/src/upload/upload_tar.rs b/src/upload/upload_tar.rs index 3c19ace1..eb26bf23 100644 --- a/src/upload/upload_tar.rs +++ b/src/upload/upload_tar.rs @@ -262,10 +262,7 @@ pub async fn run( let chunk_hash = match hash_result { Ok(hash) => hash, Err(join_err) => { - return Err(UploadTarError::from(std::io::Error::new( - std::io::ErrorKind::Other, - join_err, - ))); + return Err(UploadTarError::from(std::io::Error::other(join_err))); } }; @@ -523,15 +520,13 @@ pub async fn run( // Ensure producer and signer tasks didn't crash if let Err(e) = producer_handle.await { tracing::error!("Producer task failed: {:?}", e); - return Err(UploadTarError::from(std::io::Error::new( - std::io::ErrorKind::Other, + return Err(UploadTarError::from(std::io::Error::other( "Producer task failed", ))); } if let Err(e) = signer_handle.await { tracing::error!("Signer task failed: {:?}", e); - return Err(UploadTarError::from(std::io::Error::new( - std::io::ErrorKind::Other, + return Err(UploadTarError::from(std::io::Error::other( "Signer task failed", ))); } @@ -584,19 +579,9 @@ pub async fn run( } } -struct Chunk<'a> { - data: &'a [u8], - hash: &'a str, - number: u64, -} - #[derive(Debug)] pub enum UploadSingleChunkError { Io(std::io::Error), - Api { - api_request: &'static str, - error: ApiError, - }, Reqwest(reqwest::Error), ChunkUploadFailed(reqwest::StatusCode), NoEtagHeaderFound, @@ -605,9 +590,6 @@ impl std::fmt::Display for UploadSingleChunkError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { UploadSingleChunkError::Io(e) => write!(f, "I/O error: {e}"), - UploadSingleChunkError::Api { api_request, error } => { - write!(f, "API error for {api_request}: {error:?}") - } 
UploadSingleChunkError::Reqwest(e) => write!(f, "Reqwest error: {e}"), UploadSingleChunkError::ChunkUploadFailed(status) => { write!(f, "Chunk upload failed with status: {status}") @@ -624,7 +606,6 @@ impl UploadSingleChunkError { pub fn is_network_error(&self) -> bool { match self { UploadSingleChunkError::Reqwest(e) => e.is_connect() || e.is_timeout(), - UploadSingleChunkError::Api { error, .. } => error.is_network_error(), _ => false, } } @@ -639,54 +620,3 @@ impl From for UploadSingleChunkError { UploadSingleChunkError::Reqwest(e) } } - -async fn upload_single_chunk( - chunk: Chunk<'_>, - api_client: &Arc, - api_token: &str, - upload_id: &str, - progress_sender: Arc>, - client: &reqwest::Client, -) -> Result { - let multipart_chunk_response = api_client - .upload_multipart_chunk(api_token, upload_id, chunk.number, chunk.hash) - .await - .map_err(|e| UploadSingleChunkError::Api { - api_request: "upload_multipart_chunk", - error: e, - })?; - - // Create a stream that wraps chunk_data and tracks upload progress - let progress_stream = - tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk.data.to_vec())).inspect_ok({ - let progress_sender = progress_sender.clone(); - move |bytes| { - progress_sender - .lock() - .unwrap() - .increment_bytes_uploaded(bytes.len() as u64); - } - }); - - let res = client - .put(&multipart_chunk_response.upload_url) - .header("Content-Type", "application/octet-stream") - .header("Content-Length", chunk.data.len()) - .body(reqwest::Body::wrap_stream(progress_stream)) - .send() - .await?; - - if !res.status().is_success() { - return Err(UploadSingleChunkError::ChunkUploadFailed(res.status())); - } - - // Extract etag header from response - let etag = res - .headers() - .get("etag") - .and_then(|hv| hv.to_str().ok()) - .map(|s| s.trim_matches('"').to_owned()) - .ok_or(UploadSingleChunkError::NoEtagHeaderFound)?; - - Ok(etag) -} From 283834e7cff7c1a7beb3ee5ca2a4f01e9774fa3e Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Tue, 6 
Jan 2026 11:01:19 -0800 Subject: [PATCH 06/12] attempted fix --- src/app_state.rs | 7 +++---- src/tokio_thread.rs | 10 ++++++---- src/ui/views/main/upload_manager.rs | 6 +++--- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/app_state.rs b/src/app_state.rs index c19a7f1b..cd02b35f 100644 --- a/src/app_state.rs +++ b/src/app_state.rs @@ -12,8 +12,7 @@ use egui_wgpu::wgpu; use tokio::sync::{broadcast, mpsc}; use crate::{ - api::UserUploads, config::Config, play_time::PlayTimeTracker, record::LocalRecording, - upload::ProgressData, + config::Config, play_time::PlayTimeTracker, record::LocalRecording, upload::ProgressData, }; pub struct AppState { @@ -194,9 +193,9 @@ pub enum UiUpdate { UploadFailed(String), UpdateRecordingState(bool), UpdateNewerReleaseAvailable(GitHubRelease), - UpdateUserUploadStatistics(crate::api::user_upload::UserUploadStatistics), + UpdateUserUploadStatistics(crate::api::UserUploadStatistics), UpdateUserUploadList { - uploads: Vec, + uploads: Vec, limit: u32, offset: u32, }, diff --git a/src/tokio_thread.rs b/src/tokio_thread.rs index 44ea3e02..733540f2 100644 --- a/src/tokio_thread.rs +++ b/src/tokio_thread.rs @@ -273,12 +273,13 @@ async fn main( } else { match valid_api_key_and_user_id.clone() { Some((api_key, user_id)) => { - let filters = app_state.upload_filters.read().unwrap(); + let start_date = app_state.upload_filters.read().unwrap().start_date; + let end_date = app_state.upload_filters.read().unwrap().end_date; tokio::spawn({ let app_state = app_state.clone(); let api_client = api_client.clone(); async move { - let stats = match api_client.get_user_upload_statistics(&api_key, &user_id, filters.start_date, filters.end_date).await { + let stats = match api_client.get_user_upload_statistics(&api_key, &user_id, start_date, end_date).await { Ok(stats) => stats, Err(e) => { tracing::error!(e=?e, "Failed to get user upload statistics"); @@ -302,12 +303,13 @@ async fn main( } else { match valid_api_key_and_user_id.clone() { 
Some((api_key, user_id)) => { - let filters = app_state.upload_filters.read().unwrap(); + let start_date = app_state.upload_filters.read().unwrap().start_date; + let end_date = app_state.upload_filters.read().unwrap().end_date; tokio::spawn({ let app_state = app_state.clone(); let api_client = api_client.clone(); async move { - let (uploads, limit, offset) = match api_client.get_user_upload_list(&api_key, &user_id, limit, offset, filters.start_date, filters.end_date).await { + let (uploads, limit, offset) = match api_client.get_user_upload_list(&api_key, &user_id, limit, offset, start_date, end_date).await { Ok(res) => res, Err(e) => { tracing::error!(e=?e, "Failed to get user upload list"); diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs index a5b3665f..10b5ce8f 100644 --- a/src/ui/views/main/upload_manager.rs +++ b/src/ui/views/main/upload_manager.rs @@ -5,7 +5,7 @@ use egui::{ }; use crate::{ - api::{UserUpload, UserUploadStatistics, UserUploads}, + api::{UserUpload, UserUploadStatistics}, app_state::{AppState, AsyncRequest}, config::Preferences, output_types::Metadata, @@ -819,7 +819,7 @@ fn recordings_view( let new_offset = recordings.offset.saturating_sub(recordings.limit); app_state .async_request_tx - .blocking_send(AsyncRequest::LoadUploadStats { + .blocking_send(AsyncRequest::LoadUploadList { limit: recordings.limit, offset: new_offset, }) @@ -834,7 +834,7 @@ fn recordings_view( let new_offset = recordings.offset + recordings.limit; app_state .async_request_tx - .blocking_send(AsyncRequest::LoadUploadStats { + .blocking_send(AsyncRequest::LoadUploadList { limit: recordings.limit, offset: new_offset, }) From a984c7a74be890533c0ae49d301c9822cb182328 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Tue, 6 Jan 2026 11:12:29 -0800 Subject: [PATCH 07/12] Touchup --- src/api/user_upload.rs | 2 + src/ui/views/main/upload_manager.rs | 76 ++++++++++++++--------------- 2 files changed, 40 insertions(+), 38 deletions(-) diff 
--git a/src/api/user_upload.rs b/src/api/user_upload.rs index 0ee202f4..69bcfc76 100644 --- a/src/api/user_upload.rs +++ b/src/api/user_upload.rs @@ -4,6 +4,7 @@ use serde::Deserialize; use crate::api::{API_BASE_URL, ApiClient, ApiError, check_for_response_success}; #[derive(Debug, Clone)] +#[allow(dead_code)] pub struct UserUploads { pub statistics: UserUploadStatistics, pub uploads: Vec, @@ -142,6 +143,7 @@ impl ApiClient { } /// Legacy method for backward compatibility if needed, though it's better to use the split methods. + #[allow(dead_code)] pub async fn get_user_upload_stats( &self, api_key: &str, diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs index 10b5ce8f..263031bd 100644 --- a/src/ui/views/main/upload_manager.rs +++ b/src/ui/views/main/upload_manager.rs @@ -807,47 +807,47 @@ fn recordings_view( } // Pagination Controls - if let Some(stats) = &recordings.statistics { - if stats.total_uploads > 0 { - ui.horizontal(|ui| { - let total_pages = - (stats.total_uploads as f32 / recordings.limit as f32).ceil() as u32; - let current_page = (recordings.offset / recordings.limit) + 1; - - ui.add_enabled_ui(recordings.offset > 0, |ui| { - if ui.button("Previous").clicked() { - let new_offset = recordings.offset.saturating_sub(recordings.limit); - app_state - .async_request_tx - .blocking_send(AsyncRequest::LoadUploadList { - limit: recordings.limit, - offset: new_offset, - }) - .ok(); - } - }); + if let Some(stats) = &recordings.statistics + && stats.total_uploads > 0 + { + ui.horizontal(|ui| { + let total_pages = + (stats.total_uploads as f32 / recordings.limit as f32).ceil() as u32; + let current_page = (recordings.offset / recordings.limit) + 1; + + ui.add_enabled_ui(recordings.offset > 0, |ui| { + if ui.button("Previous").clicked() { + let new_offset = recordings.offset.saturating_sub(recordings.limit); + app_state + .async_request_tx + .blocking_send(AsyncRequest::LoadUploadList { + limit: recordings.limit, + offset: 
new_offset, + }) + .ok(); + } + }); - ui.label(format!("Page {current_page} of {total_pages}")); - - ui.add_enabled_ui(current_page < total_pages, |ui| { - if ui.button("Next").clicked() { - let new_offset = recordings.offset + recordings.limit; - app_state - .async_request_tx - .blocking_send(AsyncRequest::LoadUploadList { - limit: recordings.limit, - offset: new_offset, - }) - .ok(); - } - }); + ui.label(format!("Page {current_page} of {total_pages}")); + + ui.add_enabled_ui(current_page < total_pages, |ui| { + if ui.button("Next").clicked() { + let new_offset = recordings.offset + recordings.limit; + app_state + .async_request_tx + .blocking_send(AsyncRequest::LoadUploadList { + limit: recordings.limit, + offset: new_offset, + }) + .ok(); + } + }); - ui.with_layout(Layout::right_to_left(Align::Center), |ui| { - ui.label(format!("Total records: {}", stats.total_uploads)); - }); + ui.with_layout(Layout::right_to_left(Align::Center), |ui| { + ui.label(format!("Total records: {}", stats.total_uploads)); }); - ui.separator(); - } + }); + ui.separator(); } ScrollArea::vertical() From 1ded89056274d02203d09909e30b11beffbf0e23 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Fri, 9 Jan 2026 22:53:34 -0800 Subject: [PATCH 08/12] Run cargo fmt --- src/upload/upload_tar.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/upload/upload_tar.rs b/src/upload/upload_tar.rs index 897b685f..88ef6168 100644 --- a/src/upload/upload_tar.rs +++ b/src/upload/upload_tar.rs @@ -495,10 +495,7 @@ pub async fn run( // Update progress state with new chunk and save to file (APPEND ONLY) if let Err(e) = guard .paused_mut() - .record_chunk_completion(CompleteMultipartUploadChunk { - chunk_number, - etag, - }) + .record_chunk_completion(CompleteMultipartUploadChunk { chunk_number, etag }) { tracing::error!("Failed to append chunk completion to log: {:?}", e); } From be28bfa01b82f836b30a7099bbe5cd41e6b57009 Mon Sep 17 00:00:00 2001 From: Aaron Sanders Date: Fri, 9 Jan 
2026 23:18:51 -0800 Subject: [PATCH 09/12] Fix missed LoadUploadStats rename to LoadUploadStatistics --- src/tokio_thread.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tokio_thread.rs b/src/tokio_thread.rs index 9883cc46..878af321 100644 --- a/src/tokio_thread.rs +++ b/src/tokio_thread.rs @@ -532,7 +532,8 @@ async fn main( app_state.async_request_tx.send(AsyncRequest::CancelOfflineBackoff).await.ok(); app_state.async_request_tx.send(AsyncRequest::ValidateApiKey { api_key }).await.ok(); // Load data now that we're online - app_state.async_request_tx.send(AsyncRequest::LoadUploadStats).await.ok(); + app_state.async_request_tx.send(AsyncRequest::LoadUploadStatistics).await.ok(); + app_state.async_request_tx.send(AsyncRequest::LoadUploadList { limit: 100, offset: 0 }).await.ok(); app_state.async_request_tx.send(AsyncRequest::LoadLocalRecordings).await.ok(); }, } From efb6d9706dc308d79653fe5331ecf1a986e2a488 Mon Sep 17 00:00:00 2001 From: Philpax Date: Sat, 31 Jan 2026 00:20:47 +0100 Subject: [PATCH 10/12] fix(ui): handle 0-recordings case --- src/ui/views/main/upload_manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs index a2061290..cde97285 100644 --- a/src/ui/views/main/upload_manager.rs +++ b/src/ui/views/main/upload_manager.rs @@ -814,7 +814,7 @@ fn recordings_view( ui.horizontal(|ui| { let total_pages = (stats.total_uploads as f32 / recordings.limit as f32).ceil() as u32; - let current_page = (recordings.offset / recordings.limit) + 1; + let current_page = (recordings.offset / recordings.limit.max(1)) + 1; ui.add_enabled_ui(recordings.offset > 0, |ui| { if ui.button("Previous").clicked() { From cebf8a21434a79b8e0c7970d7739e874e4e0400a Mon Sep 17 00:00:00 2001 From: Philpax Date: Sat, 31 Jan 2026 00:42:44 +0100 Subject: [PATCH 11/12] fix(upload): save initial upload progress --- src/upload/upload_folder.rs | 7 ++++++- 1 file changed, 6 
insertions(+), 1 deletion(-) diff --git a/src/upload/upload_folder.rs b/src/upload/upload_folder.rs index 885596c5..d5b8cede 100644 --- a/src/upload/upload_folder.rs +++ b/src/upload/upload_folder.rs @@ -217,7 +217,12 @@ pub async fn upload_folder( init_response.expires_at, ); - LocalRecordingPaused::new(info, metadata, upload_progress) + let paused = LocalRecordingPaused::new(info, metadata, upload_progress); + // Ensure we save initial progress so that the chunk-completion-recorder has something to work with + paused + .save_upload_progress() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + paused }; Ok(super::upload_tar::run( From 0047f4d1ebbcce16c94d82b901c1961f7e2b6736 Mon Sep 17 00:00:00 2001 From: Philpax Date: Sat, 31 Jan 2026 00:56:06 +0100 Subject: [PATCH 12/12] chore: clippy --- src/upload/upload_folder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/upload/upload_folder.rs b/src/upload/upload_folder.rs index d5b8cede..8d7073d0 100644 --- a/src/upload/upload_folder.rs +++ b/src/upload/upload_folder.rs @@ -221,7 +221,7 @@ pub async fn upload_folder( // Ensure we save initial progress so that the chunk-completion-recorder has something to work with paused .save_upload_progress() - .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; + .map_err(std::io::Error::other)?; paused };