|
| 1 | + |
| 2 | +alter table "public"."sync_info" add column "last_success_start" timestamp with time zone; |
| 3 | + |
| 4 | +alter table "public"."sync_info" alter column "last_task_start" set not null; |
| 5 | + |
| 6 | +update sync_info set last_success_start = last_task_start where last_task_end is not null; |
| 7 | + |
| 8 | +CREATE OR REPLACE FUNCTION public.end_sync_task(s_target bigint, s_function character varying, s_worker character varying, s_status public.task_status) |
| 9 | + RETURNS void |
| 10 | + LANGUAGE plpgsql |
| 11 | + SET search_path TO '' |
| 12 | +AS $function$ |
| 13 | +DECLARE t_id INTEGER; |
| 14 | +DECLARE t_worker varchar; |
| 15 | +DECLARE t_status public.task_status; |
| 16 | +DECLARE t_failure_count SMALLINT; |
| 17 | +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; |
| 18 | +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; |
| 19 | +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; |
| 20 | +BEGIN |
| 21 | + SELECT id, worker, status, failure_count, last_task_start, last_task_end, last_success_start |
| 22 | + INTO STRICT t_id, t_worker, t_status, t_failure_count, t_last_task_start, t_last_task_end, t_last_success_start |
| 23 | + FROM public.sync_info WHERE sync_target = s_target AND sync_function = s_function; |
| 24 | + ASSERT s_status > 'active'; |
| 25 | + ASSERT t_worker = s_worker, 'Wrong worker'; |
| 26 | + ASSERT s_status >= t_status, 'do not go back in status'; |
| 27 | + IF s_status = 'complete' THEN |
| 28 | + t_last_task_end := now(); |
| 29 | + t_last_success_start := t_last_task_start; |
| 30 | + t_failure_count := 0; |
| 31 | + ELSE |
| 32 | + IF t_status != s_status THEN |
| 33 | + t_failure_count := t_failure_count + 1; |
| 34 | + END IF; |
| 35 | + END IF; |
| 36 | + |
| 37 | + UPDATE public.sync_info |
| 38 | + SET status = s_status, |
| 39 | + task_times_out_at=null, |
| 40 | + last_task_end=t_last_task_end, |
| 41 | + last_success_start=t_last_success_start, |
| 42 | + failure_count=t_failure_count |
| 43 | + WHERE id=t_id; |
| 44 | +END; |
| 45 | +$function$ |
| 46 | +; |
| 47 | + |
| 48 | + |
| 49 | +CREATE OR REPLACE FUNCTION public.propose_sync_task(s_target bigint, s_function character varying, s_worker character varying, timeout interval, task_interval interval) |
| 50 | + RETURNS timestamp with time zone |
| 51 | + LANGUAGE plpgsql |
| 52 | + SET search_path TO '' |
| 53 | +AS $function$ |
| 54 | +DECLARE s_id INTEGER; |
| 55 | +DECLARE start_time TIMESTAMP WITH TIME ZONE := now(); |
| 56 | +DECLARE t_status public.task_status; |
| 57 | +DECLARE t_failure_count SMALLINT; |
| 58 | +DECLARE t_last_task_start TIMESTAMP WITH TIME ZONE; |
| 59 | +DECLARE t_last_task_end TIMESTAMP WITH TIME ZONE; |
| 60 | +DECLARE t_times_out_at TIMESTAMP WITH TIME ZONE; |
| 61 | +DECLARE t_last_success_start TIMESTAMP WITH TIME ZONE; |
| 62 | +DECLARE result TIMESTAMP WITH TIME ZONE; |
| 63 | +BEGIN |
| 64 | + ASSERT timeout * 2 < task_interval; |
| 65 | + ASSERT timeout >= '1s'::interval; |
| 66 | + ASSERT task_interval >= '5s'::interval; |
| 67 | + INSERT INTO public.sync_info (sync_target, sync_function, status, worker, last_task_start, task_times_out_at) |
| 68 | + VALUES (s_target, s_function, 'active', s_worker, start_time, start_time+timeout) |
| 69 | + ON CONFLICT (sync_target, sync_function) DO NOTHING |
| 70 | + RETURNING id INTO s_id; |
| 71 | + IF s_id IS NOT NULL THEN |
| 72 | + -- totally new_row, I'm on the task |
| 73 | + -- return last time it was run successfully |
| 74 | + SELECT max(last_task_start) INTO result FROM public.sync_info |
| 75 | + WHERE sync_target = s_target |
| 76 | + AND sync_function = s_function |
| 77 | + AND status = 'complete'; |
| 78 | + RETURN result; |
| 79 | + END IF; |
| 80 | + -- now we know it pre-existed. Maybe already active. |
| 81 | + SELECT id INTO STRICT s_id |
| 82 | + FROM public.sync_info |
| 83 | + WHERE sync_target = s_target AND sync_function = s_function |
| 84 | + FOR UPDATE; |
| 85 | + SELECT status, failure_count, last_task_start, last_task_end, task_times_out_at, last_success_start |
| 86 | + INTO t_status, t_failure_count, t_last_task_start, t_last_task_end, t_times_out_at, t_last_success_start |
| 87 | + FROM public.sync_info |
| 88 | + WHERE id = s_id; |
| 89 | + |
| 90 | + IF t_status = 'active' AND t_last_task_start >= coalesce(t_last_task_end, t_last_task_start) AND start_time > t_times_out_at THEN |
| 91 | + t_status := 'timeout'; |
| 92 | + t_failure_count := t_failure_count + 1; |
| 93 | + END IF; |
| 94 | + -- basic backoff |
| 95 | + task_interval := task_interval * (1+t_failure_count); |
| 96 | + IF coalesce(t_last_task_end, t_last_task_start) + task_interval < now() THEN |
| 97 | + -- we are ready to take on the task |
| 98 | + result := t_last_success_start; |
| 99 | + UPDATE public.sync_info |
| 100 | + SET worker=s_worker, |
| 101 | + status='active', |
| 102 | + task_times_out_at = now() + timeout, |
| 103 | + last_task_start = start_time, |
| 104 | + failure_count=t_failure_count, |
| 105 | + last_task_end = NULL |
| 106 | + WHERE id=s_id; |
| 107 | + ELSE |
| 108 | + -- the task has been tried recently enough |
| 109 | + IF t_status = 'timeout' THEN |
| 110 | + UPDATE public.sync_info |
| 111 | + SET status=t_status, failure_count=t_failure_count |
| 112 | + WHERE id=s_id; |
| 113 | + END IF; |
| 114 | + result := coalesce(t_last_task_end, t_last_task_start) + task_interval; |
| 115 | + END IF; |
| 116 | + |
| 117 | + RETURN result; |
| 118 | +END; |
| 119 | +$function$ |
| 120 | +; |
0 commit comments