src/Pure/Concurrent/future.ML
author wenzelm
Tue, 20 Jul 2010 17:35:42 +0200
changeset 38122 047c96f41455
parent 38120 a902f158b4fc
child 38133 3a6ec95a9f68
permissions -rw-r--r--
back to more strict dependencies, even for canceled groups (reverting parts of 02936e77a07c);
cancel passive tasks more actively via Exn.Interrupt, by treating them like regular jobs here;
attempts to re-assign canceled futures/promises raise Exn.Interrupt;
tuned;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 
    27   * Promised futures are fulfilled by external means.  There is no
    28     associated evaluation task, but other futures can depend on them
    29     as usual.
    30 *)
    31 
    32 signature FUTURE =  (*public interface of structure Future*)
    33 sig
    34   type task = Task_Queue.task
    35   type group = Task_Queue.group
    36   val is_worker: unit -> bool  (*true iff the current thread carries (task, group) thread data*)
    37   val worker_task: unit -> Task_Queue.task option  (*task component of that thread data*)
    38   val worker_group: unit -> Task_Queue.group option  (*group component of that thread data*)
    39   type 'a future
    40   val task_of: 'a future -> task
    41   val group_of: 'a future -> group
    42   val peek: 'a future -> 'a Exn.result option  (*current result, if any; does not join*)
    43   val is_finished: 'a future -> bool  (*peek yields SOME result*)
    44   val fork_group: group -> (unit -> 'a) -> 'a future  (*fork into given group; no deps, priority 0*)
    45   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future  (*deps, priority, body; fresh group*)
    46   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future  (*fork_deps_pri with priority 0*)
    47   val fork_pri: int -> (unit -> 'a) -> 'a future  (*fork_deps_pri without deps*)
    48   val fork: (unit -> 'a) -> 'a future  (*no deps, priority 0*)
    49   val join_results: 'a future list -> 'a Exn.result list  (*results in the order of the argument list*)
    50   val join_result: 'a future -> 'a Exn.result  (*join_results for a single future*)
    51   val join: 'a future -> 'a  (*join_result followed by Exn.release*)
    52   val value: 'a -> 'a future  (*already finished future; nothing is enqueued*)
    53   val map: ('a -> 'b) -> 'a future -> 'b future  (*bound to map_future at the end of the structure*)
    54   val promise_group: group -> 'a future  (*passive future within given group*)
    55   val promise: unit -> 'a future  (*promise_group with a fresh group*)
    56   val fulfill_result: 'a future -> 'a Exn.result -> unit  (*Fail "Not a promised future" for non-promises*)
    57   val fulfill: 'a future -> 'a -> unit  (*fulfill_result with Exn.Result*)
    58   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    59   val cancel_group: group -> unit
    60   val cancel: 'a future -> unit  (*cancel_group of the future's group*)
    61   val shutdown: unit -> unit  (*waits while the scheduler thread is active*)
    62   val report: (unit -> 'a) -> 'a  (*wraps evaluation in forked/joined status messages*)
    63 end;
    64 
    65 structure Future: FUTURE =
    66 struct
    67 
    68 (** future values **)
    69 
    70 (* identifiers *)
    71 
    72 type task = Task_Queue.task;
    73 type group = Task_Queue.group;
    74 
    75 local  (*thread-local (task, group) data; the tag itself stays private*)
    76   val tag = Universal.tag () : (task * group) option Universal.tag;
    77 in
    78   fun thread_data () = the_default NONE (Thread.getLocal tag);  (*NONE if nothing was stored for this thread*)
    79   fun setmp_thread_data data f x =  (*apply f x with thread data SOME data*)
    80     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;  (*current value is passed along -- presumably restored afterwards; confirm in Library*)
    81 end;
    82 
    83 fun is_worker () = is_some (thread_data ());  (*does this thread carry (task, group) data?*)
    84 fun worker_task () = Option.map #1 (thread_data ());  (*first component: the task*)
    85 fun worker_group () = Option.map #2 (thread_data ());  (*second component: the group*)
    86 
    87 fun new_group () = Task_Queue.new_group (Option.map #2 (thread_data ()));  (*fresh group, parent is the current thread's group (if any)*)
    88 
    89 
    90 (* datatype future *)
    91 
    92 type 'a result = 'a Exn.result Single_Assignment.var;  (*single-assignment cell for a value or exception*)
    93 
    94 datatype 'a future = Future of
    95  {promised: bool,  (*true only when created by promise_group; checked by fulfill_result*)
    96   task: task,  (*queue task; Task_Queue.dummy_task for futures made by value*)
    97   group: group,  (*group used for cancellation*)
    98   result: 'a result};  (*where the outcome is stored*)
    99 
   100 fun task_of (Future fields) = #task fields;  (*selectors for the future record*)
   101 fun group_of (Future fields) = #group fields;
   102 fun result_of (Future fields) = #result fields;
   103 
   104 fun peek (Future {result, ...}) = Single_Assignment.peek result;  (*current result, if any*)
   105 fun is_finished x = (case peek x of NONE => false | SOME _ => true);  (*has a result been assigned?*)
   106 
   107 fun assign_result group result res =  (*store res in result; true iff the stored outcome is an Exn.Result*)
   108   let
   109     val _ = Single_Assignment.assign result res
   110       handle exn as Fail _ =>  (*assign failed -- presumably the variable was already assigned*)
   111         (case Single_Assignment.peek result of
   112           SOME (Exn.Exn Exn.Interrupt) => raise Exn.Interrupt  (*existing result is Interrupt: report Interrupt instead of Fail*)
   113         | _ => reraise exn);
   114     val ok =
   115       (case the (Single_Assignment.peek result) of  (*inspect what is actually stored*)
   116         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)  (*failure: hand the exception to the group*)
   117       | Exn.Result _ => true);
   118   in ok end;
   119 
   120 
   121 
   122 (** scheduling **)
   123 
   124 (* synchronization *)
   125 
   126 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by the scheduler delay loop and by shutdown*)
   127 val work_available = ConditionVar.conditionVar ();  (*awaited by worker_next when there is nothing to do*)
   128 val work_finished = ConditionVar.conditionVar ();  (*awaited by join_next; broadcast by execute*)
   129 
   130 local  (*all helpers below share this single private mutex*)
   131   val lock = Mutex.mutex ();
   132 in
   133 
   134 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;  (*run a function via Simple_Thread.synchronized on lock; name labels the section*)
   135 
   136 fun wait cond = (*requires SYNCHRONIZED*)
   137   Multithreading.sync_wait NONE NONE cond lock;  (*no time limit*)
   138 
   139 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   140   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;  (*limit is the absolute time now + timeout*)
   141 
   142 fun signal cond = (*requires SYNCHRONIZED*)
   143   ConditionVar.signal cond;
   144 
   145 fun broadcast cond = (*requires SYNCHRONIZED*)
   146   ConditionVar.broadcast cond;
   147 
   148 fun broadcast_work () = (*requires SYNCHRONIZED*)
   149  (ConditionVar.broadcast work_available;  (*wake sleeping workers*)
   150   ConditionVar.broadcast work_finished);  (*wake threads waiting inside join*)
   151 
   152 end;
   153 
   154 
   155 (* global state *)
   156 
   157 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue*)
   158 val next = Unsynchronized.ref 0;  (*counter used for naming new worker threads*)
   159 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread; reset to NONE when it stops*)
   160 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  (*groups whose cancellation the scheduler retries*)
   161 val do_shutdown = Unsynchronized.ref false;  (*set by scheduler_next, cleared by scheduler_check*)
   162 val max_workers = Unsynchronized.ref 0;  (*target size of the worker pool*)
   163 val max_active = Unsynchronized.ref 0;  (*limit on workers in state Working*)
   164 val worker_trend = Unsynchronized.ref 0;  (*ticks asking for more (positive) or fewer (negative) workers*)
   165 
   166 datatype worker_state = Working | Waiting | Sleeping;  (*Waiting/Sleeping: inside worker_wait with active true/false*)
   167 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);  (*registered worker threads with their state*)
   168 
   169 fun count_workers state = (*requires SYNCHRONIZED*)
   170   length (List.filter (fn (_, current) => ! current = state) (! workers));  (*number of registered workers currently in the given state*)
   171 
   172 
   173 (* execute future jobs *)
   174 
   175 fun future_job group (e: unit -> 'a) =  (*fresh result variable plus the job that evaluates e into it*)
   176   let
   177     val result = Single_Assignment.var "future" : 'a result;
   178     val pos = Position.thread_data ();  (*position data of the creating thread, captured now*)
   179     fun job ok =  (*ok = false: skip evaluation and record Interrupt*)
   180       let
   181         val res =
   182           if ok then
   183             Exn.capture (fn () =>  (*exceptions of e become part of the result*)
   184               Multithreading.with_attributes Multithreading.private_interrupts
   185                 (fn _ => Position.setmp_thread_data pos e ())) ()  (*evaluate e under the captured position data*)
   186           else Exn.Exn Exn.Interrupt;
   187       in assign_result group result res end;  (*job result: true iff a proper value was stored*)
   188   in (result, job) end;
   189 
   190 fun cancel_now group = (*requires SYNCHRONIZED*)
   191   Task_Queue.cancel (! queue) group;  (*boolean result: callers fall back to cancel_later when it is false*)
   192 
   193 fun cancel_later group = (*requires SYNCHRONIZED*)
   194  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);  (*remember group for the scheduler to retry*)
   195   broadcast scheduler_event);  (*wake the scheduler*)
   196 
   197 fun execute (task, group, jobs) =  (*run all jobs of a task, then mark the task finished in the queue*)
   198   let
   199     val valid = not (Task_Queue.is_canceled group);  (*jobs of a canceled group are invoked with false*)
   200     val ok = setmp_thread_data (task, group) (fn () =>  (*jobs run with this thread marked as worker for (task, group)*)
   201       fold (fn job => fn ok => job valid andalso ok) jobs true) ();  (*every job is invoked; ok is the conjunction of their results*)
   202     val _ = SYNCHRONIZED "finish" (fn () =>
   203       let
   204         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   205         val _ =
   206           if ok then ()
   207           else if cancel_now group then ()  (*some job failed: cancel the group ...*)
   208           else cancel_later group;  (*... or leave the cancellation to the scheduler*)
   209         val _ = broadcast work_finished;  (*wake threads blocked in join*)
   210         val _ = if maximal then () else signal work_available;  (*presumably other tasks depended on this one -- confirm in Task_Queue.finish*)
   211       in () end);
   212   in () end;
   213 
   214 
   215 (* worker threads *)
   216 
   217 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   218   let
   219     val state =  (*state cell of the calling thread*)
   220       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   221         SOME state => state
   222       | NONE => raise Fail "Unregistered worker thread");
   223     val _ = state := (if active then Waiting else Sleeping);  (*recorded for count_workers while blocked*)
   224     val _ = wait cond;
   225     val _ = state := Working;  (*back to Working after wakeup*)
   226   in () end;
   227 
   228 fun worker_next () = (*requires SYNCHRONIZED*)
   229   if length (! workers) > ! max_workers then  (*pool too large: this worker retires*)
   230     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));  (*unregister the calling thread*)
   231      signal work_available;
   232      NONE)  (*NONE terminates worker_loop*)
   233   else if count_workers Working > ! max_active then  (*too many active workers: sleep and retry*)
   234     (worker_wait false work_available; worker_next ())
   235   else
   236     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   237       NONE => (worker_wait false work_available; worker_next ())  (*nothing to dequeue: sleep and retry*)
   238     | some => (signal work_available; some));  (*got work: wake one more worker before returning it*)
   239 
   240 fun worker_loop name =  (*main loop of a worker thread; name labels its synchronized section*)
   241   (case SYNCHRONIZED name (fn () => worker_next ()) of
   242     NONE => ()  (*no more work for this thread: terminate*)
   243   | SOME work => (execute work; worker_loop name));  (*work is executed outside the synchronized section*)
   244 
   245 fun worker_start name = (*requires SYNCHRONIZED*)
   246   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),  (*fork the thread and register it ...*)
   247     Unsynchronized.ref Working));  (*... with initial state Working*)
   248 
   249 
   250 (* scheduler *)
   251 
   252 val status_ticks = Unsynchronized.ref 0;  (*tick counter modulo 10, used to thin out status tracing*)
   253 
   254 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the most recent tick*)
   255 val next_round = Time.fromMilliseconds 50;  (*tick interval, also the timeout of the scheduler delay loop*)
   256 
   257 fun scheduler_next () = (*requires SYNCHRONIZED*)  (*one scheduler round; result false ends scheduler_loop*)
   258   let
   259     val now = Time.now ();
   260     val tick = Time.<= (Time.+ (! last_round, next_round), now);  (*at least next_round has passed since last_round*)
   261     val _ = if tick then last_round := now else ();
   262 
   263 
   264     (* queue and worker status *)
   265 
   266     val _ =
   267       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   268     val _ =
   269       if tick andalso ! status_ticks = 0 then  (*i.e. on every 10th tick*)
   270         Multithreading.tracing 1 (fn () =>
   271           let
   272             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   273             val total = length (! workers);
   274             val active = count_workers Working;
   275             val waiting = count_workers Waiting;
   276           in
   277             "SCHEDULE " ^ Time.toString now ^ ": " ^
   278               string_of_int ready ^ " ready, " ^
   279               string_of_int pending ^ " pending, " ^
   280               string_of_int running ^ " running, " ^
   281               string_of_int passive ^ " passive; " ^
   282               string_of_int total ^ " workers, " ^
   283               string_of_int active ^ " active, " ^
   284               string_of_int waiting ^ " waiting "
   285           end)
   286       else ();
   287 
   288     val _ =  (*drop workers whose threads are no longer active*)
   289       if forall (Thread.isActive o #1) (! workers) then ()
   290       else
   291         let
   292           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   293           val _ = workers := alive;
   294         in
   295           Multithreading.tracing 0 (fn () =>
   296             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   297         end;
   298 
   299 
   300     (* worker pool adjustments *)
   301 
   302     val max_active0 = ! max_active;  (*old limits, to detect changes below*)
   303     val max_workers0 = ! max_workers;
   304 
   305     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   306     val _ = max_active := m;
   307 
   308     val mm =  (*desired pool size*)
   309       if ! do_shutdown then 0
   310       else if m = 9999 then 1  (*NOTE(review): 9999 looks like a special max_threads value -- confirm*)
   311       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);  (*at least m, at most 4 * m*)
   312     val _ =
   313       if tick andalso mm > ! max_workers then
   314         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)  (*count upward demand; reset on change of direction*)
   315       else if tick andalso mm < ! max_workers then
   316         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)  (*count downward demand; reset on change of direction*)
   317       else ();
   318     val _ =
   319       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then  (*mm = 0 or a long-lasting trend: adopt mm directly*)
   320         max_workers := mm
   321       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then  (*shorter upward trend: grow, but to 2 * m at most*)
   322         max_workers := Int.min (mm, 2 * m)
   323       else ();
   324 
   325     val missing = ! max_workers - length (! workers);
   326     val _ =
   327       if missing > 0 then  (*start as many workers as the pool is short of*)
   328         funpow missing (fn () =>
   329           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   330       else ();
   331 
   332     val _ =
   333       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   334       else signal work_available;  (*limits changed: let a sleeping worker re-check them*)
   335 
   336 
   337     (* canceled groups *)
   338 
   339     val _ =
   340       if null (! canceled) then ()
   341       else
   342        (Multithreading.tracing 1 (fn () =>
   343           string_of_int (length (! canceled)) ^ " canceled groups");
   344         Unsynchronized.change canceled (filter_out cancel_now);  (*retry; keep only groups where cancel_now is still false*)
   345         broadcast_work ());
   346 
   347 
   348     (* delay loop *)
   349 
   350     val _ = Exn.release (wait_timeout next_round scheduler_event);  (*sleep until signalled or next_round has elapsed*)
   351 
   352 
   353     (* shutdown *)
   354 
   355     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   356     val continue = not (! do_shutdown andalso null (! workers));  (*stop only when shutting down and no workers are left*)
   357     val _ = if continue then () else scheduler := NONE;
   358 
   359     val _ = broadcast scheduler_event;  (*wake threads waiting in shutdown*)
   360   in continue end
   361   handle Exn.Interrupt =>  (*interrupt of the scheduler: cancel everything, keep running*)
   362    (Multithreading.tracing 1 (fn () => "Interrupt");
   363     List.app cancel_later (Task_Queue.cancel_all (! queue));  (*groups returned by cancel_all are retried later*)
   364     broadcast_work (); true);
   365 
   366 fun scheduler_loop () =  (*body of the scheduler thread: repeat scheduler_next until it returns false*)
   367   while
   368     Multithreading.with_attributes
   369       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   370       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))  (*each round runs inside the synchronized section*)
   371   do ();
   372 
   373 fun scheduler_active () = (*requires SYNCHRONIZED*)
   374   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);  (*a scheduler thread is registered and still active*)
   375 
   376 fun scheduler_check () = (*requires SYNCHRONIZED*)
   377  (do_shutdown := false;  (*revoke a pending shutdown*)
   378   if scheduler_active () then ()
   379   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));  (*start a scheduler thread if none is active*)
   380 
   381 
   382 
   383 (** futures **)
   384 
   385 (* fork *)
   386 
   387 fun fork_future opt_group deps pri e =  (*general fork: optional group, task dependencies, priority, body*)
   388   let
   389     val group =
   390       (case opt_group of
   391         NONE => new_group ()  (*default: fresh group below the current worker group*)
   392       | SOME group => group);
   393     val (result, job) = future_job group e;
   394     val task = SYNCHRONIZED "enqueue" (fn () =>
   395       let
   396         val (task, minimal) =
   397           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   398         val _ = if minimal then signal work_available else ();  (*presumably the task is immediately runnable -- confirm in Task_Queue.enqueue*)
   399         val _ = scheduler_check ();  (*make sure a scheduler thread is running*)
   400       in task end);
   401   in Future {promised = false, task = task, group = group, result = result} end;
   402 
   403 fun fork_group grp body = fork_future (SOME grp) [] 0 body;  (*explicit group; no dependencies, priority 0*)
   404 fun fork_deps_pri futures pri body = fork_future NONE (map task_of futures) pri body;  (*fresh group; depends on the tasks of the given futures*)
   405 fun fork_deps futures body = fork_future NONE (map task_of futures) 0 body;  (*as fork_deps_pri, priority 0*)
   406 fun fork_pri pri body = fork_future NONE [] pri body;  (*no dependencies*)
   407 fun fork body = fork_future NONE [] 0 body;  (*no dependencies, priority 0*)
   408 
   409 
   410 (* join *)
   411 
   412 local  (*helpers private to join_results*)
   413 
   414 fun get_result x =  (*final result of x as reported to the caller*)
   415   (case peek x of
   416     NONE => Exn.Exn (Fail "Unfinished future")
   417   | SOME (exn as Exn.Exn Exn.Interrupt) =>  (*interrupted: prefer the exceptions recorded for the group*)
   418       (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   419         [] => exn  (*nothing recorded: keep plain Interrupt*)
   420       | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   421   | SOME res => res);
   422 
   423 fun join_next deps = (*requires SYNCHRONIZED*)
   424   if null deps then NONE
   425   else
   426     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   427       (NONE, []) => NONE  (*no work and no remaining deps: done*)
   428     | (NONE, deps') => (worker_wait true work_finished; join_next deps')  (*nothing to run now: wait in state Waiting, then retry*)
   429     | (SOME work, deps') => SOME (work, deps'));  (*work to execute, with the deps to continue with*)
   430 
   431 fun execute_work NONE = ()
   432   | execute_work (SOME (work, deps')) = (execute work; join_work deps')  (*run the work outside the lock, then continue*)
   433 and join_work deps =
   434   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   435 
   436 fun join_depend task deps =  (*like join_work, but first records that task depends on deps*)
   437   execute_work (SYNCHRONIZED "join" (fn () =>
   438     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   439 
   440 in
   441 
   442 fun join_results xs =
   443   if forall is_finished xs then map get_result xs  (*fast path: nothing to wait for*)
   444   else if Multithreading.self_critical () then
   445     error "Cannot join future values within critical section"
   446   else
   447     (case worker_task () of
   448       SOME task => join_depend task (map task_of xs)  (*worker thread: helps executing the required tasks*)
   449     | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;  (*non-worker thread: blocks on each result*)
   450     map get_result xs);  (*evaluated after either case branch*)
   451 
   452 end;
   453 
   454 fun join_result x = singleton join_results x;  (*join_results on the one-element list*)
   455 fun join x = Exn.release (join_result x);  (*Exn.release turns the Exn.result into a value or a raised exception*)
   456 
   457 
   458 (* fast-path versions -- bypassing full task management *)
   459 
   460 fun value (x: 'a) =  (*finished future holding x; the task queue is not involved*)
   461   let
   462     val group = Task_Queue.new_group NONE;  (*fresh group without parent*)
   463     val result = Single_Assignment.var "value" : 'a result;
   464     val _ = assign_result group result (Exn.Result x);  (*result is assigned immediately*)
   465   in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   466 
   467 fun map_future f x =  (*future of f applied to the joined value of x; exported as map*)
   468   let
   469     val task = task_of x;
   470     val group = Task_Queue.new_group (SOME (group_of x));  (*new group below the group of x*)
   471     val (result, job) = future_job group (fn () => f (join x));
   472 
   473     val extended = SYNCHRONIZED "extend" (fn () =>  (*try to attach job to the existing task of x*)
   474       (case Task_Queue.extend task job (! queue) of
   475         SOME queue' => (queue := queue'; true)
   476       | NONE => false));
   477   in
   478     if extended then Future {promised = false, task = task, group = group, result = result}  (*shares the task of x*)
   479     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))  (*otherwise a separate task depending on x, same priority*)
   480   end;
   481 
   482 
   483 (* promised futures -- fulfilled by external means *)
   484 
   485 fun promise_group group : 'a future =  (*promised future: enqueued as passive task, result assigned via fulfill_result*)
   486   let
   487     val result = Single_Assignment.var "promise" : 'a result;
   488     fun abort () = assign_result group result (Exn.Exn Exn.Interrupt) handle Fail _ => true;  (*record Interrupt as result; a Fail from the assignment counts as success*)
   489     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   490       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));  (*abort is handed to the queue together with the group*)
   491   in Future {promised = true, task = task, group = group, result = result} end;
   492 
   493 fun promise () = promise_group (new_group ());  (*promise within a fresh group*)
   494 
   495 fun fulfill_result (Future {promised, task, group, result}) res =  (*assign res to a promised future*)
   496   let
   497     val _ = promised orelse raise Fail "Not a promised future";
   498     fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);  (*execute passes ok = false for a canceled group: Interrupt replaces res*)
   499     val _ = execute (task, group, [job]);  (*run in the calling thread, like a regular task*)
   500   in () end;
   501 
   502 fun fulfill x res = fulfill_result x (Exn.Result res);  (*fulfill with a proper value*)
   503 
   504 
   505 (* cancellation *)
   506 
   507 fun interruptible_task f x =  (*apply f to x under interrupt attributes that depend on the calling thread*)
   508   if Multithreading.available then
   509     Multithreading.with_attributes
   510       (if is_worker ()
   511        then Multithreading.private_interrupts  (*thread with task data*)
   512        else Multithreading.public_interrupts)  (*any other thread*)
   513       (fn _ => f x)
   514   else interruptible f x;  (*without multithreading: plain interruptible*)
   515 
   516 (*cancel: present and future group members will be interrupted eventually*)
   517 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   518  (if cancel_now group then () else cancel_later group;  (*cancel right away, or leave it to the scheduler*)
   519   signal work_available; scheduler_check ()));  (*wake a worker and make sure a scheduler thread is running*)
   520 
   521 fun cancel x = cancel_group (group_of x);  (*cancel the whole group of future x*)
   522 
   523 
   524 (* shutdown *)
   525 
   526 fun shutdown () =  (*block until the scheduler thread is no longer active*)
   527   if Multithreading.available then
   528     SYNCHRONIZED "shutdown" (fn () =>
   529      while scheduler_active () do
   530       (wait scheduler_event; broadcast_work ()))  (*after each scheduler event, wake workers and joining threads*)
   531   else ();  (*nothing to do without multithreading*)
   532 
   533 
   534 (* report markup *)
   535 
   536 fun report e =  (*evaluate e () between "forked" and "joined" status messages*)
   537   let
   538     val _ = Output.status (Markup.markup Markup.forked "");
   539     val x = e ();  (*sic -- report "joined" only for success*)
   540     val _ = Output.status (Markup.markup Markup.joined "");  (*not reached if e raises an exception*)
   541   in x end;
   542 
   543 
   544 (*final declarations of this structure!*)
   545 val map = map_future;  (*bound last: earlier uses of map in this structure refer to the list operation*)
   546 
   547 end;
   548 
   549 type 'a future = 'a Future.future;  (*top-level abbreviation for Future.future*)
   550