diff --git a/blade/db/postgres/mod.rs b/blade/db/postgres/mod.rs index e33d8bc..c1b4a29 100644 --- a/blade/db/postgres/mod.rs +++ b/blade/db/postgres/mod.rs @@ -344,6 +344,30 @@ impl state::DB for Postgres { .context(format!("failed to delete invocation since {ot:#?}")) } + fn delete_invocations_batch( + &mut self, + ts: &std::time::SystemTime, + limit: i64, + ) -> anyhow::Result { + let ot: time::OffsetDateTime = (*ts).into(); + let ids_to_delete: Vec = schema::invocations::table + .select(schema::invocations::id) + .filter(schema::invocations::start.le(ot)) + .limit(limit) + .load(&mut self.conn) + .context("failed to query invocations for batch deletion")?; + + if ids_to_delete.is_empty() { + return Ok(0); + } + + diesel::delete( + schema::invocations::table.filter(schema::invocations::id.eq_any(&ids_to_delete)), + ) + .execute(&mut self.conn) + .context("failed to delete invocation batch") + } + fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> { use schema::invocations::dsl::*; let now: time::OffsetDateTime = std::time::SystemTime::now().into(); @@ -1048,6 +1072,99 @@ mod tests { } } + #[test] + fn test_delete_batch() { + let tmp = tempdir::TempDir::new("test_delete_batch").unwrap(); + let harness = harness::new(tmp.path().to_str().unwrap()).unwrap(); + let uri = harness.uri(); + super::init_db(&uri).unwrap(); + let mgr = crate::manager::PostgresManager::new(&uri).unwrap(); + let mut conn = PgConnection::establish(&uri).unwrap(); + let mut db = mgr.get().unwrap(); + + let start = UNIX_EPOCH; + let mut curr = start; + let day = Duration::from_secs(60 * 60 * 24); + + // Create 15 invocations + for i in 0..15 { + db.upsert_shallow_invocation(&state::InvocationResults { + id: format!("id{i}"), + start: curr.checked_add(day).unwrap(), + ..Default::default() + }) + .unwrap(); + curr += day; + } + + // Verify all 15 exist + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 15); + } + + // Delete in batches of 3, targeting first 10 invocations + let cutoff = start.checked_add(day * 10).unwrap(); + + // First batch: should delete 3 + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 12); + } + + // Second batch: should delete 3 more + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 9); + } + + // Third batch: should delete 3 more + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 6); + } + + // Fourth batch: should delete 1 more (only 1 left before cutoff) + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 1); + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 5); + } + + // Fifth batch: should delete 0 (none left before cutoff) + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 0); + { + let res = super::schema::invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 5); + } + } + #[test] fn test_options() { let tmp = tempdir::TempDir::new("test_test").unwrap(); diff --git a/blade/db/sqlite/mod.rs b/blade/db/sqlite/mod.rs index 2e1b674..e36ff04 100644 --- a/blade/db/sqlite/mod.rs +++ b/blade/db/sqlite/mod.rs @@ -362,6 +362,30 @@ impl state::DB for Sqlite { .context(format!("failed to delete invocation since {ot:#?}")) } + fn delete_invocations_batch( + &mut self, + ts: &std::time::SystemTime, + limit: i64, + ) -> anyhow::Result { + let ot: time::OffsetDateTime = (*ts).into(); + let ids_to_delete: Vec = schema::Invocations::table + .select(schema::Invocations::id) + .filter(unixepoch(schema::Invocations::start).le(unixepoch(ot))) + .limit(limit) + .load(&mut self.conn) + .context("failed to query invocations for batch deletion")?; + + if ids_to_delete.is_empty() { + return Ok(0); + } + + diesel::delete( + schema::Invocations::table.filter(schema::Invocations::id.eq_any(&ids_to_delete)), + ) + .execute(&mut self.conn) + .context("failed to delete invocation batch") + } + fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> { use schema::Invocations::dsl::*; let now: time::OffsetDateTime = std::time::SystemTime::now().into(); @@ -1069,6 +1093,98 @@ mod tests { } } + #[test] + fn test_delete_batch() { + let tmp = tempdir::TempDir::new("test_delete_batch").unwrap(); + let db_path = tmp.path().join("test.db"); + super::init_db(db_path.to_str().unwrap()).unwrap(); + let mut conn = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap(); + let mgr = crate::manager::SqliteManager::new(db_path.to_str().unwrap()).unwrap(); + let mut db = mgr.get().unwrap(); + + let start = UNIX_EPOCH; + let mut curr = start; + let day = Duration::from_secs(60 * 60 * 24); + + // Create 15 invocations + for i in 0..15 { + db.upsert_shallow_invocation(&state::InvocationResults { + id: format!("id{i}"), + start: curr.checked_add(day).unwrap(), + ..Default::default() + }) + .unwrap(); + curr += day; + } + + // Verify all 15 exist + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 15); + } + + // Delete in batches of 3, targeting first 10 invocations + let cutoff = start.checked_add(day * 10).unwrap(); + + // First batch: should delete 3 + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 12); + } + + // Second batch: should delete 3 more + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 9); + } + + // Third batch: should delete 3 more + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 3); + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 6); + } + + // Fourth batch: should delete 1 more (only 1 left before cutoff) + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 1); + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 5); + } + + // Fifth batch: should delete 0 (none left before cutoff) + let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap(); + assert_eq!(deleted, 0); + { + let res = super::schema::Invocations::table + .select(super::models::Invocation::as_select()) + .get_results(&mut conn) + .unwrap(); + assert_eq!(res.len(), 5); + } + } + #[test] fn test_options() { let tmp = tempdir::TempDir::new("test_target").unwrap(); diff --git a/blade/main.rs b/blade/main.rs index b5e2426..770be23 100644 --- a/blade/main.rs +++ b/blade/main.rs @@ -340,6 +340,9 @@ cfg_if! { let day = std::time::Duration::from_secs(60 * 60 * 24); let interval = global.retention.unwrap_or(day); let check_interval = std::cmp::min(day, interval/7); + const BATCH_SIZE: i64 = 10; + const BATCH_DELAY: std::time::Duration = std::time::Duration::from_millis(100); + loop { tokio::time::sleep(check_interval).await; if global.retention.is_none() { @@ -349,11 +352,32 @@ cfg_if! { tracing::warn!("Overflow when clean up time"); continue; }; - db::run(global.db_manager.clone(), move |db_mgr| db_mgr.delete_invocations_since(&since)).await.inspect_err(|e| { - tracing::warn!("Failed to mark old invocations for deletion: {e:#?}"); - }).ok().inspect(|count| { - tracing::info!("Marked {} invocations for deletion", count); - }); + + let mut total_deleted = 0; + loop { + let deleted = db::run(global.db_manager.clone(), move |db_mgr| { + db_mgr.delete_invocations_batch(&since, BATCH_SIZE) + }).await; + + match deleted { + Ok(count) => { + if count == 0 { + break; + } + total_deleted += count; + tracing::debug!("Deleted batch of {} invocations", count); + tokio::time::sleep(BATCH_DELAY).await; + }, + Err(e) => { + tracing::warn!("Failed to delete invocation batch: {e:#?}"); + break; + } + } + } + + if total_deleted > 0 { + tracing::info!("Deleted {} total invocations during cleanup", total_deleted); + } } } diff --git a/blade/state/lib.rs b/blade/state/lib.rs index e82fec2..9e68f38 100644 --- a/blade/state/lib.rs +++ b/blade/state/lib.rs @@ -212,6 +212,7 @@ pub trait DB { fn get_shallow_invocation(&mut self, id: &str) -> anyhow::Result; fn delete_invocation(&mut self, id: &str) -> anyhow::Result<()>; fn delete_invocations_since(&mut self, ts: &std::time::SystemTime) -> anyhow::Result; + fn delete_invocations_batch(&mut self, ts: &std::time::SystemTime, limit: i64) -> anyhow::Result; fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()>; fn insert_options(&mut self, id: &str, options: &BuildOptions) -> anyhow::Result<()>; fn get_options(&mut self, id: &str) -> anyhow::Result;