src/Pure/Concurrent/future.ML
author wenzelm
Mon, 08 Apr 2013 15:44:09 +0200
changeset 52774 ff2d241dcde1
parent 52491 45579fbe5a24
child 52798 92e58b76dbb1
permissions -rw-r--r--
discontinued odd magic number, which was once used for performance measurements;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information, will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
       (*Public interface of structure Future: identification of the current
         worker task/group, inspection of futures, status flags, cancellation,
         fork/join, fast-path values, promises, termination and shutdown.*)
    43 sig
         (*tasks and groups, as provided by Task_Queue*)
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val worker_subgroup: unit -> group
         (*abstract future values and non-blocking inspection*)
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option
    53   val is_finished: 'a future -> bool
         (*status flags*)
    54   val ML_statistics: bool Unsynchronized.ref
    55   val forked_proofs: int Unsynchronized.ref
         (*interrupts and cancellation*)
    56   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    57   val cancel_group: group -> unit
    58   val cancel: 'a future -> unit
         (*results*)
    59   val error_msg: Position.T -> (serial * string) * string option -> unit
    60   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
         (*fork and join*)
    61   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    62   val default_params: params
    63   val forks: params -> (unit -> 'a) list -> 'a future list
    64   val fork: (unit -> 'a) -> 'a future
    65   val join_results: 'a future list -> 'a Exn.result list
    66   val join_result: 'a future -> 'a Exn.result
    67   val joins: 'a future list -> 'a list
    68   val join: 'a future -> 'a
         (*fast-path operations*)
    69   val value_result: 'a Exn.result -> 'a future
    70   val value: 'a -> 'a future
    71   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    72   val map: ('a -> 'b) -> 'a future -> 'b future
         (*promises: futures fulfilled by external means*)
    73   val promise_group: group -> (unit -> unit) -> 'a future
    74   val promise: (unit -> unit) -> 'a future
    75   val fulfill_result: 'a future -> 'a Exn.result -> unit
    76   val fulfill: 'a future -> 'a -> unit
         (*termination of groups and of the scheduler*)
    77   val terminate: group -> unit
    78   val shutdown: unit -> unit
    79 end;
    80 
    81 structure Future: FUTURE =
    82 struct
    83 
    84 (** future values **)
    85 
    86 type task = Task_Queue.task;
       (*task, group and new_group are re-exported unchanged from Task_Queue*)
    87 type group = Task_Queue.group;
    88 val new_group = Task_Queue.new_group;
    89 
    90 
    91 (* identifiers *)
    92 
    93 local
    94   val tag = Universal.tag () : task option Universal.tag;
    95 in
    96   fun worker_task () = the_default NONE (Thread.getLocal tag);
    97   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    98 end;
    99 
   100 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
   101 fun worker_subgroup () = new_group (worker_group ());
   102 
   103 fun worker_joining e =
   104   (case worker_task () of
   105     NONE => e ()
   106   | SOME task => Task_Queue.joining task e);
   107 
   108 fun worker_waiting deps e =
   109   (case worker_task () of
   110     NONE => e ()
   111   | SOME task => Task_Queue.waiting task deps e);
   112 
   113 
   114 (* datatype future *)
   115 
   116 type 'a result = 'a Exn.result Single_Assignment.var;
       (*'a result: single-assignment variable holding a value or an exception*)
   117 
   118 datatype 'a future = Future of
       (*promised: true only for futures created by promise_group, which are
         completed via fulfill_result; task: the corresponding queue entry;
         result: the result variable described above*)
   119  {promised: bool,
   120   task: task,
   121   result: 'a result};
   122 
   123 fun task_of (Future {task, ...}) = task;
   124 fun result_of (Future {result, ...}) = result;
   125 
   126 fun peek x = Single_Assignment.peek (result_of x);
   127 fun is_finished x = is_some (peek x);
   128 
   129 
   130 
   131 (** scheduling **)
   132 
   133 (* synchronization *)
   134 
   135 val scheduler_event = ConditionVar.conditionVar ();
       (*scheduler_event: awaited (with timeout) by scheduler_next and by shutdown;
         broadcast by cancel_later and at the end of each scheduler round*)
   136 val work_available = ConditionVar.conditionVar ();
       (*work_available: awaited by workers in worker_next that have nothing to do*)
   137 val work_finished = ConditionVar.conditionVar ();
       (*work_finished: broadcast by worker_exec when a task has been finished;
         awaited by join_next while dependencies are still unfinished*)
   138 
   139 local
   140   val lock = Mutex.mutex ();
   141 in
   142 
   143 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   144 
   145 fun wait cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE NONE cond lock;
   147 
   148 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   149   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   150 
   151 fun signal cond = (*requires SYNCHRONIZED*)
   152   ConditionVar.signal cond;
   153 
   154 fun broadcast cond = (*requires SYNCHRONIZED*)
   155   ConditionVar.broadcast cond;
   156 
   157 end;
   158 
   159 
   160 (* global state *)
   161 
   162 val queue = Unsynchronized.ref Task_Queue.empty;
       (*queue: the global task queue; next: counter used by the scheduler to
         number worker thread names; scheduler: the scheduler thread, if any*)
   163 val next = Unsynchronized.ref 0;
   164 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
       (*canceled: groups registered by cancel_later; the scheduler retries
         cancel_now on them and drops those without running threads*)
   165 val canceled = Unsynchronized.ref ([]: group list);
       (*do_shutdown: set by the scheduler when all tasks are passive, reset by
         scheduler_check*)
   166 val do_shutdown = Unsynchronized.ref false;
       (*max_workers: limit for the number of registered workers (surplus
         workers retire in worker_next); max_active: limit for workers in state
         Working; worker_trend: tick counter steering changes of max_workers*)
   167 val max_workers = Unsynchronized.ref 0;
   168 val max_active = Unsynchronized.ref 0;
   169 val worker_trend = Unsynchronized.ref 0;
   170 
       (*status_ticks: number of scheduler ticks so far; last_round: time of
         the latest tick; next_round: tick interval, also used as timeout of
         the scheduler delay loop*)
   171 val status_ticks = Unsynchronized.ref 0;
   172 val last_round = Unsynchronized.ref Time.zeroTime;
   173 val next_round = seconds 0.05;
   174 
   175 datatype worker_state = Working | Waiting | Sleeping;
       (*workers: registered worker threads, each with its mutable state;
         worker_wait sets Waiting or Sleeping around a wait, Working otherwise*)
   176 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   177 
   178 fun count_workers state = (*requires SYNCHRONIZED*)
   179   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   180 
   181 
   182 
   183 (* status *)
   184 
   185 val ML_statistics = Unsynchronized.ref false;
       (*ML_statistics: report_status emits output only while this flag is set;
         forked_proofs: reported as "tasks_proof" -- it is only read in this
         file, presumably updated by clients (TODO confirm)*)
   186 val forked_proofs = Unsynchronized.ref 0;
   187 
   188 fun report_status () = (*requires SYNCHRONIZED*)
   189   if ! ML_statistics then
   190     let
   191       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   192       val total = length (! workers);
   193       val active = count_workers Working;
   194       val waiting = count_workers Waiting;
   195       val stats =
   196        [("now", signed_string_of_real (Time.toReal (Time.now ()))),
   197         ("tasks_proof", Markup.print_int (! forked_proofs)),
   198         ("tasks_ready", Markup.print_int ready),
   199         ("tasks_pending", Markup.print_int pending),
   200         ("tasks_running", Markup.print_int running),
   201         ("tasks_passive", Markup.print_int passive),
   202         ("workers_total", Markup.print_int total),
   203         ("workers_active", Markup.print_int active),
   204         ("workers_waiting", Markup.print_int waiting)] @
   205         ML_Statistics.get ();
   206     in
   207       Output.protocol_message (Markup.ML_statistics :: stats) ""
   208         handle Fail msg => warning msg
   209     end
   210   else ();
   211 
   212 
   213 (* cancellation primitives *)
   214 
   215 fun cancel_now group = (*requires SYNCHRONIZED*)
   216   let
   217     val running = Task_Queue.cancel (! queue) group;
   218     val _ = running |> List.app (fn thread =>
   219       if Simple_Thread.is_self thread then ()
   220       else Simple_Thread.interrupt_unsynchronized thread);
   221   in running end;
   222 
   223 fun cancel_all () = (*requires SYNCHRONIZED*)
   224   let
   225     val (groups, threads) = Task_Queue.cancel_all (! queue);
   226     val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
   227   in groups end;
   228 
   229 fun cancel_later group = (*requires SYNCHRONIZED*)
   230  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   231   broadcast scheduler_event);
   232 
   233 fun interruptible_task f x =
   234   (if Multithreading.available then
   235     Multithreading.with_attributes
   236       (if is_some (worker_task ())
   237        then Multithreading.private_interrupts
   238        else Multithreading.public_interrupts)
   239       (fn _ => f x)
   240    else interruptible f x)
   241   before Multithreading.interrupted ();
   242 
   243 
   244 (* worker threads *)
   245 
   246 fun worker_exec (task, jobs) =
         (*Run all jobs of a task on the current thread, then finish the task in
           the queue under the lock and notify waiting threads.*)
   247   let
   248     val group = Task_Queue.group_of_task task;
           (*jobs receive false if the group was already canceled before the start*)
   249     val valid = not (Task_Queue.is_canceled group);
           (*every job is run ("job valid" is evaluated before "andalso ok");
             ok is true only if all jobs returned true*)
   250     val ok =
   251       Task_Queue.running task (fn () =>
   252         setmp_worker_task task (fn () =>
   253           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   254     val _ =
   255       if ! Multithreading.trace >= 2 then
   256         Output.protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
   257           handle Fail msg => warning msg
   258       else ();
   259     val _ = SYNCHRONIZED "finish" (fn () =>
   260       let
   261         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
               (*probe for an interrupt that is pending for this thread*)
   262         val test = Exn.capture Multithreading.interrupted ();
               (*on failure or pending interrupt: cancel the group right away; if
                 threads of the group are still running, let the scheduler retry*)
   263         val _ =
   264           if ok andalso not (Exn.is_interrupt_exn test) then ()
   265           else if null (cancel_now group) then ()
   266           else cancel_later group;
   267         val _ = broadcast work_finished;
               (*NOTE(review): "maximal" presumably means that no other task was
                 waiting for this one -- confirm against Task_Queue.finish*)
   268         val _ = if maximal then () else signal work_available;
   269       in () end);
   270   in () end;
   271 
   272 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   273   let
   274     val state =
   275       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   276         SOME state => state
   277       | NONE => raise Fail "Unregistered worker thread");
   278     val _ = state := (if active then Waiting else Sleeping);
   279     val _ = wait cond;
   280     val _ = state := Working;
   281   in () end;
   282 
   283 fun worker_next () = (*requires SYNCHRONIZED*)
         (*Determine the next piece of work for the current worker thread;
           NONE tells worker_loop to terminate the thread.*)
         (*surplus worker: unregister, pass the signal on, and retire*)
   284   if length (! workers) > ! max_workers then
   285     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   286      signal work_available;
   287      NONE)
         (*too many workers in state Working: sleep until signalled, then retry*)
   288   else if count_workers Working > ! max_active then
   289     (worker_wait false work_available; worker_next ())
   290   else
   291     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
             (*nothing to dequeue: sleep until signalled, then retry*)
   292       NONE => (worker_wait false work_available; worker_next ())
             (*got work: signal work_available once more before leaving*)
   293     | some => (signal work_available; some));
   294 
   295 fun worker_loop name =
   296   (case SYNCHRONIZED name (fn () => worker_next ()) of
   297     NONE => ()
   298   | SOME work => (worker_exec work; worker_loop name));
   299 
   300 fun worker_start name = (*requires SYNCHRONIZED*)
   301   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   302     Unsynchronized.ref Working));
   303 
   304 
   305 (* scheduler *)
   306 
   307 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler: status report, disposal of dead workers,
           adjustment of the worker pool, retry of pending cancellations, timed
           wait, shutdown check.  The result tells scheduler_loop whether to
           continue.  An interrupt hitting the round cancels all groups and
           still yields true.*)
   308   let
   309     val now = Time.now ();
           (*tick: at least next_round has passed since the previous tick*)
   310     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   311     val _ = if tick then last_round := now else ();
   312 
   313 
   314     (* runtime status *)
   315 
   316     val _ =
   317       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
           (*report on every 2nd tick if tracing is enabled, else on every 10th*)
   318     val _ =
   319       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   320       then report_status () else ();
   321 
           (*drop threads that are no longer active from the worker list*)
   322     val _ =
   323       if forall (Thread.isActive o #1) (! workers) then ()
   324       else
   325         let
   326           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   327           val _ = workers := alive;
   328         in
   329           Multithreading.tracing 0 (fn () =>
   330             "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   331         end;
   332 
   333 
   334     (* worker pool adjustments *)
   335 
   336     val max_active0 = ! max_active;
   337     val max_workers0 = ! max_workers;
   338 
           (*m: limit for active workers, forced to 0 during shutdown*)
   339     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   340     val _ = max_active := m;
   341 
           (*mm: desired pool size, i.e. Working + 2 * Waiting, but at least m and
             at most 4 * m; 0 during shutdown*)
   342     val mm =
   343       if ! do_shutdown then 0
   344       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts successive ticks asking for growth (positive) or
             shrinkage (negative); a change of direction restarts from 0*)
   345     val _ =
   346       if tick andalso mm > ! max_workers then
   347         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   348       else if tick andalso mm < ! max_workers then
   349         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   350       else ();
           (*adopt mm after a long-lasting trend (beyond 50 ticks) or when mm = 0;
             grow early, capped at 2 * m, after more than 5 ticks of upward trend
             while below 2 * m, or whenever max_workers is still 0*)
   351     val _ =
   352       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   353         max_workers := mm
   354       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   355         max_workers := Int.min (mm, 2 * m)
   356       else ();
   357 
           (*start as many new workers as are missing from the pool*)
   358     val missing = ! max_workers - length (! workers);
   359     val _ =
   360       if missing > 0 then
   361         funpow missing (fn () =>
   362           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   363       else ();
   364 
           (*changed limits: wake a worker so that worker_next re-checks them*)
   365     val _ =
   366       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   367       else signal work_available;
   368 
   369 
   370     (* canceled groups *)
   371 
           (*retry cancellation; a group stays registered while cancel_now still
             reports running threads*)
   372     val _ =
   373       if null (! canceled) then ()
   374       else
   375        (Multithreading.tracing 1 (fn () =>
   376           string_of_int (length (! canceled)) ^ " canceled groups");
   377         Unsynchronized.change canceled (filter_out (null o cancel_now));
   378         signal work_available);
   379 
   380 
   381     (* delay loop *)
   382 
           (*sleep until scheduler_event or until next_round has elapsed;
             Exn.release re-raises an exception captured by the wait*)
   383     val _ = Exn.release (wait_timeout next_round scheduler_event);
   384 
   385 
   386     (* shutdown *)
   387 
           (*request shutdown once all tasks are passive; the loop ends as soon as
             shutdown is requested and no workers are registered anymore*)
   388     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   389     val continue = not (! do_shutdown andalso null (! workers));
   390     val _ = if continue then () else (report_status (); scheduler := NONE);
   391 
   392     val _ = broadcast scheduler_event;
   393   in continue end
         (*interrupt of the scheduler: cancel all groups now, register them for
           later retry, and keep the loop running*)
   394   handle exn =>
   395     if Exn.is_interrupt exn then
   396      (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
   397       List.app cancel_later (cancel_all ());
   398       signal work_available; true)
   399     else reraise exn;
   400 
   401 fun scheduler_loop () =
   402  (while
   403     Multithreading.with_attributes
   404       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   405       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   406   do (); last_round := Time.zeroTime);
   407 
   408 fun scheduler_active () = (*requires SYNCHRONIZED*)
   409   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   410 
   411 fun scheduler_check () = (*requires SYNCHRONIZED*)
   412  (do_shutdown := false;
   413   if scheduler_active () then ()
   414   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   415 
   416 
   417 
   418 (** futures **)
   419 
   420 (* cancel *)
   421 
   422 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   423   let
   424     val _ = if null (cancel_now group) then () else cancel_later group;
   425     val _ = signal work_available;
   426     val _ = scheduler_check ();
   427   in () end;
   428 
   429 fun cancel_group group =
   430   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   431 
   432 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   433 
   434 
   435 (* results *)
   436 
   437 fun error_msg pos ((serial, msg), exec_id) =
   438   Position.setmp_thread_data pos (fn () =>
   439     let val id = Position.get_id pos in
   440       if is_none id orelse is_none exec_id orelse id = exec_id
   441       then Output.error_msg' (serial, msg) else ()
   442     end) ();
   443 
   444 fun identify_result pos res =
   445   (case res of
   446     Exn.Exn exn =>
   447       let val exec_id =
   448         (case Position.get_id pos of
   449           NONE => []
   450         | SOME id => [(Markup.exec_idN, id)])
   451       in Exn.Exn (Par_Exn.identify exec_id exn) end
   452   | _ => res);
   453 
   454 fun assign_result group result res =
         (*Store res in the result variable and report whether the stored result
           is a regular value (true) or an exception (false).  An exceptional
           result is also recorded for the group via Task_Queue.cancel_group.*)
   455   let
           (*if assignment fails with Fail and the variable already holds an
             interrupt, that interrupt is re-raised instead of the Fail;
             presumably the Fail stems from a duplicate assignment (TODO confirm
             against Single_Assignment.assign)*)
   456     val _ = Single_Assignment.assign result res
   457       handle exn as Fail _ =>
   458         (case Single_Assignment.peek result of
   459           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   460         | _ => reraise exn);
   461     val ok =
   462       (case the (Single_Assignment.peek result) of
   463         Exn.Exn exn =>
   464           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   465       | Exn.Res _ => true);
   466   in ok end;
   467 
   468 
   469 (* future jobs *)
   470 
   471 fun future_job group interrupts (e: unit -> 'a) =
         (*Make a fresh result variable together with the job that fills it by
           evaluating e.  The thread position at creation time is captured and
           installed again while e runs.*)
   472   let
   473     val result = Single_Assignment.var "future" : 'a result;
   474     val pos = Position.thread_data ();
           (*job ok: for ok = false e is not evaluated and the result becomes an
             interrupt; the return value is that of assign_result*)
   475     fun job ok =
   476       let
   477         val res =
   478           if ok then
   479             Exn.capture (fn () =>
   480               Multithreading.with_attributes
   481                 (if interrupts
   482                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   483                 (fn _ => Position.setmp_thread_data pos e ())) ()
   484           else Exn.interrupt_exn;
   485       in assign_result group result (identify_result pos res) end;
   486   in (result, job) end;
   487 
   488 
   489 (* fork *)
   490 
   491 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
       (*fork parameters as used by forks: name, deps and pri are handed to
         Task_Queue.enqueue; group = NONE means a fresh subgroup of the current
         worker group; interrupts selects private interrupts vs. no interrupts
         for the evaluation*)
   492 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   493 
   494 fun forks ({name, group, deps, pri, interrupts}: params) es =
         (*Enqueue one task per expression in es, all within the same group, and
           return the corresponding futures.  An empty list is answered
           immediately, without touching the queue or the scheduler.*)
   495   if null es then []
   496   else
   497     let
             (*default group: a fresh subgroup of the current worker group*)
   498       val grp =
   499         (case group of
   500           NONE => worker_subgroup ()
   501         | SOME grp => grp);
   502       fun enqueue e queue =
   503         let
   504           val (result, job) = future_job grp interrupts e;
   505           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   506           val future = Future {promised = false, task = task, result = result};
   507         in (future, queue') end;
   508     in
   509       SYNCHRONIZED "enqueue" (fn () =>
   510         let
   511           val (futures, queue') = fold_map enqueue es (! queue);
   512           val _ = queue := queue';
               (*minimal: none of the deps is known to the queue, in which case a
                 worker is woken up right away*)
   513           val minimal = forall (not o Task_Queue.known_task queue') deps;
   514           val _ = if minimal then signal work_available else ();
   515           val _ = scheduler_check ();
   516         in futures end)
   517     end;
   518 
   519 fun fork e =
   520   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
   521 
   522 
   523 (* join *)
   524 
   525 fun get_result x =
   526   (case peek x of
   527     NONE => Exn.Exn (Fail "Unfinished future")
   528   | SOME res =>
   529       if Exn.is_interrupt_exn res then
   530         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   531           NONE => res
   532         | SOME exn => Exn.Exn exn)
   533       else res);
   534 
   535 local
   536 
       (*join_next: look for work among the given dependencies.  Result NONE
         means there is nothing to execute and nothing to wait for; otherwise
         the work item is returned together with the updated dependencies.*)
   537 fun join_next deps = (*requires SYNCHRONIZED*)
   538   if null deps then NONE
   539   else
   540     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   541       (NONE, []) => NONE
           (*no work available but dependencies remain: wait for some task to
             finish, then try again*)
   542     | (NONE, deps') =>
   543         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   544     | (SOME work, deps') => SOME (work, deps'));
   545 
       (*execute_work / join_work: run dequeued work on the current thread and
         continue with the remaining dependencies until join_next yields NONE;
         interrupts are disabled for the whole loop*)
   546 fun execute_work NONE = ()
   547   | execute_work (SOME (work, deps')) =
   548       (worker_joining (fn () => worker_exec work); join_work deps')
   549 and join_work deps =
   550   Multithreading.with_attributes Multithreading.no_interrupts
   551     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   552 
   553 in
   554 
       (*join_results: wait for all futures and return their results, without
         raising the exceptions contained in them.  A worker thread takes part
         in executing pending dependencies; other threads just block on each
         result variable.  Joining unfinished futures inside a critical section
         is an error.*)
   555 fun join_results xs =
   556   let
   557     val _ =
   558       if forall is_finished xs then ()
   559       else if Multithreading.self_critical () then
   560         error "Cannot join future values within critical section"
   561       else if is_some (worker_task ()) then join_work (map task_of xs)
   562       else List.app (ignore o Single_Assignment.await o result_of) xs;
   563   in map get_result xs end;
   564 
   565 end;
   566 
   567 fun join_result x = singleton join_results x;
   568 fun joins xs = Par_Exn.release_all (join_results xs);
   569 fun join x = Exn.release (join_result x);
   570 
   571 
   572 (* fast-path operations -- bypass task queue if possible *)
   573 
   574 fun value_result (res: 'a Exn.result) =
   575   let
   576     val task = Task_Queue.dummy_task;
   577     val group = Task_Queue.group_of_task task;
   578     val result = Single_Assignment.var "value" : 'a result;
   579     val _ = assign_result group result (identify_result (Position.thread_data ()) res);
   580   in Future {promised = false, task = task, result = result} end;
   581 
   582 fun value x = value_result (Exn.Res x);
   583 
   584 fun cond_forks args es =
   585   if Multithreading.enabled () then forks args es
   586   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   587 
   588 fun map_future f x =
         (*Future of f applied to the value of x.  Three cases: x is finished
           (compute directly), the task of x can be extended by another job
           (share the task), or a separate dependent task is forked.*)
   589   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   590   else
   591     let
   592       val task = task_of x;
   593       val group = Task_Queue.group_of_task task;
   594       val (result, job) = future_job group true (fn () => f (join x));
   595 
             (*Task_Queue.extend yields NONE if the job cannot be added to the task*)
   596       val extended = SYNCHRONIZED "extend" (fn () =>
   597         (case Task_Queue.extend task job (! queue) of
   598           SOME queue' => (queue := queue'; true)
   599         | NONE => false));
   600     in
   601       if extended then Future {promised = false, task = task, result = result}
   602       else
               (*fallback: new task in the same group, depending on the task of x
                 and using its priority*)
   603         (singleton o cond_forks)
   604           {name = "map_future", group = SOME group, deps = [task],
   605             pri = Task_Queue.pri_of_task task, interrupts = true}
   606           (fn () => f (join x))
   607     end;
   608 
   609 
   610 (* promised futures -- fulfilled by external means *)
   611 
   612 fun promise_group group abort : 'a future =
         (*Create a promised future within group.  It is registered as passive
           task whose job assigns an interrupt as result and then runs abort.*)
   613   let
   614     val result = Single_Assignment.var "promise" : 'a result;
           (*assign: try to store an interrupt as result.  A Fail from
             assign_result counts as success (true); an interrupt raised by it
             is reported as Fail "Concurrent attempt to fulfill promise".*)
   615     fun assign () = assign_result group result Exn.interrupt_exn
   616       handle Fail _ => true
   617         | exn =>
   618             if Exn.is_interrupt exn
   619             then raise Fail "Concurrent attempt to fulfill promise"
   620             else reraise exn;
           (*job: with interrupts disabled, run assign and then abort; abort is
             run even if assign raised, whose exception is re-raised afterwards*)
   621     fun job () =
   622       Multithreading.with_attributes Multithreading.no_interrupts
   623         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   624     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   625       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   626   in Future {promised = true, task = task, result = result} end;
   627 
       (*promise: promised future in a fresh subgroup of the current worker group*)
   628 fun promise abort = promise_group (worker_subgroup ()) abort;
   629 
   630 fun fulfill_result (Future {promised, task, result}) res =
         (*Provide the result of a promised future.  Raises Fail for futures that
           were not created as promises.*)
   631   if not promised then raise Fail "Not a promised future"
   632   else
   633     let
   634       val group = Task_Queue.group_of_task task;
   635       val pos = Position.thread_data ();
             (*job ok: assign res for ok = true, an interrupt otherwise*)
   636       fun job ok =
   637         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   638       val _ =
   639         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   640           let
   641             val passive_job =
   642               SYNCHRONIZED "fulfill_result" (fn () =>
   643                 Unsynchronized.change_result queue
   644                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   645           in
                 (*SOME true: execute the job as task on this thread; SOME false:
                   nothing is executed here; NONE: run the job directly, with
                   ok depending on whether the group is canceled.
                   NOTE(review): the exact meaning of the three outcomes is
                   defined by Task_Queue.dequeue_passive -- confirm there*)
   646             (case passive_job of
   647               SOME true => worker_exec (task, [job])
   648             | SOME false => ()
   649             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   650           end);
             (*if the result is still unassigned, wait until it becomes available*)
   651       val _ =
   652         if is_some (Single_Assignment.peek result) then ()
   653         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   654     in () end;
   655 
       (*fulfill: provide a regular value as result of a promised future*)
   656 fun fulfill x res = fulfill_result x (Exn.Res res);
   657 
   658 
   659 (* terminate *)
   660 
   661 fun terminate group =
   662   let
   663     val tasks =
   664       SYNCHRONIZED "terminate" (fn () =>
   665         let val _ = cancel_group_unsynchronized group;
   666         in Task_Queue.group_tasks (! queue) group end);
   667   in
   668     if null tasks then ()
   669     else
   670       (singleton o forks)
   671         {name = "terminate", group = SOME (new_group NONE),
   672           deps = tasks, pri = 0, interrupts = false} I
   673       |> join
   674   end;
   675 
   676 
   677 (* shutdown *)
   678 
   679 fun shutdown () =
   680   if not Multithreading.available then ()
   681   else if is_some (worker_task ()) then
   682     raise Fail "Cannot shutdown while running as worker thread"
   683   else
   684     SYNCHRONIZED "shutdown" (fn () =>
   685       while scheduler_active () do
   686        (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
   687         wait scheduler_event));
   688 
   689 
   690 (*final declarations of this structure!*)
       (*"map" is rebound to map_future only here, because definitions further
         up (e.g. join_results and cond_forks) use "map" on lists*)
   691 val map = map_future;
   692 
   693 end;
   694 
   695 type 'a future = 'a Future.future;
       (*top-level abbreviation for the future type of structure Future*)
   696