1 (* Title: Pure/Concurrent/future.ML
4 Future values, see also
5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating processes.
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually. Groups are block-structured.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
(*abbreviations for the Task_Queue types*)
30 type task = Task_Queue.task
31 type group = Task_Queue.group
(*status of the calling thread: task and group are available while it executes a job*)
32 val is_worker: unit -> bool
33 val worker_task: unit -> Task_Queue.task option
34 val worker_group: unit -> Task_Queue.group option
(*inspection of futures: peek reads the current content of the result variable,
  which is NONE as long as nothing has been assigned*)
36 val task_of: 'a future -> task
37 val group_of: 'a future -> group
38 val peek: 'a future -> 'a Exn.result option
39 val is_finished: 'a future -> bool
(*future that holds the given value from the start*)
40 val value: 'a -> 'a future
(*fork variants: they differ in the explicit group, the dependencies and the priority*)
41 val fork_group: group -> (unit -> 'a) -> 'a future
42 val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
43 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
44 val fork_pri: int -> (unit -> 'a) -> 'a future
45 val fork: (unit -> 'a) -> 'a future
(*join variants: results as Exn.result; plain join passes the result through Exn.release*)
46 val join_results: 'a future list -> 'a Exn.result list
47 val join_result: 'a future -> 'a Exn.result
48 val join: 'a future -> 'a
(*map: future of the function applied to the joined argument*)
49 val map: ('a -> 'b) -> 'a future -> 'b future
50 val interruptible_task: ('a -> 'b) -> 'a -> 'b
(*cancellation of groups and shutdown of the scheduler*)
51 val cancel_group: group -> unit
52 val cancel: 'a future -> unit
53 val shutdown: unit -> unit
56 structure Future: FUTURE =
(*local abbreviations for the Task_Queue types, matching the signature*)
63 type task = Task_Queue.task;
64 type group = Task_Queue.group;
(*thread-local slot: SOME (name, task, group) in the component order stored by execute, or NONE*)
67 val tag = Universal.tag () : (string * task * group) option Universal.tag;
(*content of the thread-local tag; NONE is the default if nothing was ever stored*)
fun thread_data () =
  (case Thread.getLocal tag of
    SOME data => data
  | NONE => NONE);
(*apply f to x with the thread data temporarily set to SOME data; the value seen
  before the call is handed to Library.setmp_thread_data as the original one*)
fun setmp_thread_data data f x =
  let val previous = thread_data ()
  in Library.setmp_thread_data tag previous (SOME data) f x end;
(*the calling thread has thread data, i.e. it is executing some job*)
fun is_worker () = is_some (thread_data ());
(*task component of the thread data, if any*)
fun worker_task () =
  (case thread_data () of
    SOME (_, task, _) => SOME task
  | NONE => NONE);
(*group component of the thread data, if any*)
fun worker_group () =
  (case thread_data () of
    SOME (_, _, group) => SOME group
  | NONE => NONE);
(*a future value: a record wrapped by the constructor Future; the accessors
  task_of, group_of and result_of select its fields task, group and result*)
81 datatype 'a future = Future of
(*synchronized variable: NONE until some Exn.result has been assigned*)
84 result: 'a Exn.result option Synchronized.var};
(*task field of a future*)
fun task_of (Future fields) = #task fields;
(*group field of a future*)
fun group_of (Future fields) = #group fields;
(*synchronized result variable of a future*)
fun result_of (Future fields) = #result fields;
(*current content of the result variable*)
fun peek (Future {result, ...}) = Synchronized.value result;
(*a future is finished as soon as some result is present*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
(*record of a future that is finished from the start: fresh task (argument 0),
  fresh group without parent argument, and result already set to SOME (Exn.Result x).
  NOTE(review): the head of this definition is not visible here -- presumably "value"*)
94 {task = Task_Queue.new_task 0,
95 group = Task_Queue.new_group NONE,
96 result = Synchronized.var "future" (SOME (Exn.Result x))};
(*global scheduler state; the operations marked "requires SYNCHRONIZED" access it*)

(*the task queue, updated via Task_Queue operations (enqueue, dequeue, finish, ...)*)
104 val queue = Unsynchronized.ref Task_Queue.empty;
(*counter used for the names of new workers, see scheduler_next*)
105 val next = Unsynchronized.ref 0;
(*worker threads, each paired with its "active" flag*)
106 val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
(*the scheduler thread, if any*)
107 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*surplus of workers as computed by scheduler_next; while positive, worker_next retires workers*)
108 val excessive = Unsynchronized.ref 0;
(*groups registered by do_cancel, processed by scheduler_next*)
109 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
(*shutdown flag: set by scheduler_next, reset by scheduler_check*)
110 val do_shutdown = Unsynchronized.ref false;
113 (* synchronization *)
(*awaited by the scheduler round, by shutdown, and by workers above the thread limit*)
115 val scheduler_event = ConditionVar.conditionVar ();
(*awaited by workers that found nothing to dequeue*)
116 val work_available = ConditionVar.conditionVar ();
(*awaited by join_next; broadcast by execute after a task is finished*)
117 val work_finished = ConditionVar.conditionVar ();
(*the single mutex behind SYNCHRONIZED, wait and wait_timeout*)
120 val lock = Mutex.mutex ();
(*run body via SimpleThread.synchronized on the global lock; name labels the section*)
fun SYNCHRONIZED name body = SimpleThread.synchronized name lock body;
(*wait on cond with the global lock; no time limit is given, in contrast to wait_timeout*)
fun wait cond = (*requires SYNCHRONIZED*)
  let val no_limit = NONE
  in Multithreading.sync_wait NONE no_limit cond lock end;
(*wait on cond with the global lock, limited to the point in time "now + timeout"*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val limit = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME limit) cond lock end;
(*plain ConditionVar.signal; requires SYNCHRONIZED*)
val signal = ConditionVar.signal;
(*plain ConditionVar.broadcast; requires SYNCHRONIZED*)
val broadcast = ConditionVar.broadcast;
(*broadcast both work conditions: first work_available, then work_finished*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [work_available, work_finished];
144 (* execute future jobs *)
(*produce the result variable of a future together with the job that fills it;
  NOTE(review): several lines of this definition are not visible here*)
146 fun future_job group (e: unit -> 'a) =
(*the result starts out as NONE*)
148 val result = Synchronized.var "future" (NONE: 'a Exn.result option);
(*evaluate e under private_interrupts with exceptions captured as Exn.result;
  the else-branch yields Interrupt without evaluating e*)
153 Exn.capture (fn () =>
154 Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
155 else Exn.Exn Exn.Interrupt;
(*publish the outcome in the result variable*)
156 val _ = Synchronized.assign result (K (SOME res));
(*an exception cancels the group via Task_Queue.cancel_group and yields false*)
159 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
160 | Exn.Result _ => true)
162 in (result, job) end;
(*register group in the canceled list (insert modulo Task_Queue.eq_group),
  then notify whoever waits for scheduler_event*)
fun do_cancel group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;
(*run all jobs of a task in the current thread and finish the task in the queue;
  NOTE(review): several lines of this definition are not visible here*)
168 fun execute name (task, group, jobs) =
(*jobs are told whether the group was still uncanceled at this point*)
170 val valid = not (Task_Queue.is_canceled group);
(*every job is applied to valid, with (name, task, group) as thread data;
  ok is the conjunction of the individual outcomes*)
171 val ok = setmp_thread_data (name, task, group) (fn () =>
172 fold (fn job => fn ok => job valid andalso ok) jobs true) ();
173 val _ = SYNCHRONIZED "finish" (fn () =>
(*remove the task from the queue; maximal is the flag returned by Task_Queue.finish*)
175 val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
(*if Task_Queue.cancel yields false, the group is handed over via do_cancel*)
178 else if Task_Queue.cancel (! queue) group then ()
179 else do_cancel group;
(*wake up joins; workers are woken up unless the task was maximal*)
180 val _ = broadcast work_finished;
181 val _ = if maximal then () else broadcast work_available;
186 (* worker activity *)
(*number of workers whose "active" flag is set*)
fun count_active () = (*requires SYNCHRONIZED*)
  length (List.filter (fn (_, active) => active) (! workers));
(*update the "active" flag of the calling thread within the workers list*)
fun change_active active = (*requires SYNCHRONIZED*)
  let val entry = (Thread.self (), active)
  in Unsynchronized.change workers (AList.update Thread.equal entry) end;
(*wait on cond, with the calling thread marked as inactive for the duration*)
fun worker_wait cond = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = wait cond;
  in change_active true end;
(*determine the next piece of work for the calling worker;
  NOTE(review): several lines of this definition are not visible here*)
201 fun worker_next () = (*requires SYNCHRONIZED*)
(*surplus workers: decrement the counter, remove the calling thread from the
  workers list, and notify via scheduler_event*)
202 if ! excessive > 0 then
203 (Unsynchronized.dec excessive;
204 Unsynchronized.change workers
205 (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
206 broadcast scheduler_event;
(*more active workers than max_threads_value: wait for scheduler_event and retry*)
208 else if count_active () > Multithreading.max_threads_value () then
209 (worker_wait scheduler_event; worker_next ())
(*try to dequeue; if there is nothing, wait for work_available and retry*)
211 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
212 NONE => (worker_wait work_available; worker_next ())
(*main loop of a worker: fetch work under the lock, execute it outside the lock, repeat;
  NOTE(review): the case for a missing result of worker_next is not visible here*)
215 fun worker_loop name =
216 (case SYNCHRONIZED name (fn () => worker_next ()) of
218 | SOME work => (execute name work; worker_loop name));
(*fork a thread that notifies via scheduler_event and then enters worker_loop;
  the thread is registered as an active worker*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    fun body () = (broadcast scheduler_event; worker_loop name);
    val thread = SimpleThread.fork false body;
  in Unsynchronized.change workers (cons (thread, true)) end;
(*time of the most recent status trace, see scheduler_next*)
227 val last_status = Unsynchronized.ref Time.zeroTime;
(*minimal distance between two status traces: 500 ms*)
228 val next_status = Time.fromMilliseconds 500;
(*timeout of the wait at the end of each scheduler round: 50 ms*)
229 val next_round = Time.fromMilliseconds 50;
(*one round of the scheduler; its boolean result decides whether scheduler_loop continues.
  NOTE(review): several lines of this definition are not visible here*)
231 fun scheduler_next () = (*requires SYNCHRONIZED*)
233 (*queue and worker status*)
235 let val now = Time.now () in
(*skip the trace if less than next_status has passed since the last one*)
236 if Time.> (Time.+ (! last_status, next_status), now) then ()
238 (last_status := now; Multithreading.tracing 1 (fn () =>
240 val {ready, pending, running} = Task_Queue.status (! queue);
241 val total = length (! workers);
242 val active = count_active ();
244 "SCHEDULE " ^ Time.toString now ^ ": " ^
245 string_of_int ready ^ " ready, " ^
246 string_of_int pending ^ " pending, " ^
247 string_of_int running ^ " running; " ^
248 string_of_int total ^ " workers, " ^
249 string_of_int active ^ " active"
(*dispose worker threads that are no longer active according to Thread.isActive*)
255 if forall (Thread.isActive o #1) (! workers) then ()
257 (case List.partition (Thread.isActive o #1) (! workers) of
260 (workers := alive; Multithreading.tracing 0 (fn () =>
261 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
(*thread limit m: 0 during shutdown, otherwise max_threads_value*)
263 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
(*target number of workers: twice the limit, but 1 for the special value 9999;
  NOTE(review): the meaning of 9999 is not evident from this file*)
264 val mm = if m = 9999 then 1 else m * 2;
265 val l = length (! workers);
(*surplus of workers; worker_next only reacts to positive values*)
266 val _ = excessive := l - mm;
(*start the missing workers, each with a fresh numbered name*)
269 funpow (mm - l) (fn () =>
270 worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
(*canceled groups: those for which Task_Queue.cancel yields true are dropped from the list*)
275 if null (! canceled) then ()
277 (Multithreading.tracing 1 (fn () =>
278 string_of_int (length (! canceled)) ^ " canceled groups");
279 Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
(*wait for scheduler_event, at most for next_round; Exn.release re-raises a captured exception*)
283 val _ = Exn.release (wait_timeout next_round scheduler_event);
(*an empty queue requests shutdown; the round continues unless shutdown is
  requested and no workers are left*)
286 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
287 val continue = not (! do_shutdown andalso null (! workers));
288 val _ = if continue then () else scheduler := NONE;
289 val _ = broadcast scheduler_event;
(*interrupt of the scheduler: Task_Queue.cancel_all is applied to the queue and
  every group it returns is passed to do_cancel, uninterruptibly*)
291 handle Exn.Interrupt =>
292 (Multithreading.tracing 1 (fn () => "Interrupt");
293 uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
(*body of the scheduler thread: repeat synchronized scheduler rounds until one
  yields false; runs with sync_interrupts derived from public_interrupts*)
fun scheduler_loop () =
  let
    fun round () = SYNCHRONIZED "scheduler" (fn () => scheduler_next ());
    fun loop () = if round () then loop () else ();
  in
    Multithreading.with_attributes
      (Multithreading.sync_interrupts Multithreading.public_interrupts)
      (fn _ => loop ())
  end;
(*a scheduler thread is registered and Thread.isActive holds for it*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);
(*reset the shutdown flag and fork a scheduler thread unless an active one exists*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if scheduler_active () then ()
    else scheduler := SOME (SimpleThread.fork false scheduler_loop)
  end;
(*common implementation of the fork variants: enqueue a job for e and return its future;
  NOTE(review): several lines of this definition are not visible here*)
315 fun fork_future opt_group deps pri e =
(*without explicit group: fresh group, created with the group of the current worker (if any)*)
320 | NONE => Task_Queue.new_group (worker_group ()));
321 val (result, job) = future_job group e;
322 val task = SYNCHRONIZED "enqueue" (fn () =>
324 val (task, minimal) =
325 Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
(*signal work_available only if Task_Queue.enqueue reports the task as minimal*)
326 val _ = if minimal then signal work_available else ();
(*make sure that a scheduler thread is running*)
327 val _ = scheduler_check ();
329 in Future {task = task, group = group, result = result} end;
(*fork variants on top of fork_future; defaults: no explicit group,
  no dependencies, priority 0*)

(*explicit dependencies (given as futures) and priority*)
fun fork_deps_pri deps pri e =
  let val tasks = map task_of deps
  in fork_future NONE tasks pri e end;

(*explicit group*)
fun fork_group group e = fork_future (SOME group) [] 0 e;

(*explicit dependencies*)
fun fork_deps deps e = fork_deps_pri deps 0 e;

(*explicit priority*)
fun fork_pri pri e = fork_future NONE [] pri e;

(*all defaults*)
fun fork e = fork_future NONE [] 0 e;
(*case branches on an optional result; NOTE(review): the head of this definition and
  its remaining branch are not visible here -- presumably get_result, as used by join_results*)
(*no result at all: reported as SYS_ERROR*)
344 NONE => Exn.Exn (SYS_ERROR "unfinished future")
(*Interrupt is replaced by EXCEPTIONS built from the status of the group of x*)
345 | SOME (Exn.Exn Exn.Interrupt) =>
346 Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
(*access to the result variable of x that yields SOME () once a result is present;
  NOTE(review): the head of this definition is not visible here -- presumably join_wait,
  and presumably readonly_access blocks while the function yields NONE*)
350 Synchronized.readonly_access (result_of x) (fn NONE => NONE | SOME _ => SOME ());
(*pick work for a joining thread, relative to the tasks deps it depends on;
  NOTE(review): some lines of this definition are not visible here*)
352 fun join_next deps = (*requires SYNCHRONIZED*)
(*no dependencies: nothing to do*)
353 if null deps then NONE
355 (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
(*nothing dequeued: wait for work_finished, then retry with the updated deps'*)
357 | (NONE, deps') => (worker_wait work_finished; join_next deps')
358 | (SOME work, deps') => SOME (work, deps'));
(*execute work obtained from join_next (named "join"), then continue with join_work
  on the updated dependencies*)
360 fun execute_work NONE = ()
361 | execute_work (SOME (work, deps')) = (execute "join" work; join_work deps')
(*NOTE(review): the head of the following definition is not visible here --
  presumably join_work, as referenced above*)
363 execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
(*record in the queue that task depends on deps (Task_Queue.depend) and pick the
  first piece of work within the same synchronized section, then execute it*)
fun join_depend task deps =
  let
    fun depend_and_pick () =
      let val _ = Unsynchronized.change queue (Task_Queue.depend task deps)
      in join_next deps end;
  in execute_work (SYNCHRONIZED "join" depend_and_pick) end;
(*join several futures; NOTE(review): some lines of this definition are not visible here*)
371 fun join_results xs =
(*all finished already: just collect the results*)
372 if forall is_finished xs then map get_result xs
(*joining within a critical section is rejected*)
373 else if Multithreading.self_critical () then
374 error "Cannot join future values within critical section"
(*a worker registers the dependency and helps executing tasks (join_depend);
  any other thread waits for each future in turn (join_wait)*)
376 (case worker_task () of
377 SOME task => join_depend task (map task_of xs)
378 | NONE => List.app join_wait xs;
(*join of a single future, via join_results on the singleton list*)
fun join_result x = singleton join_results x;

(*join with the result passed through Exn.release*)
fun join x =
  let val res = join_result x
  in Exn.release res end;
(*NOTE(review): the head of this definition is not visible here -- presumably
  map_future f x, which is exported as "map" at the end of the structure*)
391 val task = task_of x;
(*fresh group, created with the group of x as argument*)
392 val group = Task_Queue.new_group (SOME (group_of x));
393 val (result, job) = future_job group (fn () => f (join x));
(*try to attach the job to the existing task of x via Task_Queue.extend*)
395 val extended = SYNCHRONIZED "extend" (fn () =>
396 (case Task_Queue.extend task job (! queue) of
397 SOME queue' => (queue := queue'; true)
(*extended: the new future shares the task of x; otherwise fork a future that
  depends on that task and uses its priority*)
400 if extended then Future {task = task, group = group, result = result}
401 else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
(*apply f to x with interrupt attributes chosen between private_interrupts and
  public_interrupts if multithreading is available, otherwise as plain interruptible;
  NOTE(review): the condition of that choice and the applied function are not visible here*)
407 fun interruptible_task f x =
408 if Multithreading.available then
409 Multithreading.with_attributes
411 then Multithreading.private_interrupts
412 else Multithreading.public_interrupts)
414 else interruptible f x;
(*cancel: present and future group members will be interrupted eventually;
  here the group is merely registered via do_cancel under the lock*)
fun cancel_group group =
  let fun register () = do_cancel group
  in SYNCHRONIZED "cancel" register end;
(*cancel the group of a future*)
fun cancel (Future {group, ...}) = cancel_group group;
(*NOTE(review): the head of this definition is not visible here -- presumably shutdown ()*)
(*as long as the scheduler is active: wait for scheduler_event, then broadcast both
  work conditions*)
424 if Multithreading.available then
425 SYNCHRONIZED "shutdown" (fn () =>
426 while scheduler_active () do
427 (wait scheduler_event; broadcast_work ()))
431 (*final declarations of this structure!*)
(*from here on "map" refers to map_future, so any use of the list map must occur above*)
432 val map = map_future;
(*abbreviation: makes the future type available without the Future qualifier*)
436 type 'a future = 'a Future.future;