1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating processes.
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
30 type task = Task_Queue.task
31 type group = Task_Queue.group
32 val is_worker: unit -> bool
33 val worker_group: unit -> Task_Queue.group option
35 val task_of: 'a future -> task
36 val group_of: 'a future -> group
37 val peek: 'a future -> 'a Exn.result option
38 val is_finished: 'a future -> bool
39 val value: 'a -> 'a future
40 val fork: (unit -> 'a) -> 'a future
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
43 val fork_pri: int -> (unit -> 'a) -> 'a future
44 val join_results: 'a future list -> 'a Exn.result list
45 val join_result: 'a future -> 'a Exn.result
46 val join: 'a future -> 'a
47 val map: ('a -> 'b) -> 'a future -> 'b future
48 val interruptible_task: ('a -> 'b) -> 'a -> 'b
49 val cancel_group: group -> unit
50 val cancel: 'a future -> unit
51 val shutdown: unit -> unit
54 structure Future: FUTURE =
61 type task = Task_Queue.task;
62 type group = Task_Queue.group;
65 val tag = Universal.tag () : (string * task * group) option Universal.tag;
(*thread-local task context of the current thread; NONE when nothing is stored*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x while the thread-local context is temporarily set to SOME data;
  the value current at call time is passed along as the one to go back to*)
fun setmp_thread_data data f x =
  let val previous = thread_data ()
  in Library.setmp_thread_data tag previous (SOME data) f x end;
(*true iff the current thread carries task context*)
fun is_worker () = is_some (thread_data ());
(*group component of the current thread's task context, if any*)
fun worker_group () =
  (case thread_data () of
    SOME (_, _, group) => SOME group
  | NONE => NONE);
78 datatype 'a future = Future of
81 result: 'a Exn.result option Synchronized.var};
(*field selectors for future values*)
fun task_of (Future r) = #task r;
fun group_of (Future r) = #group r;
fun result_of (Future r) = #result r;
(*non-blocking look at the result cell*)
fun peek (Future {result, ...}) = Synchronized.peek result;

(*a future is finished once its result cell holds some outcome*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
91 {task = Task_Queue.new_task 0,
92 group = Task_Queue.new_group NONE,
93 result = Synchronized.var "future" (SOME (Exn.Result x))};
101 val queue = ref Task_Queue.empty;
103 val workers = ref ([]: (Thread.thread * bool) list);
104 val scheduler = ref (NONE: Thread.thread option);
105 val excessive = ref 0;
106 val canceled = ref ([]: Task_Queue.group list);
107 val do_shutdown = ref false;
110 (* synchronization *)
112 val scheduler_event = ConditionVar.conditionVar ();
113 val work_available = ConditionVar.conditionVar ();
114 val work_finished = ConditionVar.conditionVar ();
117 val lock = Mutex.mutex ();
(*run e under the global lock, labelled by name*)
fun SYNCHRONIZED name e = SimpleThread.synchronized name lock e;

(*wait on cond without deadline*)
fun wait cond = (*requires SYNCHRONIZED*)
  let val no_deadline = NONE
  in Multithreading.sync_wait NONE no_deadline cond lock end;

(*wait on cond with a deadline of now + timeout*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

val signal = (*requires SYNCHRONIZED*)
  ConditionVar.signal;

val broadcast = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast;

(*wake up everybody waiting for new or finished work*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
141 (* execute future jobs *)
(*Build the result cell and the job of a future of the given group that
  evaluates e; the pair (result, job) is returned.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
143 fun future_job group (e: unit -> 'a) =
(*synchronized cell for the outcome, initially unassigned*)
145 val result = Synchronized.var "future" (NONE: 'a Exn.result option);
(*evaluate e under private interrupts, capturing value or exception ...*)
150 Exn.capture (fn () =>
151 Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
(*... otherwise the outcome is Interrupt (guarding condition not visible here)*)
152 else Exn.Exn Exn.Interrupt;
(*store the outcome; an already assigned cell is reported via Fail*)
153 val _ = Synchronized.change result
155 | SOME _ => raise Fail "Duplicate assignment of future value");
(*a failed outcome cancels the group and yields false; success yields true*)
158 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
159 | Exn.Result _ => true)
161 in (result, job) end;
(*register group as canceled (no duplicates) and wake up the scheduler*)
fun do_cancel group = (*requires SYNCHRONIZED*)
  let
    val _ = change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;
(*Run the jobs of a dequeued task in the current thread and register its
  completion in the queue.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
166 fun execute name (task, group, jobs) =
(*jobs are told whether their group is still valid, i.e. not canceled*)
168 val valid = not (Task_Queue.is_canceled group);
(*run all jobs with (name, task, group) as thread data; "job valid" is
  evaluated before "ok", so every job runs even after an earlier failure*)
169 val ok = setmp_thread_data (name, task, group) (fn () =>
170 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
171 val _ = SYNCHRONIZED "finish" (fn () =>
(*mark the task as finished in the queue*)
173 val maximal = change_result queue (Task_Queue.finish task);
(*try to cancel the group in the queue; if that reports false, hand the
  group over to the scheduler via do_cancel*)
176 else if Task_Queue.cancel (! queue) group then ()
177 else do_cancel group;
(*wake up threads waiting for finished work, and unless maximal also
  those waiting for available work*)
178 val _ = broadcast work_finished;
179 val _ = if maximal then () else broadcast work_available;
184 (* worker activity *)
(*number of registered workers whose active flag is set*)
fun count_active () = (*requires SYNCHRONIZED*)
  length (filter (fn (_, active) => active) (! workers));
(*set the active flag of the current thread in the worker table*)
fun change_active active = (*requires SYNCHRONIZED*)
  let val entry = (Thread.self (), active)
  in change workers (AList.update Thread.equal entry) end;
(*wait on cond, with the current thread marked inactive for the duration*)
fun worker_wait cond = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = wait cond;
  in change_active true end;
(*Determine the next piece of work for the current worker thread.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
198 fun worker_next () = (*requires SYNCHRONIZED*)
(*too many workers: remove the current thread from the worker table and
  notify the scheduler*)
199 if ! excessive > 0 then
201 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
202 broadcast scheduler_event;
(*more active workers than max_threads: pause until the scheduler signals,
  then try again*)
204 else if count_active () > Multithreading.max_threads_value () then
205 (worker_wait scheduler_event; worker_next ())
(*otherwise dequeue; if nothing is ready, wait for work and try again*)
207 (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
208 NONE => (worker_wait work_available; worker_next ())
(*Main loop of a worker thread: fetch the next work item under the lock
  (labelled by name), execute it outside the lock, and repeat.
  NOTE(review): the branch for the case without work is not visible in
  this listing.*)
211 fun worker_loop name =
212 (case SYNCHRONIZED name (fn () => worker_next ()) of
214 | SOME work => (execute name work; worker_loop name));
(*fork a new worker thread and register it as active; the new thread first
  notifies the scheduler and then enters its worker loop*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    fun body () = (broadcast scheduler_event; worker_loop name);
    val thread = SimpleThread.fork false body;
  in change workers (cons (thread, true)) end;
223 val last_status = ref Time.zeroTime;
224 val next_status = Time.fromMilliseconds 500;
225 val next_round = Time.fromMilliseconds 50;
(*One round of the scheduler.  Visible steps: periodic status tracing,
  disposal of dead worker threads, adjustment of the worker pool, processing
  of canceled groups, a timed wait on scheduler_event, and the shutdown
  check.  The result of the round is used as loop condition by
  scheduler_loop.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
227 fun scheduler_next () = (*requires SYNCHRONIZED*)
229 (*queue and worker status*)
231 let val now = Time.now () in
(*trace only if at least next_status has passed since the last status*)
232 if Time.> (Time.+ (! last_status, next_status), now) then ()
234 (last_status := now; Multithreading.tracing 1 (fn () =>
236 val {ready, pending, running} = Task_Queue.status (! queue);
237 val total = length (! workers);
238 val active = count_active ();
240 "SCHEDULE " ^ string_of_int (Time.toMilliseconds now) ^ ": " ^
241 string_of_int ready ^ " ready, " ^
242 string_of_int pending ^ " pending, " ^
243 string_of_int running ^ " running; " ^
244 string_of_int total ^ " workers, " ^
245 string_of_int active ^ " active"
(*drop worker threads that are no longer active from the worker table*)
251 if forall (Thread.isActive o #1) (! workers) then ()
253 (case List.partition (Thread.isActive o #1) (! workers) of
256 (workers := alive; Multithreading.tracing 0 (fn () =>
257 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
(*target pool size mm is 3/2 of max_threads, or 0 during shutdown; the
  surplus of existing workers over mm is recorded in excessive*)
259 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
260 val mm = (m * 3) div 2;
261 val l = length (! workers);
262 val _ = excessive := l - mm;
(*start mm - l additional workers (the guarding condition is not visible here)*)
265 funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
(*retry cancellation of registered groups; groups for which
  Task_Queue.cancel yields true are removed from the list*)
270 if null (! canceled) then ()
272 (Multithreading.tracing 1 (fn () =>
273 string_of_int (length (! canceled)) ^ " canceled groups");
274 change canceled (filter_out (Task_Queue.cancel (! queue)));
(*sleep until notified via scheduler_event, at most next_round*)
278 val _ = Exn.release (wait_timeout next_round scheduler_event);
(*an empty queue triggers shutdown; the scheduler continues until shutdown
  is requested and no workers are left, then unregisters itself*)
281 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
282 val continue = not (! do_shutdown andalso null (! workers));
283 val _ = if continue then () else scheduler := NONE;
284 val _ = broadcast scheduler_event;
(*an interrupt of the scheduler cancels everything in the queue*)
286 handle Exn.Interrupt =>
287 (Multithreading.tracing 1 (fn () => "Interrupt");
288 uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
(*body of the scheduler thread: repeat scheduler rounds under the lock for
  as long as a round returns true, with synchronous public interrupts*)
fun scheduler_loop () =
  let
    fun round () = SYNCHRONIZED "scheduler" (fn () => scheduler_next ());
    fun loop () = if round () then loop () else ();
  in
    Multithreading.with_attributes
      (Multithreading.sync_interrupts Multithreading.public_interrupts)
      (fn _ => loop ())
  end;
(*true iff a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  (case ! scheduler of
    SOME thread => Thread.isActive thread
  | NONE => false);
(*revoke any pending shutdown request and make sure a scheduler thread is
  running, forking a fresh one if necessary*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    (case scheduler_active () of
      true => ()
    | false => scheduler := SOME (SimpleThread.fork false scheduler_loop))
  end;
(*General fork operation: create a future for e with optional group,
  dependencies deps and priority pri, and enqueue its job.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
310 fun fork_future opt_group deps pri e =
(*without explicit group: fresh group below that of the current worker, if any*)
315 | NONE => Task_Queue.new_group (worker_group ()));
316 val (result, job) = future_job group e;
317 val task = SYNCHRONIZED "enqueue" (fn () =>
319 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
(*wake up one waiting worker if enqueue reports the task as minimal*)
320 val _ = if minimal then signal work_available else ();
(*make sure a scheduler thread is running*)
321 val _ = scheduler_check ();
323 in Future {task = task, group = group, result = result} end;
(*fork variants: fresh group, no dependencies, priority 0 unless stated otherwise*)

fun fork e = fork_future NONE [] 0 e;

(*fork as member of the given group*)
fun fork_group group e =
  let val opt_group = SOME group
  in fork_future opt_group [] 0 e end;

(*fork depending on the tasks of the given futures*)
fun fork_deps deps e =
  let val tasks = map task_of deps
  in fork_future NONE tasks 0 e end;

(*fork with explicit priority*)
fun fork_pri pri e = fork_future NONE [] pri e;
337 NONE => Exn.Exn (SYS_ERROR "unfinished future")
338 | SOME (Exn.Exn Exn.Interrupt) =>
339 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
343 Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
(*Find work to do while joining on the tasks deps: NONE if deps is empty,
  otherwise try to dequeue work towards deps.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
345 fun join_next deps = (*requires SYNCHRONIZED*)
346 if null deps then NONE
348 (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
(*nothing to run right now: wait for some task to finish, then retry with
  the updated dependencies*)
350 | (NONE, deps') => (worker_wait work_finished; join_next deps')
(*got work: hand it back together with the updated dependencies*)
351 | (SOME work, deps') => SOME (work, deps'));
354 (case SYNCHRONIZED "join" (fn () => join_next deps) of
356 | SOME (work, deps') => (execute "join" work; join_work deps'));
(*Join a list of futures and return their results in the same order.
  NOTE(review): some lines of this definition are not visible in this
  listing, so only the visible steps are described.*)
360 fun join_results xs =
(*fast path: everything is finished already*)
361 if forall is_finished xs then map get_result xs
(*joining inside a critical section is rejected with an error*)
362 else if Multithreading.self_critical () then
363 error "Cannot join future values within critical section"
(*otherwise, uninterruptibly: either work on the tasks via join_work or
  wait on each future via join_wait (the condition that chooses between
  the two is not visible here), then collect the results*)
364 else uninterruptible (fn _ => fn () =>
366 then join_work (map task_of xs)
367 else List.app join_wait xs;
368 map get_result xs)) ();
(*join a single future, returning its captured result*)
fun join_result x = x |> singleton join_results;

(*join a single future, releasing its result: value or exception*)
fun join x = x |> join_result |> Exn.release;
380 val task = task_of x;
381 val group = Task_Queue.new_group (SOME (group_of x));
382 val (result, job) = future_job group (fn () => f (join x));
384 val extended = SYNCHRONIZED "extend" (fn () =>
385 (case Task_Queue.extend task job (! queue) of
386 SOME queue' => (queue := queue'; true)
389 if extended then Future {task = task, group = group, result = result}
390 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
(*Apply f to x with interrupts enabled.  With multithreading available the
  interrupt attributes are either private_interrupts or public_interrupts;
  without it this falls back to plain "interruptible f x".
  NOTE(review): the condition that chooses between private and public
  interrupts and the wrapped application are not visible in this listing.*)
396 fun interruptible_task f x =
397 if Multithreading.available then
398 Multithreading.with_attributes
400 then Multithreading.private_interrupts
401 else Multithreading.public_interrupts)
403 else interruptible f x;
405 (*cancel: present and future group members will be interrupted eventually*)
(*register a group for cancellation, under the global lock*)
fun cancel_group group =
  let fun request () = do_cancel group
  in SYNCHRONIZED "cancel" request end;

(*cancel the group that a future belongs to*)
fun cancel (Future {group, ...}) = cancel_group group;
413 if Multithreading.available then
414 SYNCHRONIZED "shutdown" (fn () =>
415 while scheduler_active () do
416 (wait scheduler_event; broadcast_work ()))
420 (*final declarations of this structure!*)
421 val map = map_future;
425 type 'a future = 'a Future.future;