37 val worker_subgroup: unit -> Task_Queue.group |
37 val worker_subgroup: unit -> Task_Queue.group |
38 type 'a future |
38 type 'a future |
39 val task_of: 'a future -> Task_Queue.task |
39 val task_of: 'a future -> Task_Queue.task |
40 val peek: 'a future -> 'a Exn.result option |
40 val peek: 'a future -> 'a Exn.result option |
41 val is_finished: 'a future -> bool |
41 val is_finished: 'a future -> bool |
|
42 val cancel_group: Task_Queue.group -> unit |
|
43 val cancel: 'a future -> unit |
42 val forks: |
44 val forks: |
43 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} -> |
45 {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} -> |
44 (unit -> 'a) list -> 'a future list |
46 (unit -> 'a) list -> 'a future list |
45 val fork_pri: int -> (unit -> 'a) -> 'a future |
47 val fork_pri: int -> (unit -> 'a) -> 'a future |
46 val fork: (unit -> 'a) -> 'a future |
48 val fork: (unit -> 'a) -> 'a future |
55 val promise_group: Task_Queue.group -> 'a future |
57 val promise_group: Task_Queue.group -> 'a future |
56 val promise: unit -> 'a future |
58 val promise: unit -> 'a future |
57 val fulfill_result: 'a future -> 'a Exn.result -> unit |
59 val fulfill_result: 'a future -> 'a Exn.result -> unit |
58 val fulfill: 'a future -> 'a -> unit |
60 val fulfill: 'a future -> 'a -> unit |
59 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
61 val interruptible_task: ('a -> 'b) -> 'a -> 'b |
60 val cancel_group: Task_Queue.group -> unit |
|
61 val cancel: 'a future -> unit |
|
62 val shutdown: unit -> unit |
62 val shutdown: unit -> unit |
63 val status: (unit -> 'a) -> 'a |
63 val status: (unit -> 'a) -> 'a |
64 end; |
64 end; |
65 |
65 |
66 structure Future: FUTURE = |
66 structure Future: FUTURE = |
72 |
72 |
73 local |
73 local |
74 val tag = Universal.tag () : Task_Queue.task option Universal.tag; |
74 val tag = Universal.tag () : Task_Queue.task option Universal.tag; |
75 in |
75 in |
76 fun worker_task () = the_default NONE (Thread.getLocal tag); |
76 fun worker_task () = the_default NONE (Thread.getLocal tag); |
77 fun setmp_worker_task data f x = |
77 fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x; |
78 Library.setmp_thread_data tag (worker_task ()) (SOME data) f x; |
|
79 end; |
78 end; |
80 |
79 |
81 val worker_group = Option.map Task_Queue.group_of_task o worker_task; |
80 val worker_group = Option.map Task_Queue.group_of_task o worker_task; |
82 fun worker_subgroup () = Task_Queue.new_group (worker_group ()); |
81 fun worker_subgroup () = Task_Queue.new_group (worker_group ()); |
83 |
82 |
104 fun task_of (Future {task, ...}) = task; |
103 fun task_of (Future {task, ...}) = task; |
105 fun result_of (Future {result, ...}) = result; |
104 fun result_of (Future {result, ...}) = result; |
106 |
105 |
107 fun peek x = Single_Assignment.peek (result_of x); |
106 fun peek x = Single_Assignment.peek (result_of x); |
108 fun is_finished x = is_some (peek x); |
107 fun is_finished x = is_some (peek x); |
109 |
|
110 fun assign_result group result res = |
|
111 let |
|
112 val _ = Single_Assignment.assign result res |
|
113 handle exn as Fail _ => |
|
114 (case Single_Assignment.peek result of |
|
115 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) |
|
116 | _ => reraise exn); |
|
117 val ok = |
|
118 (case the (Single_Assignment.peek result) of |
|
119 Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
|
120 | Exn.Res _ => true); |
|
121 in ok end; |
|
122 |
108 |
123 |
109 |
124 |
110 |
125 (** scheduling **) |
111 (** scheduling **) |
126 |
112 |
171 |
157 |
172 fun count_workers state = (*requires SYNCHRONIZED*) |
158 fun count_workers state = (*requires SYNCHRONIZED*) |
173 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
159 fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0; |
174 |
160 |
175 |
161 |
176 (* execute future jobs *) |
162 (* cancellation primitives *) |
177 |
163 |
178 fun future_job group (e: unit -> 'a) = |
164 fun interruptible_task f x = |
179 let |
165 if Multithreading.available then |
180 val result = Single_Assignment.var "future" : 'a result; |
166 Multithreading.with_attributes |
181 val pos = Position.thread_data (); |
167 (if is_some (worker_task ()) |
182 fun job ok = |
168 then Multithreading.private_interrupts |
183 let |
169 else Multithreading.public_interrupts) |
184 val res = |
170 (fn _ => f x) |
185 if ok then |
171 else interruptible f x; |
186 Exn.capture (fn () => |
|
187 Multithreading.with_attributes Multithreading.private_interrupts |
|
188 (fn _ => Position.setmp_thread_data pos e ()) before |
|
189 Multithreading.interrupted ()) () |
|
190 else Exn.interrupt_exn; |
|
191 in assign_result group result res end; |
|
192 in (result, job) end; |
|
193 |
172 |
194 fun cancel_now group = (*requires SYNCHRONIZED*) |
173 fun cancel_now group = (*requires SYNCHRONIZED*) |
195 Task_Queue.cancel (! queue) group; |
174 Task_Queue.cancel (! queue) group; |
196 |
175 |
197 fun cancel_later group = (*requires SYNCHRONIZED*) |
176 fun cancel_later group = (*requires SYNCHRONIZED*) |
198 (Unsynchronized.change canceled (insert Task_Queue.eq_group group); |
177 (Unsynchronized.change canceled (insert Task_Queue.eq_group group); |
199 broadcast scheduler_event); |
178 broadcast scheduler_event); |
200 |
179 |
201 fun execute (task, jobs) = |
180 |
|
181 (* worker threads *) |
|
182 |
|
183 fun worker_exec (task, jobs) = |
202 let |
184 let |
203 val group = Task_Queue.group_of_task task; |
185 val group = Task_Queue.group_of_task task; |
204 val valid = not (Task_Queue.is_canceled group); |
186 val valid = not (Task_Queue.is_canceled group); |
205 val ok = |
187 val ok = |
206 Task_Queue.running task (fn () => |
188 Task_Queue.running task (fn () => |
251 | some => (signal work_available; some)); |
230 | some => (signal work_available; some)); |
252 |
231 |
253 fun worker_loop name = |
232 fun worker_loop name = |
254 (case SYNCHRONIZED name (fn () => worker_next ()) of |
233 (case SYNCHRONIZED name (fn () => worker_next ()) of |
255 NONE => () |
234 NONE => () |
256 | SOME work => (Exn.capture Multithreading.interrupted (); execute work; worker_loop name)); |
235 | SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name)); |
257 |
236 |
258 fun worker_start name = (*requires SYNCHRONIZED*) |
237 fun worker_start name = (*requires SYNCHRONIZED*) |
259 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
238 Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name), |
260 Unsynchronized.ref Working)); |
239 Unsynchronized.ref Working)); |
261 |
240 |
394 else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); |
373 else scheduler := SOME (Simple_Thread.fork false scheduler_loop)); |
395 |
374 |
396 |
375 |
397 |
376 |
398 (** futures **) |
377 (** futures **) |
|
378 |
|
379 (* cancellation *) |
|
380 |
|
381 (*cancel: present and future group members will be interrupted eventually*) |
|
382 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => |
|
383 (if cancel_now group then () else cancel_later group; |
|
384 signal work_available; scheduler_check ())); |
|
385 |
|
386 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); |
|
387 |
|
388 |
|
389 (* future jobs *) |
|
390 |
|
391 fun assign_result group result res = |
|
392 let |
|
393 val _ = Single_Assignment.assign result res |
|
394 handle exn as Fail _ => |
|
395 (case Single_Assignment.peek result of |
|
396 SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn) |
|
397 | _ => reraise exn); |
|
398 val ok = |
|
399 (case the (Single_Assignment.peek result) of |
|
400 Exn.Exn exn => (Task_Queue.cancel_group group exn; false) |
|
401 | Exn.Res _ => true); |
|
402 in ok end; |
|
403 |
|
404 fun future_job group (e: unit -> 'a) = |
|
405 let |
|
406 val result = Single_Assignment.var "future" : 'a result; |
|
407 val pos = Position.thread_data (); |
|
408 fun job ok = |
|
409 let |
|
410 val res = |
|
411 if ok then |
|
412 Exn.capture (fn () => |
|
413 Multithreading.with_attributes Multithreading.private_interrupts |
|
414 (fn _ => Position.setmp_thread_data pos e ()) before |
|
415 Multithreading.interrupted ()) () |
|
416 else Exn.interrupt_exn; |
|
417 in assign_result group result res end; |
|
418 in (result, job) end; |
|
419 |
399 |
420 |
400 (* fork *) |
421 (* fork *) |
401 |
422 |
402 fun forks {name, group, deps, pri} es = |
423 fun forks {name, group, deps, pri} es = |
403 if null es then [] |
424 if null es then [] |
450 | (NONE, deps') => |
471 | (NONE, deps') => |
451 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') |
472 (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps') |
452 | (SOME work, deps') => SOME (work, deps')); |
473 | (SOME work, deps') => SOME (work, deps')); |
453 |
474 |
454 fun execute_work NONE = () |
475 fun execute_work NONE = () |
455 | execute_work (SOME (work, deps')) = (worker_joining (fn () => execute work); join_work deps') |
476 | execute_work (SOME (work, deps')) = |
|
477 (worker_joining (fn () => worker_exec work); join_work deps') |
456 and join_work deps = |
478 and join_work deps = |
457 Multithreading.with_attributes Multithreading.no_interrupts |
479 Multithreading.with_attributes Multithreading.no_interrupts |
458 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps))); |
480 (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps))); |
459 |
481 |
460 in |
482 in |
473 |
495 |
474 fun join_result x = singleton join_results x; |
496 fun join_result x = singleton join_results x; |
475 fun join x = Exn.release (join_result x); |
497 fun join x = Exn.release (join_result x); |
476 |
498 |
477 |
499 |
478 (* fast-path versions -- bypassing full task management *) |
500 (* fast-path versions -- bypassing task queue *) |
479 |
501 |
480 fun value (x: 'a) = |
502 fun value (x: 'a) = |
481 let |
503 let |
482 val task = Task_Queue.dummy_task (); |
504 val task = Task_Queue.dummy_task (); |
483 val group = Task_Queue.group_of_task task; |
505 val group = Task_Queue.group_of_task task; |
536 let |
558 let |
537 val still_passive = |
559 val still_passive = |
538 SYNCHRONIZED "fulfill_result" (fn () => |
560 SYNCHRONIZED "fulfill_result" (fn () => |
539 Unsynchronized.change_result queue |
561 Unsynchronized.change_result queue |
540 (Task_Queue.dequeue_passive (Thread.self ()) task)); |
562 (Task_Queue.dequeue_passive (Thread.self ()) task)); |
541 in if still_passive then execute (task, [job]) else () end); |
563 in if still_passive then worker_exec (task, [job]) else () end); |
542 val _ = |
564 val _ = |
543 if is_some (Single_Assignment.peek result) then () |
565 if is_some (Single_Assignment.peek result) then () |
544 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); |
566 else worker_waiting [task] (fn () => ignore (Single_Assignment.await result)); |
545 in () end; |
567 in () end; |
546 |
568 |
547 fun fulfill x res = fulfill_result x (Exn.Res res); |
569 fun fulfill x res = fulfill_result x (Exn.Res res); |
548 |
|
549 |
|
550 (* cancellation *) |
|
551 |
|
552 fun interruptible_task f x = |
|
553 if Multithreading.available then |
|
554 Multithreading.with_attributes |
|
555 (if is_some (worker_task ()) |
|
556 then Multithreading.private_interrupts |
|
557 else Multithreading.public_interrupts) |
|
558 (fn _ => f x) |
|
559 else interruptible f x; |
|
560 |
|
561 (*cancel: present and future group members will be interrupted eventually*) |
|
562 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => |
|
563 (if cancel_now group then () else cancel_later group; |
|
564 signal work_available; scheduler_check ())); |
|
565 |
|
566 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x)); |
|
567 |
570 |
568 |
571 |
569 (* shutdown *) |
572 (* shutdown *) |
570 |
573 |
571 fun shutdown () = |
574 fun shutdown () = |