discontinued odd magic number, which was once used for performance measurements;
1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option  (*task stored in the current thread's local data, NONE if unset*)
48 val worker_group: unit -> group option  (*group of worker_task (), if any*)
49 val worker_subgroup: unit -> group  (*new_group applied to worker_group ()*)
51 val task_of: 'a future -> task
52 val peek: 'a future -> 'a Exn.result option  (*Single_Assignment.peek of the future's result variable*)
53 val is_finished: 'a future -> bool  (*true iff peek yields SOME*)
54 val ML_statistics: bool Unsynchronized.ref  (*when true, report_status emits a statistics protocol message*)
55 val forked_proofs: int Unsynchronized.ref  (*reported as "tasks_proof" by report_status*)
56 val interruptible_task: ('a -> 'b) -> 'a -> 'b
57 val cancel_group: group -> unit  (*cancellation performed under the global lock*)
58 val cancel: 'a future -> unit  (*cancel_group on the group of the future's task*)
59 val error_msg: Position.T -> (serial * string) * string option -> unit
60 val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
61 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
62 val default_params: params  (*name = "", group = NONE, deps = [], pri = 0, interrupts = true*)
63 val forks: params -> (unit -> 'a) list -> 'a future list
64 val fork: (unit -> 'a) -> 'a future
65 val join_results: 'a future list -> 'a Exn.result list
66 val join_result: 'a future -> 'a Exn.result  (*single-element variant of join_results*)
67 val joins: 'a future list -> 'a list  (*Par_Exn.release_all applied to join_results*)
68 val join: 'a future -> 'a  (*Exn.release applied to join_result*)
69 val value_result: 'a Exn.result -> 'a future  (*future built directly around a given result, without enqueueing a task*)
70 val value: 'a -> 'a future  (*value_result on Exn.Res x*)
71 val cond_forks: params -> (unit -> 'a) list -> 'a future list  (*forks if Multithreading.enabled (), otherwise evaluates each function immediately*)
72 val map: ('a -> 'b) -> 'a future -> 'b future  (*bound to map_future at the end of the structure*)
73 val promise_group: group -> (unit -> unit) -> 'a future
74 val promise: (unit -> unit) -> 'a future  (*promise_group within worker_subgroup ()*)
75 val fulfill_result: 'a future -> 'a Exn.result -> unit  (*raises Fail "Not a promised future" for futures not created as promises*)
76 val fulfill: 'a future -> 'a -> unit  (*fulfill_result with Exn.Res*)
77 val terminate: group -> unit
78 val shutdown: unit -> unit  (*raises Fail when worker_task () is SOME, i.e. when called from within a worker task*)
81 structure Future: FUTURE =  (*implementation matched against signature FUTURE*)
86 type task = Task_Queue.task;  (*abbreviation for the Task_Queue type*)
87 type group = Task_Queue.group;  (*abbreviation for the Task_Queue type*)
88 val new_group = Task_Queue.new_group;  (*re-export of Task_Queue.new_group*)
94 val tag = Universal.tag () : task option Universal.tag;  (*key of the thread-local slot that holds the current task*)
96 fun worker_task () = the_default NONE (Thread.getLocal tag);  (*content of the slot; NONE when the slot is unset*)
97 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;  (*apply f x with the slot set to SOME task; presumably the prior value is restored afterwards -- confirm in setmp_thread_data*)
100 val worker_group = Option.map Task_Queue.group_of_task o worker_task;  (*group of the current task, if any*)
101 fun worker_subgroup () = new_group (worker_group ());  (*new group created from the optional current worker group*)
103 fun worker_joining e =  (*hand e to Task_Queue.joining when the current thread runs a worker task*)
104 (case worker_task () of
106 | SOME task => Task_Queue.joining task e);  (*NOTE(review): the preceding NONE branch (original line 105) is missing from this listing -- confirm against the full file*)
108 fun worker_waiting deps e =  (*hand e and the awaited deps to Task_Queue.waiting when the current thread runs a worker task*)
109 (case worker_task () of
111 | SOME task => Task_Queue.waiting task deps e);  (*NOTE(review): the preceding NONE branch (original line 110) is missing from this listing -- confirm against the full file*)
114 (* datatype future *)
116 type 'a result = 'a Exn.result Single_Assignment.var;  (*single-assignment variable holding the outcome (Exn.Res or Exn.Exn)*)
118 datatype 'a future = Future of  (*record fields are not shown in this listing; the pattern in fulfill_result names promised, task, result*)
123 fun task_of (Future {task, ...}) = task;  (*selector: task of a future*)
124 fun result_of (Future {result, ...}) = result;  (*selector: result variable of a future*)
126 fun peek x = Single_Assignment.peek (result_of x);  (*current content of the result variable as an option*)
127 fun is_finished x = is_some (peek x);  (*true iff a result has been assigned*)
133 (* synchronization *)
135 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by the scheduler round and by shutdown*)
136 val work_available = ConditionVar.conditionVar ();  (*awaited by workers in worker_next*)
137 val work_finished = ConditionVar.conditionVar ();  (*broadcast by worker_exec after finishing a task; awaited in join_next*)
140 val lock = Mutex.mutex ();  (*the one mutex used by SYNCHRONIZED and by the wait operations below*)
143 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;  (*Simple_Thread.synchronized specialized to the global lock; name is passed through unchanged*)
145 fun wait cond = (*requires SYNCHRONIZED*)  (*wait on cond with the global lock, passing no deadline*)
146 Multithreading.sync_wait NONE NONE cond lock;
148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)  (*like wait, but with deadline now + timeout*)
149 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
151 fun signal cond = (*requires SYNCHRONIZED*)  (*thin wrapper over ConditionVar.signal*)
152 ConditionVar.signal cond;
154 fun broadcast cond = (*requires SYNCHRONIZED*)  (*thin wrapper over ConditionVar.broadcast*)
155 ConditionVar.broadcast cond;
162 val queue = Unsynchronized.ref Task_Queue.empty;  (*the global task queue, initially empty*)
163 val next = Unsynchronized.ref 0;  (*running counter used to number worker thread names*)
164 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread; reset to NONE when scheduler_next decides to stop*)
165 val canceled = Unsynchronized.ref ([]: group list);  (*groups registered by cancel_later, re-processed by the scheduler*)
166 val do_shutdown = Unsynchronized.ref false;  (*set by the scheduler when all tasks are passive; cleared by scheduler_check*)
167 val max_workers = Unsynchronized.ref 0;  (*bound on the number of registered workers, checked in worker_next*)
168 val max_active = Unsynchronized.ref 0;  (*bound on workers in state Working, checked in worker_next*)
169 val worker_trend = Unsynchronized.ref 0;  (*signed tick counter: positive while the desired pool size exceeds max_workers, negative while below*)
171 val status_ticks = Unsynchronized.ref 0;  (*number of scheduler ticks so far; paces report_status*)
172 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the most recent tick*)
173 val next_round = seconds 0.05;  (*minimum interval between ticks; also used as the scheduler's wait timeout*)
175 datatype worker_state = Working | Waiting | Sleeping;  (*worker_wait sets Waiting or Sleeping for the duration of a wait, Working otherwise*)
176 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);  (*registered worker threads, each paired with its mutable state*)
178 fun count_workers state = (*requires SYNCHRONIZED*)  (*number of registered workers currently in the given state*)
179 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
185 val ML_statistics = Unsynchronized.ref false;  (*enables report_status*)
186 val forked_proofs = Unsynchronized.ref 0;  (*counter reported as "tasks_proof"; not modified in the visible part of this file*)
188 fun report_status () = (*requires SYNCHRONIZED*)  (*emit queue and worker statistics as a protocol message, only if ML_statistics is set*)
189 if ! ML_statistics then
191 val {ready, pending, running, passive} = Task_Queue.status (! queue);  (*task counts by queue status*)
192 val total = length (! workers);
193 val active = count_workers Working;
194 val waiting = count_workers Waiting;
196 [("now", signed_string_of_real (Time.toReal (Time.now ()))),  (*timestamp of this report*)
197 ("tasks_proof", Markup.print_int (! forked_proofs)),
198 ("tasks_ready", Markup.print_int ready),
199 ("tasks_pending", Markup.print_int pending),
200 ("tasks_running", Markup.print_int running),
201 ("tasks_passive", Markup.print_int passive),
202 ("workers_total", Markup.print_int total),
203 ("workers_active", Markup.print_int active),
204 ("workers_waiting", Markup.print_int waiting)] @
205 ML_Statistics.get ();  (*appended: properties supplied by ML_Statistics*)
207 Output.protocol_message (Markup.ML_statistics :: stats) ""  (*properties only, empty message body*)
208 handle Fail msg => warning msg  (*a failing protocol message is downgraded to a warning*)
213 (* cancellation primitives *)
215 fun cancel_now group = (*requires SYNCHRONIZED*)  (*cancel group in the queue and interrupt its running threads right away*)
217 val running = Task_Queue.cancel (! queue) group;  (*threads reported by Task_Queue.cancel for this group*)
218 val _ = running |> List.app (fn thread =>
219 if Simple_Thread.is_self thread then ()  (*never interrupt the calling thread itself*)
220 else Simple_Thread.interrupt_unsynchronized thread);  (*NOTE(review): the result expression is not shown; callers test it with null, so presumably the running list -- confirm*)
223 fun cancel_all () = (*requires SYNCHRONIZED*)  (*cancel everything in the queue and interrupt all reported threads*)
225 val (groups, threads) = Task_Queue.cancel_all (! queue);
226 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;  (*NOTE(review): the result expression is not shown; scheduler_next maps cancel_later over it, so presumably groups -- confirm*)
229 fun cancel_later group = (*requires SYNCHRONIZED*)  (*register group for deferred cancellation by the scheduler*)
230 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);  (*insert modulo Task_Queue.eq_group*)
231 broadcast scheduler_event);  (*notify threads waiting on scheduler_event*)
233 fun interruptible_task f x =  (*apply f to x with interrupts enabled, in a mode that depends on the calling context*)
234 (if Multithreading.available then
235 Multithreading.with_attributes
236 (if is_some (worker_task ())
237 then Multithreading.private_interrupts  (*inside a worker task*)
238 else Multithreading.public_interrupts)  (*outside any worker task*)
240 else interruptible f x)  (*without multithreading support: plain interruptible application*)
241 before Multithreading.interrupted ();  (*evaluated after the result has been computed; the result itself is returned*)
246 fun worker_exec (task, jobs) =  (*run all jobs of task, then record completion in the queue and notify waiters*)
248 val group = Task_Queue.group_of_task task;
249 val valid = not (Task_Queue.is_canceled group);  (*each job is told whether the group was still uncanceled at start*)
251 Task_Queue.running task (fn () =>
252 setmp_worker_task task (fn () =>
253 fold (fn job => fn ok => job valid andalso ok) jobs true) ());  (*job valid is evaluated first, so every job runs; the fold is true only if all jobs returned true*)
255 if ! Multithreading.trace >= 2 then  (*task statistics are emitted only at trace level 2 or above*)
256 Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
257 handle Fail msg => warning msg
259 val _ = SYNCHRONIZED "finish" (fn () =>
261 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);  (*remove task from the queue; maximal is the flag returned by Task_Queue.finish*)
262 val test = Exn.capture Multithreading.interrupted ();  (*capture a pending interrupt instead of letting it propagate*)
264 if ok andalso not (Exn.is_interrupt_exn test) then ()
265 else if null (cancel_now group) then ()  (*failure or interrupt: cancel the group immediately*)
266 else cancel_later group;  (*non-empty result from cancel_now: also register the group for the scheduler*)
267 val _ = broadcast work_finished;
268 val _ = if maximal then () else signal work_available;  (*signal work_available only when finish reported maximal = false*)
272 fun worker_wait active cond = (*requires SYNCHRONIZED*)  (*mark the current worker as Waiting/Sleeping around a wait; the wait call itself (presumably on cond) is on a line not shown*)
275 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of  (*find the state ref of the calling thread*)
277 | NONE => raise Fail "Unregistered worker thread");
278 val _ = state := (if active then Waiting else Sleeping);
280 val _ = state := Working;  (*back to Working before returning*)
283 fun worker_next () = (*requires SYNCHRONIZED*)  (*obtain the next piece of work for the calling worker, respecting the pool limits*)
284 if length (! workers) > ! max_workers then  (*too many workers: this one deregisters itself*)
285 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
286 signal work_available;  (*NOTE(review): the result of this branch is not shown; presumably NONE, which ends worker_loop -- confirm*)
288 else if count_workers Working > ! max_active then  (*too many active workers: sleep, then retry*)
289 (worker_wait false work_available; worker_next ())
291 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
292 NONE => (worker_wait false work_available; worker_next ())  (*nothing to dequeue: sleep, then retry*)
293 | some => (signal work_available; some));  (*got work: signal work_available once more before returning it*)
295 fun worker_loop name =  (*main loop of a worker thread: fetch work under the lock, execute it outside the lock*)
296 (case SYNCHRONIZED name (fn () => worker_next ()) of
298 | SOME work => (worker_exec work; worker_loop name));  (*NOTE(review): the preceding NONE branch (original line 297) is missing from this listing*)
300 fun worker_start name = (*requires SYNCHRONIZED*)  (*fork a thread running worker_loop and register it in workers*)
301 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
302 Unsynchronized.ref Working));  (*a new worker is registered in state Working*)
307 fun scheduler_next () = (*requires SYNCHRONIZED*)  (*one scheduler round; the interrupt branch at the end yields true, and scheduler_loop repeats while the result is true*)
309 val now = Time.now ();
310 val tick = Time.<= (Time.+ (! last_round, next_round), now);  (*true once at least next_round has elapsed since last_round*)
311 val _ = if tick then last_round := now else ();
317 if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
319 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0  (*report every 2nd tick when tracing, otherwise every 10th*)
320 then report_status () else ();
323 if forall (Thread.isActive o #1) (! workers) then ()
326 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);  (*split off workers whose threads are no longer active*)
327 val _ = workers := alive;
329 Multithreading.tracing 0 (fn () =>
330 "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
334 (* worker pool adjustments *)
336 val max_active0 = ! max_active;  (*snapshots, compared below to detect changed limits*)
337 val max_workers0 = ! max_workers;
339 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
340 val _ = max_active := m;
343 if ! do_shutdown then 0  (*NOTE(review): presumably the right-hand side of the binding of mm; the binding line is not shown*)
344 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);  (*at least m, at most 4 * m*)
346 if tick andalso mm > ! max_workers then
347 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)  (*upward: reset a negative trend, otherwise increment*)
348 else if tick andalso mm < ! max_workers then
349 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)  (*downward: reset a positive trend, otherwise decrement*)
352 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
354 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then  (*andalso binds tighter than orelse*)
355 max_workers := Int.min (mm, 2 * m)
358 val missing = ! max_workers - length (! workers);
361 funpow missing (fn () =>
362 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()  (*start one worker per missing slot, numbered by the counter next*)
366 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
367 else signal work_available;  (*limits changed: wake a thread waiting on work_available*)
370 (* canceled groups *)
373 if null (! canceled) then ()
375 (Multithreading.tracing 1 (fn () =>
376 string_of_int (length (! canceled)) ^ " canceled groups");
377 Unsynchronized.change canceled (filter_out (null o cancel_now));  (*retry cancel_now on each group; keep only those for which it returns a non-empty list*)
378 signal work_available);
383 val _ = Exn.release (wait_timeout next_round scheduler_event);  (*wait on scheduler_event for at most next_round*)
388 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
389 val continue = not (! do_shutdown andalso null (! workers));  (*stop only when shutting down and no workers remain*)
390 val _ = if continue then () else (report_status (); scheduler := NONE);
392 val _ = broadcast scheduler_event;
395 if Exn.is_interrupt exn then  (*NOTE(review): exn is bound by a handler on a line not shown*)
396 (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
397 List.app cancel_later (cancel_all ());  (*cancel everything, and register each returned group for deferred cancellation*)
398 signal work_available; true)  (*keep the scheduler running after an interrupt*)
401 fun scheduler_loop () =  (*body of the scheduler thread; the trailing "do ()" indicates a while loop whose opening line is not shown*)
403 Multithreading.with_attributes
404 (Multithreading.sync_interrupts Multithreading.public_interrupts)
405 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))  (*each round runs under the global lock*)
406 do (); last_round := Time.zeroTime);  (*after the loop ends, reset last_round*)
408 fun scheduler_active () = (*requires SYNCHRONIZED*)  (*true iff a scheduler thread is registered and still active*)
409 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
411 fun scheduler_check () = (*requires SYNCHRONIZED*)  (*clear any pending shutdown request and make sure a scheduler thread is running*)
412 (do_shutdown := false;
413 if scheduler_active () then ()
414 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));  (*fork a fresh scheduler thread when none is active*)
422 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)  (*cancel group now, falling back to deferred cancellation via the scheduler*)
424 val _ = if null (cancel_now group) then () else cancel_later group;  (*non-empty result of cancel_now: register the group for the scheduler as well*)
425 val _ = signal work_available;
426 val _ = scheduler_check ();  (*ensure a scheduler is running to process the canceled list*)
429 fun cancel_group group =  (*cancel_group_unsynchronized under the global lock*)
430 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
432 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));  (*cancel the whole group of the future's task*)
437 fun error_msg pos ((serial, msg), exec_id) =  (*conditionally output an error message in the context of position pos*)
438 Position.setmp_thread_data pos (fn () =>
439 let val id = Position.get_id pos in
440 if is_none id orelse is_none exec_id orelse id = exec_id  (*output unless both ids are present and differ*)
441 then Output.error_msg' (serial, msg) else ()
444 fun identify_result pos res =  (*visible part: wraps an exception via Par_Exn.identify, with properties taken from pos; several lines are not shown*)
448 (case Position.get_id pos of
450 | SOME id => [(Markup.exec_idN, id)])  (*property recording the id of pos; the NONE branch is not shown*)
451 in Exn.Exn (Par_Exn.identify exec_id exn) end
454 fun assign_result group result res =  (*assign res to result; the final case yields true for Exn.Res and false after reporting an exception to the group*)
456 val _ = Single_Assignment.assign result res
457 handle exn as Fail _ =>  (*a Fail from assign is inspected against the content already stored*)
458 (case Single_Assignment.peek result of
459 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)  (*stored interrupt takes precedence over the Fail*)
462 (case the (Single_Assignment.peek result) of  (*inspect the result that is now stored*)
464 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)  (*pass the exception to Task_Queue.cancel_group for this group*)
465 | Exn.Res _ => true);
471 fun future_job group interrupts (e: unit -> 'a) =  (*create a fresh result variable together with the job that evaluates e into it*)
473 val result = Single_Assignment.var "future" : 'a result;
474 val pos = Position.thread_data ();  (*position of the forking thread, re-installed when e runs*)
479 Exn.capture (fn () =>  (*exceptions raised by e become Exn.Exn results*)
480 Multithreading.with_attributes
482 then Multithreading.private_interrupts else Multithreading.no_interrupts)  (*NOTE(review): the condition line is not shown; presumably it tests the interrupts parameter*)
483 (fn _ => Position.setmp_thread_data pos e ())) ()
484 else Exn.interrupt_exn;  (*alternative branch: the result is an interrupt instead of evaluating e*)
485 in assign_result group result (identify_result pos res) end;
486 in (result, job) end;
491 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};  (*arguments of forks: task name, optional group, dependencies, priority, interruptibility*)
492 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};  (*anonymous, no explicit group, no dependencies, priority 0, interruptible*)
494 fun forks ({name, group, deps, pri, interrupts}: params) es =  (*enqueue one task per function in es and return the corresponding futures*)
500 NONE => worker_subgroup ()  (*no explicit group: use a subgroup of the current worker group*)
502 fun enqueue e queue =  (*add a single job to a queue value, returning its future and the updated queue*)
504 val (result, job) = future_job grp interrupts e;
505 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
506 val future = Future {promised = false, task = task, result = result};
507 in (future, queue') end;
509 SYNCHRONIZED "enqueue" (fn () =>
511 val (futures, queue') = fold_map enqueue es (! queue);  (*thread the queue through all enqueue steps*)
512 val _ = queue := queue';
513 val minimal = forall (not o Task_Queue.known_task queue') deps;  (*true when no dependency is known to the queue any more*)
514 val _ = if minimal then signal work_available else ();
515 val _ = scheduler_check ();  (*make sure a scheduler is running*)
520 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;  (*NOTE(review): presumably the body of fork, whose header line is not shown: a single-element forks named "fork"*)
527 NONE => Exn.Exn (Fail "Unfinished future")  (*NOTE(review): presumably part of get_result (used by join_results), whose header is not shown; this branch covers a future without a result*)
529 if Exn.is_interrupt_exn res then  (*interrupted result: consult the status of the task's group*)
530 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
532 | SOME exn => Exn.Exn exn)  (*the group provides an exception: return it in place of the interrupt*)
537 fun join_next deps = (*requires SYNCHRONIZED*)  (*find work to execute while joining on deps, waiting when there is none*)
538 if null deps then NONE  (*nothing left to join*)
540 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
543 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')  (*wait for work_finished, then retry with deps'; the pattern of this branch is not shown*)
544 | (SOME work, deps') => SOME (work, deps'));  (*work was dequeued: return it with the updated dependencies*)
546 fun execute_work NONE = ()  (*no more work: joining is complete*)
547 | execute_work (SOME (work, deps')) =
548 (worker_joining (fn () => worker_exec work); join_work deps')  (*execute the work in joining mode, then continue with the remaining dependencies*)
550 Multithreading.with_attributes Multithreading.no_interrupts  (*NOTE(review): presumably the body of join_work deps, whose header line is not shown*)
551 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));  (*pick the next work under the lock, execute it outside the lock*)
555 fun join_results xs =  (*wait until all futures in xs are finished and return their results in order*)
558 if forall is_finished xs then ()  (*fast path: nothing to wait for*)
559 else if Multithreading.self_critical () then
560 error "Cannot join future values within critical section"
561 else if is_some (worker_task ()) then join_work (map task_of xs)  (*inside a worker task: join via join_work*)
562 else List.app (ignore o Single_Assignment.await o result_of) xs;  (*outside a worker task: await each result variable*)
563 in map get_result xs end;
567 fun join_result x = singleton join_results x;  (*join_results for a single future*)
568 fun joins xs = Par_Exn.release_all (join_results xs);  (*join all futures, releasing the results via Par_Exn.release_all*)
569 fun join x = Exn.release (join_result x);  (*join one future, releasing its result via Exn.release*)
572 (* fast-path operations -- bypass task queue if possible *)
574 fun value_result (res: 'a Exn.result) =  (*build a future directly around res, without enqueueing a task*)
576 val task = Task_Queue.dummy_task;  (*no real task is created*)
577 val group = Task_Queue.group_of_task task;
578 val result = Single_Assignment.var "value" : 'a result;
579 val _ = assign_result group result (identify_result (Position.thread_data ()) res);  (*assign immediately, identified by the current thread position*)
580 in Future {promised = false, task = task, result = result} end;
582 fun value x = value_result (Exn.Res x);  (*future around a plain value*)
584 fun cond_forks args es =  (*fork in parallel when multithreading is enabled, otherwise evaluate in place*)
585 if Multithreading.enabled () then forks args es
586 else map (fn e => value_result (Exn.interruptible_capture e ())) es;  (*each e is evaluated immediately and its captured result wrapped by value_result*)
589 if is_finished x then value_result (Exn.interruptible_capture (f o join) x)  (*NOTE(review): presumably the body of map_future f x, whose header is not shown; a finished future is mapped in place*)
592 val task = task_of x;
593 val group = Task_Queue.group_of_task task;
594 val (result, job) = future_job group true (fn () => f (join x));  (*job that applies f to the joined value of x*)
596 val extended = SYNCHRONIZED "extend" (fn () =>
597 (case Task_Queue.extend task job (! queue) of  (*try to attach the job to the existing task of x*)
598 SOME queue' => (queue := queue'; true)
601 if extended then Future {promised = false, task = task, result = result}  (*the mapped future shares the task of x*)
603 (singleton o cond_forks)  (*otherwise: a separate task in the same group that depends on the task of x*)
604 {name = "map_future", group = SOME group, deps = [task],
605 pri = Task_Queue.pri_of_task task, interrupts = true}
606 (fn () => f (join x))
610 (* promised futures -- fulfilled by external means *)
612 fun promise_group group abort : 'a future =  (*create a promised future in group, backed by a passive queue entry instead of an evaluation task*)
614 val result = Single_Assignment.var "promise" : 'a result;
615 fun assign () = assign_result group result Exn.interrupt_exn  (*attempt to assign an interrupt as the result*)
616 handle Fail _ => true  (*a Fail from assign_result is treated as success*)
618 if Exn.is_interrupt exn  (*NOTE(review): exn is bound by a handler on a line not shown*)
619 then raise Fail "Concurrent attempt to fulfill promise"
622 Multithreading.with_attributes Multithreading.no_interrupts
623 (fn _ => Exn.release (Exn.capture assign () before abort ()));  (*abort () runs after assign has been captured, before its outcome is released*)
624 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
625 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));  (*register the job as a passive entry of the queue*)
626 in Future {promised = true, task = task, result = result} end;
628 fun promise abort = promise_group (worker_subgroup ()) abort;  (*promise within a new subgroup of the current worker group*)
630 fun fulfill_result (Future {promised, task, result}) res =  (*assign res to a promised future and take its passive entry out of the queue*)
631 if not promised then raise Fail "Not a promised future"  (*only futures created as promises can be fulfilled*)
634 val group = Task_Queue.group_of_task task;
635 val pos = Position.thread_data ();
637 assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);  (*when ok is false the result becomes an interrupt; ok is bound on a line not shown*)
639 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
642 SYNCHRONIZED "fulfill_result" (fn () =>
643 Unsynchronized.change_result queue
644 (Task_Queue.dequeue_passive (Thread.self ()) task));  (*try to dequeue the passive entry on behalf of this thread*)
647 SOME true => worker_exec (task, [job])  (*run the job via worker_exec, which also finishes the task in the queue*)
649 | NONE => ignore (job (not (Task_Queue.is_canceled group))))  (*run the job directly, passing whether the group is still uncanceled*)
652 if is_some (Single_Assignment.peek result) then ()
653 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));  (*result still unassigned: wait for it*)
656 fun fulfill x res = fulfill_result x (Exn.Res res);  (*fulfill a promise with a regular value*)
661 fun terminate group =  (*cancel group, then handle the tasks that still belong to it*)
664 SYNCHRONIZED "terminate" (fn () =>
665 let val _ = cancel_group_unsynchronized group;
666 in Task_Queue.group_tasks (! queue) group end);  (*tasks of the group, determined under the same lock as the cancellation*)
668 if null tasks then ()  (*no tasks: nothing further to do*)
671 {name = "terminate", group = SOME (new_group NONE),  (*parameters of a task in a fresh independent group; the call that uses them is not shown*)
672 deps = tasks, pri = 0, interrupts = false} I  (*depends on all tasks of the group; the job is the identity function I*)
680 if not Multithreading.available then ()  (*NOTE(review): presumably the body of shutdown (), whose header is not shown; without multithreading there is nothing to do*)
681 else if is_some (worker_task ()) then
682 raise Fail "Cannot shutdown while running as worker thread"
684 SYNCHRONIZED "shutdown" (fn () =>
685 while scheduler_active () do  (*block until the scheduler thread is no longer active*)
686 (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
687 wait scheduler_event));  (*scheduler_next broadcasts scheduler_event in every round*)
690 (*final declarations of this structure: the binding below shadows the list operation map used earlier in this structure*)
691 val map = map_future;
695 type 'a future = 'a Future.future;  (*top-level abbreviation, so the type can be written without the structure prefix*)