Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
ocaml-compiler: [4.14.x]
ocaml-compiler: [4.14.x, 5.3.x]
runs-on: ${{ matrix.os }}
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Use OCaml ${{ matrix.ocaml-compiler }}
uses: ocaml/setup-ocaml@v2
uses: ocaml/setup-ocaml@v3
with:
ocaml-compiler: ${{ matrix.ocaml-compiler }}

Expand Down
14 changes: 14 additions & 0 deletions lib/events.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ let pp_system_event _ fmt = function

type 'a queue_event =
| WaitQueue1 : 'b Queue.t * ('b -> 'a) -> 'a queue_event
| WaitQueueBatch1 : 'b Queue.t * ('b -> 'b list -> 'a) -> 'a queue_event
| WaitQueue2 : 'b Queue.t * 'c Queue.t * ('b -> 'c -> 'a) -> 'a queue_event

let pp_queue_event _ fmt = function
| WaitQueue1 _ -> Stdlib.Format.fprintf fmt "WaitQueue1"
| WaitQueueBatch1 _ -> Stdlib.Format.fprintf fmt "WaitQueueBatch1"
| WaitQueue2 _ -> Stdlib.Format.fprintf fmt "WaitQueue2"

type 'a task_event = 'a [@@deriving show]
Expand Down Expand Up @@ -96,6 +98,7 @@ let map_system_event f = function

let map_queue_event f = function
| WaitQueue1(q1,k) -> WaitQueue1(q1,(fun x -> f (k x)))
| WaitQueueBatch1(q1,k) -> WaitQueueBatch1(q1,(fun x xs -> f (k x xs)))
| WaitQueue2(q1,q2,k) -> WaitQueue2(q1,q2,(fun x y -> f (k x y)))

let map_task_event f x = f x
Expand Down Expand Up @@ -180,6 +183,9 @@ let httpcle ?priority ?name fd k : 'a Event.t =
let queue ?priority ?name q1 k : 'a Event.t =
make_event ?priority ?name @@ QueueEvent (WaitQueue1(q1,k))

let queue_all ?priority ?name q1 k : 'a Event.t =
make_event ?priority ?name @@ QueueEvent (WaitQueueBatch1(q1,k))

let queues ?priority ?name q1 q2 k : 'a Event.t =
make_event?priority ?name @@ QueueEvent (WaitQueue2(q1,q2,k))

Expand All @@ -191,6 +197,14 @@ let now ?priority ?name task : 'a Event.t =
let advance_queue _ _ = function
| WaitQueue1(q1,_) as x when Queue.is_empty q1 -> (), No x
| WaitQueue1(q1,k) -> (), Yes(k (Queue.pop q1))
| WaitQueueBatch1(q1,_) as x when Queue.is_empty q1 -> (), No x
| WaitQueueBatch1(q1,k) ->
let hd = Queue.pop q1 in
let tl = ref [] in
while not @@ Queue.is_empty q1 do
tl := Queue.pop q1 :: !tl
done;
(), Yes(k hd (List.rev !tl))
| WaitQueue2(q1,q2,_) as x when Queue.is_empty q1 || Queue.is_empty q2 -> (), No x
| WaitQueue2(q1,q2,k) -> (), Yes(k (Queue.pop q1) (Queue.pop q2))

Expand Down
5 changes: 5 additions & 0 deletions lib/sel.mli
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ module On : sig
val queue : ?priority:int -> ?name:string ->
'b Queue.t -> ('b -> 'a) -> 'a Event.t

(** Synchronization events between a component and an event.
The queue is emptied, useful to process the contents in batches *)
val queue_all : ?priority:int -> ?name:string ->
'b Queue.t -> ('b -> 'b list -> 'a) -> 'a Event.t

end

(** mix a regular computations with blocking events. E.g.
Expand Down
17 changes: 17 additions & 0 deletions test/test1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ let%test_unit "sel.event.queue" =
[%test_eq: bool] (Todo.is_empty todo) true;
;;

(* queue does not run unless something is pushed *)
let%test_unit "sel.event.queue" =
let q = Stdlib.Queue.create () in
let todo = Todo.add Todo.empty [On.queue_all q (fun () l -> l)] in
(* no progress since the queue is empty *)
let _ready, todo = wait_timeout todo in
(* progress since the queue has a token *)
Stdlib.Queue.push () q;
Stdlib.Queue.push () q;
Stdlib.Queue.push () q;
let ready, todo = pop_opt todo in
[%test_eq: bool] (Option.is_none ready) false;
[%test_eq: unit list option] ready (Some [();()]);
[%test_eq: bool] (Todo.is_empty todo) true;
[%test_eq: bool] (Stdlib.Queue.is_empty q) true;
;;

(* queue2 does not advance unless both queues are pushed *)
let%test_unit "sel.event.queue2" =
let q1 = Stdlib.Queue.create () in
Expand Down