1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join (and variants). The idea is to model
12 parallel value-oriented computations, but *not* communicating processes.
15 * Futures are grouped; failure of one group member causes the whole
16 group to be interrupted eventually. Groups are block-structured.
18 * Forked futures are evaluated spontaneously by a farm of worker
19 threads in the background; join resynchronizes the computation and
20 delivers results (values or exceptions).
22 * The pool of worker threads is limited, usually in correlation with
23 the number of physical cores on the machine. Note that allocation
24 of runtime resources is distorted either if workers yield CPU time
25 (e.g. via system sleep or wait operations), or if non-worker
26 threads contend for significant runtime resources independently.
28 * Promised futures are fulfilled by external means. There is no
29 associated evaluation task, but other futures can depend on them.
*)
35 type task = Task_Queue.task
36 type group = Task_Queue.group
37 val is_worker: unit -> bool
38 val worker_task: unit -> Task_Queue.task option
39 val worker_group: unit -> Task_Queue.group option
40 val worker_subgroup: unit -> Task_Queue.group
42 val task_of: 'a future -> task
43 val group_of: 'a future -> group
44 val peek: 'a future -> 'a Exn.result option
45 val is_finished: 'a future -> bool
46 val forks: {name: string, group: group option, deps: task list, pri: int} ->
47 (unit -> 'a) list -> 'a future list
48 val fork_pri: int -> (unit -> 'a) -> 'a future
49 val fork: (unit -> 'a) -> 'a future
50 val join_results: 'a future list -> 'a Exn.result list
51 val join_result: 'a future -> 'a Exn.result
52 val join: 'a future -> 'a
53 val value: 'a -> 'a future
54 val map: ('a -> 'b) -> 'a future -> 'b future
55 val promise_group: group -> 'a future
56 val promise: unit -> 'a future
57 val fulfill_result: 'a future -> 'a Exn.result -> unit
58 val fulfill: 'a future -> 'a -> unit
59 val interruptible_task: ('a -> 'b) -> 'a -> 'b
60 val cancel_group: group -> unit
61 val cancel: 'a future -> unit
62 val shutdown: unit -> unit
63 val status: (unit -> 'a) -> 'a
66 structure Future: FUTURE =
73 type task = Task_Queue.task;
74 type group = Task_Queue.group;
(*key of the thread-local slot that holds an optional (task, group) pair;
  read via Thread.getLocal in thread_data and set temporarily by
  setmp_thread_data*)
77 val tag = Universal.tag () : (task * group) option Universal.tag;
(*content of the thread-local slot for the current thread: the optional
  (task, group) pair, or NONE if the slot has never been set*)
fun thread_data () =
  (case Thread.getLocal tag of
    NONE => NONE
  | SOME data => data);
(*apply f to x while the thread-local slot holds SOME data; the slot
  content seen on entry is handed to Library.setmp_thread_data as the
  old value (presumably restored on exit -- see Library)*)
fun setmp_thread_data data f x =
  let
    val previous = thread_data ();
    val current = SOME data;
  in Library.setmp_thread_data tag previous current f x end;
(*worker status of the current thread, derived from its thread data:
  a thread counts as worker iff the slot holds a (task, group) pair*)
fun is_worker () = is_some (thread_data ());

(*first component of the thread data, if present*)
fun worker_task () =
  (case thread_data () of
    NONE => NONE
  | SOME (task, _) => SOME task);

(*second component of the thread data, if present*)
fun worker_group () =
  (case thread_data () of
    NONE => NONE
  | SOME (_, group) => SOME group);
(*fresh group whose optional parent is the group of the current worker*)
fun worker_subgroup () =
  let val parent = worker_group ()
  in Task_Queue.new_group parent end;
89 fun worker_joining e =
90 (case worker_task () of
92 | SOME task => Task_Queue.joining task e);
94 fun worker_waiting deps e =
95 (case worker_task () of
97 | SOME task => Task_Queue.waiting task deps e);
100 (* datatype future *)
102 type 'a result = 'a Exn.result Single_Assignment.var;
104 datatype 'a future = Future of
(*field projections for the record wrapped by constructor Future*)
fun task_of (Future fields) = #task fields;
fun group_of (Future fields) = #group fields;
fun result_of (Future fields) = #result fields;
(*non-blocking look at the result variable of a future*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;

(*a future is finished as soon as its result variable holds a value*)
fun is_finished x =
  (case peek x of
    NONE => false
  | SOME _ => true);
117 fun assign_result group result res =
119 val _ = Single_Assignment.assign result res
120 handle exn as Fail _ =>
121 (case Single_Assignment.peek result of
122 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
125 (case the (Single_Assignment.peek result) of
126 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
127 | Exn.Result _ => true);
134 (* synchronization *)
(*condition variables, all used together with the single mutex "lock" below*)
(*scheduler_event: broadcast by cancel_later and at the end of a scheduler
  round; the scheduler waits on it with a timeout, shutdown waits on it*)
136 val scheduler_event = ConditionVar.conditionVar ();
(*work_available: signalled when tasks are enqueued or limits change;
  idle workers wait on it in worker_next*)
137 val work_available = ConditionVar.conditionVar ();
(*work_finished: broadcast by execute after a task has finished;
  joining workers wait on it in join_next*)
138 val work_finished = ConditionVar.conditionVar ();
(*the one mutex behind SYNCHRONIZED and the wait operations*)
141 val lock = Mutex.mutex ();
(*run e as a named synchronized section guarded by the global lock*)
fun SYNCHRONIZED name e = Simple_Thread.synchronized name lock e;
local
  (*common core: wait on cond under the global lock, with optional deadline*)
  fun sync_wait_until deadline cond =
    Multithreading.sync_wait NONE deadline cond lock;
in

(*wait on cond without time limit*)
fun wait cond = (*requires SYNCHRONIZED*)
  sync_wait_until NONE cond;

(*wait on cond, with deadline = current time + timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in sync_wait_until (SOME deadline) cond end;

end;
(*plain aliases for the ConditionVar operations; callers must hold the lock*)
val signal = ConditionVar.signal; (*requires SYNCHRONIZED*)

val broadcast = ConditionVar.broadcast; (*requires SYNCHRONIZED*)
(*wake up every thread waiting on work_available, then every thread
  waiting on work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(*global scheduler state; the functions updating these references are
  marked "requires SYNCHRONIZED" or run inside a SYNCHRONIZED section*)
(*the task queue, updated functionally via Unsynchronized.change_result*)
167 val queue = Unsynchronized.ref Task_Queue.empty;
(*counter used to number the names of newly started worker threads*)
168 val next = Unsynchronized.ref 0;
(*the scheduler thread, if one has been forked (see scheduler_check)*)
169 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*groups registered by cancel_later; the scheduler retries cancel_now on
  them and drops those for which it returns true*)
170 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
(*shutdown request flag; reset to false by scheduler_check*)
171 val do_shutdown = Unsynchronized.ref false;
(*upper bound on the length of the worker list, checked in worker_next*)
172 val max_workers = Unsynchronized.ref 0;
(*upper bound on workers in state Working, checked in worker_next*)
173 val max_active = Unsynchronized.ref 0;
(*signed counter maintained by the scheduler on ticks, used when
  adjusting max_workers*)
174 val worker_trend = Unsynchronized.ref 0;
(*state of a registered worker thread; worker_wait sets Waiting or
  Sleeping and then goes back to Working*)
176 datatype worker_state = Working | Waiting | Sleeping;
(*registered worker threads, each paired with its mutable state*)
177 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let fun in_state (_, state_ref) = (! state_ref = state)
  in length (List.filter in_state (! workers)) end;
183 (* execute future jobs *)
185 fun future_job group (e: unit -> 'a) =
187 val result = Single_Assignment.var "future" : 'a result;
188 val pos = Position.thread_data ();
193 Exn.capture (fn () =>
194 Multithreading.with_attributes Multithreading.private_interrupts
195 (fn _ => Position.setmp_thread_data pos e ())) ()
196 else Exn.interrupt_exn;
197 in assign_result group result res end;
198 in (result, job) end;
(*cancel the group against the current queue, returning the result of
  Task_Queue.cancel; callers treat false as "try again later"*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let val current_queue = ! queue
  in Task_Queue.cancel current_queue group end;
(*register the group in the list of pending cancellations (no duplicates
  wrt. Task_Queue.eq_group) and notify waiters on scheduler_event*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = canceled := insert Task_Queue.eq_group group (! canceled);
  in broadcast scheduler_event end;
207 fun execute (task, group, jobs) =
209 val valid = not (Task_Queue.is_canceled group);
211 Task_Queue.running task (fn () =>
212 setmp_thread_data (task, group) (fn () =>
213 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
214 val _ = Multithreading.tracing 1 (fn () =>
216 val s = Task_Queue.str_of_task task;
217 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
218 val (run, wait, deps) = Task_Queue.timing_of_task task;
219 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
220 val _ = SYNCHRONIZED "finish" (fn () =>
222 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
225 else if cancel_now group then ()
226 else cancel_later group;
227 val _ = broadcast work_finished;
228 val _ = if maximal then () else signal work_available;
235 fun worker_wait active cond = (*requires SYNCHRONIZED*)
238 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
240 | NONE => raise Fail "Unregistered worker thread");
241 val _ = state := (if active then Waiting else Sleeping);
243 val _ = state := Working;
246 fun worker_next () = (*requires SYNCHRONIZED*)
247 if length (! workers) > ! max_workers then
248 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
249 signal work_available;
251 else if count_workers Working > ! max_active then
252 (worker_wait false work_available; worker_next ())
254 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
255 NONE => (worker_wait false work_available; worker_next ())
256 | some => (signal work_available; some));
258 fun worker_loop name =
259 (case SYNCHRONIZED name (fn () => worker_next ()) of
261 | SOME work => (execute work; worker_loop name));
(*fork a thread running worker_loop under the given name and register it
  at the head of the worker list, initially in state Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in workers := (thread, state) :: ! workers end;
(*tick counter, advanced modulo 10 by the scheduler; the status trace is
  emitted when it is 0 on a tick*)
270 val status_ticks = Unsynchronized.ref 0;
(*time of the last scheduler tick*)
272 val last_round = Unsynchronized.ref Time.zeroTime;
(*minimal distance between ticks, also the timeout of the scheduler's
  wait on scheduler_event*)
273 val next_round = seconds 0.05;
275 fun scheduler_next () = (*requires SYNCHRONIZED*)
277 val now = Time.now ();
278 val tick = Time.<= (Time.+ (! last_round, next_round), now);
279 val _ = if tick then last_round := now else ();
282 (* queue and worker status *)
285 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
287 if tick andalso ! status_ticks = 0 then
288 Multithreading.tracing 1 (fn () =>
290 val {ready, pending, running, passive} = Task_Queue.status (! queue);
291 val total = length (! workers);
292 val active = count_workers Working;
293 val waiting = count_workers Waiting;
295 "SCHEDULE " ^ Time.toString now ^ ": " ^
296 string_of_int ready ^ " ready, " ^
297 string_of_int pending ^ " pending, " ^
298 string_of_int running ^ " running, " ^
299 string_of_int passive ^ " passive; " ^
300 string_of_int total ^ " workers, " ^
301 string_of_int active ^ " active, " ^
302 string_of_int waiting ^ " waiting "
307 if forall (Thread.isActive o #1) (! workers) then ()
310 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
311 val _ = workers := alive;
313 Multithreading.tracing 0 (fn () =>
314 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
318 (* worker pool adjustments *)
320 val max_active0 = ! max_active;
321 val max_workers0 = ! max_workers;
323 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
324 val _ = max_active := m;
327 if ! do_shutdown then 0
328 else if m = 9999 then 1
329 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
331 if tick andalso mm > ! max_workers then
332 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
333 else if tick andalso mm < ! max_workers then
334 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
337 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
339 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
340 max_workers := Int.min (mm, 2 * m)
343 val missing = ! max_workers - length (! workers);
346 funpow missing (fn () =>
347 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
351 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
352 else signal work_available;
355 (* canceled groups *)
358 if null (! canceled) then ()
360 (Multithreading.tracing 1 (fn () =>
361 string_of_int (length (! canceled)) ^ " canceled groups");
362 Unsynchronized.change canceled (filter_out cancel_now);
368 val _ = Exn.release (wait_timeout next_round scheduler_event);
373 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
374 val continue = not (! do_shutdown andalso null (! workers));
375 val _ = if continue then () else scheduler := NONE;
377 val _ = broadcast scheduler_event;
380 if Exn.is_interrupt exn then
381 (Multithreading.tracing 1 (fn () => "Interrupt");
382 List.app cancel_later (Task_Queue.cancel_all (! queue));
383 broadcast_work (); true)
386 fun scheduler_loop () =
388 Multithreading.with_attributes
389 (Multithreading.sync_interrupts Multithreading.public_interrupts)
390 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  the_default false (Option.map Thread.isActive (! scheduler));
(*clear any shutdown request and make sure a scheduler thread is running,
  forking a new one if necessary*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
    fun start () = scheduler := SOME (Simple_Thread.fork false scheduler_loop);
  in if not (scheduler_active ()) then start () else () end;
407 fun forks {name, group, deps, pri} es =
413 NONE => worker_subgroup ()
415 fun enqueue e (minimal, queue) =
417 val (result, job) = future_job grp e;
418 val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
419 val future = Future {promised = false, task = task, group = grp, result = result};
420 in (future, (minimal orelse minimal', queue')) end;
422 SYNCHRONIZED "enqueue" (fn () =>
424 val (futures, minimal) =
425 Unsynchronized.change_result queue (fn q =>
426 let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
427 in ((futures, minimal), q') end);
428 val _ = if minimal then signal work_available else ();
429 val _ = scheduler_check ();
(*fork a single anonymous future without explicit group or dependencies,
  at the given priority*)
fun fork_pri pri e =
  let val params = {name = "", group = NONE, deps = [], pri = pri}
  in singleton (forks params) e end;

(*fork at priority 0*)
fun fork e =
  singleton (forks {name = "", group = NONE, deps = [], pri = 0}) e;
443 NONE => Exn.Exn (Fail "Unfinished future")
445 if Exn.is_interrupt_exn res then
446 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
448 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
451 fun join_next deps = (*requires SYNCHRONIZED*)
452 if Task_Queue.finished_deps deps then NONE
454 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
456 if Task_Queue.finished_deps deps' then NONE
457 else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
458 | (SOME work, deps') => SOME (work, deps'));
460 fun execute_work NONE = ()
461 | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
463 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
467 fun join_results xs =
470 if forall is_finished xs then ()
471 else if Multithreading.self_critical () then
472 error "Cannot join future values within critical section"
473 else if is_some (thread_data ()) then
474 join_work (Task_Queue.init_deps (map task_of xs))
475 else List.app (ignore o Single_Assignment.await o result_of) xs;
476 in map get_result xs end;
(*join a single future, delivering its captured result*)
fun join_result x =
  let val join_single = singleton join_results
  in join_single x end;

(*join a single future and release the result via Exn.release*)
fun join x =
  let val res = join_result x
  in Exn.release res end;
484 (* fast-path versions -- bypassing full task management *)
488 val group = Task_Queue.new_group NONE;
489 val result = Single_Assignment.var "value" : 'a result;
490 val _ = assign_result group result (Exn.Result x);
491 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
495 val task = task_of x;
496 val group = Task_Queue.new_group (SOME (group_of x));
497 val (result, job) = future_job group (fn () => f (join x));
499 val extended = SYNCHRONIZED "extend" (fn () =>
500 (case Task_Queue.extend task job (! queue) of
501 SOME queue' => (queue := queue'; true)
504 if extended then Future {promised = false, task = task, group = group, result = result}
507 (forks {name = "Future.map", group = SOME group,
508 deps = [task], pri = Task_Queue.pri_of_task task})
509 (fn () => f (join x))
513 (* promised futures -- fulfilled by external means *)
515 fun promise_group group : 'a future =
517 val result = Single_Assignment.var "promise" : 'a result;
518 fun abort () = assign_result group result Exn.interrupt_exn
519 handle Fail _ => true
521 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
523 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
524 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
525 in Future {promised = true, task = task, group = group, result = result} end;
(*promised future within a fresh subgroup of the current worker group*)
fun promise () =
  let val group = worker_subgroup ()
  in promise_group group end;
529 fun fulfill_result (Future {promised, task, group, result}) res =
530 if not promised then raise Fail "Not a promised future"
533 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
535 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
538 SYNCHRONIZED "fulfill_result" (fn () =>
539 Unsynchronized.change_result queue
540 (Task_Queue.dequeue_passive (Thread.self ()) task));
541 in if still_passive then execute (task, group, [job]) else () end);
543 worker_waiting (Task_Queue.init_deps [task])
544 (fn () => Single_Assignment.await result);
(*fulfill a promise with a regular value, wrapped as Exn.Result*)
fun fulfill x res =
  let val wrapped = Exn.Result res
  in fulfill_result x wrapped end;
552 fun interruptible_task f x =
553 if Multithreading.available then
554 Multithreading.with_attributes
556 then Multithreading.private_interrupts
557 else Multithreading.public_interrupts)
559 else interruptible f x;
(*cancel: present and future group members will be interrupted eventually.
  Within one synchronized section: try to cancel right away, otherwise
  register the group for later; then signal work_available and make sure
  the scheduler is running.*)
fun cancel_group group =
  let
    fun request () =
      let
        val _ = if not (cancel_now group) then cancel_later group else ();
        val _ = signal work_available;
      in scheduler_check () end;
  in SYNCHRONIZED "cancel" request end;
(*cancel the whole group that the given future belongs to*)
fun cancel (Future {group, ...}) = cancel_group group;
572 if Multithreading.available then
573 SYNCHRONIZED "shutdown" (fn () =>
574 while scheduler_active () do
575 (wait scheduler_event; broadcast_work ()))
584 (case worker_task () of
586 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
587 val _ = Output.status (Markup.markup (task_props Markup.forked) "");
588 val x = e (); (*sic -- report "joined" only for success*)
589 val _ = Output.status (Markup.markup (task_props Markup.joined) "");
593 (*final declarations of this structure!*)
594 val map = map_future;
598 type 'a future = 'a Future.future;