src/Pure/Concurrent/future.ML
author wenzelm
Wed, 16 Jan 2013 16:26:36 +0100
changeset 51926 ee7fe4230642
parent 51924 b2fb1ab1475d
child 51929 fe4714886d92
permissions -rw-r--r--
more explicit treatment of (optional) exception properties, notably for "serial" -- avoid conflict with startPosition = offset;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information, will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =  (*public interface of structure Future*)
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option  (*task held in the thread-local slot, if any*)
    48   val worker_group: unit -> group option  (*group of worker_task ()*)
    49   val worker_subgroup: unit -> group  (*new_group applied to worker_group ()*)
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option  (*non-blocking look at the result variable*)
    53   val is_finished: 'a future -> bool  (*is_some o peek*)
    54   val ML_statistics: bool Unsynchronized.ref  (*when set, report_status emits protocol messages*)
    55   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    56   val cancel_group: group -> unit
    57   val cancel: 'a future -> unit  (*cancel_group on the group of the future's task*)
    58   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    59   val default_params: params
    60   val forks: params -> (unit -> 'a) list -> 'a future list
    61   val fork_pri: int -> (unit -> 'a) -> 'a future  (*single fork named "fork" with given priority*)
    62   val fork: (unit -> 'a) -> 'a future  (*fork_pri 0*)
    63   val join_results: 'a future list -> 'a Exn.result list
    64   val join_result: 'a future -> 'a Exn.result  (*singleton version of join_results*)
    65   val joins: 'a future list -> 'a list  (*join_results followed by Par_Exn.release_all*)
    66   val join: 'a future -> 'a  (*join_result followed by Exn.release*)
    67   val value_result: 'a Exn.result -> 'a future  (*result assigned immediately, uses the dummy task*)
    68   val value: 'a -> 'a future  (*value_result on Exn.Res*)
    69   val cond_forks: params -> (unit -> 'a) list -> 'a future list  (*forks if Multithreading.enabled ()*)
    70   val map: ('a -> 'b) -> 'a future -> 'b future  (*bound to map_future at the end of the structure*)
    71   val promise_group: group -> (unit -> unit) -> 'a future
    72   val promise: (unit -> unit) -> 'a future  (*promise_group on worker_subgroup ()*)
    73   val fulfill_result: 'a future -> 'a Exn.result -> unit  (*Fail "Not a promised future" otherwise*)
    74   val fulfill: 'a future -> 'a -> unit  (*fulfill_result on Exn.Res*)
    75   val terminate: group -> unit
    76   val shutdown: unit -> unit
    77 end;
    78 
    79 structure Future: FUTURE =
    80 struct
    81 
    82 (** future values **)
    83 
    84 type task = Task_Queue.task;  (*alias, also exposed by signature FUTURE*)
    85 type group = Task_Queue.group;  (*alias, also exposed by signature FUTURE*)
    86 val new_group = Task_Queue.new_group;  (*direct re-export of Task_Queue.new_group*)
    87 
    88 
    89 (* identifiers *)
    90 
    91 local  (*thread-local slot for the task associated with the calling thread*)
    92   val task_tag: task option Universal.tag = Universal.tag ();
    93 in  (*worker_task reads the slot (NONE if unset); setmp_worker_task runs f x with SOME task*)
    94   fun worker_task () = (case Thread.getLocal task_tag of SOME current => current | NONE => NONE);
    95   fun setmp_worker_task task f x = setmp_thread_data task_tag (worker_task ()) (SOME task) f x;
    96 end;
    97 
    98 fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());  (*group of current task*)
    99 fun worker_subgroup () = worker_group () |> new_group;  (*fresh group below worker_group ()*)
   100 
   101 fun worker_joining body =  (*run body via Task_Queue.joining if there is a worker task*)
   102   (case worker_task () of
   103     SOME self => Task_Queue.joining self body
   104   | NONE => body ());
   105 
   106 fun worker_waiting deps body =  (*run body via Task_Queue.waiting if there is a worker task*)
   107   (case worker_task () of
   108     SOME self => Task_Queue.waiting self deps body
   109   | NONE => body ());
   110 
   111 
   112 (* datatype future *)
   113 
   114 type 'a result = 'a Exn.result Single_Assignment.var;  (*single-assignment cell for value or exception*)
   115 
   116 datatype 'a future = Future of
   117  {promised: bool,  (*true only for futures made by promise_group; checked by fulfill_result*)
   118   task: task,  (*associated queue task (Task_Queue.dummy_task for value_result)*)
   119   result: 'a result};  (*cell that assign_result fills*)
   120 
   121 fun task_of (Future {task = t, ...}) = t;  (*selector: task field*)
   122 fun result_of (Future {result = r, ...}) = r;  (*selector: result cell*)
   123 
   124 fun peek x = x |> result_of |> Single_Assignment.peek;  (*current content of the result cell*)
   125 fun is_finished x = (case peek x of NONE => false | SOME _ => true);  (*has a result been assigned?*)
   126 
   127 
   128 
   129 (** scheduling **)
   130 
   131 (* synchronization *)
   132 
   133 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by scheduler_next and shutdown*)
   134 val work_available = ConditionVar.conditionVar ();  (*awaited by idle workers in worker_next*)
   135 val work_finished = ConditionVar.conditionVar ();  (*awaited by joining workers in join_next*)
   136 
   137 local
   138   val lock = Mutex.mutex ();  (*the single lock behind SYNCHRONIZED and the wait operations below*)
   139 in
   140 
   141 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   142 
   143 fun wait cond = (*requires SYNCHRONIZED*)
   144   Multithreading.sync_wait NONE NONE cond lock;
   145 
       (*like wait, but passes the time limit now + timeout to sync_wait*)
   146 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   147   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   148 
   149 fun signal cond = (*requires SYNCHRONIZED*)
   150   ConditionVar.signal cond;
   151 
   152 fun broadcast cond = (*requires SYNCHRONIZED*)
   153   ConditionVar.broadcast cond;
   154 
       (*wake up everything waiting on work_available or work_finished*)
   155 fun broadcast_work () = (*requires SYNCHRONIZED*)
   156  (ConditionVar.broadcast work_available;
   157   ConditionVar.broadcast work_finished);
   158 
   159 end;
   160 
   161 
   162 (* global state *)
   163 
   164 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue, updated under SYNCHRONIZED*)
   165 val next = Unsynchronized.ref 0;  (*counter used by scheduler_next to number new worker names*)
   166 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread, if started*)
   167 val canceled = Unsynchronized.ref ([]: group list);  (*groups queued by cancel_later for scheduler_next*)
   168 val do_shutdown = Unsynchronized.ref false;  (*set by scheduler_next, reset by scheduler_check*)
   169 val max_workers = Unsynchronized.ref 0;  (*pool size limit, compared with length (! workers)*)
   170 val max_active = Unsynchronized.ref 0;  (*limit on workers in state Working, see worker_next*)
   171 val worker_trend = Unsynchronized.ref 0;  (*signed tick counter steering max_workers adjustments*)
   172 
   173 val status_ticks = Unsynchronized.ref 0;  (*tick counter mod 10; report_status runs when it is 0*)
   174 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the last scheduler tick*)
   175 val next_round = seconds 0.05;  (*tick interval and scheduler wait timeout*)
   176 
   177 datatype worker_state = Working | Waiting | Sleeping;  (*Waiting/Sleeping are set by worker_wait*)
   178 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   179 
   180 fun count_workers state = (*requires SYNCHRONIZED*)  (*number of registered workers in given state*)
   181   length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   182 
   183 
   184 
   185 (* status *)
   186 
   187 val ML_statistics = Unsynchronized.ref false;  (*switch for report_status*)
   188 
   189 fun report_status () = (*requires SYNCHRONIZED*)
   190   if not (! ML_statistics) then () else  (*nothing is computed or sent while the switch is off*)
   191     let
   192       fun int_prop name i = (name, Markup.print_int i);
   193       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   194       val workers_total = length (! workers);
   195       val workers_active = count_workers Working;
   196       val workers_waiting = count_workers Waiting;
   197       val props =
   198         ("now", signed_string_of_real (Time.toReal (Time.now ()))) ::
   199          [int_prop "tasks_ready" ready,
   200           int_prop "tasks_pending" pending,
   201           int_prop "tasks_running" running,
   202           int_prop "tasks_passive" passive,
   203           int_prop "workers_total" workers_total,
   204           int_prop "workers_active" workers_active,
   205           int_prop "workers_waiting" workers_waiting] @
   206         ML_Statistics.get ();
   207     in  (*a Fail from the output channel is downgraded to a warning*)
   208       Output.protocol_message (Markup.ML_statistics :: props) ""
   209         handle Fail msg => warning msg
   210     end;
   211 
   212 
   213 (* cancellation primitives *)
   214 
   215 fun cancel_now group = (*requires SYNCHRONIZED*)  (*cancel in queue, interrupt all threads but self*)
   216   let
   217     fun interrupt_other thread =
   218       if Simple_Thread.is_self thread then ()
   219       else Simple_Thread.interrupt_unsynchronized thread;
   220     val running = Task_Queue.cancel (! queue) group;
   221   in List.app interrupt_other running; running end;
   222 
   223 fun cancel_all () = (*requires SYNCHRONIZED*)  (*cancel everything, return the affected groups*)
   224   let val (groups, threads) = Task_Queue.cancel_all (! queue) in
   225     threads |> List.app Simple_Thread.interrupt_unsynchronized;
   226     groups
   227   end;
   228 
   229 fun cancel_later group = (*requires SYNCHRONIZED*)  (*hand group over to the scheduler thread*)
   230   let val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group)
   231   in broadcast scheduler_event end;
   232 
   233 fun interruptible_task f x =
         (*Evaluate f x with interrupts enabled: private_interrupts if the calling thread has a
           worker task, public_interrupts otherwise; falls back to plain "interruptible" when
           multithreading is unavailable.*)
   234   (if Multithreading.available then
   235     Multithreading.with_attributes
   236       (if is_some (worker_task ())
   237        then Multithreading.private_interrupts
   238        else Multithreading.public_interrupts)
   239       (fn _ => f x)
   240    else interruptible f x)
         (*"before" keeps the result of f x; Multithreading.interrupted () runs afterwards*)
   241   before Multithreading.interrupted ();
   242 
   243 
   244 (* worker threads *)
   245 
   246 fun worker_exec (task, jobs) =
         (*Run all jobs of a dequeued task on the calling thread, then finish the task in the
           queue and cancel its group if something went wrong.*)
   247   let
   248     val group = Task_Queue.group_of_task task;
   249     val valid = not (Task_Queue.is_canceled group);  (*passed to each job as its "ok" argument*)
   250     val ok =
   251       Task_Queue.running task (fn () =>
   252         setmp_worker_task task (fn () =>
             (*"job valid" is the left operand, so every job is run; ok is the conjunction*)
   253           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   254     val _ = Multithreading.tracing 2 (fn () =>
   255       let
   256         val s = Task_Queue.str_of_task_groups task;
   257         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   258         val (run, wait, deps) = Task_Queue.timing_of_task task;
   259       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   260     val _ = SYNCHRONIZED "finish" (fn () =>
   261       let
   262         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
             (*capture instead of raise: result is inspected via Exn.is_interrupt_exn below*)
   263         val test = Exn.capture Multithreading.interrupted ();
   264         val _ =
   265           if ok andalso not (Exn.is_interrupt_exn test) then ()
               (*failure or interrupt: cancel now, retry via scheduler if threads were still running*)
   266           else if null (cancel_now group) then ()
   267           else cancel_later group;
   268         val _ = broadcast work_finished;
   269         val _ = if maximal then () else signal work_available;
   270       in () end);
   271   in () end;
   272 
   273 fun worker_wait active cond = (*requires SYNCHRONIZED*)
         (*Block the calling worker on cond; its registered state is Waiting (active = true) or
           Sleeping (active = false) during the wait and Working again afterwards.
           Raises Fail if the calling thread is not in the workers list.*)
   274   let
   275     val state =
   276       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   277         SOME state => state
   278       | NONE => raise Fail "Unregistered worker thread");
   279     val _ = state := (if active then Waiting else Sleeping);
   280     val _ = wait cond;
   281     val _ = state := Working;
   282   in () end;
   283 
   284 fun worker_next () = (*requires SYNCHRONIZED*)
         (*Next piece of work for the calling worker; NONE tells the worker to terminate.*)
   285   if length (! workers) > ! max_workers then
         (*pool too large: unregister this worker and pass the wake-up on to another one*)
   286     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   287      signal work_available;
   288      NONE)
   289   else if count_workers Working > ! max_active then
         (*too many active workers: sleep until work_available, then retry*)
   290     (worker_wait false work_available; worker_next ())
   291   else
   292     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   293       NONE => (worker_wait false work_available; worker_next ())
   294     | some => (signal work_available; some));
   295 
   296 fun worker_loop name =  (*fetch work under the lock, execute it outside, until worker_next is NONE*)
   297   (case SYNCHRONIZED name worker_next of
   298     SOME work => (worker_exec work; worker_loop name)
   299   | NONE => ());
   300 
   301 fun worker_start name = (*requires SYNCHRONIZED*)  (*fork a worker thread, register it as Working*)
   302   let val thread = Simple_Thread.fork false (fn () => worker_loop name)
   303   in Unsynchronized.change workers (cons (thread, Unsynchronized.ref Working)) end;
   304 
   305 
   306 (* scheduler *)
   307 
   308 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler thread: status report, worker pool maintenance, pending
           cancellations, timed wait, shutdown check.  Returns false when the scheduler loop
           should stop.*)
   309   let
   310     val now = Time.now ();
           (*tick: at least next_round has passed since last_round*)
   311     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   312     val _ = if tick then last_round := now else ();
   313 
   314 
   315     (* runtime status *)
   316 
   317     val _ =
   318       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   319     val _ =
   320       if tick andalso ! status_ticks = 0 then report_status () else ();
   321 
           (*drop workers whose thread is no longer active*)
   322     val _ =
   323       if forall (Thread.isActive o #1) (! workers) then ()
   324       else
   325         let
   326           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   327           val _ = workers := alive;
   328         in
   329           Multithreading.tracing 0 (fn () =>
   330             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   331         end;
   332 
   333 
   334     (* worker pool adjustments *)
   335 
   336     val max_active0 = ! max_active;
   337     val max_workers0 = ! max_workers;
   338 
   339     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   340     val _ = max_active := m;
   341 
           (*mm: wanted pool size, between m and 4 * m depending on Working/Waiting counts*)
           (*NOTE(review): m = 9999 looks like a special marker value -- confirm its meaning*)
   342     val mm =
   343       if ! do_shutdown then 0
   344       else if m = 9999 then 1
   345       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts consecutive ticks of mm above (positive) or below (negative)
             max_workers; a change of direction resets it to 0*)
   346     val _ =
   347       if tick andalso mm > ! max_workers then
   348         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   349       else if tick andalso mm < ! max_workers then
   350         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   351       else ();
   352     val _ =
   353       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   354         max_workers := mm
           (*precedence: (trend > 5 andalso max_workers < 2 * m) orelse max_workers = 0*)
   355       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   356         max_workers := Int.min (mm, 2 * m)
   357       else ();
   358 
           (*start as many workers as are missing from the pool*)
   359     val missing = ! max_workers - length (! workers);
   360     val _ =
   361       if missing > 0 then
   362         funpow missing (fn () =>
   363           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   364       else ();
   365 
           (*limits changed: wake a worker so that worker_next re-checks them*)
   366     val _ =
   367       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   368       else signal work_available;
   369 
   370 
   371     (* canceled groups *)
   372 
           (*retry cancel_now; groups stay listed while cancel_now still returns threads*)
   373     val _ =
   374       if null (! canceled) then ()
   375       else
   376        (Multithreading.tracing 1 (fn () =>
   377           string_of_int (length (! canceled)) ^ " canceled groups");
   378         Unsynchronized.change canceled (filter_out (null o cancel_now));
   379         broadcast_work ());
   380 
   381 
   382     (* delay loop *)
   383 
           (*Exn.release re-raises an exception result of the wait; see the handler below*)
   384     val _ = Exn.release (wait_timeout next_round scheduler_event);
   385 
   386 
   387     (* shutdown *)
   388 
   389     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   390     val continue = not (! do_shutdown andalso null (! workers));
   391     val _ = if continue then () else (report_status (); scheduler := NONE);
   392 
   393     val _ = broadcast scheduler_event;
   394   in continue end
         (*interrupt of the scheduler thread: cancel everything and keep the loop running*)
   395   handle exn =>
   396     if Exn.is_interrupt exn then
   397      (Multithreading.tracing 1 (fn () => "Interrupt");
   398       List.app cancel_later (cancel_all ());
   399       broadcast_work (); true)
   400     else reraise exn;
   401 
   402 fun scheduler_loop () =
         (*Body of the scheduler thread: repeat scheduler_next under the lock until it returns
           false, then reset last_round.*)
   403  (while
   404     Multithreading.with_attributes
   405       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   406       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   407   do (); last_round := Time.zeroTime);
   408 
   409 fun scheduler_active () = (*requires SYNCHRONIZED*)  (*is a scheduler thread recorded and active?*)
   410   (case ! scheduler of SOME thread => Thread.isActive thread | NONE => false);
   411 
   412 fun scheduler_check () = (*requires SYNCHRONIZED*)  (*revoke shutdown, fork the scheduler on demand*)
   413   let val _ = do_shutdown := false in
   414     if not (scheduler_active ()) then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
   415     else () end;
   416 
   417 
   418 
   419 (** futures **)
   420 
   421 (* cancel *)
   422 
   423 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   424  ((case cancel_now group of  (*threads still running: let the scheduler retry later*)
   425     [] => ()
   426   | _ :: _ => cancel_later group);
   427   signal work_available;
   428   scheduler_check ());
   429 
   430 fun cancel_group group =  (*locked version of cancel_group_unsynchronized*)
   431   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   432 
   433 fun cancel x = x |> task_of |> Task_Queue.group_of_task |> cancel_group;  (*cancel group of future*)
   434 
   435 
   436 (* future jobs *)
   437 
   438 fun assign_result group result raw_res =
         (*Store raw_res in the result cell; returns true iff the stored result is a regular
           value.  An exceptional result additionally cancels the group with that exception.*)
   439   let
   440     val res =
   441       (case raw_res of
   442         Exn.Exn exn => Exn.Exn (Par_Exn.identify [] exn)
   443       | _ => raw_res);
   444     val _ = Single_Assignment.assign result res
           (*Fail from assign (presumably: cell already assigned -- confirm): if the cell holds
             an interrupt, that interrupt is re-raised instead of the Fail*)
   445       handle exn as Fail _ =>
   446         (case Single_Assignment.peek result of
   447           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   448         | _ => reraise exn);
   449     val ok =
   450       (case the (Single_Assignment.peek result) of
   451         Exn.Exn exn =>
   452           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   453       | Exn.Res _ => true);
   454   in ok end;
   455 
   456 fun future_job group interrupts (e: unit -> 'a) =
         (*Make a fresh result cell together with the job that fills it.  The job takes a flag
           "ok": if false, e is not run and an interrupt is assigned instead.*)
   457   let
   458     val result = Single_Assignment.var "future" : 'a result;
   459     val pos = Position.thread_data ();  (*position of the forking thread, restored inside the job*)
   460     fun job ok =
   461       let
   462         val res =
   463           if ok then
   464             Exn.capture (fn () =>
   465               Multithreading.with_attributes
                     (*interrupts = false evaluates e with no_interrupts*)
   466                 (if interrupts
   467                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   468                 (fn _ => Position.setmp_thread_data pos e ())) ()
   469           else Exn.interrupt_exn;
   470       in assign_result group result res end;
   471   in (result, job) end;
   472 
   473 
   474 (* fork *)
   475 
   476 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
       (*defaults: no name, group chosen by forks (worker_subgroup), no deps, priority 0, interruptible*)
   477 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   478 
   479 fun forks ({name, group, deps, pri, interrupts}: params) es =
         (*Enqueue one task per element of es, all with the same name, group, deps and priority;
           returns the futures in the order of es.  An empty list does not touch the queue.*)
   480   if null es then []
   481   else
   482     let
   483       val grp =
   484         (case group of
   485           NONE => worker_subgroup ()
   486         | SOME grp => grp);
   487       fun enqueue e queue =
   488         let
   489           val (result, job) = future_job grp interrupts e;
   490           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   491           val future = Future {promised = false, task = task, result = result};
   492         in (future, queue') end;
   493     in
   494       SYNCHRONIZED "enqueue" (fn () =>
   495         let
   496           val (futures, queue') = fold_map enqueue es (! queue);
   497           val _ = queue := queue';
               (*minimal: none of the deps is known to the queue, so a worker is woken right away*)
   498           val minimal = forall (not o Task_Queue.known_task queue') deps;
   499           val _ = if minimal then signal work_available else ();
   500           val _ = scheduler_check ();
   501         in futures end)
   502     end;
   503 
   504 fun fork_pri pri e =  (*fork a single job named "fork" with the given priority*)
   505   singleton (forks {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true}) e;
   506 
   507 fun fork e = fork_pri 0 e;  (*fork with priority 0*)
   508 
   509 
   510 (* join *)
   511 
   512 fun get_result x =  (*result of x; an interrupt is replaced by the group status, if present*)
   513   (case peek x of
   514     SOME res =>
   515       if not (Exn.is_interrupt_exn res) then res
   516       else
   517         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   518           SOME exn => Exn.Exn exn
   519         | NONE => res)
   520   | NONE => Exn.Exn (Fail "Unfinished future"));
   521 
   522 local  (*helpers for joining from within a worker thread*)
   523 
       (*Dequeue runnable work among deps; waits on work_finished while none is runnable.
         NONE means deps is empty or dequeue_deps reports no remaining deps.*)
   524 fun join_next deps = (*requires SYNCHRONIZED*)
   525   if null deps then NONE
   526   else
   527     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   528       (NONE, []) => NONE
   529     | (NONE, deps') =>
   530         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   531     | (SOME work, deps') => SOME (work, deps'));
   532 
       (*execute_work runs dequeued work on the joining thread, then continues with the
         remaining deps; join_work does all of this with interrupts disabled*)
   533 fun execute_work NONE = ()
   534   | execute_work (SOME (work, deps')) =
   535       (worker_joining (fn () => worker_exec work); join_work deps')
   536 and join_work deps =
   537   Multithreading.with_attributes Multithreading.no_interrupts
   538     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   539 
   540 in
   541 
       (*Wait until all xs are finished and return their results (via get_result).
         Worker threads help by executing dependencies; other threads just block on the cells.*)
   542 fun join_results xs =
   543   let
   544     val _ =
   545       if forall is_finished xs then ()
   546       else if Multithreading.self_critical () then
   547         error "Cannot join future values within critical section"
   548       else if is_some (worker_task ()) then join_work (map task_of xs)
   549       else List.app (ignore o Single_Assignment.await o result_of) xs;
   550   in map get_result xs end;
   551 
   552 end;
   553 
   554 fun join_result x = x |> singleton join_results;  (*result of one future*)
   555 fun joins xs = xs |> join_results |> Par_Exn.release_all;  (*values, or combined exceptions*)
   556 fun join x = x |> join_result |> Exn.release;  (*value of one future, or its exception*)
   557 
   558 
   559 (* fast-path versions -- bypassing task queue *)
   560 
   561 fun value_result (res: 'a Exn.result) =  (*finished future on the dummy task, no queue involved*)
   562   let
   563     val dummy = Task_Queue.dummy_task;
   564     val dummy_group = Task_Queue.group_of_task dummy;
   565     val cell = Single_Assignment.var "value" : 'a result;
   566   in ignore (assign_result dummy_group cell res);
   567     Future {promised = false, task = dummy, result = cell} end;
   568 
   569 fun value x = Exn.Res x |> value_result;  (*finished future holding a regular value*)
   570 
   571 fun cond_forks args es =  (*forks if multithreading is enabled, else evaluate each job right here*)
   572   let fun immediate e = value_result (Exn.interruptible_capture e ())
   573   in if Multithreading.enabled () then forks args es else map immediate es end;
   574 
   575 fun map_future f x =  (*future of f applied to the value of x*)
   576   if is_finished x then value (f (join x))
   577   else
   578     let
   579       fun eval () = f (join x);
   580       val task = task_of x;
   581       val group = Task_Queue.group_of_task task;
   582       val (result, job) = future_job group true eval;
   583       val extended = SYNCHRONIZED "extend" (fn () =>  (*try to attach the job to the task of x*)
   584         (case Task_Queue.extend task job (! queue) of
   585           NONE => false
   586         | SOME queue' => (queue := queue'; true)));
   587     in
   588       if not extended then  (*fall back to a separate task depending on the task of x*)
   589         singleton
   590           (cond_forks
   591             {name = "map_future", group = SOME group, deps = [task],
   592               pri = Task_Queue.pri_of_task task, interrupts = true}) eval
   593       else Future {promised = false, task = task, result = result}
   594     end;
   595 
   596 
   597 (* promised futures -- fulfilled by external means *)
   598 
   599 fun promise_group group abort : 'a future =
         (*Create a promised future in group, registered in the queue as a passive task.  The
           registered job assigns an interrupt to the result cell and then calls abort.*)
   600   let
   601     val result = Single_Assignment.var "promise" : 'a result;
           (*Fail from assign_result counts as success; a re-raised interrupt becomes a Fail*)
   602     fun assign () = assign_result group result Exn.interrupt_exn
   603       handle Fail _ => true
   604         | exn =>
   605             if Exn.is_interrupt exn
   606             then raise Fail "Concurrent attempt to fulfill promise"
   607             else reraise exn;
   608     fun job () =
   609       Multithreading.with_attributes Multithreading.no_interrupts
             (*abort () runs even if assign raised; the captured outcome is released afterwards*)
   610         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   611     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   612       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   613   in Future {promised = true, task = task, result = result} end;
   614 
       (*promise in a fresh subgroup of the current worker group*)
   615 fun promise abort = promise_group (worker_subgroup ()) abort;
   616 
   617 fun fulfill_result (Future {promised, task, result}) res =
         (*Assign res to a promised future and wait until its result cell is filled.
           Raises Fail "Not a promised future" for futures not made by promise_group.*)
   618   if not promised then raise Fail "Not a promised future"
   619   else
   620     let
   621       val group = Task_Queue.group_of_task task;
   622       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   623       val _ =
   624         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   625           let
   626             val passive_job =
   627               SYNCHRONIZED "fulfill_result" (fn () =>
   628                 Unsynchronized.change_result queue
   629                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   630           in
                 (*NOTE(review): meaning of the three dequeue_passive outcomes is defined in
                   Task_Queue -- confirm there.  Visible here: SOME true runs the job as a
                   regular task, SOME false does nothing, NONE assigns directly.*)
   631             (case passive_job of
   632               SOME true => worker_exec (task, [job])
   633             | SOME false => ()
   634             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   635           end);
             (*cell still empty: block until some other party assigns it*)
   636       val _ =
   637         if is_some (Single_Assignment.peek result) then ()
   638         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   639     in () end;
   640 
       (*fulfill with a regular value*)
   641 fun fulfill x res = fulfill_result x (Exn.Res res);
   642 
   643 
   644 (* terminate *)
   645 
   646 fun terminate group =  (*cancel group, then wait until its tasks are gone*)
   647   let
   648     fun cancel_and_collect () =
   649       (cancel_group_unsynchronized group; Task_Queue.group_tasks (! queue) group);
   650     val tasks = SYNCHRONIZED "terminate" cancel_and_collect;
   651   in
   652     (case tasks of
   653       [] => ()
   654     | _ =>
   655         (*join an uninterruptible identity job that depends on all collected tasks*)
   656         join (singleton
   657           (forks {name = "terminate", group = SOME (new_group NONE),
   658             deps = tasks, pri = 0, interrupts = false}) I))
   659   end;
   660 
   661 
   662 (* shutdown *)
   663 
   664 fun shutdown () =
         (*Block until the scheduler thread is no longer active; no-op without multithreading.*)
   665   if Multithreading.available then
   666     SYNCHRONIZED "shutdown" (fn () =>
   667      while scheduler_active () do
             (*after each scheduler_event, wake everything waiting for work*)
   668       (wait scheduler_event; broadcast_work ()))
   669   else ();
   670 
   671 
   672 (*final declarations of this structure!*)
   673 val map = map_future;  (*exported as Future.map; earlier uses of "map" in this structure are list map*)
   674 
   675 end;
   676 
   677 type 'a future = 'a Future.future;  (*top-level abbreviation for the abstract future type*)
   678