src/Pure/Concurrent/future.ML
author wenzelm
Thu, 24 Jan 2013 17:31:12 +0100
changeset 52014 26a0984191b3
parent 51998 1290afb88f90
child 52416 f4a2fa9286e9
permissions -rw-r--r--
report status more frequently on demand;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information, will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
    43 sig
         (*tasks and groups, as provided by Task_Queue*)
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
         (*task context of the current thread; NONE when no worker task is set*)
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val worker_subgroup: unit -> group
         (*future values and non-blocking inspection*)
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option
    53   val is_finished: 'a future -> bool
         (*flags and counters used for status reports*)
    54   val ML_statistics: bool Unsynchronized.ref
    55   val forked_proofs: int Unsynchronized.ref
         (*cancellation and error reporting*)
    56   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    57   val cancel_group: group -> unit
    58   val cancel: 'a future -> unit
    59   val error_msg: Position.T -> (serial * string) * string option -> unit
    60   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
         (*fork and join*)
    61   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    62   val default_params: params
    63   val forks: params -> (unit -> 'a) list -> 'a future list
    64   val fork: (unit -> 'a) -> 'a future
    65   val join_results: 'a future list -> 'a Exn.result list
    66   val join_result: 'a future -> 'a Exn.result
    67   val joins: 'a future list -> 'a list
    68   val join: 'a future -> 'a
         (*fast-path versions that bypass the task queue where possible*)
    69   val value_result: 'a Exn.result -> 'a future
    70   val value: 'a -> 'a future
    71   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    72   val map: ('a -> 'b) -> 'a future -> 'b future
         (*promised futures, fulfilled by external means*)
    73   val promise_group: group -> (unit -> unit) -> 'a future
    74   val promise: (unit -> unit) -> 'a future
    75   val fulfill_result: 'a future -> 'a Exn.result -> unit
    76   val fulfill: 'a future -> 'a -> unit
         (*termination of groups and of the scheduler*)
    77   val terminate: group -> unit
    78   val shutdown: unit -> unit
    79 end;
    80 
    81 structure Future: FUTURE =
    82 struct
    83 
    84 (** future values **)
    85 
    86 type task = Task_Queue.task;
    87 type group = Task_Queue.group;
    88 fun new_group parent = Task_Queue.new_group parent;  (*forwarded to Task_Queue*)
    89 
    90 
    91 (* identifiers *)
    92 
    93 local
    94   val worker_tag : task option Universal.tag = Universal.tag ();
    95 in
    96   (*task of the current thread; NONE when the thread-local slot is unset*)
    97   fun worker_task () =
    98     (case Thread.getLocal worker_tag of
             SOME current => current
           | NONE => NONE);
         (*evaluate f x via setmp_thread_data, with SOME task installed in the slot*)
         fun setmp_worker_task task f x =
           let val previous = worker_task ()
           in setmp_thread_data worker_tag previous (SOME task) f x end;
       end;
    99 
   100 fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
   101 fun worker_subgroup () =  (*new group below the group of the current worker task, if any*)
         let val parent = worker_group ()
         in new_group parent end;
   102 
   103 fun worker_joining e =
   104   (*without worker task e runs directly, otherwise via Task_Queue.joining*)
   105   let val current = worker_task () in
   106     if is_none current then e ()
           else Task_Queue.joining (the current) e
         end;
   107 
   108 fun worker_waiting deps e =
   109   (*without worker task e runs directly, otherwise via Task_Queue.waiting on deps*)
   110   let val current = worker_task () in
   111     if is_none current then e ()
           else Task_Queue.waiting (the current) deps e
         end;
   112 
   113 
   114 (* datatype future *)
   115 
   116 type 'a result = 'a Exn.result Single_Assignment.var;
         (*Single_Assignment variable holding the outcome: Exn.Res value or Exn.Exn exception*)
   117 
   118 datatype 'a future = Future of
   119  {promised: bool,  (*true only for futures made by promise_group; checked by fulfill_result*)
   120   task: task,  (*task that identifies the future; Task_Queue.dummy_task for value_result*)
   121   result: 'a result};  (*outcome, unassigned until the future is finished*)
   122 
   123 fun task_of (Future fields) = #task fields;
   124 fun result_of (Future fields) = #result fields;
   125 
   126 (*non-blocking access to the outcome; NONE while the result is unassigned*)
   127 fun peek (Future {result, ...}) = Single_Assignment.peek result;
       fun is_finished x =
         (case peek x of
           SOME _ => true
         | NONE => false);
   128 
   129 
   130 
   131 (** scheduling **)
   132 
   133 (* synchronization *)
   134 
   135 (*scheduler_event: scheduler rounds; work_available: workers; work_finished: joiners*)
   136 val (scheduler_event, work_available, work_finished) =
   137   (ConditionVar.conditionVar (), ConditionVar.conditionVar (), ConditionVar.conditionVar ());
   138 
   139 local
   140   val lock = Mutex.mutex ();
   141 in
   142 
   143 (*critical sections: Simple_Thread.synchronized on the single private lock*)
   144 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   145 
   146 fun wait cond = (*requires SYNCHRONIZED*)
   147   Multithreading.sync_wait NONE NONE cond lock;
   148 
   149 (*like wait, but with deadline now + timeout*)
   150 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   151   let val deadline = Time.+ (Time.now (), timeout)
   152   in Multithreading.sync_wait NONE (SOME deadline) cond lock end;
   153 
   154 val signal = ConditionVar.signal; (*requires SYNCHRONIZED*)
   155 val broadcast = ConditionVar.broadcast; (*requires SYNCHRONIZED*)
   156 
   157 (*wake up all threads waiting on work_available and on work_finished*)
   158 fun broadcast_work () = (*requires SYNCHRONIZED*)
   159   List.app ConditionVar.broadcast [work_available, work_finished];
   160 
   161 end;
   162 
   163 
   164 (* global state *)
   165 
   166 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue shared by all threads*)
   167 val next = Unsynchronized.ref 0;  (*counter for worker thread names, see scheduler_next*)
   168 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread, if any*)
   169 val canceled = Unsynchronized.ref ([]: group list);  (*groups that the scheduler still has to cancel*)
   170 val do_shutdown = Unsynchronized.ref false;  (*set by the scheduler, reset by scheduler_check*)
   171 val max_workers = Unsynchronized.ref 0;  (*limit for length (! workers), see worker_next*)
   172 val max_active = Unsynchronized.ref 0;  (*limit for workers in state Working, see worker_next*)
   173 val worker_trend = Unsynchronized.ref 0;  (*signed tick counter driving pool size changes*)
   174 
   175 val status_ticks = Unsynchronized.ref 0;  (*number of scheduler ticks, used to pace report_status*)
   176 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the last scheduler tick*)
   177 val next_round = seconds 0.05;  (*tick interval and timeout of the scheduler delay loop*)
   178 
   179 datatype worker_state = Working | Waiting | Sleeping;
         (*Waiting and Sleeping are assigned by worker_wait, Working otherwise*)
   180 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
         (*registered worker threads, each with its mutable state*)
   181 
   182 fun count_workers state = (*requires SYNCHRONIZED*)
   183   (*number of registered workers whose state equals the given one*)
         length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   184 
   185 
   186 
   187 (* status *)
   188 
   189 val ML_statistics = Unsynchronized.ref false;
   190 val forked_proofs = Unsynchronized.ref 0;
   191 
   192 (*emit task and worker counters as ML_statistics protocol message, if enabled*)
   193 fun report_status () = (*requires SYNCHRONIZED*)
   194   if not (! ML_statistics) then ()
   195   else
   196     let
   197       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   198       val total = length (! workers);
   199       val active = count_workers Working;
   200       val waiting = count_workers Waiting;
   201       fun int_entry (key, n) = (key, Markup.print_int n);
   202       val now_entry = ("now", signed_string_of_real (Time.toReal (Time.now ())));
   203       val counters =
   204         map int_entry
   205          [("tasks_proof", ! forked_proofs),
   206           ("tasks_ready", ready),
   207           ("tasks_pending", pending),
   208           ("tasks_running", running),
   209           ("tasks_passive", passive),
   210           ("workers_total", total),
   211           ("workers_active", active),
   212           ("workers_waiting", waiting)];
   213       val stats = now_entry :: counters @ ML_Statistics.get ();
   214     in
             Output.protocol_message (Markup.ML_statistics :: stats) ""
               handle Fail msg => warning msg
           end;
   215 
   216 
   217 (* cancellation primitives *)
   218 
   219 fun cancel_now group = (*requires SYNCHRONIZED*)
   220   (*cancel group in the queue and interrupt its running threads, except the caller;
   221     the result is the list of running threads*)
   222   let
   223     fun interrupt_other thread =
   224       if not (Simple_Thread.is_self thread)
   225       then Simple_Thread.interrupt_unsynchronized thread
             else ();
           val threads = Task_Queue.cancel (! queue) group;
         in List.app interrupt_other threads; threads end;
   226 
   227 fun cancel_all () = (*requires SYNCHRONIZED*)
   228   (*cancel everything in the queue, interrupt all reported threads, return the groups*)
   229   (case Task_Queue.cancel_all (! queue) of
   230     (groups, threads) =>
   231       (List.app Simple_Thread.interrupt_unsynchronized threads; groups));
   232 
   233 fun cancel_later group = (*requires SYNCHRONIZED*)
   234   (*register group for cancellation by the scheduler and wake the scheduler up*)
   235   let
           val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
           val _ = broadcast scheduler_event;
         in () end;
   236 
   237 fun interruptible_task f x =
   238   (*evaluate f x with interrupts enabled: private interrupts within a worker task,
   239     public interrupts otherwise; finally poll Multithreading.interrupted*)
   240   let
   241     fun run () =
   242       if not Multithreading.available then interruptible f x
   243       else
   244         let
   245           val attributes =
                   (case worker_task () of
                     SOME _ => Multithreading.private_interrupts
                   | NONE => Multithreading.public_interrupts);
                 in Multithreading.with_attributes attributes (fn _ => f x) end;
           val res = run ();
           val _ = Multithreading.interrupted ();
         in res end;
   246 
   247 
   248 (* worker threads *)
   249 
   250 fun worker_exec (task, jobs) =
         (*Run all jobs of a dequeued task on the current thread, then finish the task
           in the queue and notify waiting threads.*)
   251   let
   252     val group = Task_Queue.group_of_task task;
           (*jobs receive false if the group is already canceled at this point*)
   253     val valid = not (Task_Queue.is_canceled group);
   254     val ok =
   255       Task_Queue.running task (fn () =>
   256         setmp_worker_task task (fn () =>
                 (*job valid is evaluated before the conjunction, so every job is run;
                   the result is true only if all jobs returned true*)
   257           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   258     val _ =
   259       if ! Multithreading.trace >= 2 then
   260         Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
   261           handle Fail msg => warning msg
   262       else ();
   263     val _ = SYNCHRONIZED "finish" (fn () =>
   264       let
   265         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
               (*capture a pending interrupt of this thread instead of raising it here*)
   266         val test = Exn.capture Multithreading.interrupted ();
   267         val _ =
                 (*failure or interrupt: cancel the group now; if threads of the group are
                   still running, let the scheduler repeat the cancellation later*)
   268           if ok andalso not (Exn.is_interrupt_exn test) then ()
   269           else if null (cancel_now group) then ()
   270           else cancel_later group;
   271         val _ = broadcast work_finished;
   272         val _ = if maximal then () else signal work_available;
   273       in () end);
   274   in () end;
   275 
   276 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   277   (*wait on cond; meanwhile the state of the calling worker is Waiting (active)
   278     or Sleeping (not active), afterwards Working again*)
   279   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   280     NONE => raise Fail "Unregistered worker thread"
   281   | SOME state =>
   282      (state := (if active then Waiting else Sleeping);
   283       wait cond;
   284       state := Working));
   286 
   287 fun worker_next () = (*requires SYNCHRONIZED*)
   288   (*next piece of work for the calling worker; NONE means the worker was removed
   289     from the pool and has to terminate*)
   290   let
   291     fun retire () =
   292      (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   293       signal work_available;
   294       NONE);
   295     fun sleep_and_retry () = (worker_wait false work_available; worker_next ());
   296   in
   297     if length (! workers) > ! max_workers then retire ()
           else if count_workers Working > ! max_active then sleep_and_retry ()
           else
             (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
               NONE => sleep_and_retry ()
             | work => (signal work_available; work))
         end;
   298 
   299 fun worker_loop name =
   300   (*fetch work under the lock and execute it outside, until worker_next yields NONE*)
   301   let val next_work = SYNCHRONIZED name worker_next in
   302     if is_none next_work then ()
           else (worker_exec (the next_work); worker_loop name)
         end;
   303 
   304 fun worker_start name = (*requires SYNCHRONIZED*)
   305   (*fork a worker thread and register it with initial state Working*)
   306   let
           val thread = Simple_Thread.fork false (fn () => worker_loop name);
           val state = Unsynchronized.ref Working;
         in Unsynchronized.change workers (cons (thread, state)) end;
   307 
   308 
   309 (* scheduler *)
   310 
   311 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler: status report, disposal of dead workers, worker
           pool adjustment, pending cancellations, timed wait, shutdown check.  The
           result tells scheduler_loop whether to continue.*)
   312   let
   313     val now = Time.now ();
           (*tick: at least next_round has passed since the last recorded round*)
   314     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   315     val _ = if tick then last_round := now else ();
   316 
   317 
   318     (* runtime status *)
   319 
   320     val _ =
   321       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
   322     val _ =
             (*report on every 2nd tick if tracing is enabled, otherwise on every 10th*)
   323       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   324       then report_status () else ();
   325 
   326     val _ =
             (*drop worker entries whose thread is no longer active*)
   327       if forall (Thread.isActive o #1) (! workers) then ()
   328       else
   329         let
   330           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   331           val _ = workers := alive;
   332         in
   333           Multithreading.tracing 0 (fn () =>
   334             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   335         end;
   336 
   337 
   338     (* worker pool adjustments *)
   339 
   340     val max_active0 = ! max_active;
   341     val max_workers0 = ! max_workers;
   342 
           (*m: limit of active workers, 0 during shutdown*)
   343     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   344     val _ = max_active := m;
   345 
           (*mm: target pool size, at least m and at most 4 * m;
             NOTE(review): m = 9999 looks like a special marker value -- confirm its meaning*)
   346     val mm =
   347       if ! do_shutdown then 0
   348       else if m = 9999 then 1
   349       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
   350     val _ =
             (*worker_trend counts consecutive ticks with the target above (positive) or
               below (negative) the current limit; a change of direction resets it to 0*)
   351       if tick andalso mm > ! max_workers then
   352         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   353       else if tick andalso mm < ! max_workers then
   354         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   355       else ();
   356     val _ =
             (*adopt the target directly on shutdown or after a long-lasting trend;
               a short upward trend or an empty pool only grows up to 2 * m*)
   357       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   358         max_workers := mm
   359       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   360         max_workers := Int.min (mm, 2 * m)
   361       else ();
   362 
   363     val missing = ! max_workers - length (! workers);
   364     val _ =
   365       if missing > 0 then
   366         funpow missing (fn () =>
   367           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   368       else ();
   369 
   370     val _ =
             (*changed limits: let a worker re-check the conditions of worker_next*)
   371       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   372       else signal work_available;
   373 
   374 
   375     (* canceled groups *)
   376 
   377     val _ =
   378       if null (! canceled) then ()
   379       else
   380        (Multithreading.tracing 1 (fn () =>
   381           string_of_int (length (! canceled)) ^ " canceled groups");
              (*keep only groups for which cancel_now still reports running threads*)
   382         Unsynchronized.change canceled (filter_out (null o cancel_now));
   383         broadcast_work ());
   384 
   385 
   386     (* delay loop *)
   387 
   388     val _ = Exn.release (wait_timeout next_round scheduler_event);
   389 
   390 
   391     (* shutdown *)
   392 
   393     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
           (*stop only when shutdown is requested and no workers are left*)
   394     val continue = not (! do_shutdown andalso null (! workers));
   395     val _ = if continue then () else (report_status (); scheduler := NONE);
   396 
   397     val _ = broadcast scheduler_event;
   398   in continue end
   399   handle exn =>
           (*interrupt of the scheduler: cancel all groups, schedule them for repeated
             cancellation, and continue with the next round*)
   400     if Exn.is_interrupt exn then
   401      (Multithreading.tracing 1 (fn () => "Interrupt");
   402       List.app cancel_later (cancel_all ());
   403       broadcast_work (); true)
   404     else reraise exn;
   405 
   406 fun scheduler_loop () =
   407   (*repeat scheduler rounds under the lock until scheduler_next returns false,
   408     then reset last_round*)
   409   let
   410     fun round () =
   411       Multithreading.with_attributes
               (Multithreading.sync_interrupts Multithreading.public_interrupts)
               (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
           fun repeat () = if round () then repeat () else ();
         in repeat (); last_round := Time.zeroTime end;
   412 
   413 fun scheduler_active () = (*requires SYNCHRONIZED*)
   414   let val current = ! scheduler
   415   in is_some current andalso Thread.isActive (the current) end;
   416 
   417 (*revoke a pending shutdown and fork a scheduler thread unless one is active*)
   418 fun scheduler_check () = (*requires SYNCHRONIZED*)
   419   let val _ = do_shutdown := false in
           if not (scheduler_active ())
           then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
           else ()
         end;
   420 
   421 
   422 
   423 (** futures **)
   424 
   425 (* cancel *)
   426 
   427 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   428   (*cancel now; if threads of the group are still running, hand over to the scheduler*)
   429  (if null (cancel_now group) then () else cancel_later group;
   430   signal work_available;
   431   scheduler_check ());
   433 
   434 fun cancel_group group =
   435   let fun body () = cancel_group_unsynchronized group
   436   in SYNCHRONIZED "cancel_group" body end;
   437 
       (*cancel the group of the task behind a future*)
       fun cancel x =
         let val group = Task_Queue.group_of_task (task_of x)
         in cancel_group group end;
   438 
   439 
   440 (* results *)
   441 
   442 fun error_msg pos ((serial, msg), exec_id) =
   443   (*print the error in the context of pos, unless both the id of pos and exec_id
   444     are present and differ*)
   445   let
   446     fun report () =
   447       (case (Position.get_id pos, exec_id) of
               (SOME id, SOME exec) => if id = exec then Output.error_msg' (serial, msg) else ()
             | _ => Output.error_msg' (serial, msg));
         in Position.setmp_thread_data pos report () end;
   448 
   449 fun identify_result pos (Exn.Exn exn) =
   450       (*exceptions are passed through Par_Exn.identify, with the id of pos (if any)
   451         as exec_id property*)
   452       let
   453         val exec_id =
   454           (case Position.get_id pos of
   455             SOME id => [(Markup.exec_idN, id)]
   456           | NONE => []);
   457       in Exn.Exn (Par_Exn.identify exec_id exn) end
         | identify_result _ res = res;
   458 
   459 fun assign_result group result res =
         (*Assign res to the result variable; the boolean outcome is true for a regular
           value and false for an exception, which is also passed to
           Task_Queue.cancel_group for the given group.*)
   460   let
   461     val _ = Single_Assignment.assign result res
   462       handle exn as Fail _ =>
               (*failed assignment: if the variable already holds an interrupt, re-raise
                 that interrupt, otherwise the original Fail*)
   463         (case Single_Assignment.peek result of
   464           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   465         | _ => reraise exn);
   466     val ok =
             (*inspect what is actually stored in the variable*)
   467       (case the (Single_Assignment.peek result) of
   468         Exn.Exn exn =>
   469           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   470       | Exn.Res _ => true);
   471   in ok end;
   472 
   473 
   474 (* future jobs *)
   475 
   476 fun future_job group interrupts (e: unit -> 'a) =
   477   (*pair of a fresh result variable and the job that evaluates e into it; the job
   478     argument tells whether to evaluate at all, otherwise the result is an interrupt*)
   479   let
   480     val result = Single_Assignment.var "future" : 'a result;
   481     val pos = Position.thread_data ();
   482     fun evaluate () =
   483       Exn.capture (fn () =>
   484         let
   485           val attributes =
   486             if interrupts then Multithreading.private_interrupts
   487             else Multithreading.no_interrupts;
   488         in
   489           Multithreading.with_attributes attributes
   490             (fn _ => Position.setmp_thread_data pos e ())
   491         end) ();
           fun job ok =
             let val res = if ok then evaluate () else Exn.interrupt_exn
             in assign_result group result (identify_result pos res) end;
         in (result, job) end;
   492 
   493 
   494 (* fork *)
   495 
   496 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
         (*group = NONE makes forks use worker_subgroup (); interrupts selects private
           interrupts versus no interrupts during evaluation, see future_job*)
   497 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   498 
   499 fun forks ({name, group, deps, pri, interrupts}: params) es =
   500   (*enqueue one task per expression, all with the same parameters, and return the
   501     corresponding futures; the empty list is handled without touching the queue*)
   502   (case es of
   503     [] => []
   504   | _ =>
   505       let
   506         val target_group =
   507           (case group of
   508             SOME given => given
   509           | NONE => worker_subgroup ());
   510         fun enqueue e q =
   511           let
   512             val (result, job) = future_job target_group interrupts e;
   513             val (task, q') = Task_Queue.enqueue name target_group deps pri job q;
   514           in (Future {promised = false, task = task, result = result}, q') end;
   515         fun enqueue_all () =
   516           let
   517             val (futures, queue') = fold_map enqueue es (! queue);
   518             val _ = queue := queue';
   519             (*no dependency known to the queue: a worker can start right away*)
   520             val independent = forall (fn dep => not (Task_Queue.known_task queue' dep)) deps;
   521             val _ = if independent then signal work_available else ();
   522             val _ = scheduler_check ();
                 in futures end;
             in SYNCHRONIZED "enqueue" enqueue_all end);
   523 
   524 fun fork e =  (*single fork with default parameters, except for the name*)
   525   singleton (forks {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true}) e;
   526 
   527 
   528 (* join *)
   529 
   530 fun get_result x =
   531   (*result of a future without waiting; an interrupt result is replaced by the
   532     exception recorded for the group, if Task_Queue.group_status provides one*)
   533   (case peek x of
   534     NONE => Exn.Exn (Fail "Unfinished future")
   535   | SOME res =>
   536       if not (Exn.is_interrupt_exn res) then res
   537       else
   538         let val group = Task_Queue.group_of_task (task_of x) in
                 (case Task_Queue.group_status group of
                   SOME exn => Exn.Exn exn
                 | NONE => res)
               end);
   539 
   540 local
   541 
   542 fun join_next deps = (*requires SYNCHRONIZED*)
         (*next piece of work among the given dependencies, together with the remaining
           dependencies; NONE if there is nothing left to do or to wait for*)
   543   if null deps then NONE
   544   else
   545     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   546       (NONE, []) => NONE
   547     | (NONE, deps') =>
               (*nothing to execute, but dependencies remain: wait for finished work and retry*)
   548         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   549     | (SOME work, deps') => SOME (work, deps'));
   550 
   551 fun execute_work NONE = ()
   552   | execute_work (SOME (work, deps')) =
             (*execute the dequeued work on this thread, then continue with the rest*)
   553       (worker_joining (fn () => worker_exec work); join_work deps')
   554 and join_work deps =
         (*interrupts are disabled around the join loop; the lock is held only while
           picking the next piece of work, not while executing it*)
   555   Multithreading.with_attributes Multithreading.no_interrupts
   556     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   557 
   558 in
   559 
   560 fun join_results xs =
         (*wait until all given futures are finished and return their results in order;
           worker threads help to execute pending work, other threads just await*)
   561   let
   562     val _ =
   563       if forall is_finished xs then ()
   564       else if Multithreading.self_critical () then
   565         error "Cannot join future values within critical section"
   566       else if is_some (worker_task ()) then join_work (map task_of xs)
   567       else List.app (ignore o Single_Assignment.await o result_of) xs;
   568   in map get_result xs end;
   569 
   570 end;
   571 
   572 fun join_result x = x |> singleton join_results;
   573 fun joins xs = xs |> join_results |> Par_Exn.release_all;
   574 fun join x = x |> join_result |> Exn.release;
   575 
   576 
   577 (* fast-path versions -- bypassing task queue *)
   578 
   579 fun value_result (res: 'a Exn.result) =
   580   (*finished future for a given result, attached to Task_Queue.dummy_task*)
   581   let
   582     val dummy = Task_Queue.dummy_task;
   583     val dummy_group = Task_Queue.group_of_task dummy;
   584     val result = Single_Assignment.var "value" : 'a result;
   585     val identified = identify_result (Position.thread_data ()) res;
   586     val _ = assign_result dummy_group result identified;
   587   in Future {promised = false, task = dummy, result = result} end;
       
       fun value x =
         let val res = Exn.Res x
         in value_result res end;
   588 
   589 fun cond_forks args es =
   590   (*regular forks if multithreading is enabled, otherwise immediate evaluation*)
   591   let fun evaluate e = value_result (Exn.interruptible_capture e ()) in
           if not (Multithreading.enabled ()) then map evaluate es
           else forks args es
         end;
   592 
   593 fun map_future f x =
         (*Apply f to the result of x, yielding a new future.
           - finished x: f is applied immediately on the calling thread.  Failures of f
             or of joining x are captured in the resulting future, as in the deferred
             cases below, instead of escaping to the caller of map.  Interrupts are
             presumably still passed through by Exn.interruptible_capture -- confirm.
           - unfinished x: the job is attached to the task of x via Task_Queue.extend if
             possible; otherwise it is forked via cond_forks with x as dependency.*)
   594   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   595   else
   596     let
   597       val task = task_of x;
   598       val group = Task_Queue.group_of_task task;
   599       val (result, job) = future_job group true (fn () => f (join x));
   600 
   601       val extended = SYNCHRONIZED "extend" (fn () =>
   602         (case Task_Queue.extend task job (! queue) of
   603           SOME queue' => (queue := queue'; true)
   604         | NONE => false));
   605     in
   606       if extended then Future {promised = false, task = task, result = result}
   607       else
   608         (singleton o cond_forks)
   609           {name = "map_future", group = SOME group, deps = [task],
   610             pri = Task_Queue.pri_of_task task, interrupts = true}
   611           (fn () => f (join x))
   612     end;
   613 
   614 
   615 (* promised futures -- fulfilled by external means *)
   616 
   617 fun promise_group group abort : 'a future =
         (*Create a promised future in the given group: a fresh result variable plus a
           passive job registered via Task_Queue.enqueue_passive.  When the passive job
           runs, it tries to assign an interrupt result and then calls abort.
           NOTE(review): when the queue invokes the passive job is decided inside
           Task_Queue and is not visible here.*)
   618   let
   619     val result = Single_Assignment.var "promise" : 'a result;
   620     fun assign () = assign_result group result Exn.interrupt_exn
             (*Fail from the assignment is treated as success (true); an interrupt
               re-raised by assign_result is turned into a Fail with explicit message*)
   621       handle Fail _ => true
   622         | exn =>
   623             if Exn.is_interrupt exn
   624             then raise Fail "Concurrent attempt to fulfill promise"
   625             else reraise exn;
   626     fun job () =
             (*abort is called after the assignment attempt, even if that attempt failed;
               the captured outcome of assign is released afterwards*)
   627       Multithreading.with_attributes Multithreading.no_interrupts
   628         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   629     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   630       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   631   in Future {promised = true, task = task, result = result} end;
   632 
   633 fun promise abort = let val group = worker_subgroup () in promise_group group abort end;
   634 
   635 fun fulfill_result (Future {promised, task, result}) res =
         (*Assign the given result to a promised future; raises Fail for futures that
           were not created as promises.*)
   636   if not promised then raise Fail "Not a promised future"
   637   else
   638     let
   639       val group = Task_Queue.group_of_task task;
   640       val pos = Position.thread_data ();
             (*the job assigns res, or an interrupt result if it is called with false*)
   641       fun job ok =
   642         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   643       val _ =
   644         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   645           let
   646             val passive_job =
   647               SYNCHRONIZED "fulfill_result" (fn () =>
   648                 Unsynchronized.change_result queue
   649                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   650           in
                   (*SOME true: run the job as regular work of the task; SOME false: do
                     nothing here; NONE: run the job directly, valid unless the group is
                     canceled.  NOTE(review): the exact meaning of the three outcomes is
                     defined by Task_Queue.dequeue_passive -- confirm there.*)
   651             (case passive_job of
   652               SOME true => worker_exec (task, [job])
   653             | SOME false => ()
   654             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   655           end);
   656       val _ =
               (*still unassigned: wait until the result becomes available*)
   657         if is_some (Single_Assignment.peek result) then ()
   658         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   659     in () end;
   660 
   661 fun fulfill x res = let val wrapped = Exn.Res res in fulfill_result x wrapped end;
   662 
   663 
   664 (* terminate *)
   665 
   666 fun terminate group =
   667   (*cancel the group, then wait for its remaining tasks by joining an
   668     uninterruptible dummy future that depends on all of them*)
   669   let
   670     fun cancel_and_collect () =
   671       (cancel_group_unsynchronized group; Task_Queue.group_tasks (! queue) group);
   672     val tasks = SYNCHRONIZED "terminate" cancel_and_collect;
   673   in
   674     if not (null tasks) then
   675       join (singleton
   676         (forks {name = "terminate", group = SOME (new_group NONE),
   677           deps = tasks, pri = 0, interrupts = false}) I)
   678     else ()
   679   end;
   680 
   681 
   682 (* shutdown *)
   683 
   684 fun shutdown () =
   685   (*wait until the scheduler thread is no longer active; after each wake-up all
   686     threads waiting for work are notified*)
   687   if not Multithreading.available then ()
   688   else
   689     SYNCHRONIZED "shutdown" (fn () =>
             let
               fun drain () =
                 if scheduler_active ()
                 then (wait scheduler_event; broadcast_work (); drain ())
                 else ();
             in drain () end);
   690 
   691 
   692 (*final declarations of this structure!*)
   693 fun map f x = map_future f x;  (*hides the list operation map from here on*)
   694 
   695 end;
   696 
   697 type 'a future = 'a Future.future;  (*unqualified abbreviation of the future type*)
   698