src/Pure/Concurrent/future.ML
author wenzelm
Fri, 19 Aug 2011 12:51:14 +0200
changeset 45170 e43f0ea90c9a
parent 45169 a0ddd5760444
child 45173 b8f8488704e2
permissions -rw-r--r--
more focused use of Multithreading.interrupted: retain interrupts within task group boundary, without loss of information;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes the immediate peers to be
    30     interrupted eventually (i.e. none by default).  Interrupted tasks
    31     that lack regular result information, will pick up parallel
    32     exceptions from the cumulative group context (as Par_Exn).
    33 
    34   * Promised "passive" futures are fulfilled by external means.  There
    35     is no associated evaluation task, but other futures can depend on
    36     them via regular join operations.
    37 *)
    38 
    39 signature FUTURE =
    40 sig
         (*identifiers of the task executed by the current thread*)
    41   val worker_task: unit -> Task_Queue.task option
    42   val worker_group: unit -> Task_Queue.group option
    43   val worker_subgroup: unit -> Task_Queue.group
         (*future values and access to their results*)
    44   type 'a future
    45   val task_of: 'a future -> Task_Queue.task
    46   val peek: 'a future -> 'a Exn.result option
    47   val is_finished: 'a future -> bool
    48   val get_finished: 'a future -> 'a
         (*cancellation*)
    49   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    50   val cancel_group: Task_Queue.group -> unit
    51   val cancel: 'a future -> unit
         (*fork*)
    52   type fork_params =
    53    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
    54     pri: int, interrupts: bool}
    55   val forks: fork_params -> (unit -> 'a) list -> 'a future list
    56   val fork_pri: int -> (unit -> 'a) -> 'a future
    57   val fork: (unit -> 'a) -> 'a future
         (*join*)
    58   val join_results: 'a future list -> 'a Exn.result list
    59   val join_result: 'a future -> 'a Exn.result
    60   val join: 'a future -> 'a
         (*fast-path versions -- bypassing task queue*)
    61   val value_result: 'a Exn.result -> 'a future
    62   val value: 'a -> 'a future
    63   val map: ('a -> 'b) -> 'a future -> 'b future
    64   val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
         (*promised futures -- fulfilled by external means*)
    65   val promise_group: Task_Queue.group -> 'a future
    66   val promise: unit -> 'a future
    67   val fulfill_result: 'a future -> 'a Exn.result -> unit
    68   val fulfill: 'a future -> 'a -> unit
         (*shutdown and status markup*)
    69   val shutdown: unit -> unit
    70   val status: (unit -> 'a) -> 'a
    71 end;
    72 
    73 structure Future: FUTURE =
    74 struct
    75 
    76 (** future values **)
    77 
    78 (* identifiers *)
    79 
    80 local
         (*thread-local slot recording the task that the current thread is executing, if any*)
    81   val tag = Universal.tag () : Task_Queue.task option Universal.tag;
    82 in
         (*task of the current thread; NONE when the slot has not been set*)
    83   fun worker_task () = the_default NONE (Thread.getLocal tag);
         (*apply f to x with the slot set to SOME task; the current value is handed to
           setmp_thread_data as well, presumably to be restored afterwards -- confirm there*)
    84   fun setmp_worker_task task f x = setmp_thread_data tag (worker_task ()) (SOME task) f x;
    85 end;
    86 
    87 val worker_group = Option.map Task_Queue.group_of_task o worker_task;  (*group of the current worker task, if any*)
    88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());  (*fresh group created from the optional current worker group*)
    89 
    90 fun worker_joining e =
       (*Run e (): directly when the current thread has no worker task, otherwise wrapped
         in Task_Queue.joining for that task.*)
    91   (case worker_task () of
    92     NONE => e ()
    93   | SOME task => Task_Queue.joining task e);
    94 
    95 fun worker_waiting deps e =
       (*Run e (): directly when the current thread has no worker task, otherwise wrapped
         in Task_Queue.waiting for that task together with the tasks deps it waits for.*)
    96   (case worker_task () of
    97     NONE => e ()
    98   | SOME task => Task_Queue.waiting task deps e);
    99 
   100 
   101 (* datatype future *)
   102 
   103 type 'a result = 'a Exn.result Single_Assignment.var;  (*write-once cell for a value or exception*)
   104 
   105 datatype 'a future = Future of
   106  {promised: bool,  (*true only for futures made by promise_group; checked by fulfill_result*)
   107   task: Task_Queue.task,  (*task associated with this future in the queue*)
   108   result: 'a result};  (*single-assignment cell receiving the outcome*)
   109 
   110 fun task_of (Future {task, ...}) = task;  (*projection: task field*)
   111 fun result_of (Future {result, ...}) = result;  (*projection: result cell*)
   112 
   113 fun peek x = Single_Assignment.peek (result_of x);  (*current content of the result cell, without waiting*)
   114 fun is_finished x = is_some (peek x);  (*true once a result has been assigned*)
   115 
   116 fun get_finished x =
       (*Value of an already finished future, obtained via Exn.release on the stored result.
         Raises Fail "Unfinished future evaluation" if no result has been assigned yet.*)
   117   (case peek x of
   118     SOME res => Exn.release res
   119   | NONE => raise Fail "Unfinished future evaluation");
   120 
   121 
   122 
   123 (** scheduling **)
   124 
   125 (* synchronization *)
   126 
   127 val scheduler_event = ConditionVar.conditionVar ();  (*waited on by scheduler_next and shutdown*)
   128 val work_available = ConditionVar.conditionVar ();  (*waited on by idle workers in worker_next*)
   129 val work_finished = ConditionVar.conditionVar ();  (*waited on by join_next; broadcast by worker_exec*)
   130 
   131 local
         (*single mutex shared by SYNCHRONIZED and by the wait operations below*)
   132   val lock = Mutex.mutex ();
   133 in
   134 
       (*run a function via Simple_Thread.synchronized on the shared lock; name labels the section*)
   135 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   136 
   137 fun wait cond = (*requires SYNCHRONIZED*)
       (*wait on cond with the shared lock, without a deadline*)
   138   Multithreading.sync_wait NONE NONE cond lock;
   139 
   140 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
       (*wait on cond with the shared lock; the deadline is the current time plus timeout*)
   141   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   142 
   143 fun signal cond = (*requires SYNCHRONIZED*)
   144   ConditionVar.signal cond;
   145 
   146 fun broadcast cond = (*requires SYNCHRONIZED*)
   147   ConditionVar.broadcast cond;
   148 
   149 fun broadcast_work () = (*requires SYNCHRONIZED*)
       (*wake all threads waiting on work_available and all waiting on work_finished*)
   150  (ConditionVar.broadcast work_available;
   151   ConditionVar.broadcast work_finished);
   152 
   153 end;
   154 
   155 
   156 (* global state *)
   157 
   158 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue; updated inside SYNCHRONIZED sections*)
   159 val next = Unsynchronized.ref 0;  (*counter used by scheduler_next to number worker thread names*)
   160 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread, if one was forked*)
   161 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  (*groups added by cancel_later, retried by scheduler_next*)
   162 val do_shutdown = Unsynchronized.ref false;  (*set by scheduler_next, reset by scheduler_check*)
   163 val max_workers = Unsynchronized.ref 0;  (*limit on registered workers; compared in worker_next*)
   164 val max_active = Unsynchronized.ref 0;  (*limit on workers in state Working; compared in worker_next*)
   165 val worker_trend = Unsynchronized.ref 0;  (*per-tick counter: positive while more workers are wanted, negative while fewer*)
   166 
   167 datatype worker_state = Working | Waiting | Sleeping;  (*worker_wait sets Waiting or Sleeping around a wait, Working otherwise*)
   168 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);  (*registry of worker threads with their mutable state*)
   169 
   170 fun count_workers state = (*requires SYNCHRONIZED*)
       (*number of registered workers whose current state equals the given state*)
   171   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   172 
   173 
   174 (* cancellation primitives *)
   175 
   176 fun interruptible_task f x =
       (*Apply f to x with interrupts enabled.  With multithreading available the thread
         attributes are private_interrupts inside a worker task and public_interrupts
         otherwise; without multithreading the generic interruptible wrapper is used.
         After a regular result, Multithreading.interrupted () is still called (via
         "before") -- presumably to deliver an interrupt that is still pending; confirm
         against its definition.*)
   177   (if Multithreading.available then
   178     Multithreading.with_attributes
   179       (if is_some (worker_task ())
   180        then Multithreading.private_interrupts
   181        else Multithreading.public_interrupts)
   182       (fn _ => f x)
   183    else interruptible f x)
   184   before Multithreading.interrupted ();
   185 
   186 fun cancel_now group = (*requires SYNCHRONIZED*)
       (*Cancel group in the current queue via Task_Queue.cancel.  The boolean result is
         used by callers to decide whether cancel_later is still needed (false means retry);
         NOTE(review): its exact meaning is defined in Task_Queue, not visible here.*)
   187   Task_Queue.cancel (! queue) group;
   188 
   189 fun cancel_later group = (*requires SYNCHRONIZED*)
       (*record group in the canceled list (no duplicates wrt. Task_Queue.eq_group) and
         wake up threads waiting on scheduler_event, so that the scheduler retries it*)
   190  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   191   broadcast scheduler_event);
   192 
   193 
   194 (* worker threads *)
   195 
   196 fun worker_exec (task, jobs) =
       (*Execute all jobs of task on the current thread, then register completion.
         Must be called outside SYNCHRONIZED: the "finish" section acquires the lock itself.*)
   197   let
   198     val group = Task_Queue.group_of_task task;
           (*jobs are told whether the group was still uncanceled when execution started*)
   199     val valid = not (Task_Queue.is_canceled group);
           (*every job is run, even after an earlier one failed: "job valid" is the left
             operand of andalso; ok is true only if all jobs returned true*)
   200     val ok =
   201       Task_Queue.running task (fn () =>
   202         setmp_worker_task task (fn () =>
   203           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
           (*trace line with run and wait time in microseconds plus the names in deps*)
   204     val _ = Multithreading.tracing 2 (fn () =>
   205       let
   206         val s = Task_Queue.str_of_task_groups task;
   207         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   208         val (run, wait, deps) = Task_Queue.timing_of_task task;
   209       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   210     val _ = SYNCHRONIZED "finish" (fn () =>
   211       let
   212         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
               (*capture, rather than propagate, whatever Multithreading.interrupted raises*)
   213         val test = Exn.capture Multithreading.interrupted ();
               (*on job failure or captured interrupt: cancel the group now, or defer to the scheduler*)
   214         val _ =
   215           if ok andalso not (Exn.is_interrupt_exn test) then ()
   216           else if cancel_now group then ()
   217           else cancel_later group;
   218         val _ = broadcast work_finished;
   219         val _ = if maximal then () else signal work_available;
   220       in () end);
   221   in () end;
   222 
   223 fun worker_wait active cond = (*requires SYNCHRONIZED*)
       (*Block the current worker thread on cond.  Its registered state is set to Waiting
         (active = true) or Sleeping (active = false) for the duration of the wait and
         back to Working afterwards.  Raises Fail "Unregistered worker thread" if the
         current thread is not in the workers registry.*)
   224   let
   225     val state =
   226       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   227         SOME state => state
   228       | NONE => raise Fail "Unregistered worker thread");
   229     val _ = state := (if active then Waiting else Sleeping);
   230     val _ = wait cond;
   231     val _ = state := Working;
   232   in () end;
   233 
   234 fun worker_next () = (*requires SYNCHRONIZED*)
       (*Next piece of work for the current worker thread, or NONE if the worker should
         terminate.  Case 1: more workers are registered than max_workers allows -- the
         current thread unregisters itself and signals work_available.  Case 2: more
         workers are Working than max_active allows -- sleep on work_available and retry.
         Case 3: try to dequeue; sleep and retry when nothing is available, otherwise
         signal work_available once more before returning the work.*)
   235   if length (! workers) > ! max_workers then
   236     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   237      signal work_available;
   238      NONE)
   239   else if count_workers Working > ! max_active then
   240     (worker_wait false work_available; worker_next ())
   241   else
   242     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   243       NONE => (worker_wait false work_available; worker_next ())
   244     | some => (signal work_available; some));
   245 
   246 fun worker_loop name =
       (*main loop of a worker thread: fetch work under the lock, execute it outside the
         lock, repeat; terminates when worker_next yields NONE*)
   247   (case SYNCHRONIZED name (fn () => worker_next ()) of
   248     NONE => ()
   249   | SOME work => (worker_exec work; worker_loop name));
   250 
   251 fun worker_start name = (*requires SYNCHRONIZED*)
       (*fork a thread running worker_loop name and register it with initial state Working;
         NOTE(review): the meaning of the "false" argument of Simple_Thread.fork is not
         visible here*)
   252   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   253     Unsynchronized.ref Working));
   254 
   255 
   256 (* scheduler *)
   257 
   258 val status_ticks = Unsynchronized.ref 0;  (*tick counter modulo 10; status is traced when it is 0*)
   259 
   260 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the last tick; reset when scheduler_loop ends*)
   261 val next_round = seconds 0.05;  (*minimal distance between ticks, also the scheduler wait timeout*)
   262 
   263 fun scheduler_next () = (*requires SYNCHRONIZED*)
       (*One round of the scheduler.  Returns true if the scheduler loop should continue.
         The round traces status, disposes dead workers, adjusts the worker pool limits,
         retries pending group cancellations, waits for scheduler_event or a timeout, and
         finally decides about shutdown.  An interrupt raised anywhere in the round is
         handled at the end: the groups returned by Task_Queue.cancel_all are registered
         via cancel_later and the round still returns true.*)
   264   let
   265     val now = Time.now ();
           (*tick: at least next_round has passed since the last recorded tick*)
   266     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   267     val _ = if tick then last_round := now else ();
   268 
   269 
   270     (* queue and worker status *)
   271 
   272     val _ =
   273       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
           (*status line on every 10th tick only*)
   274     val _ =
   275       if tick andalso ! status_ticks = 0 then
   276         Multithreading.tracing 1 (fn () =>
   277           let
   278             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   279             val total = length (! workers);
   280             val active = count_workers Working;
   281             val waiting = count_workers Waiting;
   282           in
   283             "SCHEDULE " ^ Time.toString now ^ ": " ^
   284               string_of_int ready ^ " ready, " ^
   285               string_of_int pending ^ " pending, " ^
   286               string_of_int running ^ " running, " ^
   287               string_of_int passive ^ " passive; " ^
   288               string_of_int total ^ " workers, " ^
   289               string_of_int active ^ " active, " ^
   290               string_of_int waiting ^ " waiting "
   291           end)
   292       else ();
   293 
           (*drop registry entries whose thread is no longer active*)
   294     val _ =
   295       if forall (Thread.isActive o #1) (! workers) then ()
   296       else
   297         let
   298           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   299           val _ = workers := alive;
   300         in
   301           Multithreading.tracing 0 (fn () =>
   302             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   303         end;
   304 
   305 
   306     (* worker pool adjustments *)
   307 
           (*remember old limits, to detect changes below*)
   308     val max_active0 = ! max_active;
   309     val max_workers0 = ! max_workers;
   310 
           (*m: new limit for Working threads; 0 during shutdown*)
   311     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   312     val _ = max_active := m;
   313 
           (*mm: desired pool size = Working + twice Waiting, but at least m and at most
             four times m; 0 during shutdown.  NOTE(review): the special case m = 9999
             yielding 1 is a magic value whose meaning is not visible in this file.*)
   314     val mm =
   315       if ! do_shutdown then 0
   316       else if m = 9999 then 1
   317       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts ticks in one direction; a change of direction resets it to 0*)
   318     val _ =
   319       if tick andalso mm > ! max_workers then
   320         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   321       else if tick andalso mm < ! max_workers then
   322         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   323       else ();
           (*adopt mm directly when it is 0 or the trend magnitude exceeds 50; otherwise
             grow towards mm, capped at twice m, when the trend exceeds 5 and max_workers
             is below twice m, or when max_workers is still 0*)
   324     val _ =
   325       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   326         max_workers := mm
   327       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   328         max_workers := Int.min (mm, 2 * m)
   329       else ();
   330 
           (*start as many new workers as are missing wrt. max_workers*)
   331     val missing = ! max_workers - length (! workers);
   332     val _ =
   333       if missing > 0 then
   334         funpow missing (fn () =>
   335           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   336       else ();
   337 
           (*wake a worker if either limit has changed in this round*)
   338     val _ =
   339       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   340       else signal work_available;
   341 
   342 
   343     (* canceled groups *)
   344 
           (*retry cancel_now on each pending group; those returning true are dropped*)
   345     val _ =
   346       if null (! canceled) then ()
   347       else
   348        (Multithreading.tracing 1 (fn () =>
   349           string_of_int (length (! canceled)) ^ " canceled groups");
   350         Unsynchronized.change canceled (filter_out cancel_now);
   351         broadcast_work ());
   352 
   353 
   354     (* delay loop *)
   355 
           (*wait for scheduler_event for at most next_round; Exn.release re-raises an
             exception captured in the result of the wait*)
   356     val _ = Exn.release (wait_timeout next_round scheduler_event);
   357 
   358 
   359     (* shutdown *)
   360 
           (*request shutdown when Task_Queue.all_passive holds for the queue; stop only
             after shutdown was requested and no workers remain registered*)
   361     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   362     val continue = not (! do_shutdown andalso null (! workers));
   363     val _ = if continue then () else scheduler := NONE;
   364 
   365     val _ = broadcast scheduler_event;
   366   in continue end
   367   handle exn =>
   368     if Exn.is_interrupt exn then
   369      (Multithreading.tracing 1 (fn () => "Interrupt");
   370       List.app cancel_later (Task_Queue.cancel_all (! queue));
   371       broadcast_work (); true)
   372     else reraise exn;
   373 
   374 fun scheduler_loop () =
       (*body of the scheduler thread: repeat scheduler_next under the lock, with thread
         attributes derived from public_interrupts via sync_interrupts, until it returns
         false; then reset last_round*)
   375  (while
   376     Multithreading.with_attributes
   377       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   378       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   379   do (); last_round := Time.zeroTime);
   380 
   381 fun scheduler_active () = (*requires SYNCHRONIZED*)
       (*true if a scheduler thread is recorded and Thread.isActive holds for it*)
   382   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   383 
   384 fun scheduler_check () = (*requires SYNCHRONIZED*)
       (*withdraw any shutdown request and make sure a scheduler thread exists, forking a
         new one if none is active*)
   385  (do_shutdown := false;
   386   if scheduler_active () then ()
   387   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   388 
   389 
   390 
   391 (** futures **)
   392 
   393 (* cancellation *)
   394 
   395 (*cancel: present and future group members will be interrupted eventually*)
       (*Under the lock: try cancel_now, falling back to cancel_later when it returns
         false; then signal work_available and make sure the scheduler is running.*)
   396 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   397  (if cancel_now group then () else cancel_later group;
   398   signal work_available; scheduler_check ()));
   399 
   400 fun cancel x = cancel_group (Task_Queue.group_of_task (task_of x));  (*cancel the whole group of the future's task*)
   401 
   402 
   403 (* future jobs *)
   404 
   405 fun assign_result group result raw_res =
       (*Store raw_res in the single-assignment cell result.  Returns true if the cell then
         holds a regular value and false if it holds an exception; in the latter case the
         exception is also passed to Task_Queue.cancel_group for group, under the lock.*)
   406   let
           (*exceptions go through Par_Exn.serial, keeping its second component --
             presumably the exception tagged with a serial number; confirm in Par_Exn*)
   407     val res =
   408       (case raw_res of
   409         Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
   410       | _ => raw_res);
           (*if assignment raises Fail and the cell already holds an interrupt, that
             interrupt is raised instead of the Fail; otherwise the Fail is re-raised*)
   411     val _ = Single_Assignment.assign result res
   412       handle exn as Fail _ =>
   413         (case Single_Assignment.peek result of
   414           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   415         | _ => reraise exn);
           (*the outcome is read back from the cell, not taken from res*)
   416     val ok =
   417       (case the (Single_Assignment.peek result) of
   418         Exn.Exn exn =>
   419           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   420       | Exn.Res _ => true);
   421   in ok end;
   422 
   423 fun future_job group interrupts (e: unit -> 'a) =
       (*Create a fresh result cell together with the job that fills it.  The job takes a
         flag ok: if true, e is evaluated and its outcome captured; if false, the outcome
         is Exn.interrupt_exn without evaluating e.  The job returns the boolean produced
         by assign_result.*)
   424   let
   425     val result = Single_Assignment.var "future" : 'a result;
           (*position of the creating thread, re-installed while e runs*)
   426     val pos = Position.thread_data ();
   427     fun job ok =
   428       let
   429         val res =
   430           if ok then
   431             Exn.capture (fn () =>
   432               Multithreading.with_attributes
                     (*the interrupts flag selects between private_interrupts and no_interrupts*)
   433                 (if interrupts
   434                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
   435                 (fn _ => Position.setmp_thread_data pos e ())) ()
   436           else Exn.interrupt_exn;
   437       in assign_result group result res end;
   438   in (result, job) end;
   439 
   440 
   441 (* fork *)
   442 
   443 type fork_params =
       (*parameters of forks: name, group, deps and pri are passed to Task_Queue.enqueue
         (group NONE means a fresh worker_subgroup); interrupts is passed to future_job*)
   444  {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
   445   pri: int, interrupts: bool};
   446 
   447 fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
       (*Fork one future per expression in es, all with the same parameters and the same
         group.  Returns the futures in the order of es; an empty es yields [] without
         touching the queue or the scheduler.*)
   448   if null es then []
   449   else
   450     let
             (*without an explicit group, one fresh subgroup is shared by all of es*)
   451       val grp =
   452         (case group of
   453           NONE => worker_subgroup ()
   454         | SOME grp => grp);
   455       fun enqueue e queue =
   456         let
   457           val (result, job) = future_job grp interrupts e;
   458           val (task, queue') = Task_Queue.enqueue name grp deps pri job queue;
   459           val future = Future {promised = false, task = task, result = result};
   460         in (future, queue') end;
   461     in
   462       SYNCHRONIZED "enqueue" (fn () =>
   463         let
   464           val (futures, queue') = fold_map enqueue es (! queue);
   465           val _ = queue := queue';
                 (*signal a worker only when none of deps is known to the updated queue*)
   466           val minimal = forall (not o Task_Queue.known_task queue') deps;
   467           val _ = if minimal then signal work_available else ();
   468           val _ = scheduler_check ();
   469         in futures end)
   470     end;
   471 
   472 fun fork_pri pri e =
       (*fork a single future with priority pri: empty name, default group, no
         dependencies, interrupts enabled*)
   473   singleton (forks {name = "", group = NONE, deps = [], pri = pri, interrupts = true}) e;
   474 
   475 fun fork e = fork_pri 0 e;  (*fork with priority 0*)
   476 
   477 
   478 (* join *)
   479 
   480 local
   481 
   482 fun get_result x =
       (*Result of x as seen by join: an unfinished future yields Exn.Exn (Fail "Unfinished
         future"); an interrupt result is replaced by the exception from
         Task_Queue.group_status of the task's group, if there is one; any other result
         is returned unchanged.*)
   483   (case peek x of
   484     NONE => Exn.Exn (Fail "Unfinished future")
   485   | SOME res =>
   486       if Exn.is_interrupt_exn res then
   487         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   488           NONE => res
   489         | SOME exn => Exn.Exn exn)
   490       else res);
   491 
   492 fun join_next deps = (*requires SYNCHRONIZED*)
       (*Find work among the tasks deps for the joining thread.  Result NONE: deps is empty
         or Task_Queue.dequeue_deps reports no work and no remaining dependencies.  When
         there is no work but dependencies remain, wait on work_finished (worker state
         Waiting) and retry with the remaining ones.  Otherwise return the work together
         with the remaining dependencies.*)
   493   if null deps then NONE
   494   else
   495     (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   496       (NONE, []) => NONE
   497     | (NONE, deps') =>
   498         (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
   499     | (SOME work, deps') => SOME (work, deps'));
   500 
   501 fun execute_work NONE = ()
       (*execute_work and join_work are mutually recursive: join_work fetches work for
         deps under the lock, with thread attributes no_interrupts; execute_work runs it
         via worker_exec inside worker_joining and continues with the remaining
         dependencies, until join_next yields NONE*)
   502   | execute_work (SOME (work, deps')) =
   503       (worker_joining (fn () => worker_exec work); join_work deps')
   504 and join_work deps =
   505   Multithreading.with_attributes Multithreading.no_interrupts
   506     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   507 
   508 in
   509 
   510 fun join_results xs =
       (*Wait until all futures xs are finished and return their results in order, as
         computed by get_result.  Nothing is awaited if all are finished already.  Inside
         a critical section an error is raised.  A thread with a worker task helps by
         executing pending dependencies via join_work; any other thread blocks on each
         result cell in turn.*)
   511   let
   512     val _ =
   513       if forall is_finished xs then ()
   514       else if Multithreading.self_critical () then
   515         error "Cannot join future values within critical section"
   516       else if is_some (worker_task ()) then join_work (map task_of xs)
   517       else List.app (ignore o Single_Assignment.await o result_of) xs;
   518   in map get_result xs end;
   519 
   520 end;
   521 
   522 fun join_result x = singleton join_results x;  (*join_results for a single future*)
   523 fun join x = Exn.release (join_result x);  (*joined value; a stored exception is re-raised by Exn.release*)
   524 
   525 
   526 (* fast-path versions -- bypassing task queue *)
   527 
   528 fun value_result (res: 'a Exn.result) =
       (*Future that is finished from the start: res is assigned immediately to a fresh
         cell, using a dummy task that is never put into the queue here.*)
   529   let
   530     val task = Task_Queue.dummy_task ();
   531     val group = Task_Queue.group_of_task task;
   532     val result = Single_Assignment.var "value" : 'a result;
   533     val _ = assign_result group result res;
   534   in Future {promised = false, task = task, result = result} end;
   535 
   536 fun value x = value_result (Exn.Res x);  (*finished future holding the regular value x*)
   537 
   538 fun map_future f x =
       (*Future for f applied to the joined value of x (exported as "map").  First try to
         append the new job to the existing task of x via Task_Queue.extend; on success
         the new future shares that task.  Otherwise fork a separate future that depends
         on the task of x, with the same priority, in a new subgroup of its group.*)
   539   let
   540     val task = task_of x;
   541     val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
   542     val (result, job) = future_job group true (fn () => f (join x));
   543 
   544     val extended = SYNCHRONIZED "extend" (fn () =>
   545       (case Task_Queue.extend task job (! queue) of
   546         SOME queue' => (queue := queue'; true)
   547       | NONE => false));
   548   in
   549     if extended then Future {promised = false, task = task, result = result}
   550     else
             (*result and job from above are not used on this path; forks makes its own*)
   551       singleton
   552         (forks {name = "Future.map", group = SOME group, deps = [task],
   553           pri = Task_Queue.pri_of_task task, interrupts = true})
   554         (fn () => f (join x))
   555   end;
   556 
   557 fun cond_forks args es =
       (*forks proper if Multithreading.enabled (); otherwise each expression is evaluated
         right away in the calling thread via Exn.interruptible_capture and wrapped as a
         finished future*)
   558   if Multithreading.enabled () then forks args es
   559   else map (fn e => value_result (Exn.interruptible_capture e ())) es;
   560 
   561 
   562 (* promised futures -- fulfilled by external means *)
   563 
   564 fun promise_group group : 'a future =
       (*Create a promised future in group: a fresh result cell plus a passive task that
         is registered in the queue together with an abort operation.*)
   565   let
   566     val result = Single_Assignment.var "promise" : 'a result;
           (*abort assigns an interrupt as result.  A Fail from assign_result is turned
             into true; an interrupt raised by it is turned into Fail "Concurrent attempt
             to fulfill promise"; other exceptions are re-raised*)
   567     fun abort () = assign_result group result Exn.interrupt_exn
   568       handle Fail _ => true
   569         | exn =>
   570             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   571             else reraise exn;
   572     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   573       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   574   in Future {promised = true, task = task, result = result} end;
   575 
   576 fun promise () = promise_group (worker_subgroup ());  (*promise in a fresh subgroup of the current worker group*)
   577 
   578 fun fulfill_result (Future {promised, task, result}) res =
       (*Fulfill a promised future with res.  Raises Fail "Not a promised future" for any
         other kind of future.*)
   579   if not promised then raise Fail "Not a promised future"
   580   else
   581     let
   582       val group = Task_Queue.group_of_task task;
             (*the job assigns res, or an interrupt when invoked with ok = false*)
   583       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
             (*with thread attributes no_interrupts: take the task out of its passive
               state and run the job in this thread, unless dequeue_passive returns false*)
   584       val _ =
   585         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   586           let
   587             val still_passive =
   588               SYNCHRONIZED "fulfill_result" (fn () =>
   589                 Unsynchronized.change_result queue
   590                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   591           in if still_passive then worker_exec (task, [job]) else () end);
             (*if the cell is still empty at this point, wait until it gets assigned*)
   592       val _ =
   593         if is_some (Single_Assignment.peek result) then ()
   594         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   595     in () end;
   596 
   597 fun fulfill x res = fulfill_result x (Exn.Res res);  (*fulfill with a regular value*)
   598 
   599 
   600 (* shutdown *)
   601 
   602 fun shutdown () =
       (*With multithreading available: under the lock, repeatedly wait on scheduler_event
         and broadcast to all workers, until no scheduler thread is active any more.
         Without multithreading this does nothing.*)
   603   if Multithreading.available then
   604     SYNCHRONIZED "shutdown" (fn () =>
   605      while scheduler_active () do
   606       (wait scheduler_event; broadcast_work ()))
   607   else ();
   608 
   609 
   610 (* status markup *)
   611 
   612 fun status e =
       (*Evaluate e () between two status messages: Markup.forked before and Markup.joined
         after.  Inside a worker task both carry the task name as a property.  If e raises
         an exception, the "joined" message is not emitted.*)
   613   let
   614     val task_props =
   615       (case worker_task () of
   616         NONE => I
   617       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   618     val _ = Output.status (Markup.markup_only (task_props Markup.forked));
   619     val x = e ();  (*sic -- report "joined" only for success*)
   620     val _ = Output.status (Markup.markup_only (task_props Markup.joined));
   621   in x end;
   622 
   623 
   624 (*final declarations of this structure!*)
   625 val map = map_future;  (*bound last: earlier code in this structure uses "map" on lists*)
   626 
   627 end;
   628 
   629 type 'a future = 'a Future.future;  (*top-level abbreviation for the future type*)
   630