1 (* Title: Pure/Concurrent/future.ML
8 * Futures are similar to delayed evaluation, i.e. delay/force is
9 generalized to fork/join (and variants). The idea is to model
10 parallel value-oriented computations, but *not* communicating processes.
13 * Futures are grouped; failure of one group member causes the whole
14 group to be interrupted eventually. Groups are block-structured.
16 * Forked futures are evaluated spontaneously by a farm of worker
17 threads in the background; join resynchronizes the computation and
18 delivers results (values or exceptions).
20 * The pool of worker threads is limited, usually in correlation with
21 the number of physical cores on the machine. Note that allocation
22 of runtime resources is distorted either if workers yield CPU time
23 (e.g. via system sleep or wait operations), or if non-worker
24 threads contend for significant runtime resources independently.
29 val enabled: unit -> bool
30 type task = Task_Queue.task
31 type group = Task_Queue.group
32 val is_worker: unit -> bool
33 val worker_group: unit -> Task_Queue.group option
35 val task_of: 'a future -> task
36 val group_of: 'a future -> group
37 val peek: 'a future -> 'a Exn.result option
38 val is_finished: 'a future -> bool
39 val value: 'a -> 'a future
40 val fork: (unit -> 'a) -> 'a future
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
43 val fork_pri: int -> (unit -> 'a) -> 'a future
44 val join_results: 'a future list -> 'a Exn.result list
45 val join_result: 'a future -> 'a Exn.result
46 val join: 'a future -> 'a
47 val map: ('a -> 'b) -> 'a future -> 'b future
48 val interruptible_task: ('a -> 'b) -> 'a -> 'b
49 val cancel_group: group -> unit
50 val cancel: 'a future -> unit
51 val shutdown: unit -> unit
54 structure Future: FUTURE =
60 Multithreading.enabled () andalso
61 not (Multithreading.self_critical ());
(*local abbreviations for the Task_Queue types used throughout this structure*)
66 type task = Task_Queue.task;
67 type group = Task_Queue.group;
(*thread-local slot: (name, task, group) of the job currently run by this thread, if any*)
val tag : (string * task * group) option Universal.tag = Universal.tag ();
(*current thread's job data; NONE if the slot was never set or explicitly holds NONE*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x with the thread's job data temporarily set to SOME data;
  the value read before the call is passed on as the one to reinstate*)
fun setmp_thread_data data f x =
  let
    val previous = thread_data ();
  in Library.setmp_thread_data tag previous (SOME data) f x end;
(*true iff the calling thread currently carries job data*)
fun is_worker () = is_some (thread_data ());
(*group component of the calling thread's job data, if any*)
fun worker_group () =
  Option.map (fn (_, _, group) => group) (thread_data ());
83 datatype 'a future = Future of
86 result: 'a Exn.result option ref};
(*task field of a future*)
fun task_of (Future fields) = #task fields;
(*group field of a future*)
fun group_of (Future fields) = #group fields;
(*non-blocking read of the result cell: NONE while no result has been stored*)
fun peek (Future {result = cell, ...}) = ! cell;
(*true iff a result (value or exception) has been stored*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
95 {task = Task_Queue.new_task 0,
96 group = Task_Queue.new_group NONE,
97 result = ref (SOME (Exn.Result x))};
(*the task queue; updated in place via change_result (enqueue, dequeue, finish)*)
105 val queue = ref Task_Queue.empty;
(*worker threads, each paired with its "active" flag*)
val workers : (Thread.thread * bool) list ref = ref [];
(*the scheduler thread, if one has been forked*)
val scheduler : Thread.thread option ref = ref NONE;
(*computed by the scheduler as (number of workers - target); workers test it for > 0*)
val excessive : int ref = ref 0;
(*groups whose cancellation has been requested but not yet completed*)
val canceled : Task_Queue.group list ref = ref [];
(*shutdown request flag, read and written by the scheduler*)
val do_shutdown : bool ref = ref false;
114 (* synchronization *)
(*scheduler_event: the scheduler and overloaded workers wait on this;
  broadcast on worker-state changes and on cancel requests*)
116 val scheduler_event = ConditionVar.conditionVar ();
(*work_available: idle workers wait on this; signalled/broadcast when tasks may be ready*)
117 val work_available = ConditionVar.conditionVar ();
(*work_finished: joining threads wait on this; broadcast by execute after a task finishes*)
118 val work_finished = ConditionVar.conditionVar ();
(*the single mutex used by SYNCHRONIZED and by the condition-variable waits below*)
121 val lock = Mutex.mutex ();
(*run body via SimpleThread.synchronized on the central lock; name labels the section*)
fun SYNCHRONIZED name body = SimpleThread.synchronized name lock body;
(*wait on condition variable cv, using the central lock*)
fun wait cv = (*requires SYNCHRONIZED*)
  ConditionVar.wait (cv, lock);
(*like wait, but bounded by a deadline of now + timeout; the outcome of waitUntil is discarded*)
fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
  let
    val deadline = Time.+ (Time.now (), timeout);
    val _ = ConditionVar.waitUntil (cond, lock, deadline);
  in () end;
(*plain alias of ConditionVar.signal; requires SYNCHRONIZED*)
val signal = ConditionVar.signal;
(*plain alias of ConditionVar.broadcast; requires SYNCHRONIZED*)
val broadcast = ConditionVar.broadcast;
(*broadcast on all three condition variables, in the order
  scheduler_event, work_available, work_finished*)
fun broadcast_all () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [scheduler_event, work_available, work_finished];
146 (* worker activity *)
(*number of entries in ws whose active flag is set*)
fun count_active ws =
  List.foldl (fn ((_, active), n) => if active then n + 1 else n) 0 ws;
(*record the given active flag for the calling thread in the workers list*)
fun change_active active = (*requires SYNCHRONIZED*)
  let
    val entry = (Thread.self (), active);
  in change workers (AList.update Thread.equal entry) end;
155 count_active (! workers) > Multithreading.max_threads_value ();
158 (* execute future jobs *)
160 fun future_job group (e: unit -> 'a) =
162 val result = ref (NONE: 'a Exn.result option);
168 (Multithreading.with_attributes Multithreading.restricted_interrupts
169 (fn _ => fn () => e ())) ()
170 else Exn.Exn Exn.Interrupt;
171 val _ = result := SOME res;
174 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
175 | Exn.Result _ => true)
177 in (result, job) end;
(*add group to the canceled list (via insert, using Task_Queue.eq_group) and
  broadcast on scheduler_event*)
fun do_cancel group = (*requires SYNCHRONIZED*)
  let
    val _ = change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;
182 fun execute name (task, group, jobs) =
184 val valid = not (Task_Queue.is_canceled group);
185 val ok = setmp_thread_data (name, task, group) (fn () =>
186 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
187 val _ = SYNCHRONIZED "execute" (fn () =>
189 val maximal = change_result queue (Task_Queue.finish task);
192 else if Task_Queue.cancel (! queue) group then ()
193 else do_cancel group;
194 val _ = broadcast work_finished;
195 val _ = if maximal then () else broadcast work_available;
202 fun worker_wait cond = (*requires SYNCHRONIZED*)
203 (change_active false; broadcast scheduler_event;
205 change_active true; broadcast scheduler_event);
207 fun worker_next () = (*requires SYNCHRONIZED*)
208 if ! excessive > 0 then
210 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
211 broadcast scheduler_event;
213 else if overloaded () then (worker_wait scheduler_event; worker_next ())
215 (case change_result queue Task_Queue.dequeue of
216 NONE => (worker_wait work_available; worker_next ())
219 fun worker_loop name =
220 (case SYNCHRONIZED name (fn () => worker_next ()) of
222 | SOME work => (execute name work; worker_loop name));
(*fork a thread running worker_loop name and register it in workers, flagged active*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = SimpleThread.fork false (fn () => worker_loop name);
  in change workers (cons (thread, true)) end;
(*time of the most recent status trace; updated by scheduler_next*)
230 val last_status = ref Time.zeroTime;
(*minimum interval between two status traces*)
231 val next_status = Time.fromMilliseconds 450;
233 fun scheduler_next () = (*requires SYNCHRONIZED*)
235 (*queue and worker status*)
237 let val now = Time.now () in
238 if Time.> (Time.+ (! last_status, next_status), now) then ()
240 (last_status := now; Multithreading.tracing 1 (fn () =>
242 val {ready, pending, running} = Task_Queue.status (! queue);
243 val total = length (! workers);
244 val active = count_active (! workers);
247 string_of_int ready ^ " ready, " ^
248 string_of_int pending ^ " pending, " ^
249 string_of_int running ^ " running; " ^
250 string_of_int total ^ " workers, " ^
251 string_of_int active ^ " active"
257 if forall (Thread.isActive o #1) (! workers) then ()
259 (case List.partition (Thread.isActive o #1) (! workers) of
262 (workers := alive; Multithreading.tracing 0 (fn () =>
263 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
265 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
266 val mm = (m * 3) div 2;
267 val l = length (! workers);
268 val _ = excessive := l - mm;
271 (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
272 broadcast scheduler_event)
277 if null (! canceled) then ()
278 else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());
282 Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
283 val _ = interruptible (fn () => wait_timeout scheduler_event delay) ()
284 handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
287 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
288 val continue = not (! do_shutdown andalso null (! workers));
289 val _ = if continue then () else scheduler := NONE;
290 val _ = broadcast scheduler_event;
(*repeat synchronized scheduler_next steps until one returns false*)
fun scheduler_loop () =
  if SYNCHRONIZED "scheduler" (fn () => scheduler_next ())
  then scheduler_loop ()
  else ();
(*true iff a scheduler thread is registered and Thread.isActive holds for it*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);
299 fun scheduler_check () = (*requires SYNCHRONIZED*)
300 (do_shutdown := false;
301 if not (scheduler_active ()) then
302 (scheduler := SOME (SimpleThread.fork false scheduler_loop); broadcast scheduler_event)
311 fun fork_future opt_group deps pri e =
316 | NONE => Task_Queue.new_group (worker_group ()));
317 val (result, job) = future_job group e;
318 val task = SYNCHRONIZED "future" (fn () =>
320 val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
321 val _ = if minimal then signal work_available else ();
322 val _ = scheduler_check ();
324 in Future {task = task, group = group, result = result} end;
(*fork with no explicit group, no dependencies, priority 0*)
fun fork body = fork_future NONE [] 0 body;
(*fork into the given group; no dependencies, priority 0*)
fun fork_group grp body = fork_future (SOME grp) [] 0 body;
(*fork with the tasks of the given futures as dependencies; no explicit group, priority 0*)
fun fork_deps deps e =
  let
    val dep_tasks = List.map task_of deps;
  in fork_future NONE dep_tasks 0 e end;
(*fork with the given priority; no explicit group, no dependencies*)
fun fork_pri priority body = fork_future NONE [] priority body;
338 NONE => Exn.Exn (SYS_ERROR "unfinished future")
339 | SOME (Exn.Exn Exn.Interrupt) =>
340 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
344 if SYNCHRONIZED "join_wait" (fn () =>
345 is_finished x orelse (wait work_finished; false))
346 then () else join_wait x;
348 fun join_next deps = (*requires SYNCHRONIZED*)
349 if null deps then NONE
351 (case change_result queue (Task_Queue.dequeue_towards deps) of
353 | (NONE, deps') => (worker_wait work_finished; join_next deps')
354 | (SOME work, deps') => SOME (work, deps'));
357 (case SYNCHRONIZED "join" (fn () => join_next deps) of
359 | SOME (work, deps') => (execute "join" work; join_work deps'));
363 fun join_results xs =
364 if forall is_finished xs then map get_result xs
365 else uninterruptible (fn _ => fn () =>
367 val _ = Multithreading.self_critical () andalso
368 error "Cannot join future values within critical section";
370 if is_worker () then join_work (map task_of xs)
371 else List.app join_wait xs;
372 in map get_result xs end) ();
(*single-future variant of join_results, lifted via singleton*)
376 fun join_result x = singleton join_results x;
(*join a future and pass its result through Exn.release*)
fun join x =
  let
    val outcome = join_result x;
  in Exn.release outcome end;
384 val task = task_of x;
385 val group = Task_Queue.new_group (SOME (group_of x));
386 val (result, job) = future_job group (fn () => f (join x));
388 val extended = SYNCHRONIZED "map_future" (fn () =>
389 (case Task_Queue.extend task job (! queue) of
390 SOME queue' => (queue := queue'; true)
393 if extended then Future {task = task, group = group, result = result}
394 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
400 fun interruptible_task f x =
401 if Multithreading.available then
402 Multithreading.with_attributes
404 then Multithreading.restricted_interrupts
405 else Multithreading.regular_interrupts)
407 else interruptible f x;
409 (*cancel: present and future group members will be interrupted eventually*)
(*request cancellation of group, inside the "cancel" synchronized section*)
fun cancel_group group =
  let
    fun request () = do_cancel group;
  in SYNCHRONIZED "cancel" request end;
(*request cancellation of the group that the future belongs to*)
fun cancel fut =
  let
    val grp = group_of fut;
  in cancel_group grp end;
417 if Multithreading.available then
418 SYNCHRONIZED "shutdown" (fn () =>
419 while scheduler_active () do
420 (wait scheduler_event; broadcast_all ()))
424 (*final declarations of this structure!*)
(*map is rebound only here, at the very end: earlier code in this structure
  (e.g. fork_deps, join_results) applies map to lists, so it presumably relies
  on the previously visible list map*)
425 val map = map_future;
(*abbreviation so the future type can be written without the Future. qualifier*)
429 type 'a future = 'a Future.future;