1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
30 type task = Task_Queue.task
31 type group = Task_Queue.group
32 val is_worker: unit -> bool
33 val worker_task: unit -> Task_Queue.task option
34 val worker_group: unit -> Task_Queue.group option
36 val task_of: 'a future -> task
37 val group_of: 'a future -> group
38 val peek: 'a future -> 'a Exn.result option
39 val is_finished: 'a future -> bool
40 val value: 'a -> 'a future
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
43 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
44 val fork_pri: int -> (unit -> 'a) -> 'a future
45 val fork: (unit -> 'a) -> 'a future
46 val join_results: 'a future list -> 'a Exn.result list
47 val join_result: 'a future -> 'a Exn.result
48 val join: 'a future -> 'a
49 val map: ('a -> 'b) -> 'a future -> 'b future
50 val interruptible_task: ('a -> 'b) -> 'a -> 'b
51 val cancel_group: group -> unit
52 val cancel: 'a future -> unit
53 val shutdown: unit -> unit
56 structure Future: FUTURE =
63 type task = Task_Queue.task;
64 type group = Task_Queue.group;
67 val tag = Universal.tag () : (task * group) option Universal.tag;
69 fun thread_data () = the_default NONE (Thread.getLocal tag);
70 fun setmp_thread_data data f x =
71 Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
74 val is_worker = is_some o thread_data;
75 val worker_task = Option.map #1 o thread_data;
76 val worker_group = Option.map #2 o thread_data;
81 datatype 'a future = Future of
84 result: 'a Exn.result option Synchronized.var};
86 fun task_of (Future {task, ...}) = task;
87 fun group_of (Future {group, ...}) = group;
88 fun result_of (Future {result, ...}) = result;
90 fun peek x = Synchronized.value (result_of x);
91 fun is_finished x = is_some (peek x);
94 {task = Task_Queue.new_task 0,
95 group = Task_Queue.new_group NONE,
96 result = Synchronized.var "future" (SOME (Exn.Result x))};
102 (* synchronization *)
104 val scheduler_event = ConditionVar.conditionVar ();
105 val work_available = ConditionVar.conditionVar ();
106 val work_finished = ConditionVar.conditionVar ();
109 val lock = Mutex.mutex ();
112 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
114 fun wait cond = (*requires SYNCHRONIZED*)
115 Multithreading.sync_wait NONE NONE cond lock;
117 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
118 Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
120 fun signal cond = (*requires SYNCHRONIZED*)
121 ConditionVar.signal cond;
123 fun broadcast cond = (*requires SYNCHRONIZED*)
124 ConditionVar.broadcast cond;
126 fun broadcast_work () = (*requires SYNCHRONIZED*)
127 (ConditionVar.broadcast work_available;
128 ConditionVar.broadcast work_finished);
135 val queue = Unsynchronized.ref Task_Queue.empty;
136 val next = Unsynchronized.ref 0;
137 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
138 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
139 val do_shutdown = Unsynchronized.ref false;
140 val max_workers = Unsynchronized.ref 0;
141 val max_active = Unsynchronized.ref 0;
142 val worker_trend = Unsynchronized.ref 0;
144 datatype worker_state = Working | Waiting | Sleeping;
145 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
147 fun count_workers state = (*requires SYNCHRONIZED*)
148 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
151 (* execute future jobs *)
153 fun future_job group (e: unit -> 'a) =
155 val result = Synchronized.var "future" (NONE: 'a Exn.result option);
160 Exn.capture (fn () =>
161 Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
162 else Exn.Exn Exn.Interrupt;
163 val _ = Synchronized.assign result (K (SOME res));
166 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
167 | Exn.Result _ => true)
169 in (result, job) end;
171 fun do_cancel group = (*requires SYNCHRONIZED*)
172 (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
173 broadcast scheduler_event);
175 fun execute (task, group, jobs) =
177 val valid = not (Task_Queue.is_canceled group);
178 val ok = setmp_thread_data (task, group) (fn () =>
179 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
180 val _ = SYNCHRONIZED "finish" (fn () =>
182 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
185 else if Task_Queue.cancel (! queue) group then ()
186 else do_cancel group;
187 val _ = broadcast work_finished;
188 val _ = if maximal then () else signal work_available;
195 fun worker_wait active cond = (*requires SYNCHRONIZED*)
198 (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
200 | NONE => raise Fail "Unregistered worker thread");
201 val _ = state := (if active then Waiting else Sleeping);
203 val _ = state := Working;
206 fun worker_next () = (*requires SYNCHRONIZED*)
207 if length (! workers) > ! max_workers then
208 (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
209 signal work_available;
211 else if count_workers Working > ! max_active then
212 (worker_wait false work_available; worker_next ())
214 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
215 NONE => (worker_wait false work_available; worker_next ())
216 | some => (signal work_available; some));
218 fun worker_loop name =
219 (case SYNCHRONIZED name (fn () => worker_next ()) of
221 | SOME work => (execute work; worker_loop name));
223 fun worker_start name = (*requires SYNCHRONIZED*)
224 Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
225 Unsynchronized.ref Working));
230 val status_ticks = Unsynchronized.ref 0;
232 val last_round = Unsynchronized.ref Time.zeroTime;
233 val next_round = Time.fromMilliseconds 50;
235 fun scheduler_next () = (*requires SYNCHRONIZED*)
237 val now = Time.now ();
238 val tick = Time.<= (Time.+ (! last_round, next_round), now);
239 val _ = if tick then last_round := now else ();
242 (* queue and worker status *)
245 if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
247 if tick andalso ! status_ticks = 0 then
248 Multithreading.tracing 1 (fn () =>
250 val {ready, pending, running} = Task_Queue.status (! queue);
251 val total = length (! workers);
252 val active = count_workers Working;
253 val waiting = count_workers Waiting;
255 "SCHEDULE " ^ Time.toString now ^ ": " ^
256 string_of_int ready ^ " ready, " ^
257 string_of_int pending ^ " pending, " ^
258 string_of_int running ^ " running; " ^
259 string_of_int total ^ " workers, " ^
260 string_of_int active ^ " active, " ^
261 string_of_int waiting ^ " waiting "
266 if forall (Thread.isActive o #1) (! workers) then ()
269 val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
270 val _ = workers := alive;
272 Multithreading.tracing 0 (fn () =>
273 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
277 (* worker pool adjustments *)
279 val max_active0 = ! max_active;
280 val max_workers0 = ! max_workers;
282 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
283 val _ = max_active := m;
286 if ! do_shutdown then 0
287 else if m = 9999 then 1
288 else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
290 if tick andalso mm > ! max_workers then
291 Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
292 else if tick andalso mm < ! max_workers then
293 Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
296 if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
298 else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
299 max_workers := Int.min (mm, 2 * m)
302 val missing = ! max_workers - length (! workers);
305 funpow missing (fn () =>
306 ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
310 if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
311 else signal work_available;
314 (* canceled groups *)
317 if null (! canceled) then ()
319 (Multithreading.tracing 1 (fn () =>
320 string_of_int (length (! canceled)) ^ " canceled groups");
321 Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
327 val _ = Exn.release (wait_timeout next_round scheduler_event);
332 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
333 val continue = not (! do_shutdown andalso null (! workers));
334 val _ = if continue then () else scheduler := NONE;
336 val _ = broadcast scheduler_event;
338 handle Exn.Interrupt =>
339 (Multithreading.tracing 1 (fn () => "Interrupt");
340 uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
343 fun scheduler_loop () =
344 Multithreading.with_attributes
345 (Multithreading.sync_interrupts Multithreading.public_interrupts)
346 (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
348 fun scheduler_active () = (*requires SYNCHRONIZED*)
349 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
351 fun scheduler_check () = (*requires SYNCHRONIZED*)
352 (do_shutdown := false;
353 if scheduler_active () then ()
354 else scheduler := SOME (SimpleThread.fork false scheduler_loop));
362 fun fork_future opt_group deps pri e =
367 | NONE => Task_Queue.new_group (worker_group ()));
368 val (result, job) = future_job group e;
369 val task = SYNCHRONIZED "enqueue" (fn () =>
371 val (task, minimal) =
372 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
373 val _ = if minimal then signal work_available else ();
374 val _ = scheduler_check ();
376 in Future {task = task, group = group, result = result} end;
378 fun fork_group group e = fork_future (SOME group) [] 0 e;
379 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
380 fun fork_deps deps e = fork_deps_pri deps 0 e;
381 fun fork_pri pri e = fork_deps_pri [] pri e;
382 fun fork e = fork_deps [] e;
391 NONE => Exn.Exn (SYS_ERROR "unfinished future")
392 | SOME (Exn.Exn Exn.Interrupt) =>
393 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
397 Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
399 fun join_next deps = (*requires SYNCHRONIZED*)
400 if null deps then NONE
402 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
404 | (NONE, deps') => (worker_wait true work_finished; join_next deps')
405 | (SOME work, deps') => SOME (work, deps'));
407 fun execute_work NONE = ()
408 | execute_work (SOME (work, deps')) = (execute work; join_work deps')
410 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
412 fun join_depend task deps =
413 execute_work (SYNCHRONIZED "join" (fn () =>
414 (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
418 fun join_results xs =
419 if forall is_finished xs then map get_result xs
420 else if Multithreading.self_critical () then
421 error "Cannot join future values within critical section"
423 (case worker_task () of
424 SOME task => join_depend task (map task_of xs)
425 | NONE => List.app passive_wait xs;
430 fun join_result x = singleton join_results x;
431 fun join x = Exn.release (join_result x);
438 val task = task_of x;
439 val group = Task_Queue.new_group (SOME (group_of x));
440 val (result, job) = future_job group (fn () => f (join x));
442 val extended = SYNCHRONIZED "extend" (fn () =>
443 (case Task_Queue.extend task job (! queue) of
444 SOME queue' => (queue := queue'; true)
447 if extended then Future {task = task, group = group, result = result}
448 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
454 fun interruptible_task f x =
455 if Multithreading.available then
456 Multithreading.with_attributes
458 then Multithreading.private_interrupts
459 else Multithreading.public_interrupts)
461 else interruptible f x;
463 (*cancel: present and future group members will be interrupted eventually*)
464 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
465 fun cancel x = cancel_group (group_of x);
471 if Multithreading.available then
472 SYNCHRONIZED "shutdown" (fn () =>
473 while scheduler_active () do
474 (wait scheduler_event; broadcast_work ()))
478 (*final declarations of this structure!*)
479 val map = map_future;
483 type 'a future = 'a Future.future;