src/Pure/Concurrent/future.ML
author wenzelm
Wed, 02 Feb 2011 15:04:09 +0100
changeset 42563 73dde8006820
parent 42561 b5d7b15166bf
child 42566 afdbec23b92b
permissions -rw-r--r--
maintain Task_Queue.group within Task_Queue.task;
Task_Queue.dummy_task: id = 0 in accordance with Document.no_id etc.;
tuned signature;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 http://www4.in.tum.de/~wenzelm/papers/parallel-ml.pdf
     7 
     8 Notes:
     9 
    10   * Futures are similar to delayed evaluation, i.e. delay/force is
    11     generalized to fork/join (and variants).  The idea is to model
    12     parallel value-oriented computations, but *not* communicating
    13     processes.
    14 
    15   * Futures are grouped; failure of one group member causes the whole
    16     group to be interrupted eventually.  Groups are block-structured.
    17 
    18   * Forked futures are evaluated spontaneously by a farm of worker
    19     threads in the background; join resynchronizes the computation and
    20     delivers results (values or exceptions).
    21 
    22   * The pool of worker threads is limited, usually in correlation with
    23     the number of physical cores on the machine.  Note that allocation
    24     of runtime resources is distorted either if workers yield CPU time
    25     (e.g. via system sleep or wait operations), or if non-worker
    26     threads contend for significant runtime resources independently.
    27 
    28   * Promised futures are fulfilled by external means.  There is no
    29     associated evaluation task, but other futures can depend on them
    30     as usual.
    31 *)
    32 
    33 signature FUTURE =
       (*Interface of the future-value scheduler implemented by structure Future.*)
    34 sig
         (*task, group, or fresh subgroup associated with the calling thread;
           worker_task is NONE when no task was registered for the thread*)
    35   val worker_task: unit -> Task_Queue.task option
    36   val worker_group: unit -> Task_Queue.group option
    37   val worker_subgroup: unit -> Task_Queue.group
    38   type 'a future
         (*inspection: peek never blocks and yields NONE while the result is unassigned*)
    39   val task_of: 'a future -> Task_Queue.task
    40   val peek: 'a future -> 'a Exn.result option
    41   val is_finished: 'a future -> bool
         (*forking: one future per given function; an empty list yields an empty list;
           group = NONE means a new subgroup of the current worker group;
           fork is fork_pri with priority 0*)
    42   val forks:
    43     {name: string, group: Task_Queue.group option, deps: Task_Queue.task list, pri: int} ->
    44       (unit -> 'a) list -> 'a future list
    45   val fork_pri: int -> (unit -> 'a) -> 'a future
    46   val fork: (unit -> 'a) -> 'a future
         (*joining: join_results/join_result deliver captured results,
           join releases the result via Exn.release*)
    47   val join_results: 'a future list -> 'a Exn.result list
    48   val join_result: 'a future -> 'a Exn.result
    49   val join: 'a future -> 'a
         (*fast-path construction: already assigned value, and mapping over a future*)
    50   val value: 'a -> 'a future
    51   val map: ('a -> 'b) -> 'a future -> 'b future
         (*promises: passive futures that are assigned via fulfill_result/fulfill;
           fulfill_result raises Fail for a future that is not promised*)
    52   val promise_group: Task_Queue.group -> 'a future
    53   val promise: unit -> 'a future
    54   val fulfill_result: 'a future -> 'a Exn.result -> unit
    55   val fulfill: 'a future -> 'a -> unit
         (*cancellation, shutdown of the scheduler, status markup*)
    56   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    57   val cancel_group: Task_Queue.group -> unit
    58   val cancel: 'a future -> unit
    59   val shutdown: unit -> unit
    60   val status: (unit -> 'a) -> 'a
    61 end;
    62 
    63 structure Future: FUTURE =
    64 struct
    65 
    66 (** future values **)
    67 
    68 (* identifiers *)
    69 
    local
      (*thread-local slot recording the task that the current thread executes*)
      val current_task = Universal.tag () : Task_Queue.task option Universal.tag;
    in
      (*task registered for the calling thread; NONE if nothing was registered*)
      fun worker_task () =
        (case Thread.getLocal current_task of
          SOME entry => entry
        | NONE => NONE);

      (*apply f to x while the calling thread is registered as executing data;
        the previous registration is passed to Library.setmp_thread_data as the
        old value*)
      fun setmp_worker_task data f x =
        let val previous = worker_task ()
        in Library.setmp_thread_data current_task previous (SOME data) f x end;
    end;
    77 
    (*group of the task executed by the calling thread, if any*)
    fun worker_group () = Option.map Task_Queue.group_of_task (worker_task ());
    (*fresh group created from the current worker group (which may be absent)*)
    fun worker_subgroup () =
      let val parent = worker_group ()
      in Task_Queue.new_group parent end;
    80 
    (*run body, wrapped in Task_Queue.joining when the calling thread executes a task*)
    fun worker_joining body =
      let val current = worker_task () in
        (case current of
          SOME task => Task_Queue.joining task body
        | NONE => body ())
      end;
    85 
    (*run body, wrapped in Task_Queue.waiting on deps when the calling thread
      executes a task*)
    fun worker_waiting deps body =
      let val current = worker_task () in
        (case current of
          SOME task => Task_Queue.waiting task deps body
        | NONE => body ())
      end;
    90 
    91 
    92 (* datatype future *)
    93 
    94 type 'a result = 'a Exn.result Single_Assignment.var;
       (*single-assignment variable that receives the outcome of a future,
         either a proper value or an exception*)
    95 
    96 datatype 'a future = Future of
       (*promised: true for futures made by promise_group, false for forked,
           mapped and value futures;
         task: the Task_Queue task this future is attached to;
         result: variable holding the outcome once assigned*)
    97  {promised: bool,
    98   task: Task_Queue.task,
    99   result: 'a result};
   100 
   (*projections of the future record*)
   fun task_of (Future fields) = #task fields;
   fun result_of (Future fields) = #result fields;
   103 
   (*current content of the result variable, without blocking*)
   fun peek (Future {result, ...}) = Single_Assignment.peek result;

   (*true iff a result has been assigned already*)
   fun is_finished x =
     (case peek x of
       NONE => false
     | SOME _ => true);
   106 
   (*Assign res to the result variable and report success.

     If the assignment raises Fail, the variable is inspected: a stored
     interrupt exception is re-raised in place of the Fail, otherwise the Fail
     itself is re-raised.  After assignment, a stored exception cancels the
     whole group and yields false; a stored proper result yields true.*)
   fun assign_result group result res =
     let
       fun conflict failure =
         (case Single_Assignment.peek result of
           SOME (Exn.Exn prior) =>
             if Exn.is_interrupt prior then reraise prior else reraise failure
         | _ => reraise failure);
       val _ =
         Single_Assignment.assign result res
           handle failure as Fail _ => conflict failure;
     in
       (case the (Single_Assignment.peek result) of
         Exn.Result _ => true
       | Exn.Exn exn => (Task_Queue.cancel_group group exn; false))
     end;
   119 
   120 
   121 
   122 (** scheduling **)
   123 
   124 (* synchronization *)
   125 
   126 val scheduler_event = ConditionVar.conditionVar ();
       (*scheduler_event: broadcast by cancel_later and at the end of each
         scheduler round; awaited by the scheduler delay loop and by shutdown*)
   127 val work_available = ConditionVar.conditionVar ();
       (*work_available: awaited by workers in worker_next when they must not or
         cannot take a task*)
   128 val work_finished = ConditionVar.conditionVar ();
       (*work_finished: broadcast by execute after a task has finished; awaited
         by join_next while dependencies are unfinished*)
   129 
   local
     (*the mutex behind SYNCHRONIZED and the wait operations below*)
     val lock = Mutex.mutex ();
   in

   (*run a function via Simple_Thread.synchronized on the lock, tagged with name*)
   fun SYNCHRONIZED name = Simple_Thread.synchronized name lock;

   (*wait on cond without deadline*)
   fun wait cond = (*requires SYNCHRONIZED*)
     Multithreading.sync_wait NONE NONE cond lock;

   (*wait on cond with a deadline of timeout from now*)
   fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
     let val deadline = Time.+ (Time.now (), timeout)
     in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

   (*wake up one / all waiters of a condition variable*)
   val signal = ConditionVar.signal; (*requires SYNCHRONIZED*)
   val broadcast = ConditionVar.broadcast; (*requires SYNCHRONIZED*)

   (*wake up everybody waiting for available or finished work*)
   fun broadcast_work () = (*requires SYNCHRONIZED*)
     List.app ConditionVar.broadcast [work_available, work_finished];

   end;
   153 
   154 
   155 (* global state *)
   156 
   157 val queue = Unsynchronized.ref Task_Queue.empty;
       (*queue: the global task queue, updated by enqueue/dequeue/finish operations*)
   158 val next = Unsynchronized.ref 0;
       (*next: counter used by the scheduler to number worker thread names*)
   159 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
       (*scheduler: the scheduler thread, if one has been forked*)
   160 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
       (*canceled: groups recorded by cancel_later, retried by the scheduler*)
   161 val do_shutdown = Unsynchronized.ref false;
       (*do_shutdown: set by the scheduler when all tasks are passive, reset by
         scheduler_check*)
   162 val max_workers = Unsynchronized.ref 0;
       (*max_workers: bound on the number of registered workers, see worker_next*)
   163 val max_active = Unsynchronized.ref 0;
       (*max_active: bound on the number of workers in state Working, see worker_next*)
   164 val worker_trend = Unsynchronized.ref 0;
       (*worker_trend: signed tick counter maintained by the scheduler to damp
         changes of max_workers*)
   165 
   166 datatype worker_state = Working | Waiting | Sleeping;
       (*worker_wait sets Waiting for active waits and Sleeping for inactive ones;
         workers are Working otherwise*)
   167 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
       (*workers: registered worker threads paired with their mutable state*)
   168 
   (*number of registered workers that are currently in the given state*)
   fun count_workers state = (*requires SYNCHRONIZED*)
     length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   171 
   172 
   173 (* execute future jobs *)
   174 
   (*Create a result variable together with the job that fills it.

     The job takes a flag: with true it evaluates e under private interrupts
     and the position captured at creation time, capturing any exception; with
     false it assigns the interrupt exception instead.  The job returns the
     outcome of assign_result.*)
   fun future_job group (e: unit -> 'a) =
     let
       val result = Single_Assignment.var "future" : 'a result;
       val pos = Position.thread_data ();
       fun evaluate () =
         Exn.capture (fn () =>
           Multithreading.with_attributes Multithreading.private_interrupts
             (fn _ => Position.setmp_thread_data pos e ())) ();
       fun job ok =
         assign_result group result (if ok then evaluate () else Exn.interrupt_exn);
     in (result, job) end;
   189 
   (*cancel the group in the current queue; result as given by Task_Queue.cancel*)
   fun cancel_now group = (*requires SYNCHRONIZED*)
     let val current_queue = ! queue
     in Task_Queue.cancel current_queue group end;
   192 
   (*record the group for a later cancel attempt and wake up the scheduler*)
   fun cancel_later group = (*requires SYNCHRONIZED*)
     let val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group)
     in broadcast scheduler_event end;
   196 
   (*Run all jobs of a task and register its completion.

     Every job is run, even after an earlier one has failed; each receives the
     flag telling whether the group was still uncanceled at the start.  After
     tracing the timing, the task is finished in the queue under the lock; if
     any job failed, the group is canceled now or scheduled for later
     cancellation.*)
   fun execute (task, jobs) =
     let
       val group = Task_Queue.group_of_task task;
       val valid = not (Task_Queue.is_canceled group);

       (*the job is always evaluated before the accumulated flag is consulted*)
       fun run_jobs () =
         fold (fn job => fn ok_so_far => job valid andalso ok_so_far) jobs true;
       val ok =
         Task_Queue.running task (fn () => setmp_worker_task task run_jobs ());

       fun timing_message () =
         let
           val s = Task_Queue.str_of_task task;
           fun micros time = string_of_int (Time.toNanoseconds time div 1000);
           val (run_time, wait_time, deps) = Task_Queue.timing_of_task task;
         in
           "TASK " ^ s ^ " " ^ micros run_time ^ " " ^ micros wait_time ^
             " (" ^ commas deps ^ ")"
         end;
       val _ = Multithreading.tracing 1 timing_message;

       fun finish () =
         let
           val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
           val _ = if ok orelse cancel_now group then () else cancel_later group;
           val _ = broadcast work_finished;
           val _ = if not maximal then signal work_available else ();
         in () end;
       val _ = SYNCHRONIZED "finish" finish;
     in () end;
   222 
   223 
   224 (* worker threads *)
   225 
   (*Let the calling worker wait on cond.

     The worker is marked Waiting (active) or Sleeping (not active) for the
     duration of the wait and Working afterwards.  Raises Fail if the calling
     thread is not a registered worker.*)
   fun worker_wait active cond = (*requires SYNCHRONIZED*)
     let
       val state =
         (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
           NONE => raise Fail "Unregistered worker thread"
         | SOME state_ref => state_ref);
       val idle = if active then Waiting else Sleeping;
       val _ = state := idle;
       val _ = wait cond;
     in state := Working end;
   236 
   (*Next piece of work for the calling worker.

     NONE means the worker was removed from the pool because there are more
     workers than max_workers.  While too many workers are active, or nothing
     can be dequeued, the worker sleeps on work_available and retries.*)
   fun worker_next () = (*requires SYNCHRONIZED*)
     let
       fun sleep_and_retry () = (worker_wait false work_available; worker_next ());
     in
       if length (! workers) > ! max_workers then
         let
           val _ = Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
           val _ = signal work_available;
         in NONE end
       else if count_workers Working > ! max_active then sleep_and_retry ()
       else
         (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
           NONE => sleep_and_retry ()
         | work => (signal work_available; work))
     end;
   248 
   (*main loop of a worker thread: fetch work under the lock and execute it
     outside the lock, until worker_next yields NONE*)
   fun worker_loop name =
     let val next_work = SYNCHRONIZED name worker_next in
       (case next_work of
         SOME work => (execute work; worker_loop name)
       | NONE => ())
     end;
   253 
   (*fork a new worker thread and register it in state Working*)
   fun worker_start name = (*requires SYNCHRONIZED*)
     let
       val thread = Simple_Thread.fork false (fn () => worker_loop name);
       val state = Unsynchronized.ref Working;
     in Unsynchronized.change workers (cons (thread, state)) end;
   257 
   258 
   259 (* scheduler *)
   260 
   261 val status_ticks = Unsynchronized.ref 0;
       (*status_ticks: tick counter modulo 10; status is traced when it wraps to 0*)
   262 
   263 val last_round = Unsynchronized.ref Time.zeroTime;
       (*last_round: time of the most recent scheduler tick*)
   264 val next_round = seconds 0.05;
       (*next_round: minimal distance between ticks, also the timeout of the
         scheduler delay loop*)
   265 
   266 fun scheduler_next () = (*requires SYNCHRONIZED*)
       (*One round of the scheduler.  Traces status, disposes dead workers,
         adjusts the worker pool, retries pending group cancellations, waits for
         a scheduler event or timeout, and decides about shutdown.  Returns true
         iff the scheduler loop shall continue.  An interrupt during the round
         cancels all groups of the queue and the scheduler continues; other
         exceptions are re-raised.*)
   267   let
   268     val now = Time.now ();
           (*tick: at least next_round has elapsed since the last tick*)
   269     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   270     val _ = if tick then last_round := now else ();
   271 
   272 
   273     (* queue and worker status *)
   274 
   275     val _ =
   276       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
           (*trace status only on every 10th tick*)
   277     val _ =
   278       if tick andalso ! status_ticks = 0 then
   279         Multithreading.tracing 1 (fn () =>
   280           let
   281             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   282             val total = length (! workers);
   283             val active = count_workers Working;
   284             val waiting = count_workers Waiting;
   285           in
   286             "SCHEDULE " ^ Time.toString now ^ ": " ^
   287               string_of_int ready ^ " ready, " ^
   288               string_of_int pending ^ " pending, " ^
   289               string_of_int running ^ " running, " ^
   290               string_of_int passive ^ " passive; " ^
   291               string_of_int total ^ " workers, " ^
   292               string_of_int active ^ " active, " ^
   293               string_of_int waiting ^ " waiting "
   294           end)
   295       else ();
   296 
           (*drop workers whose threads are no longer active*)
   297     val _ =
   298       if forall (Thread.isActive o #1) (! workers) then ()
   299       else
   300         let
   301           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   302           val _ = workers := alive;
   303         in
   304           Multithreading.tracing 0 (fn () =>
   305             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   306         end;
   307 
   308 
   309     (* worker pool adjustments *)
   310 
           (*remember old limits to detect changes below*)
   311     val max_active0 = ! max_active;
   312     val max_workers0 = ! max_workers;
   313 
   314     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   315     val _ = max_active := m;
   316 
           (*mm: desired number of workers, between m and 4 * m;
             NOTE(review): m = 9999 is special-cased to 1 -- the meaning of this
             value is not visible here, confirm*)
   317     val mm =
   318       if ! do_shutdown then 0
   319       else if m = 9999 then 1
   320       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts consecutive ticks with mm above (positive) or
             below (negative) max_workers; a change of direction resets it to 0*)
   321     val _ =
   322       if tick andalso mm > ! max_workers then
   323         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   324       else if tick andalso mm < ! max_workers then
   325         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   326       else ();
           (*adopt mm at once for mm = 0 or a trend beyond 50 in either direction;
             a moderate upward trend grows the pool up to at most 2 * m*)
   327     val _ =
   328       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   329         max_workers := mm
   330       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
   331         max_workers := Int.min (mm, 2 * m)
   332       else ();
   333 
           (*start as many workers as are missing wrt. max_workers*)
   334     val missing = ! max_workers - length (! workers);
   335     val _ =
   336       if missing > 0 then
   337         funpow missing (fn () =>
   338           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   339       else ();
   340 
           (*notify a worker if any limit has changed*)
   341     val _ =
   342       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   343       else signal work_available;
   344 
   345 
   346     (* canceled groups *)
   347 
           (*retry cancel_now on recorded groups, keeping those where it yields false*)
   348     val _ =
   349       if null (! canceled) then ()
   350       else
   351        (Multithreading.tracing 1 (fn () =>
   352           string_of_int (length (! canceled)) ^ " canceled groups");
   353         Unsynchronized.change canceled (filter_out cancel_now);
   354         broadcast_work ());
   355 
   356 
   357     (* delay loop *)
   358 
           (*wait for a scheduler event, at most next_round; Exn.release re-raises
             an exception captured by the wait*)
   359     val _ = Exn.release (wait_timeout next_round scheduler_event);
   360 
   361 
   362     (* shutdown *)
   363 
           (*shutdown is requested once all tasks are passive; the scheduler stops
             only after the last worker has gone*)
   364     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   365     val continue = not (! do_shutdown andalso null (! workers));
   366     val _ = if continue then () else scheduler := NONE;
   367 
   368     val _ = broadcast scheduler_event;
   369   in continue end
         (*interrupt of the scheduler: cancel all groups of the queue and go on*)
   370   handle exn =>
   371     if Exn.is_interrupt exn then
   372      (Multithreading.tracing 1 (fn () => "Interrupt");
   373       List.app cancel_later (Task_Queue.cancel_all (! queue));
   374       broadcast_work (); true)
   375     else reraise exn;
   376 
   (*body of the scheduler thread: repeat scheduler rounds under the lock, with
     synchronous public interrupts, until a round reports false*)
   fun scheduler_loop () =
     let
       fun round () =
         Multithreading.with_attributes
           (Multithreading.sync_interrupts Multithreading.public_interrupts)
           (fn _ => SYNCHRONIZED "scheduler" (fn () => scheduler_next ()));
       fun repeat () = if round () then repeat () else ();
     in repeat () end;
   383 
   (*true iff a scheduler thread is recorded and still active*)
   fun scheduler_active () = (*requires SYNCHRONIZED*)
     the_default false (Option.map Thread.isActive (! scheduler));
   386 
   (*revoke a pending shutdown request and make sure that a scheduler thread
     is running*)
   fun scheduler_check () = (*requires SYNCHRONIZED*)
     let val _ = do_shutdown := false in
       if not (scheduler_active ())
       then scheduler := SOME (Simple_Thread.fork false scheduler_loop)
       else ()
     end;
   391 
   392 
   393 
   394 (** futures **)
   395 
   396 (* fork *)
   397 
   (*Fork a list of functions as futures of one group.

     name, deps and pri are passed to Task_Queue.enqueue for every task; a
     missing group is replaced by a fresh subgroup of the current worker group.
     All tasks are enqueued within a single critical section; a worker is
     notified if any enqueue reported a minimal task, and the scheduler is
     started on demand.  An empty list yields an empty result without touching
     the queue.*)
   fun forks {name, group, deps, pri} es =
     (case es of
       [] => []
     | _ =>
         let
           val grp =
             (case group of
               SOME given => given
             | NONE => worker_subgroup ());

           fun enqueue e (minimal, q) =
             let
               val (result, job) = future_job grp e;
               val ((task, minimal'), q') = Task_Queue.enqueue name grp deps pri job q;
               val future = Future {promised = false, task = task, result = result};
             in (future, (minimal orelse minimal', q')) end;

           fun enqueue_all q =
             let val (futures, (minimal, q')) = fold_map enqueue es (false, q)
             in ((futures, minimal), q') end;

           fun critical () =
             let
               val (futures, minimal) = Unsynchronized.change_result queue enqueue_all;
               val _ = if minimal then signal work_available else ();
               val _ = scheduler_check ();
             in futures end;
         in SYNCHRONIZED "enqueue" critical end);
   423 
   (*fork a single unnamed future without dependencies, in a fresh subgroup*)
   fun fork_pri pri e =
     let val params = {name = "", group = NONE, deps = [], pri = pri}
     in singleton (forks params) e end;

   (*fork with priority 0*)
   fun fork e = fork_pri 0 e;
   426 
   427 
   428 (* join *)
   429 
   local

   (*Final result of a future.  An unassigned future yields a Fail exception
     result.  An interrupt result is replaced by the exceptions recorded in the
     group status, if there are any.*)
   fun get_result x =
     (case peek x of
       SOME res =>
         if not (Exn.is_interrupt_exn res) then res
         else
           let val group = Task_Queue.group_of_task (task_of x) in
             (case Exn.flatten_list (Task_Queue.group_status group) of
               [] => res
             | exns => Exn.Exn (Exn.EXCEPTIONS exns))
           end
     | NONE => Exn.Exn (Fail "Unfinished future"));

   (*Next work for a joining worker: NONE when all dependencies are finished,
     otherwise some dequeued work with the remaining dependencies; while
     nothing can be dequeued the worker waits on work_finished.*)
   fun join_next deps = (*requires SYNCHRONIZED*)
     if Task_Queue.finished_deps deps then NONE
     else
       let
         val (work, deps') =
           Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps);
       in
         (case work of
           SOME w => SOME (w, deps')
         | NONE =>
             if Task_Queue.finished_deps deps' then NONE
             else
              (worker_waiting deps' (fn () => worker_wait true work_finished);
               join_next deps'))
       end;

   (*keep executing work until the dependencies are finished*)
   fun join_work deps =
     (case SYNCHRONIZED "join" (fn () => join_next deps) of
       NONE => ()
     | SOME (work, deps') => (worker_joining (fn () => execute work); join_work deps'));

   in

   (*Join a list of futures and deliver their results.

     Finished futures are returned directly.  Joining inside a critical section
     is an error.  A worker thread helps to execute pending work while waiting;
     any other thread simply awaits each result variable.*)
   fun join_results xs =
     let
       fun await_all () = List.app (ignore o Single_Assignment.await o result_of) xs;
       val _ =
         if forall is_finished xs then ()
         else if Multithreading.self_critical () then
           error "Cannot join future values within critical section"
         else
           (case worker_task () of
             SOME _ => join_work (Task_Queue.init_deps (map task_of xs))
           | NONE => await_all ());
     in map get_result xs end;

   end;
   470 
   (*join a single future, delivering the captured result*)
   fun join_result x = singleton join_results x;

   (*join a single future, returning its value or raising its exception*)
   fun join x =
     let val res = join_result x
     in Exn.release res end;
   473 
   474 
   475 (* fast-path versions -- bypassing full task management *)
   476 
   (*future that is already assigned to the given value; it is attached to a
     dummy task and never enters the queue*)
   fun value (x: 'a) =
     let
       val dummy = Task_Queue.dummy_task ();
       val group = Task_Queue.group_of_task dummy;
       val cell = Single_Assignment.var "value" : 'a result;
       val _ = assign_result group cell (Exn.Result x);
     in Future {promised = false, task = dummy, result = cell} end;
   484 
   (*Apply f to the result of a future.

     First tries to attach the mapping job to the task of x via
     Task_Queue.extend, so that no new task is needed.  If that is refused, a
     separate future named "Future.map" is forked, depending on the task of x
     and using its priority.*)
   fun map_future f x =
     let
       val task = task_of x;
       val group = Task_Queue.new_group (SOME (Task_Queue.group_of_task task));
       fun body () = f (join x);
       val (result, job) = future_job group body;

       fun try_extend () =
         (case Task_Queue.extend task job (! queue) of
           NONE => false
         | SOME queue' => (queue := queue'; true));
       val extended = SYNCHRONIZED "extend" try_extend;
     in
       if extended then Future {promised = false, task = task, result = result}
       else
         let
           val params =
             {name = "Future.map", group = SOME group,
               deps = [task], pri = Task_Queue.pri_of_task task};
         in singleton (forks params) body end
     end;
   503 
   504 
   505 (* promised futures -- fulfilled by external means *)
   506 
   (*Create a promised future within the given group.

     The future is registered as a passive task whose abort operation assigns
     the interrupt exception.  A Fail raised during abort counts as success; an
     interrupt raised there is turned into a Fail about a concurrent attempt to
     fulfill the promise.*)
   fun promise_group group : 'a future =
     let
       val result = Single_Assignment.var "promise" : 'a result;

       fun recover (Fail _) = true
         | recover exn =
             if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
             else reraise exn;
       fun abort () =
         assign_result group result Exn.interrupt_exn
           handle exn => recover exn;

       fun register () =
         Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort);
       val task = SYNCHRONIZED "enqueue_passive" register;
     in Future {promised = true, task = task, result = result} end;
   518 
   (*promised future in a fresh subgroup of the current worker group*)
   fun promise () =
     let val group = worker_subgroup ()
     in promise_group group end;
   520 
   (*Fulfill a promised future with the given result.

     Raises Fail for a future that was not created as a promise.  With
     interrupts disabled, the passive task is dequeued and, if it was still
     passive, executed with a job that assigns res (or the interrupt exception
     if the job is run with flag false).  Finally the result variable is
     awaited.*)
   fun fulfill_result (Future {promised, task, result}) res =
     if promised then
       let
         val group = Task_Queue.group_of_task task;
         fun job ok = assign_result group result (if ok then res else Exn.interrupt_exn);
         fun dequeue () =
           Unsynchronized.change_result queue
             (Task_Queue.dequeue_passive (Thread.self ()) task);
         val _ =
           Multithreading.with_attributes Multithreading.no_interrupts (fn _ =>
             if SYNCHRONIZED "fulfill_result" dequeue
             then execute (task, [job])
             else ());
         val _ =
           worker_waiting (Task_Queue.init_deps [task])
             (fn () => Single_Assignment.await result);
       in () end
     else raise Fail "Not a promised future";
   539 
   (*fulfill a promised future with a proper value*)
   fun fulfill x res =
     let val outcome = Exn.Result res
     in fulfill_result x outcome end;
   541 
   542 
   543 (* cancellation *)
   544 
   (*apply f to x with interrupts enabled: private interrupts inside a worker
     task, public interrupts otherwise; without multithreading this falls back
     to interruptible*)
   fun interruptible_task f x =
     if Multithreading.available then
       let
         val attributes =
           (case worker_task () of
             SOME _ => Multithreading.private_interrupts
           | NONE => Multithreading.public_interrupts);
       in Multithreading.with_attributes attributes (fn _ => f x) end
     else interruptible f x;
   553 
   (*cancel: present and future group members will be interrupted eventually;
     the group is canceled at once if possible, otherwise recorded for the
     scheduler, which is started on demand*)
   fun cancel_group group =
     let
       fun request () =
         let
           val _ = if cancel_now group then () else cancel_later group;
           val _ = signal work_available;
         in scheduler_check () end;
     in SYNCHRONIZED "cancel" request end;
   558 
   (*cancel the group that the task of the given future belongs to*)
   fun cancel (Future {task, ...}) = cancel_group (Task_Queue.group_of_task task);
   560 
   561 
   562 (* shutdown *)
   563 
   (*wait until the scheduler thread has terminated, waking up workers after
     each scheduler event; no-op without multithreading*)
   fun shutdown () =
     if not Multithreading.available then ()
     else
       let
         fun drain () =
           if scheduler_active ()
           then (wait scheduler_event; broadcast_work (); drain ())
           else ();
       in SYNCHRONIZED "shutdown" drain end;
   570 
   571 
   572 (* status markup *)
   573 
   (*evaluate e between "forked" and "joined" status reports, which carry the
     current worker task as property if there is one*)
   fun status e =
     let
       val task_props =
         (case worker_task () of
           SOME task => Markup.properties [(Markup.taskN, Task_Queue.str_of_task task)]
         | NONE => I);
       fun report markup = Output.status (Markup.markup (task_props markup) "");

       val _ = report Markup.forked;
       val x = e ();  (*sic -- report "joined" only for success*)
       val _ = report Markup.joined;
     in x end;
   584 
   585 
   586 (*final declarations of this structure!*)
       (*must stay last: from here on map denotes map_future and hides the list
         operation that earlier definitions such as join_results rely on*)
   587 val map = map_future;
   588 
   589 end;
   590 
   591 type 'a future = 'a Future.future;
       (*top-level abbreviation, so that the type can be written without the
         structure prefix*)
   592