1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information, will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val the_worker_group: unit -> group
50 val worker_subgroup: unit -> group
51 val worker_guest: string -> group -> ('a -> 'b) -> 'a -> 'b
53 val task_of: 'a future -> task
54 val peek: 'a future -> 'a Exn.result option
55 val is_finished: 'a future -> bool
56 val ML_statistics: bool Unsynchronized.ref
57 val forked_proofs: int Unsynchronized.ref
58 val interruptible_task: ('a -> 'b) -> 'a -> 'b
59 val cancel_group: group -> unit
60 val cancel: 'a future -> unit
61 val error_msg: Position.T -> (serial * string) * string option -> unit
62 val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
63 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
64 val default_params: params
65 val forks: params -> (unit -> 'a) list -> 'a future list
66 val fork: (unit -> 'a) -> 'a future
67 val join_results: 'a future list -> 'a Exn.result list
68 val join_result: 'a future -> 'a Exn.result
69 val joins: 'a future list -> 'a list
70 val join: 'a future -> 'a
71 val value_result: 'a Exn.result -> 'a future
72 val value: 'a -> 'a future
73 val cond_forks: params -> (unit -> 'a) list -> 'a future list
74 val map: ('a -> 'b) -> 'a future -> 'b future
75 val promise_group: group -> (unit -> unit) -> 'a future
76 val promise: (unit -> unit) -> 'a future
77 val fulfill_result: 'a future -> 'a Exn.result -> unit
78 val fulfill: 'a future -> 'a -> unit
79 val terminate: group -> unit
80 val shutdown: unit -> unit
83 structure Future: FUTURE =
88 type task = Task_Queue.task;
89 type group = Task_Queue.group;
90 val new_group = Task_Queue.new_group;
96 val tag = Universal.tag () : task option Universal.tag;
98 fun worker_task () = the_default NONE (Thread.getLocal tag);
99 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
102 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
104 fun the_worker_group () =
105 (case worker_group () of
107 | NONE => raise Fail "Missing worker thread context");
109 fun worker_subgroup () = new_group (worker_group ());
111 fun worker_guest name group f x =
112 if is_some (worker_task ()) then
113 raise Fail "Already running as worker thread"
114 else setmp_worker_task (Task_Queue.new_task group name NONE) f x;
116 fun worker_joining e =
117 (case worker_task () of
119 | SOME task => Task_Queue.joining task e);
121 fun worker_waiting deps e =
122 (case worker_task () of
124 | SOME task => Task_Queue.waiting task deps e);
127 (* datatype future *)
129 type 'a result = 'a Exn.result Single_Assignment.var;
131 datatype 'a future = Future of
136 fun task_of (Future {task, ...}) = task;
137 fun result_of (Future {result, ...}) = result;
139 fun peek x = Single_Assignment.peek (result_of x);
140 fun is_finished x = is_some (peek x);
146 (* synchronization *)
148 val scheduler_event = ConditionVar.conditionVar ();
149 val work_available = ConditionVar.conditionVar ();
150 val work_finished = ConditionVar.conditionVar ();
153 val lock = Mutex.mutex ();
156 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
158 fun wait cond = (*requires SYNCHRONIZED*)
159 Multithreading.sync_wait NONE NONE cond lock;
161 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
162 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
164 fun signal cond = (*requires SYNCHRONIZED*)
165 ConditionVar.signal cond;
167 fun broadcast cond = (*requires SYNCHRONIZED*)
168 ConditionVar.broadcast cond;
175 val queue = Unsynchronized.ref Task_Queue.empty;
176 val next = Unsynchronized.ref 0;
177 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
178 val canceled = Unsynchronized.ref ([]: group list);
179 val do_shutdown = Unsynchronized.ref false;
180 val max_workers = Unsynchronized.ref 0;
181 val max_active = Unsynchronized.ref 0;
182 val worker_trend = Unsynchronized.ref 0;
184 val status_ticks = Unsynchronized.ref 0;
185 val last_round = Unsynchronized.ref Time.zeroTime;
186 val next_round = seconds 0.05;
188 datatype worker_state = Working | Waiting | Sleeping;
189 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
191 fun count_workers state = (*requires SYNCHRONIZED*)
192 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
198 val ML_statistics = Unsynchronized.ref false;
199 val forked_proofs = Unsynchronized.ref 0;
201 fun report_status () = (*requires SYNCHRONIZED*)
202 if ! ML_statistics then
204 val {ready, pending, running, passive} = Task_Queue.status (! queue);
205 val total = length (! workers);
206 val active = count_workers Working;
207 val waiting = count_workers Waiting;
209 [("now", Markup.print_real (Time.toReal (Time.now ()))),
210 ("tasks_proof", Markup.print_int (! forked_proofs)),
211 ("tasks_ready", Markup.print_int ready),
212 ("tasks_pending", Markup.print_int pending),
213 ("tasks_running", Markup.print_int running),
214 ("tasks_passive", Markup.print_int passive),
215 ("workers_total", Markup.print_int total),
216 ("workers_active", Markup.print_int active),
217 ("workers_waiting", Markup.print_int waiting)] @
218 ML_Statistics.get ();
219 in Output.try_protocol_message (Markup.ML_statistics :: stats) "" end
223 (* cancellation primitives *)
225 fun cancel_now group = (*requires SYNCHRONIZED*)
227 val running = Task_Queue.cancel (! queue) group;
228 val _ = running |> List.app (fn thread =>
229 if Simple_Thread.is_self thread then ()
230 else Simple_Thread.interrupt_unsynchronized thread);
233 fun cancel_all () = (*requires SYNCHRONIZED*)
235 val (groups, threads) = Task_Queue.cancel_all (! queue);
236 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
239 fun cancel_later group = (*requires SYNCHRONIZED*)
240 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
241 broadcast scheduler_event);
243 fun interruptible_task f x =
244 (if Multithreading.available then
245 Multithreading.with_attributes
246 (if is_some (worker_task ())
247 then Multithreading.private_interrupts
248 else Multithreading.public_interrupts)
250 else interruptible f x)
251 before Multithreading.interrupted ();
256 fun worker_exec (task, jobs) =
258 val group = Task_Queue.group_of_task task;
259 val valid = not (Task_Queue.is_canceled group);
261 Task_Queue.running task (fn () =>
262 setmp_worker_task task (fn () =>
263 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
265 if ! Multithreading.trace >= 2 then
266 Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
268 val _ = SYNCHRONIZED "finish" (fn () =>
270 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
271 val test = Exn.capture Multithreading.interrupted ();
273 if ok andalso not (Exn.is_interrupt_exn test) then ()
274 else if null (cancel_now group) then ()
275 else cancel_later group;
276 val _ = broadcast work_finished;
277 val _ = if maximal then () else signal work_available;
281 fun worker_wait active cond = (*requires SYNCHRONIZED*)
282 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
284 (state := (if active then Waiting else Sleeping);
287 | NONE => ignore (wait cond));
289 fun worker_next () = (*requires SYNCHRONIZED*)
290 if length (! workers) > ! max_workers then
291 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
292 signal work_available;
294 else if count_workers Working > ! max_active then
295 (worker_wait false work_available; worker_next ())
297 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
298 NONE => (worker_wait false work_available; worker_next ())
299 | some => (signal work_available; some));
301 fun worker_loop name =
302 (case SYNCHRONIZED name (fn () => worker_next ()) of
304 | SOME work => (worker_exec work; worker_loop name));
306 fun worker_start name = (*requires SYNCHRONIZED*)
307 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
308 Unsynchronized.ref Working));
313 fun scheduler_next () = (*requires SYNCHRONIZED*)
315 val now = Time.now ();
316 val tick = Time.<= (Time.+ (! last_round, next_round), now);
317 val _ = if tick then last_round := now else ();
323 if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
325 if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
326 then report_status () else ();
329 if forall (Thread.isActive o #1) (! workers) then ()
332 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
333 val _ = workers := alive;
335 Multithreading.tracing 0 (fn () =>
336 "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
340 (* worker pool adjustments *)
342 val max_active0 = ! max_active;
343 val max_workers0 = ! max_workers;
345 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
346 val _ = max_active := m;
349 if ! do_shutdown then 0
350 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
352 if tick andalso mm > ! max_workers then
353 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
354 else if tick andalso mm < ! max_workers then
355 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
358 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
360 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
361 max_workers := Int.min (mm, 2 * m)
364 val missing = ! max_workers - length (! workers);
367 funpow missing (fn () =>
368 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
372 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
373 else signal work_available;
376 (* canceled groups *)
379 if null (! canceled) then ()
381 (Multithreading.tracing 1 (fn () =>
382 string_of_int (length (! canceled)) ^ " canceled groups");
383 Unsynchronized.change canceled (filter_out (null o cancel_now));
384 signal work_available);
389 val _ = Exn.release (wait_timeout next_round scheduler_event);
394 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
395 val continue = not (! do_shutdown andalso null (! workers));
396 val _ = if continue then () else (report_status (); scheduler := NONE);
398 val _ = broadcast scheduler_event;
401 if Exn.is_interrupt exn then
402 (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
403 List.app cancel_later (cancel_all ());
404 signal work_available; true)
407 fun scheduler_loop () =
409 Multithreading.with_attributes
410 (Multithreading.sync_interrupts Multithreading.public_interrupts)
411 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
412 do (); last_round := Time.zeroTime);
414 fun scheduler_active () = (*requires SYNCHRONIZED*)
415 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
417 fun scheduler_check () = (*requires SYNCHRONIZED*)
418 (do_shutdown := false;
419 if scheduler_active () then ()
420 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
428 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
430 val _ = if null (cancel_now group) then () else cancel_later group;
431 val _ = signal work_available;
432 val _ = scheduler_check ();
435 fun cancel_group group =
436 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
438 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
443 fun error_msg pos ((serial, msg), exec_id) =
444 Position.setmp_thread_data pos (fn () =>
445 let val id = Position.get_id pos in
446 if is_none id orelse is_none exec_id orelse id = exec_id
447 then Output.error_msg' (serial, msg) else ()
450 fun identify_result pos res =
454 (case Position.get_id pos of
456 | SOME id => [(Markup.exec_idN, id)])
457 in Exn.Exn (Par_Exn.identify exec_id exn) end
460 fun assign_result group result res =
462 val _ = Single_Assignment.assign result res
463 handle exn as Fail _ =>
464 (case Single_Assignment.peek result of
465 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
468 (case the (Single_Assignment.peek result) of
470 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
471 | Exn.Res _ => true);
477 fun future_job group interrupts (e: unit -> 'a) =
479 val result = Single_Assignment.var "future" : 'a result;
480 val pos = Position.thread_data ();
485 Exn.capture (fn () =>
486 Multithreading.with_attributes
488 then Multithreading.private_interrupts else Multithreading.no_interrupts)
489 (fn _ => Position.setmp_thread_data pos e ())) ()
490 else Exn.interrupt_exn;
491 in assign_result group result (identify_result pos res) end;
492 in (result, job) end;
497 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
498 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
500 fun forks ({name, group, deps, pri, interrupts}: params) es =
506 NONE => worker_subgroup ()
508 fun enqueue e queue =
510 val (result, job) = future_job grp interrupts e;
511 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
512 val future = Future {promised = false, task = task, result = result};
513 in (future, queue') end;
515 SYNCHRONIZED "enqueue" (fn () =>
517 val (futures, queue') = fold_map enqueue es (! queue);
518 val _ = queue := queue';
519 val minimal = forall (not o Task_Queue.known_task queue') deps;
520 val _ = if minimal then signal work_available else ();
521 val _ = scheduler_check ();
526 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
533 NONE => Exn.Exn (Fail "Unfinished future")
535 if Exn.is_interrupt_exn res then
536 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
538 | SOME exn => Exn.Exn exn)
543 fun join_next deps = (*requires SYNCHRONIZED*)
544 if null deps then NONE
546 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
549 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
550 | (SOME work, deps') => SOME (work, deps'));
552 fun execute_work NONE = ()
553 | execute_work (SOME (work, deps')) =
554 (worker_joining (fn () => worker_exec work); join_work deps')
556 Multithreading.with_attributes Multithreading.no_interrupts
557 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
561 fun join_results xs =
564 if forall is_finished xs then ()
565 else if Multithreading.self_critical () then
566 raise Fail "Cannot join future values within critical section"
567 else if is_some (worker_task ()) then join_work (map task_of xs)
568 else List.app (ignore o Single_Assignment.await o result_of) xs;
569 in map get_result xs end;
573 fun join_result x = singleton join_results x;
574 fun joins xs = Par_Exn.release_all (join_results xs);
575 fun join x = Exn.release (join_result x);
578 (* fast-path operations -- bypass task queue if possible *)
580 fun value_result (res: 'a Exn.result) =
582 val task = Task_Queue.dummy_task;
583 val group = Task_Queue.group_of_task task;
584 val result = Single_Assignment.var "value" : 'a result;
585 val _ = assign_result group result (identify_result (Position.thread_data ()) res);
586 in Future {promised = false, task = task, result = result} end;
588 fun value x = value_result (Exn.Res x);
590 fun cond_forks args es =
591 if Multithreading.enabled () then forks args es
592 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
595 if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
598 val task = task_of x;
599 val group = Task_Queue.group_of_task task;
600 val (result, job) = future_job group true (fn () => f (join x));
602 val extended = SYNCHRONIZED "extend" (fn () =>
603 (case Task_Queue.extend task job (! queue) of
604 SOME queue' => (queue := queue'; true)
607 if extended then Future {promised = false, task = task, result = result}
609 (singleton o cond_forks)
610 {name = "map_future", group = SOME group, deps = [task],
611 pri = Task_Queue.pri_of_task task, interrupts = true}
612 (fn () => f (join x))
616 (* promised futures -- fulfilled by external means *)
618 fun promise_group group abort : 'a future =
620 val result = Single_Assignment.var "promise" : 'a result;
621 fun assign () = assign_result group result Exn.interrupt_exn
622 handle Fail _ => true
624 if Exn.is_interrupt exn
625 then raise Fail "Concurrent attempt to fulfill promise"
628 Multithreading.with_attributes Multithreading.no_interrupts
629 (fn _ => Exn.release (Exn.capture assign () before abort ()));
630 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
631 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
632 in Future {promised = true, task = task, result = result} end;
634 fun promise abort = promise_group (worker_subgroup ()) abort;
636 fun fulfill_result (Future {promised, task, result}) res =
637 if not promised then raise Fail "Not a promised future"
640 val group = Task_Queue.group_of_task task;
641 val pos = Position.thread_data ();
643 assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
645 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
648 SYNCHRONIZED "fulfill_result" (fn () =>
649 Unsynchronized.change_result queue
650 (Task_Queue.dequeue_passive (Thread.self ()) task));
653 SOME true => worker_exec (task, [job])
655 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
658 if is_some (Single_Assignment.peek result) then ()
659 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
662 fun fulfill x res = fulfill_result x (Exn.Res res);
667 fun terminate group =
670 SYNCHRONIZED "terminate" (fn () =>
671 let val _ = cancel_group_unsynchronized group;
672 in Task_Queue.group_tasks (! queue) group end);
674 if null tasks then ()
677 {name = "terminate", group = SOME (new_group NONE),
678 deps = tasks, pri = 0, interrupts = false} I
686 if not Multithreading.available then ()
687 else if is_some (worker_task ()) then
688 raise Fail "Cannot shutdown while running as worker thread"
690 SYNCHRONIZED "shutdown" (fn () =>
691 while scheduler_active () do
692 (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
693 wait scheduler_event));
696 (*final declarations of this structure!*)
697 val map = map_future;
701 type 'a future = 'a Future.future;