1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes the immediate peers to be
30 interrupted eventually (i.e. none by default). Interrupted tasks
31 that lack regular result information, will pick up parallel
32 exceptions from the cumulative group context (as Par_Exn).
34 * Future task groups may be canceled: present and future group
35 members will be interrupted eventually.
37 * Promised "passive" futures are fulfilled by external means. There
38 is no associated evaluation task, but other futures can depend on
39 them via regular join operations.
44 type task = Task_Queue.task
45 type group = Task_Queue.group
46 val new_group: group option -> group
47 val worker_task: unit -> task option
48 val worker_group: unit -> group option
49 val worker_subgroup: unit -> group
51 val task_of: 'a future -> task
52 val peek: 'a future -> 'a Exn.result option
53 val is_finished: 'a future -> bool
54 val interruptible_task: ('a -> 'b) -> 'a -> 'b
55 val cancel_group: group -> unit
56 val cancel: 'a future -> unit
57 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
58 val default_params: params
59 val forks: params -> (unit -> 'a) list -> 'a future list
60 val fork_pri: int -> (unit -> 'a) -> 'a future
61 val fork: (unit -> 'a) -> 'a future
62 val join_results: 'a future list -> 'a Exn.result list
63 val join_result: 'a future -> 'a Exn.result
64 val joins: 'a future list -> 'a list
65 val join: 'a future -> 'a
66 val value_result: 'a Exn.result -> 'a future
67 val value: 'a -> 'a future
68 val cond_forks: params -> (unit -> 'a) list -> 'a future list
69 val map: ('a -> 'b) -> 'a future -> 'b future
70 val promise_group: group -> (unit -> unit) -> 'a future
71 val promise: (unit -> unit) -> 'a future
72 val fulfill_result: 'a future -> 'a Exn.result -> unit
73 val fulfill: 'a future -> 'a -> unit
74 val terminate: group -> unit
75 val shutdown: unit -> unit
78 structure Future: FUTURE =
83 type task = Task_Queue.task;
84 type group = Task_Queue.group;
85 val new_group = Task_Queue.new_group;
91 val tag = Universal.tag () : task option Universal.tag;
93 fun worker_task () = the_default NONE (Thread.getLocal tag);
94 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
97 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
98 fun worker_subgroup () = new_group (worker_group ());
100 fun worker_joining e =
101 (case worker_task () of
103 | SOME task => Task_Queue.joining task e);
105 fun worker_waiting deps e =
106 (case worker_task () of
108 | SOME task => Task_Queue.waiting task deps e);
111 (* datatype future *)
113 type 'a result = 'a Exn.result Single_Assignment.var;
115 datatype 'a future = Future of
120 fun task_of (Future {task, ...}) = task;
121 fun result_of (Future {result, ...}) = result;
123 fun peek x = Single_Assignment.peek (result_of x);
124 fun is_finished x = is_some (peek x);
130 (* synchronization *)
132 val scheduler_event = ConditionVar.conditionVar ();
133 val work_available = ConditionVar.conditionVar ();
134 val work_finished = ConditionVar.conditionVar ();
137 val lock = Mutex.mutex ();
140 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
142 fun wait cond = (*requires SYNCHRONIZED*)
143 Multithreading.sync_wait NONE NONE cond lock;
145 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
146 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
148 fun signal cond = (*requires SYNCHRONIZED*)
149 ConditionVar.signal cond;
151 fun broadcast cond = (*requires SYNCHRONIZED*)
152 ConditionVar.broadcast cond;
154 fun broadcast_work () = (*requires SYNCHRONIZED*)
155 (ConditionVar.broadcast work_available;
156 ConditionVar.broadcast work_finished);
163 val queue = Unsynchronized.ref Task_Queue.empty;
164 val next = Unsynchronized.ref 0;
165 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
166 val canceled = Unsynchronized.ref ([]: group list);
167 val do_shutdown = Unsynchronized.ref false;
168 val max_workers = Unsynchronized.ref 0;
169 val max_active = Unsynchronized.ref 0;
170 val worker_trend = Unsynchronized.ref 0;
172 datatype worker_state = Working | Waiting | Sleeping;
173 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
175 fun count_workers state = (*requires SYNCHRONIZED*)
176 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
179 (* cancellation primitives *)
181 fun cancel_now group = (*requires SYNCHRONIZED*)
183 val running = Task_Queue.cancel (! queue) group;
184 val _ = running |> List.app (fn thread =>
185 if Simple_Thread.is_self thread then ()
186 else Simple_Thread.interrupt_unsynchronized thread);
189 fun cancel_all () = (*requires SYNCHRONIZED*)
191 val (groups, threads) = Task_Queue.cancel_all (! queue);
192 val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
195 fun cancel_later group = (*requires SYNCHRONIZED*)
196 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
197 broadcast scheduler_event);
199 fun interruptible_task f x =
200 (if Multithreading.available then
201 Multithreading.with_attributes
202 (if is_some (worker_task ())
203 then Multithreading.private_interrupts
204 else Multithreading.public_interrupts)
206 else interruptible f x)
207 before Multithreading.interrupted ();
212 fun worker_exec (task, jobs) =
214 val group = Task_Queue.group_of_task task;
215 val valid = not (Task_Queue.is_canceled group);
217 Task_Queue.running task (fn () =>
218 setmp_worker_task task (fn () =>
219 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
220 val _ = Multithreading.tracing 2 (fn () =>
222 val s = Task_Queue.str_of_task_groups task;
223 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
224 val (run, wait, deps) = Task_Queue.timing_of_task task;
225 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
226 val _ = SYNCHRONIZED "finish" (fn () =>
228 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
229 val test = Exn.capture Multithreading.interrupted ();
231 if ok andalso not (Exn.is_interrupt_exn test) then ()
232 else if null (cancel_now group) then ()
233 else cancel_later group;
234 val _ = broadcast work_finished;
235 val _ = if maximal then () else signal work_available;
239 fun worker_wait active cond = (*requires SYNCHRONIZED*)
242 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
244 | NONE => raise Fail "Unregistered worker thread");
245 val _ = state := (if active then Waiting else Sleeping);
247 val _ = state := Working;
250 fun worker_next () = (*requires SYNCHRONIZED*)
251 if length (! workers) > ! max_workers then
252 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
253 signal work_available;
255 else if count_workers Working > ! max_active then
256 (worker_wait false work_available; worker_next ())
258 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
259 NONE => (worker_wait false work_available; worker_next ())
260 | some => (signal work_available; some));
262 fun worker_loop name =
263 (case SYNCHRONIZED name (fn () => worker_next ()) of
265 | SOME work => (worker_exec work; worker_loop name));
267 fun worker_start name = (*requires SYNCHRONIZED*)
268 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
269 Unsynchronized.ref Working));
274 val status_ticks = Unsynchronized.ref 0;
276 val last_round = Unsynchronized.ref Time.zeroTime;
277 val next_round = seconds 0.05;
279 fun scheduler_next () = (*requires SYNCHRONIZED*)
281 val now = Time.now ();
282 val tick = Time.<= (Time.+ (! last_round, next_round), now);
283 val _ = if tick then last_round := now else ();
286 (* queue and worker status *)
289 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
291 if tick andalso ! status_ticks = 0 then
292 Multithreading.tracing 1 (fn () =>
294 val {ready, pending, running, passive} = Task_Queue.status (! queue);
295 val total = length (! workers);
296 val active = count_workers Working;
297 val waiting = count_workers Waiting;
299 "SCHEDULE " ^ Time.toString now ^ ": " ^
300 string_of_int ready ^ " ready, " ^
301 string_of_int pending ^ " pending, " ^
302 string_of_int running ^ " running, " ^
303 string_of_int passive ^ " passive; " ^
304 string_of_int total ^ " workers, " ^
305 string_of_int active ^ " active, " ^
306 string_of_int waiting ^ " waiting "
311 if forall (Thread.isActive o #1) (! workers) then ()
314 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
315 val _ = workers := alive;
317 Multithreading.tracing 0 (fn () =>
318 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
322 (* worker pool adjustments *)
324 val max_active0 = ! max_active;
325 val max_workers0 = ! max_workers;
327 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
328 val _ = max_active := m;
331 if ! do_shutdown then 0
332 else if m = 9999 then 1
333 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
335 if tick andalso mm > ! max_workers then
336 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
337 else if tick andalso mm < ! max_workers then
338 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
341 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
343 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
344 max_workers := Int.min (mm, 2 * m)
347 val missing = ! max_workers - length (! workers);
350 funpow missing (fn () =>
351 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
355 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
356 else signal work_available;
359 (* canceled groups *)
362 if null (! canceled) then ()
364 (Multithreading.tracing 1 (fn () =>
365 string_of_int (length (! canceled)) ^ " canceled groups");
366 Unsynchronized.change canceled (filter_out (null o cancel_now));
372 val _ = Exn.release (wait_timeout next_round scheduler_event);
377 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
378 val continue = not (! do_shutdown andalso null (! workers));
379 val _ = if continue then () else scheduler := NONE;
381 val _ = broadcast scheduler_event;
384 if Exn.is_interrupt exn then
385 (Multithreading.tracing 1 (fn () => "Interrupt");
386 List.app cancel_later (cancel_all ());
387 broadcast_work (); true)
390 fun scheduler_loop () =
392 Multithreading.with_attributes
393 (Multithreading.sync_interrupts Multithreading.public_interrupts)
394 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
395 do (); last_round := Time.zeroTime);
397 fun scheduler_active () = (*requires SYNCHRONIZED*)
398 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
400 fun scheduler_check () = (*requires SYNCHRONIZED*)
401 (do_shutdown := false;
402 if scheduler_active () then ()
403 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
411 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
413 val _ = if null (cancel_now group) then () else cancel_later group;
414 val _ = signal work_available;
415 val _ = scheduler_check ();
418 fun cancel_group group =
419 SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
421 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
426 fun assign_result group result raw_res =
430 Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
432 val _ = Single_Assignment.assign result res
433 handle exn as Fail _ =>
434 (case Single_Assignment.peek result of
435 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
438 (case the (Single_Assignment.peek result) of
440 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
441 | Exn.Res _ => true);
444 fun future_job group interrupts (e: unit -> 'a) =
446 val result = Single_Assignment.var "future" : 'a result;
447 val pos = Position.thread_data ();
452 Exn.capture (fn () =>
453 Multithreading.with_attributes
455 then Multithreading.private_interrupts else Multithreading.no_interrupts)
456 (fn _ => Position.setmp_thread_data pos e ())) ()
457 else Exn.interrupt_exn;
458 in assign_result group result res end;
459 in (result, job) end;
464 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
465 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
467 fun forks ({name, group, deps, pri, interrupts}: params) es =
473 NONE => worker_subgroup ()
475 fun enqueue e queue =
477 val (result, job) = future_job grp interrupts e;
478 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
479 val future = Future {promised = false, task = task, result = result};
480 in (future, queue') end;
482 SYNCHRONIZED "enqueue" (fn () =>
484 val (futures, queue') = fold_map enqueue es (! queue);
485 val _ = queue := queue';
486 val minimal = forall (not o Task_Queue.known_task queue') deps;
487 val _ = if minimal then signal work_available else ();
488 val _ = scheduler_check ();
493 (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
495 fun fork e = fork_pri 0 e;
504 NONE => Exn.Exn (Fail "Unfinished future")
506 if Exn.is_interrupt_exn res then
507 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
509 | SOME exn => Exn.Exn exn)
512 fun join_next deps = (*requires SYNCHRONIZED*)
513 if null deps then NONE
515 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
518 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
519 | (SOME work, deps') => SOME (work, deps'));
521 fun execute_work NONE = ()
522 | execute_work (SOME (work, deps')) =
523 (worker_joining (fn () => worker_exec work); join_work deps')
525 Multithreading.with_attributes Multithreading.no_interrupts
526 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
530 fun join_results xs =
533 if forall is_finished xs then ()
534 else if Multithreading.self_critical () then
535 error "Cannot join future values within critical section"
536 else if is_some (worker_task ()) then join_work (map task_of xs)
537 else List.app (ignore o Single_Assignment.await o result_of) xs;
538 in map get_result xs end;
542 fun join_result x = singleton join_results x;
543 fun joins xs = Par_Exn.release_all (join_results xs);
544 fun join x = Exn.release (join_result x);
547 (* fast-path versions -- bypassing task queue *)
549 fun value_result (res: 'a Exn.result) =
551 val task = Task_Queue.dummy_task;
552 val group = Task_Queue.group_of_task task;
553 val result = Single_Assignment.var "value" : 'a result;
554 val _ = assign_result group result res;
555 in Future {promised = false, task = task, result = result} end;
557 fun value x = value_result (Exn.Res x);
559 fun cond_forks args es =
560 if Multithreading.enabled () then forks args es
561 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
565 val task = task_of x;
566 val group = new_group (SOME (Task_Queue.group_of_task task));
567 val (result, job) = future_job group true (fn () => f (join x));
569 val extended = SYNCHRONIZED "extend" (fn () =>
570 (case Task_Queue.extend task job (! queue) of
571 SOME queue' => (queue := queue'; true)
574 if extended then Future {promised = false, task = task, result = result}
576 (singleton o cond_forks)
577 {name = "map_future", group = SOME group, deps = [task],
578 pri = Task_Queue.pri_of_task task, interrupts = true}
579 (fn () => f (join x))
583 (* promised futures -- fulfilled by external means *)
585 fun promise_group group abort : 'a future =
587 val result = Single_Assignment.var "promise" : 'a result;
588 fun assign () = assign_result group result Exn.interrupt_exn
589 handle Fail _ => true
591 if Exn.is_interrupt exn
592 then raise Fail "Concurrent attempt to fulfill promise"
595 Multithreading.with_attributes Multithreading.no_interrupts
596 (fn _ => Exn.release (Exn.capture assign () before abort ()));
597 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
598 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
599 in Future {promised = true, task = task, result = result} end;
601 fun promise abort = promise_group (worker_subgroup ()) abort;
603 fun fulfill_result (Future {promised, task, result}) res =
604 if not promised then raise Fail "Not a promised future"
607 val group = Task_Queue.group_of_task task;
608 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
610 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
613 SYNCHRONIZED "fulfill_result" (fn () =>
614 Unsynchronized.change_result queue
615 (Task_Queue.dequeue_passive (Thread.self ()) task));
618 SOME true => worker_exec (task, [job])
620 | NONE => ignore (job (not (Task_Queue.is_canceled group))))
623 if is_some (Single_Assignment.peek result) then ()
624 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
627 fun fulfill x res = fulfill_result x (Exn.Res res);
632 fun terminate group =
635 SYNCHRONIZED "terminate" (fn () =>
636 let val _ = cancel_group_unsynchronized group;
637 in Task_Queue.group_tasks (! queue) group end);
639 if null tasks then ()
642 {name = "terminate", group = SOME (new_group NONE),
643 deps = tasks, pri = 0, interrupts = false} I
651 if Multithreading.available then
652 SYNCHRONIZED "shutdown" (fn () =>
653 while scheduler_active () do
654 (wait scheduler_event; broadcast_work ()))
660 fun queue_status () = Task_Queue.status (! queue);
663 (*final declarations of this structure!*)
664 val map = map_future;
668 type 'a future = 'a Future.future;