src/Pure/Concurrent/future.ML
author wenzelm
Mon, 31 Jan 2011 22:57:01 +0100
changeset 42544 1c191a39549f
parent 42543 2f70b1ddd09f
child 42545 7da257539a8d
permissions -rw-r--r--
support named tasks, for improved tracing;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
    33 signature FUTURE =
    34 sig
    35   type task = Task_Queue.task
    36   type group = Task_Queue.group
    37   val is_worker: unit -> bool
    38   val worker_task: unit -> Task_Queue.task option
    39   val worker_group: unit -> Task_Queue.group option
    40   val worker_subgroup: unit -> Task_Queue.group
    41   val worker_waiting: (unit -> 'a) -> 'a
    42   type 'a future
    43   val task_of: 'a future -> task
    44   val group_of: 'a future -> group
    45   val peek: 'a future -> 'a Exn.result option
    46   val is_finished: 'a future -> bool
    47   val bulk: {name: string, group: group option, deps: task list, pri: int} ->
    48     (unit -> 'a) list -> 'a future list
    49   val fork_pri: int -> (unit -> 'a) -> 'a future
    50   val fork: (unit -> 'a) -> 'a future
    51   val join_results: 'a future list -> 'a Exn.result list
    52   val join_result: 'a future -> 'a Exn.result
    53   val join: 'a future -> 'a
    54   val value: 'a -> 'a future
    55   val map: ('a -> 'b) -> 'a future -> 'b future
    56   val promise_group: group -> 'a future
    57   val promise: unit -> 'a future
    58   val fulfill_result: 'a future -> 'a Exn.result -> unit
    59   val fulfill: 'a future -> 'a -> unit
    60   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    61   val cancel_group: group -> unit
    62   val cancel: 'a future -> unit
    63   val shutdown: unit -> unit
    64   val status: (unit -> 'a) -> 'a
    65 end;
    66 
    67 structure Future: FUTURE =
    68 struct
    69 
    70 (** future values **)
    71 
    72 (* identifiers *)
    73 
    74 type task = Task_Queue.task;
    75 type group = Task_Queue.group;
    76 
    77 local
    78   val tag = Universal.tag () : (task * group) option Universal.tag;
    79 in
    80   fun thread_data () = the_default NONE (Thread.getLocal tag);
    81   fun setmp_thread_data data f x =
    82     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    83 end;
    84 
    85 val is_worker = is_some o thread_data;
    86 val worker_task = Option.map #1 o thread_data;
    87 val worker_group = Option.map #2 o thread_data;
    88 fun worker_subgroup () = Task_Queue.new_group (worker_group ());
    89 
    90 fun worker_waiting e =
    91   (case worker_task () of
    92     NONE => e ()
    93   | SOME task => Task_Queue.waiting task e);
    94 
    95 
    96 (* datatype future *)
    97 
    98 type 'a result = 'a Exn.result Single_Assignment.var;
    99 
   100 datatype 'a future = Future of
   101  {promised: bool,
   102   task: task,
   103   group: group,
   104   result: 'a result};
   105 
   106 fun task_of (Future {task, ...}) = task;
   107 fun group_of (Future {group, ...}) = group;
   108 fun result_of (Future {result, ...}) = result;
   109 
   110 fun peek x = Single_Assignment.peek (result_of x);
   111 fun is_finished x = is_some (peek x);
   112 
   113 fun assign_result group result res =
   114   let
   115     val _ = Single_Assignment.assign result res
   116       handle exn as Fail _ =>
   117         (case Single_Assignment.peek result of
   118           SOME (Exn.Exn e) => reraise (if Exn.is_interrupt e then e else exn)
   119         | _ => reraise exn);
   120     val ok =
   121       (case the (Single_Assignment.peek result) of
   122         Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   123       | Exn.Result _ => true);
   124   in ok end;
   125 
   126 
   127 
   128 (** scheduling **)
   129 
   130 (* synchronization *)
   131 
   132 val scheduler_event = ConditionVar.conditionVar ();
   133 val work_available = ConditionVar.conditionVar ();
   134 val work_finished = ConditionVar.conditionVar ();
   135 
   136 local
   137   val lock = Mutex.mutex ();
   138 in
   139 
   140 fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;
   141 
   142 fun wait cond = (*requires SYNCHRONIZED*)
   143   Multithreading.sync_wait NONE NONE cond lock;
   144 
   145 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   146   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   147 
   148 fun signal cond = (*requires SYNCHRONIZED*)
   149   ConditionVar.signal cond;
   150 
   151 fun broadcast cond = (*requires SYNCHRONIZED*)
   152   ConditionVar.broadcast cond;
   153 
   154 fun broadcast_work () = (*requires SYNCHRONIZED*)
   155  (ConditionVar.broadcast work_available;
   156   ConditionVar.broadcast work_finished);
   157 
   158 end;
   159 
   160 
   161 (* global state *)
   162 
   163 val queue = Unsynchronized.ref Task_Queue.empty;
   164 val next = Unsynchronized.ref 0;
   165 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
   166 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
   167 val do_shutdown = Unsynchronized.ref false;
   168 val max_workers = Unsynchronized.ref 0;
   169 val max_active = Unsynchronized.ref 0;
   170 val worker_trend = Unsynchronized.ref 0;
   171 
   172 datatype worker_state = Working | Waiting | Sleeping;
   173 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   174 
   175 fun count_workers state = (*requires SYNCHRONIZED*)
   176   fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
   177 
   178 
   179 (* execute future jobs *)
   180 
   181 fun future_job group (e: unit -> 'a) =
   182   let
   183     val result = Single_Assignment.var "future" : 'a result;
   184     val pos = Position.thread_data ();
   185     fun job ok =
   186       let
   187         val res =
   188           if ok then
   189             Exn.capture (fn () =>
   190               Multithreading.with_attributes Multithreading.private_interrupts
   191                 (fn _ => Position.setmp_thread_data pos e ())) ()
   192           else Exn.interrupt_exn;
   193       in assign_result group result res end;
   194   in (result, job) end;
   195 
   196 fun cancel_now group = (*requires SYNCHRONIZED*)
   197   Task_Queue.cancel (! queue) group;
   198 
   199 fun cancel_later group = (*requires SYNCHRONIZED*)
   200  (Unsynchronized.change canceled (insert Task_Queue.eq_group group);
   201   broadcast scheduler_event);
   202 
   203 fun execute (task, group, jobs) =
   204   let
   205     val valid = not (Task_Queue.is_canceled group);
   206     val ok =
   207       Task_Queue.running task (fn () =>
   208         setmp_thread_data (task, group) (fn () =>
   209           fold (fn job => fn ok => job valid andalso ok) jobs true) ());
   210     val _ = Multithreading.tracing 1 (fn () =>
   211       let
   212         val s = Task_Queue.str_of_task task;
   213         fun micros time = string_of_int (Time.toNanoseconds time div 1000);
   214         val (run, wait) = pairself micros (Task_Queue.timing_of_task task);
   215       in "TASK " ^ s ^ " " ^ run ^ " " ^ wait end);
   216     val _ = SYNCHRONIZED "finish" (fn () =>
   217       let
   218         val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
   219         val _ =
   220           if ok then ()
   221           else if cancel_now group then ()
   222           else cancel_later group;
   223         val _ = broadcast work_finished;
   224         val _ = if maximal then () else signal work_available;
   225       in () end);
   226   in () end;
   227 
   228 
   229 (* worker threads *)
   230 
   231 fun worker_wait active cond = (*requires SYNCHRONIZED*)
   232   let
   233     val state =
   234       (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
   235         SOME state => state
   236       | NONE => raise Fail "Unregistered worker thread");
   237     val _ = state := (if active then Waiting else Sleeping);
   238     val _ = wait cond;
   239     val _ = state := Working;
   240   in () end;
   241 
   242 fun worker_next () = (*requires SYNCHRONIZED*)
   243   if length (! workers) > ! max_workers then
   244     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   245      signal work_available;
   246      NONE)
   247   else if count_workers Working > ! max_active then
   248     (worker_wait false work_available; worker_next ())
   249   else
   250     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
   251       NONE => (worker_wait false work_available; worker_next ())
   252     | some => (signal work_available; some));
   253 
   254 fun worker_loop name =
   255   (case SYNCHRONIZED name (fn () => worker_next ()) of
   256     NONE => ()
   257   | SOME work => (execute work; worker_loop name));
   258 
   259 fun worker_start name = (*requires SYNCHRONIZED*)
   260   Unsynchronized.change workers (cons (Simple_Thread.fork false (fn () => worker_loop name),
   261     Unsynchronized.ref Working));
   262 
   263 
   264 (* scheduler *)
   265 
   266 val status_ticks = Unsynchronized.ref 0;
   267 
   268 val last_round = Unsynchronized.ref Time.zeroTime;
   269 val next_round = seconds 0.05;
   270 
    271 fun scheduler_next () = (*requires SYNCHRONIZED*)
        (*One scheduler round: trace status, dispose of dead workers, adjust the worker
          pool, retry deferred group cancellations, wait up to next_round for
          scheduler_event, and check for shutdown.  Returns true while the scheduler loop
          should continue.  On interrupt, the groups returned by Task_Queue.cancel_all are
          passed to cancel_later and true is returned; other exceptions are re-raised.*)
    272   let
    273     val now = Time.now ();
            (*tick: at least next_round has elapsed since the previous tick*)
    274     val tick = Time.<= (Time.+ (! last_round, next_round), now);
    275     val _ = if tick then last_round := now else ();
    276 
    277 
    278     (* queue and worker status *)
            (*status is traced only on every 10th tick*)
    279 
    280     val _ =
    281       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
    282     val _ =
    283       if tick andalso ! status_ticks = 0 then
    284         Multithreading.tracing 1 (fn () =>
    285           let
    286             val {ready, pending, running, passive} = Task_Queue.status (! queue);
    287             val total = length (! workers);
    288             val active = count_workers Working;
    289             val waiting = count_workers Waiting;
    290           in
    291             "SCHEDULE " ^ Time.toString now ^ ": " ^
    292               string_of_int ready ^ " ready, " ^
    293               string_of_int pending ^ " pending, " ^
    294               string_of_int running ^ " running, " ^
    295               string_of_int passive ^ " passive; " ^
    296               string_of_int total ^ " workers, " ^
    297               string_of_int active ^ " active, " ^
    298               string_of_int waiting ^ " waiting "
    299           end)
    300       else ();
    301 
            (*drop worker threads that are no longer active from the registry*)
    302     val _ =
    303       if forall (Thread.isActive o #1) (! workers) then ()
    304       else
    305         let
    306           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
    307           val _ = workers := alive;
    308         in
    309           Multithreading.tracing 0 (fn () =>
    310             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
    311         end;
    312 
    313 
    314     (* worker pool adjustments *)
    315 
            (*previous limits, to detect changes below*)
    316     val max_active0 = ! max_active;
    317     val max_workers0 = ! max_workers;
    318 
            (*m becomes max_active, the bound on simultaneously Working workers (0 during
              shutdown).  NOTE(review): m = 9999 below looks like a sentinel value of
              Multithreading.max_threads_value -- confirm its meaning*)
    319     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    320     val _ = max_active := m;
    321 
            (*mm: target pool size -- at least m, counting Waiting workers twice,
              but at most 4 * m*)
    322     val mm =
    323       if ! do_shutdown then 0
    324       else if m = 9999 then 1
    325       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
            (*worker_trend counts consecutive ticks on which mm was above (positive) or
              below (negative) the current max_workers; a change of direction resets it*)
    326     val _ =
    327       if tick andalso mm > ! max_workers then
    328         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
    329       else if tick andalso mm < ! max_workers then
    330         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
    331       else ();
            (*adopt mm immediately when it is 0 (e.g. during shutdown) or after a persistent
              trend beyond +/-50 ticks; after a moderate upward trend (> 5) grow towards mm,
              but not beyond 2 * m*)
    332     val _ =
    333       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
    334         max_workers := mm
    335       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
    336         max_workers := Int.min (mm, 2 * m)
    337       else ();
    338 
            (*fork additional worker threads up to max_workers; surplus workers retire
              themselves in worker_next*)
    339     val missing = ! max_workers - length (! workers);
    340     val _ =
    341       if missing > 0 then
    342         funpow missing (fn () =>
    343           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
    344       else ();
    345 
            (*signal work_available if either limit has changed in this round*)
    346     val _ =
    347       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
    348       else signal work_available;
    349 
    350 
    351     (* canceled groups *)
            (*retry deferred cancellations: groups for which cancel_now yields true are
              removed from the list, the others stay for the next round*)
    352 
    353     val _ =
    354       if null (! canceled) then ()
    355       else
    356        (Multithreading.tracing 1 (fn () =>
    357           string_of_int (length (! canceled)) ^ " canceled groups");
    358         Unsynchronized.change canceled (filter_out cancel_now);
    359         broadcast_work ());
    360 
    361 
    362     (* delay loop *)
            (*wait up to next_round for scheduler_event; sync_wait yields an Exn.result and
              Exn.release re-raises a captured exception (presumably an interrupt), so that
              the handler below sees it*)
    363 
    364     val _ = Exn.release (wait_timeout next_round scheduler_event);
    365 
    366 
    367     (* shutdown *)
            (*request shutdown once all queued tasks are passive; the scheduler stops (and
              forgets its thread) when shutdown is requested and no workers remain*)
    368 
    369     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
    370     val continue = not (! do_shutdown andalso null (! workers));
    371     val _ = if continue then () else scheduler := NONE;
    372 
    373     val _ = broadcast scheduler_event;
    374   in continue end
    375   handle exn =>
    376     if Exn.is_interrupt exn then
    377      (Multithreading.tracing 1 (fn () => "Interrupt");
    378       List.app cancel_later (Task_Queue.cancel_all (! queue));
    379       broadcast_work (); true)
    380     else reraise exn;
   381 
   382 fun scheduler_loop () =
   383   while
   384     Multithreading.with_attributes
   385       (Multithreading.sync_interrupts Multithreading.public_interrupts)
   386       (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()))
   387   do ();
   388 
   389 fun scheduler_active () = (*requires SYNCHRONIZED*)
   390   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   391 
   392 fun scheduler_check () = (*requires SYNCHRONIZED*)
   393  (do_shutdown := false;
   394   if scheduler_active () then ()
   395   else scheduler := SOME (Simple_Thread.fork false scheduler_loop));
   396 
   397 
   398 
   399 (** futures **)
   400 
   401 (* fork *)
   402 
   403 fun bulk {name, group, deps, pri} es =
   404   let
   405     val grp =
   406       (case group of
   407         NONE => worker_subgroup ()
   408       | SOME grp => grp);
   409     fun enqueue e (minimal, queue) =
   410       let
   411         val (result, job) = future_job grp e;
   412         val ((task, minimal'), queue') = Task_Queue.enqueue name grp deps pri job queue;
   413         val future = Future {promised = false, task = task, group = grp, result = result};
   414       in (future, (minimal orelse minimal', queue')) end;
   415   in
   416     SYNCHRONIZED "enqueue" (fn () =>
   417       let
   418         val (futures, minimal) =
   419           Unsynchronized.change_result queue (fn q =>
   420             let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
   421             in ((futures, minimal), q') end);
   422         val _ = if minimal then signal work_available else ();
   423         val _ = scheduler_check ();
   424       in futures end)
   425   end;
   426 
   427 fun fork_pri pri e = singleton (bulk {name = "", group = NONE, deps = [], pri = pri}) e;
   428 fun fork e = fork_pri 0 e;
   429 
   430 
   431 (* join *)
   432 
   433 local
   434 
   435 fun get_result x =
   436   (case peek x of
   437     NONE => Exn.Exn (Fail "Unfinished future")
   438   | SOME res =>
   439       if Exn.is_interrupt_exn res then
   440         (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   441           [] => res
   442         | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   443       else res);
   444 
   445 fun join_next deps = (*requires SYNCHRONIZED*)
   446   if null deps then NONE
   447   else
   448     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   449       (NONE, []) => NONE
   450     | (NONE, deps') => (worker_wait true work_finished; join_next deps')
   451     | (SOME work, deps') => SOME (work, deps'));
   452 
   453 fun execute_work NONE = ()
   454   | execute_work (SOME (work, deps')) = (execute work; join_work deps')
   455 and join_work deps =
   456   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   457 
   458 fun join_depend task deps =
   459   execute_work (SYNCHRONIZED "join" (fn () =>
   460     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   461 
   462 in
   463 
   464 fun join_results xs =
   465   if forall is_finished xs then map get_result xs
   466   else if Multithreading.self_critical () then
   467     error "Cannot join future values within critical section"
   468   else
   469     worker_waiting (fn () =>
   470       (case worker_task () of
   471         SOME task => join_depend task (map task_of xs)
   472       | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
   473       map get_result xs));
   474 
   475 end;
   476 
   477 fun join_result x = singleton join_results x;
   478 fun join x = Exn.release (join_result x);
   479 
   480 
   481 (* fast-path versions -- bypassing full task management *)
   482 
   483 fun value (x: 'a) =
   484   let
   485     val group = Task_Queue.new_group NONE;
   486     val result = Single_Assignment.var "value" : 'a result;
   487     val _ = assign_result group result (Exn.Result x);
   488   in Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result} end;
   489 
   490 fun map_future f x =
   491   let
   492     val task = task_of x;
   493     val group = Task_Queue.new_group (SOME (group_of x));
   494     val (result, job) = future_job group (fn () => f (join x));
   495 
   496     val extended = SYNCHRONIZED "extend" (fn () =>
   497       (case Task_Queue.extend task job (! queue) of
   498         SOME queue' => (queue := queue'; true)
   499       | NONE => false));
   500   in
   501     if extended then Future {promised = false, task = task, group = group, result = result}
   502     else
   503       singleton
   504         (bulk {name = "Future.map", group = SOME group,
   505           deps = [task], pri = Task_Queue.pri_of_task task})
   506         (fn () => f (join x))
   507   end;
   508 
   509 
   510 (* promised futures -- fulfilled by external means *)
   511 
   512 fun promise_group group : 'a future =
   513   let
   514     val result = Single_Assignment.var "promise" : 'a result;
   515     fun abort () = assign_result group result Exn.interrupt_exn
   516       handle Fail _ => true
   517         | exn =>
   518             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
   519             else reraise exn;
   520     val task = SYNCHRONIZED "enqueue_passive" (fn () =>
   521       Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
   522   in Future {promised = true, task = task, group = group, result = result} end;
   523 
   524 fun promise () = promise_group (worker_subgroup ());
   525 
   526 fun fulfill_result (Future {promised, task, group, result}) res =
   527   if not promised then raise Fail "Not a promised future"
   528   else
   529     let
   530       fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
   531       val _ =
   532         Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
   533           let
   534             val still_passive =
   535               SYNCHRONIZED "fulfill_result" (fn () =>
   536                 Unsynchronized.change_result queue
   537                   (Task_Queue.dequeue_passive (Thread.self ()) task));
   538           in if still_passive then execute (task, group, [job]) else () end);
   539       val _ = worker_waiting (fn () => Single_Assignment.await result);
   540     in () end;
   541 
   542 fun fulfill x res = fulfill_result x (Exn.Result res);
   543 
   544 
   545 (* cancellation *)
   546 
   547 fun interruptible_task f x =
   548   if Multithreading.available then
   549     Multithreading.with_attributes
   550       (if is_worker ()
   551        then Multithreading.private_interrupts
   552        else Multithreading.public_interrupts)
   553       (fn _ => f x)
   554   else interruptible f x;
   555 
   556 (*cancel: present and future group members will be interrupted eventually*)
   557 fun cancel_group group = SYNCHRONIZED "cancel" (fn () =>
   558  (if cancel_now group then () else cancel_later group;
   559   signal work_available; scheduler_check ()));
   560 
   561 fun cancel x = cancel_group (group_of x);
   562 
   563 
   564 (* shutdown *)
   565 
   566 fun shutdown () =
   567   if Multithreading.available then
   568     SYNCHRONIZED "shutdown" (fn () =>
   569      while scheduler_active () do
   570       (wait scheduler_event; broadcast_work ()))
   571   else ();
   572 
   573 
   574 (* status markup *)
   575 
   576 fun status e =
   577   let
   578     val task_props =
   579       (case worker_task () of
   580         NONE => I
   581       | SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]);
   582     val _ = Output.status (Markup.markup (task_props Markup.forked) "");
   583     val x = e ();  (*sic -- report "joined" only for success*)
   584     val _ = Output.status (Markup.markup (task_props Markup.joined) "");
   585   in x end;
   586 
   587 
   588 (*final declarations of this structure!*)
   589 val map = map_future;
   590 
   591 end;
   592 
    593 type 'a future = 'a Future.future;
        (*top-level abbreviation: 'a future can be used without the Future qualifier*)
   594