1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information, will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val worker_subgroup: unit -> group
51 val task_of: 'a future -> task
52 val peek: 'a future -> 'a Exn.result option
53 val is_finished: 'a future -> bool
54 val ML_statistics: bool Unsynchronized.ref
55 val forked_proofs: int Unsynchronized.ref
56 val interruptible_task: ('a -> 'b) -> 'a -> 'b
57 val cancel_group: group -> unit
58 val cancel: 'a future -> unit
59 val error_msg: Position.T -> (serial * string) * string option -> unit
60 val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
61 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
62 val default_params: params
63 val forks: params -> (unit -> 'a) list -> 'a future list
64 val fork: (unit -> 'a) -> 'a future
65 val join_results: 'a future list -> 'a Exn.result list
66 val join_result: 'a future -> 'a Exn.result
67 val joins: 'a future list -> 'a list
68 val join: 'a future -> 'a
69 val value_result: 'a Exn.result -> 'a future
70 val value: 'a -> 'a future
71 val cond_forks: params -> (unit -> 'a) list -> 'a future list
72 val map: ('a -> 'b) -> 'a future -> 'b future
73 val promise_group: group -> (unit -> unit) -> 'a future
74 val promise: (unit -> unit) -> 'a future
75 val fulfill_result: 'a future -> 'a Exn.result -> unit
76 val fulfill: 'a future -> 'a -> unit
77 val terminate: group -> unit
78 val shutdown: unit -> unit
81 structure Future: FUTURE =
86 type task = Task_Queue.task;
87 type group = Task_Queue.group;
88 val new_group = Task_Queue.new_group;
94 val tag = Universal.tag () : task option Universal.tag;
96 fun worker_task () = the_default NONE (Thread.getLocal tag);
97 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
100 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
101 fun worker_subgroup () = new_group (worker_group ());
103 fun worker_joining e =
104 (case worker_task () of
106 | SOME task => Task_Queue.joining task e);
108 fun worker_waiting deps e =
109 (case worker_task () of
111 | SOME task => Task_Queue.waiting task deps e);
114 (* datatype future *)
116 type 'a result = 'a Exn.result Single_Assignment.var;
118 datatype 'a future = Future of
123 fun task_of (Future {task, ...}) = task;
124 fun result_of (Future {result, ...}) = result;
126 fun peek x = Single_Assignment.peek (result_of x);
127 fun is_finished x = is_some (peek x);
133 (* synchronization *)
135 val scheduler_event = ConditionVar.conditionVar ();
136 val work_available = ConditionVar.conditionVar ();
137 val work_finished = ConditionVar.conditionVar ();
140 val lock = Mutex.mutex ();
143 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
145 fun wait cond = (*requires SYNCHRONIZED*)
146 Multithreading.sync_wait NONE NONE cond lock;
148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
149 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
151 fun signal cond = (*requires SYNCHRONIZED*)
152 ConditionVar.signal cond;
154 fun broadcast cond = (*requires SYNCHRONIZED*)
155 ConditionVar.broadcast cond;
157 fun broadcast_work () = (*requires SYNCHRONIZED*)
158 (ConditionVar.broadcast work_available;
159 ConditionVar.broadcast work_finished);
166 val queue = Unsynchronized.ref Task_Queue.empty;
167 val next = Unsynchronized.ref 0;
168 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
169 val canceled = Unsynchronized.ref ([]: group list);
170 val do_shutdown = Unsynchronized.ref false;
171 val max_workers = Unsynchronized.ref 0;
172 val max_active = Unsynchronized.ref 0;
173 val worker_trend = Unsynchronized.ref 0;
175 val status_ticks = Unsynchronized.ref 0;
176 val last_round = Unsynchronized.ref Time.zeroTime;
177 val next_round = seconds 0.05;
179 datatype worker_state = Working | Waiting | Sleeping;
180 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
182 fun count_workers state = (*requires SYNCHRONIZED*)
183 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
189 val ML_statistics = Unsynchronized.ref false;
190 val forked_proofs = Unsynchronized.ref 0;
192 fun report_status () = (*requires SYNCHRONIZED*)
193 if ! ML_statistics then
195 val {ready, pending, running, passive} = Task_Queue.status (! queue);
196 val total = length (! workers);
197 val active = count_workers Working;
198 val waiting = count_workers Waiting;
200 [("now", signed_string_of_real (Time.toReal (Time.now ()))),
201 ("tasks_proof", Markup.print_int (! forked_proofs)),
202 ("tasks_ready", Markup.print_int ready),
203 ("tasks_pending", Markup.print_int pending),
204 ("tasks_running", Markup.print_int running),
205 ("tasks_passive", Markup.print_int passive),
206 ("workers_total", Markup.print_int total),
207 ("workers_active", Markup.print_int active),
208 ("workers_waiting", Markup.print_int waiting)] @
209 ML_Statistics.get ();
211 Output.protocol_message (Markup.ML_statistics :: stats) ""
212 handle Fail msg => warning msg
217 (* cancellation primitives *)
219 fun cancel_now group = (*requires SYNCHRONIZED*)
221 val running = Task_Queue.cancel (! queue) group;
222 val _ = running |> List.app (fn thread =>
223 if Simple_Thread.is_self thread then ()
224 else Simple_Thread.interrupt_unsynchronized thread);
227 fun cancel_all () = (*requires SYNCHRONIZED*)
229 val (groups, threads) = Task_Queue.cancel_all (! queue);
230 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
233 fun cancel_later group = (*requires SYNCHRONIZED*)
234 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
235 broadcast scheduler_event);
237 fun interruptible_task f x =
238 (if Multithreading.available then
239 Multithreading.with_attributes
240 (if is_some (worker_task ())
241 then Multithreading.private_interrupts
242 else Multithreading.public_interrupts)
244 else interruptible f x)
245 before Multithreading.interrupted ();
250 fun worker_exec (task, jobs) =
252 val group = Task_Queue.group_of_task task;
253 val valid = not (Task_Queue.is_canceled group);
255 Task_Queue.running task (fn () =>
256 setmp_worker_task task (fn () =>
257 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
259 if ! Multithreading.trace >= 2 then
260 Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
261 handle Fail msg => warning msg
263 val _ = SYNCHRONIZED "finish" (fn () =>
265 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
266 val test = Exn.capture Multithreading.interrupted ();
268 if ok andalso not (Exn.is_interrupt_exn test) then ()
269 else if null (cancel_now group) then ()
270 else cancel_later group;
271 val _ = broadcast work_finished;
272 val _ = if maximal then () else signal work_available;
276 fun worker_wait active cond = (*requires SYNCHRONIZED*)
279 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
281 | NONE => raise Fail "Unregistered worker thread");
282 val _ = state := (if active then Waiting else Sleeping);
284 val _ = state := Working;
287 fun worker_next () = (*requires SYNCHRONIZED*)
288 if length (! workers) > ! max_workers then
289 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
290 signal work_available;
292 else if count_workers Working > ! max_active then
293 (worker_wait false work_available; worker_next ())
295 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
296 NONE => (worker_wait false work_available; worker_next ())
297 | some => (signal work_available; some));
299 fun worker_loop name =
300 (case SYNCHRONIZED name (fn () => worker_next ()) of
302 | SOME work => (worker_exec work; worker_loop name));
304 fun worker_start name = (*requires SYNCHRONIZED*)
305 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
306 Unsynchronized.ref Working));
311 fun scheduler_next () = (*requires SYNCHRONIZED*)
313 val now = Time.now ();
314 val tick = Time.<= (Time.+ (! last_round, next_round), now);
315 val _ = if tick then last_round := now else ();
321 if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
323 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
324 then report_status () else ();
327 if forall (Thread.isActive o #1) (! workers) then ()
330 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
331 val _ = workers := alive;
333 Multithreading.tracing 0 (fn () =>
334 "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
338 (* worker pool adjustments *)
340 val max_active0 = ! max_active;
341 val max_workers0 = ! max_workers;
343 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
344 val _ = max_active := m;
347 if ! do_shutdown then 0
348 else if m = 9999 then 1
349 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
351 if tick andalso mm > ! max_workers then
352 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
353 else if tick andalso mm < ! max_workers then
354 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
357 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
359 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
360 max_workers := Int.min (mm, 2 * m)
363 val missing = ! max_workers - length (! workers);
366 funpow missing (fn () =>
367 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
371 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
372 else signal work_available;
375 (* canceled groups *)
378 if null (! canceled) then ()
380 (Multithreading.tracing 1 (fn () =>
381 string_of_int (length (! canceled)) ^ " canceled groups");
382 Unsynchronized.change canceled (filter_out (null o cancel_now));
388 val _ = Exn.release (wait_timeout next_round scheduler_event);
393 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
394 val continue = not (! do_shutdown andalso null (! workers));
395 val _ = if continue then () else (report_status (); scheduler := NONE);
397 val _ = broadcast scheduler_event;
400 if Exn.is_interrupt exn then
401 (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
402 List.app cancel_later (cancel_all ());
403 broadcast_work (); true)
406 fun scheduler_loop () =
408 Multithreading.with_attributes
409 (Multithreading.sync_interrupts Multithreading.public_interrupts)
410 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
411 do (); last_round := Time.zeroTime);
413 fun scheduler_active () = (*requires SYNCHRONIZED*)
414 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
416 fun scheduler_check () = (*requires SYNCHRONIZED*)
417 (do_shutdown := false;
418 if scheduler_active () then ()
419 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
427 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
429 val _ = if null (cancel_now group) then () else cancel_later group;
430 val _ = signal work_available;
431 val _ = scheduler_check ();
434 fun cancel_group group =
435 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
437 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
442 fun error_msg pos ((serial, msg), exec_id) =
443 Position.setmp_thread_data pos (fn () =>
444 let val id = Position.get_id pos in
445 if is_none id orelse is_none exec_id orelse id = exec_id
446 then Output.error_msg' (serial, msg) else ()
449 fun identify_result pos res =
453 (case Position.get_id pos of
455 | SOME id => [(Markup.exec_idN, id)])
456 in Exn.Exn (Par_Exn.identify exec_id exn) end
459 fun assign_result group result res =
461 val _ = Single_Assignment.assign result res
462 handle exn as Fail _ =>
463 (case Single_Assignment.peek result of
464 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
467 (case the (Single_Assignment.peek result) of
469 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
470 | Exn.Res _ => true);
476 fun future_job group interrupts (e: unit -> 'a) =
478 val result = Single_Assignment.var "future" : 'a result;
479 val pos = Position.thread_data ();
484 Exn.capture (fn () =>
485 Multithreading.with_attributes
487 then Multithreading.private_interrupts else Multithreading.no_interrupts)
488 (fn _ => Position.setmp_thread_data pos e ())) ()
489 else Exn.interrupt_exn;
490 in assign_result group result (identify_result pos res) end;
491 in (result, job) end;
496 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
497 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
499 fun forks ({name, group, deps, pri, interrupts}: params) es =
505 NONE => worker_subgroup ()
507 fun enqueue e queue =
509 val (result, job) = future_job grp interrupts e;
510 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
511 val future = Future {promised = false, task = task, result = result};
512 in (future, queue') end;
514 SYNCHRONIZED "enqueue" (fn () =>
516 val (futures, queue') = fold_map enqueue es (! queue);
517 val _ = queue := queue';
518 val minimal = forall (not o Task_Queue.known_task queue') deps;
519 val _ = if minimal then signal work_available else ();
520 val _ = scheduler_check ();
525 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
532 NONE => Exn.Exn (Fail "Unfinished future")
534 if Exn.is_interrupt_exn res then
535 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
537 | SOME exn => Exn.Exn exn)
542 fun join_next deps = (*requires SYNCHRONIZED*)
543 if null deps then NONE
545 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
548 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
549 | (SOME work, deps') => SOME (work, deps'));
551 fun execute_work NONE = ()
552 | execute_work (SOME (work, deps')) =
553 (worker_joining (fn () => worker_exec work); join_work deps')
555 Multithreading.with_attributes Multithreading.no_interrupts
556 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
560 fun join_results xs =
563 if forall is_finished xs then ()
564 else if Multithreading.self_critical () then
565 error "Cannot join future values within critical section"
566 else if is_some (worker_task ()) then join_work (map task_of xs)
567 else List.app (ignore o Single_Assignment.await o result_of) xs;
568 in map get_result xs end;
572 fun join_result x = singleton join_results x;
573 fun joins xs = Par_Exn.release_all (join_results xs);
574 fun join x = Exn.release (join_result x);
577 (* fast-path versions -- bypassing task queue *)
579 fun value_result (res: 'a Exn.result) =
581 val task = Task_Queue.dummy_task;
582 val group = Task_Queue.group_of_task task;
583 val result = Single_Assignment.var "value" : 'a result;
584 val _ = assign_result group result (identify_result (Position.thread_data ()) res);
585 in Future {promised = false, task = task, result = result} end;
587 fun value x = value_result (Exn.Res x);
589 fun cond_forks args es =
590 if Multithreading.enabled () then forks args es
591 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
594 if is_finished x then value (f (join x))
597 val task = task_of x;
598 val group = Task_Queue.group_of_task task;
599 val (result, job) = future_job group true (fn () => f (join x));
601 val extended = SYNCHRONIZED "extend" (fn () =>
602 (case Task_Queue.extend task job (! queue) of
603 SOME queue' => (queue := queue'; true)
606 if extended then Future {promised = false, task = task, result = result}
608 (singleton o cond_forks)
609 {name = "map_future", group = SOME group, deps = [task],
610 pri = Task_Queue.pri_of_task task, interrupts = true}
611 (fn () => f (join x))
615 (* promised futures -- fulfilled by external means *)
617 fun promise_group group abort : 'a future =
619 val result = Single_Assignment.var "promise" : 'a result;
620 fun assign () = assign_result group result Exn.interrupt_exn
621 handle Fail _ => true
623 if Exn.is_interrupt exn
624 then raise Fail "Concurrent attempt to fulfill promise"
627 Multithreading.with_attributes Multithreading.no_interrupts
628 (fn _ => Exn.release (Exn.capture assign () before abort ()));
629 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
630 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
631 in Future {promised = true, task = task, result = result} end;
633 fun promise abort = promise_group (worker_subgroup ()) abort;
635 fun fulfill_result (Future {promised, task, result}) res =
636 if not promised then raise Fail "Not a promised future"
639 val group = Task_Queue.group_of_task task;
640 val pos = Position.thread_data ();
642 assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
644 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
647 SYNCHRONIZED "fulfill_result" (fn () =>
648 Unsynchronized.change_result queue
649 (Task_Queue.dequeue_passive (Thread.self ()) task));
652 SOME true => worker_exec (task, [job])
654 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
657 if is_some (Single_Assignment.peek result) then ()
658 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
661 fun fulfill x res = fulfill_result x (Exn.Res res);
666 fun terminate group =
669 SYNCHRONIZED "terminate" (fn () =>
670 let val _ = cancel_group_unsynchronized group;
671 in Task_Queue.group_tasks (! queue) group end);
673 if null tasks then ()
676 {name = "terminate", group = SOME (new_group NONE),
677 deps = tasks, pri = 0, interrupts = false} I
685 if Multithreading.available then
686 SYNCHRONIZED "shutdown" (fn () =>
687 while scheduler_active () do
688 (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
689 wait scheduler_event;
694 (*final declarations of this structure!*)
695 val map = map_future;
699 type 'a future = 'a Future.future;