src/Pure/Concurrent/future.ML
author wenzelm
Thu, 18 Oct 2012 12:00:27 +0200
changeset 50921 06a3570b0f0a
parent 50909 69bfd86cc711
child 50923 8a23e8a6bc02
permissions -rw-r--r--
more official Future.terminate;
tuned signature;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes the immediate peers to be
    30     interrupted eventually (i.e. none by default).  Interrupted tasks
    31     that lack regular result information, will pick up parallel
    32     exceptions from the cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val worker_subgroup: unit -> group
    50   type 'a future
    51   val task_of: 'a future -> task
    52   val peek: 'a future -> 'a Exn.result option
    53   val is_finished: 'a future -> bool
    54   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    55   val cancel_group: group -> unit
    56   val cancel: 'a future -> unit
    57   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    58   val default_params: params
    59   val forks: params -> (unit -> 'a) list -> 'a future list
    60   val fork_pri: int -> (unit -> 'a) -> 'a future
    61   val fork: (unit -> 'a) -> 'a future
    62   val join_results: 'a future list -> 'a Exn.result list
    63   val join_result: 'a future -> 'a Exn.result
    64   val joins: 'a future list -> 'a list
    65   val join: 'a future -> 'a
    66   val value_result: 'a Exn.result -> 'a future
    67   val value: 'a -> 'a future
    68   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    69   val map: ('a -> 'b) -> 'a future -> 'b future
    70   val promise_group: group -> (unit -> unit) -> 'a future
    71   val promise: (unit -> unit) -> 'a future
    72   val fulfill_result: 'a future -> 'a Exn.result -> unit
    73   val fulfill: 'a future -> 'a -> unit
    74   val terminate: group -> unit
    75   val shutdown: unit -> unit
    76 end;
    77 
    78 structure Future: FUTURE =
    79 struct
    80 
    81 (** future values **)
    82 
    83 type task = Task_Queue.task;
    84 type group = Task_Queue.group;
    85 val new_group = Task_Queue.new_group;
    86 
    87 
    88 (* identifiers *)
    89 
    90 local
    91   val tag = Universal.tag () : task option Universal.tag;
    92 in
    93   fun worker_task () = the_default NONE (Thread.getLocal tag);
    94   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    95 end;
    96 
    97 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
    98 fun worker_subgroup () = new_group (worker_group ());
    99 
   100 fun worker_joining e =
   101   (case worker_task () of
   102     NONE => e ()
   103   | SOME task => Task_Queue.joining task e);
   104 
   105 fun worker_waiting deps e =
   106   (case worker_task () of
   107     NONE => e ()
   108   | SOME task => Task_Queue.waiting task deps e);
   109 
   110 
   111 (* datatype future *)
   112 
   113 type 'a result = 'a Exn.result Single_Assignment.var;
   114 
   115 datatype 'a future = Future of
   116  {promised: bool,
   117   task: task,
   118   result: 'a result};
   119 
   120 fun task_of (Future {task, ...}) = task;
   121 fun result_of (Future {result, ...}) = result;
   122 
   123 fun peek x = Single_Assignment.peek (result_of x);
   124 fun is_finished x = is_some (peek x);
   125 
   126 
   127 
   128 (** scheduling **)
   129 
   130 (* synchronization *)
   131 
   132 val scheduler_event = ConditionVar.conditionVar ();
   133 val work_available = ConditionVar.conditionVar ();
   134 val work_finished = ConditionVar.conditionVar ();
   135 
   136 local
   137   val lock = Mutex.mutex ();
   138 in
   139 
   140 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   141 
   142 fun wait cond = (*requires SYNCHRONIZED*)
   143   Multithreading.sync_wait NONE NONE cond lock;
   144 
   145 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   147 
   148 fun signal cond = (*requires SYNCHRONIZED*)
   149   ConditionVar.signal cond;
   150 
   151 fun broadcast cond = (*requires SYNCHRONIZED*)
   152   ConditionVar.broadcast cond;
   153 
   154 fun broadcast_work () = (*requires SYNCHRONIZED*)
   155  (ConditionVar.broadcast work_available;
   156   ConditionVar.broadcast work_finished);
   157 
   158 end;
   159 
   160 
   161 (* global state *)
   162 
   163 val queue = Unsynchronized.ref Task_Queue.empty;
   164 val next = Unsynchronized.ref 0;
   165 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   166 val canceled = Unsynchronized.ref ([]: group list);
   167 val do_shutdown = Unsynchronized.ref false;
   168 val max_workers = Unsynchronized.ref 0;
   169 val max_active = Unsynchronized.ref 0;
   170 val worker_trend = Unsynchronized.ref 0;
   171 
   172 datatype worker_state = Working | Waiting | Sleeping;
   173 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   174 
   175 fun count_workers state = (*requires SYNCHRONIZED*)
   176   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   177 
   178 
   179 (* cancellation primitives *)
   180 
   181 fun cancel_now group = (*requires SYNCHRONIZED*)
   182   let
   183     val running = Task_Queue.cancel (! queue) group;
   184     val _ = running |> List.app (fn thread =>
   185       if Simple_Thread.is_self thread then ()
   186       else Simple_Thread.interrupt_unsynchronized thread);
   187   in running end;
   188 
   189 fun cancel_all () = (*requires SYNCHRONIZED*)
   190   let
   191     val (groups, threads) = Task_Queue.cancel_all (! queue);
   192     val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
   193   in groups end;
   194 
   195 fun cancel_later group = (*requires SYNCHRONIZED*)
   196  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   197   broadcast scheduler_event);
   198 
   199 fun interruptible_task f x =
   200   (if Multithreading.available then
   201     Multithreading.with_attributes
   202       (if is_some (worker_task ())
   203        then Multithreading.private_interrupts
   204        else Multithreading.public_interrupts)
   205       (fn _ => f x)
   206    else interruptible f x)
   207   before Multithreading.interrupted ();
   208 
   209 
   210 (* worker threads *)
   211 
   212 fun worker_exec (task, jobs) =
   213   let
   214     val group = Task_Queue.group_of_task task;
   215     val valid = not (Task_Queue.is_canceled group);
   216     val ok =
   217       Task_Queue.running task (fn () =>
   218         setmp_worker_task task (fn () =>
   219           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   220     val _ = Multithreading.tracing 2 (fn () =>
   221       let
   222         val s = Task_Queue.str_of_task_groups task;
   223         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   224         val (run, wait, deps) = Task_Queue.timing_of_task task;
   225       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   226     val _ = SYNCHRONIZED "finish" (fn () =>
   227       let
   228         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   229         val test = Exn.capture Multithreading.interrupted ();
   230         val _ =
   231           if ok andalso not (Exn.is_interrupt_exn test) then ()
   232           else if null (cancel_now group) then ()
   233           else cancel_later group;
   234         val _ = broadcast work_finished;
   235         val _ = if maximal then () else signal work_available;
   236       in () end);
   237   in () end;
   238 
   239 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   240   let
   241     val state =
   242       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   243         SOME state => state
   244       | NONE => raise Fail "Unregistered worker thread");
   245     val _ = state := (if active then Waiting else Sleeping);
   246     val _ = wait cond;
   247     val _ = state := Working;
   248   in () end;
   249 
   250 fun worker_next () = (*requires SYNCHRONIZED*)
   251   if length (! workers) > ! max_workers then
   252     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   253      signal work_available;
   254      NONE)
   255   else if count_workers Working > ! max_active then
   256     (worker_wait false work_available; worker_next ())
   257   else
   258     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   259       NONE => (worker_wait false work_available; worker_next ())
   260     | some => (signal work_available; some));
   261 
   262 fun worker_loop name =
   263   (case SYNCHRONIZED name (fn () => worker_next ()) of
   264     NONE => ()
   265   | SOME work => (worker_exec work; worker_loop name));
   266 
   267 fun worker_start name = (*requires SYNCHRONIZED*)
   268   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   269     Unsynchronized.ref Working));
   270 
   271 
   272 (* scheduler *)
   273 
   274 val status_ticks = Unsynchronized.ref 0;
   275 
   276 val last_round = Unsynchronized.ref Time.zeroTime;
   277 val next_round = seconds 0.05;
   278 
   279 fun scheduler_next () = (*requires SYNCHRONIZED*)
   280   let
   281     val now = Time.now ();
   282     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   283     val _ = if tick then last_round := now else ();
   284 
   285 
   286     (* queue and worker status *)
   287 
   288     val _ =
   289       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   290     val _ =
   291       if tick andalso ! status_ticks = 0 then
   292         Multithreading.tracing 1 (fn () =>
   293           let
   294             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   295             val total = length (! workers);
   296             val active = count_workers Working;
   297             val waiting = count_workers Waiting;
   298           in
   299             "SCHEDULE " ^ Time.toString now ^ ": " ^
   300               string_of_int ready ^ " ready, " ^
   301               string_of_int pending ^ " pending, " ^
   302               string_of_int running ^ " running, " ^
   303               string_of_int passive ^ " passive; " ^
   304               string_of_int total ^ " workers, " ^
   305               string_of_int active ^ " active, " ^
   306               string_of_int waiting ^ " waiting "
   307           end)
   308       else ();
   309 
   310     val _ =
   311       if forall (Thread.isActive o #1) (! workers) then ()
   312       else
   313         let
   314           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   315           val _ = workers := alive;
   316         in
   317           Multithreading.tracing 0 (fn () =>
   318             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   319         end;
   320 
   321 
   322     (* worker pool adjustments *)
   323 
   324     val max_active0 = ! max_active;
   325     val max_workers0 = ! max_workers;
   326 
   327     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   328     val _ = max_active := m;
   329 
   330     val mm =
   331       if ! do_shutdown then 0
   332       else if m = 9999 then 1
   333       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
   334     val _ =
   335       if tick andalso mm > ! max_workers then
   336         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   337       else if tick andalso mm < ! max_workers then
   338         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   339       else ();
   340     val _ =
   341       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   342         max_workers := mm
   343       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   344         max_workers := Int.min (mm, 2 * m)
   345       else ();
   346 
   347     val missing = ! max_workers - length (! workers);
   348     val _ =
   349       if missing > 0 then
   350         funpow missing (fn () =>
   351           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   352       else ();
   353 
   354     val _ =
   355       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   356       else signal work_available;
   357 
   358 
   359     (* canceled groups *)
   360 
   361     val _ =
   362       if null (! canceled) then ()
   363       else
   364        (Multithreading.tracing 1 (fn () =>
   365           string_of_int (length (! canceled)) ^ " canceled groups");
   366         Unsynchronized.change canceled (filter_out (null o cancel_now));
   367         broadcast_work ());
   368 
   369 
   370     (* delay loop *)
   371 
   372     val _ = Exn.release (wait_timeout next_round scheduler_event);
   373 
   374 
   375     (* shutdown *)
   376 
   377     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   378     val continue = not (! do_shutdown andalso null (! workers));
   379     val _ = if continue then () else scheduler := NONE;
   380 
   381     val _ = broadcast scheduler_event;
   382   in continue end
   383   handle exn =>
   384     if Exn.is_interrupt exn then
   385      (Multithreading.tracing 1 (fn () => "Interrupt");
   386       List.app cancel_later (cancel_all ());
   387       broadcast_work (); true)
   388     else reraise exn;
   389 
   390 fun scheduler_loop () =
   391  (while
   392     Multithreading.with_attributes
   393       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   394       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   395   do (); last_round := Time.zeroTime);
   396 
   397 fun scheduler_active () = (*requires SYNCHRONIZED*)
   398   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   399 
   400 fun scheduler_check () = (*requires SYNCHRONIZED*)
   401  (do_shutdown := false;
   402   if scheduler_active () then ()
   403   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   404 
   405 
   406 
   407 (** futures **)
   408 
   409 (* cancel *)
   410 
   411 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   412   let
   413     val _ = if null (cancel_now group) then () else cancel_later group;
   414     val _ = signal work_available;
   415     val _ = scheduler_check ();
   416   in () end;
   417 
   418 fun cancel_group group =
   419   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   420 
   421 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   422 
   423 
   424 (* future jobs *)
   425 
   426 fun assign_result group result raw_res =
   427   let
   428     val res =
   429       (case raw_res of
   430         Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
   431       | _ => raw_res);
   432     val _ = Single_Assignment.assign result res
   433       handle exn as Fail _ =>
   434         (case Single_Assignment.peek result of
   435           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   436         | _ => reraise exn);
   437     val ok =
   438       (case the (Single_Assignment.peek result) of
   439         Exn.Exn exn =>
   440           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   441       | Exn.Res _ => true);
   442   in ok end;
   443 
   444 fun future_job group interrupts (e: unit -> 'a) =
   445   let
   446     val result = Single_Assignment.var "future" : 'a result;
   447     val pos = Position.thread_data ();
   448     fun job ok =
   449       let
   450         val res =
   451           if ok then
   452             Exn.capture (fn () =>
   453               Multithreading.with_attributes
   454                 (if interrupts
   455                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   456                 (fn _ => Position.setmp_thread_data pos e ())) ()
   457           else Exn.interrupt_exn;
   458       in assign_result group result res end;
   459   in (result, job) end;
   460 
   461 
   462 (* fork *)
   463 
   464 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
   465 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   466 
   467 fun forks ({name, group, deps, pri, interrupts}: params) es =
   468   if null es then []
   469   else
   470     let
   471       val grp =
   472         (case group of
   473           NONE => worker_subgroup ()
   474         | SOME grp => grp);
   475       fun enqueue e queue =
   476         let
   477           val (result, job) = future_job grp interrupts e;
   478           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   479           val future = Future {promised = false, task = task, result = result};
   480         in (future, queue') end;
   481     in
   482       SYNCHRONIZED "enqueue" (fn () =>
   483         let
   484           val (futures, queue') = fold_map enqueue es (! queue);
   485           val _ = queue := queue';
   486           val minimal = forall (not o Task_Queue.known_task queue') deps;
   487           val _ = if minimal then signal work_available else ();
   488           val _ = scheduler_check ();
   489         in futures end)
   490     end;
   491 
   492 fun fork_pri pri e =
   493   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = pri, interrupts = true} e;
   494 
   495 fun fork e = fork_pri 0 e;
   496 
   497 
   498 (* join *)
   499 
   500 local
   501 
   502 fun get_result x =
   503   (case peek x of
   504     NONE => Exn.Exn (Fail "Unfinished future")
   505   | SOME res =>
   506       if Exn.is_interrupt_exn res then
   507         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   508           NONE => res
   509         | SOME exn => Exn.Exn exn)
   510       else res);
   511 
   512 fun join_next deps = (*requires SYNCHRONIZED*)
   513   if null deps then NONE
   514   else
   515     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   516       (NONE, []) => NONE
   517     | (NONE, deps') =>
   518         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   519     | (SOME work, deps') => SOME (work, deps'));
   520 
   521 fun execute_work NONE = ()
   522   | execute_work (SOME (work, deps')) =
   523       (worker_joining (fn () => worker_exec work); join_work deps')
   524 and join_work deps =
   525   Multithreading.with_attributes Multithreading.no_interrupts
   526     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   527 
   528 in
   529 
   530 fun join_results xs =
   531   let
   532     val _ =
   533       if forall is_finished xs then ()
   534       else if Multithreading.self_critical () then
   535         error "Cannot join future values within critical section"
   536       else if is_some (worker_task ()) then join_work (map task_of xs)
   537       else List.app (ignore o Single_Assignment.await o result_of) xs;
   538   in map get_result xs end;
   539 
   540 end;
   541 
   542 fun join_result x = singleton join_results x;
   543 fun joins xs = Par_Exn.release_all (join_results xs);
   544 fun join x = Exn.release (join_result x);
   545 
   546 
   547 (* fast-path versions -- bypassing task queue *)
   548 
   549 fun value_result (res: 'a Exn.result) =
   550   let
   551     val task = Task_Queue.dummy_task;
   552     val group = Task_Queue.group_of_task task;
   553     val result = Single_Assignment.var "value" : 'a result;
   554     val _ = assign_result group result res;
   555   in Future {promised = false, task = task, result = result} end;
   556 
   557 fun value x = value_result (Exn.Res x);
   558 
   559 fun cond_forks args es =
   560   if Multithreading.enabled () then forks args es
   561   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   562 
   563 fun map_future f x =
   564   let
   565     val task = task_of x;
   566     val group = new_group (SOME (Task_Queue.group_of_task task));
   567     val (result, job) = future_job group true (fn () => f (join x));
   568 
   569     val extended = SYNCHRONIZED "extend" (fn () =>
   570       (case Task_Queue.extend task job (! queue) of
   571         SOME queue' => (queue := queue'; true)
   572       | NONE => false));
   573   in
   574     if extended then Future {promised = false, task = task, result = result}
   575     else
   576       (singleton o cond_forks)
   577         {name = "map_future", group = SOME group, deps = [task],
   578           pri = Task_Queue.pri_of_task task, interrupts = true}
   579         (fn () => f (join x))
   580   end;
   581 
   582 
   583 (* promised futures -- fulfilled by external means *)
   584 
   585 fun promise_group group abort : 'a future =
   586   let
   587     val result = Single_Assignment.var "promise" : 'a result;
   588     fun assign () = assign_result group result Exn.interrupt_exn
   589       handle Fail _ => true
   590         | exn =>
   591             if Exn.is_interrupt exn
   592             then raise Fail "Concurrent attempt to fulfill promise"
   593             else reraise exn;
   594     fun job () =
   595       Multithreading.with_attributes Multithreading.no_interrupts
   596         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   597     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   598       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   599   in Future {promised = true, task = task, result = result} end;
   600 
   601 fun promise abort = promise_group (worker_subgroup ()) abort;
   602 
   603 fun fulfill_result (Future {promised, task, result}) res =
   604   if not promised then raise Fail "Not a promised future"
   605   else
   606     let
   607       val group = Task_Queue.group_of_task task;
   608       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   609       val _ =
   610         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   611           let
   612             val passive_job =
   613               SYNCHRONIZED "fulfill_result" (fn () =>
   614                 Unsynchronized.change_result queue
   615                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   616           in
   617             (case passive_job of
   618               SOME true => worker_exec (task, [job])
   619             | SOME false => ()
   620             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   621           end);
   622       val _ =
   623         if is_some (Single_Assignment.peek result) then ()
   624         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   625     in () end;
   626 
   627 fun fulfill x res = fulfill_result x (Exn.Res res);
   628 
   629 
   630 (* terminate *)
   631 
   632 fun terminate group =
   633   let
   634     val tasks =
   635       SYNCHRONIZED "terminate" (fn () =>
   636         let val _ = cancel_group_unsynchronized group;
   637         in Task_Queue.group_tasks (! queue) group end);
   638   in
   639     if null tasks then ()
   640     else
   641       (singleton o forks)
   642         {name = "terminate", group = SOME (new_group NONE),
   643           deps = tasks, pri = 0, interrupts = false} I
   644       |> join
   645   end;
   646 
   647 
   648 (* shutdown *)
   649 
   650 fun shutdown () =
   651   if Multithreading.available then
   652     SYNCHRONIZED "shutdown" (fn () =>
   653      while scheduler_active () do
   654       (wait scheduler_event; broadcast_work ()))
   655   else ();
   656 
   657 
   658 (* queue status *)
   659 
   660 fun queue_status () = Task_Queue.status (! queue);
   661 
   662 
   663 (*final declarations of this structure!*)
   664 val map = map_future;
   665 
   666 end;
   667 
   668 type 'a future = 'a Future.future;
   669