wenzelm@28156
|
1 |
(* Title: Pure/Concurrent/future.ML
|
wenzelm@28156
|
2 |
Author: Makarius
|
wenzelm@28156
|
3 |
|
wenzelm@28201
|
4 |
Future values.
|
wenzelm@28201
|
5 |
|
wenzelm@28201
|
6 |
Notes:
|
wenzelm@28201
|
7 |
|
wenzelm@28201
|
8 |
* Futures are similar to delayed evaluation, i.e. delay/force is
|
wenzelm@28201
|
9 |
generalized to fork/join (and variants). The idea is to model
|
wenzelm@28201
|
10 |
parallel value-oriented computations, but *not* communicating
|
wenzelm@28201
|
11 |
processes.
|
wenzelm@28201
|
12 |
|
wenzelm@28201
|
13 |
* Futures are grouped; failure of one group member causes the whole
|
wenzelm@32220
|
14 |
group to be interrupted eventually. Groups are block-structured.
|
wenzelm@28201
|
15 |
|
wenzelm@28201
|
16 |
* Forked futures are evaluated spontaneously by a farm of worker
|
wenzelm@28201
|
17 |
threads in the background; join resynchronizes the computation and
|
wenzelm@28201
|
18 |
delivers results (values or exceptions).
|
wenzelm@28201
|
19 |
|
wenzelm@28201
|
20 |
* The pool of worker threads is limited, usually in correlation with
|
wenzelm@28201
|
21 |
the number of physical cores on the machine. Note that allocation
|
wenzelm@28201
|
22 |
of runtime resources is distorted either if workers yield CPU time
|
wenzelm@28201
|
23 |
(e.g. via system sleep or wait operations), or if non-worker
|
wenzelm@28201
|
24 |
threads contend for significant runtime resources independently.
|
wenzelm@28156
|
25 |
*)
|
wenzelm@28156
|
26 |
|
wenzelm@28156
|
27 |
(*Interface of future values: scheduler status, identifiers, creation,
  joining, mapping, cancellation and shutdown.*)
signature FUTURE =
sig
  (*status*)
  val enabled: unit -> bool

  (*identifiers, shared with the task queue*)
  type task = Task_Queue.task
  type group = Task_Queue.group
  val is_worker: unit -> bool
  val worker_group: unit -> group option

  (*future values and their components*)
  type 'a future
  val task_of: 'a future -> task
  val group_of: 'a future -> group
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool

  (*creation*)
  val value: 'a -> 'a future
  val fork: (unit -> 'a) -> 'a future
  val fork_group: group -> (unit -> 'a) -> 'a future
  val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
  val fork_pri: int -> (unit -> 'a) -> 'a future

  (*joining and mapping*)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val join: 'a future -> 'a
  val map: ('a -> 'b) -> 'a future -> 'b future

  (*cancellation and shutdown*)
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: group -> unit
  val cancel: 'a future -> unit
  val shutdown: unit -> unit
end;
|
wenzelm@28156
|
53 |
|
wenzelm@28156
|
54 |
structure Future: FUTURE =
|
wenzelm@28156
|
55 |
struct
|
wenzelm@28156
|
56 |
|
wenzelm@28177
|
57 |
(** future values **)
|
wenzelm@28177
|
58 |
|
wenzelm@28645
|
59 |
(*true iff multithreading is enabled and the current thread is not
  inside a critical section*)
fun enabled () =
  if Multithreading.enabled ()
  then not (Multithreading.self_critical ())
  else false;
|
wenzelm@28645
|
62 |
|
wenzelm@28645
|
63 |
|
wenzelm@28167
|
64 |
(* identifiers *)
|
wenzelm@28167
|
65 |
|
wenzelm@29120
|
66 |
(*task and group identifiers are those of the task queue*)
type task = Task_Queue.task
and group = Task_Queue.group;
|
wenzelm@28167
|
68 |
|
wenzelm@32074
|
69 |
local
  (*thread-local slot holding (name, task, group) of the job being executed*)
  val tag: (string * task * group) option Universal.tag = Universal.tag ();
in
  (*data of the current thread; NONE if the slot was never set*)
  fun thread_data () =
    (case Thread.getLocal tag of
      SOME data => data
    | NONE => NONE);

  (*run f x with the slot temporarily set to SOME data*)
  fun setmp_thread_data data f x =
    let val current = thread_data ()
    in Library.setmp_thread_data tag current (SOME data) f x end;
end;
|
wenzelm@28167
|
76 |
|
wenzelm@32074
|
77 |
(*a thread counts as worker iff it carries thread data*)
fun is_worker () = is_some (thread_data ());

(*group component of the current thread data, if any*)
fun worker_group () = Option.map (fn (_, _, group) => group) (thread_data ());
|
wenzelm@32074
|
79 |
|
wenzelm@28167
|
80 |
|
wenzelm@28167
|
81 |
(* datatype future *)
|
wenzelm@28167
|
82 |
|
wenzelm@28972
|
83 |
(*a future consists of its task, its group, and a result cell that stays
  NONE until a result has been stored*)
datatype 'a future = Future of
 {task: task,
  group: group,
  result: 'a Exn.result option ref};

(*component selectors*)
fun task_of (Future args) = #task args;
fun group_of (Future args) = #group args;

(*current content of the result cell, without waiting*)
fun peek (Future args) = ! (#result args);

(*finished means that some result (value or exception) is present*)
fun is_finished x =
  (case peek x of
    NONE => false
  | SOME _ => true);
|
wenzelm@28320
|
93 |
|
wenzelm@29002
|
94 |
(*already finished future: fresh task of priority 0, fresh group without
  parent, and the result cell filled with the given value*)
fun value x =
  let
    val task = Task_Queue.new_task 0;
    val group = Task_Queue.new_group NONE;
    val result = ref (SOME (Exn.Result x));
  in Future {task = task, group = group, result = result} end;
|
wenzelm@29002
|
98 |
|
wenzelm@28167
|
99 |
|
wenzelm@28177
|
100 |
|
wenzelm@28177
|
101 |
(** scheduling **)
|
wenzelm@28177
|
102 |
|
wenzelm@28177
|
103 |
(* global state *)
|
wenzelm@28177
|
104 |
|
wenzelm@29120
|
105 |
(*the task queue*)
val queue = ref Task_Queue.empty;

(*counter used to number the names of worker threads*)
val next = ref 0;

(*worker threads, each paired with its "active" flag*)
val workers: (Thread.thread * bool) list ref = ref [];

(*the scheduler thread, if one has been started*)
val scheduler: Thread.thread option ref = ref NONE;

(*surplus of workers; a worker that sees a positive value removes itself*)
val excessive = ref 0;

(*groups registered for cancellation by the scheduler*)
val canceled: Task_Queue.group list ref = ref [];

(*shutdown request flag*)
val do_shutdown = ref false;
|
wenzelm@28177
|
112 |
|
wenzelm@28177
|
113 |
|
wenzelm@28177
|
114 |
(* synchronization *)
|
wenzelm@28156
|
115 |
|
wenzelm@32219
|
116 |
(*condition variables, created in the order scheduler_event,
  work_available, work_finished*)
local
  val new_cond = ConditionVar.conditionVar;
in
  val scheduler_event = new_cond ();
  val work_available = new_cond ();
  val work_finished = new_cond ();
end;
|
wenzelm@32219
|
119 |
|
wenzelm@28156
|
120 |
local
  (*the single mutex behind SYNCHRONIZED and all waits below*)
  val lock = Mutex.mutex ();
in

(*run e via SimpleThread.synchronized on the lock; name is passed through*)
fun SYNCHRONIZED name e = SimpleThread.synchronized name lock e;

(*wait on cond, using the lock*)
fun wait cond = (*requires SYNCHRONIZED*)
  ConditionVar.wait (cond, lock);

(*wait on cond until now + timeout; the result of waitUntil is ignored*)
fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in ignore (ConditionVar.waitUntil (cond, lock, deadline)) end;

fun signal cond = (*requires SYNCHRONIZED*)
  ConditionVar.signal cond;

fun broadcast cond = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

(*broadcast on all three condition variables, in fixed order*)
fun broadcast_all () = (*requires SYNCHRONIZED*)
  List.app ConditionVar.broadcast [scheduler_event, work_available, work_finished];

end;
|
wenzelm@28156
|
144 |
|
wenzelm@28156
|
145 |
|
wenzelm@28382
|
146 |
(* worker activity *)
|
wenzelm@28382
|
147 |
|
wenzelm@32115
|
148 |
(*number of entries whose "active" flag is set*)
fun count_active ws =
  length (List.filter (fn (_, active) => active) ws);
|
wenzelm@32115
|
150 |
|
wenzelm@32186
|
151 |
(*tracing (level 6) of the total and active number of workers*)
fun trace_active () =
  let
    fun message () =
      let val pool = ! workers in
        "SCHEDULE: " ^ string_of_int (length pool) ^ " workers, " ^
          string_of_int (count_active pool) ^ " active"
      end;
  in Multithreading.tracing 6 message end;
|
wenzelm@28382
|
157 |
|
wenzelm@28382
|
158 |
(*set the "active" flag of the current thread in the workers table*)
fun change_active active = (*requires SYNCHRONIZED*)
  let val entry = (Thread.self (), active)
  in workers := AList.update Thread.equal entry (! workers) end;
|
wenzelm@28382
|
160 |
|
wenzelm@32115
|
161 |
(*more active workers than the max_threads value*)
fun overloaded () =
  let
    val active = count_active (! workers);
    val limit = Multithreading.max_threads_value ();
  in limit < active end;
|
wenzelm@32115
|
163 |
|
wenzelm@28382
|
164 |
|
wenzelm@32119
|
165 |
(* execute future jobs *)
|
wenzelm@32119
|
166 |
|
wenzelm@32119
|
167 |
(*Produce a fresh result cell together with a job that fills it.  The job
  takes a flag: if true, e is evaluated under restricted interrupts and
  its outcome captured; if false, the outcome is Interrupt without
  evaluating e.  An exceptional outcome cancels the group with that
  exception.  The job returns true iff the outcome is a proper result.*)
fun future_job group (e: unit -> 'a) =
  let
    val result = ref (NONE: 'a Exn.result option);

    fun evaluate () =
      Multithreading.with_attributes Multithreading.restricted_interrupts
        (fn _ => fn () => e ()) ();

    fun job ok =
      let
        val outcome =
          if ok then Exn.capture evaluate ()
          else Exn.Exn Exn.Interrupt;
      in
        result := SOME outcome;
        (case outcome of
          Exn.Result _ => true
        | Exn.Exn exn => (Task_Queue.cancel_group group exn; false))
      end;
  in (result, job) end;
|
wenzelm@28156
|
185 |
|
wenzelm@29341
|
186 |
(*register group in the canceled list and notify the scheduler*)
fun do_cancel group = (*requires SYNCHRONIZED*)
  let val _ = canceled := insert Task_Queue.eq_group group (! canceled)
  in broadcast scheduler_event end;
|
wenzelm@29341
|
188 |
|
wenzelm@29370
|
189 |
(*Run all jobs of a task outside the lock, with thread data set to
  (name, task, group); then, under the lock, mark the task as finished in
  the queue, handle failure of the jobs, and notify waiting threads.*)
fun execute name (task, group, jobs) =
  let
    val _ = trace_active ();
    val valid = not (Task_Queue.is_canceled group);

    (*each job is applied to valid, even after an earlier job has failed*)
    fun run_jobs () =
      fold (fn job => fn ok_so_far => job valid andalso ok_so_far) jobs true;
    val ok = setmp_thread_data (name, task, group) run_jobs ();

    fun finish () =
      let
        val maximal = change_result queue (Task_Queue.finish task);
      in
        (*on failure: register the group via do_cancel, unless
          Task_Queue.cancel already returns true*)
        if ok orelse Task_Queue.cancel (! queue) group then ()
        else do_cancel group;
        broadcast work_finished;
        if maximal then () else broadcast work_available
      end;
  in SYNCHRONIZED "execute" finish end;
|
wenzelm@28167
|
206 |
|
wenzelm@28167
|
207 |
|
wenzelm@28167
|
208 |
(* worker threads *)
|
wenzelm@28167
|
209 |
|
wenzelm@32219
|
210 |
(*wait on cond while being marked as inactive; the scheduler is notified
  before and after the wait*)
fun worker_wait cond = (*requires SYNCHRONIZED*)
  let
    val _ = change_active false;
    val _ = broadcast scheduler_event;
    val _ = wait cond;
    val _ = change_active true;
  in broadcast scheduler_event end;
|
wenzelm@28162
|
214 |
|
wenzelm@29120
|
215 |
(*Next piece of work for the current worker.  NONE means that this worker
  has removed itself from the pool because of excessive workers.
  Otherwise the worker waits while the pool is overloaded or the queue
  has nothing to dequeue.*)
fun worker_next () = (*requires SYNCHRONIZED*)
  let
    fun retire () =
      let
        val _ = dec excessive;
        val self = Thread.self ();
        val _ = change workers (filter_out (fn (thread, _) => Thread.equal (thread, self)));
      in broadcast scheduler_event end;
  in
    if ! excessive > 0 then (retire (); NONE)
    else if overloaded () then (worker_wait scheduler_event; worker_next ())
    else
      (case change_result queue Task_Queue.dequeue of
        SOME work => SOME work
      | NONE => (worker_wait work_available; worker_next ()))
  end;
|
wenzelm@28156
|
226 |
|
wenzelm@28167
|
227 |
(*main loop of a worker: fetch work under the lock and execute it, until
  worker_next yields NONE*)
fun worker_loop name =
  let
    fun loop () =
      (case SYNCHRONIZED name worker_next of
        SOME work => (execute name work; loop ())
      | NONE => ());
  in loop () end;
|
wenzelm@28156
|
231 |
|
wenzelm@28167
|
232 |
(*fork a thread running worker_loop and register it as active worker*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let val thread = SimpleThread.fork false (fn () => worker_loop name)
  in workers := (thread, true) :: ! workers end;
|
wenzelm@28156
|
234 |
|
wenzelm@28156
|
235 |
|
wenzelm@28156
|
236 |
(* scheduler *)
|
wenzelm@28156
|
237 |
|
wenzelm@28206
|
238 |
(*One round of the scheduler: trace the queue status, dispose dead worker
  threads, adjust the number of workers, process canceled groups, wait
  for the next event (with timeout), and decide whether to continue.
  Returns false iff shutdown was requested and no workers are left; in
  that case the scheduler reference is reset to NONE.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    (*queue status*)
    fun trace_queue () = Multithreading.tracing 6 (fn () =>
      let val {ready, pending, running} = Task_Queue.status (! queue) in
        "SCHEDULE: " ^
          string_of_int ready ^ " ready, " ^
          string_of_int pending ^ " pending, " ^
          string_of_int running ^ " running"
      end);

    (*worker threads: drop entries whose thread is no longer active*)
    fun dispose_dead () =
      if forall (Thread.isActive o #1) (! workers) then ()
      else
        let val (alive, dead) = List.partition (Thread.isActive o #1) (! workers) in
          if null dead then ()
          else
           (workers := alive;
            Multithreading.tracing 0 (fn () =>
              "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads"))
        end;

    (*the pool is sized to 3/2 of the thread limit, or 0 during shutdown*)
    fun adjust_workers () =
      let
        val limit = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
        val wanted = (limit * 3) div 2;
        val present = length (! workers);
        val missing = wanted - present;
      in
        excessive := present - wanted;
        if missing > 0 then
         (funpow missing (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
          broadcast scheduler_event)
        else ()
      end;

    (*canceled groups: keep only those for which Task_Queue.cancel is false*)
    fun process_canceled () =
      if null (! canceled) then ()
      else (change canceled (filter_out (Task_Queue.cancel (! queue))); broadcast_all ());

    (*long timeout when idle, short one during shutdown or pending cancellation;
      an interrupt of the wait cancels everything in the queue*)
    fun await_event () =
      let
        val idle = not (! do_shutdown) andalso null (! canceled);
        val timeout = Time.fromMilliseconds (if idle then 500 else 50);
      in
        interruptible (fn () => wait_timeout scheduler_event timeout) ()
          handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue))
      end;

    val _ = trace_queue ();
    val _ = dispose_dead ();
    val _ = trace_active ();
    val _ = adjust_workers ();
    val _ = process_canceled ();
    val _ = await_event ();

    (*shutdown*)
    val continue = not (! do_shutdown) orelse not (null (! workers));
    val _ = if continue then () else scheduler := NONE;
    val _ = broadcast scheduler_event;
  in continue end;
|
wenzelm@28167
|
285 |
|
wenzelm@28206
|
286 |
(*repeat scheduler rounds, each under the lock, until one returns false*)
fun scheduler_loop () =
  if SYNCHRONIZED "scheduler" scheduler_next then scheduler_loop ()
  else ();
|
wenzelm@28191
|
288 |
|
wenzelm@28203
|
289 |
(*a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  the_default false (Option.map Thread.isActive (! scheduler));
|
wenzelm@28203
|
291 |
|
wenzelm@28464
|
292 |
(*Ensure that a scheduler is running: without an active scheduler thread
  the shutdown flag is reset and a new scheduler forked; with an active
  scheduler that is shutting down, an error is raised.*)
fun scheduler_check name =
  let
    fun restart () =
      let
        val _ = do_shutdown := false;
        val _ = scheduler := SOME (SimpleThread.fork false scheduler_loop);
      in broadcast scheduler_event end;

    fun check () =
      if scheduler_active () then
        if ! do_shutdown then error "Scheduler shutdown in progress" else ()
      else restart ();
  in SYNCHRONIZED name check end;
|
wenzelm@28191
|
298 |
|
wenzelm@28191
|
299 |
|
wenzelm@28191
|
300 |
|
wenzelm@29370
|
301 |
(** futures **)
|
wenzelm@29370
|
302 |
|
wenzelm@29370
|
303 |
(* fork *)
|
wenzelm@29370
|
304 |
|
wenzelm@29370
|
305 |
(*Fork e as a future with the given dependencies and priority.  Without
  an explicit group, a new group is created whose parent is the group of
  the current worker (if any).  The job is enqueued under the lock;
  work_available is signalled if enqueue reports the task as minimal.*)
fun fork_future opt_group deps pri e =
  let
    val _ = scheduler_check "future check";

    val group =
      (case opt_group of
        NONE => Task_Queue.new_group (worker_group ())
      | SOME given => given);
    val (result, job) = future_job group e;

    fun enqueue () =
      let
        val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
      in
        if minimal then signal work_available else ();
        task
      end;
  in Future {task = SYNCHRONIZED "future" enqueue, group = group, result = result} end;
|
wenzelm@28162
|
320 |
|
wenzelm@29370
|
321 |
(*plain fork: new group, no dependencies, priority 0*)
val fork = fn e => fork_future NONE [] 0 e;

(*fork within the given group*)
val fork_group = fn group => fn e => fork_future (SOME group) [] 0 e;

(*fork depending on the tasks of the given futures*)
val fork_deps = fn deps => fn e => fork_future NONE (map task_of deps) 0 e;

(*fork with explicit priority*)
val fork_pri = fn pri => fn e => fork_future NONE [] pri e;
|
wenzelm@28162
|
325 |
|
wenzelm@28186
|
326 |
|
wenzelm@29370
|
327 |
(* join *)
|
wenzelm@28186
|
328 |
|
wenzelm@29551
|
329 |
local
|
wenzelm@29551
|
330 |
|
wenzelm@32119
|
331 |
(*Result of a future without waiting: an unfinished future yields
  SYS_ERROR; an Interrupt result is replaced by the collected exceptions
  of the group status; any other result is returned as it is.*)
fun get_result x =
  let
    fun group_failure () =
      Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))));
  in
    (case peek x of
      SOME (Exn.Exn Exn.Interrupt) => group_failure ()
    | SOME res => res
    | NONE => Exn.Exn (SYS_ERROR "unfinished future"))
  end;
|
wenzelm@28177
|
337 |
|
wenzelm@32224
|
338 |
(*passive join: wait on work_finished until x is finished; the lock is
  re-entered for each attempt*)
fun join_wait x =
  let
    fun finished_or_wait () =
      if is_finished x then true
      else (wait work_finished; false);
    fun loop () =
      if SYNCHRONIZED "join_wait" finished_or_wait then () else loop ();
  in loop () end;
|
wenzelm@32224
|
342 |
|
wenzelm@32115
|
343 |
(*Next piece of work towards the given dependencies, paired with the
  updated dependencies.  NONE means there are no dependencies left.  The
  thread waits while the pool is overloaded, or while dependencies remain
  but nothing can be dequeued.*)
fun join_next deps = (*requires SYNCHRONIZED*)
  (case deps of
    [] => NONE
  | _ =>
      if overloaded () then (worker_wait scheduler_event; join_next deps)
      else
        (case change_result queue (Task_Queue.dequeue_towards deps) of
          (SOME work, rest) => SOME (work, rest)
        | (NONE, []) => NONE
        | (NONE, rest) => (worker_wait work_finished; join_next rest)));
|
wenzelm@32115
|
351 |
|
wenzelm@32224
|
352 |
(*active join: keep executing work towards deps until join_next yields NONE*)
fun join_work deps =
  (case SYNCHRONIZED "join" (fn () => join_next deps) of
    SOME (work, rest) => (execute "join" work; join_work rest)
  | NONE => ());
|
wenzelm@29551
|
356 |
|
wenzelm@29551
|
357 |
in
|
wenzelm@29551
|
358 |
|
wenzelm@29370
|
359 |
(*Join a list of futures and return their results.  If all are finished
  already, the results are returned directly.  Otherwise, within an
  uninterruptible context, a worker thread executes work towards the
  futures itself, while any other thread waits for each future in turn.
  Joining within a critical section is an error.*)
fun join_results xs =
  let
    fun results () = map get_result xs;

    fun await () =
      let
        val _ = scheduler_check "join check";
        val _ =
          if Multithreading.self_critical ()
          then error "Cannot join future values within critical section"
          else ();
        val _ =
          if is_worker () then join_work (map task_of xs)
          else List.app join_wait xs;
      in results () end;
  in
    if forall is_finished xs then results ()
    else uninterruptible (fn _ => await) ()
  end;
|
wenzelm@28186
|
370 |
|
wenzelm@29551
|
371 |
end;
|
wenzelm@29551
|
372 |
|
wenzelm@28647
|
373 |
(*join a single future, as captured result*)
fun join_result x = x |> singleton join_results;

(*join a single future, releasing its result (value or exception)*)
fun join x = x |> join_result |> Exn.release;
|
wenzelm@28156
|
375 |
|
wenzelm@28979
|
376 |
|
wenzelm@29370
|
377 |
(* map *)
|
wenzelm@28191
|
378 |
|
wenzelm@29384
|
379 |
(*Map f over the result of future x, within a new group whose parent is
  the group of x.  First an attempt is made to extend the task of x in
  the queue by the new job; if that is not possible, a separate future
  is forked that depends on the task of x and has the same priority.*)
fun map_future f x =
  let
    val _ = scheduler_check "map_future check";

    val task = task_of x;
    val group = Task_Queue.new_group (SOME (group_of x));
    fun body () = f (join x);
    val (result, job) = future_job group body;

    fun try_extend () =
      (case Task_Queue.extend task job (! queue) of
        NONE => false
      | SOME extended_queue => (queue := extended_queue; true));
  in
    if SYNCHRONIZED "map_future" try_extend
    then Future {task = task, group = group, result = result}
    else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
  end;
|
wenzelm@29370
|
395 |
|
wenzelm@29370
|
396 |
|
wenzelm@29431
|
397 |
(* cancellation *)
|
wenzelm@28202
|
398 |
|
wenzelm@30621
|
399 |
(*Apply f to x in interruptible mode.  With multithreading available,
  worker threads use restricted interrupts and other threads regular
  interrupts; otherwise plain interruptible is used.*)
fun interruptible_task f x =
  if not Multithreading.available then interruptible f x
  else
    let
      val attributes =
        if is_worker () then Multithreading.restricted_interrupts
        else Multithreading.regular_interrupts;
    in Multithreading.with_attributes attributes (fn _ => f) x end;
|
wenzelm@30621
|
407 |
|
wenzelm@32222
|
408 |
(*cancel_group: present and future group members will be interrupted eventually;
  the group is merely registered here, under the lock, via do_cancel*)
fun cancel_group group =
  let val _ = scheduler_check "cancel check"
  in SYNCHRONIZED "cancel" (fn () => do_cancel group) end;
|
wenzelm@28206
|
412 |
|
wenzelm@29431
|
413 |
(*cancel the group of the given future*)
fun cancel (Future {group, ...}) = cancel_group group;
|
wenzelm@28206
|
414 |
|
wenzelm@29370
|
415 |
|
wenzelm@29370
|
416 |
(** global join and shutdown **)
|
wenzelm@29370
|
417 |
|
wenzelm@28203
|
418 |
(*Shut down the scheduler; a no-op without multithreading.  Under the
  lock: wait until the scheduler is active and the queue is empty, set
  the shutdown flag, then keep broadcasting on all condition variables
  until the scheduler has terminated.*)
fun shutdown () =
  if not Multithreading.available then ()
  else
    let
      fun wait_until pred = while not (pred ()) do wait scheduler_event;

      fun stop () =
        let
          val _ = wait_until scheduler_active;
          val _ = wait_until (fn () => Task_Queue.is_empty (! queue));
          val _ = do_shutdown := true;
        in while scheduler_active () do (broadcast_all (); wait scheduler_event) end;

      val _ = scheduler_check "shutdown check";
    in SYNCHRONIZED "shutdown" stop end;
|
wenzelm@28203
|
427 |
|
wenzelm@29370
|
428 |
|
wenzelm@29370
|
429 |
(*final declarations of this structure!*)
|
wenzelm@29370
|
430 |
(*rebinds map to the operation on futures; earlier uses of map in this
  structure refer to the list operation*)
fun map f x = map_future f x;
|
wenzelm@29370
|
431 |
|
wenzelm@28156
|
432 |
end;
|
wenzelm@28972
|
433 |
|
wenzelm@28972
|
434 |
(*top-level abbreviation for the type of futures from structure Future*)
type 'a future = 'a Future.future;
|
wenzelm@28972
|
435 |
|