disallow shutdown from worker, which would lead to deadlock since the scheduler cannot terminate;
1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val worker_subgroup: unit -> group
51 val task_of: 'a future -> task
52 val peek: 'a future -> 'a Exn.result option
53 val is_finished: 'a future -> bool
54 val ML_statistics: bool Unsynchronized.ref
55 val forked_proofs: int Unsynchronized.ref
56 val interruptible_task: ('a -> 'b) -> 'a -> 'b
57 val cancel_group: group -> unit
58 val cancel: 'a future -> unit
59 val error_msg: Position.T -> (serial * string) * string option -> unit
60 val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
61 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
62 val default_params: params
63 val forks: params -> (unit -> 'a) list -> 'a future list
64 val fork: (unit -> 'a) -> 'a future
65 val join_results: 'a future list -> 'a Exn.result list
66 val join_result: 'a future -> 'a Exn.result
67 val joins: 'a future list -> 'a list
68 val join: 'a future -> 'a
69 val value_result: 'a Exn.result -> 'a future
70 val value: 'a -> 'a future
71 val cond_forks: params -> (unit -> 'a) list -> 'a future list
72 val map: ('a -> 'b) -> 'a future -> 'b future
73 val promise_group: group -> (unit -> unit) -> 'a future
74 val promise: (unit -> unit) -> 'a future
75 val fulfill_result: 'a future -> 'a Exn.result -> unit
76 val fulfill: 'a future -> 'a -> unit
77 val terminate: group -> unit
78 val shutdown: unit -> unit
81 structure Future: FUTURE =
(*task, group and new_group are taken over unchanged from Task_Queue*)
86 type task = Task_Queue.task;
87 type group = Task_Queue.group;
88 val new_group = Task_Queue.new_group;
(*tag for the thread-local slot that holds the optional task of the current thread*)
94 val tag = Universal.tag () : task option Universal.tag;
(*task stored in thread-local data of the current thread; NONE if nothing is stored*)
96 fun worker_task () = (case Thread.getLocal tag of SOME task => task | NONE => NONE);
(*evaluate f x with SOME task installed as thread-local worker task; the current
  value worker_task () is handed to setmp_thread_data as well -- presumably so that
  it can be restored afterwards (TODO confirm against setmp_thread_data)*)
97 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
(*group of the task of the current thread, if there is such a task*)
100 fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
(*new group obtained from new_group, applied to the optional group of the current thread*)
101 fun worker_subgroup () = worker_group () |> new_group;
103 fun worker_joining e =
104 (case worker_task () of
106 | SOME task => Task_Queue.joining task e);
108 fun worker_waiting deps e =
109 (case worker_task () of
111 | SOME task => Task_Queue.waiting task deps e);
114 (* datatype future *)
(*single-assignment variable that holds an 'a Exn.result*)
116 type 'a result = 'a Exn.result Single_Assignment.var;
118 datatype 'a future = Future of
(*task component of a future*)
123 fun task_of (Future fields) = #task fields;
(*result variable of a future*)
124 fun result_of (Future fields) = #result fields;
(*non-blocking look at the result variable of a future, via Single_Assignment.peek*)
126 fun peek (Future {result, ...}) = Single_Assignment.peek result;
(*a future counts as finished as soon as peek yields some result*)
127 fun is_finished x = (case peek x of SOME _ => true | NONE => false);
133 (* synchronization *)
(*condition variables, all used together with the single mutex "lock" below*)
135 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by the scheduler round and by shutdown*)
136 val work_available = ConditionVar.conditionVar ();  (*awaited by workers that have nothing to do*)
137 val work_finished = ConditionVar.conditionVar ();  (*broadcast when a task finishes; awaited when joining*)
(*the one mutex used by SYNCHRONIZED, wait and wait_timeout*)
140 val lock = Mutex.mutex ();
(*Simple_Thread.synchronized on the module lock; name is passed through unchanged.
  Call sites apply the result to a function of type unit -> 'a.*)
143 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
(*wait on cond with the module lock; no deadline is given (both optional
  arguments of Multithreading.sync_wait are NONE)*)
145 fun wait cond = (*requires SYNCHRONIZED*)
146 Multithreading.sync_wait NONE NONE cond lock;
(*wait on cond with the module lock, passing Time.now () + timeout as the
  optional time argument of Multithreading.sync_wait*)
148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
149 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
(*ConditionVar.signal on cond; the marker below states that callers hold the lock*)
151 fun signal cond = (*requires SYNCHRONIZED*)
152 ConditionVar.signal cond;
(*ConditionVar.broadcast on cond; the marker below states that callers hold the lock*)
154 fun broadcast cond = (*requires SYNCHRONIZED*)
155 ConditionVar.broadcast cond;
(*mutable scheduler state, kept in unsynchronized references*)
162 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue*)
163 val next = Unsynchronized.ref 0;  (*counter used to number newly started workers*)
164 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread, if one was forked*)
165 val canceled = Unsynchronized.ref ([]: group list);  (*groups registered by cancel_later*)
166 val do_shutdown = Unsynchronized.ref false;  (*shutdown request; reset by scheduler_check*)
167 val max_workers = Unsynchronized.ref 0;  (*bound compared against length (! workers)*)
168 val max_active = Unsynchronized.ref 0;  (*bound compared against count_workers Working*)
169 val worker_trend = Unsynchronized.ref 0;  (*adjusted by the scheduler on ticks*)
171 val status_ticks = Unsynchronized.ref 0;  (*incremented on each scheduler tick*)
172 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the last scheduler tick*)
173 val next_round = seconds 0.05;  (*tick interval, also the timeout of the scheduler wait*)
(*state of a registered worker thread*)
175 datatype worker_state = Working | Waiting | Sleeping;
(*registered worker threads, each paired with its mutable state*)
176 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
(*number of registered workers whose current state equals the given state*)
178 fun count_workers state = (*requires SYNCHRONIZED*)
179 length (filter (fn (_, state_ref) => ! state_ref = state) (! workers));
(*ML_statistics: report_status emits output only while this flag is set;
  forked_proofs: counter that report_status publishes as "tasks_proof"*)
185 val ML_statistics = Unsynchronized.ref false;
186 val forked_proofs = Unsynchronized.ref 0;
188 fun report_status () = (*requires SYNCHRONIZED*)
189 if ! ML_statistics then
191 val {ready, pending, running, passive} = Task_Queue.status (! queue);
192 val total = length (! workers);
193 val active = count_workers Working;
194 val waiting = count_workers Waiting;
196 [("now", signed_string_of_real (Time.toReal (Time.now ()))),
197 ("tasks_proof", Markup.print_int (! forked_proofs)),
198 ("tasks_ready", Markup.print_int ready),
199 ("tasks_pending", Markup.print_int pending),
200 ("tasks_running", Markup.print_int running),
201 ("tasks_passive", Markup.print_int passive),
202 ("workers_total", Markup.print_int total),
203 ("workers_active", Markup.print_int active),
204 ("workers_waiting", Markup.print_int waiting)] @
205 ML_Statistics.get ();
207 Output.protocol_message (Markup.ML_statistics :: stats) ""
208 handle Fail msg => warning msg
213 (* cancellation primitives *)
215 fun cancel_now group = (*requires SYNCHRONIZED*)
217 val running = Task_Queue.cancel (! queue) group;
218 val _ = running |> List.app (fn thread =>
219 if Simple_Thread.is_self thread then ()
220 else Simple_Thread.interrupt_unsynchronized thread);
223 fun cancel_all () = (*requires SYNCHRONIZED*)
225 val (groups, threads) = Task_Queue.cancel_all (! queue);
226 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
(*register group in the list of canceled groups (insertion wrt. Task_Queue.eq_group),
  then broadcast scheduler_event so that waiting threads notice the change*)
229 fun cancel_later group = (*requires SYNCHRONIZED*)
230 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
231 broadcast scheduler_event);
233 fun interruptible_task f x =
234 (if Multithreading.available then
235 Multithreading.with_attributes
236 (if is_some (worker_task ())
237 then Multithreading.private_interrupts
238 else Multithreading.public_interrupts)
240 else interruptible f x)
241 before Multithreading.interrupted ();
246 fun worker_exec (task, jobs) =
248 val group = Task_Queue.group_of_task task;
249 val valid = not (Task_Queue.is_canceled group);
251 Task_Queue.running task (fn () =>
252 setmp_worker_task task (fn () =>
253 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
255 if ! Multithreading.trace >= 2 then
256 Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
257 handle Fail msg => warning msg
259 val _ = SYNCHRONIZED "finish" (fn () =>
261 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
262 val test = Exn.capture Multithreading.interrupted ();
264 if ok andalso not (Exn.is_interrupt_exn test) then ()
265 else if null (cancel_now group) then ()
266 else cancel_later group;
267 val _ = broadcast work_finished;
268 val _ = if maximal then () else signal work_available;
272 fun worker_wait active cond = (*requires SYNCHRONIZED*)
275 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
277 | NONE => raise Fail "Unregistered worker thread");
278 val _ = state := (if active then Waiting else Sleeping);
280 val _ = state := Working;
283 fun worker_next () = (*requires SYNCHRONIZED*)
284 if length (! workers) > ! max_workers then
285 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
286 signal work_available;
288 else if count_workers Working > ! max_active then
289 (worker_wait false work_available; worker_next ())
291 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
292 NONE => (worker_wait false work_available; worker_next ())
293 | some => (signal work_available; some));
295 fun worker_loop name =
296 (case SYNCHRONIZED name (fn () => worker_next ()) of
298 | SOME work => (worker_exec work; worker_loop name));
300 fun worker_start name = (*requires SYNCHRONIZED*)
301 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
302 Unsynchronized.ref Working));
307 fun scheduler_next () = (*requires SYNCHRONIZED*)
309 val now = Time.now ();
310 val tick = Time.<= (Time.+ (! last_round, next_round), now);
311 val _ = if tick then last_round := now else ();
317 if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
319 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
320 then report_status () else ();
323 if forall (Thread.isActive o #1) (! workers) then ()
326 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
327 val _ = workers := alive;
329 Multithreading.tracing 0 (fn () =>
330 "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
334 (* worker pool adjustments *)
336 val max_active0 = ! max_active;
337 val max_workers0 = ! max_workers;
339 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
340 val _ = max_active := m;
343 if ! do_shutdown then 0
344 else if m = 9999 then 1
345 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
347 if tick andalso mm > ! max_workers then
348 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
349 else if tick andalso mm < ! max_workers then
350 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
353 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
355 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
356 max_workers := Int.min (mm, 2 * m)
359 val missing = ! max_workers - length (! workers);
362 funpow missing (fn () =>
363 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
367 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
368 else signal work_available;
371 (* canceled groups *)
374 if null (! canceled) then ()
376 (Multithreading.tracing 1 (fn () =>
377 string_of_int (length (! canceled)) ^ " canceled groups");
378 Unsynchronized.change canceled (filter_out (null o cancel_now));
379 signal work_available);
384 val _ = Exn.release (wait_timeout next_round scheduler_event);
389 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
390 val continue = not (! do_shutdown andalso null (! workers));
391 val _ = if continue then () else (report_status (); scheduler := NONE);
393 val _ = broadcast scheduler_event;
396 if Exn.is_interrupt exn then
397 (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
398 List.app cancel_later (cancel_all ());
399 signal work_available; true)
402 fun scheduler_loop () =
404 Multithreading.with_attributes
405 (Multithreading.sync_interrupts Multithreading.public_interrupts)
406 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
407 do (); last_round := Time.zeroTime);
(*true iff a scheduler thread is registered and Thread.isActive holds for it*)
409 fun scheduler_active () = (*requires SYNCHRONIZED*)
410 the_default false (Option.map Thread.isActive (! scheduler));
(*clear the shutdown request, then make sure a scheduler thread exists: a new
  thread running scheduler_loop is forked unless scheduler_active () holds*)
412 fun scheduler_check () = (*requires SYNCHRONIZED*)
413 (do_shutdown := false;
414 if scheduler_active () then ()
415 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
423 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
425 val _ = if null (cancel_now group) then () else cancel_later group;
426 val _ = signal work_available;
427 val _ = scheduler_check ();
(*public entry point: cancel_group_unsynchronized executed under the module lock*)
430 fun cancel_group group =
431 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
(*cancel the group that the task of future x belongs to*)
433 fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;
438 fun error_msg pos ((serial, msg), exec_id) =
439 Position.setmp_thread_data pos (fn () =>
440 let val id = Position.get_id pos in
441 if is_none id orelse is_none exec_id orelse id = exec_id
442 then Output.error_msg' (serial, msg) else ()
445 fun identify_result pos res =
449 (case Position.get_id pos of
451 | SOME id => [(Markup.exec_idN, id)])
452 in Exn.Exn (Par_Exn.identify exec_id exn) end
455 fun assign_result group result res =
457 val _ = Single_Assignment.assign result res
458 handle exn as Fail _ =>
459 (case Single_Assignment.peek result of
460 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
463 (case the (Single_Assignment.peek result) of
465 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
466 | Exn.Res _ => true);
472 fun future_job group interrupts (e: unit -> 'a) =
474 val result = Single_Assignment.var "future" : 'a result;
475 val pos = Position.thread_data ();
480 Exn.capture (fn () =>
481 Multithreading.with_attributes
483 then Multithreading.private_interrupts else Multithreading.no_interrupts)
484 (fn _ => Position.setmp_thread_data pos e ())) ()
485 else Exn.interrupt_exn;
486 in assign_result group result (identify_result pos res) end;
487 in (result, job) end;
(*parameters of forks: name, deps and pri are passed on to Task_Queue.enqueue,
  group selects the task group, interrupts is handed to future_job*)
492 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
(*defaults: empty name, no explicit group, no dependencies, priority 0, interrupts enabled*)
493 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
495 fun forks ({name, group, deps, pri, interrupts}: params) es =
501 NONE => worker_subgroup ()
503 fun enqueue e queue =
505 val (result, job) = future_job grp interrupts e;
506 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
507 val future = Future {promised = false, task = task, result = result};
508 in (future, queue') end;
510 SYNCHRONIZED "enqueue" (fn () =>
512 val (futures, queue') = fold_map enqueue es (! queue);
513 val _ = queue := queue';
514 val minimal = forall (not o Task_Queue.known_task queue') deps;
515 val _ = if minimal then signal work_available else ();
516 val _ = scheduler_check ();
521 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
528 NONE => Exn.Exn (Fail "Unfinished future")
530 if Exn.is_interrupt_exn res then
531 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
533 | SOME exn => Exn.Exn exn)
538 fun join_next deps = (*requires SYNCHRONIZED*)
539 if null deps then NONE
541 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
544 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
545 | (SOME work, deps') => SOME (work, deps'));
547 fun execute_work NONE = ()
548 | execute_work (SOME (work, deps')) =
549 (worker_joining (fn () => worker_exec work); join_work deps')
551 Multithreading.with_attributes Multithreading.no_interrupts
552 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
556 fun join_results xs =
559 if forall is_finished xs then ()
560 else if Multithreading.self_critical () then
561 error "Cannot join future values within critical section"
562 else if is_some (worker_task ()) then join_work (map task_of xs)
563 else List.app (ignore o Single_Assignment.await o result_of) xs;
564 in map get_result xs end;
(*single-future version of join_results, lifted via singleton*)
568 fun join_result x = singleton join_results x;
(*join all futures, then pass the list of results through Par_Exn.release_all*)
569 fun joins xs = xs |> join_results |> Par_Exn.release_all;
(*join a single future, then pass its result through Exn.release*)
570 fun join x = x |> join_result |> Exn.release;
573 (* fast-path versions -- bypassing task queue *)
575 fun value_result (res: 'a Exn.result) =
577 val task = Task_Queue.dummy_task;
578 val group = Task_Queue.group_of_task task;
579 val result = Single_Assignment.var "value" : 'a result;
580 val _ = assign_result group result (identify_result (Position.thread_data ()) res);
581 in Future {promised = false, task = task, result = result} end;
(*future built by value_result from the regular result Exn.Res x*)
583 fun value x = Exn.Res x |> value_result;
(*forks if Multithreading.enabled (); otherwise each expression is evaluated on the
  spot via Exn.interruptible_capture and wrapped by value_result*)
585 fun cond_forks args es =
586 if not (Multithreading.enabled ()) then map (fn body => value_result (Exn.interruptible_capture body ())) es
587 else forks args es;
590 if is_finished x then value (f (join x))
593 val task = task_of x;
594 val group = Task_Queue.group_of_task task;
595 val (result, job) = future_job group true (fn () => f (join x));
597 val extended = SYNCHRONIZED "extend" (fn () =>
598 (case Task_Queue.extend task job (! queue) of
599 SOME queue' => (queue := queue'; true)
602 if extended then Future {promised = false, task = task, result = result}
604 (singleton o cond_forks)
605 {name = "map_future", group = SOME group, deps = [task],
606 pri = Task_Queue.pri_of_task task, interrupts = true}
607 (fn () => f (join x))
611 (* promised futures -- fulfilled by external means *)
613 fun promise_group group abort : 'a future =
615 val result = Single_Assignment.var "promise" : 'a result;
616 fun assign () = assign_result group result Exn.interrupt_exn
617 handle Fail _ => true
619 if Exn.is_interrupt exn
620 then raise Fail "Concurrent attempt to fulfill promise"
623 Multithreading.with_attributes Multithreading.no_interrupts
624 (fn _ => Exn.release (Exn.capture assign () before abort ()));
625 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
626 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
627 in Future {promised = true, task = task, result = result} end;
(*promise_group within a fresh subgroup of the current worker group*)
629 fun promise abort = let val group = worker_subgroup () in promise_group group abort end;
631 fun fulfill_result (Future {promised, task, result}) res =
632 if not promised then raise Fail "Not a promised future"
635 val group = Task_Queue.group_of_task task;
636 val pos = Position.thread_data ();
638 assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
640 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
643 SYNCHRONIZED "fulfill_result" (fn () =>
644 Unsynchronized.change_result queue
645 (Task_Queue.dequeue_passive (Thread.self ()) task));
648 SOME true => worker_exec (task, [job])
650 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
653 if is_some (Single_Assignment.peek result) then ()
654 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
(*fulfill_result with the regular result Exn.Res res*)
657 fun fulfill x res = Exn.Res res |> fulfill_result x;
662 fun terminate group =
665 SYNCHRONIZED "terminate" (fn () =>
666 let val _ = cancel_group_unsynchronized group;
667 in Task_Queue.group_tasks (! queue) group end);
669 if null tasks then ()
672 {name = "terminate", group = SOME (new_group NONE),
673 deps = tasks, pri = 0, interrupts = false} I
681 if not Multithreading.available then ()
682 else if is_some (worker_task ()) then
683 raise Fail "Cannot shutdown while running as worker thread"
685 SYNCHRONIZED "shutdown" (fn () =>
686 while scheduler_active () do
687 (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
688 wait scheduler_event));
691 (*final declarations of this structure!*)
(*from here on "map" denotes map_future, which provides the signature entry "map";
  earlier uses of map in this structure refer to the list operation*)
692 val map = map_future;
(*abbreviation for the type 'a Future.future*)
696 type 'a future = 'a Future.future;