diff --git a/src/api/user_upload.rs b/src/api/user_upload.rs
index ac21db6..69bcfc7 100644
--- a/src/api/user_upload.rs
+++ b/src/api/user_upload.rs
@@ -4,9 +4,12 @@ use serde::Deserialize;
 use crate::api::{API_BASE_URL, ApiClient, ApiError, check_for_response_success};
 
 #[derive(Debug, Clone)]
+#[allow(dead_code)]
 pub struct UserUploads {
     pub statistics: UserUploadStatistics,
     pub uploads: Vec<UserUpload>,
+    pub limit: u32,
+    pub offset: u32,
 }
 
 #[derive(Deserialize, Debug, Clone)]
@@ -51,37 +54,115 @@ pub struct UserUpload {
 }
 
 impl ApiClient {
-    pub async fn get_user_upload_stats(
+    pub async fn get_user_upload_statistics(
         &self,
         api_key: &str,
         user_id: &str,
-    ) -> Result<UserUploads, ApiError> {
-        // Response structs for the user info endpoint
+        start_date: Option,
+        end_date: Option,
+    ) -> Result<UserUploadStatistics, ApiError> {
         #[derive(Deserialize, Debug)]
         #[allow(unused)]
-        struct UserStatsResponse {
+        struct UserStatisticsResponse {
             success: bool,
             user_id: String,
             statistics: UserUploadStatistics,
-            uploads: Vec<UserUpload>,
+        }
+
+        let mut url = format!("{API_BASE_URL}/tracker/v2/uploads/user/{user_id}/stats");
+        let mut query_params = Vec::new();
+        if let Some(start) = start_date {
+            query_params.push(format!("start_date={}", start.format("%Y-%m-%d")));
+        }
+        if let Some(end) = end_date {
+            query_params.push(format!("end_date={}", end.format("%Y-%m-%d")));
+        }
+        if !query_params.is_empty() {
+            url.push('?');
+            url.push_str(&query_params.join("&"));
         }
 
         let response = self
             .client
-            .get(format!("{API_BASE_URL}/tracker/uploads/user/{user_id}"))
+            .get(url)
             .header("Content-Type", "application/json")
             .header("X-API-Key", api_key)
             .send()
             .await?;
 
         let response =
-            check_for_response_success(response, "User upload stats unavailable").await?;
+            check_for_response_success(response, "User upload statistics unavailable").await?;
+
+        let server_stats = response.json::<UserStatisticsResponse>().await?;
 
-        let server_stats = response.json::<UserStatsResponse>().await?;
+        Ok(server_stats.statistics)
+    }
+
+    pub async fn get_user_upload_list(
+        &self,
+        api_key: &str,
+        user_id: &str,
+        limit: u32,
+        offset: u32,
+        start_date: Option,
+        end_date: Option,
+    ) -> Result<(Vec<UserUpload>, u32, u32), ApiError> {
+        #[derive(Deserialize, Debug)]
+        #[allow(unused)]
+        struct UserUploadListResponse {
+            success: bool,
+            user_id: String,
+            uploads: Vec<UserUpload>,
+            limit: u32,
+            offset: u32,
+        }
+
+        let mut url = format!(
+            "{API_BASE_URL}/tracker/v2/uploads/user/{user_id}/list?limit={limit}&offset={offset}"
+        );
+        if let Some(start) = start_date {
+            url.push_str(&format!("&start_date={}", start.format("%Y-%m-%d")));
+        }
+        if let Some(end) = end_date {
+            url.push_str(&format!("&end_date={}", end.format("%Y-%m-%d")));
+        }
+
+        let response = self
+            .client
+            .get(url)
+            .header("Content-Type", "application/json")
+            .header("X-API-Key", api_key)
+            .send()
+            .await?;
+
+        let response = check_for_response_success(response, "User upload list unavailable").await?;
+
+        let server_list = response.json::<UserUploadListResponse>().await?;
+
+        Ok((server_list.uploads, server_list.limit, server_list.offset))
+    }
+
+    /// Legacy wrapper kept for backward compatibility; prefer the split methods above.
+    #[allow(dead_code)]
+    pub async fn get_user_upload_stats(
+        &self,
+        api_key: &str,
+        user_id: &str,
+        limit: u32,
+        offset: u32,
+    ) -> Result<UserUploads, ApiError> {
+        let statistics = self
+            .get_user_upload_statistics(api_key, user_id, None, None)
+            .await?;
+        let (uploads, limit, offset) = self
+            .get_user_upload_list(api_key, user_id, limit, offset, None, None)
+            .await?;
 
         Ok(UserUploads {
-            statistics: server_stats.statistics,
-            uploads: server_stats.uploads,
+            statistics,
+            uploads,
+            limit,
+            offset,
         })
     }
 }
diff --git a/src/app_state.rs b/src/app_state.rs
index 048155b..1045e98 100644
--- a/src/app_state.rs
+++ b/src/app_state.rs
@@ -12,8 +12,7 @@ use egui_wgpu::wgpu;
 use tokio::sync::{broadcast, mpsc};
 
 use crate::{
-    api::UserUploads, config::Config, play_time::PlayTimeTracker, record::LocalRecording,
-    upload::ProgressData,
+    config::Config, play_time::PlayTimeTracker, record::LocalRecording, upload::ProgressData,
 };
 
 pub struct AppState {
@@ -39,6 +38,8 @@ pub struct AppState {
     pub unsupported_games: RwLock<UnsupportedGames>,
     /// Offline mode state
     pub offline: OfflineState,
+    /// Upload filters for date range filtering
+    pub upload_filters: RwLock<UploadFilters>,
 }
 
 /// State for offline mode and backoff retry logic
@@ -89,12 +90,19 @@ impl AppState {
             last_recordable_game: RwLock::new(None),
             unsupported_games: RwLock::new(UnsupportedGames::load_from_embedded()),
             offline: OfflineState::default(),
+            upload_filters: RwLock::new(UploadFilters::default()),
         };
         tracing::debug!("AppState::new() complete");
         state
     }
 }
 
+#[derive(Default, Clone, Copy, Debug)]
+pub struct UploadFilters {
+    pub start_date: Option,
+    pub end_date: Option,
+}
+
 #[derive(Clone, PartialEq)]
 pub struct ForegroundedGame {
     pub exe_name: Option,
@@ -171,7 +179,11 @@ pub enum AsyncRequest {
     OpenDataDump,
     OpenLog,
     UpdateUnsupportedGames(UnsupportedGames),
-    LoadUploadStats,
+    LoadUploadStatistics,
+    LoadUploadList {
+        limit: u32,
+        offset: u32,
+    },
     LoadLocalRecordings,
     DeleteAllInvalidRecordings,
     DeleteAllUploadedLocalRecordings,
@@ -213,7 +225,12 @@ pub enum UiUpdate {
     UploadFailed(String),
     UpdateRecordingState(bool),
     UpdateNewerReleaseAvailable(GitHubRelease),
-    UpdateUserUploads(UserUploads),
+    UpdateUserUploadStatistics(crate::api::UserUploadStatistics),
+    UpdateUserUploadList {
+        uploads: Vec<crate::api::UserUpload>,
+        limit: u32,
+        offset: u32,
+    },
     UpdateLocalRecordings(Vec<LocalRecording>),
     FolderPickerResult {
         old_path: PathBuf,
diff --git a/src/record/local_recording.rs b/src/record/local_recording.rs
index 7bcdb70..eb0710d 100644
--- a/src/record/local_recording.rs
+++ b/src/record/local_recording.rs
@@ -67,15 +67,58 @@ impl UploadProgressState {
     /// Load progress state from a file
     pub fn load_from_file(path: &Path) -> eyre::Result<Self> {
-        let content = std::fs::read_to_string(path)?;
-        let state: Self = serde_json::from_str(&content)?;
+        let file = std::fs::File::open(path)?;
+        let reader = std::io::BufReader::new(file);
+        let mut stream =
+            serde_json::Deserializer::from_reader(reader).into_iter::<serde_json::Value>();
+
+        // Read the first object, which should be the UploadProgressState header
+        let first_value = stream
+            .next()
+            .ok_or_else(|| eyre::eyre!("Empty progress file"))??;
+        let mut state: Self = serde_json::from_value(first_value)?;
+
+        // If the state was saved in the old format (a single JSON object with populated etags),
+        // we're done: the etags are already in state.chunk_etags.
+        // If it was saved in the new format (header + log lines), state.chunk_etags may be empty
+        // and we need to read the rest of the file.
+
+        // Read subsequent objects as CompleteMultipartUploadChunk entries
+        for value in stream {
+            let chunk: CompleteMultipartUploadChunk = serde_json::from_value(value?)?;
+            // Avoid duplicates if we're migrating or recovering from an inconsistent state
+            if !state
+                .chunk_etags
+                .iter()
+                .any(|c| c.chunk_number == chunk.chunk_number)
+            {
+                state.chunk_etags.push(chunk);
+            }
+        }
+
         Ok(state)
     }
 
-    /// Save progress state to a file
+    /// Save progress state to a file (snapshot + log format)
     pub fn save_to_file(&self, path: &Path) -> eyre::Result<()> {
-        let content = serde_json::to_string_pretty(self)?;
-        std::fs::write(path, content)?;
+        let file = std::fs::File::create(path)?;
+        let mut writer = std::io::BufWriter::new(file);
+
+        // 1. Write the base state with EMPTY chunk_etags as the first line.
+        //    Clone so we can clear the vector without modifying self.
+        let mut header_state = self.clone();
+        header_state.chunk_etags.clear();
+        serde_json::to_writer(&mut writer, &header_state)?;
+        use std::io::Write;
+        writeln!(&mut writer)?;
+
+        // 2. Write all existing etags as subsequent lines
+        for chunk in &self.chunk_etags {
+            serde_json::to_writer(&mut writer, chunk)?;
+            writeln!(&mut writer)?;
+        }
+
+        writer.flush()?;
         Ok(())
     }
 
@@ -151,14 +194,30 @@ impl LocalRecordingPaused {
         &self.upload_progress
     }
 
-    /// Mutate the upload progress state and save to file.
-    pub fn mutate_upload_progress<R>(
+    /// Records a successful chunk upload: updates in-memory state and appends to the log file.
+    pub fn record_chunk_completion(
         &mut self,
-        f: impl FnOnce(&mut UploadProgressState) -> R,
-    ) -> R {
-        let r = f(&mut self.upload_progress);
-        self.save_upload_progress().ok();
-        r
+        chunk: CompleteMultipartUploadChunk,
+    ) -> eyre::Result<()> {
+        // Update in-memory state
+        self.upload_progress.chunk_etags.push(chunk.clone());
+
+        // Append to disk (efficient log append). UploadProgressState doesn't store the
+        // progress file path (only the tar path), so derive it with the
+        // upload_progress_path() helper on self. Appending avoids rewriting the whole
+        // snapshot for every completed chunk.
+
+        let path = self.upload_progress_path();
+        let mut file = std::fs::OpenOptions::new()
+            .append(true)
+            .create(false) // Should already exist
+            .open(path)?;
+
+        serde_json::to_writer(&mut file, &chunk)?;
+        use std::io::Write;
+        writeln!(&mut file)?;
+
+        Ok(())
     }
 
     /// Save upload progress state to .upload-progress file.
diff --git a/src/tokio_thread.rs b/src/tokio_thread.rs
index dd9a1cf..6bd32c8 100644
--- a/src/tokio_thread.rs
+++ b/src/tokio_thread.rs
@@ -215,7 +215,8 @@ async fn main(
                 .send(UiUpdate::UpdateUserId(Ok(user_id)))
                 .ok();
 
-            app_state.async_request_tx.send(AsyncRequest::LoadUploadStats).await.ok();
+            app_state.async_request_tx.send(AsyncRequest::LoadUploadStatistics).await.ok();
+            app_state.async_request_tx.send(AsyncRequest::LoadUploadList { limit: 100, offset: 0 }).await.ok();
         }
     }
     // no matter if offline or online, local recordings should be loaded
@@ -266,31 +267,62 @@ async fn main(
                     unsupported_games.games.len(),
                 );
             }
-            AsyncRequest::LoadUploadStats => {
+            AsyncRequest::LoadUploadStatistics => {
                 if app_state.offline.mode.load(Ordering::SeqCst) {
-                    tracing::info!("Offline mode enabled, skipping upload stats load");
-                    // Don't send any update - UI will show no upload stats
+                    tracing::info!("Offline mode enabled, skipping upload statistics load");
                 } else {
                     match valid_api_key_and_user_id.clone() {
                         Some((api_key, user_id)) => {
+                            let start_date = app_state.upload_filters.read().unwrap().start_date;
+                            let end_date = app_state.upload_filters.read().unwrap().end_date;
                             tokio::spawn({
                                 let app_state = app_state.clone();
                                 let api_client = api_client.clone();
                                 async move {
-                                    let stats = match api_client.get_user_upload_stats(&api_key, &user_id).await {
+                                    let stats = match api_client.get_user_upload_statistics(&api_key, &user_id, start_date, end_date).await {
                                         Ok(stats) => stats,
                                         Err(e) => {
-                                            tracing::error!(e=?e, "Failed to get user upload stats");
+                                            tracing::error!(e=?e, "Failed to get user upload statistics");
                                             return;
                                         }
                                     };
-                                    tracing::info!(stats=?stats.statistics, "Loaded upload stats");
-                                    app_state.ui_update_tx.send(UiUpdate::UpdateUserUploads(stats)).ok();
+                                    tracing::info!(stats=?stats, "Loaded upload statistics");
+                                    app_state.ui_update_tx.send(UiUpdate::UpdateUserUploadStatistics(stats)).ok();
                                 }
                             });
                         }
                         None => {
-                            tracing::error!("API key and user ID not found, skipping upload stats load");
+                            tracing::error!("API key and user ID not found, skipping upload statistics load");
+                        }
+                    }
+                }
+            }
+            AsyncRequest::LoadUploadList { limit, offset } => {
+                if app_state.offline.mode.load(Ordering::SeqCst) {
+                    tracing::info!("Offline mode enabled, skipping upload list load");
+                } else {
+                    match valid_api_key_and_user_id.clone() {
+                        Some((api_key, user_id)) => {
+                            let start_date = app_state.upload_filters.read().unwrap().start_date;
+                            let end_date = app_state.upload_filters.read().unwrap().end_date;
+                            tokio::spawn({
+                                let app_state = app_state.clone();
+                                let api_client = api_client.clone();
+                                async move {
+                                    let (uploads, limit, offset) = match api_client.get_user_upload_list(&api_key, &user_id, limit, offset, start_date, end_date).await {
+                                        Ok(res) => res,
+                                        Err(e) => {
+                                            tracing::error!(e=?e, "Failed to get user upload list");
+                                            return;
+                                        }
+                                    };
+                                    tracing::info!(count=uploads.len(), "Loaded upload list");
+                                    app_state.ui_update_tx.send(UiUpdate::UpdateUserUploadList { uploads, limit, offset }).ok();
+                                }
+                            });
+                        }
+                        None => {
+                            tracing::error!("API key and user ID not found, skipping upload list load");
                         }
                     }
                 }
@@ -501,7 +533,8 @@ async fn main(
             app_state.async_request_tx.send(AsyncRequest::CancelOfflineBackoff).await.ok();
             app_state.async_request_tx.send(AsyncRequest::ValidateApiKey { api_key }).await.ok();
             // Load data now that we're online
-            app_state.async_request_tx.send(AsyncRequest::LoadUploadStats).await.ok();
+            app_state.async_request_tx.send(AsyncRequest::LoadUploadStatistics).await.ok();
+            app_state.async_request_tx.send(AsyncRequest::LoadUploadList { limit: 100, offset: 0 }).await.ok();
             app_state.async_request_tx.send(AsyncRequest::LoadLocalRecordings).await.ok();
         },
     }
diff --git a/src/ui/views/main/upload_manager.rs b/src/ui/views/main/upload_manager.rs
index 9bac45e..cde9728 100644
--- a/src/ui/views/main/upload_manager.rs
+++ b/src/ui/views/main/upload_manager.rs
@@ -5,7 +5,7 @@ use egui::{
 };
 
 use crate::{
-    api::UserUpload,
+    api::{UserUpload, UserUploadStatistics},
     app_state::{AppState, AsyncRequest},
     config::Preferences,
     output_types::Metadata,
@@ -26,8 +26,13 @@ pub struct UploadManager {
     prev_auto_upload_enabled: bool,
 }
 
 impl UploadManager {
-    pub fn update_user_uploads(&mut self, user_uploads: Vec<UserUpload>) {
-        self.recordings.update_user_uploads(user_uploads);
+    pub fn update_user_upload_statistics(&mut self, statistics: UserUploadStatistics) {
+        self.recordings.update_user_upload_statistics(statistics);
+    }
+
+    pub fn update_user_upload_list(&mut self, uploads: Vec<UserUpload>, limit: u32, offset: u32) {
+        self.recordings
+            .update_user_upload_list(uploads, limit, offset);
         self.virtual_list.reset();
     }
 
@@ -66,11 +71,22 @@ pub struct Recordings {
     filtered_excluding_local_uploaded: Vec,
     latest_upload_timestamp: Option>,
     invalid_count_filtered: usize,
+
+    statistics: Option<UserUploadStatistics>,
+    limit: u32,
+    offset: u32,
 }
 
 impl Recordings {
-    pub fn update_user_uploads(&mut self, user_uploads: Vec<UserUpload>) {
-        self.storage.uploaded = user_uploads;
+    pub fn update_user_upload_statistics(&mut self, statistics: UserUploadStatistics) {
+        self.statistics = Some(statistics);
+        self.update_calculated_state();
+    }
+
+    pub fn update_user_upload_list(&mut self, uploads: Vec<UserUpload>, limit: u32, offset: u32) {
+        self.storage.uploaded = uploads;
         self.storage.uploads_available = true;
+        self.limit = limit;
+        self.offset = offset;
         self.update_calculated_state();
     }
 
@@ -617,39 +633,20 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) {
     let available_width = ui.available_width() - (cell_count as f32 * 10.0);
     let cell_width = available_width / cell_count as f32;
 
-    // Calculate stats for each of our categories. The endpoint that we use to get
-    // the uploaded recordings tells us this, but over the entire range: we'd like to
-    // cover just the user-filtered range.
-    let mut total_duration: f64 = 0.0;
-    let mut total_count: usize = 0;
-    let mut total_size: u64 = 0;
-    let mut last_upload: Option> = None;
-
+    // Calculate pending stats
     let mut unuploaded_duration: f64 = 0.0;
     let mut unuploaded_count: usize = 0;
     let mut unuploaded_size: u64 = 0;
 
     for recording in recordings.iter_filtered() {
-        match recording {
-            Recording::Uploaded(recording) => {
-                total_duration += recording.video_duration_seconds.unwrap_or(0.0);
-                total_count += 1;
-                total_size += recording.file_size_bytes;
-                if last_upload.is_none() || recording.created_at > last_upload.unwrap() {
-                    last_upload = Some(recording.created_at);
-                }
-            }
-            Recording::Local(
-                LocalRecording::Unuploaded { info, metadata }
-                | LocalRecording::Paused(LocalRecordingPaused { metadata, info, .. }),
-            ) => {
-                unuploaded_duration += metadata.as_ref().map(|m| m.duration).unwrap_or(0.0);
-                unuploaded_count += 1;
-                unuploaded_size += info.folder_size;
-            }
-            Recording::Local(LocalRecording::Invalid { .. } | LocalRecording::Uploaded { .. }) => {
-                // We don't count these in our stats
-            }
+        if let Recording::Local(
+            LocalRecording::Unuploaded { info, metadata }
+            | LocalRecording::Paused(LocalRecordingPaused { metadata, info, .. }),
+        ) = recording
+        {
+            unuploaded_duration += metadata.as_ref().map(|m| m.duration).unwrap_or(0.0);
+            unuploaded_count += 1;
+            unuploaded_size += info.folder_size;
         }
     }
 
@@ -674,16 +671,14 @@ fn upload_stats_view(ui: &mut Ui, recordings: &Recordings) {
         vec2(cell_width, ui.available_height()),
         Layout::top_down(Align::Center),
         |ui| {
-            create_upload_cell(
-                ui,
-                "📊", // Icon
-                "Total Uploaded",
-                &if recordings.uploads_available() {
-                    util::format_seconds(total_duration as u64)
-                } else {
-                    "Loading...".to_string()
-                },
-            );
+            let val = if let Some(stats) = &recordings.statistics {
+                util::format_seconds(stats.total_video_time.seconds as u64)
+            } else if recordings.uploads_available() {
+                "0s".to_string()
+            } else {
+                "Loading...".to_string()
+            };
+            create_upload_cell(ui, "📊", "Total Uploaded", &val);
         },
     );
 
@@ -692,16 +687,14 @@
         vec2(cell_width, ui.available_height()),
         Layout::top_down(Align::Center),
         |ui| {
-            create_upload_cell(
-                ui,
-                "📁", // Icon
-                "Files Uploaded",
-                &if recordings.uploads_available() {
-                    total_count.to_string()
-                } else {
-                    "Loading...".to_string()
-                },
-            );
+            let val = if let Some(stats) = &recordings.statistics {
+                stats.total_uploads.to_string()
+            } else if recordings.uploads_available() {
+                "0".to_string()
+            } else {
+                "Loading...".to_string()
+            };
+            create_upload_cell(ui, "📁", "Files Uploaded", &val);
         },
     );
 
@@ -710,16 +703,14 @@
         vec2(cell_width, ui.available_height()),
         Layout::top_down(Align::Center),
         |ui| {
-            create_upload_cell(
-                ui,
-                "💾", // Icon
-                "Volume Uploaded",
-                &if recordings.uploads_available() {
-                    util::format_bytes(total_size)
-                } else {
-                    "Loading...".to_string()
-                },
-            );
+            let val = if let Some(stats) = &recordings.statistics {
+                util::format_bytes(stats.total_data.bytes)
+            } else if recordings.uploads_available() {
+                "0 B".to_string()
+            } else {
+                "Loading...".to_string()
+            };
+            create_upload_cell(ui, "💾", "Volume Uploaded", &val);
         },
     );
 
@@ -783,6 +774,7 @@ fn recordings_view(
         })
         .show(ui, |ui| {
             let button_height = 28.0;
+            let pagination_height = 30.0;
             let height = (ui.available_height() - FOOTER_HEIGHT).max(button_height);
 
             // Show spinner if still loading
@@ -815,8 +807,60 @@
                     .ok();
             }
 
+            // Pagination controls
+            if let Some(stats) = &recordings.statistics
+                && stats.total_uploads > 0
+            {
+                ui.horizontal(|ui| {
+                    let total_pages =
+                        (stats.total_uploads as f32 / recordings.limit as f32).ceil() as u32;
+                    let current_page = (recordings.offset / recordings.limit.max(1)) + 1;
+
+                    ui.add_enabled_ui(recordings.offset > 0, |ui| {
+                        if ui.button("Previous").clicked() {
+                            let new_offset = recordings.offset.saturating_sub(recordings.limit);
+                            app_state
+                                .async_request_tx
+                                .blocking_send(AsyncRequest::LoadUploadList {
+                                    limit: recordings.limit,
+                                    offset: new_offset,
+                                })
+                                .ok();
+                        }
+                    });
+
+                    ui.label(format!("Page {current_page} of {total_pages}"));
+
+                    ui.add_enabled_ui(current_page < total_pages, |ui| {
+                        if ui.button("Next").clicked() {
+                            let new_offset = recordings.offset + recordings.limit;
+                            app_state
+                                .async_request_tx
+                                .blocking_send(AsyncRequest::LoadUploadList {
+                                    limit: recordings.limit,
+                                    offset: new_offset,
+                                })
+                                .ok();
+                        }
+                    });
+
+                    ui.with_layout(Layout::right_to_left(Align::Center), |ui| {
+                        ui.label(format!("Total records: {}", stats.total_uploads));
+                    });
+                });
+                ui.separator();
+            }
+
             ScrollArea::vertical()
-                .max_height(height - if any_invalid { button_height } else { 0.0 })
+                .max_height(
+                    height
+                        - (if any_invalid { button_height } else { 0.0 })
+                        - (if recordings.statistics.is_some() {
+                            pagination_height
+                        } else {
+                            0.0
+                        }),
+                )
                 .auto_shrink([false, false])
                 .show(ui, |ui| {
                     if recordings.is_empty_filtered() {
diff --git a/src/ui/views/mod.rs b/src/ui/views/mod.rs
index a024978..8144f05 100644
--- a/src/ui/views/mod.rs
+++ b/src/ui/views/mod.rs
@@ -164,10 +164,19 @@ impl App {
             UiUpdate::UpdateNewerReleaseAvailable(release) => {
                 self.newer_release_available = Some(release);
             }
-            UiUpdate::UpdateUserUploads(uploads) => {
+            UiUpdate::UpdateUserUploadStatistics(statistics) => {
                 self.main_view_state
                     .upload_manager
-                    .update_user_uploads(uploads.uploads);
+                    .update_user_upload_statistics(statistics);
+            }
+            UiUpdate::UpdateUserUploadList {
+                uploads,
+                limit,
+                offset,
+            } => {
+                self.main_view_state
+                    .upload_manager
+                    .update_user_upload_list(uploads, limit, offset);
             }
             UiUpdate::UpdateLocalRecordings(recordings) => {
                 self.main_view_state
diff --git a/src/upload/mod.rs b/src/upload/mod.rs
index 6ede1f7..2aca081 100644
--- a/src/upload/mod.rs
+++ b/src/upload/mod.rs
@@ -80,7 +80,11 @@ pub async fn start(
         .store(false, std::sync::atomic::Ordering::SeqCst);
 
     for req in [
-        AsyncRequest::LoadUploadStats,
+        AsyncRequest::LoadUploadStatistics,
+        AsyncRequest::LoadUploadList {
+            limit: 100,
+            offset: 0,
+        },
         AsyncRequest::LoadLocalRecordings,
     ] {
         app_state.async_request_tx.send(req).await.ok();
@@ -220,7 +224,11 @@ async fn run(
     if should_reload {
         for req in [
-            AsyncRequest::LoadUploadStats,
+            AsyncRequest::LoadUploadStatistics,
+            AsyncRequest::LoadUploadList {
+                limit: 100,
+                offset: 0,
+            },
             AsyncRequest::LoadLocalRecordings,
         ] {
             async_req_tx.send(req).await.ok();
diff --git a/src/upload/upload_folder.rs b/src/upload/upload_folder.rs
index 885596c..8d7073d 100644
--- a/src/upload/upload_folder.rs
+++ b/src/upload/upload_folder.rs
@@ -217,7 +217,12 @@ pub async fn upload_folder(
             init_response.expires_at,
         );
 
-        LocalRecordingPaused::new(info, metadata, upload_progress)
+        let paused = LocalRecordingPaused::new(info, metadata, upload_progress);
+        // Save the initial progress up front so record_chunk_completion has an existing file to append to
+        paused
+            .save_upload_progress()
+            .map_err(std::io::Error::other)?;
+        paused
     };
 
     Ok(super::upload_tar::run(
diff --git a/src/upload/upload_tar.rs b/src/upload/upload_tar.rs
index 73ea0cc..88ef616 100644
--- a/src/upload/upload_tar.rs
+++ b/src/upload/upload_tar.rs
@@ -200,27 +200,161 @@ pub async fn run(
     let mut guard = AbortUploadOnDrop::new(api_client.clone(), api_token.to_string(), paused);
 
     {
-        let mut file = tokio::fs::File::open(tar_path).await?;
-
-        // If resuming, seek to the correct position in the file
-        if start_chunk > 1 {
-            let bytes_to_skip = (start_chunk - 1) * chunk_size_bytes;
-            file.seek(std::io::SeekFrom::Start(bytes_to_skip))
-                .await
-                .map_err(UploadTarError::Io)?;
-            tracing::info!(
-                "Seeking to byte {} to resume from chunk {}",
-                bytes_to_skip,
-                start_chunk
-            );
-        }
+        let file = tokio::fs::File::open(tar_path.clone()).await?;
+
+        // Pipeline channels
+        // Channel 1: Producer -> Signer
+        // Payload: (chunk data, chunk hash, chunk number)
+        let (tx_hashed, mut rx_hashed) = tokio::sync::mpsc::channel(2);
+
+        // Channel 2: Signer -> Uploader
+        // Payload: (chunk data, upload URL, chunk number)
+        let (tx_signed, mut rx_signed) = tokio::sync::mpsc::channel(2);
+
+        // --- STAGE 1: PRODUCER (Read & Hash) ---
+        let producer_handle = tokio::spawn({
+            let mut file = file;
+            let pause_flag = pause_flag.clone();
+            async move {
+                // If resuming, seek to the correct position in the file
+                if start_chunk > 1 {
+                    let bytes_to_skip = (start_chunk - 1) * chunk_size_bytes;
+                    if let Err(e) = file.seek(std::io::SeekFrom::Start(bytes_to_skip)).await {
+                        return Err(UploadTarError::Io(e));
+                    }
+                    tracing::info!(
+                        "Seeking to byte {} to resume from chunk {}",
+                        bytes_to_skip,
+                        start_chunk
+                    );
+                }
 
-        // TODO: make this less sloppy.
-        // Instead of allocating a chunk-sized buffer, and then allocating that buffer
-        // again for each chunk's stream, figure out a way to stream each chunk from the file
-        // directly into the hasher, and then stream each chunk directly into the uploader
-        let mut buffer = vec![0u8; chunk_size_bytes as usize];
-        let client = reqwest::Client::new();
+                let mut buffer = vec![0u8; chunk_size_bytes as usize];
+
+                for chunk_number in start_chunk..=total_chunks {
+                    // Check pause
+                    if pause_flag.load(std::sync::atomic::Ordering::SeqCst) {
+                        break;
+                    }
+
+                    // Read chunk
+                    let mut buffer_start = 0;
+                    loop {
+                        match file.read(&mut buffer[buffer_start..]).await {
+                            Ok(0) => break, // EOF
+                            Ok(n) => buffer_start += n,
+                            Err(e) => return Err(UploadTarError::Io(e)),
+                        }
+                    }
+
+                    let chunk_size = buffer_start;
+                    if chunk_size == 0 {
+                        break;
+                    }
+
+                    let chunk_data = buffer[..chunk_size].to_vec();
+
+                    // Offload hashing to a blocking thread
+                    let hash_result = tokio::task::spawn_blocking({
+                        let data = chunk_data.clone();
+                        move || sha256::digest(&data)
+                    })
+                    .await;
+
+                    let chunk_hash = match hash_result {
+                        Ok(hash) => hash,
+                        Err(join_err) => {
+                            return Err(UploadTarError::from(std::io::Error::other(join_err)));
+                        }
+                    };
+
+                    if tx_hashed
+                        .send(Ok((chunk_data, chunk_hash, chunk_number)))
+                        .await
+                        .is_err()
+                    {
+                        break; // Receiver dropped
+                    }
+                }
+                Ok(())
+            }
+        });
+
+        // --- STAGE 2: SIGNER (Get Upload URL) ---
+        let signer_handle = tokio::spawn({
+            let api_client = api_client.clone();
+            let api_token = api_token.to_string();
+            let upload_id = upload_id.clone();
+            let pause_flag = pause_flag.clone();
+            async move {
+                while let Some(msg) = rx_hashed.recv().await {
+                    if pause_flag.load(std::sync::atomic::Ordering::SeqCst) {
+                        break;
+                    }
+
+                    let (chunk_data, chunk_hash, chunk_number) = match msg {
+                        Ok(val) => val,
+                        Err(e) => {
+                            let _ = tx_signed.send(Err(e)).await;
+                            break;
+                        }
+                    };
+
+                    // Retry with exponential backoff for getting the signed URL
+                    let backoff = ExponentialBackoff {
+                        initial_interval: Duration::from_millis(500),
+                        max_interval: Duration::from_secs(8),
+                        max_elapsed_time: Some(Duration::from_secs(16)),
+                        multiplier: 2.0,
+                        randomization_factor: 0.25,
+                        ..Default::default()
+                    };
+
+                    let upload_url_result = retry_notify(
+                        backoff,
+                        || async {
+                            api_client
+                                .upload_multipart_chunk(
+                                    &api_token,
+                                    &upload_id,
+                                    chunk_number,
+                                    &chunk_hash,
+                                )
+                                .await
+                                .map(|resp| resp.upload_url)
+                                .map_err(BackoffError::transient)
+                        },
+                        |err, dur| {
+                            tracing::warn!(
+                                "Failed to get signed URL for chunk {}, retrying in {dur:?}: {err:?}",
+                                chunk_number
+                            );
+                        },
+                    )
+                    .await;
+
+                    match upload_url_result {
+                        Ok(url) => {
+                            if tx_signed
+                                .send(Ok((chunk_data, url, chunk_number)))
+                                .await
+                                .is_err()
+                            {
+                                break;
+                            }
+                        }
+                        Err(e) => {
+                            let err = UploadTarError::Api {
+                                api_request: "upload_multipart_chunk",
+                                error: e,
+                            };
+                            let _ = tx_signed.send(Err(err)).await;
+                            break;
+                        }
+                    }
+                }
+            }
+        });
 
         // Initialize progress sender with bytes already uploaded
         let bytes_already_uploaded = (start_chunk - 1) * chunk_size_bytes;
@@ -230,7 +364,16 @@
             sender
         }));
 
-        for chunk_number in start_chunk..=total_chunks {
+        let client = reqwest::Client::new();
+
+        // --- STAGE 3: UPLOADER (PUT Data) ---
+        while let Some(msg) = rx_signed.recv().await {
+            // Check for errors from the previous stages
+            let (chunk_data, upload_url, chunk_number) = match msg {
+                Ok(val) => val,
+                Err(e) => return Err(e),
+            };
+
             // Check if upload has been paused (user-initiated pause)
             if pause_flag.load(std::sync::atomic::Ordering::SeqCst) {
                 // Ensure the latest progress is saved for resume
@@ -270,22 +413,10 @@
                 "Uploading chunk {chunk_number}/{total_chunks} for upload_id {upload_id}"
             );
 
-            // Read chunk data from file (only once per chunk, not per retry)
-            let mut buffer_start = 0;
-            loop {
-                let bytes_read = file.read(&mut buffer[buffer_start..]).await?;
-                if bytes_read == 0 {
-                    break;
-                }
-                buffer_start += bytes_read;
-            }
-            let chunk_size = buffer_start;
-            let chunk_data = buffer[..chunk_size].to_vec();
-            let chunk_hash = sha256::digest(&chunk_data);
-
             // Store bytes_uploaded before attempting the chunk
             let bytes_before_chunk = progress_sender.lock().unwrap().bytes_uploaded();
             let mut retries = 0u32;
+            // The backoff below allows roughly 5-6 attempts before max_elapsed_time is hit
 
             let backoff = ExponentialBackoff {
                 initial_interval: Duration::from_millis(500),
@@ -305,22 +436,44 @@
                         .unwrap()
                         .set_bytes_uploaded(bytes_before_chunk);
 
-                    let chunk = Chunk {
-                        data: &chunk_data,
-                        hash: &chunk_hash,
-                        number: chunk_number,
-                    };
-
-                    upload_single_chunk(
-                        chunk,
-                        &api_client,
-                        api_token,
-                        &upload_id,
-                        progress_sender.clone(),
-                        &client,
-                    )
-                    .await
-                    .map_err(BackoffError::transient)
+                    // Create a stream that wraps chunk_data and tracks upload progress
+                    let progress_stream =
+                        tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk_data.clone()))
+                            .inspect_ok({
+                                let progress_sender = progress_sender.clone();
+                                move |bytes| {
+                                    progress_sender
+                                        .lock()
+                                        .unwrap()
+                                        .increment_bytes_uploaded(bytes.len() as u64);
+                                }
+                            });
+
+                    let res = client
+                        .put(&upload_url)
+                        .header("Content-Type", "application/octet-stream")
+                        .header("Content-Length", chunk_data.len())
+                        .body(reqwest::Body::wrap_stream(progress_stream))
+                        .send()
+                        .await
+                        .map_err(|e| BackoffError::transient(UploadSingleChunkError::Reqwest(e)))?;
+
+                    if !res.status().is_success() {
+                        return Err(BackoffError::transient(
+                            UploadSingleChunkError::ChunkUploadFailed(res.status()),
+                        ));
+                    }
+
+                    let etag = res
+                        .headers()
+                        .get("etag")
+                        .and_then(|h| h.to_str().ok())
+                        .map(|s| s.trim_matches('"').to_owned())
+                        .ok_or_else(|| {
+                            BackoffError::transient(UploadSingleChunkError::NoEtagHeaderFound)
+                        })?;
+
+                    Ok(etag)
                 },
                 |err, dur| {
                     retries += 1;
@@ -339,18 +492,34 @@
 
             progress_sender.lock().unwrap().send();
 
-            // Update progress state with new chunk and save to file
-            guard.paused_mut().mutate_upload_progress(|progress| {
-                progress
-                    .chunk_etags
-                    .push(CompleteMultipartUploadChunk { chunk_number, etag });
-            });
+            // Update progress state with the new chunk and append it to the log file (append-only)
+            if let Err(e) = guard
+                .paused_mut()
+                .record_chunk_completion(CompleteMultipartUploadChunk { chunk_number, etag })
+            {
+                tracing::error!("Failed to append chunk completion to log: {:?}", e);
+            }
 
             tracing::info!(
                 "Uploaded chunk {chunk_number}/{total_chunks} for upload_id {upload_id}"
            );
         }
+
+        // Ensure the producer and signer tasks didn't crash
+        if let Err(e) = producer_handle.await {
+            tracing::error!("Producer task failed: {:?}", e);
+            return Err(UploadTarError::from(std::io::Error::other(
+                "Producer task failed",
+            )));
+        }
+        if let Err(e) = signer_handle.await {
+            tracing::error!("Signer task failed: {:?}", e);
+            return Err(UploadTarError::from(std::io::Error::other(
+                "Signer task failed",
+            )));
+        }
     }
+
     let completion_result = match api_client
         .complete_multipart_upload(
             api_token,
@@ -399,19 +568,9 @@
     }
 }
 
-struct Chunk<'a> {
-    data: &'a [u8],
-    hash: &'a str,
-    number: u64,
-}
-
 #[derive(Debug)]
 pub enum UploadSingleChunkError {
     Io(std::io::Error),
-    Api {
-        api_request: &'static str,
-        error: ApiError,
-    },
     Reqwest(reqwest::Error),
     ChunkUploadFailed(reqwest::StatusCode),
     NoEtagHeaderFound,
@@ -420,9 +579,6 @@ impl std::fmt::Display for UploadSingleChunkError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             UploadSingleChunkError::Io(e) => write!(f, "I/O error: {e}"),
-            UploadSingleChunkError::Api { api_request, error } => {
-                write!(f, "API error for {api_request}: {error:?}")
-            }
             UploadSingleChunkError::Reqwest(e) => write!(f, "Reqwest error: {e}"),
             UploadSingleChunkError::ChunkUploadFailed(status) => {
                 write!(f, "Chunk upload failed with status: {status}")
@@ -439,7 +595,6 @@ impl UploadSingleChunkError {
     pub fn is_network_error(&self) -> bool {
         match self {
             UploadSingleChunkError::Reqwest(e) => e.is_connect() || e.is_timeout(),
-            UploadSingleChunkError::Api { error, .. } => error.is_network_error(),
             _ => false,
         }
     }
@@ -454,54 +609,3 @@ impl From<reqwest::Error> for UploadSingleChunkError {
         UploadSingleChunkError::Reqwest(e)
     }
 }
-
-async fn upload_single_chunk(
-    chunk: Chunk<'_>,
-    api_client: &Arc<ApiClient>,
-    api_token: &str,
-    upload_id: &str,
-    progress_sender: Arc>,
-    client: &reqwest::Client,
-) -> Result<String, UploadSingleChunkError> {
-    let multipart_chunk_response = api_client
-        .upload_multipart_chunk(api_token, upload_id, chunk.number, chunk.hash)
-        .await
-        .map_err(|e| UploadSingleChunkError::Api {
-            api_request: "upload_multipart_chunk",
-            error: e,
-        })?;
-
-    // Create a stream that wraps chunk_data and tracks upload progress
-    let progress_stream =
-        tokio_util::io::ReaderStream::new(std::io::Cursor::new(chunk.data.to_vec())).inspect_ok({
-            let progress_sender = progress_sender.clone();
-            move |bytes| {
-                progress_sender
-                    .lock()
-                    .unwrap()
-                    .increment_bytes_uploaded(bytes.len() as u64);
-            }
-        });
-
-    let res = client
-        .put(&multipart_chunk_response.upload_url)
-        .header("Content-Type", "application/octet-stream")
-        .header("Content-Length", chunk.data.len())
-        .body(reqwest::Body::wrap_stream(progress_stream))
-        .send()
-        .await?;
-
-    if !res.status().is_success() {
-        return Err(UploadSingleChunkError::ChunkUploadFailed(res.status()));
-    }
-
-    // Extract etag header from response
-    let etag = res
-        .headers()
-        .get("etag")
-        .and_then(|hv| hv.to_str().ok())
-        .map(|s| s.trim_matches('"').to_owned())
-        .ok_or(UploadSingleChunkError::NoEtagHeaderFound)?;
-
-    Ok(etag)
-}