more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes the immediate peers to be
30 interrupted eventually (i.e. none by default). Interrupted tasks
31 that lack regular result information, will pick up parallel
32 exceptions from the cumulative group context (as Par_Exn).
34 * Promised "passive" futures are fulfilled by external means. There
35 is no associated evaluation task, but other futures can depend on
36 them via regular join operations.
41 val worker_task: unit -> Task_Queue.task option
42 val worker_group: unit -> Task_Queue.group option
43 val worker_subgroup: unit -> Task_Queue.group
45 val task_of: 'a future -> Task_Queue.task
46 val peek: 'a future -> 'a Exn.result option
47 val is_finished: 'a future -> bool
48 val get_finished: 'a future -> 'a
49 val interruptible_task: ('a -> 'b) -> 'a -> 'b
50 val cancel_group: Task_Queue.group -> unit
51 val cancel: 'a future -> unit
53 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
54 pri: int, interrupts: bool}
55 val forks: fork_params -> (unit -> 'a) list -> 'a future list
56 val fork_pri: int -> (unit -> 'a) -> 'a future
57 val fork: (unit -> 'a) -> 'a future
58 val join_results: 'a future list -> 'a Exn.result list
59 val join_result: 'a future -> 'a Exn.result
60 val join: 'a future -> 'a
61 val value_result: 'a Exn.result -> 'a future
62 val value: 'a -> 'a future
63 val map: ('a -> 'b) -> 'a future -> 'b future
64 val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
65 val promise_group: Task_Queue.group -> 'a future
66 val promise: unit -> 'a future
67 val fulfill_result: 'a future -> 'a Exn.result -> unit
68 val fulfill: 'a future -> 'a -> unit
69 val shutdown: unit -> unit
70 val status: (unit -> 'a) -> 'a
73 structure Future: FUTURE =
81 val tag = Universal.tag () : Task_Queue.task option Universal.tag;
83 fun worker_task () = the_default NONE (Thread.getLocal tag);
84 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
87 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
90 fun worker_joining e =
91 (case worker_task () of
93 | SOME task => Task_Queue.joining task e);
95 fun worker_waiting deps e =
96 (case worker_task () of
98 | SOME task => Task_Queue.waiting task deps e);
101 (* datatype future *)
103 type 'a result = 'a Exn.result Single_Assignment.var;
105 datatype 'a future = Future of
107 task: Task_Queue.task,
110 fun task_of (Future {task, ...}) = task;
111 fun result_of (Future {result, ...}) = result;
113 fun peek x = Single_Assignment.peek (result_of x);
114 fun is_finished x = is_some (peek x);
118 SOME res => Exn.release res
119 | NONE => raise Fail "Unfinished future evaluation");
125 (* synchronization *)
127 val scheduler_event = ConditionVar.conditionVar ();
128 val work_available = ConditionVar.conditionVar ();
129 val work_finished = ConditionVar.conditionVar ();
132 val lock = Mutex.mutex ();
135 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
137 fun wait cond = (*requires SYNCHRONIZED*)
138 Multithreading.sync_wait NONE NONE cond lock;
140 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
141 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
143 fun signal cond = (*requires SYNCHRONIZED*)
144 ConditionVar.signal cond;
146 fun broadcast cond = (*requires SYNCHRONIZED*)
147 ConditionVar.broadcast cond;
149 fun broadcast_work () = (*requires SYNCHRONIZED*)
150 (ConditionVar.broadcast work_available;
151 ConditionVar.broadcast work_finished);
158 val queue = Unsynchronized.ref Task_Queue.empty;
159 val next = Unsynchronized.ref 0;
160 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
161 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
162 val do_shutdown = Unsynchronized.ref false;
163 val max_workers = Unsynchronized.ref 0;
164 val max_active = Unsynchronized.ref 0;
165 val worker_trend = Unsynchronized.ref 0;
167 datatype worker_state = Working | Waiting | Sleeping;
168 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
170 fun count_workers state = (*requires SYNCHRONIZED*)
171 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
174 (* cancellation primitives *)
176 fun interruptible_task f x =
177 (if Multithreading.available then
178 Multithreading.with_attributes
179 (if is_some (worker_task ())
180 then Multithreading.private_interrupts
181 else Multithreading.public_interrupts)
183 else interruptible f x)
184 before Multithreading.interrupted ();
186 fun cancel_now group = (*requires SYNCHRONIZED*)
187 Task_Queue.cancel (! queue) group;
189 fun cancel_later group = (*requires SYNCHRONIZED*)
190 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
191 broadcast scheduler_event);
196 fun worker_exec (task, jobs) =
198 val group = Task_Queue.group_of_task task;
199 val valid = not (Task_Queue.is_canceled group);
201 Task_Queue.running task (fn () =>
202 setmp_worker_task task (fn () =>
203 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
204 val _ = Multithreading.tracing 2 (fn () =>
206 val s = Task_Queue.str_of_task_groups task;
207 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
208 val (run, wait, deps) = Task_Queue.timing_of_task task;
209 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
210 val _ = SYNCHRONIZED "finish" (fn () =>
212 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
213 val test = Exn.capture Multithreading.interrupted ();
215 if ok andalso not (Exn.is_interrupt_exn test) then ()
216 else if cancel_now group then ()
217 else cancel_later group;
218 val _ = broadcast work_finished;
219 val _ = if maximal then () else signal work_available;
223 fun worker_wait active cond = (*requires SYNCHRONIZED*)
226 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
228 | NONE => raise Fail "Unregistered worker thread");
229 val _ = state := (if active then Waiting else Sleeping);
231 val _ = state := Working;
234 fun worker_next () = (*requires SYNCHRONIZED*)
235 if length (! workers) > ! max_workers then
236 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
237 signal work_available;
239 else if count_workers Working > ! max_active then
240 (worker_wait false work_available; worker_next ())
242 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
243 NONE => (worker_wait false work_available; worker_next ())
244 | some => (signal work_available; some));
246 fun worker_loop name =
247 (case SYNCHRONIZED name (fn () => worker_next ()) of
249 | SOME work => (worker_exec work; worker_loop name));
251 fun worker_start name = (*requires SYNCHRONIZED*)
252 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
253 Unsynchronized.ref Working));
258 val status_ticks = Unsynchronized.ref 0;
260 val last_round = Unsynchronized.ref Time.zeroTime;
261 val next_round = seconds 0.05;
263 fun scheduler_next () = (*requires SYNCHRONIZED*)
265 val now = Time.now ();
266 val tick = Time.<= (Time.+ (! last_round, next_round), now);
267 val _ = if tick then last_round := now else ();
270 (* queue and worker status *)
273 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
275 if tick andalso ! status_ticks = 0 then
276 Multithreading.tracing 1 (fn () =>
278 val {ready, pending, running, passive} = Task_Queue.status (! queue);
279 val total = length (! workers);
280 val active = count_workers Working;
281 val waiting = count_workers Waiting;
283 "SCHEDULE " ^ Time.toString now ^ ": " ^
284 string_of_int ready ^ " ready, " ^
285 string_of_int pending ^ " pending, " ^
286 string_of_int running ^ " running, " ^
287 string_of_int passive ^ " passive; " ^
288 string_of_int total ^ " workers, " ^
289 string_of_int active ^ " active, " ^
290 string_of_int waiting ^ " waiting "
295 if forall (Thread.isActive o #1) (! workers) then ()
298 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
299 val _ = workers := alive;
301 Multithreading.tracing 0 (fn () =>
302 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
306 (* worker pool adjustments *)
308 val max_active0 = ! max_active;
309 val max_workers0 = ! max_workers;
311 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
312 val _ = max_active := m;
315 if ! do_shutdown then 0
316 else if m = 9999 then 1
317 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
319 if tick andalso mm > ! max_workers then
320 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
321 else if tick andalso mm < ! max_workers then
322 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
325 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
327 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
328 max_workers := Int.min (mm, 2 * m)
331 val missing = ! max_workers - length (! workers);
334 funpow missing (fn () =>
335 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
339 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
340 else signal work_available;
343 (* canceled groups *)
346 if null (! canceled) then ()
348 (Multithreading.tracing 1 (fn () =>
349 string_of_int (length (! canceled)) ^ " canceled groups");
350 Unsynchronized.change canceled (filter_out cancel_now);
356 val _ = Exn.release (wait_timeout next_round scheduler_event);
361 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
362 val continue = not (! do_shutdown andalso null (! workers));
363 val _ = if continue then () else scheduler := NONE;
365 val _ = broadcast scheduler_event;
368 if Exn.is_interrupt exn then
369 (Multithreading.tracing 1 (fn () => "Interrupt");
370 List.app cancel_later (Task_Queue.cancel_all (! queue));
371 broadcast_work (); true)
374 fun scheduler_loop () =
376 Multithreading.with_attributes
377 (Multithreading.sync_interrupts Multithreading.public_interrupts)
378 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
379 do (); last_round := Time.zeroTime);
381 fun scheduler_active () = (*requires SYNCHRONIZED*)
382 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
384 fun scheduler_check () = (*requires SYNCHRONIZED*)
385 (do_shutdown := false;
386 if scheduler_active () then ()
387 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
395 (*cancel: present and future group members will be interrupted eventually*)
396 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
397 (if cancel_now group then () else cancel_later group;
398 signal work_available; scheduler_check ()));
400 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
405 fun assign_result group result raw_res =
409 Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
411 val _ = Single_Assignment.assign result res
412 handle exn as Fail _ =>
413 (case Single_Assignment.peek result of
414 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
417 (case the (Single_Assignment.peek result) of
419 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
420 | Exn.Res _ => true);
423 fun future_job group interrupts (e: unit -> 'a) =
425 val result = Single_Assignment.var "future" : 'a result;
426 val pos = Position.thread_data ();
431 Exn.capture (fn () =>
432 Multithreading.with_attributes
434 then Multithreading.private_interrupts else Multithreading.no_interrupts)
435 (fn _ => Position.setmp_thread_data pos e ())) ()
436 else Exn.interrupt_exn;
437 in assign_result group result res end;
438 in (result, job) end;
444 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
445 pri: int, interrupts: bool};
447 fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
453 NONE => worker_subgroup ()
455 fun enqueue e queue =
457 val (result, job) = future_job grp interrupts e;
458 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
459 val future = Future {promised = false, task = task, result = result};
460 in (future, queue') end;
462 SYNCHRONIZED "enqueue" (fn () =>
464 val (futures, queue') = fold_map enqueue es (! queue);
465 val _ = queue := queue';
466 val minimal = forall (not o Task_Queue.known_task queue') deps;
467 val _ = if minimal then signal work_available else ();
468 val _ = scheduler_check ();
473 singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
475 fun fork e = fork_pri 0 e;
484 NONE => Exn.Exn (Fail "Unfinished future")
486 if Exn.is_interrupt_exn res then
487 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
489 | SOME exn => Exn.Exn exn)
492 fun join_next deps = (*requires SYNCHRONIZED*)
493 if null deps then NONE
495 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
498 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
499 | (SOME work, deps') => SOME (work, deps'));
501 fun execute_work NONE = ()
502 | execute_work (SOME (work, deps')) =
503 (worker_joining (fn () => worker_exec work); join_work deps')
505 Multithreading.with_attributes Multithreading.no_interrupts
506 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
510 fun join_results xs =
513 if forall is_finished xs then ()
514 else if Multithreading.self_critical () then
515 error "Cannot join future values within critical section"
516 else if is_some (worker_task ()) then join_work (map task_of xs)
517 else List.app (ignore o Single_Assignment.await o result_of) xs;
518 in map get_result xs end;
522 fun join_result x = singleton join_results x;
523 fun join x = Exn.release (join_result x);
526 (* fast-path versions -- bypassing task queue *)
528 fun value_result (res: 'a Exn.result) =
530 val task = Task_Queue.dummy_task ();
531 val group = Task_Queue.group_of_task task;
532 val result = Single_Assignment.var "value" : 'a result;
533 val _ = assign_result group result res;
534 in Future {promised = false, task = task, result = result} end;
536 fun value x = value_result (Exn.Res x);
540 val task = task_of x;
541 val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
542 val (result, job) = future_job group true (fn () => f (join x));
544 val extended = SYNCHRONIZED "extend" (fn () =>
545 (case Task_Queue.extend task job (! queue) of
546 SOME queue' => (queue := queue'; true)
549 if extended then Future {promised = false, task = task, result = result}
552 (forks {name = "Future.map", group = SOME group, deps = [task],
553 pri = Task_Queue.pri_of_task task, interrupts = true})
554 (fn () => f (join x))
557 fun cond_forks args es =
558 if Multithreading.enabled () then forks args es
559 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
562 (* promised futures -- fulfilled by external means *)
564 fun promise_group group : 'a future =
566 val result = Single_Assignment.var "promise" : 'a result;
567 fun abort () = assign_result group result Exn.interrupt_exn
568 handle Fail _ => true
570 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
572 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
573 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
574 in Future {promised = true, task = task, result = result} end;
576 fun promise () = promise_group (worker_subgroup ());
578 fun fulfill_result (Future {promised, task, result}) res =
579 if not promised then raise Fail "Not a promised future"
582 val group = Task_Queue.group_of_task task;
583 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
585 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
588 SYNCHRONIZED "fulfill_result" (fn () =>
589 Unsynchronized.change_result queue
590 (Task_Queue.dequeue_passive (Thread.self ()) task));
591 in if still_passive then worker_exec (task, [job]) else () end);
593 if is_some (Single_Assignment.peek result) then ()
594 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
597 fun fulfill x res = fulfill_result x (Exn.Res res);
603 if Multithreading.available then
604 SYNCHRONIZED "shutdown" (fn () =>
605 while scheduler_active () do
606 (wait scheduler_event; broadcast_work ()))
615 (case worker_task () of
617 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
618 val _ = Output.status (Markup.markup_only (task_props Markup.forked));
619 val x = e (); (*sic -- report "joined" only for success*)
620 val _ = Output.status (Markup.markup_only (task_props Markup.joined));
624 (*final declarations of this structure!*)
625 val map = map_future;
629 type 'a future = 'a Future.future;