1 (* Title: Pure/Concurrent/future.ML
9 * Futures are similar to delayed evaluation, i.e. delay/force is
10 generalized to fork/join (and variants). The idea is to model
11 parallel value-oriented computations, but *not* communicating processes.
14 * Futures are grouped; failure of one group member causes the whole
15 group to be interrupted eventually.
17 * Forked futures are evaluated spontaneously by a farm of worker
18 threads in the background; join resynchronizes the computation and
19 delivers results (values or exceptions).
21 * The pool of worker threads is limited, usually in correlation with
22 the number of physical cores on the machine. Note that allocation
23 of runtime resources is distorted either if workers yield CPU time
24 (e.g. via system sleep or wait operations), or if non-worker
25 threads contend for significant runtime resources independently.
(*scheduler status and abbreviations for the task queue types*)
30 val enabled: unit -> bool
31 type task = TaskQueue.task
32 type group = TaskQueue.group
(*thread-local (name, task) pair, if one is set for the calling thread*)
33 val thread_data: unit -> (string * task) option
(*inspection of future values; peek only reads the result slot and does not wait*)
35 val task_of: 'a future -> task
36 val group_of: 'a future -> group
37 val peek: 'a future -> 'a Exn.result option
38 val is_finished: 'a future -> bool
(*fork variants: they differ in the group, dependencies and priority flag
  handed to the internal function future*)
39 val fork: (unit -> 'a) -> 'a future
40 val fork_group: group -> (unit -> 'a) -> 'a future
41 val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
42 val fork_background: (unit -> 'a) -> 'a future
(*join variants: join_results and join_result deliver Exn.result values,
  join applies Exn.release to the result*)
43 val join_results: 'a future list -> 'a Exn.result list
44 val join_result: 'a future -> 'a Exn.result
45 val join: 'a future -> 'a
(*map forks a new future that depends on the given one and applies the function to its joined value*)
46 val map: ('a -> 'b) -> 'a future -> 'b future
(*queue control: focus, interrupt, cancel, shutdown*)
47 val focus: task list -> unit
48 val interrupt_task: string -> unit
49 val cancel: 'a future -> unit
50 val shutdown: unit -> unit
53 structure Future: FUTURE =
(*NOTE(review): the line introducing this expression is missing from this
  listing; presumably it is the body of enabled -- confirm against the full file.
  True only if the future_scheduler flag is set, multithreading is enabled,
  and the calling thread is not inside a critical section.*)
59 ! future_scheduler andalso Multithreading.enabled () andalso
60 not (Multithreading.self_critical ());
(*abbreviations for the task and group types of TaskQueue*)
65 type task = TaskQueue.task;
66 type group = TaskQueue.group;
(*thread-local slot for an optional (name, task) pair; the tag itself stays private*)
68 local val tag = Universal.tag () : (string * task) option Universal.tag in
(*content of the slot for the calling thread; NONE if nothing was stored*)
69 fun thread_data () = the_default NONE (Thread.getLocal tag);
(*apply f to x through Library.setmp_thread_data, passing the current slot
  content and SOME data; presumably the slot holds SOME data during the call
  and is reset afterwards -- confirm in Library*)
70 fun setmp_thread_data data f x = Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
(*future value: a record wrapped in the constructor Future.
  NOTE(review): the lines declaring the first record fields are missing from
  this listing; the code below uses the fields task, group and result.*)
76 datatype 'a future = Future of
(*mutable result slot: NONE until an outcome has been stored*)
79 result: 'a Exn.result option ref};
(*task under which the given future is registered*)
fun task_of (Future fields) = #task fields;
(*group that the given future belongs to*)
fun group_of (Future fields) = #group fields;
(*current content of the result slot, without waiting*)
fun peek (Future fields) = ! (#result fields);
(*true iff the result slot of the future has been filled*)
fun is_finished x =
  (case peek x of
    NONE => false
  | SOME _ => true);
(*global scheduler state; the functions below access it under the lock (see SYNCHRONIZED)*)

(*the task queue, initially empty*)
93 val queue = ref TaskQueue.empty;
(*worker threads, each paired with its active flag (see change_active)*)
95 val workers = ref ([]: (Thread.thread * bool) list);
(*the scheduler thread, if one has been forked (see scheduler_check)*)
96 val scheduler = ref (NONE: Thread.thread option);
(*number of workers minus the thread limit, as computed by scheduler_next;
  worker_next tests it for being positive*)
97 val excessive = ref 0;
(*groups recorded for cancellation; scheduler_next retries TaskQueue.cancel on them*)
98 val canceled = ref ([]: TaskQueue.group list);
(*shutdown flag: while set, scheduler_next uses a worker limit of 0*)
99 val do_shutdown = ref false;
102 (* synchronization *)
(*one global mutex and one global condition variable, shared by wait,
  wait_timeout and notify_all*)
105 val lock = Mutex.mutex ();
106 val cond = ConditionVar.conditionVar ();
(*SimpleThread.synchronized specialized to the global lock; name is passed through unchanged*)
109 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
(*Block on the global condition variable, with level 3 tracing before and
  after the wait.  The caller must already hold the global lock.*)
fun wait name = (*requires SYNCHRONIZED*)
  let
    fun trace suffix = Multithreading.tracing 3 (fn () => name ^ suffix);
    val _ = trace ": wait ...";
    val _ = ConditionVar.wait (cond, lock);
  in trace ": ... continue" end;
(*Like wait, but gives up once the given time span, counted from now, has
  elapsed.  The caller must already hold the global lock.*)
fun wait_timeout name timeout = (*requires SYNCHRONIZED*)
  let
    fun trace suffix = Multithreading.tracing 3 (fn () => name ^ suffix);
    val _ = trace ": wait ...";
    val deadline = Time.+ (Time.now (), timeout);
    val _ = ConditionVar.waitUntil (cond, lock, deadline);
  in trace ": ... continue" end;
(*Broadcast on the global condition variable, waking all threads blocked in
  wait or wait_timeout.*)
fun notify_all () = (*requires SYNCHRONIZED*) ConditionVar.broadcast cond;
127 (* worker activity *)
(*Emit a level 1 trace message with the total and the active worker count.
  NOTE(review): the lines opening the let block and binding ws are missing
  from this listing; presumably ws is the current workers list -- confirm.*)
129 fun trace_active () =
(*m: number of entries in ws*)
132 val m = string_of_int (length ws);
(*n: number of entries whose second component, the active flag, is true*)
133 val n = string_of_int (length (filter #2 ws));
134 in Multithreading.tracing 1 (fn () => "SCHEDULE: " ^ m ^ " workers, " ^ n ^ " active") end;
(*Record the given active flag for the calling thread in the workers list.*)
fun change_active active = (*requires SYNCHRONIZED*)
  let val entry = (Thread.self (), active)
  in change workers (AList.update Thread.equal entry) end;
(*Run one unit of work (task, group, run) on behalf of the thread called name
  and record its completion in the queue.
  NOTE(review): several interior lines of this definition are missing from
  this listing; the comments below cover only what is visible.*)
142 fun execute name (task, group, run) =
144 val _ = trace_active ();
(*run the job with (name, task) supplied as thread data; ok is its outcome*)
145 val ok = setmp_thread_data (name, task) run ();
(*under the lock: mark the task as finished in the queue*)
146 val _ = SYNCHRONIZED "execute" (fn () =>
147 (change queue (TaskQueue.finish task);
(*NOTE(review): the condition line before this branch is missing; presumably
  it tests ok -- confirm.  Here: if TaskQueue.cancel on the group yields true
  nothing more is done, otherwise the group is added to canceled*)
149 else if TaskQueue.cancel (! queue) group then ()
150 else change canceled (cons group);
(*Mark the calling thread as inactive, wait for a notification, then mark it
  as active again.*)
fun worker_wait name = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = wait name;
  in change_active true end;
(*Determine what the calling worker should do next.
  NOTE(review): interior lines of this definition are missing from this
  listing; worker_loop treats a NONE result as the signal to exit.*)
160 fun worker_next name = (*requires SYNCHRONIZED*)
(*surplus of workers: the calling thread removes its own entry from the workers list*)
161 if ! excessive > 0 then
163 change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
(*otherwise ask the queue for work; if there is none, wait and try again*)
167 (case change_result queue TaskQueue.dequeue of
168 NONE => (worker_wait name; worker_next name)
(*Main loop of a worker thread: fetch work under the lock and execute it,
  until worker_next yields NONE.*)
fun worker_loop name =
  let
    fun fetch () = worker_next name;
    fun step NONE = Multithreading.tracing 3 (fn () => name ^ ": exit")
      | step (SOME work) = (execute name work; worker_loop name);
  in step (SYNCHRONIZED name fetch) end;
(*Fork a thread running worker_loop under the given name and register it in
  the workers list with its active flag set.*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let val thread = SimpleThread.fork false (fn () => worker_loop name)
  in change workers (cons (thread, true)) end;
(*One round of the scheduler; scheduler_loop repeats it while it yields true.
  NOTE(review): several interior lines of this definition are missing from
  this listing, including the first case branch and the final result; the
  comments below cover only what is visible.*)
182 fun scheduler_next () = (*requires SYNCHRONIZED*)
(*split the workers by Thread.isActive*)
186 (case List.partition (Thread.isActive o #1) (! workers) of
(*visible branch: keep only the active entries and trace how many were dropped*)
188 | (active, inactive) =>
189 (workers := active; Multithreading.tracing 0 (fn () =>
190 "SCHEDULE: disposed " ^ string_of_int (length inactive) ^ " dead worker threads")));
191 val _ = trace_active ();
(*m: worker limit, 0 while shutting down; l: current number of workers*)
193 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
194 val l = length (! workers);
(*surplus of workers over the limit; may be zero or negative*)
195 val _ = excessive := l - m;
(*start the missing workers; next is defined outside the visible lines*)
197 if m > l then funpow (m - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
(*retry pending cancellations; presumably groups for which TaskQueue.cancel
  yields true are dropped from the list -- confirm filter_out in Library*)
201 val _ = change canceled (filter_out (TaskQueue.cancel (! queue)));
(*stop once shutdown was requested and no workers remain; then clear the scheduler slot*)
204 val continue = not (! do_shutdown andalso null (! workers));
205 val _ = if continue then () else scheduler := NONE;
(*wake all waiting threads, then wait with a timeout of 3 seconds*)
207 val _ = notify_all ();
208 val _ = wait_timeout "scheduler" (Time.fromSeconds 3);
(*Body of the scheduler thread: repeat scheduler_next under the lock until it
  yields false, then trace the exit.*)
fun scheduler_loop () =
  let
    fun iterate () =
      if SYNCHRONIZED "scheduler" scheduler_next then iterate () else ();
  in
    iterate ();
    Multithreading.tracing 3 (fn () => "scheduler: exit")
  end;
(*True iff a scheduler thread is registered and Thread.isActive holds for it.*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  let
    fun alive NONE = false
      | alive (SOME thread) = Thread.isActive thread;
  in alive (! scheduler) end;
(*Make sure a scheduler thread is running; acquires the lock itself.
  NOTE(review): the final branch of the conditional is missing from this listing.*)
218 fun scheduler_check name = SYNCHRONIZED name (fn () =>
(*no live scheduler: reset the shutdown flag and fork a new scheduler thread*)
219 if not (scheduler_active ()) then
220 (Multithreading.tracing 3 (fn () => "scheduler: fork");
221 do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
(*live scheduler that is shutting down: refuse with an error*)
222 else if ! do_shutdown then error "Scheduler shutdown in progress"
226 (* future values: fork independent computation *)
(*Create a future for the computation e and enqueue it as a task.
  opt_group: group to join, or NONE for a fresh group;
  deps and pri: passed on unchanged to TaskQueue.enqueue.
  NOTE(review): several interior lines of this definition are missing from
  this listing; the comments below cover only what is visible.*)
228 fun future opt_group deps pri (e: unit -> 'a) =
230 val _ = scheduler_check "future check";
232 val group = (case opt_group of SOME group => group | NONE => TaskQueue.new_group ());
(*result slot, empty until the job has stored its outcome*)
234 val result = ref (NONE: 'a Exn.result option);
(*the job runs with the thread attributes that are current at fork time*)
235 val run = Multithreading.with_attributes (Thread.getAttributes ())
(*if ok holds, evaluate e and capture its outcome; otherwise record an interrupt*)
238 val res = if ok then Exn.capture e () else Exn.Exn Exn.Interrupt;
239 val _ = result := SOME res;
(*an interrupt invalidates the whole group, and this branch still yields true*)
243 | Exn.Exn Exn.Interrupt => (TaskQueue.invalidate_group group; true)
(*under the lock: enqueue the job, then wake waiting threads*)
247 val task = SYNCHRONIZED "future" (fn () =>
248 change_result queue (TaskQueue.enqueue group deps pri run) before notify_all ());
249 in Future {task = task, group = group, result = result} end;
(*fork: fresh group, no dependencies, priority flag set*)
fun fork (e: unit -> 'a) : 'a future =
  future NONE [] true e;

(*fork_group: like fork, but within the given group*)
fun fork_group (group: group) (e: unit -> 'a) : 'a future =
  future (SOME group) [] true e;

(*fork_deps: like fork, but depending on the tasks of the given futures*)
fun fork_deps (deps: 'b future list) (e: unit -> 'a) : 'a future =
  future NONE (map task_of deps) true e;

(*fork_background: like fork, but with the priority flag cleared*)
fun fork_background (e: unit -> 'a) : 'a future =
  future NONE [] false e;
257 (* join: retrieve results *)
(*Wait for all given futures and deliver their results in the same order.
  NOTE(review): several interior lines of this definition are missing from
  this listing; the comments below cover only what is visible.*)
259 fun join_results [] = []
260 | join_results xs = uninterruptible (fn _ => fn () =>
262 val _ = scheduler_check "join check";
(*joining unfinished futures inside a critical section is an error*)
263 val _ = Multithreading.self_critical () andalso
264 exists (not o is_finished) xs andalso
265 error "Cannot join future values within critical section";
(*repeatedly dequeue work towards the given tasks and execute it on the calling thread*)
267 fun join_loop _ [] = ()
268 | join_loop name tasks =
269 (case SYNCHRONIZED name (fn () =>
270 change_result queue (TaskQueue.dequeue_towards tasks)) of
272 | SOME (work, tasks') => (execute name work; join_loop name tasks'));
(*distinguish callers by their thread data*)
274 (case thread_data () of
276 (*alien thread -- refrain from contending for resources*)
277 while exists (not o is_finished) xs
278 do SYNCHRONIZED "join_thread" (fn () => wait "join_thread")
279 | SOME (name, task) =>
280 (*proper task -- actively work towards results*)
(*tasks of those futures whose result slot is still empty*)
282 val unfinished = xs |> map_filter
283 (fn Future {task, result = ref NONE, ...} => SOME task | _ => NONE);
(*register the dependency of the calling task on the unfinished ones and wake waiting threads*)
284 val _ = SYNCHRONIZED "join" (fn () =>
285 (change queue (TaskQueue.depend unfinished task); notify_all ()));
286 val _ = join_loop ("join_loop: " ^ name) unfinished;
(*whatever is still unfinished is awaited passively, marked as inactive worker*)
288 while exists (not o is_finished) xs
289 do SYNCHRONIZED "join_task" (fn () => worker_wait "join_task");
(*at this point every result slot is expected to be filled*)
292 in xs |> map (fn Future {result = ref (SOME res), ...} => res) end) ();
(*join_result: join_results restricted to a single future*)
fun join_result x = x |> singleton join_results;
(*join: join a single future and pass its result through Exn.release*)
fun join x = x |> join_result |> Exn.release;
(*map: fork a future that depends on x and applies f to the joined value of x*)
fun map f x =
  let fun body () = f (join x)
  in fork_deps [x] body end;
300 (* misc operations *)
(*focus: hand the given collection of high-priority tasks to TaskQueue.focus,
  updating the queue under the lock*)
fun focus tasks =
  let fun refocus () = change queue (TaskQueue.focus tasks)
  in SYNCHRONIZED "focus" refocus end;
(*interrupt: permissive signal for the task with the given id; it may get
  ignored*)
fun interrupt_task id =
  let fun signal () = TaskQueue.interrupt_external (! queue) id
  in SYNCHRONIZED "interrupt" signal end;
310 (*cancel: present and future group members will be interrupted eventually*)
(*NOTE(review): the line introducing this definition is missing from this
  listing; presumably it is the header of cancel with parameter x -- confirm.
  Ensures a scheduler is running, then under the lock adds the group of x to
  canceled and wakes waiting threads.*)
312 (scheduler_check "cancel check";
313 SYNCHRONIZED "cancel" (fn () => (change canceled (cons (group_of x)); notify_all ())));
316 (*global join and shutdown*)
(*NOTE(review): the header of this definition and some interior lines are
  missing from this listing; presumably this is the body of shutdown -- confirm.*)
318 if Multithreading.available then
319 (scheduler_check "shutdown check";
320 SYNCHRONIZED "shutdown" (fn () =>
(*wait until the scheduler is up, then until the queue has drained*)
321 (while not (scheduler_active ()) do wait "shutdown: scheduler inactive";
322 while not (TaskQueue.is_empty (! queue)) do wait "shutdown: join";
(*wait until all workers are gone, then until the scheduler thread has stopped*)
325 while not (null (! workers)) do wait "shutdown: workers";
326 while scheduler_active () do wait "shutdown: scheduler still active";
(*final pause of 300 milliseconds, still inside the synchronized section*)
327 OS.Process.sleep (Time.fromMilliseconds 300))))
(*top-level abbreviation for the future type of structure Future*)
332 type 'a future = 'a Future.future;