more explicit treatment of (optional) exception properties, notably for "serial" -- avoid conflict with startPosition = offset;
1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes peer and subgroup members to be
30 interrupted eventually. Interrupted tasks that lack regular
31 result information will pick up parallel exceptions from the
32 cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val worker_subgroup: unit -> group
51 val task_of: 'a future -> task
52 val peek: 'a future -> 'a Exn.result option
53 val is_finished: 'a future -> bool
54 val ML_statistics: bool Unsynchronized.ref
55 val interruptible_task: ('a -> 'b) -> 'a -> 'b
56 val cancel_group: group -> unit
57 val cancel: 'a future -> unit
58 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
59 val default_params: params
60 val forks: params -> (unit -> 'a) list -> 'a future list
61 val fork_pri: int -> (unit -> 'a) -> 'a future
62 val fork: (unit -> 'a) -> 'a future
63 val join_results: 'a future list -> 'a Exn.result list
64 val join_result: 'a future -> 'a Exn.result
65 val joins: 'a future list -> 'a list
66 val join: 'a future -> 'a
67 val value_result: 'a Exn.result -> 'a future
68 val value: 'a -> 'a future
69 val cond_forks: params -> (unit -> 'a) list -> 'a future list
70 val map: ('a -> 'b) -> 'a future -> 'b future
71 val promise_group: group -> (unit -> unit) -> 'a future
72 val promise: (unit -> unit) -> 'a future
73 val fulfill_result: 'a future -> 'a Exn.result -> unit
74 val fulfill: 'a future -> 'a -> unit
75 val terminate: group -> unit
76 val shutdown: unit -> unit
79 structure Future: FUTURE =
(*Task and group types, and the group constructor, taken over unchanged from
  Task_Queue under the names listed in the signature.*)
84 type task = Task_Queue.task;
85 type group = Task_Queue.group;
86 val new_group = Task_Queue.new_group;
(*Universal tag for a thread-local "task option" value; read by worker_task
  and installed by setmp_worker_task.*)
92 val tag = Universal.tag () : task option Universal.tag;
(*Task stored under tag in the calling thread's local data.  NONE is the
  fallback handed to the_default, presumably used when the thread has no entry
  for tag -- i.e. when the caller is not running a task.*)
94 fun worker_task () = the_default NONE (Thread.getLocal tag);
(*Evaluate f x through setmp_thread_data on tag, passing the current
  worker_task () as the old value and SOME task as the new one.
  NOTE(review): presumably the old value is put back once f x is done --
  confirm against the definition of setmp_thread_data.*)
95 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
(*Group of the calling thread's worker task; NONE when worker_task () is NONE.*)
98 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
(*New group obtained from new_group applied to the current worker group (an
  option).  NOTE(review): presumably the argument acts as the parent, and NONE
  gives a group without parent -- confirm in Task_Queue.new_group.*)
99 fun worker_subgroup () = new_group (worker_group ());
101 fun worker_joining e =
102 (case worker_task () of
104 | SOME task => Task_Queue.joining task e);
106 fun worker_waiting deps e =
107 (case worker_task () of
109 | SOME task => Task_Queue.waiting task deps e);
112 (* datatype future *)
(*Result cell of a future: a Single_Assignment variable that holds an
  Exn.result (regular value or exception).*)
114 type 'a result = 'a Exn.result Single_Assignment.var;
116 datatype 'a future = Future of
(*Field projections of the Future record: the associated task and the
  single-assignment result cell.*)
121 fun task_of (Future {task, ...}) = task;
122 fun result_of (Future {result, ...}) = result;
(*Non-blocking look at the result cell via Single_Assignment.peek: SOME result
  if one is present, otherwise NONE (cf. signature 'a Exn.result option).*)
124 fun peek x = Single_Assignment.peek (result_of x);
(*True iff peek already yields a result, be it a value or an exception.*)
125 fun is_finished x = is_some (peek x);
131 (* synchronization *)
(*Condition variables; all of them are used together with the single lock.*)
(*broadcast by cancel_later and at the end of a scheduler round; awaited by the
  scheduler (with timeout) and by the shutdown loop*)
133 val scheduler_event = ConditionVar.conditionVar ();
(*awaited by workers in worker_next; signalled e.g. after enqueue, dequeue and
  finish*)
134 val work_available = ConditionVar.conditionVar ();
(*broadcast when a task has finished; awaited in join_next*)
135 val work_finished = ConditionVar.conditionVar ();
(*The one mutex of this module: taken by SYNCHRONIZED and handed to
  Multithreading.sync_wait by wait / wait_timeout.*)
138 val lock = Mutex.mutex ();
(*Simple_Thread.synchronized specialised to lock; name is passed through
  unchanged (call sites use short labels such as "finish" or "enqueue").
  The remaining argument of Simple_Thread.synchronized is left to the caller.*)
141 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
(*Wait on cond with lock via Multithreading.sync_wait.  Both optional arguments
  are NONE; the second one is where wait_timeout supplies its deadline, so this
  variant waits without time limit.*)
143 fun wait cond = (*requires SYNCHRONIZED*)
144 Multithreading.sync_wait NONE NONE cond lock;
(*Like wait, but with an absolute deadline of Time.now () + timeout passed to
  Multithreading.sync_wait.  timeout is thus a relative duration.*)
146 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
147 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
(*Plain ConditionVar.signal on cond; the wrapper only records that callers are
  expected to hold the lock.*)
149 fun signal cond = (*requires SYNCHRONIZED*)
150 ConditionVar.signal cond;
(*Plain ConditionVar.broadcast on cond; the wrapper only records that callers
  are expected to hold the lock.*)
152 fun broadcast cond = (*requires SYNCHRONIZED*)
153 ConditionVar.broadcast cond;
(*Broadcast on both work-related condition variables, work_available first and
  work_finished second.*)
155 fun broadcast_work () = (*requires SYNCHRONIZED*)
156 (ConditionVar.broadcast work_available;
157 ConditionVar.broadcast work_finished);
(*Global scheduler state, kept in unsynchronized refs.*)
(*the task queue; updated by := or Unsynchronized.change_result*)
164 val queue = Unsynchronized.ref Task_Queue.empty;
(*counter bumped with Unsynchronized.inc to number new threads ("worker N")*)
165 val next = Unsynchronized.ref 0;
(*scheduler thread: set by scheduler_check, reset to NONE when the scheduler
  decides not to continue*)
166 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*groups registered by cancel_later; the scheduler re-applies cancel_now and
  drops the groups for which it returns an empty list*)
167 val canceled = Unsynchronized.ref ([]: group list);
(*set by the scheduler once Task_Queue.all_passive holds; cleared again by
  scheduler_check*)
168 val do_shutdown = Unsynchronized.ref false;
(*bound on the length of workers, checked in worker_next*)
169 val max_workers = Unsynchronized.ref 0;
(*bound on workers in state Working, checked in worker_next; set from
  Multithreading.max_threads_value (), or 0 during shutdown*)
170 val max_active = Unsynchronized.ref 0;
(*signed tick counter: moved up resp. down by the scheduler while the wanted
  pool size is above resp. below max_workers*)
171 val worker_trend = Unsynchronized.ref 0;
(*tick counter modulo 10; report_status runs on ticks where it is 0*)
173 val status_ticks = Unsynchronized.ref 0;
(*time of the latest tick; reset to Time.zeroTime when scheduler_loop ends*)
174 val last_round = Unsynchronized.ref Time.zeroTime;
(*minimal distance between two ticks, also the timeout of the scheduler's wait
  on scheduler_event*)
175 val next_round = seconds 0.05;
(*State of a worker thread.  worker_wait switches to Waiting (active = true) or
  Sleeping (active = false) for the duration of the wait and back to Working
  afterwards; new workers start as Working.*)
177 datatype worker_state = Working | Waiting | Sleeping;
(*Registered worker threads, each paired with its mutable state.*)
178 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
(*Number of entries of workers whose current state equals the given state
  (one pass over the list, starting from 0).*)
180 fun count_workers state = (*requires SYNCHRONIZED*)
181 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
(*Switch consulted by report_status: statistics are only produced while this
  is true.  Initially false.*)
187 val ML_statistics = Unsynchronized.ref false;
189 fun report_status () = (*requires SYNCHRONIZED*)
190 if ! ML_statistics then
192 val {ready, pending, running, passive} = Task_Queue.status (! queue);
193 val total = length (! workers);
194 val active = count_workers Working;
195 val waiting = count_workers Waiting;
197 [("now", signed_string_of_real (Time.toReal (Time.now ()))),
198 ("tasks_ready", Markup.print_int ready),
199 ("tasks_pending", Markup.print_int pending),
200 ("tasks_running", Markup.print_int running),
201 ("tasks_passive", Markup.print_int passive),
202 ("workers_total", Markup.print_int total),
203 ("workers_active", Markup.print_int active),
204 ("workers_waiting", Markup.print_int waiting)] @
205 ML_Statistics.get ();
207 Output.protocol_message (Markup.ML_statistics :: stats) ""
208 handle Fail msg => warning msg
213 (* cancellation primitives *)
215 fun cancel_now group = (*requires SYNCHRONIZED*)
217 val running = Task_Queue.cancel (! queue) group;
218 val _ = running |> List.app (fn thread =>
219 if Simple_Thread.is_self thread then ()
220 else Simple_Thread.interrupt_unsynchronized thread);
223 fun cancel_all () = (*requires SYNCHRONIZED*)
225 val (groups, threads) = Task_Queue.cancel_all (! queue);
226 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
(*Register group in the canceled list (insert with Task_Queue.eq_group as the
  equality test) and broadcast scheduler_event, which the scheduler waits on
  before it processes that list.*)
229 fun cancel_later group = (*requires SYNCHRONIZED*)
230 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
231 broadcast scheduler_event);
233 fun interruptible_task f x =
234 (if Multithreading.available then
235 Multithreading.with_attributes
236 (if is_some (worker_task ())
237 then Multithreading.private_interrupts
238 else Multithreading.public_interrupts)
240 else interruptible f x)
241 before Multithreading.interrupted ();
246 fun worker_exec (task, jobs) =
248 val group = Task_Queue.group_of_task task;
249 val valid = not (Task_Queue.is_canceled group);
251 Task_Queue.running task (fn () =>
252 setmp_worker_task task (fn () =>
253 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
254 val _ = Multithreading.tracing 2 (fn () =>
256 val s = Task_Queue.str_of_task_groups task;
257 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
258 val (run, wait, deps) = Task_Queue.timing_of_task task;
259 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
260 val _ = SYNCHRONIZED "finish" (fn () =>
262 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
263 val test = Exn.capture Multithreading.interrupted ();
265 if ok andalso not (Exn.is_interrupt_exn test) then ()
266 else if null (cancel_now group) then ()
267 else cancel_later group;
268 val _ = broadcast work_finished;
269 val _ = if maximal then () else signal work_available;
273 fun worker_wait active cond = (*requires SYNCHRONIZED*)
276 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
278 | NONE => raise Fail "Unregistered worker thread");
279 val _ = state := (if active then Waiting else Sleeping);
281 val _ = state := Working;
284 fun worker_next () = (*requires SYNCHRONIZED*)
285 if length (! workers) > ! max_workers then
286 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
287 signal work_available;
289 else if count_workers Working > ! max_active then
290 (worker_wait false work_available; worker_next ())
292 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
293 NONE => (worker_wait false work_available; worker_next ())
294 | some => (signal work_available; some));
296 fun worker_loop name =
297 (case SYNCHRONIZED name (fn () => worker_next ()) of
299 | SOME work => (worker_exec work; worker_loop name));
301 fun worker_start name = (*requires SYNCHRONIZED*)
302 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
303 Unsynchronized.ref Working));
308 fun scheduler_next () = (*requires SYNCHRONIZED*)
310 val now = Time.now ();
311 val tick = Time.<= (Time.+ (! last_round, next_round), now);
312 val _ = if tick then last_round := now else ();
318 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
320 if tick andalso ! status_ticks = 0 then report_status () else ();
323 if forall (Thread.isActive o #1) (! workers) then ()
326 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
327 val _ = workers := alive;
329 Multithreading.tracing 0 (fn () =>
330 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
334 (* worker pool adjustments *)
336 val max_active0 = ! max_active;
337 val max_workers0 = ! max_workers;
339 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
340 val _ = max_active := m;
343 if ! do_shutdown then 0
344 else if m = 9999 then 1
345 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
347 if tick andalso mm > ! max_workers then
348 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
349 else if tick andalso mm < ! max_workers then
350 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
353 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
355 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
356 max_workers := Int.min (mm, 2 * m)
359 val missing = ! max_workers - length (! workers);
362 funpow missing (fn () =>
363 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
367 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
368 else signal work_available;
371 (* canceled groups *)
374 if null (! canceled) then ()
376 (Multithreading.tracing 1 (fn () =>
377 string_of_int (length (! canceled)) ^ " canceled groups");
378 Unsynchronized.change canceled (filter_out (null o cancel_now));
384 val _ = Exn.release (wait_timeout next_round scheduler_event);
389 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
390 val continue = not (! do_shutdown andalso null (! workers));
391 val _ = if continue then () else (report_status (); scheduler := NONE);
393 val _ = broadcast scheduler_event;
396 if Exn.is_interrupt exn then
397 (Multithreading.tracing 1 (fn () => "Interrupt");
398 List.app cancel_later (cancel_all ());
399 broadcast_work (); true)
402 fun scheduler_loop () =
404 Multithreading.with_attributes
405 (Multithreading.sync_interrupts Multithreading.public_interrupts)
406 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
407 do (); last_round := Time.zeroTime);
(*True iff a scheduler thread is recorded in scheduler and Thread.isActive
  holds for it; false when the ref is NONE.*)
409 fun scheduler_active () = (*requires SYNCHRONIZED*)
410 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
(*Make sure a scheduler is running: clear do_shutdown unconditionally, then
  fork scheduler_loop as a new thread and record it in scheduler unless
  scheduler_active () already holds.*)
412 fun scheduler_check () = (*requires SYNCHRONIZED*)
413 (do_shutdown := false;
414 if scheduler_active () then ()
415 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
423 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
425 val _ = if null (cancel_now group) then () else cancel_later group;
426 val _ = signal work_available;
427 val _ = scheduler_check ();
(*Public entry point: run cancel_group_unsynchronized on group inside
  SYNCHRONIZED "cancel_group".*)
430 fun cancel_group group =
431 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
(*Cancel the whole group of the task behind future x (via cancel_group), not
  just that single task.*)
433 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
438 fun assign_result group result raw_res =
442 Exn.Exn exn => Exn.Exn (Par_Exn.identify [] exn)
444 val _ = Single_Assignment.assign result res
445 handle exn as Fail _ =>
446 (case Single_Assignment.peek result of
447 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
450 (case the (Single_Assignment.peek result) of
452 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
453 | Exn.Res _ => true);
456 fun future_job group interrupts (e: unit -> 'a) =
458 val result = Single_Assignment.var "future" : 'a result;
459 val pos = Position.thread_data ();
464 Exn.capture (fn () =>
465 Multithreading.with_attributes
467 then Multithreading.private_interrupts else Multithreading.no_interrupts)
468 (fn _ => Position.setmp_thread_data pos e ())) ()
469 else Exn.interrupt_exn;
470 in assign_result group result res end;
471 in (result, job) end;
(*Parameters of forks.  name, group, deps and pri are passed on to
  Task_Queue.enqueue (with group = NONE replaced by worker_subgroup ());
  interrupts is passed on to future_job.*)
476 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
(*Defaults: empty name, no explicit group, no dependencies, priority 0,
  interrupts enabled.*)
477 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
479 fun forks ({name, group, deps, pri, interrupts}: params) es =
485 NONE => worker_subgroup ()
487 fun enqueue e queue =
489 val (result, job) = future_job grp interrupts e;
490 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
491 val future = Future {promised = false, task = task, result = result};
492 in (future, queue') end;
494 SYNCHRONIZED "enqueue" (fn () =>
496 val (futures, queue') = fold_map enqueue es (! queue);
497 val _ = queue := queue';
498 val minimal = forall (not o Task_Queue.known_task queue') deps;
499 val _ = if minimal then signal work_available else ();
500 val _ = scheduler_check ();
505 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
(*Fork a single future for e with priority 0.*)
507 fun fork e = fork_pri 0 e;
514 NONE => Exn.Exn (Fail "Unfinished future")
516 if Exn.is_interrupt_exn res then
517 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
519 | SOME exn => Exn.Exn exn)
524 fun join_next deps = (*requires SYNCHRONIZED*)
525 if null deps then NONE
527 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
530 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
531 | (SOME work, deps') => SOME (work, deps'));
533 fun execute_work NONE = ()
534 | execute_work (SOME (work, deps')) =
535 (worker_joining (fn () => worker_exec work); join_work deps')
537 Multithreading.with_attributes Multithreading.no_interrupts
538 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
542 fun join_results xs =
545 if forall is_finished xs then ()
546 else if Multithreading.self_critical () then
547 error "Cannot join future values within critical section"
548 else if is_some (worker_task ()) then join_work (map task_of xs)
549 else List.app (ignore o Single_Assignment.await o result_of) xs;
550 in map get_result xs end;
(*Single-future version of join_results (lifted with singleton); the outcome
  is returned as Exn.result, not raised.*)
554 fun join_result x = singleton join_results x;
(*Join all futures, then hand the list of results to Par_Exn.release_all.
  NOTE(review): presumably this returns the values if all results are regular
  and raises otherwise -- confirm against Par_Exn.*)
555 fun joins xs = Par_Exn.release_all (join_results xs);
(*Join one future and turn its Exn.result into a plain value with Exn.release
  (presumably re-raising a captured exception).*)
556 fun join x = Exn.release (join_result x);
559 (* fast-path versions -- bypassing task queue *)
561 fun value_result (res: 'a Exn.result) =
563 val task = Task_Queue.dummy_task;
564 val group = Task_Queue.group_of_task task;
565 val result = Single_Assignment.var "value" : 'a result;
566 val _ = assign_result group result res;
567 in Future {promised = false, task = task, result = result} end;
(*Future for an already known regular value: value_result on Exn.Res x.*)
569 fun value x = value_result (Exn.Res x);
(*Conditional fork: when Multithreading.enabled () holds, delegate to forks
  with the given parameters.  Otherwise each e is evaluated right away by the
  caller, its outcome is captured with Exn.interruptible_capture and wrapped
  by value_result; args is not used in that case.*)
571 fun cond_forks args es =
572 if Multithreading.enabled () then forks args es
573 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
576 if is_finished x then value (f (join x))
579 val task = task_of x;
580 val group = Task_Queue.group_of_task task;
581 val (result, job) = future_job group true (fn () => f (join x));
583 val extended = SYNCHRONIZED "extend" (fn () =>
584 (case Task_Queue.extend task job (! queue) of
585 SOME queue' => (queue := queue'; true)
588 if extended then Future {promised = false, task = task, result = result}
590 (singleton o cond_forks)
591 {name = "map_future", group = SOME group, deps = [task],
592 pri = Task_Queue.pri_of_task task, interrupts = true}
593 (fn () => f (join x))
597 (* promised futures -- fulfilled by external means *)
599 fun promise_group group abort : 'a future =
601 val result = Single_Assignment.var "promise" : 'a result;
602 fun assign () = assign_result group result Exn.interrupt_exn
603 handle Fail _ => true
605 if Exn.is_interrupt exn
606 then raise Fail "Concurrent attempt to fulfill promise"
609 Multithreading.with_attributes Multithreading.no_interrupts
610 (fn _ => Exn.release (Exn.capture assign () before abort ()));
611 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
612 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
613 in Future {promised = true, task = task, result = result} end;
(*promise_group with a fresh subgroup of the current worker group.*)
615 fun promise abort = promise_group (worker_subgroup ()) abort;
617 fun fulfill_result (Future {promised, task, result}) res =
618 if not promised then raise Fail "Not a promised future"
621 val group = Task_Queue.group_of_task task;
622 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
624 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
627 SYNCHRONIZED "fulfill_result" (fn () =>
628 Unsynchronized.change_result queue
629 (Task_Queue.dequeue_passive (Thread.self ()) task));
632 SOME true => worker_exec (task, [job])
634 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
637 if is_some (Single_Assignment.peek result) then ()
638 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
(*Fulfill promise x with a regular value: fulfill_result on Exn.Res res.*)
641 fun fulfill x res = fulfill_result x (Exn.Res res);
646 fun terminate group =
649 SYNCHRONIZED "terminate" (fn () =>
650 let val _ = cancel_group_unsynchronized group;
651 in Task_Queue.group_tasks (! queue) group end);
653 if null tasks then ()
656 {name = "terminate", group = SOME (new_group NONE),
657 deps = tasks, pri = 0, interrupts = false} I
665 if Multithreading.available then
666 SYNCHRONIZED "shutdown" (fn () =>
667 while scheduler_active () do
668 (wait scheduler_event; broadcast_work ()))
672 (*final declarations of this structure!*)
673 val map = map_future;
677 type 'a future = 'a Future.future;