back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
cancel passive tasks more actively via Exn.Interrupt, by treating them like ragular jobs here;
attempts to re-assign canceled futures/promises raise Exn.Interrupt;
tuned;
1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
27 * Promised futures are fulfilled by external means. There is no
28 associated evaluation task, but other futures can depend on them
34 type task = Task_Queue.task
35 type group = Task_Queue.group
36 val is_worker: unit -> bool
37 val worker_task: unit -> Task_Queue.task option
38 val worker_group: unit -> Task_Queue.group option
40 val task_of: 'a future -> task
41 val group_of: 'a future -> group
42 val peek: 'a future -> 'a Exn.result option
43 val is_finished: 'a future -> bool
44 val fork_group: group -> (unit -> 'a) -> 'a future
45 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
46 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
47 val fork_pri: int -> (unit -> 'a) -> 'a future
48 val fork: (unit -> 'a) -> 'a future
49 val join_results: 'a future list -> 'a Exn.result list
50 val join_result: 'a future -> 'a Exn.result
51 val join: 'a future -> 'a
52 val value: 'a -> 'a future
53 val map: ('a -> 'b) -> 'a future -> 'b future
54 val promise_group: group -> 'a future
55 val promise: unit -> 'a future
56 val fulfill_result: 'a future -> 'a Exn.result -> unit
57 val fulfill: 'a future -> 'a -> unit
58 val interruptible_task: ('a -> 'b) -> 'a -> 'b
59 val cancel_group: group -> unit
60 val cancel: 'a future -> unit
61 val shutdown: unit -> unit
62 val report: (unit -> 'a) -> 'a
65 structure Future: FUTURE =
72 type task = Task_Queue.task;
73 type group = Task_Queue.group;
76 val tag = Universal.tag () : (task * group) option Universal.tag;
78 fun thread_data () = the_default NONE (Thread.getLocal tag);
79 fun setmp_thread_data data f x =
80 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
83 val is_worker = is_some o thread_data;
84 val worker_task = Option.map #1 o thread_data;
85 val worker_group = Option.map #2 o thread_data;
87 fun new_group () = Task_Queue.new_group (worker_group ());
92 type 'a result = 'a Exn.result Single_Assignment.var;
94 datatype 'a future = Future of
100 fun task_of (Future {task, ...}) = task;
101 fun group_of (Future {group, ...}) = group;
102 fun result_of (Future {result, ...}) = result;
104 fun peek x = Single_Assignment.peek (result_of x);
105 fun is_finished x = is_some (peek x);
107 fun assign_result group result res =
109 val _ = Single_Assignment.assign result res
110 handle exn as Fail _ =>
111 (case Single_Assignment.peek result of
112 SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt
115 (case the (Single_Assignment.peek result) of
116 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
117 | Exn.Result _ => true);
124 (* synchronization *)
126 val scheduler_event = ConditionVar.conditionVar ();
127 val work_available = ConditionVar.conditionVar ();
128 val work_finished = ConditionVar.conditionVar ();
131 val lock = Mutex.mutex ();
134 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
136 fun wait cond = (*requires SYNCHRONIZED*)
137 Multithreading.sync_wait NONE NONE cond lock;
139 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
140 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
142 fun signal cond = (*requires SYNCHRONIZED*)
143 ConditionVar.signal cond;
145 fun broadcast cond = (*requires SYNCHRONIZED*)
146 ConditionVar.broadcast cond;
148 fun broadcast_work () = (*requires SYNCHRONIZED*)
149 (ConditionVar.broadcast work_available;
150 ConditionVar.broadcast work_finished);
157 val queue = Unsynchronized.ref Task_Queue.empty;
158 val next = Unsynchronized.ref 0;
159 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
160 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
161 val do_shutdown = Unsynchronized.ref false;
162 val max_workers = Unsynchronized.ref 0;
163 val max_active = Unsynchronized.ref 0;
164 val worker_trend = Unsynchronized.ref 0;
166 datatype worker_state = Working | Waiting | Sleeping;
167 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
169 fun count_workers state = (*requires SYNCHRONIZED*)
170 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
173 (* execute future jobs *)
175 fun future_job group (e: unit -> 'a) =
177 val result = Single_Assignment.var "future" : 'a result;
178 val pos = Position.thread_data ();
183 Exn.capture (fn () =>
184 Multithreading.with_attributes Multithreading.private_interrupts
185 (fn _ => Position.setmp_thread_data pos e ())) ()
186 else Exn.Exn Exn.Interrupt;
187 in assign_result group result res end;
188 in (result, job) end;
190 fun cancel_now group = (*requires SYNCHRONIZED*)
191 Task_Queue.cancel (! queue) group;
193 fun cancel_later group = (*requires SYNCHRONIZED*)
194 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
195 broadcast scheduler_event);
197 fun execute (task, group, jobs) =
199 val valid = not (Task_Queue.is_canceled group);
200 val ok = setmp_thread_data (task, group) (fn () =>
201 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
202 val _ = SYNCHRONIZED "finish" (fn () =>
204 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
207 else if cancel_now group then ()
208 else cancel_later group;
209 val _ = broadcast work_finished;
210 val _ = if maximal then () else signal work_available;
217 fun worker_wait active cond = (*requires SYNCHRONIZED*)
220 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
222 | NONE => raise Fail "Unregistered worker thread");
223 val _ = state := (if active then Waiting else Sleeping);
225 val _ = state := Working;
228 fun worker_next () = (*requires SYNCHRONIZED*)
229 if length (! workers) > ! max_workers then
230 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
231 signal work_available;
233 else if count_workers Working > ! max_active then
234 (worker_wait false work_available; worker_next ())
236 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
237 NONE => (worker_wait false work_available; worker_next ())
238 | some => (signal work_available; some));
240 fun worker_loop name =
241 (case SYNCHRONIZED name (fn () => worker_next ()) of
243 | SOME work => (execute work; worker_loop name));
245 fun worker_start name = (*requires SYNCHRONIZED*)
246 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
247 Unsynchronized.ref Working));
252 val status_ticks = Unsynchronized.ref 0;
254 val last_round = Unsynchronized.ref Time.zeroTime;
255 val next_round = Time.fromMilliseconds 50;
257 fun scheduler_next () = (*requires SYNCHRONIZED*)
259 val now = Time.now ();
260 val tick = Time.<= (Time.+ (! last_round, next_round), now);
261 val _ = if tick then last_round := now else ();
264 (* queue and worker status *)
267 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
269 if tick andalso ! status_ticks = 0 then
270 Multithreading.tracing 1 (fn () =>
272 val {ready, pending, running, passive} = Task_Queue.status (! queue);
273 val total = length (! workers);
274 val active = count_workers Working;
275 val waiting = count_workers Waiting;
277 "SCHEDULE " ^ Time.toString now ^ ": " ^
278 string_of_int ready ^ " ready, " ^
279 string_of_int pending ^ " pending, " ^
280 string_of_int running ^ " running, " ^
281 string_of_int passive ^ " passive; " ^
282 string_of_int total ^ " workers, " ^
283 string_of_int active ^ " active, " ^
284 string_of_int waiting ^ " waiting "
289 if forall (Thread.isActive o #1) (! workers) then ()
292 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
293 val _ = workers := alive;
295 Multithreading.tracing 0 (fn () =>
296 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
300 (* worker pool adjustments *)
302 val max_active0 = ! max_active;
303 val max_workers0 = ! max_workers;
305 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
306 val _ = max_active := m;
309 if ! do_shutdown then 0
310 else if m = 9999 then 1
311 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
313 if tick andalso mm > ! max_workers then
314 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
315 else if tick andalso mm < ! max_workers then
316 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
319 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
321 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
322 max_workers := Int.min (mm, 2 * m)
325 val missing = ! max_workers - length (! workers);
328 funpow missing (fn () =>
329 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
333 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
334 else signal work_available;
337 (* canceled groups *)
340 if null (! canceled) then ()
342 (Multithreading.tracing 1 (fn () =>
343 string_of_int (length (! canceled)) ^ " canceled groups");
344 Unsynchronized.change canceled (filter_out cancel_now);
350 val _ = Exn.release (wait_timeout next_round scheduler_event);
355 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
356 val continue = not (! do_shutdown andalso null (! workers));
357 val _ = if continue then () else scheduler := NONE;
359 val _ = broadcast scheduler_event;
361 handle Exn.Interrupt =>
362 (Multithreading.tracing 1 (fn () => "Interrupt");
363 List.app cancel_later (Task_Queue.cancel_all (! queue));
364 broadcast_work (); true);
366 fun scheduler_loop () =
368 Multithreading.with_attributes
369 (Multithreading.sync_interrupts Multithreading.public_interrupts)
370 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
373 fun scheduler_active () = (*requires SYNCHRONIZED*)
374 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
376 fun scheduler_check () = (*requires SYNCHRONIZED*)
377 (do_shutdown := false;
378 if scheduler_active () then ()
379 else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
387 fun fork_future opt_group deps pri e =
392 | SOME group => group);
393 val (result, job) = future_job group e;
394 val task = SYNCHRONIZED "enqueue" (fn () =>
396 val (task, minimal) =
397 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
398 val _ = if minimal then signal work_available else ();
399 val _ = scheduler_check ();
401 in Future {promised = false, task = task, group = group, result = result} end;
403 fun fork_group group e = fork_future (SOME group) [] 0 e;
404 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
405 fun fork_deps deps e = fork_deps_pri deps 0 e;
406 fun fork_pri pri e = fork_deps_pri [] pri e;
407 fun fork e = fork_deps [] e;
416 NONE => Exn.Exn (Fail "Unfinished future")
417 | SOME (exn as Exn.Exn Exn.Interrupt) =>
418 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
420 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
423 fun join_next deps = (*requires SYNCHRONIZED*)
424 if null deps then NONE
426 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
428 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
429 | (SOME work, deps') => SOME (work, deps'));
431 fun execute_work NONE = ()
432 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
434 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
436 fun join_depend task deps =
437 execute_work (SYNCHRONIZED "join" (fn () =>
438 (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
442 fun join_results xs =
443 if forall is_finished xs then map get_result xs
444 else if Multithreading.self_critical () then
445 error "Cannot join future values within critical section"
447 (case worker_task () of
448 SOME task => join_depend task (map task_of xs)
449 | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
454 fun join_result x = singleton join_results x;
455 fun join x = Exn.release (join_result x);
458 (* fast-path versions -- bypassing full task management *)
462 val group = Task_Queue.new_group NONE;
463 val result = Single_Assignment.var "value" : 'a result;
464 val _ = assign_result group result (Exn.Result x);
465 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
469 val task = task_of x;
470 val group = Task_Queue.new_group (SOME (group_of x));
471 val (result, job) = future_job group (fn () => f (join x));
473 val extended = SYNCHRONIZED "extend" (fn () =>
474 (case Task_Queue.extend task job (! queue) of
475 SOME queue' => (queue := queue'; true)
478 if extended then Future {promised = false, task = task, group = group, result = result}
479 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
483 (* promised futures -- fulfilled by external means *)
485 fun promise_group group : 'a future =
487 val result = Single_Assignment.var "promise" : 'a result;
488 fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true;
489 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
490 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
491 in Future {promised = true, task = task, group = group, result = result} end;
493 fun promise () = promise_group (new_group ());
495 fun fulfill_result (Future {promised, task, group, result}) res =
497 val _ = promised orelse raise Fail "Not a promised future";
498 fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
499 val _ = execute (task, group, [job]);
502 fun fulfill x res = fulfill_result x (Exn.Result res);
507 fun interruptible_task f x =
508 if Multithreading.available then
509 Multithreading.with_attributes
511 then Multithreading.private_interrupts
512 else Multithreading.public_interrupts)
514 else interruptible f x;
516 (*cancel: present and future group members will be interrupted eventually*)
517 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
518 (if cancel_now group then () else cancel_later group;
519 signal work_available; scheduler_check ()));
521 fun cancel x = cancel_group (group_of x);
527 if Multithreading.available then
528 SYNCHRONIZED "shutdown" (fn () =>
529 while scheduler_active () do
530 (wait scheduler_event; broadcast_work ()))
538 val _ = Output.status (Markup.markup Markup.forked "");
539 val x = e (); (*sic -- report "joined" only for success*)
540 val _ = Output.status (Markup.markup Markup.joined "");
544 (*final declarations of this structure!*)
545 val map = map_future;
549 type 'a future = 'a Future.future;