1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join (and variants). The idea is to model
12 parallel value-oriented computations, but *not* communicating processes.
15 * Futures are grouped; failure of one group member causes the whole
16 group to be interrupted eventually. Groups are block-structured.
18 * Forked futures are evaluated spontaneously by a farm of worker
19 threads in the background; join resynchronizes the computation and
20 delivers results (values or exceptions).
22 * The pool of worker threads is limited, usually in correlation with
23 the number of physical cores on the machine. Note that allocation
24 of runtime resources is distorted either if workers yield CPU time
25 (e.g. via system sleep or wait operations), or if non-worker
26 threads contend for significant runtime resources independently.
28 * Promised futures are fulfilled by external means. There is no
29 associated evaluation task, but other futures can depend on them as usual.
35 type task = Task_Queue.task
36 type group = Task_Queue.group
37 val is_worker: unit -> bool
38 val worker_task: unit -> Task_Queue.task option
39 val worker_group: unit -> Task_Queue.group option
40 val worker_subgroup: unit -> Task_Queue.group
41 val worker_waiting: (unit -> 'a) -> 'a
43 val task_of: 'a future -> task
44 val group_of: 'a future -> group
45 val peek: 'a future -> 'a Exn.result option
46 val is_finished: 'a future -> bool
47 val bulk: {name: string, group: group option, deps: task list, pri: int} ->
48 (unit -> 'a) list -> 'a future list
49 val fork_pri: int -> (unit -> 'a) -> 'a future
50 val fork: (unit -> 'a) -> 'a future
51 val join_results: 'a future list -> 'a Exn.result list
52 val join_result: 'a future -> 'a Exn.result
53 val join: 'a future -> 'a
54 val value: 'a -> 'a future
55 val map: ('a -> 'b) -> 'a future -> 'b future
56 val promise_group: group -> 'a future
57 val promise: unit -> 'a future
58 val fulfill_result: 'a future -> 'a Exn.result -> unit
59 val fulfill: 'a future -> 'a -> unit
60 val interruptible_task: ('a -> 'b) -> 'a -> 'b
61 val cancel_group: group -> unit
62 val cancel: 'a future -> unit
63 val shutdown: unit -> unit
64 val status: (unit -> 'a) -> 'a
67 structure Future: FUTURE =
(*local abbreviations for the task and group types provided by Task_Queue*)
type group = Task_Queue.group;
type task = Task_Queue.task;
(*thread-local slot holding the (task, group) pair of the job that the
  current thread is executing, if any*)
val tag: (task * group) option Universal.tag = Universal.tag ();
(*read the thread-local (task, group) entry; a thread without any entry
  is treated like one that explicitly stores NONE*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x with the thread-local entry set to SOME data; the entry
  seen on entry is handed to Library.setmp_thread_data as the old value
  (presumably restored afterwards -- see Library.setmp_thread_data)*)
fun setmp_thread_data data f x =
  let val previous = thread_data ()
  in Library.setmp_thread_data tag previous (SOME data) f x end;
(*true iff the current thread carries a (task, group) entry*)
fun is_worker () =
  (case thread_data () of
    SOME _ => true
  | NONE => false);
(*task component of the current thread's entry, if present*)
fun worker_task () = Option.map (fn (task, _) => task) (thread_data ());
(*group component of the current thread's entry, if present*)
fun worker_group () = Option.map (fn (_, group) => group) (thread_data ());
(*fresh group whose optional parent is the group of the current thread*)
fun worker_subgroup () =
  let val parent = worker_group ()
  in Task_Queue.new_group parent end;
90 fun worker_waiting e =
91 (case worker_task () of
93 | SOME task => Task_Queue.waiting task e);
(*storage cell of a future: a single-assignment variable that eventually
  holds either a regular value or an exception (Exn.result)*)
type 'a result = 'a Exn.result Single_Assignment.var;
100 datatype 'a future = Future of
(*task field of a future*)
fun task_of (Future fields) = #task fields;
(*group field of a future*)
fun group_of (Future fields) = #group fields;
(*result variable of a future*)
fun result_of (Future fields) = #result fields;
(*non-blocking inspection of a future: peeks at its result variable and
  returns whatever Single_Assignment.peek reports for it*)
fun peek future =
  let val var = result_of future
  in Single_Assignment.peek var end;
(*true iff peek already yields some result (value or exception)*)
fun is_finished future =
  (case peek future of
    SOME _ => true
  | NONE => false);
113 fun assign_result group result res =
115 val _ = Single_Assignment.assign result res
116 handle exn as Fail _ =>
117 (case Single_Assignment.peek result of
118 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
121 (case the (Single_Assignment.peek result) of
122 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
123 | Exn.Result _ => true);
130 (* synchronization *)
(*waited on by the scheduler round and by shutdown; broadcast by
  cancel_later and by the scheduler itself*)
val scheduler_event = ConditionVar.conditionVar ();

(*waited on by idle workers; signalled when new work may be dequeued*)
val work_available = ConditionVar.conditionVar ();

(*waited on by joining threads; broadcast after a task has finished*)
val work_finished = ConditionVar.conditionVar ();
(*the single mutex used by SYNCHRONIZED and by the condition waits below*)
val lock = Mutex.mutex ();
(*run body via Simple_Thread.synchronized on the global lock; name is
  passed through as the label of this critical region*)
fun SYNCHRONIZED name body = Simple_Thread.synchronized name lock body;
(*wait on cond with the global lock, without a time limit*)
fun wait cond = (*requires SYNCHRONIZED*)
  let val no_deadline = NONE
  in Multithreading.sync_wait NONE no_deadline cond lock end;
(*wait on cond with the global lock; the absolute deadline handed to
  sync_wait is the current time plus timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;
(*signal a condition variable (ConditionVar.signal)*)
val signal = (*requires SYNCHRONIZED*)
  ConditionVar.signal;
(*broadcast on a condition variable (ConditionVar.broadcast)*)
val broadcast = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast;
(*broadcast on both work-related condition variables: first
  work_available, then work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(*global scheduler state; all references are unsynchronized and are
  updated within SYNCHRONIZED regions or by (*requires SYNCHRONIZED*)
  functions*)

(*the current task queue*)
val queue: Task_Queue.queue Unsynchronized.ref = Unsynchronized.ref Task_Queue.empty;

(*counter incremented to number newly started worker threads*)
val next: int Unsynchronized.ref = Unsynchronized.ref 0;

(*the scheduler thread, if one has been forked*)
val scheduler: Thread.thread option Unsynchronized.ref = Unsynchronized.ref NONE;

(*groups whose cancellation did not complete immediately and is retried
  by the scheduler*)
val canceled: Task_Queue.group list Unsynchronized.ref = Unsynchronized.ref [];

(*shutdown request flag; reset by scheduler_check*)
val do_shutdown: bool Unsynchronized.ref = Unsynchronized.ref false;

(*limit for the number of registered worker threads*)
val max_workers: int Unsynchronized.ref = Unsynchronized.ref 0;

(*limit for the number of workers in state Working*)
val max_active: int Unsynchronized.ref = Unsynchronized.ref 0;

(*signed counter of consecutive scheduler ticks that asked for more
  (positive) or fewer (negative) workers*)
val worker_trend: int Unsynchronized.ref = Unsynchronized.ref 0;
(*activity of a registered worker thread; worker_wait sets Waiting or
  Sleeping while blocked and Working again afterwards*)
datatype worker_state = Working | Waiting | Sleeping;

(*registered worker threads, each paired with its mutable state*)
val workers: (Thread.thread * worker_state Unsynchronized.ref) list Unsynchronized.ref =
  Unsynchronized.ref [];
(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let fun in_state (_, state_ref) = (! state_ref = state)
  in length (List.filter in_state (! workers)) end;
179 (* execute future jobs *)
181 fun future_job group (e: unit -> 'a) =
183 val result = Single_Assignment.var "future" : 'a result;
184 val pos = Position.thread_data ();
189 Exn.capture (fn () =>
190 Multithreading.with_attributes Multithreading.private_interrupts
191 (fn _ => Position.setmp_thread_data pos e ())) ()
192 else Exn.interrupt_exn;
193 in assign_result group result res end;
194 in (result, job) end;
(*attempt to cancel group against the current queue; the boolean result
  of Task_Queue.cancel is returned unchanged (callers fall back to
  cancel_later when it is false)*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let val current = ! queue
  in Task_Queue.cancel current group end;
(*record group in the list of pending cancellations (no duplicates wrt.
  Task_Queue.eq_group) and wake up whoever waits on scheduler_event*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val add_group = insert Task_Queue.eq_group group;
    val _ = Unsynchronized.change canceled add_group;
  in broadcast scheduler_event end;
203 fun execute (task, group, jobs) =
205 val valid = not (Task_Queue.is_canceled group);
207 Task_Queue.running task (fn () =>
208 setmp_thread_data (task, group) (fn () =>
209 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
210 val _ = Multithreading.tracing 1 (fn () =>
212 val s = Task_Queue.str_of_task task;
213 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
214 val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
215 in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
216 val _ = SYNCHRONIZED "finish" (fn () =>
218 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
221 else if cancel_now group then ()
222 else cancel_later group;
223 val _ = broadcast work_finished;
224 val _ = if maximal then () else signal work_available;
231 fun worker_wait active cond = (*requires SYNCHRONIZED*)
234 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
236 | NONE => raise Fail "Unregistered worker thread");
237 val _ = state := (if active then Waiting else Sleeping);
239 val _ = state := Working;
242 fun worker_next () = (*requires SYNCHRONIZED*)
243 if length (! workers) > ! max_workers then
244 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
245 signal work_available;
247 else if count_workers Working > ! max_active then
248 (worker_wait false work_available; worker_next ())
250 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
251 NONE => (worker_wait false work_available; worker_next ())
252 | some => (signal work_available; some));
254 fun worker_loop name =
255 (case SYNCHRONIZED name (fn () => worker_next ()) of
257 | SOME work => (execute work; worker_loop name));
(*fork a new thread running worker_loop name and register it at the head
  of the workers list, initially in state Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (fn registered => (thread, state) :: registered) end;
(*tick counter, advanced modulo 10 by the scheduler; status is traced
  when it wraps around to 0*)
val status_ticks = Unsynchronized.ref 0;

(*time of the most recent scheduler tick*)
val last_round = Unsynchronized.ref Time.zeroTime;

(*interval between scheduler ticks, also used as the scheduler's wait
  timeout*)
val next_round = seconds 0.05;
271 fun scheduler_next () = (*requires SYNCHRONIZED*)
273 val now = Time.now ();
274 val tick = Time.<= (Time.+ (! last_round, next_round), now);
275 val _ = if tick then last_round := now else ();
278 (* queue and worker status *)
281 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
283 if tick andalso ! status_ticks = 0 then
284 Multithreading.tracing 1 (fn () =>
286 val {ready, pending, running, passive} = Task_Queue.status (! queue);
287 val total = length (! workers);
288 val active = count_workers Working;
289 val waiting = count_workers Waiting;
291 "SCHEDULE " ^ Time.toString now ^ ": " ^
292 string_of_int ready ^ " ready, " ^
293 string_of_int pending ^ " pending, " ^
294 string_of_int running ^ " running, " ^
295 string_of_int passive ^ " passive; " ^
296 string_of_int total ^ " workers, " ^
297 string_of_int active ^ " active, " ^
298 string_of_int waiting ^ " waiting "
303 if forall (Thread.isActive o #1) (! workers) then ()
306 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
307 val _ = workers := alive;
309 Multithreading.tracing 0 (fn () =>
310 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
314 (* worker pool adjustments *)
316 val max_active0 = ! max_active;
317 val max_workers0 = ! max_workers;
319 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
320 val _ = max_active := m;
323 if ! do_shutdown then 0
324 else if m = 9999 then 1
325 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
327 if tick andalso mm > ! max_workers then
328 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
329 else if tick andalso mm < ! max_workers then
330 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
333 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
335 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
336 max_workers := Int.min (mm, 2 * m)
339 val missing = ! max_workers - length (! workers);
342 funpow missing (fn () =>
343 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
347 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
348 else signal work_available;
351 (* canceled groups *)
354 if null (! canceled) then ()
356 (Multithreading.tracing 1 (fn () =>
357 string_of_int (length (! canceled)) ^ " canceled groups");
358 Unsynchronized.change canceled (filter_out cancel_now);
364 val _ = Exn.release (wait_timeout next_round scheduler_event);
369 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
370 val continue = not (! do_shutdown andalso null (! workers));
371 val _ = if continue then () else scheduler := NONE;
373 val _ = broadcast scheduler_event;
376 if Exn.is_interrupt exn then
377 (Multithreading.tracing 1 (fn () => "Interrupt");
378 List.app cancel_later (Task_Queue.cancel_all (! queue));
379 broadcast_work (); true)
382 fun scheduler_loop () =
384 Multithreading.with_attributes
385 (Multithreading.sync_interrupts Multithreading.public_interrupts)
386 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is recorded and Thread.isActive holds for it*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);
(*withdraw any shutdown request and make sure a scheduler thread exists:
  a new one is forked only if none is currently active*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if not (scheduler_active ()) then
      scheduler := SOME (Simple_Thread.fork false scheduler_loop)
    else ()
  end;
403 fun bulk {name, group, deps, pri} es =
407 NONE => worker_subgroup ()
409 fun enqueue e (minimal, queue) =
411 val (result, job) = future_job grp e;
412 val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
413 val future = Future {promised = false, task = task, group = grp, result = result};
414 in (future, (minimal orelse minimal', queue')) end;
416 SYNCHRONIZED "enqueue" (fn () =>
418 val (futures, minimal) =
419 Unsynchronized.change_result queue (fn q =>
420 let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
421 in ((futures, minimal), q') end);
422 val _ = if minimal then signal work_available else ();
423 val _ = scheduler_check ();
(*fork a single future with priority pri: a one-element bulk request
  with empty name, no explicit group and no dependencies*)
fun fork_pri pri e =
  let val fork_all = bulk {name = "", group = NONE, deps = [], pri = pri}
  in singleton fork_all e end;
(*fork a single future with priority 0*)
fun fork body = fork_pri 0 body;
437 NONE => Exn.Exn (Fail "Unfinished future")
439 if Exn.is_interrupt_exn res then
440 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
442 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
445 fun join_next deps = (*requires SYNCHRONIZED*)
446 if null deps then NONE
448 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
450 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
451 | (SOME work, deps') => SOME (work, deps'));
453 fun execute_work NONE = ()
454 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
456 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
(*join from within a worker task: inside one "join" critical region,
  first record in the queue that task depends on deps, then look for the
  next piece of work via join_next; the outcome is passed to execute_work*)
fun join_depend task deps =
  let
    fun register_and_dequeue () =
      let val _ = Unsynchronized.change queue (Task_Queue.depend task deps)
      in join_next deps end;
  in execute_work (SYNCHRONIZED "join" register_and_dequeue) end;
464 fun join_results xs =
465 if forall is_finished xs then map get_result xs
466 else if Multithreading.self_critical () then
467 error "Cannot join future values within critical section"
469 worker_waiting (fn () =>
470 (case worker_task () of
471 SOME task => join_depend task (map task_of xs)
472 | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
(*join a single future, delivering its Exn.result; defined via the list
  version join_results*)
fun join_result future = singleton join_results future;
(*join a single future and unpack its outcome via Exn.release
  (value or exception)*)
fun join future =
  let val outcome = join_result future
  in Exn.release outcome end;
481 (* fast-path versions -- bypassing full task management *)
485 val group = Task_Queue.new_group NONE;
486 val result = Single_Assignment.var "value" : 'a result;
487 val _ = assign_result group result (Exn.Result x);
488 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
492 val task = task_of x;
493 val group = Task_Queue.new_group (SOME (group_of x));
494 val (result, job) = future_job group (fn () => f (join x));
496 val extended = SYNCHRONIZED "extend" (fn () =>
497 (case Task_Queue.extend task job (! queue) of
498 SOME queue' => (queue := queue'; true)
501 if extended then Future {promised = false, task = task, group = group, result = result}
504 (bulk {name = "Future.map", group = SOME group,
505 deps = [task], pri = Task_Queue.pri_of_task task})
506 (fn () => f (join x))
510 (* promised futures -- fulfilled by external means *)
512 fun promise_group group : 'a future =
514 val result = Single_Assignment.var "promise" : 'a result;
515 fun abort () = assign_result group result Exn.interrupt_exn
516 handle Fail _ => true
518 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
520 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
521 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
522 in Future {promised = true, task = task, group = group, result = result} end;
(*promised future within a fresh subgroup of the current worker group*)
fun promise () =
  let val group = worker_subgroup ()
  in promise_group group end;
526 fun fulfill_result (Future {promised, task, group, result}) res =
527 if not promised then raise Fail "Not a promised future"
530 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
532 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
535 SYNCHRONIZED "fulfill_result" (fn () =>
536 Unsynchronized.change_result queue
537 (Task_Queue.dequeue_passive (Thread.self ()) task));
538 in if still_passive then execute (task, group, [job]) else () end);
539 val _ = worker_waiting (fn () => Single_Assignment.await result);
(*fulfill a promised future with a regular value*)
fun fulfill x res =
  let val outcome = Exn.Result res
  in fulfill_result x outcome end;
547 fun interruptible_task f x =
548 if Multithreading.available then
549 Multithreading.with_attributes
551 then Multithreading.private_interrupts
552 else Multithreading.public_interrupts)
554 else interruptible f x;
556 (*cancel: present and future group members will be interrupted eventually*)
(*within one "cancel" critical region: try to cancel the group at once
  and schedule a later retry if that does not succeed; then signal
  work_available and make sure the scheduler is running*)
fun cancel_group group =
  SYNCHRONIZED "cancel" (fn () =>
    let
      val _ = if cancel_now group then () else cancel_later group;
      val _ = signal work_available;
    in scheduler_check () end);
(*cancel the whole group that the given future belongs to*)
fun cancel future =
  let val group = group_of future
  in cancel_group group end;
567 if Multithreading.available then
568 SYNCHRONIZED "shutdown" (fn () =>
569 while scheduler_active () do
570 (wait scheduler_event; broadcast_work ()))
579 (case worker_task () of
581 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
582 val _ = Output.status (Markup.markup (task_props Markup.forked) "");
583 val x = e (); (*sic -- report "joined" only for success*)
584 val _ = Output.status (Markup.markup (task_props Markup.joined) "");
588 (*final declarations of this structure!*)
(*export map_future under the signature name map; from here on the name
  map no longer refers to the list operation used earlier in this
  structure, so this binding must stay last*)
val map = map_future;
(*make the future type available unqualified outside structure Future*)
type 'a future = 'a Future.future;