From 6522fe379eb2b76569e4c416f44cb2e26dec3315 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 30 May 2025 08:48:28 -0400 Subject: [PATCH 1/9] itable behave like hash_table for double/reduce/cant_iterate --- dttools/src/itable.c | 185 +++++++++++++++++++++++++++++++++---------- dttools/src/itable.h | 26 ++++++ 2 files changed, 169 insertions(+), 42 deletions(-) diff --git a/dttools/src/itable.c b/dttools/src/itable.c index 73f697a1a8..a9354bc0a0 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -6,12 +6,14 @@ See the file COPYING for details. */ #include "itable.h" +#include "debug.h" #include #include #define DEFAULT_SIZE 127 -#define DEFAULT_LOAD 0.75 +#define DEFAULT_MAX_LOAD 0.75 +#define DEFAULT_MIN_LOAD 0.125 struct entry { UINT64_T key; @@ -25,6 +27,14 @@ struct itable { struct entry **buckets; int ibucket; struct entry *ientry; + + /* for memory safety, itable_nextkey cannot be called in the same + * iteration if itable_insert or itable_remove has been called. + * In such case, the executable will be terminated with a fatal message. + * If the table should be modified during iterations, consider + * using the array keys from itable_keys_array. (If so, remember + * to free it afterwards with itable_free_keys_array.) 
*/ + int cant_iterate_yet; }; struct itable *itable_create(int bucket_count) @@ -46,6 +56,7 @@ struct itable *itable_create(int bucket_count) } h->size = 0; + h->cant_iterate_yet = 0; return h; } @@ -69,6 +80,9 @@ void itable_clear(struct itable *h, void (*delete_func)(void *)) for (i = 0; i < h->bucket_count; i++) { h->buckets[i] = 0; } + + /* buckets went away, thus a nextkey would be invalid */ + h->cant_iterate_yet = 1; } void itable_delete(struct itable *h) @@ -78,11 +92,63 @@ void itable_delete(struct itable *h) free(h); } +UINT64_T *itable_keys_array(struct itable *h) +{ + UINT64_T *keys = (UINT64_T *)malloc(sizeof(int) * h->size); + int ikey = 0; + + struct entry *e, *f; + int i; + + for (i = 0; i < h->bucket_count; i++) { + e = h->buckets[i]; + while (e) { + keys[ikey] = e->key; + ikey++; + f = e->next; + e = f; + } + } + + return keys; +} + +void itable_free_keys_array(UINT64_T *keys) +{ + free(keys); +} + int itable_size(struct itable *h) { return h->size; } +double itable_load(struct itable *h) +{ + return (double)h->size / h->bucket_count; +} + +static int insert_to_buckets_aux(struct entry **buckets, int bucket_count, struct entry *new_entry) +{ + unsigned index; + struct entry *e; + + index = new_entry->key % bucket_count; + e = buckets[index]; + + while (e) { + /* check that this key does not already exist in the table */ + if (new_entry->key == e->key) { + return 0; + } + e = e->next; + } + + new_entry->next = buckets[index]; + buckets[index] = new_entry; + return 1; +} + void *itable_lookup(struct itable *h, UINT64_T key) { struct entry *e; @@ -103,73 +169,96 @@ void *itable_lookup(struct itable *h, UINT64_T key) static int itable_double_buckets(struct itable *h) { - struct itable *hn = itable_create(2 * h->bucket_count); - - if (!hn) + int new_count = (2 * (h->bucket_count + 1)) - 1; + struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *)); + if (!new_buckets) { return 0; + } - /* Move pairs to new hash */ - 
uint64_t key; - void *value; - itable_firstkey(h); - while (itable_nextkey(h, &key, &value)) - if (!itable_insert(hn, key, value)) { - itable_delete(hn); - return 0; - } - - /* Delete all old pairs */ struct entry *e, *f; - int i; - for (i = 0; i < h->bucket_count; i++) { + for (int i = 0; i < h->bucket_count; i++) { e = h->buckets[i]; while (e) { f = e->next; - free(e); + e->next = NULL; + insert_to_buckets_aux(new_buckets, new_count, e); e = f; } } /* Make the old point to the new */ free(h->buckets); - h->buckets = hn->buckets; - h->bucket_count = hn->bucket_count; - h->size = hn->size; + h->buckets = new_buckets; + h->bucket_count = new_count; - /* Delete reference to new, so old is safe */ - free(hn); + /* structure of itable changed completely, thus a nextkey would be incorrect. */ + h->cant_iterate_yet = 1; return 1; } int itable_insert(struct itable *h, UINT64_T key, const void *value) { - struct entry *e; - UINT64_T index; - - if (((float)h->size / h->bucket_count) > DEFAULT_LOAD) + if (((float)h->size / h->bucket_count) > DEFAULT_MAX_LOAD) itable_double_buckets(h); - index = key % h->bucket_count; - e = h->buckets[index]; + struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry)); + if (!new_entry) + return 0; - while (e) { - if (key == e->key) { - e->value = (void *)value; - return 1; - } - e = e->next; + new_entry->key = key; + new_entry->value = (void *)value; + + int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry); + if (inserted) { + h->size++; + /* inserting cause different behaviours with nextkey (e.g., sometimes the new + * key would be included or skipped in the iteration */ + h->cant_iterate_yet = 1; + + return 1; + } + + return 0; +} + +static int itable_reduce_buckets(struct itable *h) +{ + int new_count = ((h->bucket_count + 1) / 2) - 1; + + /* DEFAULT_SIZE is the minimum size */ + if (new_count < DEFAULT_SIZE) { + return 1; } - e = (struct entry *)malloc(sizeof(struct entry)); - if (!e) + /* Table 
cannot be reduced above DEFAULT_MAX_LOAD */ + if (((float)h->size / new_count) > DEFAULT_MAX_LOAD) { + return 1; + } + + struct entry **new_buckets = (struct entry **)calloc(new_count, sizeof(struct entry *)); + if (!new_buckets) { return 0; + } - e->key = key; - e->value = (void *)value; - e->next = h->buckets[index]; - h->buckets[index] = e; - h->size++; + struct entry *e, *f; + for (int i = 0; i < h->bucket_count; i++) { + e = h->buckets[i]; + while (e) { + f = e->next; + e->next = NULL; + insert_to_buckets_aux(new_buckets, new_count, e); + e = f; + } + } + + /* Make the old point to the new */ + free(h->buckets); + h->buckets = new_buckets; + h->bucket_count = new_count; + + /* structure of itable changed completely, thus a nextkey would be incorrect. */ + h->cant_iterate_yet = 1; return 1; } @@ -194,6 +283,12 @@ void *itable_remove(struct itable *h, UINT64_T key) value = e->value; free(e); h->size--; + h->cant_iterate_yet = 1; + + if (((float)h->size / h->bucket_count) < DEFAULT_MIN_LOAD) { + itable_reduce_buckets(h); + } + return value; } f = e; @@ -218,6 +313,8 @@ void *itable_pop(struct itable *t) void itable_firstkey(struct itable *h) { + h->cant_iterate_yet = 0; + h->ientry = 0; for (h->ibucket = 0; h->ibucket < h->bucket_count; h->ibucket++) { h->ientry = h->buckets[h->ibucket]; @@ -228,6 +325,10 @@ void itable_firstkey(struct itable *h) int itable_nextkey(struct itable *h, UINT64_T *key, void **value) { + if (h->cant_iterate_yet) { + fatal("cctools bug: the itable iteration has not been reset since last modification"); + } + if (h->ientry) { *key = h->ientry->key; if (value) diff --git a/dttools/src/itable.h b/dttools/src/itable.h index 10c8bb259f..a5bfca24e9 100644 --- a/dttools/src/itable.h +++ b/dttools/src/itable.h @@ -69,6 +69,24 @@ Note that this function will not delete all of the objects contained within the void itable_delete(struct itable *h); + +/** Return an array with all the current keys. 
+It is the responsibility of the caller to free this array with +itable_free_keys_array. +@param h A pointer to an integer table. +@return An array of all the current keys. +*/ + +UINT64_T *itable_keys_array(struct itable *h); + + +/** Free an array generated from itable_keys_array. +@param keys An array of all the keys. +*/ + +void itable_free_keys_array(UINT64_T *keys); + + /** Count the entries in an integer table. @return The number of entries in the table. @param h A pointer to an integer table. @@ -76,6 +94,14 @@ void itable_delete(struct itable *h); int itable_size(struct itable *h); +/** Get the proportion of elements vs buckets in the table. +@return The load of the table. +@param h A pointer to an integer table. +*/ + +double itable_load(struct itable *h); + + /** Insert a key and value. This call will fail if the table already contains the same key. You must call @ref itable_remove to remove it. From 5fa2a3eb6748225d77320832682c596673d8ab3e Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 30 May 2025 08:48:51 -0400 Subject: [PATCH 2/9] fix itable iteration bug in vine_worker --- taskvine/src/worker/vine_worker.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index ee5f404a8e..05844b1216 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -1114,14 +1114,15 @@ then we need to abort to clean things up. 
static void kill_all_tasks() { - struct vine_process *p; - uint64_t task_id; + uint64_t *task_ids = itable_keys_array(procs_table); + int total_tasks = itable_size(procs_table); - ITABLE_ITERATE(procs_table, task_id, p) - { - do_kill(task_id); + for (int i = 0; i < total_tasks; i++) { + do_kill(task_ids[i]); } + itable_free_keys_array(task_ids); + assert(itable_size(procs_table) == 0); assert(itable_size(procs_running) == 0); assert(itable_size(procs_complete) == 0); From 45c2fe82281e9637ebf69e9929534d12622cf8e1 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 30 May 2025 09:36:56 -0400 Subject: [PATCH 3/9] fix vine_worker itable iteration bug on failed libraries --- taskvine/src/worker/vine_worker.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/taskvine/src/worker/vine_worker.c b/taskvine/src/worker/vine_worker.c index 05844b1216..a500f042a7 100644 --- a/taskvine/src/worker/vine_worker.c +++ b/taskvine/src/worker/vine_worker.c @@ -757,17 +757,21 @@ static void handle_failed_library_process(struct vine_process *p, struct link *m /* Forsake the tasks that are running on this library */ /* It no available libraries on this worker, tasks waiting for this library will be forsaken */ - struct vine_process *p_running; - uint64_t task_id; + uint64_t *task_ids = itable_keys_array(procs_running); + int total_procs = itable_size(procs_running); + + for (int i = 0; i < total_procs; i++) { + uint64_t task_id = task_ids[i]; + struct vine_process *p_running = itable_lookup(procs_running, task_id); - ITABLE_ITERATE(procs_running, task_id, p_running) - { if (p_running->library_process == p) { debug(D_VINE, "killing function task %d running on library task %d", (int)task_id, p->task->task_id); finish_running_task(p_running, VINE_RESULT_FORSAKEN); reap_process(p_running, manager); } } + + itable_free_keys_array(task_ids); } /* From e2c0618625ab85fd7332aa229e6281d167f6d0eb Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Tue, 12 Aug 2025 
11:06:05 -0400 Subject: [PATCH 4/9] itable: re-enable replace already existing value This is a possible memory leak. --- dttools/src/itable.c | 12 +----------- dttools/src/itable.h | 3 +-- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/dttools/src/itable.c b/dttools/src/itable.c index a9354bc0a0..3b45194ef7 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -131,19 +131,9 @@ double itable_load(struct itable *h) static int insert_to_buckets_aux(struct entry **buckets, int bucket_count, struct entry *new_entry) { unsigned index; - struct entry *e; - index = new_entry->key % bucket_count; - e = buckets[index]; - - while (e) { - /* check that this key does not already exist in the table */ - if (new_entry->key == e->key) { - return 0; - } - e = e->next; - } + // Possible memory leak! Silently replacing value if it existed. new_entry->next = buckets[index]; buckets[index] = new_entry; return 1; diff --git a/dttools/src/itable.h b/dttools/src/itable.h index a5bfca24e9..5a8f96c4a8 100644 --- a/dttools/src/itable.h +++ b/dttools/src/itable.h @@ -103,8 +103,7 @@ double itable_load(struct itable *h); /** Insert a key and value. -This call will fail if the table already contains the same key. -You must call @ref itable_remove to remove it. +This call will replace the value if it already contains the same key. Also note that you cannot insert a null value into the table. @param h A pointer to an integer table. 
@param key An integer key From 6fc9b1afbef5ed195f4cd3f71ab5707ca2f124cc Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 9 Jan 2026 11:04:55 -0500 Subject: [PATCH 5/9] comment why we do not check value replaced in itable --- dttools/src/itable.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/dttools/src/itable.c b/dttools/src/itable.c index 3b45194ef7..a0f1340e1f 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -199,6 +199,18 @@ int itable_insert(struct itable *h, UINT64_T key, const void *value) new_entry->key = key; new_entry->value = (void *)value; + int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry); + if (inserted) { + h->size++; + itable_double_buckets(h); + + struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry)); + if (!new_entry) + return 0; + + new_entry->key = key; + new_entry->value = (void *)value; + int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry); if (inserted) { h->size++; From a2e5542e13afe3fb57a50482d597733ffe7ac6be Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 9 Jan 2026 11:15:17 -0500 Subject: [PATCH 6/9] use xxmalloc --- dttools/src/itable.c | 19 ++----------------- 1 file changed, 2 insertions(+), 17 deletions(-) diff --git a/dttools/src/itable.c b/dttools/src/itable.c index a0f1340e1f..b3204d7b15 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -7,6 +7,7 @@ See the file COPYING for details. 
#include "itable.h" #include "debug.h" +#include "xxmalloc.h" #include #include @@ -192,22 +193,7 @@ int itable_insert(struct itable *h, UINT64_T key, const void *value) if (((float)h->size / h->bucket_count) > DEFAULT_MAX_LOAD) itable_double_buckets(h); - struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry)); - if (!new_entry) - return 0; - - new_entry->key = key; - new_entry->value = (void *)value; - - int inserted = insert_to_buckets_aux(h->buckets, h->bucket_count, new_entry); - if (inserted) { - h->size++; - itable_double_buckets(h); - - struct entry *new_entry = (struct entry *)malloc(sizeof(struct entry)); - if (!new_entry) - return 0; - + struct entry *new_entry = (struct entry *)xxmalloc(sizeof(struct entry)); new_entry->key = key; new_entry->value = (void *)value; @@ -351,5 +337,4 @@ int itable_nextkey(struct itable *h, UINT64_T *key, void **value) return 0; } } - /* vim: set noexpandtab tabstop=8: */ From 3d9d3f3851be5f3b8c94a871de960329bf5c60f2 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 9 Jan 2026 11:35:04 -0500 Subject: [PATCH 7/9] reset itable in wq worker when needed --- work_queue/src/work_queue_worker.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/work_queue/src/work_queue_worker.c b/work_queue/src/work_queue_worker.c index bf2be83aca..ccecbb93b9 100644 --- a/work_queue/src/work_queue_worker.c +++ b/work_queue/src/work_queue_worker.c @@ -1323,6 +1323,8 @@ static void kill_all_tasks() itable_firstkey(procs_table); while(itable_nextkey(procs_table,&taskid,(void**)&p)) { do_kill(taskid); + /* do_kill removes the task from the table, so we need to reset the iterator. 
*/ + itable_firstkey(procs_table); } assert(itable_size(procs_table)==0); From 5c2739c91ccbd15a19d6e9ff9bb77a504206a5a1 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 9 Jan 2026 12:33:51 -0500 Subject: [PATCH 8/9] do not iterate with nextkey itables that are modified --- taskvine/src/manager/vine_manager.c | 46 ++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/taskvine/src/manager/vine_manager.c b/taskvine/src/manager/vine_manager.c index 66a672b835..a442bcc06e 100644 --- a/taskvine/src/manager/vine_manager.c +++ b/taskvine/src/manager/vine_manager.c @@ -1043,8 +1043,16 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) vine_current_transfers_wipe_worker(q, w); - ITABLE_ITERATE(w->current_tasks, task_id, t) - { + /* Collect all task IDs first to avoid modifying itable during iteration */ + UINT64_T *task_ids = itable_keys_array(w->current_tasks); + int num_tasks = itable_size(w->current_tasks); + + for (int i = 0; i < num_tasks; i++) { + task_id = task_ids[i]; + t = itable_lookup(w->current_tasks, task_id); + if (!t) + continue; + if (t->time_when_commit_end >= t->time_when_commit_start) { timestamp_t delta_time = timestamp_get() - t->time_when_commit_end; t->time_workers_execute_failure += delta_time; @@ -1055,10 +1063,10 @@ static void cleanup_worker(struct vine_manager *q, struct vine_worker_info *w) reap_task_from_worker(q, w, t, VINE_TASK_READY); vine_task_clean(t); - - itable_firstkey(w->current_tasks); } + itable_free_keys_array(task_ids); + itable_clear(w->current_tasks, 0); itable_clear(w->current_libraries, 0); @@ -2988,12 +2996,23 @@ static void kill_empty_libraries_on_worker(struct vine_manager *q, struct vine_w { uint64_t libtask_id; struct vine_task *libtask; - ITABLE_ITERATE(w->current_libraries, libtask_id, libtask) - { + + /* Collect all library task IDs first to avoid modifying itable during iteration */ + UINT64_T *libtask_ids = 
itable_keys_array(w->current_libraries); + int num_libraries = itable_size(w->current_libraries); + + for (int i = 0; i < num_libraries; i++) { + libtask_id = libtask_ids[i]; + libtask = itable_lookup(w->current_libraries, libtask_id); + if (!libtask) + continue; + if (libtask->function_slots_inuse == 0 && (!t->needs_library || strcmp(t->needs_library, libtask->provides_library))) { vine_cancel_by_task_id(q, libtask_id); } } + + itable_free_keys_array(libtask_ids); } /* @@ -3636,9 +3655,17 @@ static int receive_tasks_from_worker(struct vine_manager *q, struct vine_worker_ max_to_receive = itable_size(w->current_tasks); } + /* Collect all task IDs first to avoid modifying itable during iteration */ + UINT64_T *task_ids = itable_keys_array(w->current_tasks); + int num_tasks = itable_size(w->current_tasks); + /* Now consider all tasks assigned to that worker .*/ - ITABLE_ITERATE(w->current_tasks, task_id, t) - { + for (int i = 0; i < num_tasks; i++) { + task_id = task_ids[i]; + t = itable_lookup(w->current_tasks, task_id); + if (!t) + continue; + /* If the task is waiting to be retrieved... */ if (t->state == VINE_TASK_WAITING_RETRIEVAL) { /* Attempt to fetch it. */ @@ -3651,11 +3678,14 @@ static int receive_tasks_from_worker(struct vine_manager *q, struct vine_worker_ } } else { /* But if the fetch failed, the worker is no longer vaild, bail out. */ + itable_free_keys_array(task_ids); return tasks_received; } } } + itable_free_keys_array(task_ids); + /* Consider removing the worker if it is empty. 
*/ vine_manager_factory_worker_prune(q, w); From 7cef4feaa88ba26dd9a973e726783c85a87617b3 Mon Sep 17 00:00:00 2001 From: Benjamin Tovar Date: Fri, 9 Jan 2026 12:42:58 -0500 Subject: [PATCH 9/9] allocated uints instead of ints for itable key array --- dttools/src/itable.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dttools/src/itable.c b/dttools/src/itable.c index b3204d7b15..193dc9f087 100644 --- a/dttools/src/itable.c +++ b/dttools/src/itable.c @@ -95,7 +95,7 @@ void itable_delete(struct itable *h) UINT64_T *itable_keys_array(struct itable *h) { - UINT64_T *keys = (UINT64_T *)malloc(sizeof(int) * h->size); + UINT64_T *keys = (UINT64_T *)malloc(sizeof(UINT64_T) * h->size); int ikey = 0; struct entry *e, *f;