src/Pure/Concurrent/future.ML
author wenzelm
Tue, 09 Nov 2010 21:13:06 +0100
changeset 40705 f9347a30d1b2
parent 40553 bf39a257b3d3
child 42541 74010c6af0a4
permissions -rw-r--r--
explicitly identify forked/joined tasks;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
    33 signature FUTURE =
    34 sig
       (*task and group identifiers, shared with Task_Queue*)
    35   type task = Task_Queue.task
    36   type group = Task_Queue.group
       (*context of the current thread: defined only while it executes future jobs*)
    37   val is_worker: unit -> bool
    38   val worker_task: unit -> Task_Queue.task option
    39   val worker_group: unit -> Task_Queue.group option
       (*new group obtained from Task_Queue.new_group, based on the current
         worker group (if any)*)
    40   val worker_subgroup: unit -> Task_Queue.group
       (*future values and non-blocking inspection; peek yields NONE while unfinished*)
    41   type 'a future
    42   val task_of: 'a future -> task
    43   val group_of: 'a future -> group
    44   val peek: 'a future -> 'a Exn.result option
    45   val is_finished: 'a future -> bool
       (*forking: fork_group uses the given group, the other variants a fresh
         worker_subgroup; deps are futures whose tasks the new task depends on,
         pri is passed to the task queue as priority (default 0)*)
    46   val fork_group: group -> (unit -> 'a) -> 'a future
    47   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    48   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    49   val fork_pri: int -> (unit -> 'a) -> 'a future
    50   val fork: (unit -> 'a) -> 'a future
       (*joining: wait for results; join re-raises a failure via Exn.release*)
    51   val join_results: 'a future list -> 'a Exn.result list
    52   val join_result: 'a future -> 'a Exn.result
    53   val join: 'a future -> 'a
       (*value: already finished future; map: apply a function to the result*)
    54   val value: 'a -> 'a future
    55   val map: ('a -> 'b) -> 'a future -> 'b future
       (*promises: futures without evaluation, completed via fulfill/fulfill_result*)
    56   val promise_group: group -> 'a future
    57   val promise: unit -> 'a future
    58   val fulfill_result: 'a future -> 'a Exn.result -> unit
    59   val fulfill: 'a future -> 'a -> unit
       (*run f x with interrupts enabled (private ones within worker threads)*)
    60   val interruptible_task: ('a -> 'b) -> 'a -> 'b
       (*cancellation: present and future group members are interrupted eventually*)
    61   val cancel_group: group -> unit
    62   val cancel: 'a future -> unit
       (*shutdown: wait until the scheduler thread has terminated*)
    63   val shutdown: unit -> unit
       (*status: report forked/joined markup around an evaluation*)
    64   val status: (unit -> 'a) -> 'a
    65 end;
    66 
    67 structure Future: FUTURE =
    68 struct
    69 
    70 (** future values **)
    71 
    72 (* identifiers *)
    73 
    74 type task = Task_Queue.task;
    75 type group = Task_Queue.group;
    76 
    77 local
    78   val tag = Universal.tag () : (task * group) option Universal.tag;
    79 in
    80   fun thread_data () = the_default NONE (Thread.getLocal tag);
    81   fun setmp_thread_data data f x =
    82     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    83 end;
    84 
    85 val is_worker = is_some o thread_data;
    86 val worker_task = Option.map #1 o thread_data;
    87 val worker_group = Option.map #2 o thread_data;
    88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    89 
    90 
    91 (* datatype future *)
    92 
    93 type 'a result = 'a Exn.result Single_Assignment.var;
    94 
    95 datatype 'a future = Future of
    96  {promised: bool,
    97   task: task,
    98   group: group,
    99   result: 'a result};
   100 
   101 fun task_of (Future {task, ...}) = task;
   102 fun group_of (Future {group, ...}) = group;
   103 fun result_of (Future {result, ...}) = result;
   104 
   105 fun peek x = Single_Assignment.peek (result_of x);
   106 fun is_finished x = is_some (peek x);
   107 
   108 fun assign_result group result res =
   109   let
   110     val _ = Single_Assignment.assign result res
   111       handle exn as Fail _ =>
   112         (case Single_Assignment.peek result of
   113           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   114         | _ => reraise exn);
   115     val ok =
   116       (case the (Single_Assignment.peek result) of
   117         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   118       | Exn.Result _ => true);
   119   in ok end;
   120 
   121 
   122 
   123 (** scheduling **)
   124 
   125 (* synchronization *)
   126 
   127 val scheduler_event = ConditionVar.conditionVar ();
   128 val work_available = ConditionVar.conditionVar ();
   129 val work_finished = ConditionVar.conditionVar ();
   130 
   131 local
   132   val lock = Mutex.mutex ();
   133 in
   134 
   135 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   136 
   137 fun wait cond = (*requires SYNCHRONIZED*)
   138   Multithreading.sync_wait NONE NONE cond lock;
   139 
   140 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   141   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   142 
   143 fun signal cond = (*requires SYNCHRONIZED*)
   144   ConditionVar.signal cond;
   145 
   146 fun broadcast cond = (*requires SYNCHRONIZED*)
   147   ConditionVar.broadcast cond;
   148 
   149 fun broadcast_work () = (*requires SYNCHRONIZED*)
   150  (ConditionVar.broadcast work_available;
   151   ConditionVar.broadcast work_finished);
   152 
   153 end;
   154 
   155 
   156 (* global state *)
   157 
   158 val queue = Unsynchronized.ref Task_Queue.empty;
   159 val next = Unsynchronized.ref 0;
   160 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   161 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   162 val do_shutdown = Unsynchronized.ref false;
   163 val max_workers = Unsynchronized.ref 0;
   164 val max_active = Unsynchronized.ref 0;
   165 val worker_trend = Unsynchronized.ref 0;
   166 
   167 datatype worker_state = Working | Waiting | Sleeping;
   168 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   169 
   170 fun count_workers state = (*requires SYNCHRONIZED*)
   171   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   172 
   173 
   174 (* execute future jobs *)
   175 
   176 fun future_job group (e: unit -> 'a) =
   177   let
   178     val result = Single_Assignment.var "future" : 'a result;
   179     val pos = Position.thread_data ();
   180     fun job ok =
   181       let
   182         val res =
   183           if ok then
   184             Exn.capture (fn () =>
   185               Multithreading.with_attributes Multithreading.private_interrupts
   186                 (fn _ => Position.setmp_thread_data pos e ())) ()
   187           else Exn.interrupt_exn;
   188       in assign_result group result res end;
   189   in (result, job) end;
   190 
   191 fun cancel_now group = (*requires SYNCHRONIZED*)
   192   Task_Queue.cancel (! queue) group;
   193 
   194 fun cancel_later group = (*requires SYNCHRONIZED*)
   195  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   196   broadcast scheduler_event);
   197 
   198 fun execute (task, group, jobs) =
   199   let
   200     val valid = not (Task_Queue.is_canceled group);
   201     val ok = setmp_thread_data (task, group) (fn () =>
   202       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   203     val _ = SYNCHRONIZED "finish" (fn () =>
   204       let
   205         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   206         val _ =
   207           if ok then ()
   208           else if cancel_now group then ()
   209           else cancel_later group;
   210         val _ = broadcast work_finished;
   211         val _ = if maximal then () else signal work_available;
   212       in () end);
   213   in () end;
   214 
   215 
   216 (* worker threads *)
   217 
   218 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   219   let
   220     val state =
   221       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   222         SOME state => state
   223       | NONE => raise Fail "Unregistered worker thread");
   224     val _ = state := (if active then Waiting else Sleeping);
   225     val _ = wait cond;
   226     val _ = state := Working;
   227   in () end;
   228 
   229 fun worker_next () = (*requires SYNCHRONIZED*)
   230   if length (! workers) > ! max_workers then
   231     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   232      signal work_available;
   233      NONE)
   234   else if count_workers Working > ! max_active then
   235     (worker_wait false work_available; worker_next ())
   236   else
   237     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   238       NONE => (worker_wait false work_available; worker_next ())
   239     | some => (signal work_available; some));
   240 
   241 fun worker_loop name =
   242   (case SYNCHRONIZED name (fn () => worker_next ()) of
   243     NONE => ()
   244   | SOME work => (execute work; worker_loop name));
   245 
   246 fun worker_start name = (*requires SYNCHRONIZED*)
   247   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   248     Unsynchronized.ref Working));
   249 
   250 
   251 (* scheduler *)
   252 
   253 val status_ticks = Unsynchronized.ref 0;
   254 
   255 val last_round = Unsynchronized.ref Time.zeroTime;
   256 val next_round = seconds 0.05;
   257 
       (*One round of the scheduler, evaluated by scheduler_loop under the
         global lock:
           - periodic status tracing and disposal of terminated workers,
           - recomputation of max_active/max_workers and start of missing
             workers,
           - retry of deferred group cancellations (cf. cancel_later),
           - a bounded wait (next_round) on scheduler_event,
           - shutdown check.
         Result: true to continue.  It is false once shutdown was requested
         and no workers remain; the scheduler ref is cleared in that case.
         An interrupt cancels all queued tasks via cancel_later and yields
         true.  Other exceptions are re-raised.*)
   258 fun scheduler_next () = (*requires SYNCHRONIZED*)
   259   let
   260     val now = Time.now ();
           (*a tick happens when at least next_round has elapsed since the last one*)
   261     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   262     val _ = if tick then last_round := now else ();
   263
   264
   265     (* queue and worker status *)
   266
           (*status is traced only on every 10th tick*)
   267     val _ =
   268       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   269     val _ =
   270       if tick andalso ! status_ticks = 0 then
   271         Multithreading.tracing 1 (fn () =>
   272           let
   273             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   274             val total = length (! workers);
   275             val active = count_workers Working;
   276             val waiting = count_workers Waiting;
   277           in
   278             "SCHEDULE " ^ Time.toString now ^ ": " ^
   279               string_of_int ready ^ " ready, " ^
   280               string_of_int pending ^ " pending, " ^
   281               string_of_int running ^ " running, " ^
   282               string_of_int passive ^ " passive; " ^
   283               string_of_int total ^ " workers, " ^
   284               string_of_int active ^ " active, " ^
   285               string_of_int waiting ^ " waiting "
   286           end)
   287       else ();
   288
           (*forget worker threads that are no longer active*)
   289     val _ =
   290       if forall (Thread.isActive o #1) (! workers) then ()
   291       else
   292         let
   293           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   294           val _ = workers := alive;
   295         in
   296           Multithreading.tracing 0 (fn () =>
   297             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   298         end;
   299
   300
   301     (* worker pool adjustments *)
   302
           (*previous limits, to detect changes below*)
   303     val max_active0 = ! max_active;
   304     val max_workers0 = ! max_workers;
   305
           (*m: limit for simultaneously Working threads (0 on shutdown)*)
   306     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   307     val _ = max_active := m;
   308
           (*mm: target pool size.  Waiting workers (blocked in join_next) count
             twice, presumably to let additional workers take over their share.
             The result is kept within [m, 4 * m].
             NOTE(review): m = 9999 looks like a special value of
             Multithreading.max_threads_value -- confirm its meaning.*)
   309     val mm =
   310       if ! do_shutdown then 0
   311       else if m = 9999 then 1
   312       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts consecutive ticks with mm above (positive) or
             below (negative) the current max_workers; a change of direction
             resets it*)
   313     val _ =
   314       if tick andalso mm > ! max_workers then
   315         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   316       else if tick andalso mm < ! max_workers then
   317         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   318       else ();
           (*adopt mm immediately on shutdown or after a persistent trend (> 50
             ticks either way); after a short upward trend grow to at most 2 * m*)
   319     val _ =
   320       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   321         max_workers := mm
   322       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
   323         max_workers := Int.min (mm, 2 * m)
   324       else ();
   325
           (*fork workers up to max_workers; surplus workers retire themselves in
             worker_next*)
   326     val missing = ! max_workers - length (! workers);
   327     val _ =
   328       if missing > 0 then
   329         funpow missing (fn () =>
   330           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   331       else ();
   332
           (*wake a worker if the limits have changed*)
   333     val _ =
   334       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   335       else signal work_available;
   336
   337
   338     (* canceled groups *)
   339
           (*retry deferred cancellations; groups for which cancel_now still
             returns false are kept for the next round*)
   340     val _ =
   341       if null (! canceled) then ()
   342       else
   343        (Multithreading.tracing 1 (fn () =>
   344           string_of_int (length (! canceled)) ^ " canceled groups");
   345         Unsynchronized.change canceled (filter_out cancel_now);
   346         broadcast_work ());
   347
   348
   349     (* delay loop *)
   350
           (*Exn.release propagates an interrupt received while waiting to the
             handler at the end*)
   351     val _ = Exn.release (wait_timeout next_round scheduler_event);
   352
   353
   354     (* shutdown *)
   355
           (*request shutdown when Task_Queue.all_passive holds (presumably: only
             passive tasks such as promises are left); scheduler_check resets
             the request on the next fork or cancel*)
   356     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   357     val continue = not (! do_shutdown andalso null (! workers));
   358     val _ = if continue then () else scheduler := NONE;
   359
   360     val _ = broadcast scheduler_event;
   361   in continue end
   362   handle exn =>
   363     if Exn.is_interrupt exn then
   364      (Multithreading.tracing 1 (fn () => "Interrupt");
   365       List.app cancel_later (Task_Queue.cancel_all (! queue));
   366       broadcast_work (); true)
   367     else reraise exn;
   368 
   369 fun scheduler_loop () =
   370   while
   371     Multithreading.with_attributes
   372       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   373       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   374   do ();
   375 
   376 fun scheduler_active () = (*requires SYNCHRONIZED*)
   377   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   378 
   379 fun scheduler_check () = (*requires SYNCHRONIZED*)
   380  (do_shutdown := false;
   381   if scheduler_active () then ()
   382   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   383 
   384 
   385 
   386 (** futures **)
   387 
   388 (* fork *)
   389 
   390 fun fork_future opt_group deps pri e =
   391   let
   392     val group =
   393       (case opt_group of
   394         NONE => worker_subgroup ()
   395       | SOME group => group);
   396     val (result, job) = future_job group e;
   397     val task = SYNCHRONIZED "enqueue" (fn () =>
   398       let
   399         val (task, minimal) =
   400           Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
   401         val _ = if minimal then signal work_available else ();
   402         val _ = scheduler_check ();
   403       in task end);
   404   in Future {promised = false, task = task, group = group, result = result} end;
   405 
   406 fun fork_group group e = fork_future (SOME group) [] 0 e;
   407 fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   408 fun fork_deps deps e = fork_deps_pri deps 0 e;
   409 fun fork_pri pri e = fork_deps_pri [] pri e;
   410 fun fork e = fork_deps [] e;
   411 
   412 
   413 (* join *)
   414 
   415 local
   416
       (*Result of a finished future (Fail "Unfinished future" otherwise).  An
         interrupt result is replaced by the exceptions collected in its group
         (Task_Queue.group_status), if there are any.*)
   417 fun get_result x =
   418   (case peek x of
   419     NONE => Exn.Exn (Fail "Unfinished future")
   420   | SOME res =>
   421       if Exn.is_interrupt_exn res then
   422         (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   423           [] => res
   424         | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   425       else res);
   426
       (*Next work item towards the given dependencies, via
         Task_Queue.dequeue_towards; deps' presumably lists the remaining
         unfinished ones.  If nothing is runnable yet, wait for some task to
         finish (as a Waiting worker) and retry.  NONE when done.*)
   427 fun join_next deps = (*requires SYNCHRONIZED*)
   428   if null deps then NONE
   429   else
   430     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   431       (NONE, []) => NONE
   432     | (NONE, deps') => (worker_wait true work_finished; join_next deps')
   433     | (SOME work, deps') => SOME (work, deps'));
   434
       (*execute dequeued work outside the lock, then continue with the
         remaining dependencies*)
   435 fun execute_work NONE = ()
   436   | execute_work (SOME (work, deps')) = (execute work; join_work deps')
   437 and join_work deps =
   438   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   439
       (*record in the queue that task depends on deps, then help to evaluate them*)
   440 fun join_depend task deps =
   441   execute_work (SYNCHRONIZED "join" (fn () =>
   442     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   443
   444 in
   445
       (*Wait for all xs and return their results.  Worker threads help to
         evaluate the dependencies instead of blocking.  Other threads await
         each result cell.  Refused within a critical section, unless all xs
         are already finished.*)
   446 fun join_results xs =
   447   if forall is_finished xs then map get_result xs
   448   else if Multithreading.self_critical () then
   449     error "Cannot join future values within critical section"
   450   else
   451     (case worker_task () of
   452       SOME task => join_depend task (map task_of xs)
   453     | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
   454     map get_result xs);
   455
   456 end;
   457 
   458 fun join_result x = singleton join_results x;
   459 fun join x = Exn.release (join_result x);
   460 
   461 
   462 (* fast-path versions -- bypassing full task management *)
   463 
   464 fun value (x: 'a) =
   465   let
   466     val group = Task_Queue.new_group NONE;
   467     val result = Single_Assignment.var "value" : 'a result;
   468     val _ = assign_result group result (Exn.Result x);
   469   in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   470 
   471 fun map_future f x =
   472   let
   473     val task = task_of x;
   474     val group = Task_Queue.new_group (SOME (group_of x));
   475     val (result, job) = future_job group (fn () => f (join x));
   476 
   477     val extended = SYNCHRONIZED "extend" (fn () =>
   478       (case Task_Queue.extend task job (! queue) of
   479         SOME queue' => (queue := queue'; true)
   480       | NONE => false));
   481   in
   482     if extended then Future {promised = false, task = task, group = group, result = result}
   483     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   484   end;
   485 
   486 
   487 (* promised futures -- fulfilled by external means *)
   488 
   489 fun promise_group group : 'a future =
   490   let
   491     val result = Single_Assignment.var "promise" : 'a result;
   492     fun abort () = assign_result group result Exn.interrupt_exn
   493       handle Fail _ => true
   494         | exn =>
   495             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   496             else reraise exn;
   497     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   498       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   499   in Future {promised = true, task = task, group = group, result = result} end;
   500 
   501 fun promise () = promise_group (worker_subgroup ());
   502 
   503 fun fulfill_result (Future {promised, task, group, result}) res =
   504   if not promised then raise Fail "Not a promised future"
   505   else
   506     let
   507       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   508       val _ =
   509         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   510           let
   511             val still_passive =
   512               SYNCHRONIZED "fulfill_result" (fn () =>
   513                 Unsynchronized.change_result queue
   514                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   515           in if still_passive then execute (task, group, [job]) else () end);
   516       val _ = Single_Assignment.await result;
   517     in () end;
   518 
   519 fun fulfill x res = fulfill_result x (Exn.Result res);
   520 
   521 
   522 (* cancellation *)
   523 
   524 fun interruptible_task f x =
   525   if Multithreading.available then
   526     Multithreading.with_attributes
   527       (if is_worker ()
   528        then Multithreading.private_interrupts
   529        else Multithreading.public_interrupts)
   530       (fn _ => f x)
   531   else interruptible f x;
   532 
   533 (*cancel: present and future group members will be interrupted eventually*)
   534 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   535  (if cancel_now group then () else cancel_later group;
   536   signal work_available; scheduler_check ()));
   537 
   538 fun cancel x = cancel_group (group_of x);
   539 
   540 
   541 (* shutdown *)
   542 
   543 fun shutdown () =
   544   if Multithreading.available then
   545     SYNCHRONIZED "shutdown" (fn () =>
   546      while scheduler_active () do
   547       (wait scheduler_event; broadcast_work ()))
   548   else ();
   549 
   550 
   551 (* status markup *)
   552 
   553 fun status e =
   554   let
   555     val task_props =
   556       (case worker_task () of
   557         NONE => I
   558       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   559     val _ = Output.status (Markup.markup (task_props Markup.forked) "");
   560     val x = e ();  (*sic -- report "joined" only for success*)
   561     val _ = Output.status (Markup.markup (task_props Markup.joined) "");
   562   in x end;
   563 
   564 
   565 (*final declarations of this structure!*)
   566 val map = map_future;
   567 
   568 end;
   569 
       (*make the future type available at top level, without the Future. qualifier*)
   570 type 'a future = 'a Future.future;
   571