merged, resolving obvious conflicts in NEWS and src/Pure/System/isabelle_process.ML;
1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information, will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val the_worker_group: unit -> group
50 val worker_subgroup: unit -> group
52 val task_of: 'a future -> task
53 val peek: 'a future -> 'a Exn.result option
54 val is_finished: 'a future -> bool
55 val ML_statistics: bool Unsynchronized.ref
56 val interruptible_task: ('a -> 'b) -> 'a -> 'b
57 val cancel_group: group -> unit
58 val cancel: 'a future -> unit
59 val error_msg: Position.T -> (serial * string) * string option -> unit
60 val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
61 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
62 val default_params: params
63 val forks: params -> (unit -> 'a) list -> 'a future list
64 val fork: (unit -> 'a) -> 'a future
65 val join_results: 'a future list -> 'a Exn.result list
66 val join_result: 'a future -> 'a Exn.result
67 val joins: 'a future list -> 'a list
68 val join: 'a future -> 'a
69 val join_tasks: task list -> unit
70 val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
71 val value_result: 'a Exn.result -> 'a future
72 val value: 'a -> 'a future
73 val cond_forks: params -> (unit -> 'a) list -> 'a future list
74 val map: ('a -> 'b) -> 'a future -> 'b future
75 val promise_group: group -> (unit -> unit) -> 'a future
76 val promise: (unit -> unit) -> 'a future
77 val fulfill_result: 'a future -> 'a Exn.result -> unit
78 val fulfill: 'a future -> 'a -> unit
79 val group_snapshot: group -> task list
80 val terminate: group -> unit
81 val shutdown: unit -> unit
84 structure Future: FUTURE =
89 type task = Task_Queue.task;
90 type group = Task_Queue.group;
91 val new_group = Task_Queue.new_group;
97 val tag = Universal.tag () : task option Universal.tag;
99 fun worker_task () = the_default NONE (Thread.getLocal tag);
100 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
103 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
105 fun the_worker_group () =
106 (case worker_group () of
108 | NONE => raise Fail "Missing worker thread context");
110 fun worker_subgroup () = new_group (worker_group ());
112 fun worker_joining e =
113 (case worker_task () of
115 | SOME task => Task_Queue.joining task e);
117 fun worker_waiting deps e =
118 (case worker_task () of
120 | SOME task => Task_Queue.waiting task deps e);
123 (* datatype future *)
125 type 'a result = 'a Exn.result Single_Assignment.var;
127 datatype 'a future = Future of
132 fun task_of (Future {task, ...}) = task;
133 fun result_of (Future {result, ...}) = result;
135 fun peek x = Single_Assignment.peek (result_of x);
136 fun is_finished x = is_some (peek x);
142 (* synchronization *)
144 val scheduler_event = ConditionVar.conditionVar ();
145 val work_available = ConditionVar.conditionVar ();
146 val work_finished = ConditionVar.conditionVar ();
149 val lock = Mutex.mutex ();
152 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
154 fun wait cond = (*requires SYNCHRONIZED*)
155 Multithreading.sync_wait NONE NONE cond lock;
157 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
158 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
160 fun signal cond = (*requires SYNCHRONIZED*)
161 ConditionVar.signal cond;
163 fun broadcast cond = (*requires SYNCHRONIZED*)
164 ConditionVar.broadcast cond;
171 val queue = Unsynchronized.ref Task_Queue.empty;
172 val next = Unsynchronized.ref 0;
173 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
174 val canceled = Unsynchronized.ref ([]: group list);
175 val do_shutdown = Unsynchronized.ref false;
176 val max_workers = Unsynchronized.ref 0;
177 val max_active = Unsynchronized.ref 0;
178 val worker_trend = Unsynchronized.ref 0;
180 val status_ticks = Unsynchronized.ref 0;
181 val last_round = Unsynchronized.ref Time.zeroTime;
182 val next_round = seconds 0.05;
184 datatype worker_state = Working | Waiting | Sleeping;
185 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
187 fun count_workers state = (*requires SYNCHRONIZED*)
188 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
194 val ML_statistics = Unsynchronized.ref false;
196 fun report_status () = (*requires SYNCHRONIZED*)
197 if ! ML_statistics then
199 val {ready, pending, running, passive} = Task_Queue.status (! queue);
200 val total = length (! workers);
201 val active = count_workers Working;
202 val waiting = count_workers Waiting;
204 [("now", Markup.print_real (Time.toReal (Time.now ()))),
205 ("tasks_ready", Markup.print_int ready),
206 ("tasks_pending", Markup.print_int pending),
207 ("tasks_running", Markup.print_int running),
208 ("tasks_passive", Markup.print_int passive),
209 ("workers_total", Markup.print_int total),
210 ("workers_active", Markup.print_int active),
211 ("workers_waiting", Markup.print_int waiting)] @
212 ML_Statistics.get ();
213 in Output.try_protocol_message (Markup.ML_statistics :: stats) "" end
217 (* cancellation primitives *)
219 fun cancel_now group = (*requires SYNCHRONIZED*)
221 val running = Task_Queue.cancel (! queue) group;
222 val _ = running |> List.app (fn thread =>
223 if Simple_Thread.is_self thread then ()
224 else Simple_Thread.interrupt_unsynchronized thread);
227 fun cancel_all () = (*requires SYNCHRONIZED*)
229 val (groups, threads) = Task_Queue.cancel_all (! queue);
230 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
233 fun cancel_later group = (*requires SYNCHRONIZED*)
234 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
235 broadcast scheduler_event);
237 fun interruptible_task f x =
238 (if Multithreading.available then
239 Multithreading.with_attributes
240 (if is_some (worker_task ())
241 then Multithreading.private_interrupts
242 else Multithreading.public_interrupts)
244 else interruptible f x)
245 before Multithreading.interrupted ();
250 fun worker_exec (task, jobs) =
252 val group = Task_Queue.group_of_task task;
253 val valid = not (Task_Queue.is_canceled group);
255 Task_Queue.running task (fn () =>
256 setmp_worker_task task (fn () =>
257 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
259 if ! Multithreading.trace >= 2 then
260 Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
262 val _ = SYNCHRONIZED "finish" (fn () =>
264 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
265 val test = Exn.capture Multithreading.interrupted ();
267 if ok andalso not (Exn.is_interrupt_exn test) then ()
268 else if null (cancel_now group) then ()
269 else cancel_later group;
270 val _ = broadcast work_finished;
271 val _ = if maximal then () else signal work_available;
275 fun worker_wait active cond = (*requires SYNCHRONIZED*)
276 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
278 (state := (if active then Waiting else Sleeping);
281 | NONE => ignore (wait cond));
283 fun worker_next () = (*requires SYNCHRONIZED*)
284 if length (! workers) > ! max_workers then
285 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
286 signal work_available;
288 else if count_workers Working > ! max_active then
289 (worker_wait false work_available; worker_next ())
291 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
292 NONE => (worker_wait false work_available; worker_next ())
293 | some => (signal work_available; some));
295 fun worker_loop name =
296 (case SYNCHRONIZED name (fn () => worker_next ()) of
298 | SOME work => (worker_exec work; worker_loop name));
300 fun worker_start name = (*requires SYNCHRONIZED*)
301 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
302 Unsynchronized.ref Working));
307 fun scheduler_next () = (*requires SYNCHRONIZED*)
309 val now = Time.now ();
310 val tick = Time.<= (Time.+ (! last_round, next_round), now);
311 val _ = if tick then last_round := now else ();
317 if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
319 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
320 then report_status () else ();
323 if forall (Thread.isActive o #1) (! workers) then ()
326 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
327 val _ = workers := alive;
329 Multithreading.tracing 0 (fn () =>
330 "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
334 (* worker pool adjustments *)
336 val max_active0 = ! max_active;
337 val max_workers0 = ! max_workers;
339 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
340 val _ = max_active := m;
343 if ! do_shutdown then 0
344 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
346 if tick andalso mm > ! max_workers then
347 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
348 else if tick andalso mm < ! max_workers then
349 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
352 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
354 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
355 max_workers := Int.min (mm, 2 * m)
358 val missing = ! max_workers - length (! workers);
361 funpow missing (fn () =>
362 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
366 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
367 else signal work_available;
370 (* canceled groups *)
373 if null (! canceled) then ()
375 (Multithreading.tracing 1 (fn () =>
376 string_of_int (length (! canceled)) ^ " canceled groups");
377 Unsynchronized.change canceled (filter_out (null o cancel_now));
378 signal work_available);
383 val _ = Exn.release (wait_timeout next_round scheduler_event);
388 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
389 val continue = not (! do_shutdown andalso null (! workers));
390 val _ = if continue then () else (report_status (); scheduler := NONE);
392 val _ = broadcast scheduler_event;
395 if Exn.is_interrupt exn then
396 (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
397 List.app cancel_later (cancel_all ());
398 signal work_available; true)
401 fun scheduler_loop () =
403 Multithreading.with_attributes
404 (Multithreading.sync_interrupts Multithreading.public_interrupts)
405 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
406 do (); last_round := Time.zeroTime);
408 fun scheduler_active () = (*requires SYNCHRONIZED*)
409 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
411 fun scheduler_check () = (*requires SYNCHRONIZED*)
412 (do_shutdown := false;
413 if scheduler_active () then ()
414 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
422 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
424 val _ = if null (cancel_now group) then () else cancel_later group;
425 val _ = signal work_available;
426 val _ = scheduler_check ();
429 fun cancel_group group =
430 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
432 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
437 fun error_msg pos ((serial, msg), exec_id) =
438 Position.setmp_thread_data pos (fn () =>
439 let val id = Position.get_id pos in
440 if is_none id orelse is_none exec_id orelse id = exec_id
441 then Output.error_message' (serial, msg) else ()
444 fun identify_result pos res =
448 (case Position.get_id pos of
450 | SOME id => [(Markup.exec_idN, id)])
451 in Exn.Exn (Par_Exn.identify exec_id exn) end
454 fun assign_result group result res =
456 val _ = Single_Assignment.assign result res
457 handle exn as Fail _ =>
458 (case Single_Assignment.peek result of
459 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
462 (case the (Single_Assignment.peek result) of
464 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
465 | Exn.Res _ => true);
471 fun future_job group atts (e: unit -> 'a) =
473 val result = Single_Assignment.var "future" : 'a result;
474 val pos = Position.thread_data ();
479 Exn.capture (fn () =>
480 Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
481 else Exn.interrupt_exn;
482 in assign_result group result (identify_result pos res) end;
483 in (result, job) end;
488 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
489 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
491 fun forks ({name, group, deps, pri, interrupts}: params) es =
497 NONE => worker_subgroup ()
499 fun enqueue e queue =
503 then Multithreading.private_interrupts
504 else Multithreading.no_interrupts;
505 val (result, job) = future_job grp atts e;
506 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
507 val future = Future {promised = false, task = task, result = result};
508 in (future, queue') end;
510 SYNCHRONIZED "enqueue" (fn () =>
512 val (futures, queue') = fold_map enqueue es (! queue);
513 val _ = queue := queue';
514 val minimal = forall (not o Task_Queue.known_task queue') deps;
515 val _ = if minimal then signal work_available else ();
516 val _ = scheduler_check ();
521 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
528 NONE => Exn.Exn (Fail "Unfinished future")
530 if Exn.is_interrupt_exn res then
531 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
533 | exns => Exn.Exn (Par_Exn.make exns))
538 fun join_next deps = (*requires SYNCHRONIZED*)
539 if null deps then NONE
541 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
544 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
545 | (SOME work, deps') => SOME (work, deps'));
547 fun execute_work NONE = ()
548 | execute_work (SOME (work, deps')) =
549 (worker_joining (fn () => worker_exec work); join_work deps')
551 Multithreading.with_attributes Multithreading.no_interrupts
552 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
556 fun join_results xs =
559 if forall is_finished xs then ()
560 else if Multithreading.self_critical () then
561 raise Fail "Cannot join future values within critical section"
562 else if is_some (worker_task ()) then join_work (map task_of xs)
563 else List.app (ignore o Single_Assignment.await o result_of) xs;
564 in map get_result xs end;
568 fun join_result x = singleton join_results x;
569 fun joins xs = Par_Exn.release_all (join_results xs);
570 fun join x = Exn.release (join_result x);
572 fun join_tasks tasks =
573 if null tasks then ()
576 {name = "join_tasks", group = SOME (new_group NONE),
577 deps = tasks, pri = 0, interrupts = false} I
581 (* task context for running thread *)
583 fun task_context name group f x =
584 Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
586 val (result, job) = future_job group orig_atts (fn () => f x);
588 SYNCHRONIZED "enroll" (fn () =>
589 Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
590 val _ = worker_exec (task, [job]);
592 (case Single_Assignment.peek result of
593 NONE => raise Fail "Missing task context result"
594 | SOME res => Exn.release res)
598 (* fast-path operations -- bypass task queue if possible *)
600 fun value_result (res: 'a Exn.result) =
602 val task = Task_Queue.dummy_task;
603 val group = Task_Queue.group_of_task task;
604 val result = Single_Assignment.var "value" : 'a result;
605 val _ = assign_result group result (identify_result (Position.thread_data ()) res);
606 in Future {promised = false, task = task, result = result} end;
608 fun value x = value_result (Exn.Res x);
610 fun cond_forks args es =
611 if Multithreading.enabled () then forks args es
612 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
615 if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
618 val task = task_of x;
619 val group = Task_Queue.group_of_task task;
621 future_job group Multithreading.private_interrupts (fn () => f (join x));
623 val extended = SYNCHRONIZED "extend" (fn () =>
624 (case Task_Queue.extend task job (! queue) of
625 SOME queue' => (queue := queue'; true)
628 if extended then Future {promised = false, task = task, result = result}
630 (singleton o cond_forks)
631 {name = "map_future", group = SOME group, deps = [task],
632 pri = Task_Queue.pri_of_task task, interrupts = true}
633 (fn () => f (join x))
637 (* promised futures -- fulfilled by external means *)
639 fun promise_group group abort : 'a future =
641 val result = Single_Assignment.var "promise" : 'a result;
642 fun assign () = assign_result group result Exn.interrupt_exn
643 handle Fail _ => true
645 if Exn.is_interrupt exn
646 then raise Fail "Concurrent attempt to fulfill promise"
649 Multithreading.with_attributes Multithreading.no_interrupts
650 (fn _ => Exn.release (Exn.capture assign () before abort ()));
651 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
652 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
653 in Future {promised = true, task = task, result = result} end;
655 fun promise abort = promise_group (worker_subgroup ()) abort;
657 fun fulfill_result (Future {promised, task, result}) res =
658 if not promised then raise Fail "Not a promised future"
661 val group = Task_Queue.group_of_task task;
662 val pos = Position.thread_data ();
664 assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
666 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
669 SYNCHRONIZED "fulfill_result" (fn () =>
670 Unsynchronized.change_result queue
671 (Task_Queue.dequeue_passive (Thread.self ()) task));
674 SOME true => worker_exec (task, [job])
676 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
679 if is_some (Single_Assignment.peek result) then ()
680 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
683 fun fulfill x res = fulfill_result x (Exn.Res res);
688 fun group_snapshot group =
689 SYNCHRONIZED "group_snapshot" (fn () =>
690 Task_Queue.group_tasks (! queue) group);
695 fun terminate group =
696 SYNCHRONIZED "terminate" (fn () =>
697 let val _ = cancel_group_unsynchronized group;
698 in Task_Queue.group_tasks (! queue) group end)
705 if not Multithreading.available then ()
706 else if is_some (worker_task ()) then
707 raise Fail "Cannot shutdown while running as worker thread"
709 SYNCHRONIZED "shutdown" (fn () =>
710 while scheduler_active () do
711 (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
712 wait scheduler_event));
715 (*final declarations of this structure!*)
716 val map = map_future;
720 type 'a future = 'a Future.future;