src/Pure/Concurrent/future.ML
author wenzelm
Thu, 05 Dec 2013 17:58:03 +0100
changeset 56013 d64a4ef26edb
parent 55298 99b9249b3e05
parent 55760 890e983cb07b
child 57675 38f1422ef473
permissions -rw-r--r--
merged, resolving obvious conflicts in NEWS and src/Pure/System/isabelle_process.ML;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes peer and subgroup members to be
    30     interrupted eventually.  Interrupted tasks that lack regular
    31     result information, will pick up parallel exceptions from the
    32     cumulative group context (as Par_Exn).
    33 
    34   * Future task groups may be canceled: present and future group
    35     members will be interrupted eventually.
    36 
    37   * Promised "passive" futures are fulfilled by external means.  There
    38     is no associated evaluation task, but other futures can depend on
    39     them via regular join operations.
    40 *)
    41 
    42 signature FUTURE =
    43 sig
    44   type task = Task_Queue.task
    45   type group = Task_Queue.group
    46   val new_group: group option -> group
    47   val worker_task: unit -> task option
    48   val worker_group: unit -> group option
    49   val the_worker_group: unit -> group
    50   val worker_subgroup: unit -> group
    51   type 'a future
    52   val task_of: 'a future -> task
    53   val peek: 'a future -> 'a Exn.result option
    54   val is_finished: 'a future -> bool
    55   val ML_statistics: bool Unsynchronized.ref
    56   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    57   val cancel_group: group -> unit
    58   val cancel: 'a future -> unit
    59   val error_msg: Position.T -> (serial * string) * string option -> unit
    60   val identify_result: Position.T -> 'a Exn.result -> 'a Exn.result
    61   type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool}
    62   val default_params: params
    63   val forks: params -> (unit -> 'a) list -> 'a future list
    64   val fork: (unit -> 'a) -> 'a future
    65   val join_results: 'a future list -> 'a Exn.result list
    66   val join_result: 'a future -> 'a Exn.result
    67   val joins: 'a future list -> 'a list
    68   val join: 'a future -> 'a
    69   val join_tasks: task list -> unit
    70   val task_context: string -> group -> ('a -> 'b) -> 'a -> 'b
    71   val value_result: 'a Exn.result -> 'a future
    72   val value: 'a -> 'a future
    73   val cond_forks: params -> (unit -> 'a) list -> 'a future list
    74   val map: ('a -> 'b) -> 'a future -> 'b future
    75   val promise_group: group -> (unit -> unit) -> 'a future
    76   val promise: (unit -> unit) -> 'a future
    77   val fulfill_result: 'a future -> 'a Exn.result -> unit
    78   val fulfill: 'a future -> 'a -> unit
    79   val group_snapshot: group -> task list
    80   val terminate: group -> unit
    81   val shutdown: unit -> unit
    82 end;
    83 
    84 structure Future: FUTURE =
    85 struct
    86 
    87 (** future values **)
    88 
    89 type task = Task_Queue.task;
    90 type group = Task_Queue.group;
    91 val new_group = Task_Queue.new_group;
    92 
    93 
    94 (* identifiers *)
    95 
    96 local
    97   val tag = Universal.tag () : task option Universal.tag;
    98 in
    99   fun worker_task () = the_default NONE (Thread.getLocal tag);
   100   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
   101 end;
   102 
   103 val worker_group = Option.map Task_Queue.group_of_task o worker_task;
   104 
   105 fun the_worker_group () =
   106   (case worker_group () of
   107     SOME group => group
   108   | NONE => raise Fail "Missing worker thread context");
   109 
   110 fun worker_subgroup () = new_group (worker_group ());
   111 
   112 fun worker_joining e =
   113   (case worker_task () of
   114     NONE => e ()
   115   | SOME task => Task_Queue.joining task e);
   116 
   117 fun worker_waiting deps e =
   118   (case worker_task () of
   119     NONE => e ()
   120   | SOME task => Task_Queue.waiting task deps e);
   121 
   122 
   123 (* datatype future *)
   124 
   125 type 'a result = 'a Exn.result Single_Assignment.var;
   126 
   127 datatype 'a future = Future of
   128  {promised: bool,
   129   task: task,
   130   result: 'a result};
   131 
   132 fun task_of (Future {task, ...}) = task;
   133 fun result_of (Future {result, ...}) = result;
   134 
   135 fun peek x = Single_Assignment.peek (result_of x);
   136 fun is_finished x = is_some (peek x);
   137 
   138 
   139 
   140 (** scheduling **)
   141 
   142 (* synchronization *)
   143 
   144 val scheduler_event = ConditionVar.conditionVar ();
   145 val work_available = ConditionVar.conditionVar ();
   146 val work_finished = ConditionVar.conditionVar ();
   147 
   148 local
   149   val lock = Mutex.mutex ();
   150 in
   151 
   152 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   153 
   154 fun wait cond = (*requires SYNCHRONIZED*)
   155   Multithreading.sync_wait NONE NONE cond lock;
   156 
   157 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   158   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   159 
   160 fun signal cond = (*requires SYNCHRONIZED*)
   161   ConditionVar.signal cond;
   162 
   163 fun broadcast cond = (*requires SYNCHRONIZED*)
   164   ConditionVar.broadcast cond;
   165 
   166 end;
   167 
   168 
   169 (* global state *)
   170 
   171 val queue = Unsynchronized.ref Task_Queue.empty;
   172 val next = Unsynchronized.ref 0;
   173 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   174 val canceled = Unsynchronized.ref ([]: group list);
   175 val do_shutdown = Unsynchronized.ref false;
   176 val max_workers = Unsynchronized.ref 0;
   177 val max_active = Unsynchronized.ref 0;
   178 val worker_trend = Unsynchronized.ref 0;
   179 
   180 val status_ticks = Unsynchronized.ref 0;
   181 val last_round = Unsynchronized.ref Time.zeroTime;
   182 val next_round = seconds 0.05;
   183 
   184 datatype worker_state = Working | Waiting | Sleeping;
   185 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   186 
   187 fun count_workers state = (*requires SYNCHRONIZED*)
   188   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   189 
   190 
   191 
   192 (* status *)
   193 
   194 val ML_statistics = Unsynchronized.ref false;
   195 
   196 fun report_status () = (*requires SYNCHRONIZED*)
   197   if ! ML_statistics then
   198     let
   199       val {ready, pending, running, passive} = Task_Queue.status (! queue);
   200       val total = length (! workers);
   201       val active = count_workers Working;
   202       val waiting = count_workers Waiting;
   203       val stats =
   204        [("now", Markup.print_real (Time.toReal (Time.now ()))),
   205         ("tasks_ready", Markup.print_int ready),
   206         ("tasks_pending", Markup.print_int pending),
   207         ("tasks_running", Markup.print_int running),
   208         ("tasks_passive", Markup.print_int passive),
   209         ("workers_total", Markup.print_int total),
   210         ("workers_active", Markup.print_int active),
   211         ("workers_waiting", Markup.print_int waiting)] @
   212         ML_Statistics.get ();
   213     in Output.try_protocol_message (Markup.ML_statistics :: stats) "" end
   214   else ();
   215 
   216 
   217 (* cancellation primitives *)
   218 
   219 fun cancel_now group = (*requires SYNCHRONIZED*)
   220   let
   221     val running = Task_Queue.cancel (! queue) group;
   222     val _ = running |> List.app (fn thread =>
   223       if Simple_Thread.is_self thread then ()
   224       else Simple_Thread.interrupt_unsynchronized thread);
   225   in running end;
   226 
   227 fun cancel_all () = (*requires SYNCHRONIZED*)
   228   let
   229     val (groups, threads) = Task_Queue.cancel_all (! queue);
   230     val _ = List.app Simple_Thread.interrupt_unsynchronized threads;
   231   in groups end;
   232 
   233 fun cancel_later group = (*requires SYNCHRONIZED*)
   234  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   235   broadcast scheduler_event);
   236 
   237 fun interruptible_task f x =
   238   (if Multithreading.available then
   239     Multithreading.with_attributes
   240       (if is_some (worker_task ())
   241        then Multithreading.private_interrupts
   242        else Multithreading.public_interrupts)
   243       (fn _ => f x)
   244    else interruptible f x)
   245   before Multithreading.interrupted ();
   246 
   247 
   248 (* worker threads *)
   249 
   250 fun worker_exec (task, jobs) =
   251   let
   252     val group = Task_Queue.group_of_task task;
   253     val valid = not (Task_Queue.is_canceled group);
   254     val ok =
   255       Task_Queue.running task (fn () =>
   256         setmp_worker_task task (fn () =>
   257           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   258     val _ =
   259       if ! Multithreading.trace >= 2 then
   260         Output.try_protocol_message (Markup.task_statistics :: Task_Queue.task_statistics task) ""
   261       else ();
   262     val _ = SYNCHRONIZED "finish" (fn () =>
   263       let
   264         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   265         val test = Exn.capture Multithreading.interrupted ();
   266         val _ =
   267           if ok andalso not (Exn.is_interrupt_exn test) then ()
   268           else if null (cancel_now group) then ()
   269           else cancel_later group;
   270         val _ = broadcast work_finished;
   271         val _ = if maximal then () else signal work_available;
   272       in () end);
   273   in () end;
   274 
   275 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   276   (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   277     SOME state =>
   278      (state := (if active then Waiting else Sleeping);
   279       wait cond;
   280       state := Working)
   281   | NONE => ignore (wait cond));
   282 
   283 fun worker_next () = (*requires SYNCHRONIZED*)
   284   if length (! workers) > ! max_workers then
   285     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   286      signal work_available;
   287      NONE)
   288   else if count_workers Working > ! max_active then
   289     (worker_wait false work_available; worker_next ())
   290   else
   291     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   292       NONE => (worker_wait false work_available; worker_next ())
   293     | some => (signal work_available; some));
   294 
   295 fun worker_loop name =
   296   (case SYNCHRONIZED name (fn () => worker_next ()) of
   297     NONE => ()
   298   | SOME work => (worker_exec work; worker_loop name));
   299 
   300 fun worker_start name = (*requires SYNCHRONIZED*)
   301   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   302     Unsynchronized.ref Working));
   303 
   304 
   305 (* scheduler *)
   306 
   307 fun scheduler_next () = (*requires SYNCHRONIZED*)
   308   let
   309     val now = Time.now ();
   310     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   311     val _ = if tick then last_round := now else ();
   312 
   313 
   314     (* runtime status *)
   315 
   316     val _ =
   317       if tick then Unsynchronized.change status_ticks (fn i => i + 1) else ();
   318     val _ =
   319       if tick andalso ! status_ticks mod (if ! Multithreading.trace >= 1 then 2 else 10) = 0
   320       then report_status () else ();
   321 
   322     val _ =
   323       if forall (Thread.isActive o #1) (! workers) then ()
   324       else
   325         let
   326           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   327           val _ = workers := alive;
   328         in
   329           Multithreading.tracing 0 (fn () =>
   330             "SCHEDULER: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   331         end;
   332 
   333 
   334     (* worker pool adjustments *)
   335 
   336     val max_active0 = ! max_active;
   337     val max_workers0 = ! max_workers;
   338 
   339     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   340     val _ = max_active := m;
   341 
   342     val mm =
   343       if ! do_shutdown then 0
   344       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
   345     val _ =
   346       if tick andalso mm > ! max_workers then
   347         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   348       else if tick andalso mm < ! max_workers then
   349         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   350       else ();
   351     val _ =
   352       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   353         max_workers := mm
   354       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   355         max_workers := Int.min (mm, 2 * m)
   356       else ();
   357 
   358     val missing = ! max_workers - length (! workers);
   359     val _ =
   360       if missing > 0 then
   361         funpow missing (fn () =>
   362           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   363       else ();
   364 
   365     val _ =
   366       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   367       else signal work_available;
   368 
   369 
   370     (* canceled groups *)
   371 
   372     val _ =
   373       if null (! canceled) then ()
   374       else
   375        (Multithreading.tracing 1 (fn () =>
   376           string_of_int (length (! canceled)) ^ " canceled groups");
   377         Unsynchronized.change canceled (filter_out (null o cancel_now));
   378         signal work_available);
   379 
   380 
   381     (* delay loop *)
   382 
   383     val _ = Exn.release (wait_timeout next_round scheduler_event);
   384 
   385 
   386     (* shutdown *)
   387 
   388     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   389     val continue = not (! do_shutdown andalso null (! workers));
   390     val _ = if continue then () else (report_status (); scheduler := NONE);
   391 
   392     val _ = broadcast scheduler_event;
   393   in continue end
   394   handle exn =>
   395     if Exn.is_interrupt exn then
   396      (Multithreading.tracing 1 (fn () => "SCHEDULER: Interrupt");
   397       List.app cancel_later (cancel_all ());
   398       signal work_available; true)
   399     else reraise exn;
   400 
   401 fun scheduler_loop () =
   402  (while
   403     Multithreading.with_attributes
   404       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   405       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   406   do (); last_round := Time.zeroTime);
   407 
   408 fun scheduler_active () = (*requires SYNCHRONIZED*)
   409   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   410 
   411 fun scheduler_check () = (*requires SYNCHRONIZED*)
   412  (do_shutdown := false;
   413   if scheduler_active () then ()
   414   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   415 
   416 
   417 
   418 (** futures **)
   419 
   420 (* cancel *)
   421 
   422 fun cancel_group_unsynchronized group = (*requires SYNCHRONIZED*)
   423   let
   424     val _ = if null (cancel_now group) then () else cancel_later group;
   425     val _ = signal work_available;
   426     val _ = scheduler_check ();
   427   in () end;
   428 
   429 fun cancel_group group =
   430   SYNCHRONIZED "cancel_group" (fn () => cancel_group_unsynchronized group);
   431 
   432 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));
   433 
   434 
   435 (* results *)
   436 
   437 fun error_msg pos ((serial, msg), exec_id) =
   438   Position.setmp_thread_data pos (fn () =>
   439     let val id = Position.get_id pos in
   440       if is_none id orelse is_none exec_id orelse id = exec_id
   441       then Output.error_message' (serial, msg) else ()
   442     end) ();
   443 
   444 fun identify_result pos res =
   445   (case res of
   446     Exn.Exn exn =>
   447       let val exec_id =
   448         (case Position.get_id pos of
   449           NONE => []
   450         | SOME id => [(Markup.exec_idN, id)])
   451       in Exn.Exn (Par_Exn.identify exec_id exn) end
   452   | _ => res);
   453 
   454 fun assign_result group result res =
   455   let
   456     val _ = Single_Assignment.assign result res
   457       handle exn as Fail _ =>
   458         (case Single_Assignment.peek result of
   459           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   460         | _ => reraise exn);
   461     val ok =
   462       (case the (Single_Assignment.peek result) of
   463         Exn.Exn exn =>
   464           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   465       | Exn.Res _ => true);
   466   in ok end;
   467 
   468 
   469 (* future jobs *)
   470 
   471 fun future_job group atts (e: unit -> 'a) =
   472   let
   473     val result = Single_Assignment.var "future" : 'a result;
   474     val pos = Position.thread_data ();
   475     fun job ok =
   476       let
   477         val res =
   478           if ok then
   479             Exn.capture (fn () =>
   480               Multithreading.with_attributes atts (fn _ => Position.setmp_thread_data pos e ())) ()
   481           else Exn.interrupt_exn;
   482       in assign_result group result (identify_result pos res) end;
   483   in (result, job) end;
   484 
   485 
   486 (* fork *)
   487 
   488 type params = {name: string, group: group option, deps: task list, pri: int, interrupts: bool};
   489 val default_params: params = {name = "", group = NONE, deps = [], pri = 0, interrupts = true};
   490 
   491 fun forks ({name, group, deps, pri, interrupts}: params) es =
   492   if null es then []
   493   else
   494     let
   495       val grp =
   496         (case group of
   497           NONE => worker_subgroup ()
   498         | SOME grp => grp);
   499       fun enqueue e queue =
   500         let
   501           val atts =
   502             if interrupts
   503             then Multithreading.private_interrupts
   504             else Multithreading.no_interrupts;
   505           val (result, job) = future_job grp atts e;
   506           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   507           val future = Future {promised = false, task = task, result = result};
   508         in (future, queue') end;
   509     in
   510       SYNCHRONIZED "enqueue" (fn () =>
   511         let
   512           val (futures, queue') = fold_map enqueue es (! queue);
   513           val _ = queue := queue';
   514           val minimal = forall (not o Task_Queue.known_task queue') deps;
   515           val _ = if minimal then signal work_available else ();
   516           val _ = scheduler_check ();
   517         in futures end)
   518     end;
   519 
   520 fun fork e =
   521   (singleton o forks) {name = "fork", group = NONE, deps = [], pri = 0, interrupts = true} e;
   522 
   523 
   524 (* join *)
   525 
   526 fun get_result x =
   527   (case peek x of
   528     NONE => Exn.Exn (Fail "Unfinished future")
   529   | SOME res =>
   530       if Exn.is_interrupt_exn res then
   531         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   532           [] => res
   533         | exns => Exn.Exn (Par_Exn.make exns))
   534       else res);
   535 
   536 local
   537 
   538 fun join_next deps = (*requires SYNCHRONIZED*)
   539   if null deps then NONE
   540   else
   541     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   542       (NONE, []) => NONE
   543     | (NONE, deps') =>
   544         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   545     | (SOME work, deps') => SOME (work, deps'));
   546 
   547 fun execute_work NONE = ()
   548   | execute_work (SOME (work, deps')) =
   549       (worker_joining (fn () => worker_exec work); join_work deps')
   550 and join_work deps =
   551   Multithreading.with_attributes Multithreading.no_interrupts
   552     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   553 
   554 in
   555 
   556 fun join_results xs =
   557   let
   558     val _ =
   559       if forall is_finished xs then ()
   560       else if Multithreading.self_critical () then
   561         raise Fail "Cannot join future values within critical section"
   562       else if is_some (worker_task ()) then join_work (map task_of xs)
   563       else List.app (ignore o Single_Assignment.await o result_of) xs;
   564   in map get_result xs end;
   565 
   566 end;
   567 
   568 fun join_result x = singleton join_results x;
   569 fun joins xs = Par_Exn.release_all (join_results xs);
   570 fun join x = Exn.release (join_result x);
   571 
   572 fun join_tasks tasks =
   573   if null tasks then ()
   574   else
   575     (singleton o forks)
   576       {name = "join_tasks", group = SOME (new_group NONE),
   577         deps = tasks, pri = 0, interrupts = false} I
   578     |> join;
   579 
   580 
   581 (* task context for running thread *)
   582 
   583 fun task_context name group f x =
   584   Multithreading.with_attributes Multithreading.no_interrupts (fn orig_atts =>
   585     let
   586       val (result, job) = future_job group orig_atts (fn () => f x);
   587       val task =
   588         SYNCHRONIZED "enroll" (fn () =>
   589           Unsynchronized.change_result queue (Task_Queue.enroll (Thread.self ()) name group));
   590       val _ = worker_exec (task, [job]);
   591     in
   592       (case Single_Assignment.peek result of
   593         NONE => raise Fail "Missing task context result"
   594       | SOME res => Exn.release res)
   595     end);
   596 
   597 
   598 (* fast-path operations -- bypass task queue if possible *)
   599 
   600 fun value_result (res: 'a Exn.result) =
   601   let
   602     val task = Task_Queue.dummy_task;
   603     val group = Task_Queue.group_of_task task;
   604     val result = Single_Assignment.var "value" : 'a result;
   605     val _ = assign_result group result (identify_result (Position.thread_data ()) res);
   606   in Future {promised = false, task = task, result = result} end;
   607 
   608 fun value x = value_result (Exn.Res x);
   609 
   610 fun cond_forks args es =
   611   if Multithreading.enabled () then forks args es
   612   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   613 
   614 fun map_future f x =
   615   if is_finished x then value_result (Exn.interruptible_capture (f o join) x)
   616   else
   617     let
   618       val task = task_of x;
   619       val group = Task_Queue.group_of_task task;
   620       val (result, job) =
   621         future_job group Multithreading.private_interrupts (fn () => f (join x));
   622 
   623       val extended = SYNCHRONIZED "extend" (fn () =>
   624         (case Task_Queue.extend task job (! queue) of
   625           SOME queue' => (queue := queue'; true)
   626         | NONE => false));
   627     in
   628       if extended then Future {promised = false, task = task, result = result}
   629       else
   630         (singleton o cond_forks)
   631           {name = "map_future", group = SOME group, deps = [task],
   632             pri = Task_Queue.pri_of_task task, interrupts = true}
   633           (fn () => f (join x))
   634     end;
   635 
   636 
   637 (* promised futures -- fulfilled by external means *)
   638 
   639 fun promise_group group abort : 'a future =
   640   let
   641     val result = Single_Assignment.var "promise" : 'a result;
   642     fun assign () = assign_result group result Exn.interrupt_exn
   643       handle Fail _ => true
   644         | exn =>
   645             if Exn.is_interrupt exn
   646             then raise Fail "Concurrent attempt to fulfill promise"
   647             else reraise exn;
   648     fun job () =
   649       Multithreading.with_attributes Multithreading.no_interrupts
   650         (fn _ => Exn.release (Exn.capture assign () before abort ()));
   651     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   652       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
   653   in Future {promised = true, task = task, result = result} end;
   654 
   655 fun promise abort = promise_group (worker_subgroup ()) abort;
   656 
   657 fun fulfill_result (Future {promised, task, result}) res =
   658   if not promised then raise Fail "Not a promised future"
   659   else
   660     let
   661       val group = Task_Queue.group_of_task task;
   662       val pos = Position.thread_data ();
   663       fun job ok =
   664         assign_result group result (if ok then identify_result pos res else Exn.interrupt_exn);
   665       val _ =
   666         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   667           let
   668             val passive_job =
   669               SYNCHRONIZED "fulfill_result" (fn () =>
   670                 Unsynchronized.change_result queue
   671                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   672           in
   673             (case passive_job of
   674               SOME true => worker_exec (task, [job])
   675             | SOME false => ()
   676             | NONE => ignore (job (not (Task_Queue.is_canceled group))))
   677           end);
   678       val _ =
   679         if is_some (Single_Assignment.peek result) then ()
   680         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   681     in () end;
   682 
   683 fun fulfill x res = fulfill_result x (Exn.Res res);
   684 
   685 
   686 (* group snapshot *)
   687 
   688 fun group_snapshot group =
   689   SYNCHRONIZED "group_snapshot" (fn () =>
   690     Task_Queue.group_tasks (! queue) group);
   691 
   692 
   693 (* terminate *)
   694 
   695 fun terminate group =
   696   SYNCHRONIZED "terminate" (fn () =>
   697     let val _ = cancel_group_unsynchronized group;
   698     in Task_Queue.group_tasks (! queue) group end)
   699   |> join_tasks;
   700 
   701 
   702 (* shutdown *)
   703 
   704 fun shutdown () =
   705   if not Multithreading.available then ()
   706   else if is_some (worker_task ()) then
   707     raise Fail "Cannot shutdown while running as worker thread"
   708   else
   709     SYNCHRONIZED "shutdown" (fn () =>
   710       while scheduler_active () do
   711        (Multithreading.tracing 1 (fn () => "SHUTDOWN: wait");
   712         wait scheduler_event));
   713 
   714 
   715 (*final declarations of this structure!*)
   716 val map = map_future;
   717 
   718 end;
   719 
   720 type 'a future = 'a Future.future;
   721