1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
10 * Futures are similar to delayed evaluation, i.e. delay/force is
11 generalized to fork/join (and variants). The idea is to model
12 parallel value-oriented computations, but *not* communicating
15 * Futures are grouped; failure of one group member causes the whole
16 group to be interrupted eventually. Groups are block-structured.
18 * Forked futures are evaluated spontaneously by a farm of worker
19 threads in the background; join resynchronizes the computation and
20 delivers results (values or exceptions).
22 * The pool of worker threads is limited, usually in correlation with
23 the number of physical cores on the machine. Note that allocation
24 of runtime resources is distorted either if workers yield CPU time
25 (e.g. via system sleep or wait operations), or if non-worker
26 threads contend for significant runtime resources independently.
28 * Promised futures are fulfilled by external means. There is no
29 associated evaluation task, but other futures can depend on them
35 type task = Task_Queue.task
36 type group = Task_Queue.group
37 val is_worker: unit -> bool
38 val worker_task: unit -> Task_Queue.task option
39 val worker_group: unit -> Task_Queue.group option
40 val worker_subgroup: unit -> Task_Queue.group
42 val task_of: 'a future -> task
43 val group_of: 'a future -> group
44 val peek: 'a future -> 'a Exn.result option
45 val is_finished: 'a future -> bool
46 val fork_group: group -> (unit -> 'a) -> 'a future
47 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
48 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
49 val fork_pri: int -> (unit -> 'a) -> 'a future
50 val fork: (unit -> 'a) -> 'a future
51 val join_results: 'a future list -> 'a Exn.result list
52 val join_result: 'a future -> 'a Exn.result
53 val join: 'a future -> 'a
54 val value: 'a -> 'a future
55 val map: ('a -> 'b) -> 'a future -> 'b future
56 val promise_group: group -> 'a future
57 val promise: unit -> 'a future
58 val fulfill_result: 'a future -> 'a Exn.result -> unit
59 val fulfill: 'a future -> 'a -> unit
60 val interruptible_task: ('a -> 'b) -> 'a -> 'b
61 val cancel_group: group -> unit
62 val cancel: 'a future -> unit
63 val shutdown: unit -> unit
64 val status: (unit -> 'a) -> 'a
67 structure Future: FUTURE =
(*local abbreviations for the task and group types of Task_Queue*)
type task = Task_Queue.task
and group = Task_Queue.group;
(*thread-local slot: the (task, group) pair of the job a thread is executing, if any*)
val tag: (task * group) option Universal.tag = Universal.tag ();
(*current thread's (task, group) entry; NONE if the slot was never set or holds NONE*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x while the thread-local slot temporarily holds SOME data;
  the value found on entry is passed on as the one to be put back*)
fun setmp_thread_data data f x =
  let
    val previous = thread_data ();
    val current = SOME data;
  in Library.setmp_thread_data tag previous current f x end;
(*does the current thread carry (task, group) data?*)
fun is_worker () =
  (case thread_data () of
    SOME _ => true
  | NONE => false);

(*task component of the current thread data, if present*)
fun worker_task () =
  (case thread_data () of
    SOME (task, _) => SOME task
  | NONE => NONE);

(*group component of the current thread data, if present*)
fun worker_group () =
  (case thread_data () of
    SOME (_, group) => SOME group
  | NONE => NONE);

(*fresh group created below the current worker group (or below NONE)*)
fun worker_subgroup () =
  let val parent = worker_group ()
  in Task_Queue.new_group parent end;
(*result cell of a future: a single-assignment variable that receives an Exn.result*)
93 type 'a result = 'a Exn.result Single_Assignment.var;
95 datatype 'a future = Future of
(*field selectors for the record wrapped by constructor Future*)
fun task_of (Future args) = #task args;
fun group_of (Future args) = #group args;
fun result_of (Future args) = #result args;
(*non-blocking look at the result cell: NONE while unassigned*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;

(*a future counts as finished once its result cell has been assigned*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
108 fun assign_result group result res =
110 val _ = Single_Assignment.assign result res
111 handle exn as Fail _ =>
112 (case Single_Assignment.peek result of
113 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
116 (case the (Single_Assignment.peek result) of
117 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
118 | Exn.Result _ => true);
125 (* synchronization *)
(*condition variables, all used together with the single lock of this structure:
  scheduler_event -- the scheduler and shutdown wait on it;
  work_available  -- idle workers wait on it;
  work_finished   -- joining workers wait on it*)
val scheduler_event: ConditionVar.conditionVar = ConditionVar.conditionVar ();
val work_available: ConditionVar.conditionVar = ConditionVar.conditionVar ();
val work_finished: ConditionVar.conditionVar = ConditionVar.conditionVar ();
(*the one mutex behind SYNCHRONIZED and the wait operations*)
val lock: Mutex.mutex = Mutex.mutex ();
(*run e as a named synchronized section over the global lock*)
fun SYNCHRONIZED name e = Simple_Thread.synchronized name lock e;
(*wait on cond under the global lock, without time limit*)
fun wait cond = (*requires SYNCHRONIZED*)
  let val no_deadline = NONE
  in Multithreading.sync_wait NONE no_deadline cond lock end;
(*wait on cond under the global lock; the time limit is now + timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;
(*plain aliases of the ConditionVar operations; callers must be inside SYNCHRONIZED*)
val signal = ConditionVar.signal; (*requires SYNCHRONIZED*)

val broadcast = ConditionVar.broadcast; (*requires SYNCHRONIZED*)
(*wake up every thread waiting for new work or for finished work, in that order*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(*global scheduler state -- unsynchronized refs, to be accessed within SYNCHRONIZED*)

(*the task queue, initially empty*)
val queue = Unsynchronized.ref Task_Queue.empty;

(*counter that numbers the names of newly started worker threads*)
val next: int Unsynchronized.ref = Unsynchronized.ref 0;

(*the scheduler thread, once one has been forked*)
val scheduler: Thread.thread option Unsynchronized.ref = Unsynchronized.ref NONE;

(*groups registered by cancel_later, whose cancellation is still pending*)
val canceled: Task_Queue.group list Unsynchronized.ref = Unsynchronized.ref [];

(*shutdown request flag*)
val do_shutdown: bool Unsynchronized.ref = Unsynchronized.ref false;

(*bound on the number of registered workers*)
val max_workers: int Unsynchronized.ref = Unsynchronized.ref 0;

(*bound on the number of workers in state Working*)
val max_active: int Unsynchronized.ref = Unsynchronized.ref 0;

(*signed tick count: how long the wanted pool size has been above/below max_workers*)
val worker_trend: int Unsynchronized.ref = Unsynchronized.ref 0;
(*state of a registered worker: Working while running (also the initial state),
  Waiting or Sleeping while blocked inside worker_wait*)
datatype worker_state = Working | Waiting | Sleeping;

(*registered worker threads, each paired with its mutable state*)
val workers: (Thread.thread * worker_state Unsynchronized.ref) list Unsynchronized.ref =
  Unsynchronized.ref [];
(*number of registered workers that are currently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let
    fun in_state (_, state_ref) = ! state_ref = state;
  in length (List.filter in_state (! workers)) end;
174 (* execute future jobs *)
176 fun future_job group (e: unit -> 'a) =
178 val result = Single_Assignment.var "future" : 'a result;
179 val pos = Position.thread_data ();
184 Exn.capture (fn () =>
185 Multithreading.with_attributes Multithreading.private_interrupts
186 (fn _ => Position.setmp_thread_data pos e ())) ()
187 else Exn.interrupt_exn;
188 in assign_result group result res end;
189 in (result, job) end;
(*cancel the group against the current queue; result is that of Task_Queue.cancel*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let val current = ! queue
  in Task_Queue.cancel current group end;
(*remember the group as pending cancellation and wake up the scheduler*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
    val _ = broadcast scheduler_event;
  in () end;
198 fun execute (task, group, jobs) =
200 val valid = not (Task_Queue.is_canceled group);
201 val ok = setmp_thread_data (task, group) (fn () =>
202 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
203 val _ = SYNCHRONIZED "finish" (fn () =>
205 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
208 else if cancel_now group then ()
209 else cancel_later group;
210 val _ = broadcast work_finished;
211 val _ = if maximal then () else signal work_available;
218 fun worker_wait active cond = (*requires SYNCHRONIZED*)
221 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
223 | NONE => raise Fail "Unregistered worker thread");
224 val _ = state := (if active then Waiting else Sleeping);
226 val _ = state := Working;
229 fun worker_next () = (*requires SYNCHRONIZED*)
230 if length (! workers) > ! max_workers then
231 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
232 signal work_available;
234 else if count_workers Working > ! max_active then
235 (worker_wait false work_available; worker_next ())
237 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
238 NONE => (worker_wait false work_available; worker_next ())
239 | some => (signal work_available; some));
241 fun worker_loop name =
242 (case SYNCHRONIZED name (fn () => worker_next ()) of
244 | SOME work => (execute work; worker_loop name));
(*fork a thread running worker_loop under the given name and register it
  at the head of the worker list, initially in state Working*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (fn registered => (thread, state) :: registered) end;
(*tick counter for status tracing, advanced modulo 10 by the scheduler*)
val status_ticks: int Unsynchronized.ref = Unsynchronized.ref 0;

(*time of the last scheduler tick*)
val last_round: Time.time Unsynchronized.ref = Unsynchronized.ref Time.zeroTime;

(*minimal distance between ticks; also the timeout of the scheduler's wait*)
val next_round: Time.time = seconds 0.05;
258 fun scheduler_next () = (*requires SYNCHRONIZED*)
260 val now = Time.now ();
261 val tick = Time.<= (Time.+ (! last_round, next_round), now);
262 val _ = if tick then last_round := now else ();
265 (* queue and worker status *)
268 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
270 if tick andalso ! status_ticks = 0 then
271 Multithreading.tracing 1 (fn () =>
273 val {ready, pending, running, passive} = Task_Queue.status (! queue);
274 val total = length (! workers);
275 val active = count_workers Working;
276 val waiting = count_workers Waiting;
278 "SCHEDULE " ^ Time.toString now ^ ": " ^
279 string_of_int ready ^ " ready, " ^
280 string_of_int pending ^ " pending, " ^
281 string_of_int running ^ " running, " ^
282 string_of_int passive ^ " passive; " ^
283 string_of_int total ^ " workers, " ^
284 string_of_int active ^ " active, " ^
285 string_of_int waiting ^ " waiting "
290 if forall (Thread.isActive o #1) (! workers) then ()
293 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
294 val _ = workers := alive;
296 Multithreading.tracing 0 (fn () =>
297 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
301 (* worker pool adjustments *)
303 val max_active0 = ! max_active;
304 val max_workers0 = ! max_workers;
306 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
307 val _ = max_active := m;
310 if ! do_shutdown then 0
311 else if m = 9999 then 1
312 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
314 if tick andalso mm > ! max_workers then
315 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
316 else if tick andalso mm < ! max_workers then
317 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
320 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
322 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
323 max_workers := Int.min (mm, 2 * m)
326 val missing = ! max_workers - length (! workers);
329 funpow missing (fn () =>
330 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
334 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
335 else signal work_available;
338 (* canceled groups *)
341 if null (! canceled) then ()
343 (Multithreading.tracing 1 (fn () =>
344 string_of_int (length (! canceled)) ^ " canceled groups");
345 Unsynchronized.change canceled (filter_out cancel_now);
351 val _ = Exn.release (wait_timeout next_round scheduler_event);
356 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
357 val continue = not (! do_shutdown andalso null (! workers));
358 val _ = if continue then () else scheduler := NONE;
360 val _ = broadcast scheduler_event;
363 if Exn.is_interrupt exn then
364 (Multithreading.tracing 1 (fn () => "Interrupt");
365 List.app cancel_later (Task_Queue.cancel_all (! queue));
366 broadcast_work (); true)
369 fun scheduler_loop () =
371 Multithreading.with_attributes
372 (Multithreading.sync_interrupts Multithreading.public_interrupts)
373 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is recorded and that thread is still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  the_default false (Option.map Thread.isActive (! scheduler));
(*withdraw any shutdown request and make sure a scheduler thread is running*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
    fun restart () = scheduler := SOME (Simple_Thread.fork false scheduler_loop);
  in if not (scheduler_active ()) then restart () else () end;
390 fun fork_future opt_group deps pri e =
394 NONE => worker_subgroup ()
395 | SOME group => group);
396 val (result, job) = future_job group e;
397 val task = SYNCHRONIZED "enqueue" (fn () =>
399 val (task, minimal) =
400 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
401 val _ = if minimal then signal work_available else ();
402 val _ = scheduler_check ();
404 in Future {promised = false, task = task, group = group, result = result} end;
(*fork variants, each spelled out as a direct call of fork_future:
  an explicit group, or NONE with optional dependencies and priority;
  omitted dependencies are [], omitted priority is 0*)
fun fork_group group e = fork_future (SOME group) [] 0 e;

fun fork_deps_pri deps pri e =
  let val dep_tasks = map task_of deps
  in fork_future NONE dep_tasks pri e end;

fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;

fun fork_pri pri e = fork_future NONE [] pri e;

fun fork e = fork_future NONE [] 0 e;
419 NONE => Exn.Exn (Fail "Unfinished future")
421 if Exn.is_interrupt_exn res then
422 (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
424 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
427 fun join_next deps = (*requires SYNCHRONIZED*)
428 if null deps then NONE
430 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
432 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
433 | (SOME work, deps') => SOME (work, deps'));
435 fun execute_work NONE = ()
436 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
438 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
(*within one synchronized section: record that task depends on deps and
  pick the next piece of work via join_next; then run it outside the lock*)
fun join_depend task deps =
  let
    fun register_and_pick () =
      let val _ = Unsynchronized.change queue (Task_Queue.depend task deps)
      in join_next deps end;
  in execute_work (SYNCHRONIZED "join" register_and_pick) end;
446 fun join_results xs =
447 if forall is_finished xs then map get_result xs
448 else if Multithreading.self_critical () then
449 error "Cannot join future values within critical section"
451 (case worker_task () of
452 SOME task => join_depend task (map task_of xs)
453 | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
(*single-future version of join_results*)
fun join_result x = the_single (join_results [x]);

(*join and unwrap: yields the value or re-raises the stored exception*)
fun join x =
  let val res = join_result x
  in Exn.release res end;
462 (* fast-path versions -- bypassing full task management *)
466 val group = Task_Queue.new_group NONE;
467 val result = Single_Assignment.var "value" : 'a result;
468 val _ = assign_result group result (Exn.Result x);
469 in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
473 val task = task_of x;
474 val group = Task_Queue.new_group (SOME (group_of x));
475 val (result, job) = future_job group (fn () => f (join x));
477 val extended = SYNCHRONIZED "extend" (fn () =>
478 (case Task_Queue.extend task job (! queue) of
479 SOME queue' => (queue := queue'; true)
482 if extended then Future {promised = false, task = task, group = group, result = result}
483 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
487 (* promised futures -- fulfilled by external means *)
489 fun promise_group group : 'a future =
491 val result = Single_Assignment.var "promise" : 'a result;
492 fun abort () = assign_result group result Exn.interrupt_exn
493 handle Fail _ => true
495 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
497 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
498 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
499 in Future {promised = true, task = task, group = group, result = result} end;
(*promised future within a fresh subgroup of the current worker group*)
fun promise () =
  let val group = worker_subgroup ()
  in promise_group group end;
503 fun fulfill_result (Future {promised, task, group, result}) res =
504 if not promised then raise Fail "Not a promised future"
507 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
509 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
512 SYNCHRONIZED "fulfill_result" (fn () =>
513 Unsynchronized.change_result queue
514 (Task_Queue.dequeue_passive (Thread.self ()) task));
515 in if still_passive then execute (task, group, [job]) else () end);
516 val _ = Single_Assignment.await result;
(*fulfill a promise with a proper value, i.e. wrapped as Exn.Result*)
fun fulfill x res =
  let val wrapped = Exn.Result res
  in fulfill_result x wrapped end;
524 fun interruptible_task f x =
525 if Multithreading.available then
526 Multithreading.with_attributes
528 then Multithreading.private_interrupts
529 else Multithreading.public_interrupts)
531 else interruptible f x;
(*cancel: current and later members of the group will eventually be interrupted;
  if immediate cancellation does not succeed, the group is queued for the scheduler*)
fun cancel_group group =
  let
    fun request () =
      let
        val _ = if cancel_now group then () else cancel_later group;
        val _ = signal work_available;
      in scheduler_check () end;
  in SYNCHRONIZED "cancel" request end;
(*cancel the whole group that the given future belongs to*)
fun cancel (Future {group, ...}) = cancel_group group;
544 if Multithreading.available then
545 SYNCHRONIZED "shutdown" (fn () =>
546 while scheduler_active () do
547 (wait scheduler_event; broadcast_work ()))
556 (case worker_task () of
558 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
559 val _ = Output.status (Markup.markup (task_props Markup.forked) "");
560 val x = e (); (*sic -- report "joined" only for success*)
561 val _ = Output.status (Markup.markup (task_props Markup.joined) "");
(*final declarations of this structure!*)
(*must stay last: from here on "map" no longer refers to the list operation*)
fun map f x = map_future f x;
(*unqualified abbreviation for the future type of structure Future*)
570 type 'a future = 'a Future.future;