1 (* Title: Pure/Concurrent/future.ML
4 Value-oriented parallelism via futures and promises. See also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join. The idea is to model parallel
12 value-oriented computations (not communicating processes).
14 * Forked futures are evaluated spontaneously by a farm of worker
15 threads in the background; join resynchronizes the computation and
16 delivers results (values or exceptions).
18 * The pool of worker threads is limited, usually in correlation with
19 the number of physical cores on the machine. Note that allocation
20 of runtime resources may be distorted either if workers yield CPU
21 time (e.g. via system sleep or wait operations), or if non-worker
22 threads contend for significant runtime resources independently.
23 There is a limited number of replacement worker threads that get
24 activated in certain explicit wait conditions.
26 * Future tasks are organized in groups, which are block-structured.
27 When forking a new new task, the default is to open an individual
28 subgroup, unless some common group is specified explicitly.
29 Failure of one group member causes the immediate peers to be
30 interrupted eventually (i.e. none by default). Interrupted tasks
31 that lack regular result information, will pick up parallel
32 exceptions from the cumulative group context (as Par_Exn).
34 * Promised "passive" futures are fulfilled by external means. There
35 is no associated evaluation task, but other futures can depend on
36 them via regular join operations.
41 val worker_task: unit -> Task_Queue.task option
42 val worker_group: unit -> Task_Queue.group option
43 val worker_subgroup: unit -> Task_Queue.group
45 val task_of: 'a future -> Task_Queue.task
46 val peek: 'a future -> 'a Exn.result option
47 val is_finished: 'a future -> bool
48 val get_finished: 'a future -> 'a
49 val interruptible_task: ('a -> 'b) -> 'a -> 'b
50 val cancel_group: Task_Queue.group -> unit
51 val cancel: 'a future -> unit
53 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
54 pri: int, interrupts: bool}
55 val forks: fork_params -> (unit -> 'a) list -> 'a future list
56 val fork_pri: int -> (unit -> 'a) -> 'a future
57 val fork: (unit -> 'a) -> 'a future
58 val join_results: 'a future list -> 'a Exn.result list
59 val join_result: 'a future -> 'a Exn.result
60 val join: 'a future -> 'a
61 val value_result: 'a Exn.result -> 'a future
62 val value: 'a -> 'a future
63 val map: ('a -> 'b) -> 'a future -> 'b future
64 val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
65 val promise_group: Task_Queue.group -> 'a future
66 val promise: unit -> 'a future
67 val fulfill_result: 'a future -> 'a Exn.result -> unit
68 val fulfill: 'a future -> 'a -> unit
69 val shutdown: unit -> unit
70 val status: (unit -> 'a) -> 'a
73 structure Future: FUTURE =
81 val tag = Universal.tag () : Task_Queue.task option Universal.tag;
83 fun worker_task () = the_default NONE (Thread.getLocal tag);
84 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
87 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
90 fun worker_joining e =
91 (case worker_task () of
93 | SOME task => Task_Queue.joining task e);
95 fun worker_waiting deps e =
96 (case worker_task () of
98 | SOME task => Task_Queue.waiting task deps e);
101 (* datatype future *)
103 type 'a result = 'a Exn.result Single_Assignment.var;
105 datatype 'a future = Future of
107 task: Task_Queue.task,
110 fun task_of (Future {task, ...}) = task;
111 fun result_of (Future {result, ...}) = result;
113 fun peek x = Single_Assignment.peek (result_of x);
114 fun is_finished x = is_some (peek x);
118 SOME res => Exn.release res
119 | NONE => raise Fail "Unfinished future evaluation");
125 (* synchronization *)
127 val scheduler_event = ConditionVar.conditionVar ();
128 val work_available = ConditionVar.conditionVar ();
129 val work_finished = ConditionVar.conditionVar ();
132 val lock = Mutex.mutex ();
135 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
137 fun wait cond = (*requires SYNCHRONIZED*)
138 Multithreading.sync_wait NONE NONE cond lock;
140 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
141 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
143 fun signal cond = (*requires SYNCHRONIZED*)
144 ConditionVar.signal cond;
146 fun broadcast cond = (*requires SYNCHRONIZED*)
147 ConditionVar.broadcast cond;
149 fun broadcast_work () = (*requires SYNCHRONIZED*)
150 (ConditionVar.broadcast work_available;
151 ConditionVar.broadcast work_finished);
158 val queue = Unsynchronized.ref Task_Queue.empty;
159 val next = Unsynchronized.ref 0;
160 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
161 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
162 val do_shutdown = Unsynchronized.ref false;
163 val max_workers = Unsynchronized.ref 0;
164 val max_active = Unsynchronized.ref 0;
165 val worker_trend = Unsynchronized.ref 0;
167 datatype worker_state = Working | Waiting | Sleeping;
168 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
170 fun count_workers state = (*requires SYNCHRONIZED*)
171 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
174 (* cancellation primitives *)
176 fun interruptible_task f x =
177 if Multithreading.available then
178 Multithreading.with_attributes
179 (if is_some (worker_task ())
180 then Multithreading.private_interrupts
181 else Multithreading.public_interrupts)
183 else interruptible f x;
185 fun cancel_now group = (*requires SYNCHRONIZED*)
186 Task_Queue.cancel (! queue) group;
188 fun cancel_later group = (*requires SYNCHRONIZED*)
189 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
190 broadcast scheduler_event);
195 fun worker_exec (task, jobs) =
197 val group = Task_Queue.group_of_task task;
198 val valid = not (Task_Queue.is_canceled group);
200 Task_Queue.running task (fn () =>
201 setmp_worker_task task (fn () =>
202 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
203 val _ = Multithreading.tracing 2 (fn () =>
205 val s = Task_Queue.str_of_task_groups task;
206 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
207 val (run, wait, deps) = Task_Queue.timing_of_task task;
208 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
209 val _ = SYNCHRONIZED "finish" (fn () =>
211 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
212 val _ = Exn.capture Multithreading.interrupted ();
215 else if cancel_now group then ()
216 else cancel_later group;
217 val _ = broadcast work_finished;
218 val _ = if maximal then () else signal work_available;
222 fun worker_wait active cond = (*requires SYNCHRONIZED*)
225 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
227 | NONE => raise Fail "Unregistered worker thread");
228 val _ = state := (if active then Waiting else Sleeping);
230 val _ = state := Working;
233 fun worker_next () = (*requires SYNCHRONIZED*)
234 if length (! workers) > ! max_workers then
235 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
236 signal work_available;
238 else if count_workers Working > ! max_active then
239 (worker_wait false work_available; worker_next ())
241 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
242 NONE => (worker_wait false work_available; worker_next ())
243 | some => (signal work_available; some));
245 fun worker_loop name =
246 (case SYNCHRONIZED name (fn () => worker_next ()) of
248 | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name));
250 fun worker_start name = (*requires SYNCHRONIZED*)
251 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
252 Unsynchronized.ref Working));
257 val status_ticks = Unsynchronized.ref 0;
259 val last_round = Unsynchronized.ref Time.zeroTime;
260 val next_round = seconds 0.05;
262 fun scheduler_next () = (*requires SYNCHRONIZED*)
264 val now = Time.now ();
265 val tick = Time.<= (Time.+ (! last_round, next_round), now);
266 val _ = if tick then last_round := now else ();
269 (* queue and worker status *)
272 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
274 if tick andalso ! status_ticks = 0 then
275 Multithreading.tracing 1 (fn () =>
277 val {ready, pending, running, passive} = Task_Queue.status (! queue);
278 val total = length (! workers);
279 val active = count_workers Working;
280 val waiting = count_workers Waiting;
282 "SCHEDULE " ^ Time.toString now ^ ": " ^
283 string_of_int ready ^ " ready, " ^
284 string_of_int pending ^ " pending, " ^
285 string_of_int running ^ " running, " ^
286 string_of_int passive ^ " passive; " ^
287 string_of_int total ^ " workers, " ^
288 string_of_int active ^ " active, " ^
289 string_of_int waiting ^ " waiting "
294 if forall (Thread.isActive o #1) (! workers) then ()
297 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
298 val _ = workers := alive;
300 Multithreading.tracing 0 (fn () =>
301 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
305 (* worker pool adjustments *)
307 val max_active0 = ! max_active;
308 val max_workers0 = ! max_workers;
310 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
311 val _ = max_active := m;
314 if ! do_shutdown then 0
315 else if m = 9999 then 1
316 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
318 if tick andalso mm > ! max_workers then
319 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
320 else if tick andalso mm < ! max_workers then
321 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
324 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
326 else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
327 max_workers := Int.min (mm, 2 * m)
330 val missing = ! max_workers - length (! workers);
333 funpow missing (fn () =>
334 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
338 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
339 else signal work_available;
342 (* canceled groups *)
345 if null (! canceled) then ()
347 (Multithreading.tracing 1 (fn () =>
348 string_of_int (length (! canceled)) ^ " canceled groups");
349 Unsynchronized.change canceled (filter_out cancel_now);
355 val _ = Exn.release (wait_timeout next_round scheduler_event);
360 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
361 val continue = not (! do_shutdown andalso null (! workers));
362 val _ = if continue then () else scheduler := NONE;
364 val _ = broadcast scheduler_event;
367 if Exn.is_interrupt exn then
368 (Multithreading.tracing 1 (fn () => "Interrupt");
369 List.app cancel_later (Task_Queue.cancel_all (! queue));
370 broadcast_work (); true)
373 fun scheduler_loop () =
375 Multithreading.with_attributes
376 (Multithreading.sync_interrupts Multithreading.public_interrupts)
377 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
378 do (); last_round := Time.zeroTime);
380 fun scheduler_active () = (*requires SYNCHRONIZED*)
381 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
383 fun scheduler_check () = (*requires SYNCHRONIZED*)
384 (do_shutdown := false;
385 if scheduler_active () then ()
386 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
394 (*cancel: present and future group members will be interrupted eventually*)
395 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
396 (if cancel_now group then () else cancel_later group;
397 signal work_available; scheduler_check ()));
399 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
404 fun assign_result group result raw_res =
408 Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
410 val _ = Single_Assignment.assign result res
411 handle exn as Fail _ =>
412 (case Single_Assignment.peek result of
413 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
416 (case the (Single_Assignment.peek result) of
418 (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
419 | Exn.Res _ => true);
422 fun future_job group interrupts (e: unit -> 'a) =
424 val result = Single_Assignment.var "future" : 'a result;
425 val pos = Position.thread_data ();
430 Exn.capture (fn () =>
431 Multithreading.with_attributes
433 then Multithreading.private_interrupts else Multithreading.no_interrupts)
434 (fn _ => Position.setmp_thread_data pos e ()) before
435 Multithreading.interrupted ()) ()
436 else Exn.interrupt_exn;
437 in assign_result group result res end;
438 in (result, job) end;
444 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
445 pri: int, interrupts: bool};
447 fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
453 NONE => worker_subgroup ()
455 fun enqueue e queue =
457 val (result, job) = future_job grp interrupts e;
458 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
459 val future = Future {promised = false, task = task, result = result};
460 in (future, queue') end;
462 SYNCHRONIZED "enqueue" (fn () =>
464 val (futures, queue') = fold_map enqueue es (! queue);
465 val _ = queue := queue';
466 val minimal = forall (not o Task_Queue.known_task queue') deps;
467 val _ = if minimal then signal work_available else ();
468 val _ = scheduler_check ();
473 singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
475 fun fork e = fork_pri 0 e;
484 NONE => Exn.Exn (Fail "Unfinished future")
486 if Exn.is_interrupt_exn res then
487 (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
489 | SOME exn => Exn.Exn exn)
492 fun join_next deps = (*requires SYNCHRONIZED*)
493 if null deps then NONE
495 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
498 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
499 | (SOME work, deps') => SOME (work, deps'));
501 fun execute_work NONE = ()
502 | execute_work (SOME (work, deps')) =
503 (worker_joining (fn () => worker_exec work); join_work deps')
505 Multithreading.with_attributes Multithreading.no_interrupts
506 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
510 fun join_results xs =
513 if forall is_finished xs then ()
514 else if Multithreading.self_critical () then
515 error "Cannot join future values within critical section"
516 else if is_some (worker_task ()) then join_work (map task_of xs)
517 else List.app (ignore o Single_Assignment.await o result_of) xs;
518 in map get_result xs end;
522 fun join_result x = singleton join_results x;
523 fun join x = Exn.release (join_result x);
526 (* fast-path versions -- bypassing task queue *)
528 fun value_result (res: 'a Exn.result) =
530 val task = Task_Queue.dummy_task ();
531 val group = Task_Queue.group_of_task task;
532 val result = Single_Assignment.var "value" : 'a result;
533 val _ = assign_result group result res;
534 in Future {promised = false, task = task, result = result} end;
536 fun value x = value_result (Exn.Res x);
540 val task = task_of x;
541 val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
542 val (result, job) = future_job group true (fn () => f (join x));
544 val extended = SYNCHRONIZED "extend" (fn () =>
545 (case Task_Queue.extend task job (! queue) of
546 SOME queue' => (queue := queue'; true)
549 if extended then Future {promised = false, task = task, result = result}
552 (forks {name = "Future.map", group = SOME group, deps = [task],
553 pri = Task_Queue.pri_of_task task, interrupts = true})
554 (fn () => f (join x))
557 fun cond_forks args es =
558 if Multithreading.enabled () then forks args es
559 else map (fn e => value_result (Exn.interruptible_capture e ())) es;
562 (* promised futures -- fulfilled by external means *)
564 fun promise_group group : 'a future =
566 val result = Single_Assignment.var "promise" : 'a result;
567 fun abort () = assign_result group result Exn.interrupt_exn
568 handle Fail _ => true
570 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
572 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
573 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
574 in Future {promised = true, task = task, result = result} end;
576 fun promise () = promise_group (worker_subgroup ());
578 fun fulfill_result (Future {promised, task, result}) res =
579 if not promised then raise Fail "Not a promised future"
582 val group = Task_Queue.group_of_task task;
583 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
585 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
588 SYNCHRONIZED "fulfill_result" (fn () =>
589 Unsynchronized.change_result queue
590 (Task_Queue.dequeue_passive (Thread.self ()) task));
591 in if still_passive then worker_exec (task, [job]) else () end);
593 if is_some (Single_Assignment.peek result) then ()
594 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
597 fun fulfill x res = fulfill_result x (Exn.Res res);
603 if Multithreading.available then
604 SYNCHRONIZED "shutdown" (fn () =>
605 while scheduler_active () do
606 (wait scheduler_event; broadcast_work ()))
615 (case worker_task () of
617 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
618 val _ = Output.status (Markup.markup_only (task_props Markup.forked));
619 val x = e (); (*sic -- report "joined" only for success*)
620 val _ = Output.status (Markup.markup_only (task_props Markup.joined));
624 (*final declarations of this structure!*)
625 val map = map_future;
629 type 'a future = 'a Future.future;