Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions blade/db/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,30 @@ impl state::DB for Postgres {
.context(format!("failed to delete invocation since {ot:#?}"))
}

fn delete_invocations_batch(
&mut self,
ts: &std::time::SystemTime,
limit: i64,
) -> anyhow::Result<usize> {
let ot: time::OffsetDateTime = (*ts).into();
let ids_to_delete: Vec<String> = schema::invocations::table
.select(schema::invocations::id)
.filter(schema::invocations::start.le(ot))
.limit(limit)
.load(&mut self.conn)
.context("failed to query invocations for batch deletion")?;

if ids_to_delete.is_empty() {
return Ok(0);
}

diesel::delete(
schema::invocations::table.filter(schema::invocations::id.eq_any(&ids_to_delete)),
)
.execute(&mut self.conn)
.context("failed to delete invocation batch")
}

fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> {
use schema::invocations::dsl::*;
let now: time::OffsetDateTime = std::time::SystemTime::now().into();
Expand Down Expand Up @@ -1048,6 +1072,99 @@ mod tests {
}
}

#[test]
fn test_delete_batch() {
let tmp = tempdir::TempDir::new("test_delete_batch").unwrap();
let harness = harness::new(tmp.path().to_str().unwrap()).unwrap();
let uri = harness.uri();
super::init_db(&uri).unwrap();
let mgr = crate::manager::PostgresManager::new(&uri).unwrap();
let mut conn = PgConnection::establish(&uri).unwrap();
let mut db = mgr.get().unwrap();

let start = UNIX_EPOCH;
let mut curr = start;
let day = Duration::from_secs(60 * 60 * 24);

// Create 15 invocations
for i in 0..15 {
db.upsert_shallow_invocation(&state::InvocationResults {
id: format!("id{i}"),
start: curr.checked_add(day).unwrap(),
..Default::default()
})
.unwrap();
curr += day;
}

// Verify all 15 exist
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 15);
}

// Delete in batches of 3, targeting first 10 invocations
let cutoff = start.checked_add(day * 10).unwrap();

// First batch: should delete 3
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 12);
}

// Second batch: should delete 3 more
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 9);
}

// Third batch: should delete 3 more
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 6);
}

// Fourth batch: should delete 1 more (only 1 left before cutoff)
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 1);
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 5);
}

// Fifth batch: should delete 0 (none left before cutoff)
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 0);
{
let res = super::schema::invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 5);
}
}

#[test]
fn test_options() {
let tmp = tempdir::TempDir::new("test_test").unwrap();
Expand Down
116 changes: 116 additions & 0 deletions blade/db/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,30 @@ impl state::DB for Sqlite {
.context(format!("failed to delete invocation since {ot:#?}"))
}

fn delete_invocations_batch(
&mut self,
ts: &std::time::SystemTime,
limit: i64,
) -> anyhow::Result<usize> {
let ot: time::OffsetDateTime = (*ts).into();
let ids_to_delete: Vec<String> = schema::Invocations::table
.select(schema::Invocations::id)
.filter(unixepoch(schema::Invocations::start).le(unixepoch(ot)))
.limit(limit)
.load(&mut self.conn)
.context("failed to query invocations for batch deletion")?;

if ids_to_delete.is_empty() {
return Ok(0);
}

diesel::delete(
schema::Invocations::table.filter(schema::Invocations::id.eq_any(&ids_to_delete)),
)
.execute(&mut self.conn)
.context("failed to delete invocation batch")
}

fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()> {
use schema::Invocations::dsl::*;
let now: time::OffsetDateTime = std::time::SystemTime::now().into();
Expand Down Expand Up @@ -1069,6 +1093,98 @@ mod tests {
}
}

#[test]
fn test_delete_batch() {
let tmp = tempdir::TempDir::new("test_delete_batch").unwrap();
let db_path = tmp.path().join("test.db");
super::init_db(db_path.to_str().unwrap()).unwrap();
let mut conn = SqliteConnection::establish(db_path.to_str().unwrap()).unwrap();
let mgr = crate::manager::SqliteManager::new(db_path.to_str().unwrap()).unwrap();
let mut db = mgr.get().unwrap();

let start = UNIX_EPOCH;
let mut curr = start;
let day = Duration::from_secs(60 * 60 * 24);

// Create 15 invocations
for i in 0..15 {
db.upsert_shallow_invocation(&state::InvocationResults {
id: format!("id{i}"),
start: curr.checked_add(day).unwrap(),
..Default::default()
})
.unwrap();
curr += day;
}

// Verify all 15 exist
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 15);
}

// Delete in batches of 3, targeting first 10 invocations
let cutoff = start.checked_add(day * 10).unwrap();

// First batch: should delete 3
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 12);
}

// Second batch: should delete 3 more
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 9);
}

// Third batch: should delete 3 more
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 3);
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 6);
}

// Fourth batch: should delete 1 more (only 1 left before cutoff)
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 1);
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 5);
}

// Fifth batch: should delete 0 (none left before cutoff)
let deleted = db.delete_invocations_batch(&cutoff, 3).unwrap();
assert_eq!(deleted, 0);
{
let res = super::schema::Invocations::table
.select(super::models::Invocation::as_select())
.get_results(&mut conn)
.unwrap();
assert_eq!(res.len(), 5);
}
}

#[test]
fn test_options() {
let tmp = tempdir::TempDir::new("test_target").unwrap();
Expand Down
34 changes: 29 additions & 5 deletions blade/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,9 @@ cfg_if! {
let day = std::time::Duration::from_secs(60 * 60 * 24);
let interval = global.retention.unwrap_or(day);
let check_interval = std::cmp::min(day, interval/7);
const BATCH_SIZE: i64 = 10;
const BATCH_DELAY: std::time::Duration = std::time::Duration::from_millis(100);

loop {
tokio::time::sleep(check_interval).await;
if global.retention.is_none() {
Expand All @@ -349,11 +352,32 @@ cfg_if! {
tracing::warn!("Overflow when clean up time");
continue;
};
db::run(global.db_manager.clone(), move |db_mgr| db_mgr.delete_invocations_since(&since)).await.inspect_err(|e| {
tracing::warn!("Failed to mark old invocations for deletion: {e:#?}");
}).ok().inspect(|count| {
tracing::info!("Marked {} invocations for deletion", count);
});

let mut total_deleted = 0;
loop {
let deleted = db::run(global.db_manager.clone(), move |db_mgr| {
db_mgr.delete_invocations_batch(&since, BATCH_SIZE)
}).await;

match deleted {
Ok(count) => {
if count == 0 {
break;
}
total_deleted += count;
tracing::debug!("Deleted batch of {} invocations", count);
tokio::time::sleep(BATCH_DELAY).await;
},
Err(e) => {
tracing::warn!("Failed to delete invocation batch: {e:#?}");
break;
}
}
}

if total_deleted > 0 {
tracing::info!("Deleted {} total invocations during cleanup", total_deleted);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions blade/state/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ pub trait DB {
fn get_shallow_invocation(&mut self, id: &str) -> anyhow::Result<InvocationResults>;
fn delete_invocation(&mut self, id: &str) -> anyhow::Result<()>;
fn delete_invocations_since(&mut self, ts: &std::time::SystemTime) -> anyhow::Result<usize>;
fn delete_invocations_batch(&mut self, ts: &std::time::SystemTime, limit: i64) -> anyhow::Result<usize>;
fn update_invocation_heartbeat(&mut self, invocation_id: &str) -> anyhow::Result<()>;
fn insert_options(&mut self, id: &str, options: &BuildOptions) -> anyhow::Result<()>;
fn get_options(&mut self, id: &str) -> anyhow::Result<BuildOptions>;
Expand Down