wenzelm@28156
|
1 |
(* Title: Pure/Concurrent/future.ML
|
wenzelm@28156
|
2 |
Author: Makarius
|
wenzelm@28156
|
3 |
|
wenzelm@45151
|
4 |
Value-oriented parallelism via futures and promises. See also
|
wenzelm@32250
|
5 |
http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
|
wenzelm@38175
|
6 |
http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
|
wenzelm@28201
|
7 |
|
wenzelm@28201
|
8 |
Notes:
|
wenzelm@28201
|
9 |
|
wenzelm@28201
|
10 |
* Futures are similar to delayed evaluation, i.e. delay/force is
|
wenzelm@45151
|
11 |
generalized to fork/join. The idea is to model parallel
|
wenzelm@45151
|
12 |
value-oriented computations (not communicating processes).
|
wenzelm@28201
|
13 |
|
wenzelm@28201
|
14 |
* Forked futures are evaluated spontaneously by a farm of worker
|
wenzelm@28201
|
15 |
threads in the background; join resynchronizes the computation and
|
wenzelm@28201
|
16 |
delivers results (values or exceptions).
|
wenzelm@28201
|
17 |
|
wenzelm@28201
|
18 |
* The pool of worker threads is limited, usually in correlation with
|
wenzelm@28201
|
19 |
the number of physical cores on the machine. Note that allocation
|
wenzelm@45151
|
20 |
of runtime resources may be distorted either if workers yield CPU
|
wenzelm@45151
|
21 |
time (e.g. via system sleep or wait operations), or if non-worker
|
wenzelm@28201
|
22 |
threads contend for significant runtime resources independently.
|
wenzelm@45151
|
23 |
There is a limited number of replacement worker threads that get
|
wenzelm@45151
|
24 |
activated in certain explicit wait conditions.
|
wenzelm@34277
|
25 |
|
wenzelm@45151
|
26 |
* Future tasks are organized in groups, which are block-structured.
|
wenzelm@45151
|
27 |
When forking a new task, the default is to open an individual
|
wenzelm@45151
|
28 |
subgroup, unless some common group is specified explicitly.
|
wenzelm@45151
|
29 |
Failure of one group member causes the immediate peers to be
|
wenzelm@45151
|
30 |
interrupted eventually (i.e. none by default). Interrupted tasks
|
wenzelm@45151
|
31 |
that lack regular result information, will pick up parallel
|
wenzelm@45151
|
32 |
exceptions from the cumulative group context (as Par_Exn).
|
wenzelm@45151
|
33 |
|
wenzelm@45174
|
34 |
* Future task groups may be canceled: present and future group
|
wenzelm@45174
|
35 |
members will be interrupted eventually.
|
wenzelm@45174
|
36 |
|
wenzelm@45151
|
37 |
* Promised "passive" futures are fulfilled by external means. There
|
wenzelm@45151
|
38 |
is no associated evaluation task, but other futures can depend on
|
wenzelm@45151
|
39 |
them via regular join operations.
|
wenzelm@28156
|
40 |
*)
|
wenzelm@28156
|
41 |
|
wenzelm@28156
|
42 |
(*Public interface of structure Future.*)
signature FUTURE =
sig
  (*tasks and groups, re-exported from Task_Queue*)
  type task = Task_Queue.task
  type group = Task_Queue.group
  val new_group: group option -> group

  (*task context of the current thread*)
  val worker_task: unit -> task option
  val worker_group: unit -> group option
  val worker_subgroup: unit -> group

  (*future values*)
  type 'a future
  val task_of: 'a future -> task
  val peek: 'a future -> 'a Exn.result option
  val is_finished: 'a future -> bool

  (*interrupts and cancellation*)
  val interruptible_task: ('a -> 'b) -> 'a -> 'b
  val cancel_group: group -> unit
  val cancel: 'a future -> unit

  (*fork*)
  type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
  val default_params: params
  val forks: params -> (unit -> 'a) list -> 'a future list
  val fork_pri: int -> (unit -> 'a) -> 'a future
  val fork: (unit -> 'a) -> 'a future

  (*join*)
  val join_results: 'a future list -> 'a Exn.result list
  val join_result: 'a future -> 'a Exn.result
  val joins: 'a future list -> 'a list
  val join: 'a future -> 'a
  val join_tasks: task list -> unit

  (*fast-path versions, bypassing the task queue*)
  val value_result: 'a Exn.result -> 'a future
  val value: 'a -> 'a future
  val cond_forks: params -> (unit -> 'a) list -> 'a future list
  val map: ('a -> 'b) -> 'a future -> 'b future

  (*promised futures, fulfilled by external means*)
  val promise_group: group -> (unit -> unit) -> 'a future
  val promise: (unit -> unit) -> 'a future
  val fulfill_result: 'a future -> 'a Exn.result -> unit
  val fulfill: 'a future -> 'a -> unit

  (*shutdown, status markup, queue status*)
  val shutdown: unit -> unit
  val status: (unit -> 'a) -> 'a
  val group_tasks: group -> task list
  val queue_status: unit -> {ready: int, pending: int, running: int, passive: int}
end;
|
wenzelm@28156
|
80 |
|
wenzelm@28156
|
81 |
structure Future: FUTURE =
|
wenzelm@28156
|
82 |
struct
|
wenzelm@28156
|
83 |
|
wenzelm@28177
|
84 |
(** future values **)
|
wenzelm@28177
|
85 |
|
wenzelm@45175
|
86 |
(*tasks and groups are those of Task_Queue; new_group merely forwards*)
type task = Task_Queue.task;
type group = Task_Queue.group;
fun new_group parent = Task_Queue.new_group parent;
|
wenzelm@45175
|
89 |
|
wenzelm@45175
|
90 |
|
wenzelm@28167
|
91 |
(* identifiers *)
|
wenzelm@28167
|
92 |
|
wenzelm@32074
|
93 |
local
  (*thread-local slot for the task that is executed by the current thread*)
  val tag = Universal.tag () : task option Universal.tag;
in

(*task of the current thread; NONE if the slot is unset or holds NONE*)
fun worker_task () =
  (case Thread.getLocal tag of
    SOME current => current
  | NONE => NONE);

(*apply f to x with the slot temporarily set to SOME task, relative to its present content*)
fun setmp_worker_task task f x =
  let val previous = worker_task ()
  in setmp_thread_data tag previous (SOME task) f x end;

end;
|
wenzelm@28167
|
99 |
|
wenzelm@42563
|
100 |
(*group of the task executed by the current thread, if any*)
fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());

(*fresh group, with the current worker group (if any) passed as argument to new_group*)
fun worker_subgroup () =
  let val parent = worker_group ()
  in new_group parent end;
|
wenzelm@34277
|
102 |
|
wenzelm@42559
|
103 |
(*Evaluate e (); within a worker task this is wrapped by Task_Queue.joining
  for that task, otherwise e is applied directly.*)
fun worker_joining e =
  (case worker_task () of
    SOME current => Task_Queue.joining current e
  | NONE => e ());
|
wenzelm@42559
|
107 |
|
wenzelm@42560
|
108 |
(*Evaluate e (); within a worker task this is wrapped by Task_Queue.waiting
  for that task and the given deps, otherwise e is applied directly.*)
fun worker_waiting deps e =
  (case worker_task () of
    SOME current => Task_Queue.waiting current deps e
  | NONE => e ());
|
wenzelm@42541
|
112 |
|
wenzelm@28167
|
113 |
|
wenzelm@28167
|
114 |
(* datatype future *)
|
wenzelm@28167
|
115 |
|
wenzelm@35016
|
116 |
(*single-assignment cell for the outcome (value or exception) of a future*)
type 'a result = 'a Exn.result Single_Assignment.var;

(*promised: true only for futures made by promise_group;
  task: the associated queue task;
  result: the cell that eventually holds the outcome*)
datatype 'a future =
  Future of {promised: bool, task: task, result: 'a result};
|
wenzelm@28167
|
122 |
|
wenzelm@28167
|
123 |
(*projections of the future record*)
fun task_of (Future {task = t, ...}) = t;
fun result_of (Future {result = r, ...}) = r;
|
wenzelm@28167
|
125 |
|
wenzelm@35016
|
126 |
(*non-blocking look at the result cell*)
fun peek (Future {result, ...}) = Single_Assignment.peek result;

(*a future is finished as soon as its result cell is assigned*)
fun is_finished x =
  (case peek x of
    SOME _ => true
  | NONE => false);
|
wenzelm@28320
|
128 |
|
wenzelm@28167
|
129 |
|
wenzelm@28177
|
130 |
|
wenzelm@28177
|
131 |
(** scheduling **)
|
wenzelm@28177
|
132 |
|
wenzelm@28177
|
133 |
(* synchronization *)
|
wenzelm@28156
|
134 |
|
wenzelm@32219
|
135 |
(*broadcast by cancel_later and at the end of each scheduler round;
  the scheduler and shutdown wait on it*)
val scheduler_event = ConditionVar.conditionVar ();
(*idle workers wait on this in worker_next*)
val work_available = ConditionVar.conditionVar ();
(*broadcast when a task finishes; joining workers wait on this*)
val work_finished = ConditionVar.conditionVar ();
|
wenzelm@32219
|
138 |
|
wenzelm@28156
|
139 |
local
  (*the single mutex protecting the scheduler state*)
  val lock = Mutex.mutex ();
in

(*run body via Simple_Thread.synchronized on the global lock*)
fun SYNCHRONIZED name body = Simple_Thread.synchronized name lock body;

(*wait on cond without time limit*)
fun wait cond = (*requires SYNCHRONIZED*)
  Multithreading.sync_wait NONE NONE cond lock;

(*wait on cond, at most until timeout has elapsed from now*)
fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
  let val deadline = Time.+ (Time.now (), timeout)
  in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

fun signal cond = (*requires SYNCHRONIZED*)
  ConditionVar.signal cond;

fun broadcast cond = (*requires SYNCHRONIZED*)
  ConditionVar.broadcast cond;

(*wake up both idle and joining workers*)
fun broadcast_work () = (*requires SYNCHRONIZED*)
  (broadcast work_available;
   broadcast work_finished);

end;
|
wenzelm@28156
|
162 |
|
wenzelm@28156
|
163 |
|
wenzelm@33431
|
164 |
(* global state *)
|
wenzelm@33431
|
165 |
|
wenzelm@33431
|
166 |
(*the task queue*)
val queue = Unsynchronized.ref Task_Queue.empty;
(*counter used by the scheduler to number worker thread names*)
val next = Unsynchronized.ref 0;
(*the scheduler thread, once it has been forked*)
val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
(*groups registered by cancel_later, retried by the scheduler*)
val canceled = Unsynchronized.ref ([]: group list);
(*set by the scheduler when the queue is all passive; reset by scheduler_check*)
val do_shutdown = Unsynchronized.ref false;
(*bound on registered workers: surplus workers retire in worker_next*)
val max_workers = Unsynchronized.ref 0;
(*bound on workers in state Working: surplus workers sleep in worker_next*)
val max_active = Unsynchronized.ref 0;
(*tick count of the pool target staying above (positive) or below (negative) max_workers*)
val worker_trend = Unsynchronized.ref 0;
|
wenzelm@33431
|
174 |
|
wenzelm@33431
|
175 |
(*Working: initial state and state after each wait;
  Waiting / Sleeping: set by worker_wait for active / non-active waits*)
datatype worker_state = Working | Waiting | Sleeping;

(*registered worker threads, each with its mutable state*)
val workers =
  Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
|
wenzelm@33431
|
177 |
|
wenzelm@33431
|
178 |
(*number of registered workers that are presently in the given state*)
fun count_workers state = (*requires SYNCHRONIZED*)
  length (filter (fn (_, state_ref) => ! state_ref = state) (! workers));
|
wenzelm@33431
|
180 |
|
wenzelm@33431
|
181 |
|
wenzelm@44993
|
182 |
(* cancellation primitives *)
|
wenzelm@32119
|
183 |
|
wenzelm@34279
|
184 |
(*Cancel group in the queue and interrupt the threads returned by
  Task_Queue.cancel; those threads are also the result.*)
fun cancel_now group = (*requires SYNCHRONIZED*)
  let val threads = Task_Queue.cancel (! queue) group in
    List.app Simple_Thread.interrupt_unsynchronized threads;
    threads
  end;
|
wenzelm@45221
|
189 |
|
wenzelm@45221
|
190 |
(*Cancel everything in the queue and interrupt the threads returned by
  Task_Queue.cancel_all; the result is the list of groups it returned.*)
fun cancel_all () = (*requires SYNCHRONIZED*)
  let val (affected_groups, threads) = Task_Queue.cancel_all (! queue) in
    List.app Simple_Thread.interrupt_unsynchronized threads;
    affected_groups
  end;
|
wenzelm@34279
|
195 |
|
wenzelm@34279
|
196 |
(*Register group for another cancellation attempt by the scheduler, and wake the scheduler.*)
fun cancel_later group = (*requires SYNCHRONIZED*)
  let
    val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
  in broadcast scheduler_event end;
|
wenzelm@29341
|
199 |
|
wenzelm@45176
|
200 |
(*Apply f to x with interrupts enabled: private interrupts inside a worker
  task, public interrupts otherwise; without multithreading support plain
  "interruptible" is used.  After successful evaluation,
  Multithreading.interrupted () is invoked before the result is returned.*)
fun interruptible_task f x =
  let
    val result =
      if not Multithreading.available then interruptible f x
      else
        let
          val attributes =
            (case worker_task () of
              SOME _ => Multithreading.private_interrupts
            | NONE => Multithreading.public_interrupts);
        in Multithreading.with_attributes attributes (fn _ => f x) end;
    val _ = Multithreading.interrupted ();
  in result end;
|
wenzelm@45176
|
209 |
|
wenzelm@45176
|
210 |
|
wenzelm@44993
|
211 |
(* worker threads *)
|
wenzelm@44993
|
212 |
|
wenzelm@44993
|
213 |
(*Run all jobs of a task in the current thread, emit a timing trace, and
  finish the task in the queue.  If a job reports failure or an interrupt
  is detected afterwards, the group of the task is canceled.*)
fun worker_exec (task, jobs) =
  let
    val group = Task_Queue.group_of_task task;
    (*jobs of an already canceled group are invoked with argument false*)
    val valid = not (Task_Queue.is_canceled group);

    (*every job is invoked, even after an earlier one has returned false*)
    fun run_jobs () = fold (fn job => fn acc => job valid andalso acc) jobs true;
    val ok = Task_Queue.running task (fn () => setmp_worker_task task run_jobs ());

    fun timing_message () =
      let
        val label = Task_Queue.str_of_task_groups task;
        fun micros time = string_of_int (Time.toNanoseconds time div 1000);
        val (run_time, wait_time, waited_for) = Task_Queue.timing_of_task task;
      in
        "TASK " ^ label ^ " " ^ micros run_time ^ " " ^ micros wait_time ^
          " (" ^ commas waited_for ^ ")"
      end;
    val _ = Multithreading.tracing 2 timing_message;

    fun finish () =
      let
        (* NOTE(review): meaning of "maximal" is defined by Task_Queue.finish -- confirm there *)
        val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
        (*the interrupt probe is performed unconditionally*)
        val probe = Exn.capture Multithreading.interrupted ();
        val failed = not ok orelse Exn.is_interrupt_exn probe;
        (*register for later retry if cancel_now returned threads*)
        val _ =
          if failed andalso not (null (cancel_now group)) then cancel_later group
          else ();
        val _ = broadcast work_finished;
        val _ = if maximal then () else signal work_available;
      in () end;
  in SYNCHRONIZED "finish" finish end;
|
wenzelm@28167
|
239 |
|
wenzelm@33431
|
240 |
(*Let the current (registered) worker wait on cond: its state becomes
  Waiting if active, Sleeping otherwise, and Working again afterwards.
  Raises Fail for a thread that is not registered in workers.*)
fun worker_wait active cond = (*requires SYNCHRONIZED*)
  let
    val my_state =
      (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
        NONE => raise Fail "Unregistered worker thread"
      | SOME state_ref => state_ref);
  in
    my_state := (if active then Waiting else Sleeping);
    ignore (wait cond);
    my_state := Working
  end;
|
wenzelm@28162
|
250 |
|
wenzelm@33436
|
251 |
(*Determine the next piece of work for the current worker thread.
  NONE means that the worker has been removed from the pool and must
  terminate; otherwise the worker sleeps until work can be dequeued.*)
fun worker_next () = (*requires SYNCHRONIZED*)
  let
    (*sleep until work_available is signalled, then try again*)
    fun retry () = (worker_wait false work_available; worker_next ());
  in
    if length (! workers) > ! max_workers then
      (*surplus worker: unregister and pass the signal on*)
      (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
       signal work_available;
       NONE)
    else if count_workers Working > ! max_active then retry ()
    else
      (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
        work as SOME _ => (signal work_available; work)
      | NONE => retry ())
  end;
|
wenzelm@28156
|
262 |
|
wenzelm@28167
|
263 |
(*Main loop of a worker thread: fetch work under the lock, execute it
  outside the lock, repeat until worker_next yields NONE.*)
fun worker_loop name =
  (case SYNCHRONIZED name worker_next of
    SOME work => (worker_exec work; worker_loop name)
  | NONE => ());
|
wenzelm@28156
|
267 |
|
wenzelm@33428
|
268 |
(*Fork a new worker thread running worker_loop and register it in state Working.*)
fun worker_start name = (*requires SYNCHRONIZED*)
  let
    val thread = Simple_Thread.fork false (fn () => worker_loop name);
    val state = Unsynchronized.ref Working;
  in Unsynchronized.change workers (cons (thread, state)) end;
|
wenzelm@28156
|
271 |
|
wenzelm@28156
|
272 |
|
wenzelm@28156
|
273 |
(* scheduler *)
|
wenzelm@28156
|
274 |
|
wenzelm@33428
|
275 |
(*tick counter modulo 10, used to limit status tracing*)
val status_ticks = Unsynchronized.ref 0;

(*time of the last scheduler tick*)
val last_round = Unsynchronized.ref Time.zeroTime;
(*minimal distance between ticks, also the timeout of the scheduler delay loop*)
val next_round = seconds 0.05;
|
wenzelm@32226
|
279 |
|
wenzelm@28206
|
280 |
(*One round of the scheduler: trace status, dispose dead workers, adjust
  the worker pool, retry pending cancellations, wait for the next event
  or timeout, and decide about shutdown.  The result tells whether the
  scheduler loop shall continue.  An interrupt during the round cancels
  all tasks and continues.*)
fun scheduler_next () = (*requires SYNCHRONIZED*)
  let
    val now = Time.now ();
    val tick = Time.<= (Time.+ (! last_round, next_round), now);
    val _ = if tick then last_round := now else ();


    (* queue and worker status *)

    val _ =
      if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();

    fun status_message () =
      let
        val {ready, pending, running, passive} = Task_Queue.status (! queue);
        val total = length (! workers);
        val active = count_workers Working;
        val waiting = count_workers Waiting;
      in
        "SCHEDULE " ^ Time.toString now ^ ": " ^
          string_of_int ready ^ " ready, " ^
          string_of_int pending ^ " pending, " ^
          string_of_int running ^ " running, " ^
          string_of_int passive ^ " passive; " ^
          string_of_int total ^ " workers, " ^
          string_of_int active ^ " active, " ^
          string_of_int waiting ^ " waiting "
      end;
    (*status is traced on every 10th tick only*)
    val _ =
      if tick andalso ! status_ticks = 0 then Multithreading.tracing 1 status_message
      else ();

    fun is_alive (thread, _) = Thread.isActive thread;
    val _ =
      if forall is_alive (! workers) then ()
      else
        let
          val (alive, dead) = List.partition is_alive (! workers);
          val _ = workers := alive;
        in
          Multithreading.tracing 0 (fn () =>
            "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
        end;


    (* worker pool adjustments *)

    val max_active0 = ! max_active;
    val max_workers0 = ! max_workers;

    val active_limit = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    val _ = max_active := active_limit;

    (* NOTE(review): 9999 looks like a sentinel of Multithreading.max_threads_value -- confirm *)
    val target =
      if ! do_shutdown then 0
      else if active_limit = 9999 then 1
      else
        Int.min
          (Int.max (count_workers Working + 2 * count_workers Waiting, active_limit),
            4 * active_limit);

    (*the trend restarts from 0 whenever its direction changes*)
    val _ =
      if not tick then ()
      else if target > ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
      else if target < ! max_workers then
        Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
      else ();

    val _ =
      if target = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
        max_workers := target
      else if
        (! worker_trend > 5 andalso ! max_workers < 2 * active_limit) orelse ! max_workers = 0
      then max_workers := Int.min (target, 2 * active_limit)
      else ();

    val missing = ! max_workers - length (! workers);
    val _ =
      if missing > 0 then
        funpow missing (fn () =>
          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
      else ();

    (*workers need to re-check their situation after changed limits*)
    val _ =
      if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
      else signal work_available;


    (* canceled groups *)

    val _ =
      if null (! canceled) then ()
      else
       (Multithreading.tracing 1 (fn () =>
          string_of_int (length (! canceled)) ^ " canceled groups");
        (*keep only groups for which cancel_now still returns threads*)
        Unsynchronized.change canceled (filter_out (null o cancel_now));
        broadcast_work ());


    (* delay loop *)

    val _ = Exn.release (wait_timeout next_round scheduler_event);


    (* shutdown *)

    val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    val continue = not (! do_shutdown) orelse not (null (! workers));
    val _ = if continue then () else scheduler := NONE;

    val _ = broadcast scheduler_event;
  in continue end
  handle exn =>
    if Exn.is_interrupt exn then
     (Multithreading.tracing 1 (fn () => "Interrupt");
      List.app cancel_later (cancel_all ());
      broadcast_work (); true)
    else reraise exn;
|
wenzelm@32298
|
390 |
|
wenzelm@28206
|
391 |
(*Body of the scheduler thread: repeat scheduler rounds (each under the
  lock, with synchronous public interrupts) until one yields false, then
  reset last_round.*)
fun scheduler_loop () =
  let
    fun round () =
      Multithreading.with_attributes
        (Multithreading.sync_interrupts Multithreading.public_interrupts)
        (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
    fun rounds () = if round () then rounds () else ();
  in
    rounds ();
    last_round := Time.zeroTime
  end;
|
wenzelm@28191
|
397 |
|
wenzelm@28203
|
398 |
(*whether a scheduler thread is registered and still active*)
fun scheduler_active () = (*requires SYNCHRONIZED*)
  Option.getOpt (Option.map Thread.isActive (! scheduler), false);

(*revoke a pending shutdown request and make sure that a scheduler thread exists*)
fun scheduler_check () = (*requires SYNCHRONIZED*)
  let
    val _ = do_shutdown := false;
  in
    if not (scheduler_active ())
    then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
    else ()
  end;
|
wenzelm@28191
|
405 |
|
wenzelm@45176
|
406 |
|
wenzelm@45176
|
407 |
|
wenzelm@45176
|
408 |
(** futures **)
|
wenzelm@45176
|
409 |
|
wenzelm@45176
|
410 |
(* cancel *)
|
wenzelm@45176
|
411 |
|
wenzelm@45176
|
412 |
(*Cancel a group: immediate attempt via cancel_now, registration for a
  later retry if that returned threads; then wake a worker and ensure
  that the scheduler is running.*)
fun cancel_group group = SYNCHRONIZED "cancel_group" (fn () =>
 (if null (cancel_now group) then () else cancel_later group;
  signal work_available;
  scheduler_check ()));

(*cancel the group of the task of a future*)
fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;
|
wenzelm@28191
|
420 |
|
wenzelm@28191
|
421 |
|
wenzelm@44993
|
422 |
(* future jobs *)
|
wenzelm@44993
|
423 |
|
wenzelm@45127
|
424 |
(*Assign raw_res to the result cell; exceptions pass through
  Par_Exn.serial first.  If the assignment fails with Fail and the cell
  already holds an interrupt, that interrupt is raised instead.  Returns
  true for a regular result; for an exception the group is notified via
  Task_Queue.cancel_group and false is returned.*)
fun assign_result group result raw_res =
  let
    val outcome =
      (case raw_res of
        Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
      | _ => raw_res);

    fun assignment_failed failure =
      (case Single_Assignment.peek result of
        SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else failure)
      | _ => reraise failure);
    val _ = Single_Assignment.assign result outcome
      handle failure as Fail _ => assignment_failed failure;
  in
    (*inspect the cell content, not the local outcome*)
    (case the (Single_Assignment.peek result) of
      Exn.Res _ => true
    | Exn.Exn exn =>
        (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false))
  end;
|
wenzelm@44993
|
441 |
|
wenzelm@44996
|
442 |
(*Produce a fresh result cell together with the job that fills it.  The
  job evaluates e under the thread position captured here, with private
  interrupts if "interrupts" is set and without interrupts otherwise; a
  job invoked with false assigns the interrupt result instead.*)
fun future_job group interrupts (e: unit -> 'a) =
  let
    val result = Single_Assignment.var "future" : 'a result;
    val pos = Position.thread_data ();

    fun evaluate () =
      Multithreading.with_attributes
        (if interrupts
         then Multithreading.private_interrupts else Multithreading.no_interrupts)
        (fn _ => Position.setmp_thread_data pos e ());

    fun job ok =
      let val res = if ok then Exn.capture evaluate () else Exn.interrupt_exn
      in assign_result group result res end;
  in (result, job) end;
|
wenzelm@44993
|
458 |
|
wenzelm@44993
|
459 |
|
wenzelm@29370
|
460 |
(* fork *)
|
wenzelm@29370
|
461 |
|
wenzelm@45298
|
462 |
(*parameters of forks: task name, explicit group (if any), task
  dependencies, priority, and whether the body runs with interrupts enabled*)
type params =
  {name: string, group: group option, deps: task list, pri: int, interrupts: bool};

val default_params: params =
  {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
|
wenzelm@44996
|
464 |
|
wenzelm@45298
|
465 |
(*Fork a list of expressions as futures with common parameters.  Without
  explicit group, a fresh subgroup of the current worker context is
  shared by all of them.  An empty list produces no futures and does not
  touch the queue.*)
fun forks (_: params) [] = []
  | forks ({name, group, deps, pri, interrupts}: params) es =
      let
        val common_group =
          (case group of
            SOME given => given
          | NONE => worker_subgroup ());

        fun enqueue e q =
          let
            val (result, job) = future_job common_group interrupts e;
            val (task, q') = Task_Queue.enqueue name common_group deps pri job q;
          in (Future {promised = false, task = task, result = result}, q') end;
      in
        SYNCHRONIZED "enqueue" (fn () =>
          let
            val (futures, queue') = fold_map enqueue es (! queue);
            val _ = queue := queue';
            (*wake a worker only if no dependency is still known to the queue*)
            val minimal = forall (not o Task_Queue.known_task queue') deps;
            val _ = if minimal then signal work_available else ();
            val _ = scheduler_check ();
          in futures end)
      end;
|
wenzelm@28162
|
489 |
|
wenzelm@44996
|
490 |
(*fork a single expression with given priority, in a fresh subgroup*)
fun fork_pri pri e =
  let
    val params: params =
      {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true};
  in singleton (forks params) e end;

(*fork with priority 0*)
fun fork e = fork_pri 0 e;
|
wenzelm@28162
|
494 |
|
wenzelm@28186
|
495 |
|
wenzelm@29370
|
496 |
(* join *)
|
wenzelm@28186
|
497 |
|
wenzelm@29551
|
498 |
local

(*Result of a future as seen by join: an interrupt is replaced by the
  exception recorded for the group, if Task_Queue.group_status has one.*)
fun get_result x =
  (case peek x of
    NONE => Exn.Exn (Fail "Unfinished future")
  | SOME res =>
      if not (Exn.is_interrupt_exn res) then res
      else
        (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
          SOME exn => Exn.Exn exn
        | NONE => res));

(*Dequeue work among deps for the current thread; while nothing can be
  dequeued but deps remain, wait for some task to finish and retry.*)
fun join_next [] = NONE (*requires SYNCHRONIZED*)
  | join_next deps =
      (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
        (SOME work, deps') => SOME (work, deps')
      | (NONE, []) => NONE
      | (NONE, deps') =>
          (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps'));

(*A worker joining on deps executes such work itself, with interrupts disabled.*)
fun join_work deps =
  Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
    (case SYNCHRONIZED "join" (fn () => join_next deps) of
      NONE => ()
    | SOME (work, deps') =>
        (worker_joining (fn () => worker_exec work); join_work deps')));

in

(*Join a list of futures.  Worker threads help by executing tasks
  themselves; other threads block on the result cells.  Joining
  unfinished futures within a critical section is an error.*)
fun join_results xs =
 (if forall is_finished xs then ()
  else if Multithreading.self_critical () then
    error "Cannot join future values within critical section"
  else if is_some (worker_task ()) then join_work (map task_of xs)
  else List.app (ignore o Single_Assignment.await o result_of) xs;
  map get_result xs);

end;
|
wenzelm@29551
|
539 |
|
wenzelm@28647
|
540 |
(*single-future version of join_results*)
fun join_result x = singleton join_results x;
(*join all, releasing results via Par_Exn.release_all*)
fun joins xs = xs |> join_results |> Par_Exn.release_all;
(*join one, releasing its result via Exn.release*)
fun join x = x |> join_result |> Exn.release;
|
wenzelm@28156
|
543 |
|
wenzelm@45176
|
544 |
(*Wait for the given tasks by forking a trivial future that depends on
  them (in a fresh group, with interrupts disabled) and joining it.
  Nothing happens for the empty list.*)
fun join_tasks tasks =
  if null tasks then ()
  else
    let
      val params: params =
        {name = "join_tasks", group = SOME (new_group NONE),
          deps = tasks, pri = 0, interrupts = false};
      val barrier = singleton (forks params) I;
    in join barrier end;
|
wenzelm@45176
|
550 |
|
wenzelm@28979
|
551 |
|
wenzelm@44993
|
552 |
(* fast-path versions -- bypassing task queue *)
|
wenzelm@34277
|
553 |
|
wenzelm@45169
|
554 |
(*Finished future for a given result, attached to the dummy task of
  Task_Queue; the task queue is not involved.*)
fun value_result (res: 'a Exn.result) =
  let
    val dummy = Task_Queue.dummy_task;
    val dummy_group = Task_Queue.group_of_task dummy;
    val cell = Single_Assignment.var "value" : 'a result;
    val _ = assign_result dummy_group cell res;
  in Future {promised = false, task = dummy, result = cell} end;

(*finished future for a plain value*)
fun value x = value_result (Exn.Res x);
|
wenzelm@45169
|
563 |
|
wenzelm@45210
|
564 |
(*Like forks if multithreading is enabled; otherwise each expression is
  evaluated on the spot and wrapped as finished future.*)
fun cond_forks args es =
  if not (Multithreading.enabled ()) then
    map (fn e => value_result (Exn.interruptible_capture e ())) es
  else forks args es;
|
wenzelm@45210
|
567 |
|
wenzelm@29384
|
568 |
(*Apply f to the result of future x, as a new future.  Preferably the
  job is attached to the existing task of x via Task_Queue.extend; if
  that is not possible, a separate future depending on that task is
  forked (or evaluated directly, according to cond_forks).*)
fun map_future f x =
  let
    val task = task_of x;
    val group = new_group (SOME (Task_Queue.group_of_task task));
    fun body () = f (join x);
    val (result, job) = future_job group true body;

    val extended = SYNCHRONIZED "extend" (fn () =>
      (case Task_Queue.extend task job (! queue) of
        NONE => false
      | SOME queue' => (queue := queue'; true)));
  in
    if extended then Future {promised = false, task = task, result = result}
    else
      let
        val params: params =
          {name = "map_future", group = SOME group, deps = [task],
            pri = Task_Queue.pri_of_task task, interrupts = true};
      in singleton (cond_forks params) body end
  end;
|
wenzelm@29370
|
586 |
|
wenzelm@29370
|
587 |
|
wenzelm@34277
|
588 |
(* promised futures -- fulfilled by external means *)
|
wenzelm@34277
|
589 |
|
wenzelm@45173
|
590 |
(*Promised future within a group: a passive task is enqueued whose job
  assigns the interrupt result and then invokes abort, all with
  interrupts disabled.  Within that job, Fail from the assignment counts
  as success, while an interrupt is turned into Fail.*)
fun promise_group group abort : 'a future =
  let
    val cell = Single_Assignment.var "promise" : 'a result;

    fun assign_interrupt () = assign_result group cell Exn.interrupt_exn
      handle Fail _ => true
        | exn =>
            if Exn.is_interrupt exn
            then raise Fail "Concurrent attempt to fulfill promise"
            else reraise exn;

    fun passive_job () =
      Multithreading.with_attributes Multithreading.no_interrupts
        (fn _ => assign_interrupt () before abort ());

    val task = SYNCHRONIZED "enqueue_passive" (fn () =>
      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group passive_job));
  in Future {promised = true, task = task, result = cell} end;

(*promise in a fresh subgroup of the current worker context*)
fun promise abort = promise_group (worker_subgroup ()) abort;
|
wenzelm@34277
|
607 |
|
wenzelm@42563
|
608 |
(*Fulfill a promised future with a result.  If the passive task can
  still be dequeued, the assignment is executed as its job in the
  current thread (interrupts disabled); afterwards the caller waits
  until the cell is assigned.  Raises Fail for a future that is not a
  promise.*)
fun fulfill_result (Future {promised, task, result}) res =
  if promised then
    let
      val group = Task_Queue.group_of_task task;
      fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);

      fun claim_and_run _ =
        let
          val still_passive =
            SYNCHRONIZED "fulfill_result" (fn () =>
              Unsynchronized.change_result queue
                (Task_Queue.dequeue_passive (Thread.self ()) task));
        in if still_passive then worker_exec (task, [job]) else () end;
      val _ = Multithreading.with_attributes Multithreading.no_interrupts claim_and_run;
    in
      if is_some (Single_Assignment.peek result) then ()
      else worker_waiting [task] (fn () => ignore (Single_Assignment.await result))
    end
  else raise Fail "Not a promised future";

(*fulfill with a plain value*)
fun fulfill x res = fulfill_result x (Exn.Res res);
|
wenzelm@34277
|
628 |
|
wenzelm@34277
|
629 |
|
wenzelm@32228
|
630 |
(* shutdown *)
|
wenzelm@29370
|
631 |
|
wenzelm@28203
|
632 |
(*Block until the scheduler thread is no longer active, waking up all
  workers after each scheduler event.  No-op without multithreading
  support.*)
fun shutdown () =
  let
    fun drain () =
      if scheduler_active () then
        (wait scheduler_event; broadcast_work (); drain ())
      else ();
  in
    if Multithreading.available then SYNCHRONIZED "shutdown" drain
    else ()
  end;
|
wenzelm@28203
|
638 |
|
wenzelm@29370
|
639 |
|
wenzelm@38492
|
640 |
(* status markup *)
|
wenzelm@37706
|
641 |
|
wenzelm@38492
|
642 |
(*Evaluate e () between "forked" and "joined" status messages, which
  carry the name of the current worker task, if any.*)
fun status e =
  let
    val task_props =
      (case worker_task () of
        SOME task => Markup.properties [(Isabelle_Markup.taskN, Task_Queue.str_of_task task)]
      | NONE => I);
    fun report markup = Output.status (Markup.markup_only (task_props markup));

    val _ = report Isabelle_Markup.forked;
    val x = e ();  (*sic -- report "joined" only for success*)
    val _ = report Isabelle_Markup.joined;
  in x end;
|
wenzelm@37706
|
652 |
|
wenzelm@37706
|
653 |
|
wenzelm@48275
|
654 |
(* queue status *)
|
wenzelm@48275
|
655 |
|
wenzelm@48275
|
656 |
(*both operations inspect the current queue value without taking the lock*)
fun group_tasks group =
  let val current = ! queue
  in Task_Queue.group_tasks current group end;

fun queue_status () =
  let val current = ! queue
  in Task_Queue.status current end;
|
wenzelm@48275
|
659 |
|
wenzelm@48275
|
660 |
|
wenzelm@29370
|
661 |
(*final declarations of this structure!*)
|
wenzelm@29370
|
662 |
(*exported as "map": must come last, because the definitions above use the list version of map*)
val map = map_future;
|
wenzelm@29370
|
663 |
|
wenzelm@28156
|
664 |
end;
|
wenzelm@28972
|
665 |
|
wenzelm@28972
|
666 |
(*unqualified abbreviation for the future type of structure Future*)
type 'a future = 'a Future.future;
|
wenzelm@28972
|
667 |
|