1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating processes.
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
27 * Promised futures are fulfilled by external means. There is no
28 associated evaluation task, but other futures can depend on them.
(* Entries of the futures interface; the implementations are given in structure Future.
   NOTE(review): the enclosing signature header and the declaration of 'a future
   are not visible in this excerpt. *)
34 type task = Task_Queue.task
35 type group = Task_Queue.group
(* identity of the current worker thread, if any *)
36 val is_worker: unit -> bool
37 val worker_task: unit -> Task_Queue.task option
38 val worker_group: unit -> Task_Queue.group option
(* inspection of futures *)
40 val task_of: 'a future -> task
41 val group_of: 'a future -> group
42 val peek: 'a future -> 'a Exn.result option
43 val is_finished: 'a future -> bool
(* fork variants: explicit group, dependencies and/or priority *)
44 val fork_group: group -> (unit -> 'a) -> 'a future
45 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
46 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
47 val fork_pri: int -> (unit -> 'a) -> 'a future
48 val fork: (unit -> 'a) -> 'a future
(* join: results as Exn.result, or released as plain values *)
49 val join_results: 'a future list -> 'a Exn.result list
50 val join_result: 'a future -> 'a Exn.result
51 val join: 'a future -> 'a
(* fast-path operations *)
52 val value: 'a -> 'a future
53 val map: ('a -> 'b) -> 'a future -> 'b future
(* promised futures and their fulfillment *)
54 val promise_group: group -> 'a future
55 val promise: unit -> 'a future
56 val fulfill_result: 'a future -> 'a Exn.result -> unit
57 val fulfill: 'a future -> 'a -> unit
(* interrupts, cancellation, shutdown *)
58 val interruptible_task: ('a -> 'b) -> 'a -> 'b
59 val cancel_group: group -> unit
60 val cancel: 'a future -> unit
61 val shutdown: unit -> unit
64 structure Future: FUTURE =
(* local abbreviations for the corresponding Task_Queue types *)
71 type task = Task_Queue.task;
72 type group = Task_Queue.group;
(* key under which the optional (task, group) pair is stored as thread-local data
   (read via Thread.getLocal in thread_data) *)
75 val tag = Universal.tag () : (task * group) option Universal.tag;
(*thread-local (task, group) entry of the calling thread; NONE if nothing is stored*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x via Library.setmp_thread_data, passing the current thread data
  and SOME data as the two values for the tag*)
fun setmp_thread_data data f x =
  let
    val current = thread_data ();
  in Library.setmp_thread_data tag current (SOME data) f x end;
(*worker identity, derived from the thread data of the calling thread*)
fun is_worker () = is_some (thread_data ());
fun worker_task () = Option.map #1 (thread_data ());
fun worker_group () = Option.map #2 (thread_data ());
(*fresh group, created by Task_Queue.new_group from the current worker group (if any)*)
fun new_group () =
  let
    val current = worker_group ();
  in Task_Queue.new_group current end;
(* a result cell: single-assignment variable holding an Exn.result *)
91 type 'a result = 'a Exn.result Single_Assignment.var;
(* NOTE(review): the record body of Future is not visible in this excerpt; elsewhere in
   this file it is built with the fields promised, task, group, result *)
93 datatype 'a future = Future of
(*field selectors for the Future record*)
fun task_of (Future args) = #task args;
fun group_of (Future args) = #group args;
fun result_of (Future args) = #result args;
(*non-blocking look at the result cell of a future*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;

(*a future is finished as soon as its result cell holds something*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
(* Stores res in the single-assignment variable result.  The visible branches show that
   an Exn.Exn outcome cancels the group via Task_Queue.cancel_group and yields false,
   while Exn.Result yields true.
   NOTE(review): the let/in lines and the head of the case expression are not visible
   in this excerpt; presumably the case is over res -- confirm against the full file. *)
106 fun assign_result group result res =
108 val _ = Single_Assignment.assign result res;
111 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
112 | Exn.Result _ => true);
119 (* synchronization *)
(* condition variables; all waiting on them goes through the single mutex lock below *)
121 val scheduler_event = ConditionVar.conditionVar ();
122 val work_available = ConditionVar.conditionVar ();
123 val work_finished = ConditionVar.conditionVar ();
(* the one mutex used by SYNCHRONIZED, wait and wait_timeout *)
126 val lock = Mutex.mutex ();
(* SimpleThread.synchronized specialized to the global lock; name is passed through *)
129 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
(* wait on cond with the global lock; the second argument of sync_wait is NONE here,
   whereas wait_timeout supplies a point in time at that position *)
131 fun wait cond = (*requires SYNCHRONIZED*)
132 Multithreading.sync_wait NONE NONE cond lock;
(*wait on cond with the global lock, limited to the point in time now + timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let
    val deadline = Time.+ (Time.now (), timeout);
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;
(* thin wrappers over ConditionVar.signal / ConditionVar.broadcast; the annotations
   record that callers are expected to be inside SYNCHRONIZED *)
137 fun signal cond = (*requires SYNCHRONIZED*)
138 ConditionVar.signal cond;
140 fun broadcast cond = (*requires SYNCHRONIZED*)
141 ConditionVar.broadcast cond;
(*wake up everybody waiting on work_available, then everybody waiting on work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(* Global scheduler state, kept in unsynchronized references.
     queue         -- the task queue
     next          -- counter used to number the names of worker threads
     scheduler     -- the scheduler thread, if one has been forked
     canceled      -- groups whose cancellation is still to be carried out by the scheduler
     do_shutdown   -- shutdown flag; set and reset by scheduler_next / scheduler_check
     max_workers   -- limit compared against the length of the worker list
     max_active    -- limit compared against the number of workers in state Working
     worker_trend  -- tick counter tracking whether the wanted number of workers
                      stays above or below max_workers
   NOTE(review): presumably all access happens under SYNCHRONIZED, as the
   (*requires SYNCHRONIZED*) annotations on the accessing functions suggest. *)
152 val queue = Unsynchronized.ref Task_Queue.empty;
153 val next = Unsynchronized.ref 0;
154 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
155 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
156 val do_shutdown = Unsynchronized.ref false;
157 val max_workers = Unsynchronized.ref 0;
158 val max_active = Unsynchronized.ref 0;
159 val worker_trend = Unsynchronized.ref 0;
(* state of a worker thread as recorded in the workers list *)
161 datatype worker_state = Working | Waiting | Sleeping;
(* registered worker threads, each paired with a mutable state cell *)
162 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
(*number of registered workers whose state cell currently equals the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let
    fun in_state (_, state_ref) = (! state_ref = state);
  in length (List.filter in_state (! workers)) end;
168 (* execute future jobs *)
(* Produces the pair (result, job) for evaluating e within group: result is a fresh
   single-assignment variable, and job stores an outcome in it through assign_result.
   In the visible code the outcome is either the captured result of running e (with
   the position pos saved at creation time and Multithreading.private_interrupts),
   or Exn.Exn Exn.Interrupt in the else branch.
   NOTE(review): several lines (let, the head of job and the condition of the
   if-expression) are not visible in this excerpt. *)
170 fun future_job group (e: unit -> 'a) =
172 val result = Single_Assignment.var "future" : 'a result;
173 val pos = Position.thread_data ();
178 Exn.capture (fn () =>
179 Multithreading.with_attributes Multithreading.private_interrupts
180 (fn _ => Position.setmp_thread_data pos e ())) ()
181 else Exn.Exn Exn.Interrupt;
182 in assign_result group result res end;
183 in (result, job) end;
(*immediate cancellation attempt: hand the group to Task_Queue.cancel on the current queue*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let
    val current_queue = ! queue;
  in Task_Queue.cancel current_queue group end;
(*deferred cancellation: record the group in canceled and notify the scheduler*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;
(* Runs all jobs of a task with the thread data set to (task, group).  Every job
   receives valid (whether the group was not canceled when execution started); every
   job is run, and ok is the conjunction of their results.  Afterwards, under
   SYNCHRONIZED "finish", the task is finished in the queue, the group is canceled
   now or later in the visible else-branches, work_finished is broadcast, and
   work_available is signalled unless maximal holds.
   NOTE(review): some lines (let/in, and the condition guarding the cancel branches)
   are not visible in this excerpt. *)
192 fun execute (task, group, jobs) =
194 val valid = not (Task_Queue.is_canceled group);
195 val ok = setmp_thread_data (task, group) (fn () =>
196 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
197 val _ = SYNCHRONIZED "finish" (fn () =>
199 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
202 else if cancel_now group then ()
203 else cancel_later group;
204 val _ = broadcast work_finished;
205 val _ = if maximal then () else signal work_available;
(* Looks up the state cell of the calling thread in workers (failing with
   "Unregistered worker thread" if absent), sets it to Waiting when active and to
   Sleeping otherwise, and finally back to Working.
   NOTE(review): the line between the two state updates is not visible in this
   excerpt; presumably that is where the wait on cond takes place. *)
212 fun worker_wait active cond = (*requires SYNCHRONIZED*)
215 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
217 | NONE => raise Fail "Unregistered worker thread");
218 val _ = state := (if active then Waiting else Sleeping);
220 val _ = state := Working;
(* Determines the next piece of work for the calling worker thread:
     - more registered workers than max_workers: the calling thread is removed from
       workers and work_available is signalled;
     - more Working workers than max_active: sleep on work_available, then retry;
     - otherwise dequeue from the queue; on NONE sleep on work_available and retry,
       on success signal work_available and return the work.
   NOTE(review): the result of the first branch and one else line are not visible in
   this excerpt. *)
223 fun worker_next () = (*requires SYNCHRONIZED*)
224 if length (! workers) > ! max_workers then
225 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
226 signal work_available;
228 else if count_workers Working > ! max_active then
229 (worker_wait false work_available; worker_next ())
231 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
232 NONE => (worker_wait false work_available; worker_next ())
233 | some => (signal work_available; some));
(* Main loop of a worker thread: obtains work via worker_next under SYNCHRONIZED
   (labelled with the worker name), executes it and repeats.
   NOTE(review): the branch taken when no work is returned is not visible here. *)
235 fun worker_loop name =
236 (case SYNCHRONIZED name (fn () => worker_next ()) of
238 | SOME work => (execute work; worker_loop name));
(*fork a thread running worker_loop and register it in workers, initially as Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = SimpleThread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (cons (thread, state)) end;
(* tick counter for status tracing; incremented modulo 10 in scheduler_next *)
247 val status_ticks = Unsynchronized.ref 0;
(* time of the last scheduler tick, and the length of one scheduler round (50 ms) *)
249 val last_round = Unsynchronized.ref Time.zeroTime;
250 val next_round = Time.fromMilliseconds 50;
(* One round of the scheduler.  In the order of the visible code:
     1. tick is true when at least next_round has elapsed since last_round;
     2. on every 10th tick the queue and worker status is traced;
     3. worker threads that are no longer active are dropped from workers;
     4. max_active and max_workers are recomputed and missing workers are started;
     5. pending canceled groups are retried with cancel_now;
     6. the scheduler waits on scheduler_event for at most next_round;
     7. shutdown is flagged when the queue is all passive, and the scheduler
        reference is cleared once shutdown is flagged and no workers remain.
   An Exn.Interrupt is handled by scheduling cancellation of all groups returned by
   Task_Queue.cancel_all, waking up all workers and returning true.
   NOTE(review): many lines (let/in/end, several branches, the final result) are not
   visible in this excerpt; the summary covers only what is shown. *)
252 fun scheduler_next () = (*requires SYNCHRONIZED*)
254 val now = Time.now ();
255 val tick = Time.<= (Time.+ (! last_round, next_round), now);
256 val _ = if tick then last_round := now else ();
259 (* queue and worker status *)
262 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
264 if tick andalso ! status_ticks = 0 then
265 Multithreading.tracing 1 (fn () =>
267 val {ready, pending, running, passive} = Task_Queue.status (! queue);
268 val total = length (! workers);
269 val active = count_workers Working;
270 val waiting = count_workers Waiting;
272 "SCHEDULE " ^ Time.toString now ^ ": " ^
273 string_of_int ready ^ " ready, " ^
274 string_of_int pending ^ " pending, " ^
275 string_of_int running ^ " running, " ^
276 string_of_int passive ^ " passive; " ^
277 string_of_int total ^ " workers, " ^
278 string_of_int active ^ " active, " ^
279 string_of_int waiting ^ " waiting "
284 if forall (Thread.isActive o #1) (! workers) then ()
287 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
288 val _ = workers := alive;
290 Multithreading.tracing 0 (fn () =>
291 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
295 (* worker pool adjustments *)
297 val max_active0 = ! max_active;
298 val max_workers0 = ! max_workers;
300 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
301 val _ = max_active := m;
(* wanted number of workers: 0 on shutdown, 1 for the special value m = 9999,
   otherwise Working + 2 * Waiting, at least m and at most 4 * m *)
304 if ! do_shutdown then 0
305 else if m = 9999 then 1
306 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
(* worker_trend counts ticks upwards while mm exceeds max_workers and downwards
   while it is below; it restarts from 0 when the direction changes *)
308 if tick andalso mm > ! max_workers then
309 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
310 else if tick andalso mm < ! max_workers then
311 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
314 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
316 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
317 max_workers := Int.min (mm, 2 * m)
320 val missing = ! max_workers - length (! workers);
323 funpow missing (fn () =>
324 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
(* workers are only notified when one of the limits has actually changed *)
328 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
329 else signal work_available;
332 (* canceled groups *)
335 if null (! canceled) then ()
337 (Multithreading.tracing 1 (fn () =>
338 string_of_int (length (! canceled)) ^ " canceled groups");
339 Unsynchronized.change canceled (filter_out cancel_now);
345 val _ = Exn.release (wait_timeout next_round scheduler_event);
350 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
351 val continue = not (! do_shutdown andalso null (! workers));
352 val _ = if continue then () else scheduler := NONE;
354 val _ = broadcast scheduler_event;
356 handle Exn.Interrupt =>
357 (Multithreading.tracing 1 (fn () => "Interrupt");
358 List.app cancel_later (Task_Queue.cancel_all (! queue));
359 broadcast_work (); true);
(* Body of the scheduler thread: runs scheduler_next under SYNCHRONIZED "scheduler",
   with interrupt attributes derived from Multithreading.public_interrupts via
   Multithreading.sync_interrupts.
   NOTE(review): the loop construct around this call is not visible in this excerpt. *)
361 fun scheduler_loop () =
363 Multithreading.with_attributes
364 (Multithreading.sync_interrupts Multithreading.public_interrupts)
365 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is recorded and Thread.isActive holds for it*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);
(*reset the shutdown flag and make sure that a scheduler thread is running*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if not (scheduler_active ())
    then scheduler := SOME (SimpleThread.fork false scheduler_loop)
    else ()
  end;
(* Creates a future for e: builds (result, job) with future_job, enqueues the job
   with the given dependencies and priority under SYNCHRONIZED "enqueue", signals
   work_available when the enqueue result is flagged minimal, and makes sure that
   the scheduler is running.  The resulting future is not a promise.
   NOTE(review): the lines that determine group from opt_group in the NONE case, and
   some let/in lines, are not visible in this excerpt. *)
382 fun fork_future opt_group deps pri e =
387 | SOME group => group);
388 val (result, job) = future_job group e;
389 val task = SYNCHRONIZED "enqueue" (fn () =>
391 val (task, minimal) =
392 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
393 val _ = if minimal then signal work_available else ();
394 val _ = scheduler_check ();
396 in Future {promised = false, task = task, group = group, result = result} end;
(*fork variants, each expressed directly in terms of fork_future:
  explicit group / dependencies with priority / dependencies / priority / plain*)
fun fork_group group e = fork_future (SOME group) [] 0 e;
fun fork_deps_pri deps pri e =
  let
    val dep_tasks = map task_of deps;
  in fork_future NONE dep_tasks pri e end;
fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
fun fork_pri pri e = fork_future NONE [] pri e;
fun fork e = fork_future NONE [] 0 e;
(* Branches of the result lookup for a future x: an unassigned result is reported as
   SYS_ERROR "unfinished future"; an interrupt result is re-examined against the
   status of the group of x, and the flattened exceptions are combined into
   Exn.EXCEPTIONS.
   NOTE(review): the head of this definition (presumably get_result, which is used
   by join_results) and further branches are not visible in this excerpt. *)
411 NONE => Exn.Exn (SYS_ERROR "unfinished future")
412 | SOME (exn as Exn.Exn Exn.Interrupt) =>
413 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
415 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
(* Picks the next work item while joining on deps: nothing to do for empty deps;
   otherwise the queue is asked via Task_Queue.dequeue_towards.  Without available
   work the caller waits (as an active worker) on work_finished and retries with the
   updated deps'; otherwise the work is returned together with deps'.
   NOTE(review): one else line and one case branch are not visible in this excerpt. *)
418 fun join_next deps = (*requires SYNCHRONIZED*)
419 if null deps then NONE
421 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
423 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
424 | (SOME work, deps') => SOME (work, deps'));
(* execute_work runs a work item obtained from join_next and then continues joining
   on the remaining deps'; NONE ends the process.
   NOTE(review): the head of the mutually recursive join_work is not visible in this
   excerpt; the last line below is presumably its body. *)
426 fun execute_work NONE = ()
427 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
429 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
(*register in the queue that task depends on deps, then work towards deps;
  both queue operations happen in a single SYNCHRONIZED "join" section*)
fun join_depend task deps =
  let
    fun register_and_dequeue () =
      let
        val _ = Unsynchronized.change queue (Task_Queue.depend task deps);
      in join_next deps end;
  in execute_work (SYNCHRONIZED "join" register_and_dequeue) end;
(* Joins a list of futures.  If all are finished, the results are taken directly.
   Joining inside a critical section is rejected with an error.  Otherwise a worker
   thread (one that has a current task) joins via join_depend on the tasks of xs,
   while any other thread simply awaits each result variable.
   NOTE(review): the else line and the final expression producing the results are not
   visible in this excerpt. *)
437 fun join_results xs =
438 if forall is_finished xs then map get_result xs
439 else if Multithreading.self_critical () then
440 error "Cannot join future values within critical section"
442 (case worker_task () of
443 SOME task => join_depend task (map task_of xs)
444 | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
(* single-future versions: join_result goes through join_results via singleton;
   join additionally passes the result through Exn.release *)
449 fun join_result x = singleton join_results x;
450 fun join x = Exn.release (join_result x);
453 (* fast-path versions -- bypassing full task management *)
(* Builds an already finished future: a fresh group without parent argument, a result
   variable assigned immediately to Exn.Result x, and Task_Queue.dummy_task as task.
   NOTE(review): the head of this definition (presumably value) is not visible in
   this excerpt. *)
457 val group = Task_Queue.new_group NONE;
458 val result = Single_Assignment.var "value" : 'a result;
459 val _ = assign_result group result (Exn.Result x);
460 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
(* Maps f over the future x.  A job computing f (join x) is created in a new group
   derived from the group of x.  Under SYNCHRONIZED "extend" the job is attached to
   the existing task of x when Task_Queue.extend succeeds; in that case the new future
   shares the task.  Otherwise a regular future is forked that depends on that task
   and uses its priority.
   NOTE(review): the head of this definition (presumably map_future, which is bound to
   map at the end of the structure) and the NONE branch of the case are not visible
   in this excerpt. *)
464 val task = task_of x;
465 val group = Task_Queue.new_group (SOME (group_of x));
466 val (result, job) = future_job group (fn () => f (join x));
468 val extended = SYNCHRONIZED "extend" (fn () =>
469 (case Task_Queue.extend task job (! queue) of
470 SOME queue' => (queue := queue'; true)
473 if extended then Future {promised = false, task = task, group = group, result = result}
474 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
478 (* promised futures -- fulfilled by external means *)
(* Creates a promised future in the given group: a fresh, unassigned result variable
   plus a task obtained from Task_Queue.enqueue_passive under SYNCHRONIZED "enqueue".
   NOTE(review): the let line is not visible in this excerpt. *)
480 fun promise_group group : 'a future =
482 val result = Single_Assignment.var "promise" : 'a result;
483 val task = SYNCHRONIZED "enqueue" (fn () =>
484 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group));
485 in Future {promised = true, task = task, group = group, result = result} end;
(*promised future within a freshly created group*)
fun promise () =
  let
    val group = new_group ();
  in promise_group group end;
(* Fulfills a promised future with res.  Fails with "Not a promised future" for other
   futures.  The assignment is wrapped as a job and run through execute on the task of
   the promise; the job assigns res when called with true and Exn.Interrupt otherwise.
   NOTE(review): the let/in lines and the final result are not visible in this
   excerpt. *)
489 fun fulfill_result (Future {promised, task, group, result}) res =
491 val _ = promised orelse raise Fail "Not a promised future";
492 fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
493 val _ = execute (task, group, [job]);
(*fulfill a promise with a proper value, wrapped as Exn.Result*)
fun fulfill x res =
  let
    val wrapped = Exn.Result res;
  in fulfill_result x wrapped end;
(* Applies f to x with interrupt attributes chosen by a condition: with
   multithreading available either private_interrupts or public_interrupts is used;
   without it the plain interruptible combinator is applied.
   NOTE(review): the condition selecting between the two attribute sets and the
   function argument of with_attributes are not visible in this excerpt. *)
501 fun interruptible_task f x =
502 if Multithreading.available then
503 Multithreading.with_attributes
505 then Multithreading.private_interrupts
506 else Multithreading.public_interrupts)
508 else interruptible f x;
510 (*cancel: present and future group members will be interrupted eventually*)
(*try immediate cancellation of the group; if that does not succeed, leave it to the scheduler*)
fun cancel_group group =
  let
    fun request () =
      if cancel_now group then () else cancel_later group;
  in SYNCHRONIZED "cancel" request end;

(*cancel the group that the future belongs to*)
fun cancel (Future {group, ...}) = cancel_group group;
(* With multithreading available: under SYNCHRONIZED "shutdown", waits on
   scheduler_event for as long as the scheduler thread is active, waking up all
   workers after each wakeup.
   NOTE(review): the head of this definition (presumably shutdown) and its else
   branch are not visible in this excerpt. *)
519 if Multithreading.available then
520 SYNCHRONIZED "shutdown" (fn () =>
521 while scheduler_active () do
522 (wait scheduler_event; broadcast_work ()))
526 (*final declarations of this structure!*)
(* exported name for map_future; bound last so that earlier uses of map in this
   structure are not affected by the rebinding *)
527 val map = map_future;
(* type abbreviation for 'a Future.future, usable without the structure prefix *)
531 type 'a future = 'a Future.future;