src/Pure/Concurrent/future.ML
author wenzelm
Fri, 19 Aug 2011 12:03:44 +0200
changeset 45169 a0ddd5760444
parent 45151 f6a11c1da821
child 45170 e43f0ea90c9a
permissions -rw-r--r--
clarified Future.cond_forks: more uniform handling of exceptional situations;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Value-oriented parallelism via futures and promises.  See also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join.  The idea is to model parallel
    12     value-oriented computations (not communicating processes).
    13 
    14   * Forked futures are evaluated spontaneously by a farm of worker
    15     threads in the background; join resynchronizes the computation and
    16     delivers results (values or exceptions).
    17 
    18   * The pool of worker threads is limited, usually in correlation with
    19     the number of physical cores on the machine.  Note that allocation
    20     of runtime resources may be distorted either if workers yield CPU
    21     time (e.g. via system sleep or wait operations), or if non-worker
    22     threads contend for significant runtime resources independently.
    23     There is a limited number of replacement worker threads that get
    24     activated in certain explicit wait conditions.
    25 
    26   * Future tasks are organized in groups, which are block-structured.
    27     When forking a new task, the default is to open an individual
    28     subgroup, unless some common group is specified explicitly.
    29     Failure of one group member causes the immediate peers to be
    30     interrupted eventually (i.e. none by default).  Interrupted tasks
    31     that lack regular result information, will pick up parallel
    32     exceptions from the cumulative group context (as Par_Exn).
    33 
    34   * Promised "passive" futures are fulfilled by external means.  There
    35     is no associated evaluation task, but other futures can depend on
    36     them via regular join operations.
    37 *)
    38 
    39 signature FUTURE =
    40 sig
         (*task and group context of the current thread*)
    41   val worker_task: unit -> Task_Queue.task option
    42   val worker_group: unit -> Task_Queue.group option
    43   val worker_subgroup: unit -> Task_Queue.group
         (*future values and inspection of their results*)
    44   type 'a future
    45   val task_of: 'a future -> Task_Queue.task
    46   val peek: 'a future -> 'a Exn.result option
    47   val is_finished: 'a future -> bool
    48   val get_finished: 'a future -> 'a
         (*cancellation*)
    49   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    50   val cancel_group: Task_Queue.group -> unit
    51   val cancel: 'a future -> unit
         (*forking evaluation tasks*)
    52   type fork_params =
    53    {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
    54     pri: int, interrupts: bool}
    55   val forks: fork_params -> (unit -> 'a) list -> 'a future list
    56   val fork_pri: int -> (unit -> 'a) -> 'a future
    57   val fork: (unit -> 'a) -> 'a future
         (*joining results*)
    58   val join_results: 'a future list -> 'a Exn.result list
    59   val join_result: 'a future -> 'a Exn.result
    60   val join: 'a future -> 'a
         (*fast-path versions that bypass the task queue where possible*)
    61   val value_result: 'a Exn.result -> 'a future
    62   val value: 'a -> 'a future
    63   val map: ('a -> 'b) -> 'a future -> 'b future
    64   val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
         (*promised futures, fulfilled by external means*)
    65   val promise_group: Task_Queue.group -> 'a future
    66   val promise: unit -> 'a future
    67   val fulfill_result: 'a future -> 'a Exn.result -> unit
    68   val fulfill: 'a future -> 'a -> unit
         (*scheduler shutdown and status markup*)
    69   val shutdown: unit -> unit
    70   val status: (unit -> 'a) -> 'a
    71 end;
    72 
    73 structure Future: FUTURE =
    74 struct
    75 
    76 (** future values **)
    77 
    78 (* identifiers *)
    79 
    80 local  (*thread-local slot for the task that the current thread is executing*)
    81   val task_tag: Task_Queue.task option Universal.tag = Universal.tag ();
    82 in  (*worker_task: content of the slot, where an unset slot counts as NONE*)
    83   fun worker_task () = (case Thread.getLocal task_tag of SOME slot => slot | NONE => NONE);
    84   fun setmp_worker_task task f x = setmp_thread_data task_tag (worker_task ()) (SOME task) f x;
    85 end;
    86 
    87 fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());  (*group of the current task*)
    88 fun worker_subgroup () = let val parent = worker_group () in Task_Queue.new_group parent end;
    89 
    90 fun worker_joining e = (*run e via Task_Queue.joining inside a worker task, plainly otherwise*)
    91   (case worker_task () of
    92     SOME current => Task_Queue.joining current e
    93   | NONE => e ());
    94 
    95 fun worker_waiting deps e = (*run e via Task_Queue.waiting on deps inside a worker task*)
    96   (case worker_task () of
    97     SOME current => Task_Queue.waiting current deps e
    98   | NONE => e ());
    99 
   100 
   101 (* datatype future *)
   102 
   103 type 'a result = 'a Exn.result Single_Assignment.var;
   104 
   105 datatype 'a future = Future of
   106  {promised: bool,  (*true only for futures made by promise_group*)
   107   task: Task_Queue.task,  (*associated task; a dummy task for value_result*)
   108   result: 'a result};  (*single-assignment variable for the eventual outcome*)
   109 
   110 fun task_of (Future r) = #task r;  (*task component of a future*)
   111 fun result_of (Future r) = #result r;  (*result variable of a future*)
   112 
   113 fun peek (Future {result, ...}) = Single_Assignment.peek result;  (*current result, if any*)
   114 fun is_finished x = (case peek x of SOME _ => true | NONE => false);
   115 
   116 fun get_finished x = (*released result of a finished future; Fail if there is no result yet*)
   117   (case peek x of
   118     NONE => raise Fail "Unfinished future evaluation"
   119   | SOME res => Exn.release res);
   120 
   121 
   122 
   123 (** scheduling **)
   124 
   125 (* synchronization *)
   126 
   127 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by the scheduler delay loop and by shutdown*)
   128 val work_available = ConditionVar.conditionVar ();  (*awaited by idle workers in worker_next*)
   129 val work_finished = ConditionVar.conditionVar ();  (*awaited by join_next while dependencies are unfinished*)
   130 
   131 local
   132   val scheduler_lock = Mutex.mutex ();  (*the one mutex behind SYNCHRONIZED and the wait operations*)
   133 in
   134 
   135 fun SYNCHRONIZED name body = Simple_Thread.synchronized name scheduler_lock body;
   136 
   137 fun wait cv = (*requires SYNCHRONIZED*)
   138   Multithreading.sync_wait NONE NONE cv scheduler_lock;
   139 
   140 fun wait_timeout timeout cv = (*requires SYNCHRONIZED*)
   141   let val limit = Time.+ (Time.now (), timeout) in Multithreading.sync_wait NONE (SOME limit) cv scheduler_lock end;
   142 
   143 fun signal cv = (*requires SYNCHRONIZED*)
   144   ConditionVar.signal cv;
   145 
   146 fun broadcast cv = (*requires SYNCHRONIZED*)
   147   ConditionVar.broadcast cv;
   148 
   149 fun broadcast_work () = (*requires SYNCHRONIZED*)
   150   List.app ConditionVar.broadcast
   151     [work_available, work_finished];
   152 
   153 end;
   154 
   155 
   156 (* global state *)
   157 
   158 val queue = Unsynchronized.ref Task_Queue.empty;  (*the global task queue*)
       (*counter used to number the names of worker threads*)
   159 val next = Unsynchronized.ref 0;
       (*the scheduler thread; reset to NONE when scheduler_next decides to stop*)
   160 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
       (*groups whose cancellation is still pending; the scheduler retries cancel_now on them*)
   161 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
       (*set once the queue is all passive, reset by scheduler_check*)
   162 val do_shutdown = Unsynchronized.ref false;
       (*target size of the worker pool, and bound on workers in state Working*)
   163 val max_workers = Unsynchronized.ref 0;
   164 val max_active = Unsynchronized.ref 0;
       (*successive ticks with the desired pool size above (positive) or below (negative) max_workers*)
   165 val worker_trend = Unsynchronized.ref 0;
   166 
       (*Waiting: blocked in worker_wait with active = true; Sleeping: with active = false*)
   167 datatype worker_state = Working | Waiting | Sleeping;
   168 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   169 
   170 fun count_workers state = (*requires SYNCHRONIZED*)
   171   length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   172 
   173 
   174 (* cancellation primitives *)
   175 
   176 fun interruptible_task f x = (*f x with private interrupts inside a worker task, public ones otherwise*)
   177   if not Multithreading.available then interruptible f x
   178   else
   179     let
   180       val atts =
   181         if is_some (worker_task ()) then Multithreading.private_interrupts
   182         else Multithreading.public_interrupts;
   183     in Multithreading.with_attributes atts (fn _ => f x) end;
   184 
   185 fun cancel_now group = (*requires SYNCHRONIZED*)
   186   let val current_queue = ! queue in Task_Queue.cancel current_queue group end;
   187 
   188 fun cancel_later group = (*requires SYNCHRONIZED*)
   189   let val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group)
   190   in broadcast scheduler_event end;
   191 
   192 
   193 (* worker threads *)
   194 
   195 fun worker_exec (task, jobs) =
         (*execute all jobs of a dequeued task in the current thread, then report the task as finished*)
   196   let
   197     val group = Task_Queue.group_of_task task;
           (*jobs receive false when the group is already canceled at this point*)
   198     val valid = not (Task_Queue.is_canceled group);
           (*every job is run (job valid is evaluated first); ok is the conjunction of the job results*)
   199     val ok =
   200       Task_Queue.running task (fn () =>
   201         setmp_worker_task task (fn () =>
   202           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
           (*trace line with the timing figures of the task, in microseconds*)
   203     val _ = Multithreading.tracing 2 (fn () =>
   204       let
   205         val s = Task_Queue.str_of_task_groups task;
   206         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   207         val (run, wait, deps) = Task_Queue.timing_of_task task;
   208       in "TASK " ^ s ^ " " ^ micros run ^ " " ^ micros wait ^ " (" ^ commas deps ^ ")" end);
   209     val _ = SYNCHRONIZED "finish" (fn () =>
   210       let
               (*maximal is the flag returned by Task_Queue.finish -- NOTE(review): presumably
                 true when no other task depends on this one; confirm against Task_Queue*)
   211         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
               (*NOTE(review): presumably consumes a pending interrupt of this thread -- confirm*)
   212         val _ = Exn.capture Multithreading.interrupted ();
               (*failed jobs cancel the group: immediately if cancel_now succeeds, else via the scheduler*)
   213         val _ =
   214           if ok then ()
   215           else if cancel_now group then ()
   216           else cancel_later group;
   217         val _ = broadcast work_finished;
   218         val _ = if maximal then () else signal work_available;
   219       in () end);
   220   in () end;
   221 
   222 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   223   let
   224     (*state cell registered for the current thread*)
   225     val state =
   226       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   227         NONE => raise Fail "Unregistered worker thread"
   228       | SOME cell => cell);
   229   in
   230     state := (if active then Waiting else Sleeping);
   231     wait cond; state := Working end;
   232 
   233 fun worker_next () = (*requires SYNCHRONIZED*)
   234   let fun sleep_and_retry () = (worker_wait false work_available; worker_next ()) in
   235     if length (! workers) > ! max_workers then
   236       (*pool is oversized: unregister this thread, NONE makes worker_loop return*)
   237       (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   238        signal work_available; NONE)
   239     else if count_workers Working > ! max_active then sleep_and_retry ()
   240     else
   241       (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   242         SOME work => (signal work_available; SOME work)
   243       | NONE => sleep_and_retry ()) end;
   244 
   245 fun worker_loop name = (*fetch and execute work until worker_next yields NONE*)
   246   (case SYNCHRONIZED name worker_next of
   247     SOME work => (Exn.capture Multithreading.interrupted (); worker_exec work; worker_loop name)
   248   | NONE => ());
   249 
   250 fun worker_start name = (*requires SYNCHRONIZED*)
   251   let val entry = (Simple_Thread.fork false (fn () => worker_loop name), Unsynchronized.ref Working)
   252   in Unsynchronized.change workers (cons entry) end;
   253 
   254 
   255 (* scheduler *)
   256 
   257 val status_ticks = Unsynchronized.ref 0;  (*tick counter modulo 10; status is traced when it is 0*)
   258 
   259 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the most recent tick*)
   260 val next_round = seconds 0.05;  (*tick interval, also the timeout of the scheduler delay loop*)
   261 
   262 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*one round of the scheduler; the result tells scheduler_loop whether to continue*)
   263   let
   264     val now = Time.now ();
   265     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   266     val _ = if tick then last_round := now else ();
   267 
   268 
   269     (* queue and worker status *)
   270 
   271     val _ =
   272       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   273     val _ =
   274       if tick andalso ! status_ticks = 0 then
   275         Multithreading.tracing 1 (fn () =>
   276           let
   277             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   278             val total = length (! workers);
   279             val active = count_workers Working;
   280             val waiting = count_workers Waiting;
   281           in
   282             "SCHEDULE " ^ Time.toString now ^ ": " ^
   283               string_of_int ready ^ " ready, " ^
   284               string_of_int pending ^ " pending, " ^
   285               string_of_int running ^ " running, " ^
   286               string_of_int passive ^ " passive; " ^
   287               string_of_int total ^ " workers, " ^
   288               string_of_int active ^ " active, " ^
   289               string_of_int waiting ^ " waiting "
   290           end)
   291       else ();
   292 
           (*worker threads that are no longer active are dropped from the pool*)
   293     val _ =
   294       if forall (Thread.isActive o #1) (! workers) then ()
   295       else
   296         let
   297           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   298           val _ = workers := alive;
   299         in
   300           Multithreading.tracing 0 (fn () =>
   301             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   302         end;
   303 
   304 
   305     (* worker pool adjustments *)
   306 
   307     val max_active0 = ! max_active;
   308     val max_workers0 = ! max_workers;
   309 
           (*m: bound on concurrently active workers, 0 during shutdown*)
   310     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   311     val _ = max_active := m;
   312 
           (*mm: desired pool size, at least m and at most 4 * m;
             NOTE(review): m = 9999 is treated as a special value that yields one worker -- confirm*)
   313     val mm =
   314       if ! do_shutdown then 0
   315       else if m = 9999 then 1
   316       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts successive ticks with mm above (positive) or below (negative) max_workers*)
   317     val _ =
   318       if tick andalso mm > ! max_workers then
   319         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   320       else if tick andalso mm < ! max_workers then
   321         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   322       else ();
           (*max_workers follows mm at once for mm = 0, otherwise only after a sustained trend*)
   323     val _ =
   324       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   325         max_workers := mm
   326       else if ! worker_trend > 5 andalso ! max_workers < 2 * m orelse ! max_workers = 0 then
   327         max_workers := Int.min (mm, 2 * m)
   328       else ();
   329 
           (*start as many workers as are missing from the pool*)
   330     val missing = ! max_workers - length (! workers);
   331     val _ =
   332       if missing > 0 then
   333         funpow missing (fn () =>
   334           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   335       else ();
   336 
   337     val _ =
   338       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   339       else signal work_available;
   340 
   341 
   342     (* canceled groups *)
   343 
   344     val _ =
   345       if null (! canceled) then ()
   346       else
   347        (Multithreading.tracing 1 (fn () =>
   348           string_of_int (length (! canceled)) ^ " canceled groups");
   349         Unsynchronized.change canceled (filter_out cancel_now);
   350         broadcast_work ());
   351 
   352 
   353     (* delay loop *)
   354 
   355     val _ = Exn.release (wait_timeout next_round scheduler_event);
   356 
   357 
   358     (* shutdown *)
   359 
   360     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   361     val continue = not (! do_shutdown andalso null (! workers));
   362     val _ = if continue then () else scheduler := NONE;
   363 
   364     val _ = broadcast scheduler_event;
   365   in continue end
         (*an interrupt of the scheduler cancels all tasks, but the scheduler itself continues*)
   366   handle exn =>
   367     if Exn.is_interrupt exn then
   368      (Multithreading.tracing 1 (fn () => "Interrupt");
   369       List.app cancel_later (Task_Queue.cancel_all (! queue));
   370       broadcast_work (); true)
   371     else reraise exn;
   372 
   373 fun scheduler_loop () = (*run scheduler rounds until scheduler_next yields false*)
   374   let fun round () =
   375     Multithreading.with_attributes
   376       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   377       (fn _ => SYNCHRONIZED "scheduler" scheduler_next)
   378   in (while round () do ()); last_round := Time.zeroTime end;
   379 
   380 fun scheduler_active () = (*requires SYNCHRONIZED*)
   381   (case ! scheduler of SOME thread => Thread.isActive thread | NONE => false);
   382 
   383 fun scheduler_check () = (*requires SYNCHRONIZED*)
   384   let val _ = do_shutdown := false in
   385     if not (scheduler_active ()) then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
   386     else () end;
   387 
   388 
   389 
   390 (** futures **)
   391 
   392 (* cancellation *)
   393 
   394 (*cancel: interrupt present and future group members, at once or later via the scheduler*)
   395 fun cancel_group group =
   396   let fun request () = (if cancel_now group then () else cancel_later group; signal work_available)
   397   in SYNCHRONIZED "cancel" (fn () => (request (); scheduler_check ())) end;
   398 
   399 fun cancel (Future {task, ...}) = cancel_group (Task_Queue.group_of_task task);  (*cancel the group of a future*)
   400 
   401 
   402 (* future jobs *)
   403 
   404 fun assign_result group result raw_res =
         (*assign raw_res to the result variable; yields true for a regular result, and false
           for an exception, which is also passed to Task_Queue.cancel_group for the group*)
   405   let
           (*exceptions are replaced by the second component of Par_Exn.serial --
             NOTE(review): presumably the same exception with a serial number attached; confirm*)
   406     val res =
   407       (case raw_res of
   408         Exn.Exn exn => Exn.Exn (#2 (Par_Exn.serial exn))
   409       | _ => raw_res);
           (*if the assignment fails with Fail and the variable already holds an interrupt,
             that interrupt is raised instead; any other case re-raises the Fail*)
   410     val _ = Single_Assignment.assign result res
   411       handle exn as Fail _ =>
   412         (case Single_Assignment.peek result of
   413           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   414         | _ => reraise exn);
           (*the outcome is read back from the variable, not taken from res*)
   415     val ok =
   416       (case the (Single_Assignment.peek result) of
   417         Exn.Exn exn =>
   418           (SYNCHRONIZED "cancel" (fn () => Task_Queue.cancel_group group exn); false)
   419       | Exn.Res _ => true);
   420   in ok end;
   421 
   422 fun future_job group interrupts (e: unit -> 'a) =
         (*fresh result variable together with the job that evaluates e and assigns its outcome*)
   423   let
   424     val result = Single_Assignment.var "future" : 'a result;
           (*position of the creating thread, installed again while e is evaluated*)
   425     val pos = Position.thread_data ();
           (*job ok: evaluate e if ok holds, otherwise assign an interrupt; yields the assign_result flag*)
   426     fun job ok =
   427       let
   428         val res =
   429           if ok then
   430             Exn.capture (fn () =>
   431               Multithreading.with_attributes
   432                 (if interrupts
   433                  then Multithreading.private_interrupts else Multithreading.no_interrupts)
                       (*Multithreading.interrupted () runs after e, still inside the capture --
                         NOTE(review): presumably it raises a pending interrupt; confirm*)
   434                 (fn _ => Position.setmp_thread_data pos e ()) before
   435               Multithreading.interrupted ()) ()
   436           else Exn.interrupt_exn;
   437       in assign_result group result res end;
   438   in (result, job) end;
   439 
   440 
   441 (* fork *)
   442 
   443 type fork_params =
        (*name, deps and pri are passed on to Task_Queue.enqueue; group NONE means a fresh
          worker subgroup; interrupts selects private interrupts over no interrupts for the job*)
   444  {name: string, group: Task_Queue.group option, deps: Task_Queue.task list,
   445   pri: int, interrupts: bool};
   446 
   447 fun forks ({name, group, deps, pri, interrupts}: fork_params) es =
   448   (case es of
   449     [] => []
   450   | _ =>
   451       let
   452         (*explicit group, or a fresh subgroup of the current worker context*)
   453         val grp = (case group of SOME given => given | NONE => worker_subgroup ());
   454 
   455         (*thread the queue through: one new task and future per expression*)
   456         fun enqueue e q =
   457           let
   458             val (result, job) = future_job grp interrupts e;
   459             val (task, q') = Task_Queue.enqueue name grp deps pri job q;
   460           in (Future {promised = false, task = task, result = result}, q') end;
   461 
   462         fun enqueue_all () =
   463           let
   464             val (futures, queue') = fold_map enqueue es (! queue);
   465             val _ = queue := queue';
   466             (*wake a worker only if no dependency is still known to the queue*)
   467             val _ = if List.exists (Task_Queue.known_task queue') deps then () else signal work_available;
   468             val _ = scheduler_check ();
   469           in futures end;
   470       in SYNCHRONIZED "enqueue" enqueue_all end);
   471 
   472 fun fork_pri pri e = (*single future: unnamed, default group, no dependencies, interruptible*)
   473   let val params = {name = "", group = NONE, deps = [], pri = pri, interrupts = true}
   474   in singleton (forks params) e end;
   475 fun fork e = fork_pri 0 e;
   476 
   477 
   478 (* join *)
   479 
   480 local
   481 
   482 fun get_result x = (*result of x, where an interrupt is replaced by the group status if present*)
   483   (case peek x of
   484     SOME res =>
   485       if not (Exn.is_interrupt_exn res) then res
   486       else
   487         (case Task_Queue.group_status (Task_Queue.group_of_task (task_of x)) of
   488           SOME exn => Exn.Exn exn
   489         | NONE => res)
   490   | NONE => Exn.Exn (Fail "Unfinished future"));
   491 
   492 fun join_next deps = (*requires SYNCHRONIZED*)
   493   (case deps of
   494     [] => NONE
   495   | _ =>
   496       (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
   497         (SOME work, deps') => SOME (work, deps')
   498       | (NONE, []) => NONE
   499       | (NONE, deps') => (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')));
   500 
   501 fun execute_work NONE = ()
         (*run the work handed out by join_next in this thread, then continue with the remaining deps*)
   502   | execute_work (SOME (work, deps')) =
   503       (worker_joining (fn () => worker_exec work); join_work deps')
       (*join_work: with interrupts disabled, fetch work for deps under the lock and execute it,
         until join_next yields NONE*)
   504 and join_work deps =
   505   Multithreading.with_attributes Multithreading.no_interrupts
   506     (fn _ => execute_work (SYNCHRONIZED "join" (fn () => join_next deps)));
   507 
   508 in
   509 
   510 fun join_results xs = (*wait for all futures (helping out inside a worker task), then collect results*)
   511   let
   512     fun wait_external () = List.app (ignore o Single_Assignment.await o result_of) xs;
   513     val _ =
   514       if forall is_finished xs then ()
   515       else if Multithreading.self_critical () then
   516         error "Cannot join future values within critical section"
   517       else (case worker_task () of SOME _ => join_work (map task_of xs) | NONE => wait_external ());
   518   in map get_result xs end;
   519 
   520 end;
   521 
   522 fun join_result future = singleton join_results future;  (*join a single future*)
   523 fun join future = Exn.release (singleton join_results future);  (*joined result, passed through Exn.release*)
   524 
   525 
   526 (* fast-path versions -- bypassing task queue *)
   527 
   528 fun value_result (res: 'a Exn.result) =
   529   let
   530     (*no queue involved: a dummy task merely provides the group for assign_result*)
   531     val task = Task_Queue.dummy_task ();
   532     val result = Single_Assignment.var "value" : 'a result;
   533     val _ = assign_result (Task_Queue.group_of_task task) result res;
   534   in Future {promised = false, task = task, result = result} end;
   535 
   536 fun value x = let val res = Exn.Res x in value_result res end;  (*finished future of a plain value*)
   537 
   538 fun map_future f x = (*future of f applied to the joined result of x*)
   539   let
   540     val task = task_of x;
   541     val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
   542     fun body () = f (join x);
   543     val (result, job) = future_job group true body;
   544 
   545     (*first try to attach the job to the existing task of x*)
   546     fun extend () =
   547       (case Task_Queue.extend task job (! queue) of
   548         NONE => false
   549       | SOME queue' => (queue := queue'; true));
   550   in
   551     if SYNCHRONIZED "extend" extend then Future {promised = false, task = task, result = result}
   552     else (*otherwise fork a separate task that depends on the task of x*)
   553       singleton
   554         (forks {name = "Future.map", group = SOME group, deps = [task],
   555           pri = Task_Queue.pri_of_task task, interrupts = true}) body end;
   556 
   557 fun cond_forks args es = (*fork if multithreading is enabled, otherwise evaluate on the spot*)
   558   if not (Multithreading.enabled ()) then map (fn e => value_result (Exn.interruptible_capture e ())) es
   559   else forks args es;
   560 
   561 
   562 (* promised futures -- fulfilled by external means *)
   563 
   564 fun promise_group group : 'a future =
         (*passive future in the given group; it has no job of its own, only an abort operation*)
   565   let
   566     val result = Single_Assignment.var "promise" : 'a result;
           (*abort: assign an interrupt as result; a Fail from assign_result counts as success,
             while an interrupt escaping from it is turned into the Fail below*)
   567     fun abort () = assign_result group result Exn.interrupt_exn
   568       handle Fail _ => true
   569         | exn =>
   570             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   571             else reraise exn;
   572     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   573       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   574   in Future {promised = true, task = task, result = result} end;
   575 
   576 fun promise () = let val group = worker_subgroup () in promise_group group end;  (*promise in a fresh subgroup*)
   577 
   578 fun fulfill_result (Future {promised, task, result}) res =
         (*assign res to a promised future; Fail "Not a promised future" for any other future*)
   579   if not promised then raise Fail "Not a promised future"
   580   else
   581     let
   582       val group = Task_Queue.group_of_task task;
   583       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
             (*with interrupts disabled: take the passive task out of the queue and, if that
               succeeded, run the job in the current thread via worker_exec*)
   584       val _ =
   585         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   586           let
   587             val still_passive =
   588               SYNCHRONIZED "fulfill_result" (fn () =>
   589                 Unsynchronized.change_result queue
   590                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   591           in if still_passive then worker_exec (task, [job]) else () end);
             (*if there is still no result at this point, wait until it is assigned*)
   592       val _ =
   593         if is_some (Single_Assignment.peek result) then ()
   594         else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
   595     in () end;
   596 
   597 fun fulfill x v = let val res = Exn.Res v in fulfill_result x res end;  (*fulfill with a plain value*)
   598 
   599 
   600 (* shutdown *)
   601 
   602 fun shutdown () = (*block until the scheduler thread is no longer active*)
   603   if not Multithreading.available then ()
   604   else
   605     SYNCHRONIZED "shutdown" (fn () =>
   606       while scheduler_active () do
   607         (wait scheduler_event; broadcast_work ()));
   608 
   609 
   610 (* status markup *)
   611 
   612 fun status e = (*emit Markup.forked before and Markup.joined after evaluating e*)
   613   let
   614     val props =
   615       (case worker_task () of
   616         SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
   617       | NONE => I);
   618     fun report markup = Output.status (Markup.markup_only (props markup));
   619     val _ = report Markup.forked;
   620     val result = e ();  (*sic -- Markup.joined is reported only if e returns normally*)
   621   in report Markup.joined; result end;
   622 
   623 
   624 (*final declarations of this structure!*)
       (*map is rebound last: above this point, plain map is still applied to lists*)
   625 val map = map_future;
   626 
   627 end;
   628 
   629 type 'a future = 'a Future.future;  (*abbreviation for the future type outside the structure*)
   630