1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
* Futures are similar to delayed evaluation, i.e. delay/force is
generalized to fork/join (and variants). The idea is to model
parallel value-oriented computations, but *not* communicating
processes.
15 * Futures are grouped; failure of one group member causes the whole
16 group to be interrupted eventually. Groups are block-structured.
18 * Forked futures are evaluated spontaneously by a farm of worker
19 threads in the background; join resynchronizes the computation and
20 delivers results (values or exceptions).
22 * The pool of worker threads is limited, usually in correlation with
23 the number of physical cores on the machine. Note that allocation
24 of runtime resources is distorted either if workers yield CPU time
25 (e.g. via system sleep or wait operations), or if non-worker
26 threads contend for significant runtime resources independently.
28 * Promised futures are fulfilled by external means. There is no
29 associated evaluation task, but other futures can depend on them
35 val worker_task: unit -> Task_Queue.task option
36 val worker_group: unit -> Task_Queue.group option
37 val worker_subgroup: unit -> Task_Queue.group
39 val task_of: 'a future -> Task_Queue.task
40 val peek: 'a future -> 'a Exn.result option
41 val is_finished: 'a future -> bool
43 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
44 (unit -> 'a) list -> 'a future list
45 val fork_pri: int -> (unit -> 'a) -> 'a future
46 val fork: (unit -> 'a) -> 'a future
47 val join_results: 'a future list -> 'a Exn.result list
48 val join_result: 'a future -> 'a Exn.result
49 val join: 'a future -> 'a
50 val value: 'a -> 'a future
51 val map: ('a -> 'b) -> 'a future -> 'b future
52 val promise_group: Task_Queue.group -> 'a future
53 val promise: unit -> 'a future
54 val fulfill_result: 'a future -> 'a Exn.result -> unit
55 val fulfill: 'a future -> 'a -> unit
56 val interruptible_task: ('a -> 'b) -> 'a -> 'b
57 val cancel_group: Task_Queue.group -> unit
58 val cancel: 'a future -> unit
59 val shutdown: unit -> unit
60 val status: (unit -> 'a) -> 'a
63 structure Future: FUTURE =
(*universal tag under which a thread stores its optional Task_Queue.task*)
val tag: Task_Queue.task option Universal.tag = Universal.tag ();
(*task stored under tag for the current thread; NONE if nothing is stored
  or the stored value itself is NONE*)
fun worker_task () =
  (case Thread.getLocal tag of
    NONE => NONE
  | SOME stored => stored);
(*apply f to x via Library.setmp_thread_data, passing the worker task seen
  on entry as the old value and SOME data as the new value for tag*)
fun setmp_worker_task data f x =
  let
    val previous = worker_task ();
  in Library.setmp_thread_data tag previous (SOME data) f x end;
(*group of the current worker task, if there is such a task*)
fun worker_group () =
  Option.map Task_Queue.group_of_task (worker_task ());
(*new group obtained from Task_Queue.new_group applied to the current
  worker group (which may be NONE)*)
fun worker_subgroup () =
  let
    val current = worker_group ();
  in Task_Queue.new_group current end;
81 fun worker_joining e =
82 (case worker_task () of
84 | SOME task => Task_Queue.joining task e);
86 fun worker_waiting deps e =
87 (case worker_task () of
89 | SOME task => Task_Queue.waiting task deps e);
(*result cell of a future: single-assignment variable holding an Exn.result*)
type 'a result =
  'a Exn.result Single_Assignment.var;
96 datatype 'a future = Future of
98 task: Task_Queue.task,
(*projection: the task field of a future*)
fun task_of (Future fields) = #task fields;
(*projection: the result variable of a future*)
fun result_of (Future fields) = #result fields;
(*peek at the result variable of a future via Single_Assignment.peek*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;
(*true iff peek yields some result for the future*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
107 fun assign_result group result res =
109 val _ = Single_Assignment.assign result res
110 handle exn as Fail _ =>
111 (case Single_Assignment.peek result of
112 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
115 (case the (Single_Assignment.peek result) of
116 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
117 | Exn.Result _ => true);
124 (* synchronization *)
(*condition variables used together with the scheduler lock*)
val scheduler_event: ConditionVar.conditionVar = ConditionVar.conditionVar ();
val work_available: ConditionVar.conditionVar = ConditionVar.conditionVar ();
val work_finished: ConditionVar.conditionVar = ConditionVar.conditionVar ();
(*mutex passed to Simple_Thread.synchronized and Multithreading.sync_wait*)
val lock: Mutex.mutex = Mutex.mutex ();
(*run body through Simple_Thread.synchronized with the given name and the
  shared lock*)
fun SYNCHRONIZED name body =
  Simple_Thread.synchronized name lock body;
(*wait on cond with the shared lock; no time limit is given*)
fun wait cond = (*requires SYNCHRONIZED*)
  let
    val no_time_limit = NONE;
  in Multithreading.sync_wait NONE no_time_limit cond lock end;
(*wait on cond with the shared lock; the time limit passed on is the
  current time plus timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let
    val limit = Time.+ (Time.now (), timeout);
  in Multithreading.sync_wait NONE (SOME limit) cond lock end;
(*signal a condition variable*)
val signal = (*requires SYNCHRONIZED*)
  ConditionVar.signal;
(*broadcast on a condition variable*)
val broadcast = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast;
(*broadcast on work_available, then on work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
(*mutable scheduler state, held in unsynchronized references*)

(*the task queue*)
val queue = Unsynchronized.ref Task_Queue.empty;
(*counter incremented to number newly started worker threads*)
val next = Unsynchronized.ref (0: int);
(*scheduler thread, if one has been forked*)
val scheduler: Thread.thread option Unsynchronized.ref = Unsynchronized.ref NONE;
(*groups recorded by cancel_later for processing by the scheduler*)
val canceled: Task_Queue.group list Unsynchronized.ref = Unsynchronized.ref [];
(*shutdown request flag; reset by scheduler_check*)
val do_shutdown = Unsynchronized.ref false;
(*bound compared against the number of registered workers*)
val max_workers = Unsynchronized.ref (0: int);
(*bound compared against the number of workers in state Working*)
val max_active = Unsynchronized.ref (0: int);
(*counter adjusted by the scheduler when the worker bound should change*)
val worker_trend = Unsynchronized.ref (0: int);
(*state of a registered worker thread*)
datatype worker_state =
    Working
  | Waiting
  | Sleeping;

(*registered worker threads, each paired with its mutable state*)
val workers: (Thread.thread * worker_state Unsynchronized.ref) list Unsynchronized.ref =
  Unsynchronized.ref [];
(*number of registered workers whose current state equals the given one*)
fun count_workers state = (*requires SYNCHRONIZED*)
  let
    fun in_state (_, current) = ! current = state;
  in length (List.filter in_state (! workers)) end;
173 (* execute future jobs *)
175 fun future_job group (e: unit -> 'a) =
177 val result = Single_Assignment.var "future" : 'a result;
178 val pos = Position.thread_data ();
183 Exn.capture (fn () =>
184 Multithreading.with_attributes Multithreading.private_interrupts
185 (fn _ => Position.setmp_thread_data pos e ())) ()
186 else Exn.interrupt_exn;
187 in assign_result group result res end;
188 in (result, job) end;
(*apply Task_Queue.cancel to the current queue and the group; the boolean
  result is passed through unchanged*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let
    val current_queue = ! queue;
  in Task_Queue.cancel current_queue group end;
(*record the group in the canceled list (inserted modulo
  Task_Queue.eq_group) and broadcast on scheduler_event*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = canceled := insert Task_Queue.eq_group group (! canceled);
  in broadcast scheduler_event end;
197 fun execute (task, jobs) =
199 val group = Task_Queue.group_of_task task;
200 val valid = not (Task_Queue.is_canceled group);
202 Task_Queue.running task (fn () =>
203 setmp_worker_task task (fn () =>
204 fold (fn job => fn ok => job valid andalso ok) jobs true) ());
205 val _ = Multithreading.tracing 1 (fn () =>
207 val s = Task_Queue.str_of_task task;
208 fun micros time = string_of_int (Time.toNanoseconds time div 1000);
209 val (run, wait, deps) = Task_Queue.timing_of_task task;
210 in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
211 val _ = SYNCHRONIZED "finish" (fn () =>
213 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
216 else if cancel_now group then ()
217 else cancel_later group;
218 val _ = broadcast work_finished;
219 val _ = if maximal then () else signal work_available;
226 fun worker_wait active cond = (*requires SYNCHRONIZED*)
229 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
231 | NONE => raise Fail "Unregistered worker thread");
232 val _ = state := (if active then Waiting else Sleeping);
234 val _ = state := Working;
237 fun worker_next () = (*requires SYNCHRONIZED*)
238 if length (! workers) > ! max_workers then
239 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
240 signal work_available;
242 else if count_workers Working > ! max_active then
243 (worker_wait false work_available; worker_next ())
245 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
246 NONE => (worker_wait false work_available; worker_next ())
247 | some => (signal work_available; some));
249 fun worker_loop name =
250 (case SYNCHRONIZED name (fn () => worker_next ()) of
252 | SOME work => (execute work; worker_loop name));
(*fork a thread running worker_loop under the given name and register it,
  in state Working, at the head of the workers list*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in workers := (thread, state) :: ! workers end;
(*tick counter, advanced modulo 10 by the scheduler*)
val status_ticks = Unsynchronized.ref (0: int);

(*time at which the scheduler last registered a tick*)
val last_round: Time.time Unsynchronized.ref = Unsynchronized.ref Time.zeroTime;
(*duration of one scheduler round; also used as its wait timeout*)
val next_round: Time.time = seconds 0.05;
266 fun scheduler_next () = (*requires SYNCHRONIZED*)
268 val now = Time.now ();
269 val tick = Time.<= (Time.+ (! last_round, next_round), now);
270 val _ = if tick then last_round := now else ();
273 (* queue and worker status *)
276 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
278 if tick andalso ! status_ticks = 0 then
279 Multithreading.tracing 1 (fn () =>
281 val {ready, pending, running, passive} = Task_Queue.status (! queue);
282 val total = length (! workers);
283 val active = count_workers Working;
284 val waiting = count_workers Waiting;
286 "SCHEDULE " ^ Time.toString now ^ ": " ^
287 string_of_int ready ^ " ready, " ^
288 string_of_int pending ^ " pending, " ^
289 string_of_int running ^ " running, " ^
290 string_of_int passive ^ " passive; " ^
291 string_of_int total ^ " workers, " ^
292 string_of_int active ^ " active, " ^
293 string_of_int waiting ^ " waiting "
298 if forall (Thread.isActive o #1) (! workers) then ()
301 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
302 val _ = workers := alive;
304 Multithreading.tracing 0 (fn () =>
305 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
309 (* worker pool adjustments *)
311 val max_active0 = ! max_active;
312 val max_workers0 = ! max_workers;
314 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
315 val _ = max_active := m;
318 if ! do_shutdown then 0
319 else if m = 9999 then 1
320 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
322 if tick andalso mm > ! max_workers then
323 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
324 else if tick andalso mm < ! max_workers then
325 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
328 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
330 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
331 max_workers := Int.min (mm, 2 * m)
334 val missing = ! max_workers - length (! workers);
337 funpow missing (fn () =>
338 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
342 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
343 else signal work_available;
346 (* canceled groups *)
349 if null (! canceled) then ()
351 (Multithreading.tracing 1 (fn () =>
352 string_of_int (length (! canceled)) ^ " canceled groups");
353 Unsynchronized.change canceled (filter_out cancel_now);
359 val _ = Exn.release (wait_timeout next_round scheduler_event);
364 val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
365 val continue = not (! do_shutdown andalso null (! workers));
366 val _ = if continue then () else scheduler := NONE;
368 val _ = broadcast scheduler_event;
371 if Exn.is_interrupt exn then
372 (Multithreading.tracing 1 (fn () => "Interrupt");
373 List.app cancel_later (Task_Queue.cancel_all (! queue));
374 broadcast_work (); true)
377 fun scheduler_loop () =
379 Multithreading.with_attributes
380 (Multithreading.sync_interrupts Multithreading.public_interrupts)
381 (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
(*true iff a scheduler thread is recorded and Thread.isActive holds for it*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);
(*clear the shutdown flag, then fork a thread running scheduler_loop
  unless an active scheduler is already recorded*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if not (scheduler_active ()) then
      scheduler := SOME (Simple_Thread.fork false scheduler_loop)
    else ()
  end;
398 fun forks {name, group, deps, pri} es =
404 NONE => worker_subgroup ()
406 fun enqueue e (minimal, queue) =
408 val (result, job) = future_job grp e;
409 val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
410 val future = Future {promised = false, task = task, result = result};
411 in (future, (minimal orelse minimal', queue')) end;
413 SYNCHRONIZED "enqueue" (fn () =>
415 val (futures, minimal) =
416 Unsynchronized.change_result queue (fn q =>
417 let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
418 in ((futures, minimal), q') end);
419 val _ = if minimal then signal work_available else ();
420 val _ = scheduler_check ();
(*fork a single future with the given priority: empty name, no explicit
  group, no dependencies*)
fun fork_pri pri e =
  let
    val params = {name = "", group = NONE, deps = [], pri = pri};
  in singleton (forks params) e end;
(*fork a single future with priority 0*)
fun fork e =
  let
    val default_pri = 0;
  in fork_pri default_pri e end;
434 NONE => Exn.Exn (Fail "Unfinished future")
436 if Exn.is_interrupt_exn res then
437 (case Exn.flatten_list (Task_Queue.group_status (Task_Queue.group_of_task (task_of x))) of
439 | exns => Exn.Exn (Exn.EXCEPTIONS exns))
442 fun join_next deps = (*requires SYNCHRONIZED*)
443 if Task_Queue.finished_deps deps then NONE
445 (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
447 if Task_Queue.finished_deps deps' then NONE
448 else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
449 | (SOME work, deps') => SOME (work, deps'));
451 fun execute_work NONE = ()
452 | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps')
454 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
458 fun join_results xs =
461 if forall is_finished xs then ()
462 else if Multithreading.self_critical () then
463 error "Cannot join future values within critical section"
464 else if is_some (worker_task ()) then
465 join_work (Task_Queue.init_deps (map task_of xs))
466 else List.app (ignore o Single_Assignment.await o result_of) xs;
467 in map get_result xs end;
(*single-future version of join_results*)
fun join_result x =
  let
    val join_single = singleton join_results;
  in join_single x end;
(*join a single future and pass its Exn.result through Exn.release*)
fun join x =
  let
    val res = join_result x;
  in Exn.release res end;
475 (* fast-path versions -- bypassing full task management *)
479 val task = Task_Queue.dummy_task ();
480 val group = Task_Queue.group_of_task task;
481 val result = Single_Assignment.var "value" : 'a result;
482 val _ = assign_result group result (Exn.Result x);
483 in Future {promised = false, task = task, result = result} end;
487 val task = task_of x;
488 val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
489 val (result, job) = future_job group (fn () => f (join x));
491 val extended = SYNCHRONIZED "extend" (fn () =>
492 (case Task_Queue.extend task job (! queue) of
493 SOME queue' => (queue := queue'; true)
496 if extended then Future {promised = false, task = task, result = result}
499 (forks {name = "Future.map", group = SOME group,
500 deps = [task], pri = Task_Queue.pri_of_task task})
501 (fn () => f (join x))
505 (* promised futures -- fulfilled by external means *)
507 fun promise_group group : 'a future =
509 val result = Single_Assignment.var "promise" : 'a result;
510 fun abort () = assign_result group result Exn.interrupt_exn
511 handle Fail _ => true
513 if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
515 val task = SYNCHRONIZED "enqueue_passive" (fn () =>
516 Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
517 in Future {promised = true, task = task, result = result} end;
(*promised future within a new subgroup of the current worker group*)
fun promise () =
  let
    val group = worker_subgroup ();
  in promise_group group end;
521 fun fulfill_result (Future {promised, task, result}) res =
522 if not promised then raise Fail "Not a promised future"
525 val group = Task_Queue.group_of_task task;
526 fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
528 Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
531 SYNCHRONIZED "fulfill_result" (fn () =>
532 Unsynchronized.change_result queue
533 (Task_Queue.dequeue_passive (Thread.self ()) task));
534 in if still_passive then execute (task, [job]) else () end);
536 worker_waiting (Task_Queue.init_deps [task])
537 (fn () => Single_Assignment.await result);
(*fulfill a promised future with a regular value, wrapped as Exn.Result*)
fun fulfill x res =
  let
    val wrapped = Exn.Result res;
  in fulfill_result x wrapped end;
545 fun interruptible_task f x =
546 if Multithreading.available then
547 Multithreading.with_attributes
548 (if is_some (worker_task ())
549 then Multithreading.private_interrupts
550 else Multithreading.public_interrupts)
552 else interruptible f x;
(*cancel: present and future group members will be interrupted eventually*)
(*Within one SYNCHRONIZED "cancel" section: try cancel_now and fall back to
  cancel_later if it yields false, then signal work_available and run
  scheduler_check.*)
fun cancel_group group =
  let
    fun request () =
      let
        val _ = if not (cancel_now group) then cancel_later group else ();
        val _ = signal work_available;
      in scheduler_check () end;
  in SYNCHRONIZED "cancel" request end;
(*cancel the group of the task belonging to the given future*)
fun cancel x =
  let
    val group = Task_Queue.group_of_task (task_of x);
  in cancel_group group end;
565 if Multithreading.available then
566 SYNCHRONIZED "shutdown" (fn () =>
567 while scheduler_active () do
568 (wait scheduler_event; broadcast_work ()))
577 (case worker_task () of
579 | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
580 val _ = Output.status (Markup.markup (task_props Markup.forked) "");
581 val x = e (); (*sic -- report "joined" only for success*)
582 val _ = Output.status (Markup.markup (task_props Markup.joined) "");
(*final declarations of this structure!*)
(*exported name for map_future; this binding of the name map must stay last*)
fun map f x = map_future f x;
(*unqualified abbreviation for the future type of structure Future*)
type 'a future =
  'a Future.future;