src/Pure/Concurrent/future.ML
author wenzelm
Thu, 11 Jul 2013 22:53:56 +0200
changeset 53740 a44e9a1d5d8b
parent 53695 271663ddf289
child 53912 e0169f13bd37
permissions -rw-r--r--
tuned signature;
unified exceptions for this module;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information, will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val the_worker_group: unit -> group
    50   val worker_subgroup: unit -> group
    51   val worker_guest: string -> group -> ('a -> 'b) -> 'a -> 'b
    52   type 'a future
    53   val task_of: 'a future -> task
    54   val peek: 'a future -> 'a Exn.result option
    55   val is_finished: 'a future -> bool
    56   val ML_statistics: bool Unsynchronized.ref
    57   val forked_proofs: int Unsynchronized.ref
    58   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    59   val cancel_group: group -> unit
    60   val cancel: 'a future -> unit
    61   val error_msg: Position.T -> (serial * string) * string option -> unit
    62   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
    63   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    64   val default_params: params
    65   val forks: params -> (unit -> 'a) list -> 'a future list
    66   val fork: (unit -> 'a) -> 'a future
    67   val join_results: 'a future list -> 'a Exn.result list
    68   val join_result: 'a future -> 'a Exn.result
    69   val joins: 'a future list -> 'a list
    70   val join: 'a future -> 'a
    71   val value_result: 'a Exn.result -> 'a future
    72   val value: 'a -> 'a future
    73   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    74   val map: ('a -> 'b) -> 'a future -> 'b future
    75   val promise_group: group -> (unit -> unit) -> 'a future
    76   val promise: (unit -> unit) -> 'a future
    77   val fulfill_result: 'a future -> 'a Exn.result -> unit
    78   val fulfill: 'a future -> 'a -> unit
    79   val terminate: group -> unit
    80   val shutdown: unit -> unit
    81 end;
    82 
    83 structure Future: FUTURE =
    84 struct
    85 
    86 (** future values **)
    87 
    88 type task = Task_Queue.task;
    89 type group = Task_Queue.group;
    90 val new_group = Task_Queue.new_group;
    91 
    92 
    93 (* identifiers *)
    94 
    95 local
    96   val tag = Universal.tag () : task option Universal.tag;
    97 in
    98   fun worker_task () = the_default NONE (Thread.getLocal tag);
    99   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
   100 end;
   101 
   102 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
   103 
   104 fun the_worker_group () =
   105   (case worker_group () of
   106     SOME group => group
   107   | NONE => raise Fail "Missing worker thread context");
   108 
   109 fun worker_subgroup () = new_group (worker_group ());
   110 
   111 fun worker_guest name group f x =
   112   if is_some (worker_task ()) then
   113     raise Fail "Already running as worker thread"
   114   else setmp_worker_task (Task_Queue.new_task group name NONE) f x;
   115 
   116 fun worker_joining e =
   117   (case worker_task () of
   118     NONE => e ()
   119   | SOME task => Task_Queue.joining task e);
   120 
   121 fun worker_waiting deps e =
   122   (case worker_task () of
   123     NONE => e ()
   124   | SOME task => Task_Queue.waiting task deps e);
   125 
   126 
   127 (* datatype future *)
   128 
   129 type 'a result = 'a Exn.result Single_Assignment.var;
   130 
   131 datatype 'a future = Future of
   132  {promised: bool,
   133   task: task,
   134   result: 'a result};
   135 
   136 fun task_of (Future {task, ...}) = task;
   137 fun result_of (Future {result, ...}) = result;
   138 
   139 fun peek x = Single_Assignment.peek (result_of x);
   140 fun is_finished x = is_some (peek x);
   141 
   142 
   143 
   144 (** scheduling **)
   145 
   146 (* synchronization *)
   147 
   148 val scheduler_event = ConditionVar.conditionVar ();
   149 val work_available = ConditionVar.conditionVar ();
   150 val work_finished = ConditionVar.conditionVar ();
   151 
   152 local
   153   val lock = Mutex.mutex ();
   154 in
   155 
   156 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   157 
   158 fun wait cond = (*requires SYNCHRONIZED*)
   159   Multithreading.sync_wait NONE NONE cond lock;
   160 
   161 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   162   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   163 
   164 fun signal cond = (*requires SYNCHRONIZED*)
   165   ConditionVar.signal cond;
   166 
   167 fun broadcast cond = (*requires SYNCHRONIZED*)
   168   ConditionVar.broadcast cond;
   169 
   170 end;
   171 
   172 
   173 (* global state *)
   174 
   175 val queue = Unsynchronized.ref Task_Queue.empty;
   176 val next = Unsynchronized.ref 0;
   177 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   178 val canceled = Unsynchronized.ref ([]: group list);
   179 val do_shutdown = Unsynchronized.ref false;
   180 val max_workers = Unsynchronized.ref 0;
   181 val max_active = Unsynchronized.ref 0;
   182 val worker_trend = Unsynchronized.ref 0;
   183 
   184 val status_ticks = Unsynchronized.ref 0;
   185 val last_round = Unsynchronized.ref Time.zeroTime;
   186 val next_round = seconds 0.05;
   187 
   188 datatype worker_state = Working | Waiting | Sleeping;
   189 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   190 
   191 fun count_workers state = (*requires SYNCHRONIZED*)
   192   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   193 
   194 
   195 
   196 (* status *)
   197 
   198 val ML_statistics = Unsynchronized.ref false;
   199 val forked_proofs = Unsynchronized.ref 0;
   200 
   201 fun report_status () = (*requires SYNCHRONIZED*)
   202   if ! ML_statistics then
   203     let
   204       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   205       val total = length (! workers);
   206       val active = count_workers Working;
   207       val waiting = count_workers Waiting;
   208       val stats =
   209        [("now", Markup.print_real (Time.toReal (Time.now ()))),
   210         ("tasks_proof", Markup.print_int (! forked_proofs)),
   211         ("tasks_ready", Markup.print_int ready),
   212         ("tasks_pending", Markup.print_int pending),
   213         ("tasks_running", Markup.print_int running),
   214         ("tasks_passive", Markup.print_int passive),
   215         ("workers_total", Markup.print_int total),
   216         ("workers_active", Markup.print_int active),
   217         ("workers_waiting", Markup.print_int waiting)] @
   218         ML_Statistics.get ();
   219     in Output.try_protocol_message (Markup.ML_statistics :: stats) "" end
   220   else ();
   221 
   222 
   223 (* cancellation primitives *)
   224 
   225 fun cancel_now group = (*requires SYNCHRONIZED*)
   226   let
   227     val running = Task_Queue.cancel (! queue) group;
   228     val _ = running |> List.app (fn thread =>
   229       if Simple_Thread.is_self thread then ()
   230       else Simple_Thread.interrupt_unsynchronized thread);
   231   in running end;
   232 
   233 fun cancel_all () = (*requires SYNCHRONIZED*)
   234   let
   235     val (groups, threads) = Task_Queue.cancel_all (! queue);
   236     val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
   237   in groups end;
   238 
   239 fun cancel_later group = (*requires SYNCHRONIZED*)
   240  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   241   broadcast scheduler_event);
   242 
   243 fun interruptible_task f x =
   244   (if Multithreading.available then
   245     Multithreading.with_attributes
   246       (if is_some (worker_task ())
   247        then Multithreading.private_interrupts
   248        else Multithreading.public_interrupts)
   249       (fn _ => f x)
   250    else interruptible f x)
   251   before Multithreading.interrupted ();
   252 
   253 
   254 (* worker threads *)
   255 
   256 fun worker_exec (task, jobs) =
   257   let
   258     val group = Task_Queue.group_of_task task;
   259     val valid = not (Task_Queue.is_canceled group);
   260     val ok =
   261       Task_Queue.running task (fn () =>
   262         setmp_worker_task task (fn () =>
   263           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   264     val _ =
   265       if ! Multithreading.trace >= 2 then
   266         Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
   267       else ();
   268     val _ = SYNCHRONIZED "finish" (fn () =>
   269       let
   270         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   271         val test = Exn.capture Multithreading.interrupted ();
   272         val _ =
   273           if ok andalso not (Exn.is_interrupt_exn test) then ()
   274           else if null (cancel_now group) then ()
   275           else cancel_later group;
   276         val _ = broadcast work_finished;
   277         val _ = if maximal then () else signal work_available;
   278       in () end);
   279   in () end;
   280 
   281 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   282   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   283     SOME state =>
   284      (state := (if active then Waiting else Sleeping);
   285       wait cond;
   286       state := Working)
   287   | NONE => ignore (wait cond));
   288 
   289 fun worker_next () = (*requires SYNCHRONIZED*)
   290   if length (! workers) > ! max_workers then
   291     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   292      signal work_available;
   293      NONE)
   294   else if count_workers Working > ! max_active then
   295     (worker_wait false work_available; worker_next ())
   296   else
   297     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   298       NONE => (worker_wait false work_available; worker_next ())
   299     | some => (signal work_available; some));
   300 
   301 fun worker_loop name =
   302   (case SYNCHRONIZED name (fn () => worker_next ()) of
   303     NONE => ()
   304   | SOME work => (worker_exec work; worker_loop name));
   305 
   306 fun worker_start name = (*requires SYNCHRONIZED*)
   307   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   308     Unsynchronized.ref Working));
   309 
   310 
   311 (* scheduler *)
   312 
   313 fun scheduler_next () = (*requires SYNCHRONIZED*)
   314   let
   315     val now = Time.now ();
   316     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   317     val _ = if tick then last_round := now else ();
   318 
   319 
   320     (* runtime status *)
   321 
   322     val _ =
   323       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
   324     val _ =
   325       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   326       then report_status () else ();
   327 
   328     val _ =
   329       if forall (Thread.isActive o #1) (! workers) then ()
   330       else
   331         let
   332           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   333           val _ = workers := alive;
   334         in
   335           Multithreading.tracing 0 (fn () =>
   336             "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   337         end;
   338 
   339 
   340     (* worker pool adjustments *)
   341 
   342     val max_active0 = ! max_active;
   343     val max_workers0 = ! max_workers;
   344 
   345     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   346     val _ = max_active := m;
   347 
   348     val mm =
   349       if ! do_shutdown then 0
   350       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
   351     val _ =
   352       if tick andalso mm > ! max_workers then
   353         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   354       else if tick andalso mm < ! max_workers then
   355         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   356       else ();
   357     val _ =
   358       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   359         max_workers := mm
   360       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   361         max_workers := Int.min (mm, 2 * m)
   362       else ();
   363 
   364     val missing = ! max_workers - length (! workers);
   365     val _ =
   366       if missing > 0 then
   367         funpow missing (fn () =>
   368           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   369       else ();
   370 
   371     val _ =
   372       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   373       else signal work_available;
   374 
   375 
   376     (* canceled groups *)
   377 
   378     val _ =
   379       if null (! canceled) then ()
   380       else
   381        (Multithreading.tracing 1 (fn () =>
   382           string_of_int (length (! canceled)) ^ " canceled groups");
   383         Unsynchronized.change canceled (filter_out (null o cancel_now));
   384         signal work_available);
   385 
   386 
   387     (* delay loop *)
   388 
   389     val _ = Exn.release (wait_timeout next_round scheduler_event);
   390 
   391 
   392     (* shutdown *)
   393 
   394     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   395     val continue = not (! do_shutdown andalso null (! workers));
   396     val _ = if continue then () else (report_status (); scheduler := NONE);
   397 
   398     val _ = broadcast scheduler_event;
   399   in continue end
   400   handle exn =>
   401     if Exn.is_interrupt exn then
   402      (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
   403       List.app cancel_later (cancel_all ());
   404       signal work_available; true)
   405     else reraise exn;
   406 
   407 fun scheduler_loop () =
   408  (while
   409     Multithreading.with_attributes
   410       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   411       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   412   do (); last_round := Time.zeroTime);
   413 
   414 fun scheduler_active () = (*requires SYNCHRONIZED*)
   415   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   416 
   417 fun scheduler_check () = (*requires SYNCHRONIZED*)
   418  (do_shutdown := false;
   419   if scheduler_active () then ()
   420   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   421 
   422 
   423 
   424 (** futures **)
   425 
   426 (* cancel *)
   427 
   428 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   429   let
   430     val _ = if null (cancel_now group) then () else cancel_later group;
   431     val _ = signal work_available;
   432     val _ = scheduler_check ();
   433   in () end;
   434 
   435 fun cancel_group group =
   436   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   437 
   438 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   439 
   440 
   441 (* results *)
   442 
   443 fun error_msg pos ((serial, msg), exec_id) =
   444   Position.setmp_thread_data pos (fn () =>
   445     let val id = Position.get_id pos in
   446       if is_none id orelse is_none exec_id orelse id = exec_id
   447       then Output.error_msg' (serial, msg) else ()
   448     end) ();
   449 
   450 fun identify_result pos res =
   451   (case res of
   452     Exn.Exn exn =>
   453       let val exec_id =
   454         (case Position.get_id pos of
   455           NONE => []
   456         | SOME id => [(Markup.exec_idN, id)])
   457       in Exn.Exn (Par_Exn.identify exec_id exn) end
   458   | _ => res);
   459 
   460 fun assign_result group result res =
   461   let
   462     val _ = Single_Assignment.assign result res
   463       handle exn as Fail _ =>
   464         (case Single_Assignment.peek result of
   465           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   466         | _ => reraise exn);
   467     val ok =
   468       (case the (Single_Assignment.peek result) of
   469         Exn.Exn exn =>
   470           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   471       | Exn.Res _ => true);
   472   in ok end;
   473 
   474 
   475 (* future jobs *)
   476 
   477 fun future_job group interrupts (e: unit -> 'a) =
   478   let
   479     val result = Single_Assignment.var "future" : 'a result;
   480     val pos = Position.thread_data ();
   481     fun job ok =
   482       let
   483         val res =
   484           if ok then
   485             Exn.capture (fn () =>
   486               Multithreading.with_attributes
   487                 (if interrupts
   488                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   489                 (fn _ => Position.setmp_thread_data pos e ())) ()
   490           else Exn.interrupt_exn;
   491       in assign_result group result (identify_result pos res) end;
   492   in (result, job) end;
   493 
   494 
   495 (* fork *)
   496 
   497 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
   498 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   499 
   500 fun forks ({name, group, deps, pri, interrupts}: params) es =
   501   if null es then []
   502   else
   503     let
   504       val grp =
   505         (case group of
   506           NONE => worker_subgroup ()
   507         | SOME grp => grp);
   508       fun enqueue e queue =
   509         let
   510           val (result, job) = future_job grp interrupts e;
   511           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   512           val future = Future {promised = false, task = task, result = result};
   513         in (future, queue') end;
   514     in
   515       SYNCHRONIZED "enqueue" (fn () =>
   516         let
   517           val (futures, queue') = fold_map enqueue es (! queue);
   518           val _ = queue := queue';
   519           val minimal = forall (not o Task_Queue.known_task queue') deps;
   520           val _ = if minimal then signal work_available else ();
   521           val _ = scheduler_check ();
   522         in futures end)
   523     end;
   524 
   525 fun fork e =
   526   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
   527 
   528 
   529 (* join *)
   530 
   531 fun get_result x =
   532   (case peek x of
   533     NONE => Exn.Exn (Fail "Unfinished future")
   534   | SOME res =>
   535       if Exn.is_interrupt_exn res then
   536         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   537           NONE => res
   538         | SOME exn => Exn.Exn exn)
   539       else res);
   540 
   541 local
   542 
   543 fun join_next deps = (*requires SYNCHRONIZED*)
   544   if null deps then NONE
   545   else
   546     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   547       (NONE, []) => NONE
   548     | (NONE, deps') =>
   549         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   550     | (SOME work, deps') => SOME (work, deps'));
   551 
   552 fun execute_work NONE = ()
   553   | execute_work (SOME (work, deps')) =
   554       (worker_joining (fn () => worker_exec work); join_work deps')
   555 and join_work deps =
   556   Multithreading.with_attributes Multithreading.no_interrupts
   557     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   558 
   559 in
   560 
   561 fun join_results xs =
   562   let
   563     val _ =
   564       if forall is_finished xs then ()
   565       else if Multithreading.self_critical () then
   566         raise Fail "Cannot join future values within critical section"
   567       else if is_some (worker_task ()) then join_work (map task_of xs)
   568       else List.app (ignore o Single_Assignment.await o result_of) xs;
   569   in map get_result xs end;
   570 
   571 end;
   572 
   573 fun join_result x = singleton join_results x;
   574 fun joins xs = Par_Exn.release_all (join_results xs);
   575 fun join x = Exn.release (join_result x);
   576 
   577 
   578 (* fast-path operations -- bypass task queue if possible *)
   579 
   580 fun value_result (res: 'a Exn.result) =
   581   let
   582     val task = Task_Queue.dummy_task;
   583     val group = Task_Queue.group_of_task task;
   584     val result = Single_Assignment.var "value" : 'a result;
   585     val _ = assign_result group result (identify_result (Position.thread_data ()) res);
   586   in Future {promised = false, task = task, result = result} end;
   587 
   588 fun value x = value_result (Exn.Res x);
   589 
   590 fun cond_forks args es =
   591   if Multithreading.enabled () then forks args es
   592   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   593 
   594 fun map_future f x =
   595   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   596   else
   597     let
   598       val task = task_of x;
   599       val group = Task_Queue.group_of_task task;
   600       val (result, job) = future_job group true (fn () => f (join x));
   601 
   602       val extended = SYNCHRONIZED "extend" (fn () =>
   603         (case Task_Queue.extend task job (! queue) of
   604           SOME queue' => (queue := queue'; true)
   605         | NONE => false));
   606     in
   607       if extended then Future {promised = false, task = task, result = result}
   608       else
   609         (singleton o cond_forks)
   610           {name = "map_future", group = SOME group, deps = [task],
   611             pri = Task_Queue.pri_of_task task, interrupts = true}
   612           (fn () => f (join x))
   613     end;
   614 
   615 
   616 (* promised futures -- fulfilled by external means *)
   617 
   618 fun promise_group group abort : 'a future =
   619   let
   620     val result = Single_Assignment.var "promise" : 'a result;
   621     fun assign () = assign_result group result Exn.interrupt_exn
   622       handle Fail _ => true
   623         | exn =>
   624             if Exn.is_interrupt exn
   625             then raise Fail "Concurrent attempt to fulfill promise"
   626             else reraise exn;
   627     fun job () =
   628       Multithreading.with_attributes Multithreading.no_interrupts
   629         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   630     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   631       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   632   in Future {promised = true, task = task, result = result} end;
   633 
   634 fun promise abort = promise_group (worker_subgroup ()) abort;
   635 
   636 fun fulfill_result (Future {promised, task, result}) res =
   637   if not promised then raise Fail "Not a promised future"
   638   else
   639     let
   640       val group = Task_Queue.group_of_task task;
   641       val pos = Position.thread_data ();
   642       fun job ok =
   643         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   644       val _ =
   645         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   646           let
   647             val passive_job =
   648               SYNCHRONIZED "fulfill_result" (fn () =>
   649                 Unsynchronized.change_result queue
   650                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   651           in
   652             (case passive_job of
   653               SOME true => worker_exec (task, [job])
   654             | SOME false => ()
   655             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   656           end);
   657       val _ =
   658         if is_some (Single_Assignment.peek result) then ()
   659         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   660     in () end;
   661 
   662 fun fulfill x res = fulfill_result x (Exn.Res res);
   663 
   664 
   665 (* terminate *)
   666 
   667 fun terminate group =
   668   let
   669     val tasks =
   670       SYNCHRONIZED "terminate" (fn () =>
   671         let val _ = cancel_group_unsynchronized group;
   672         in Task_Queue.group_tasks (! queue) group end);
   673   in
   674     if null tasks then ()
   675     else
   676       (singleton o forks)
   677         {name = "terminate", group = SOME (new_group NONE),
   678           deps = tasks, pri = 0, interrupts = false} I
   679       |> join
   680   end;
   681 
   682 
   683 (* shutdown *)
   684 
   685 fun shutdown () =
   686   if not Multithreading.available then ()
   687   else if is_some (worker_task ()) then
   688     raise Fail "Cannot shutdown while running as worker thread"
   689   else
   690     SYNCHRONIZED "shutdown" (fn () =>
   691       while scheduler_active () do
   692        (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
   693         wait scheduler_event));
   694 
   695 
   696 (*final declarations of this structure!*)
   697 val map = map_future;
   698 
   699 end;
   700 
   701 type 'a future = 'a Future.future;
   702