1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join (and variants). The idea is to model
12 parallel value-oriented computations, but *not* communicating
15 * Futures are grouped; failure of one group member causes the whole
16 group to be interrupted eventually. Groups are block-structured.
18 * Forked futures are evaluated spontaneously by a farm of worker
19 threads in the background; join resynchronizes the computation and
20 delivers results (values or exceptions).
22 * The pool of worker threads is limited, usually in correlation with
23 the number of physical cores on the machine. Note that allocation
24 of runtime resources is distorted either if workers yield CPU time
25 (e.g. via system sleep or wait operations), or if non-worker
26 threads contend for significant runtime resources independently.
28 * Promised futures are fulfilled by external means. There is no
29 associated evaluation task, but other futures can depend on them
(*Signature entries of the future interface.
  NOTE(review): the enclosing signature header and a few spec heads are not
  visible in this excerpt -- confirm against the complete file.*)
(*worker context of the calling thread*)
35 val worker_task: unit -> Task_Queue.task option
36 val worker_group: unit -> Task_Queue.group option
37 val worker_subgroup: unit -> Task_Queue.group
(*inspection of futures*)
39 val task_of: 'a future -> Task_Queue.task
40 val peek: 'a future -> 'a Exn.result option
41 val is_finished: 'a future -> bool
(*NOTE(review): spec head elided; presumably the type of forks, whose
  implementation takes this argument record -- confirm*)
43 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
44 (unit -> 'a) list -> 'a future list
45 val fork_pri: int -> (unit -> 'a) -> 'a future
46 val fork: (unit -> 'a) -> 'a future
(*join: results are delivered as Exn.result, or released by plain join*)
47 val join_results: 'a future list -> 'a Exn.result list
48 val join_result: 'a future -> 'a Exn.result
49 val join: 'a future -> 'a
(*fast-path versions*)
50 val value: 'a -> 'a future
51 val map: ('a -> 'b) -> 'a future -> 'b future
(*NOTE(review): spec head elided; presumably the type of cond_forks -- confirm*)
53 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
54 (unit -> 'a) list -> 'a future list
(*promised futures*)
55 val promise_group: Task_Queue.group -> 'a future
56 val promise: unit -> 'a future
57 val fulfill_result: 'a future -> 'a Exn.result -> unit
58 val fulfill: 'a future -> 'a -> unit
(*interrupts, cancellation, shutdown, status reports*)
59 val interruptible_task: ('a -> 'b) -> 'a -> 'b
60 val cancel_group: Task_Queue.group -> unit
61 val cancel: 'a future -> unit
62 val shutdown: unit -> unit
63 val status: (unit -> 'a) -> 'a
66 structure Future: FUTURE =
(*thread-local slot that records the task evaluated by the current thread*)
val tag: Task_Queue.task option Universal.tag = Universal.tag ();
(*task of the current thread; NONE if the thread-local slot is unset or empty*)
fun worker_task () =
  (case Thread.getLocal tag of
    SOME current => current
  | NONE => NONE);
(*apply f to x while the thread-local worker task is temporarily set to data;
  the previously recorded task is handed to setmp_thread_data as the old value*)
fun setmp_worker_task data f x =
  let
    val previous = worker_task ();
    val current = SOME data;
  in Library.setmp_thread_data tag previous current f x end;
(*group of the current worker task, if any*)
fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
(*fresh group whose parent is the current worker group (if any)*)
fun worker_subgroup () =
  let val parent = worker_group ()
  in Task_Queue.new_group parent end;
(*Evaluate e on behalf of the current worker task: when a worker task is
  present, e is passed to Task_Queue.joining together with that task.
  NOTE(review): the branch for the non-worker case is elided in this excerpt.*)
84 fun worker_joining e =
85 (case worker_task () of
87 | SOME task => Task_Queue.joining task e);
(*Evaluate e while the current worker task waits for deps: when a worker task
  is present, e is passed to Task_Queue.waiting together with task and deps.
  NOTE(review): the branch for the non-worker case is elided in this excerpt.*)
89 fun worker_waiting deps e =
90 (case worker_task () of
92 | SOME task => Task_Queue.waiting task deps e);
(*result slot: single-assignment variable holding a value or an exception*)
97 type 'a result = 'a Exn.result Single_Assignment.var;
(*a future is a record wrapped by constructor Future; other code in this file
  builds it with fields promised, task and result.
  NOTE(review): only the task field is visible in this excerpt.*)
99 datatype 'a future = Future of
101 task: Task_Queue.task,
(*task component of a future*)
fun task_of (Future fields) = #task fields;
(*result variable of a future*)
fun result_of (Future fields) = #result fields;
(*non-blocking look at the result variable: NONE while still unassigned*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;
(*true iff the result of the future has been assigned already*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
(*Assign res to the single-assignment variable result and react to the stored
  outcome: a stored exception cancels group and yields false, a regular result
  yields true.
  NOTE(review): several lines (let/in/end, one case branch, the binding of the
  final boolean) are elided in this excerpt.*)
110 fun assign_result group result res =
(*a Fail from assign is examined against the value already stored: if that is
  an exception, the stored interrupt is re-raised, otherwise the Fail itself*)
112 val _ = Single_Assignment.assign result res
113 handle exn as Fail _ =>
114 (case Single_Assignment.peek result of
115 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
(*inspect what is actually stored in the variable*)
118 (case the (Single_Assignment.peek result) of
119 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
120 | Exn.Res _ => true);
127 (* synchronization *)
(*waited on by the scheduler round and by shutdown; broadcast by the scheduler
  and by cancel_later*)
129 val scheduler_event = ConditionVar.conditionVar ();
(*waited on by idle workers in worker_next; signalled when work may be dequeued*)
130 val work_available = ConditionVar.conditionVar ();
(*waited on by joining workers in join_next; broadcast when a task finishes*)
131 val work_finished = ConditionVar.conditionVar ();
(*the mutex used by SYNCHRONIZED and by the wait operations of this file*)
134 val lock = Mutex.mutex ();
(*run e via Simple_Thread.synchronized on the lock of this structure;
  name is passed through as the label of the section*)
fun SYNCHRONIZED name e = Simple_Thread.synchronized name lock e;
(*wait on cond with the structure lock; no attributes and no deadline given*)
fun wait cond = (*requires SYNCHRONIZED*)
  let
    val no_attributes = NONE;
    val no_deadline = NONE;
  in Multithreading.sync_wait no_attributes no_deadline cond lock end;
(*wait on cond with the structure lock; the deadline is now + timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;
(*signal a condition variable*)
val signal: ConditionVar.conditionVar -> unit = (*requires SYNCHRONIZED*)
  ConditionVar.signal;
(*broadcast on a condition variable*)
val broadcast: ConditionVar.conditionVar -> unit = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast;
(*broadcast on work_available first, then on work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(*Global state of scheduler and worker pool; the functions that use it are
  marked "requires SYNCHRONIZED".*)
(*the task queue, initially empty*)
160 val queue = Unsynchronized.ref Task_Queue.empty;
(*counter that numbers worker threads when the scheduler names them*)
161 val next = Unsynchronized.ref 0;
(*the scheduler thread, if one has been forked*)
162 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*groups whose cancellation is still pending; retried by the scheduler*)
163 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
(*shutdown request flag; reset by scheduler_check*)
164 val do_shutdown = Unsynchronized.ref false;
(*bound on the number of registered workers, see worker_next*)
165 val max_workers = Unsynchronized.ref 0;
(*bound on the number of workers in state Working, see worker_next*)
166 val max_active = Unsynchronized.ref 0;
(*signed tick counter, adjusted by the scheduler when the desired pool size
  is above or below max_workers*)
167 val worker_trend = Unsynchronized.ref 0;
(*worker_wait sets Waiting for an active wait and Sleeping otherwise, and
  restores Working afterwards*)
169 datatype worker_state = Working | Waiting | Sleeping;
(*registered worker threads, each with its mutable state*)
170 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
176 (* execute future jobs *)
(*Create a fresh result variable together with the job that fills it;
  yields the pair (result, job).
  NOTE(review): the head of the local job function and the condition of the
  conditional are elided in this excerpt.*)
178 fun future_job group (e: unit -> 'a) =
180 val result = Single_Assignment.var "future" : 'a result;
(*position of the creating thread, re-installed while e is evaluated*)
181 val pos = Position.thread_data ();
(*evaluate e with private interrupts and capture its outcome as Exn.result;
  the alternative branch produces Exn.interrupt_exn without running e*)
186 Exn.capture (fn () =>
187 Multithreading.with_attributes Multithreading.private_interrupts
188 (fn _ => Position.setmp_thread_data pos e ()) before
189 Multithreading.interrupted ()) ()
190 else Exn.interrupt_exn;
(*the job's value is the boolean delivered by assign_result*)
191 in assign_result group result res end;
192 in (result, job) end;
(*cancel group in the current queue; the result of Task_Queue.cancel is
  passed through unchanged*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let val current_queue = ! queue
  in Task_Queue.cancel current_queue group end;
(*record group in the list of pending cancellations and notify the
  scheduler_event condition*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
    val _ = broadcast scheduler_event;
  in () end;
(*Run all jobs of a task in the current thread, trace its timing, and finish
  the task in the queue.
  NOTE(review): several lines (let/in/end, the binding of the jobs' overall
  result, the first branch of the cancellation conditional) are elided in
  this excerpt.*)
201 fun execute (task, jobs) =
203 val group = Task_Queue.group_of_task task;
(*jobs receive false if the group is already canceled*)
204 val valid = not (Task_Queue.is_canceled group);
(*every job is run, even after a failed one; the fold result is true only
  if all jobs returned true*)
206 Task_Queue.running task (fn () =>
207 setmp_worker_task task (fn () =>
208 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
(*level-2 trace message: task description, run and wait time in microseconds,
  and its dependencies*)
209 val _ = Multithreading.tracing 2 (fn () =>
211 val s = Task_Queue.str_of_task_groups task;
212 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
213 val (run, wait, deps) = Task_Queue.timing_of_task task;
214 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
215 val _ = SYNCHRONIZED "finish" (fn () =>
217 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
(*cancel the group right away, or leave it to the scheduler if that fails*)
220 else if cancel_now group then ()
221 else cancel_later group;
(*wake up joining workers; signal work_available unless the task was maximal*)
222 val _ = broadcast work_finished;
223 val _ = if maximal then () else signal work_available;
(*Let the calling worker thread wait on cond, tracking its state: Waiting if
  active, Sleeping otherwise, and Working again afterwards.
  NOTE(review): the binding of state, the successful lookup branch and the
  actual wait are elided in this excerpt.*)
230 fun worker_wait active cond = (*requires SYNCHRONIZED*)
(*the calling thread must be registered in workers*)
233 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
235 | NONE => raise Fail "Unregistered worker thread");
236 val _ = state := (if active then Waiting else Sleeping);
238 val _ = state := Working;
(*Determine the next piece of work for the calling worker thread.
  NOTE(review): the result of the first branch and one "else" line are elided
  in this excerpt.*)
241 fun worker_next () = (*requires SYNCHRONIZED*)
(*more workers registered than allowed: unregister the calling thread*)
242 if length (! workers) > ! max_workers then
243 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
244 signal work_available;
(*too many workers in state Working: sleep on work_available, then retry*)
246 else if count_workers Working > ! max_active then
247 (worker_wait false work_available; worker_next ())
(*try to dequeue; with nothing available sleep and retry, otherwise pass the
  signal on and return the work*)
249 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
250 NONE => (worker_wait false work_available; worker_next ())
251 | some => (signal work_available; some));
(*Main loop of a worker thread: fetch work under the lock (section label is
  name), execute it outside the lock, and repeat.
  NOTE(review): the NONE branch is elided in this excerpt.*)
253 fun worker_loop name =
254 (case SYNCHRONIZED name (fn () => worker_next ()) of
256 | SOME work => (execute work; worker_loop name));
(*fork a thread running worker_loop name and register it as Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (fn registered => (thread, state) :: registered) end;
(*tick counter modulo 10, used to pace status tracing in scheduler_next*)
265 val status_ticks = Unsynchronized.ref 0;
(*time of the most recent scheduler tick*)
267 val last_round = Unsynchronized.ref Time.zeroTime;
(*minimal distance between ticks, also the timeout of the scheduler's wait*)
268 val next_round = seconds 0.05;
(*One round of the scheduler: status tracing, disposal of dead workers,
  adjustment of the worker pool, retry of pending cancellations, a timed wait
  for scheduler_event, and the shutdown check.
  NOTE(review): many lines (let/in/end, "val _ =" heads, the binding of mm,
  some branches, the exception handler head, the final result) are elided in
  this excerpt; the comments below describe only what is visible.*)
270 fun scheduler_next () = (*requires SYNCHRONIZED*)
272 val now = Time.now ();
(*tick: at least next_round has passed since last_round*)
273 val tick = Time.<= (Time.+ (! last_round, next_round), now);
274 val _ = if tick then last_round := now else ();
277 (* queue and worker status *)
(*status is traced when the tick counter wraps around to 0, i.e. every 10th tick*)
280 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
282 if tick andalso ! status_ticks = 0 then
283 Multithreading.tracing 1 (fn () =>
285 val {ready, pending, running, passive} = Task_Queue.status (! queue);
286 val total = length (! workers);
287 val active = count_workers Working;
288 val waiting = count_workers Waiting;
290 "SCHEDULE " ^ Time.toString now ^ ": " ^
291 string_of_int ready ^ " ready, " ^
292 string_of_int pending ^ " pending, " ^
293 string_of_int running ^ " running, " ^
294 string_of_int passive ^ " passive; " ^
295 string_of_int total ^ " workers, " ^
296 string_of_int active ^ " active, " ^
297 string_of_int waiting ^ " waiting "
(*drop worker threads that are no longer active from the registry*)
302 if forall (Thread.isActive o #1) (! workers) then ()
305 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
306 val _ = workers := alive;
308 Multithreading.tracing 0 (fn () =>
309 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
313 (* worker pool adjustments *)
(*remember the old limits in order to detect changes below*)
315 val max_active0 = ! max_active;
316 val max_workers0 = ! max_workers;
(*during shutdown no worker may be active*)
318 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
319 val _ = max_active := m;
(*desired pool size, referred to as mm below: Working + 2 * Waiting, at
  least m and at most 4 * m; NOTE(review): the binding line is elided and the
  meaning of the special value 9999 is not visible here -- confirm*)
322 if ! do_shutdown then 0
323 else if m = 9999 then 1
324 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
(*worker_trend counts consecutive ticks of demand in the same direction;
  a change of direction resets it to 0*)
326 if tick andalso mm > ! max_workers then
327 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
328 else if tick andalso mm < ! max_workers then
329 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
(*NOTE(review): the action of the first branch is elided; the second branch
  raises max_workers up to at most 2 * m*)
332 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
334 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
335 max_workers := Int.min (mm, 2 * m)
(*start as many workers as are missing, named "worker N" by the next counter*)
338 val missing = ! max_workers - length (! workers);
341 funpow missing (fn () =>
342 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
(*changed limits are announced via work_available*)
346 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
347 else signal work_available;
350 (* canceled groups *)
(*retry pending cancellations; groups for which cancel_now succeeds are removed*)
353 if null (! canceled) then ()
355 (Multithreading.tracing 1 (fn () =>
356 string_of_int (length (! canceled)) ^ " canceled groups");
357 Unsynchronized.change canceled (filter_out cancel_now);
(*wait for scheduler_event for at most next_round; Exn.release re-raises an
  exception result of the wait*)
363 val _ = Exn.release (wait_timeout next_round scheduler_event);
(*shutdown is requested once all tasks of the queue are passive; the
  scheduler gives up its registration when no workers remain*)
368 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
369 val continue = not (! do_shutdown andalso null (! workers));
370 val _ = if continue then () else scheduler := NONE;
372 val _ = broadcast scheduler_event;
(*on interrupt: cancel everything in the queue, wake up all workers, and
  yield true*)
375 if Exn.is_interrupt exn then
376 (Multithreading.tracing 1 (fn () => "Interrupt");
377 List.app cancel_later (Task_Queue.cancel_all (! queue));
378 broadcast_work (); true)
(*Body of the scheduler thread: runs scheduler_next under the lock (section
  "scheduler") with synchronous public interrupts.
  NOTE(review): the surrounding loop construct is elided in this excerpt.*)
381 fun scheduler_loop () =
383 Multithreading.with_attributes
384 (Multithreading.sync_interrupts Multithreading.public_interrupts)
385 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  getOpt (Option.map Thread.isActive (! scheduler), false);
(*withdraw any shutdown request and make sure that a scheduler thread is
  running, forking a new one if necessary*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if scheduler_active () then ()
    else scheduler := SOME (Simple_Thread.fork false scheduler_loop)
  end;
(*Fork a list of expressions es as futures with common name, group, deps and
  priority.
  NOTE(review): several lines (let/in/end, the binding of grp and its SOME
  branch, the final result) are elided in this excerpt.*)
402 fun forks {name, group, deps, pri} es =
(*without explicit group a fresh subgroup of the current worker is used*)
408 NONE => worker_subgroup ()
(*enqueue one expression: create result and job, add the task to the given
  queue, and wrap everything up as a non-promised future*)
410 fun enqueue e queue =
412 val (result, job) = future_job grp e;
413 val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
414 val future = Future {promised = false, task = task, result = result};
415 in (future, queue') end;
417 SYNCHRONIZED "enqueue" (fn () =>
419 val (futures, queue') = fold_map enqueue es (! queue);
420 val _ = queue := queue';
(*workers are signalled only if none of the deps is known to the queue*)
421 val minimal = forall (not o Task_Queue.known_task queue') deps;
422 val _ = if minimal then signal work_available else ();
423 val _ = scheduler_check ();
(*fork a single expression with the given priority: unnamed, without
  explicit group, without dependencies*)
fun fork_pri pri e =
  let val args = {name = "", group = NONE, deps = [], pri = pri}
  in singleton (forks args) e end;
(*fork a single expression with priority 0*)
fun fork e =
  let val default_pri = 0
  in fork_pri default_pri e end;
(*NOTE(review): fragment of a function whose head is elided in this excerpt;
  presumably get_result, which join_results maps over its futures -- confirm.
  An unassigned result becomes Fail "Unfinished future".*)
437 NONE => Exn.Exn (Fail "Unfinished future")
(*for an interrupt result the status of the future's group is consulted;
  a non-empty list of exceptions is delivered as Exn.EXCEPTIONS*)
439 if Exn.is_interrupt_exn res then
440 (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
442 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
(*Find work that helps to finish deps: NONE once deps is empty, otherwise
  SOME (work, remaining deps).
  NOTE(review): an "else" line and the pattern of the first case branch are
  elided in this excerpt.*)
445 fun join_next deps = (*requires SYNCHRONIZED*)
446 if null deps then NONE
448 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
(*nothing to execute right now: wait actively on work_finished, then retry
  with the updated deps*)
451 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
452 | (SOME work, deps') => SOME (work, deps'));
(*Execute work obtained from join_next and continue joining the remaining
  deps; with NONE there is nothing left to do.
  NOTE(review): the head of join_work (mutually recursive with execute_work)
  is elided in this excerpt.*)
454 fun execute_work NONE = ()
455 | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
(*interrupts are disabled around selecting and executing the work*)
457 Multithreading.with_attributes Multithreading.no_interrupts
458 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
(*Wait until all futures xs are finished and deliver their results.
  NOTE(review): the let/val head of the body is elided in this excerpt.*)
462 fun join_results xs =
(*nothing to do if everything is finished already*)
465 if forall is_finished xs then ()
466 else if Multithreading.self_critical () then
467 error "Cannot join future values within critical section"
(*a worker thread helps to execute the required tasks; any other thread just
  blocks on the result variables*)
468 else if is_some (worker_task ()) then join_work (map task_of xs)
469 else List.app (ignore o Single_Assignment.await o result_of) xs;
470 in map get_result xs end;
(*join a single future, delivering its result as Exn.result*)
fun join_result x =
  let val join_one = singleton join_results
  in join_one x end;
(*join a single future and release its result: the value is returned, an
  exception is re-raised*)
fun join x =
  let val outcome = join_result x
  in Exn.release outcome end;
478 (* fast-path versions -- bypassing full task management *)
(*NOTE(review): body of a function whose head is elided in this excerpt;
  presumably value, which wraps an already known x as a finished future --
  confirm.  A dummy task is used and the result is assigned immediately.*)
482 val task = Task_Queue.dummy_task ();
483 val group = Task_Queue.group_of_task task;
484 val result = Single_Assignment.var "value" : 'a result;
485 val _ = assign_result group result (Exn.Res x);
486 in Future {promised = false, task = task, result = result} end;
(*NOTE(review): body of a function whose head is elided in this excerpt;
  presumably map_future f x, which is bound to map at the end of the
  structure -- confirm.  Some branches and let/in/end lines are elided too.*)
490 val task = task_of x;
(*the mapped future gets a fresh subgroup of the group of x*)
491 val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
492 val (result, job) = future_job group (fn () => f (join x));
(*try to attach the job to the existing task of x*)
494 val extended = SYNCHRONIZED "extend" (fn () =>
495 (case Task_Queue.extend task job (! queue) of
496 SOME queue' => (queue := queue'; true)
(*on success the new future shares the task of x; otherwise a regular fork
  depending on that task is used, with the same priority*)
499 if extended then Future {promised = false, task = task, result = result}
502 (forks {name = "Future.map", group = SOME group,
503 deps = [task], pri = Task_Queue.pri_of_task task})
504 (fn () => f (join x))
(*fork es as futures if multithreading is enabled; otherwise evaluate each
  expression on the spot and wrap its value as a finished future*)
fun cond_forks args es =
  (case Multithreading.enabled () of
    true => forks args es
  | false => List.map (fn e => value (e ())) es);
512 (* promised futures -- fulfilled by external means *)
(*Create a promised future within group: a fresh result variable plus a
  passive task in the queue, with abort as its associated function.
  NOTE(review): let/in lines and parts of the handler of abort are elided in
  this excerpt.*)
514 fun promise_group group : 'a future =
516 val result = Single_Assignment.var "promise" : 'a result;
(*abort assigns an interrupt to the result; a Fail from the assignment is
  answered with true, and an interrupt raised at that point is turned into
  Fail "Concurrent attempt to fulfill promise"*)
517 fun abort () = assign_result group result Exn.interrupt_exn
518 handle Fail _ => true
520 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
522 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
523 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
524 in Future {promised = true, task = task, result = result} end;
(*promised future within a fresh subgroup of the current worker group*)
fun promise () =
  let val group = worker_subgroup ()
  in promise_group group end;
(*Fulfill a promised future with the result res; raises Fail "Not a promised
  future" for any other kind of future.
  NOTE(review): else/let/in lines and the binding of still_passive are elided
  in this excerpt.*)
528 fun fulfill_result (Future {promised, task, result}) res =
529 if not promised then raise Fail "Not a promised future"
532 val group = Task_Queue.group_of_task task;
(*the job assigns res, or an interrupt when called with false*)
533 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
(*with interrupts disabled: take the passive task out of the queue and, if
  it was still passive, execute the job in the calling thread*)
535 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
538 SYNCHRONIZED "fulfill_result" (fn () =>
539 Unsynchronized.change_result queue
540 (Task_Queue.dequeue_passive (Thread.self ()) task));
541 in if still_passive then execute (task, [job]) else () end);
(*finally make sure that the result is assigned, waiting for it if needed*)
543 if is_some (Single_Assignment.peek result) then ()
544 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
(*fulfill a promised future with a regular value*)
fun fulfill x res =
  let val outcome = Exn.Res res
  in fulfill_result x outcome end;
(*Apply f to x with interrupts enabled: private interrupts inside a worker
  task, public interrupts otherwise; without multithreading support the
  plain interruptible combinator is used.
  NOTE(review): the line that applies f under the chosen attributes is elided
  in this excerpt.*)
552 fun interruptible_task f x =
553 if Multithreading.available then
554 Multithreading.with_attributes
555 (if is_some (worker_task ())
556 then Multithreading.private_interrupts
557 else Multithreading.public_interrupts)
559 else interruptible f x;
(*cancel: present and future members of the group will be interrupted
  eventually; cancellation is attempted right away and deferred to the
  scheduler if that does not succeed*)
fun cancel_group group =
  let
    fun request () =
      let
        val _ = if cancel_now group then () else cancel_later group;
        val _ = signal work_available;
      in scheduler_check () end;
  in SYNCHRONIZED "cancel" request end;
(*cancel the group that the task of the given future belongs to*)
fun cancel x = (cancel_group o Task_Queue.group_of_task o task_of) x;
(*NOTE(review): body of a function whose head is elided in this excerpt;
  presumably shutdown -- confirm.  With multithreading support it blocks
  under the lock while the scheduler is active: each iteration waits for
  scheduler_event and then wakes up all workers.  The else branch is elided.*)
572 if Multithreading.available then
573 SYNCHRONIZED "shutdown" (fn () =>
574 while scheduler_active () do
575 (wait scheduler_event; broadcast_work ()))
(*NOTE(review): body of a function whose head is elided in this excerpt;
  presumably status e -- confirm.  It emits a "forked" status message,
  evaluates e, and emits a "joined" status message afterwards.  Inside a
  worker task the markup carries the task as property; the NONE branch and
  the final result are elided.*)
584 (case worker_task () of
586 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
587 val _ = Output.status (Markup.markup_only (task_props Markup.forked));
588 val x = e (); (*sic -- report "joined" only for success*)
589 val _ = Output.status (Markup.markup_only (task_props Markup.joined));
593 (*final declarations of this structure!*)
(*map is rebound only here, so earlier uses of map within the structure, which
  are applied to lists, are not affected by this binding*)
594 val map = map_future;
(*abbreviation that makes the future type available without the Future
  qualifier*)
598 type 'a future = 'a Future.future;