src/Pure/Concurrent/future.ML
author wenzelm
Tue, 05 Mar 2013 11:37:01 +0100
changeset 52491 45579fbe5a24
parent 52462 bcd6b1aa4db5
child 52774 ff2d241dcde1
permissions -rw-r--r--
removed unused Future.flat, while leaving its influence of Future.map (see bcd6b1aa4db5);
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
       (*Interface of structure Future: task/group identifiers, the type of
         futures with status inspection, cancellation, fork/join, promises,
         and termination/shutdown of the scheduling machinery.*)
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val worker_subgroup: unit -> group
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option
    53   val is_finished: 'a future -> bool
    54   val ML_statistics: bool Unsynchronized.ref
    55   val forked_proofs: int Unsynchronized.ref
    56   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    57   val cancel_group: group -> unit
    58   val cancel: 'a future -> unit
    59   val error_msg: Position.T -> (serial * string) * string option -> unit
    60   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
    61   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    62   val default_params: params
    63   val forks: params -> (unit -> 'a) list -> 'a future list
    64   val fork: (unit -> 'a) -> 'a future
    65   val join_results: 'a future list -> 'a Exn.result list
    66   val join_result: 'a future -> 'a Exn.result
    67   val joins: 'a future list -> 'a list
    68   val join: 'a future -> 'a
    69   val value_result: 'a Exn.result -> 'a future
    70   val value: 'a -> 'a future
    71   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    72   val map: ('a -> 'b) -> 'a future -> 'b future
    73   val promise_group: group -> (unit -> unit) -> 'a future
    74   val promise: (unit -> unit) -> 'a future
    75   val fulfill_result: 'a future -> 'a Exn.result -> unit
    76   val fulfill: 'a future -> 'a -> unit
    77   val terminate: group -> unit
    78   val shutdown: unit -> unit
    79 end;
    80 
    81 structure Future: FUTURE =
    82 struct
    83 
    84 (** future values **)
    85 
    86 type task = Task_Queue.task;  (*re-exported from Task_Queue*)
    87 type group = Task_Queue.group;  (*re-exported from Task_Queue*)
    88 val new_group = Task_Queue.new_group;  (*direct alias of Task_Queue.new_group*)
    89 
    90 
    91 (* identifiers *)
    92 
    93 local
         (*thread-local slot holding the task executed by the current thread*)
    94   val worker_slot = Universal.tag () : task option Universal.tag;
    95 in
         (*task of the calling thread; NONE if the slot is unset or holds NONE*)
    96   fun worker_task () = (case Thread.getLocal worker_slot of SOME current => current | NONE => NONE);
         (*apply f to x via setmp_thread_data, handing over the current slot
           value and SOME task as the value for the duration of the call*)
    97   fun setmp_worker_task task f x =
           let val previous = worker_task ()
           in setmp_thread_data worker_slot previous (SOME task) f x end;
    98 end;
    99 
   100 fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
       (*group of the calling thread's worker task, if there is such a task*)
   101 val worker_subgroup = new_group o worker_group;
       (*new group created from the optional worker group*)
   102 
   103 fun worker_joining body =
         (*run body (); inside a worker task the call goes through
           Task_Queue.joining for that task*)
   104   (case worker_task () of
   105     SOME task => Task_Queue.joining task body
   106   | NONE => body ());
   107 
   108 fun worker_waiting deps body =
         (*run body (); inside a worker task the call goes through
           Task_Queue.waiting for that task and the given deps*)
   109   (case worker_task () of
   110     SOME task => Task_Queue.waiting task deps body
   111   | NONE => body ());
   112 
   113 
   114 (* datatype future *)
   115 
   116 type 'a result = 'a Exn.result Single_Assignment.var;  (*single-assignment variable for the outcome*)
   117 
   118 datatype 'a future = Future of
   119  {promised: bool,  (*true for promises (promise_group), false for forked, mapped and value futures*)
   120   task: task,  (*associated task; Task_Queue.dummy_task for value futures*)
   121   result: 'a result};  (*outcome (value or exception), once assigned*)
   122 
   123 fun task_of (Future fields) = #task fields;  (*task component of a future*)
   124 fun result_of (Future {result = cell, ...}) = cell;  (*result variable of a future*)
   125 
   126 fun peek (Future {result, ...}) = Single_Assignment.peek result;  (*non-blocking look at the result variable*)
   127 fun is_finished x = (case peek x of SOME _ => true | NONE => false);  (*true iff peek yields a result*)
   128 
   129 
   130 
   131 (** scheduling **)
   132 
   133 (* synchronization *)
   134 
   135 val scheduler_event = ConditionVar.conditionVar ();  (*waited on by the scheduler delay loop and by shutdown*)
   136 val work_available = ConditionVar.conditionVar ();  (*waited on by sleeping workers in worker_next*)
   137 val work_finished = ConditionVar.conditionVar ();  (*broadcast by worker_exec when a task is finished; waited on in join_next*)
   138 
   139 local
         (*the single mutex behind SYNCHRONIZED and both wait operations*)
   140   val global_lock = Mutex.mutex ();
   141 in
   142 
       (*named critical section on the global lock*)
   143 fun SYNCHRONIZED name = Simple_Thread.synchronized name global_lock;
   144 
       (*wait on cond without time limit*)
   145 fun wait cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE NONE cond global_lock;
   147 
       (*wait on cond with a limit of timeout, counted from the current time*)
   148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   149   let val deadline = Time.+ (Time.now (), timeout)
         in Multithreading.sync_wait NONE (SOME deadline) cond global_lock end;
   150 
   151 val signal = (*requires SYNCHRONIZED*)
   152   ConditionVar.signal;
   153 
   154 val broadcast = (*requires SYNCHRONIZED*)
   155   ConditionVar.broadcast;
   156 
   157 end;
   158 
   159 
   160 (* global state *)
   161 
   162 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue; updated inside SYNCHRONIZED sections*)
   163 val next = Unsynchronized.ref 0;  (*counter used to number worker thread names*)
   164 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread; reset to NONE when scheduler_next decides to stop*)
   165 val canceled = Unsynchronized.ref ([]: group list);  (*groups registered by cancel_later, retried by the scheduler*)
   166 val do_shutdown = Unsynchronized.ref false;  (*set by the scheduler, cleared by scheduler_check*)
   167 val max_workers = Unsynchronized.ref 0;  (*pool size limit; surplus workers terminate in worker_next*)
   168 val max_active = Unsynchronized.ref 0;  (*limit on workers in state Working, checked in worker_next*)
   169 val worker_trend = Unsynchronized.ref 0;  (*tick counter for pool growth (positive) or shrinkage (negative)*)
   170 
   171 val status_ticks = Unsynchronized.ref 0;  (*number of scheduler ticks so far*)
   172 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the latest scheduler tick*)
   173 val next_round = seconds 0.05;  (*tick interval, also the timeout of the scheduler delay loop*)
   174 
   175 datatype worker_state = Working | Waiting | Sleeping;  (*Waiting/Sleeping are set by worker_wait for active = true/false*)
   176 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);  (*registered worker threads with their state*)
   177 
   178 fun count_workers state = (*requires SYNCHRONIZED*)
         (*number of registered workers whose state reference holds the given state*)
   179   length (filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   180 
   181 
   182 
   183 (* status *)
   184 
   185 val ML_statistics = Unsynchronized.ref false;  (*when true, report_status emits a statistics message*)
   186 val forked_proofs = Unsynchronized.ref 0;  (*reported as "tasks_proof"; only read within this structure*)
   187 
   188 fun report_status () = (*requires SYNCHRONIZED*)
         (*Emit one ML_statistics protocol message with queue and worker
           counters, provided that the ML_statistics flag is set; a Fail from
           the output channel is turned into a warning.*)
   189   if not (! ML_statistics) then ()
   190   else
   191     let
   192       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   193       val total = length (! workers);
   194       val active = count_workers Working;
   195       val waiting = count_workers Waiting;
   196       val now = ("now", signed_string_of_real (Time.toReal (Time.now ())));
   197       val counters =
   198        [("tasks_proof", ! forked_proofs),
   199         ("tasks_ready", ready),
   200         ("tasks_pending", pending),
   201         ("tasks_running", running),
   202         ("tasks_passive", passive),
   203         ("workers_total", total),
   204         ("workers_active", active),
   205         ("workers_waiting", waiting)];
   206       val stats = now :: map (apsnd Markup.print_int) counters @ ML_Statistics.get ();
   207     in
   208       Output.protocol_message (Markup.ML_statistics :: stats) ""
   209         handle Fail msg => warning msg
   210     end;
   211 
   212 
   213 (* cancellation primitives *)
   214 
   215 fun cancel_now group = (*requires SYNCHRONIZED*)
         (*Cancel group in the queue and interrupt every thread reported by
           Task_Queue.cancel, except for the calling thread itself; the
           reported thread list is returned.*)
   216   let
   217     val running = Task_Queue.cancel (! queue) group;
   218     fun interrupt_other thread =
   219       if Simple_Thread.is_self thread then ()
   220       else Simple_Thread.interrupt_unsynchronized thread;
   221   in List.app interrupt_other running; running end;
   222 
   223 fun cancel_all () = (*requires SYNCHRONIZED*)
         (*Cancel everything in the queue, interrupt all threads reported by
           Task_Queue.cancel_all, and return the reported groups.*)
   224   (case Task_Queue.cancel_all (! queue) of
   225     (groups, threads) =>
   226      (List.app Simple_Thread.interrupt_unsynchronized threads;
   227       groups));
   228 
   229 fun cancel_later group = (*requires SYNCHRONIZED*)
         (*register group in the canceled list (no duplicates wrt.
           Task_Queue.eq_group) and wake up waiters on scheduler_event*)
   230   let val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group)
   231   in broadcast scheduler_event end;
   232 
   233 fun interruptible_task f x =
         (*Apply f to x with interrupts enabled: private_interrupts inside a
           worker task, public_interrupts otherwise; without multithreading
           support plain "interruptible" is used.  After normal return,
           Multithreading.interrupted () is invoked before the result is
           handed back.*)
   234   let
   235     fun attributes () =
   236       if is_none (worker_task ()) then Multithreading.public_interrupts
   237       else Multithreading.private_interrupts;
   238     val result =
   239       if not Multithreading.available then interruptible f x
   240       else Multithreading.with_attributes (attributes ()) (fn _ => f x);
   241   in result before Multithreading.interrupted () end;
   242 
   243 
   244 (* worker threads *)
   245 
   246 fun worker_exec (task, jobs) =
         (*Run all jobs of task in the calling thread, then finish the task in
           the queue and propagate failure to its group.*)
   247   let
   248     val group = Task_Queue.group_of_task task;
           (*validity is sampled once, before any job runs, and passed to every job*)
   249     val valid = not (Task_Queue.is_canceled group);
   250     val ok =
   251       Task_Queue.running task (fn () =>
   252         setmp_worker_task task (fn () =>
                 (*"job valid" is the left operand of andalso, so every job is
                   invoked even after an earlier one has returned false*)
   253           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
           (*optional task statistics, only at trace level 2 or above*)
   254     val _ =
   255       if ! Multithreading.trace >= 2 then
   256         Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
   257           handle Fail msg => warning msg
   258       else ();
   259     val _ = SYNCHRONIZED "finish" (fn () =>
   260       let
   261         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
               (*captured instead of raised; presumably probes for a pending
                 interrupt of this thread*)
   262         val test = Exn.capture Multithreading.interrupted ();
               (*a failed job or a captured interrupt cancels the group now;
                 cancel_later is registered only if cancel_now returns a
                 non-empty list*)
   263         val _ =
   264           if ok andalso not (Exn.is_interrupt_exn test) then ()
   265           else if null (cancel_now group) then ()
   266           else cancel_later group;
   267         val _ = broadcast work_finished;
   268         val _ = if maximal then () else signal work_available;
   269       in () end);
   270   in () end;
   271 
   272 fun worker_wait active cond = (*requires SYNCHRONIZED*)
         (*Block the calling worker on cond: its registered state becomes
           Waiting (active) or Sleeping (not active) for the duration of the
           wait and Working afterwards.  Fails for unregistered threads.*)
   273   let
   274     val self = Thread.self ();
   275     val idle = if active then Waiting else Sleeping;
   276   in
   277     (case AList.lookup Thread.equal (! workers) self of
   278       NONE => raise Fail "Unregistered worker thread"
   279     | SOME state_ref =>
   280         (state_ref := idle; wait cond; state_ref := Working))
   281   end;
   282 
   283 fun worker_next () = (*requires SYNCHRONIZED*)
         (*Select the next piece of work for the calling worker thread: NONE
           tells the worker to terminate, SOME work is to be executed.*)
   284   if length (! workers) > ! max_workers then
           (*pool too large: deregister this thread and pass the signal on*)
   285     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   286      signal work_available;
   287      NONE)
   288   else if count_workers Working > ! max_active then
           (*too many workers in state Working: sleep until signalled, then retry*)
   289     (worker_wait false work_available; worker_next ())
   290   else
   291     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
             (*nothing dequeued: sleep until signalled, then retry*)
   292       NONE => (worker_wait false work_available; worker_next ())
             (*work dequeued: signal work_available once more before returning it*)
   293     | some => (signal work_available; some));
   294 
   295 fun worker_loop name =
         (*main loop of a worker thread: fetch work under the lock, execute it
           outside the lock, stop as soon as worker_next yields NONE*)
   296   (case SYNCHRONIZED name worker_next of
   297     SOME work => (worker_exec work; worker_loop name)
   298   | NONE => ());
   299 
   300 fun worker_start name = (*requires SYNCHRONIZED*)
         (*fork a thread running worker_loop and register it in state Working*)
   301   let val thread = Simple_Thread.fork false (fn () => worker_loop name)
   302   in Unsynchronized.change workers (cons (thread, Unsynchronized.ref Working)) end;
   303 
   304 
   305 (* scheduler *)
   306 
   307 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler.  Returns false when the scheduler thread
           should stop (shutdown requested and no workers left), true otherwise.
           An interrupt raised during the round cancels all groups, registers
           them via cancel_later, and yields true.*)
   308   let
   309     val now = Time.now ();
           (*a tick is due once at least next_round has elapsed since last_round*)
   310     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   311     val _ = if tick then last_round := now else ();
   312 
   313 
   314     (* runtime status *)
   315 
   316     val _ =
   317       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
           (*report on every 2nd tick at trace level 1 or above, else on every 10th*)
   318     val _ =
   319       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   320       then report_status () else ();
   321 
           (*drop worker threads that are no longer active from the registry*)
   322     val _ =
   323       if forall (Thread.isActive o #1) (! workers) then ()
   324       else
   325         let
   326           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   327           val _ = workers := alive;
   328         in
   329           Multithreading.tracing 0 (fn () =>
   330             "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   331         end;
   332 
   333 
   334     (* worker pool adjustments *)
   335 
           (*previous limits, compared below to decide whether workers need a signal*)
   336     val max_active0 = ! max_active;
   337     val max_workers0 = ! max_workers;
   338 
           (*limit on active workers: 0 during shutdown, else the configured thread count*)
   339     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   340     val _ = max_active := m;
   341 
           (*desired pool size: Working + 2 * Waiting workers, at least m and at
             most 4 * m; the value m = 9999 is special-cased to a single worker*)
   342     val mm =
   343       if ! do_shutdown then 0
   344       else if m = 9999 then 1
   345       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts consecutive ticks asking for more (positive) or
             fewer (negative) workers; a change of direction resets it to 0*)
   346     val _ =
   347       if tick andalso mm > ! max_workers then
   348         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   349       else if tick andalso mm < ! max_workers then
   350         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   351       else ();
           (*follow mm at once for mm = 0 or a trend beyond 50 either way;
             otherwise move to min (mm, 2 * m) if the trend exceeds 5 while
             max_workers is below 2 * m, or if max_workers is still 0
             (andalso binds tighter than orelse)*)
   352     val _ =
   353       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   354         max_workers := mm
   355       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   356         max_workers := Int.min (mm, 2 * m)
   357       else ();
   358 
           (*start as many workers as are missing relative to max_workers*)
   359     val missing = ! max_workers - length (! workers);
   360     val _ =
   361       if missing > 0 then
   362         funpow missing (fn () =>
   363           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   364       else ();
   365 
   366     val _ =
   367       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   368       else signal work_available;
   369 
   370 
   371     (* canceled groups *)
   372 
           (*retry cancellation; a group stays registered while cancel_now still
             returns a non-empty list for it*)
   373     val _ =
   374       if null (! canceled) then ()
   375       else
   376        (Multithreading.tracing 1 (fn () =>
   377           string_of_int (length (! canceled)) ^ " canceled groups");
   378         Unsynchronized.change canceled (filter_out (null o cancel_now));
   379         signal work_available);
   380 
   381 
   382     (* delay loop *)
   383 
           (*wait on scheduler_event for at most next_round; the result goes
             through Exn.release, presumably re-raising an exception captured
             during the wait*)
   384     val _ = Exn.release (wait_timeout next_round scheduler_event);
   385 
   386 
   387     (* shutdown *)
   388 
           (*request shutdown once the queue reports all tasks as passive; stop
             when shutdown is requested and the worker registry is empty*)
   389     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   390     val continue = not (! do_shutdown andalso null (! workers));
   391     val _ = if continue then () else (report_status (); scheduler := NONE);
   392 
   393     val _ = broadcast scheduler_event;
   394   in continue end
   395   handle exn =>
   396     if Exn.is_interrupt exn then
   397      (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
   398       List.app cancel_later (cancel_all ());
   399       signal work_available; true)
   400     else reraise exn;
   401 
   402 fun scheduler_loop () =
         (*Body of the scheduler thread: repeat scheduler_next inside the
           lock, with the interrupt attributes recomputed for each round,
           until it returns false; finally reset last_round.*)
   403   let
   404     fun round () =
   405       Multithreading.with_attributes
   406         (Multithreading.sync_interrupts Multithreading.public_interrupts)
   407         (fn _ => SYNCHRONIZED "scheduler" scheduler_next);
           fun repeat () = if round () then repeat () else ();
         in repeat (); last_round := Time.zeroTime end;
   408 
   409 fun scheduler_active () = (*requires SYNCHRONIZED*)
         (*true iff a scheduler thread is registered and still active*)
   410   the_default false (Option.map Thread.isActive (! scheduler));
   411 
   412 fun scheduler_check () = (*requires SYNCHRONIZED*)
         (*withdraw any shutdown request and fork a scheduler thread unless
           one is active already*)
   413   let val _ = do_shutdown := false in
   414     if not (scheduler_active ()) then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
   415     else () end;
   416 
   417 
   418 
   419 (** futures **)
   420 
   421 (* cancel *)
   422 
   423 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
         (*cancel group now, register it for later retry if cancel_now returns
           a non-empty list, then signal workers and ensure a scheduler*)
   424  ((case cancel_now group of
   425     [] => ()
   426   | _ => cancel_later group);
   427   signal work_available;
   428   scheduler_check ());
   429 
   430 fun cancel_group group =  (*locked variant of cancel_group_unsynchronized*)
   431   let fun body () = cancel_group_unsynchronized group in SYNCHRONIZED "cancel_group" body end;
   432 
   433 fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;  (*cancel the group of the future's task*)
   434 
   435 
   436 (* results *)
   437 
   438 fun error_msg pos ((serial, msg), exec_id) =
         (*Output the error message with pos installed as thread position,
           unless both pos and the message carry an id and the two differ.*)
   439   let
   440     fun relevant () =
   441       (case (Position.get_id pos, exec_id) of
   442         (SOME id, SOME id') => id = id'
   443       | _ => true);
           fun emit () = if relevant () then Output.error_msg' (serial, msg) else ();
         in Position.setmp_thread_data pos emit () end;
   444 
   445 fun identify_result pos (Exn.Exn exn) =
             (*exceptions are passed through Par_Exn.identify, together with
               the exec id of pos, if it has one*)
   446       let
   447         val props =
   448           (case Position.get_id pos of
   449             SOME id => [(Markup.exec_idN, id)]
   450           | NONE => []);
   451       in Exn.Exn (Par_Exn.identify props exn) end
   452   | identify_result _ res = res;  (*regular results are returned unchanged*)
   454 
   455 fun assign_result group result res =
         (*Assign res to the result variable and return true iff the variable
           then holds a regular value; an exception result is also recorded
           with the group via Task_Queue.cancel_group.*)
   456   let
           (*if assignment raises Fail (presumably because the variable is
             already assigned) and the stored result is an interrupt, that
             interrupt is re-raised instead of the Fail*)
   457     val _ = Single_Assignment.assign result res
   458       handle exn as Fail _ =>
   459         (case Single_Assignment.peek result of
   460           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   461         | _ => reraise exn);
   462     val ok =
   463       (case the (Single_Assignment.peek result) of
   464         Exn.Exn exn =>
   465           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   466       | Exn.Res _ => true);
   467   in ok end;
   468 
   469 
   470 (* future jobs *)
   471 
   472 fun future_job group interrupts (e: unit -> 'a) =
         (*Produce a fresh result variable together with the job that evaluates
           e and assigns its outcome; the job returns the flag of assign_result.*)
   473   let
   474     val result = Single_Assignment.var "future" : 'a result;
           (*position data of the thread calling future_job, installed again
             while e is evaluated*)
   475     val pos = Position.thread_data ();
   476     fun job ok =
   477       let
   478         val res =
                 (*ok: evaluate e with captured exceptions, under
                   private_interrupts or no_interrupts as requested;
                   not ok: e is skipped and the outcome is an interrupt*)
   479           if ok then
   480             Exn.capture (fn () =>
   481               Multithreading.with_attributes
   482                 (if interrupts
   483                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   484                 (fn _ => Position.setmp_thread_data pos e ())) ()
   485           else Exn.interrupt_exn;
   486       in assign_result group result (identify_result pos res) end;
   487   in (result, job) end;
   488 
   489 
   490 (* fork *)
   491 
   492 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};  (*arguments of forks*)
   493 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};  (*no name, default group, no deps, priority 0, interruptible*)
   494 
   495 fun forks ({name, group, deps, pri, interrupts}: params) es =
         (*Enqueue one task per element of es, all sharing name, group, deps
           and pri, and return the resulting futures.  An empty list yields []
           without touching the queue.*)
   496   if null es then []
   497   else
   498     let
             (*without an explicit group, a new subgroup of the worker context is used*)
   499       val grp =
   500         (case group of
   501           NONE => worker_subgroup ()
   502         | SOME grp => grp);
   503       fun enqueue e queue =
   504         let
   505           val (result, job) = future_job grp interrupts e;
   506           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   507           val future = Future {promised = false, task = task, result = result};
   508         in (future, queue') end;
   509     in
   510       SYNCHRONIZED "enqueue" (fn () =>
   511         let
   512           val (futures, queue') = fold_map enqueue es (! queue);
   513           val _ = queue := queue';
                 (*workers are signalled only if none of the deps is known to
                   the updated queue*)
   514           val minimal = forall (not o Task_Queue.known_task queue') deps;
   515           val _ = if minimal then signal work_available else ();
   516           val _ = scheduler_check ();
   517         in futures end)
   518     end;
   519 
   520 fun fork e =  (*single future named "fork" in the default group, priority 0, interruptible*)
   521   singleton (forks {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true}) e;
   522 
   523 
   524 (* join *)
   525 
   526 fun get_result x =
         (*Result of a future without waiting: an unfinished future yields a
           Fail exception result; an interrupt result is replaced by the
           exception reported by Task_Queue.group_status for the future's
           group, if there is one.*)
   527   (case peek x of
   528     SOME res =>
   529       if not (Exn.is_interrupt_exn res) then res
   530       else
   531         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   532           SOME exn => Exn.Exn exn
   533         | NONE => res)
   534   | NONE => Exn.Exn (Fail "Unfinished future"));
   535 
   536 local
   537 
       (*Next step while joining deps: NONE when there is nothing left to wait
         for; SOME (work, deps') when a piece of work was dequeued.  If nothing
         can be dequeued but deps remain, wait on work_finished (as a Waiting
         worker) and retry with the remaining deps.*)
   538 fun join_next deps = (*requires SYNCHRONIZED*)
   539   if null deps then NONE
   540   else
   541     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   542       (NONE, []) => NONE
   543     | (NONE, deps') =>
   544         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   545     | (SOME work, deps') => SOME (work, deps'));
   546 
       (*execute_work and join_work form a loop: join_work selects work under
         the lock, execute_work runs it outside the lock and continues with the
         remaining deps; each join_work step runs under no_interrupts*)
   547 fun execute_work NONE = ()
   548   | execute_work (SOME (work, deps')) =
   549       (worker_joining (fn () => worker_exec work); join_work deps')
   550 and join_work deps =
   551   Multithreading.with_attributes Multithreading.no_interrupts
   552     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   553 
   554 in
   555 
       (*Wait for all futures and return their results via get_result.  Nothing
         is awaited if all are finished already; joining within a critical
         section is an error; a worker thread goes through join_work; any other
         thread awaits each result variable.*)
   556 fun join_results xs =
   557   let
   558     val _ =
   559       if forall is_finished xs then ()
   560       else if Multithreading.self_critical () then
   561         error "Cannot join future values within critical section"
   562       else if is_some (worker_task ()) then join_work (map task_of xs)
   563       else List.app (ignore o Single_Assignment.await o result_of) xs;
   564   in map get_result xs end;
   565 
   566 end;
   567 
   568 fun join_result x = x |> singleton join_results;  (*result of a single future*)
   569 fun joins xs = xs |> join_results |> Par_Exn.release_all;  (*values of all futures, via Par_Exn.release_all*)
   570 fun join x = x |> join_result |> Exn.release;  (*value of a single future, via Exn.release*)
   571 
   572 
   573 (* fast-path operations -- bypass task queue if possible *)
   574 
   575 fun value_result (res: 'a Exn.result) =
         (*Finished future for a given result: it uses Task_Queue.dummy_task
           and its group, and the result (identified wrt. the position of the
           calling thread) is assigned right away.*)
   576   let
   577     val dummy = Task_Queue.dummy_task;
   578     val dummy_group = Task_Queue.group_of_task dummy;
   579     val cell = Single_Assignment.var "value" : 'a result;
   580     val identified = identify_result (Position.thread_data ()) res;
   581     val _ = assign_result dummy_group cell identified;
         in Future {promised = false, task = dummy, result = cell} end;
   582 
   583 fun value x = x |> Exn.Res |> value_result;  (*finished future holding a regular value*)
   584 
   585 fun cond_forks args es =
         (*forks if Multithreading.enabled (); otherwise each e is evaluated
           immediately in the calling thread and wrapped as a finished future*)
   586   if not (Multithreading.enabled ()) then
   587     map (fn e => value_result (Exn.interruptible_capture e ())) es
         else forks args es;
   588 
   589 fun map_future f x =
         (*Future for f applied to the value of x.  A finished x is processed
           immediately in the calling thread.  Otherwise the job is attached to
           the task of x if Task_Queue.extend succeeds; failing that, a new
           task depending on x is forked conditionally.*)
   590   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   591   else
   592     let
   593       val task = task_of x;
   594       val group = Task_Queue.group_of_task task;
   595       val (result, job) = future_job group true (fn () => f (join x));
   596 
             (*try to add the job to the existing task, under the lock*)
   597       val extended = SYNCHRONIZED "extend" (fn () =>
   598         (case Task_Queue.extend task job (! queue) of
   599           SOME queue' => (queue := queue'; true)
   600         | NONE => false));
   601     in
             (*extended: the new future shares the task of x; otherwise a
               separate task in the same group, with the same priority and a
               dependency on the task of x*)
   602       if extended then Future {promised = false, task = task, result = result}
   603       else
   604         (singleton o cond_forks)
   605           {name = "map_future", group = SOME group, deps = [task],
   606             pri = Task_Queue.pri_of_task task, interrupts = true}
   607           (fn () => f (join x))
   608     end;
   609 
   610 
   611 (* promised futures -- fulfilled by external means *)
   612 
   613 fun promise_group group abort : 'a future =
         (*Create a promise in the given group: a future without an evaluating
           task, registered as passive in the queue.  The registered job
           assigns an interrupt result and runs abort.*)
   614   let
   615     val result = Single_Assignment.var "promise" : 'a result;
           (*a Fail from assign_result counts as success (presumably the
             promise is already fulfilled); an interrupt re-raised by
             assign_result is reported as a concurrent fulfill attempt*)
   616     fun assign () = assign_result group result Exn.interrupt_exn
   617       handle Fail _ => true
   618         | exn =>
   619             if Exn.is_interrupt exn
   620             then raise Fail "Concurrent attempt to fulfill promise"
   621             else reraise exn;
           (*abort runs after assign even if assign raised: the exception is
             captured first and released afterwards; interrupts are disabled*)
   622     fun job () =
   623       Multithreading.with_attributes Multithreading.no_interrupts
   624         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   625     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   626       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   627   in Future {promised = true, task = task, result = result} end;
   628 
   629 fun promise abort = let val group = worker_subgroup () in promise_group group abort end;  (*promise in a new worker subgroup*)
   630 
   631 fun fulfill_result (Future {promised, task, result}) res =
         (*Fulfill a promise with the given result.  Raises Fail for futures
           that are not promises.  Returns only after the result variable has
           been assigned.*)
   632   if not promised then raise Fail "Not a promised future"
   633   else
   634     let
   635       val group = Task_Queue.group_of_task task;
   636       val pos = Position.thread_data ();
             (*ok: assign res, identified wrt. the caller's position;
               not ok: assign an interrupt instead*)
   637       fun job ok =
   638         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   639       val _ =
   640         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   641           let
   642             val passive_job =
   643               SYNCHRONIZED "fulfill_result" (fn () =>
   644                 Unsynchronized.change_result queue
   645                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   646           in
                   (*SOME true: run the job as work of the task via worker_exec;
                     SOME false: do nothing here; NONE: run the job directly,
                     with ok reflecting whether the group is canceled
                     (meaning of the three outcomes is defined by Task_Queue)*)
   647             (case passive_job of
   648               SOME true => worker_exec (task, [job])
   649             | SOME false => ()
   650             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   651           end);
             (*if the variable is still unassigned, wait for its assignment*)
   652       val _ =
   653         if is_some (Single_Assignment.peek result) then ()
   654         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   655     in () end;
   656 
   657 fun fulfill x res = res |> Exn.Res |> fulfill_result x;  (*fulfill a promise with a regular value*)
   658 
   659 
   660 (* terminate *)
   661 
   662 fun terminate group =
         (*Cancel group and wait for its tasks: an auxiliary non-interruptible
           future in a fresh group depends on all tasks reported by
           Task_Queue.group_tasks and is joined; nothing is forked if there
           are no such tasks.*)
   663   let
   664     fun cancel_and_collect () =
   665       (cancel_group_unsynchronized group;
   666        Task_Queue.group_tasks (! queue) group);
   667     val tasks = SYNCHRONIZED "terminate" cancel_and_collect;
   668     fun await_tasks () =
   669       join
   670         (singleton
   671           (forks
   672             {name = "terminate", group = SOME (new_group NONE),
   673               deps = tasks, pri = 0, interrupts = false}) I);
   674   in if null tasks then () else await_tasks () end;
   676 
   677 
   678 (* shutdown *)
   679 
   680 fun shutdown () =
         (*Wait until the scheduler thread is no longer active.  No-op without
           multithreading support; raises Fail when called from a worker task.*)
   681   let
   682     fun await () =
   683       if scheduler_active () then
   684        (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
   685         wait scheduler_event; await ())
   686       else ();
   687   in
   688     if not Multithreading.available then ()
           else if is_some (worker_task ()) then
             raise Fail "Cannot shutdown while running as worker thread"
           else SYNCHRONIZED "shutdown" await
         end;
   689 
   690 
   691 (*final declarations of this structure!*)
   692 val map = map_future;  (*declared last because the definitions above use the list operation "map"*)
   693 
   694 end;
   695 
   696 type 'a future = 'a Future.future;  (*top-level abbreviation for Future.future*)
   697