src/Pure/Concurrent/future.ML
author wenzelm
Sun, 04 Jul 2010 21:01:22 +0200
changeset 37706 b16231572c61
parent 37698 ad5489a6be48
child 38120 a902f158b4fc
permissions -rw-r--r--
general Future.report -- also for Toplevel.async_state;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 
    27   * Promised futures are fulfilled by external means.  There is no
    28     associated evaluation task, but other futures can depend on them
    29     as usual.
    30 *)
    31 
    32 signature FUTURE =
    33 sig
         (*identifiers, shared with Task_Queue*)
    34   type task = Task_Queue.task
    35   type group = Task_Queue.group
         (*task/group context of the current thread, present while it executes a future job*)
    36   val is_worker: unit -> bool
    37   val worker_task: unit -> Task_Queue.task option
    38   val worker_group: unit -> Task_Queue.group option
         (*future values and inspection of their state*)
    39   type 'a future
    40   val task_of: 'a future -> task
    41   val group_of: 'a future -> group
    42   val peek: 'a future -> 'a Exn.result option
    43   val is_finished: 'a future -> bool
         (*fork: enqueue an evaluation, optionally with explicit group, dependencies, priority*)
    44   val fork_group: group -> (unit -> 'a) -> 'a future
    45   val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
    46   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    47   val fork_pri: int -> (unit -> 'a) -> 'a future
    48   val fork: (unit -> 'a) -> 'a future
         (*join: obtain results; join re-raises a captured exception via Exn.release*)
    49   val join_results: 'a future list -> 'a Exn.result list
    50   val join_result: 'a future -> 'a Exn.result
    51   val join: 'a future -> 'a
         (*fast-path versions*)
    52   val value: 'a -> 'a future
    53   val map: ('a -> 'b) -> 'a future -> 'b future
         (*promised futures -- fulfilled by external means*)
    54   val promise_group: group -> 'a future
    55   val promise: unit -> 'a future
    56   val fulfill_result: 'a future -> 'a Exn.result -> unit
    57   val fulfill: 'a future -> 'a -> unit
         (*cancellation*)
    58   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    59   val cancel_group: group -> unit
    60   val cancel: 'a future -> unit
         (*shutdown: wait until the scheduler thread is no longer active*)
    61   val shutdown: unit -> unit
         (*report: status markup "forked" before and "joined" after a successful evaluation*)
    62   val report: (unit -> 'a) -> 'a
    63 end;
    64 
    65 structure Future: FUTURE =
    66 struct
    67 
    68 (** future values **)
    69 
    70 (* identifiers *)
    71 
    72 type task = Task_Queue.task;
    73 type group = Task_Queue.group;
    74 
    (*thread-local slot for the (task, group) pair of the job under execution*)
    local
      val tag = Universal.tag () : (task * group) option Universal.tag;
    in
      (*current slot content; an unset slot counts as NONE*)
      fun thread_data () =
        (case Thread.getLocal tag of
          SOME data => data
        | NONE => NONE);

      (*apply f to x via Library.setmp_thread_data, handing over the current
        slot content as old value and SOME data as new value*)
      fun setmp_thread_data data f x =
        let val previous = thread_data ()
        in Library.setmp_thread_data tag previous (SOME data) f x end;
    end;
    82 
    (*the current thread counts as worker iff its thread data is present*)
    fun is_worker () = is_some (thread_data ());

    (*components of the thread data, if any*)
    fun worker_task () = Option.map #1 (thread_data ());
    fun worker_group () = Option.map #2 (thread_data ());

    (*fresh group, with the group of the current thread (if any) as parent argument*)
    fun new_group () =
      let val parent = worker_group ()
      in Task_Queue.new_group parent end;
    88 
    89 
    90 (* datatype future *)
    91 
    92 type 'a result = 'a Exn.result Single_Assignment.var;  (*cell for the outcome: value or exception*)
    93 
    94 datatype 'a future = Future of
    95  {promised: bool,  (*true only for futures made by promise_group; checked by fulfill_result*)
    96   task: task,  (*queue task; Task_Queue.dummy_task for futures made by value*)
    97   group: group,  (*group passed to Task_Queue.cancel_group when the result is an exception*)
    98   result: 'a result};  (*assigned via assign_result*)
    99 
   (*field selectors*)
   fun task_of (Future r) = #task r;
   fun group_of (Future r) = #group r;
   fun result_of (Future r) = #result r;

   (*current content of the result cell, if any*)
   fun peek (Future {result, ...}) = Single_Assignment.peek result;

   (*a future is finished iff its result cell has content*)
   fun is_finished x =
     (case peek x of
       NONE => false
     | SOME _ => true);
   106 
   (*Store res in the result cell.  An exception result additionally cancels
     the group with that exception.  Returns true iff res is a proper value.*)
   fun assign_result group result res =
    (Single_Assignment.assign result res;
     case res of
       Exn.Result _ => true
     | Exn.Exn exn => (Task_Queue.cancel_group group exn; false));
   115 
   116 
   117 
   118 (** scheduling **)
   119 
   120 (* synchronization *)
   121 
   122 val scheduler_event = ConditionVar.conditionVar ();  (*awaited by scheduler_next and shutdown*)
   123 val work_available = ConditionVar.conditionVar ();  (*awaited by workers in worker_next*)
   124 val work_finished = ConditionVar.conditionVar ();  (*awaited by join_next; broadcast by execute*)
   125 
   local
     val lock = Mutex.mutex ();

     (*common wait primitive on the module-wide lock, with optional deadline*)
     fun sync_wait deadline cond = Multithreading.sync_wait NONE deadline cond lock;
   in

   (*named section run via Simple_Thread.synchronized on the module-wide lock*)
   fun SYNCHRONIZED name e = Simple_Thread.synchronized name lock e;

   (*wait without deadline*)
   fun wait cond = (*requires SYNCHRONIZED*)
     sync_wait NONE cond;

   (*wait with deadline: current time plus timeout*)
   fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
     let val deadline = Time.+ (Time.now (), timeout)
     in sync_wait (SOME deadline) cond end;

   val signal = (*requires SYNCHRONIZED*)
     ConditionVar.signal;

   val broadcast = (*requires SYNCHRONIZED*)
     ConditionVar.broadcast;

   (*wake up everybody waiting for new or finished work*)
   fun broadcast_work () = (*requires SYNCHRONIZED*)
     List.app ConditionVar.broadcast [work_available, work_finished];

   end;
   149 
   150 
   151 (* global state *)
   152 
   153 val queue = Unsynchronized.ref Task_Queue.empty;  (*the task queue; updated within SYNCHRONIZED sections*)
   154 val next = Unsynchronized.ref 0;  (*counter for worker thread names, cf. scheduler_next*)
   155 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);  (*scheduler thread, if started*)
   156 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);  (*groups whose cancellation is retried by the scheduler*)
   157 val do_shutdown = Unsynchronized.ref false;  (*set by scheduler_next, reset by scheduler_check*)
   158 val max_workers = Unsynchronized.ref 0;  (*worker_next drops workers beyond this; scheduler_next starts missing ones*)
   159 val max_active = Unsynchronized.ref 0;  (*worker_next waits while more workers than this are Working*)
   160 val worker_trend = Unsynchronized.ref 0;  (*tick counter: demand above (positive) or below (negative) max_workers*)
   161 
   162 datatype worker_state = Working | Waiting | Sleeping;  (*worker_wait: Waiting if active, Sleeping otherwise*)
   163 val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
   164 
   164 
   (*number of registered workers that are currently in the given state*)
   fun count_workers state = (*requires SYNCHRONIZED*)
     length (filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   167 
   168 
   169 (* execute future jobs *)
   170 
   (*Produce a fresh result cell together with the job that fills it.  The
     job evaluates e under the position captured here and with private
     interrupts; a job invoked with false assigns Interrupt instead.  The
     job returns the status of assign_result.*)
   fun future_job group (e: unit -> 'a) =
     let
       val result = Single_Assignment.var "future" : 'a result;
       val pos = Position.thread_data ();
       fun eval () =
         Multithreading.with_attributes Multithreading.private_interrupts
           (fn _ => Position.setmp_thread_data pos e ());
       fun job true = assign_result group result (Exn.capture eval ())
         | job false = assign_result group result (Exn.Exn Exn.Interrupt);
     in (result, job) end;
   185 
   (*immediate attempt: result of Task_Queue.cancel on the current queue*)
   fun cancel_now group = (*requires SYNCHRONIZED*)
     let val current = ! queue
     in Task_Queue.cancel current group end;

   (*deferred attempt: register the group and notify the scheduler*)
   fun cancel_later group = (*requires SYNCHRONIZED*)
     let
       val _ = canceled := insert Task_Queue.eq_group group (! canceled);
     in broadcast scheduler_event end;
   192 
   (*Run all jobs of a task within its (task, group) thread context, then
     finish the task in the queue.  Every job is invoked, even after an
     earlier one has failed; if any failed, the group is canceled now or
     registered for later.*)
   fun execute (task, group, jobs) =
     let
       val valid = not (Task_Queue.is_canceled group);
       fun run_jobs () = fold (fn job => fn ok => job valid andalso ok) jobs true;
       val ok = setmp_thread_data (task, group) run_jobs ();
       fun finish () =
         let
           val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
         in
           if ok orelse cancel_now group then () else cancel_later group;
           broadcast work_finished;
           if maximal then () else signal work_available
         end;
     in SYNCHRONIZED "finish" finish end;
   209 
   210 
   211 (* worker threads *)
   212 
   (*Wait on cond as registered worker: the state is Waiting (active) or
     Sleeping (not active) during the wait, and Working again afterwards.*)
   fun worker_wait active cond = (*requires SYNCHRONIZED*)
     (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
       NONE => raise Fail "Unregistered worker thread"
     | SOME state =>
        (state := (if active then Waiting else Sleeping);
         wait cond;
         state := Working));
   223 
   224 fun worker_next () = (*requires SYNCHRONIZED*)
         (*Next piece of work for the current worker thread.  NONE means that
           the worker has been removed from the pool and should terminate.*)
   225   if length (! workers) > ! max_workers then
           (*too many workers: unregister this one and pass the signal on*)
   226     (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
   227      signal work_available;
   228      NONE)
   229   else if count_workers Working > ! max_active then
           (*too many active workers: sleep until signalled, then retry*)
   230     (worker_wait false work_available; worker_next ())
   231   else
   232     (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
             (*nothing to dequeue: sleep until signalled, then retry*)
   233       NONE => (worker_wait false work_available; worker_next ())
             (*got work: signal work_available once more before returning it*)
   234     | some => (signal work_available; some));
   235 
   (*main loop of a worker thread: execute work until worker_next yields NONE*)
   fun worker_loop name =
     let val next_work = SYNCHRONIZED name worker_next in
       (case next_work of
         SOME work => (execute work; worker_loop name)
       | NONE => ())
     end;

   (*fork a worker thread and register it in state Working*)
   fun worker_start name = (*requires SYNCHRONIZED*)
     let
       val thread = Simple_Thread.fork false (fn () => worker_loop name);
       val state = Unsynchronized.ref Working;
     in Unsynchronized.change workers (cons (thread, state)) end;
   244 
   245 
   246 (* scheduler *)
   247 
   248 val status_ticks = Unsynchronized.ref 0;  (*tick counter modulo 10; status is traced when it is 0*)
   249 
   250 val last_round = Unsynchronized.ref Time.zeroTime;  (*time of the most recent tick*)
   251 val next_round = Time.fromMilliseconds 50;  (*tick interval, also the timeout of the scheduler wait*)
   252 
   253 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler: status tracing, disposal of dead workers,
           adjustment of the worker pool, retry of pending group cancellations,
           a timed wait on scheduler_event, and the shutdown check.  The result
           tells scheduler_loop whether to continue.  On Interrupt, the groups
           returned by Task_Queue.cancel_all are registered via cancel_later and
           the round yields true.*)
   254   let
   255     val now = Time.now ();
           (*tick: at least next_round has elapsed since last_round*)
   256     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   257     val _ = if tick then last_round := now else ();
   258 
   259 
   260     (* queue and worker status *)
   261 
           (*the status message is traced on every 10th tick*)
   262     val _ =
   263       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   264     val _ =
   265       if tick andalso ! status_ticks = 0 then
   266         Multithreading.tracing 1 (fn () =>
   267           let
   268             val {ready, pending, running, passive} = Task_Queue.status (! queue);
   269             val total = length (! workers);
   270             val active = count_workers Working;
   271             val waiting = count_workers Waiting;
   272           in
   273             "SCHEDULE " ^ Time.toString now ^ ": " ^
   274               string_of_int ready ^ " ready, " ^
   275               string_of_int pending ^ " pending, " ^
   276               string_of_int running ^ " running, " ^
   277               string_of_int passive ^ " passive; " ^
   278               string_of_int total ^ " workers, " ^
   279               string_of_int active ^ " active, " ^
   280               string_of_int waiting ^ " waiting "
   281           end)
   282       else ();
   283 
           (*drop workers whose thread is no longer active*)
   284     val _ =
   285       if forall (Thread.isActive o #1) (! workers) then ()
   286       else
   287         let
   288           val (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   289           val _ = workers := alive;
   290         in
   291           Multithreading.tracing 0 (fn () =>
   292             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   293         end;
   294 
   295 
   296     (* worker pool adjustments *)
   297 
           (*previous limits, compared below to decide whether to signal workers*)
   298     val max_active0 = ! max_active;
   299     val max_workers0 = ! max_workers;
   300 
           (*m: new limit of active workers, 0 during shutdown*)
   301     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   302     val _ = max_active := m;
   303 
           (*mm: wanted number of workers: Working + 2 * Waiting, but at least m
             and at most 4 * m.  NOTE(review): m = 9999 is treated specially --
             its meaning is not evident from this file*)
   304     val mm =
   305       if ! do_shutdown then 0
   306       else if m = 9999 then 1
   307       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
           (*worker_trend counts ticks with mm above (positive) or below (negative)
             max_workers; a change of direction restarts from 0*)
   308     val _ =
   309       if tick andalso mm > ! max_workers then
   310         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   311       else if tick andalso mm < ! max_workers then
   312         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   313       else ();
           (*adopt mm when it is 0 or the trend exceeds 50 ticks either way; after
             more than 5 ticks of upward trend grow to at most 2 * m*)
   314     val _ =
   315       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   316         max_workers := mm
   317       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
   318         max_workers := Int.min (mm, 2 * m)
   319       else ();
   320 
           (*start as many workers as are missing, with consecutively numbered names*)
   321     val missing = ! max_workers - length (! workers);
   322     val _ =
   323       if missing > 0 then
   324         funpow missing (fn () =>
   325           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   326       else ();
   327 
           (*changed limits: signal work_available so that a worker re-checks them*)
   328     val _ =
   329       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   330       else signal work_available;
   331 
   332 
   333     (* canceled groups *)
   334 
           (*retry cancel_now for each registered group, keeping those where it
             yields false*)
   335     val _ =
   336       if null (! canceled) then ()
   337       else
   338        (Multithreading.tracing 1 (fn () =>
   339           string_of_int (length (! canceled)) ^ " canceled groups");
   340         Unsynchronized.change canceled (filter_out cancel_now);
   341         broadcast_work ());
   342 
   343 
   344     (* delay loop *)
   345 
           (*wait for scheduler_event, at most next_round; presumably Exn.release
             re-raises an exception captured in the result of the wait -- confirm*)
   346     val _ = Exn.release (wait_timeout next_round scheduler_event);
   347 
   348 
   349     (* shutdown *)
   350 
           (*request shutdown when Task_Queue.all_passive holds for the queue; the
             scheduler ends once shutdown is requested and no workers remain*)
   351     val _ = if Task_Queue.all_passive (! queue) then do_shutdown := true else ();
   352     val continue = not (! do_shutdown andalso null (! workers));
   353     val _ = if continue then () else scheduler := NONE;
   354 
   355     val _ = broadcast scheduler_event;
   356   in continue end
   357   handle Exn.Interrupt =>
   358    (Multithreading.tracing 1 (fn () => "Interrupt");
   359     List.app cancel_later (Task_Queue.cancel_all (! queue));
   360     broadcast_work (); true);
   361 
   (*main loop of the scheduler thread: repeat scheduler_next, each round
     under synchronous public interrupts, until it yields false*)
   fun scheduler_loop () =
     let
       fun round _ = SYNCHRONIZED "scheduler" scheduler_next;
       val continue =
         Multithreading.with_attributes
           (Multithreading.sync_interrupts Multithreading.public_interrupts) round;
     in if continue then scheduler_loop () else () end;

   (*a scheduler thread is registered and still active*)
   fun scheduler_active () = (*requires SYNCHRONIZED*)
     the_default false (Option.map Thread.isActive (! scheduler));

   (*withdraw any shutdown request and make sure that a scheduler thread runs*)
   fun scheduler_check () = (*requires SYNCHRONIZED*)
     let
       val _ = do_shutdown := false;
     in
       if scheduler_active () then ()
       else scheduler := SOME (Simple_Thread.fork false scheduler_loop)
     end;
   376 
   377 
   378 
   379 (** futures **)
   380 
   381 (* fork *)
   382 
   (*Enqueue the evaluation of e as a new task with the given dependencies
     and priority.  Without explicit group a fresh one is made via
     new_group.  Enqueuing also ensures that the scheduler is running.*)
   fun fork_future opt_group deps pri e =
     let
       val group =
         (case opt_group of
           SOME group => group
         | NONE => new_group ());
       val (result, job) = future_job group e;
       fun enqueue () =
         let
           val (task, minimal) =
             Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
         in
           if minimal then signal work_available else ();
           scheduler_check ();
           task
         end;
     in
       Future {promised = false, task = SYNCHRONIZED "enqueue" enqueue, group = group,
         result = result}
     end;
   398 
   (*variants of fork_future: defaults are fresh group, no dependencies, priority 0*)
   fun fork_group group e = fork_future (SOME group) [] 0 e;
   fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   fun fork_pri pri e = fork_future NONE [] pri e;
   fun fork e = fork_future NONE [] 0 e;
   404 
   405 
   406 (* join *)
   407 
   408 local
   409 
   410 fun get_result x =
         (*Result of a future as seen by join.  An unfinished future yields
           SYS_ERROR.  A plain Interrupt is replaced by the collected exceptions
           of the group status, if there are any.*)
   411   (case peek x of
   412     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   413   | SOME (exn as Exn.Exn Exn.Interrupt) =>
   414       (case Exn.flatten_list (Task_Queue.group_status (group_of x)) of
   415         [] => exn
   416       | exns => Exn.Exn (Exn.EXCEPTIONS exns))
   417   | SOME res => res);
   418 
   419 fun join_next deps = (*requires SYNCHRONIZED*)
         (*Next work to execute on behalf of the given dependencies, together
           with the remaining dependencies.  While Task_Queue.dequeue_towards
           offers no work but dependencies remain, wait for work_finished as
           active worker and retry.*)
   420   if null deps then NONE
   421   else
   422     (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   423       (NONE, []) => NONE
   424     | (NONE, deps') => (worker_wait true work_finished; join_next deps')
   425     | (SOME work, deps') => SOME (work, deps'));
   426 
         (*execute work and continue with the remaining dependencies until
           join_next yields NONE*)
   427 fun execute_work NONE = ()
   428   | execute_work (SOME (work, deps')) = (execute work; join_work deps')
   429 and join_work deps =
   430   execute_work (SYNCHRONIZED "join" (fn () => join_next deps));
   431 
         (*like join_work, but first records in the queue that task depends on deps*)
   432 fun join_depend task deps =
   433   execute_work (SYNCHRONIZED "join" (fn () =>
   434     (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps)));
   435 
   436 in
   437 
   438 fun join_results xs =
         (*Results of all given futures.  Finished futures are answered
           directly.  Joining within a critical section is an error.  A thread
           with worker task executes work towards the futures via join_depend;
           any other thread awaits each result cell.*)
   439   if forall is_finished xs then map get_result xs
   440   else if Multithreading.self_critical () then
   441     error "Cannot join future values within critical section"
   442   else
   443     (case worker_task () of
   444       SOME task => join_depend task (map task_of xs)
   445     | NONE => List.app (ignore o Single_Assignment.await o result_of) xs;
   446     map get_result xs);
   447 
   448 end;
   449 
   (*join a single future: captured result, or released value*)
   fun join_result x = x |> singleton join_results;
   fun join x = x |> join_result |> Exn.release;
   452 
   453 
   454 (* fast-path versions -- bypassing full task management *)
   455 
   (*future that is finished from the start: the result is assigned right
     away, in a fresh group without parent and with the dummy task*)
   fun value (x: 'a) =
     let
       val group = Task_Queue.new_group NONE;
       val result = Single_Assignment.var "value" : 'a result;
     in
       assign_result group result (Exn.Result x);
       Future {promised = false, task = Task_Queue.dummy_task, group = group, result = result}
     end;
   462 
   (*Apply f to the result of x.  First try to attach the job to the task
     of x via Task_Queue.extend; if the queue does not admit that, fork a
     new task depending on x with the same priority.  Either way the new
     future lives in a fresh subgroup of the group of x.*)
   fun map_future f x =
     let
       val task = task_of x;
       val group = Task_Queue.new_group (SOME (group_of x));
       fun body () = f (join x);
       val (result, job) = future_job group body;
       fun extend () =
         (case Task_Queue.extend task job (! queue) of
           NONE => false
         | SOME queue' => (queue := queue'; true));
     in
       if SYNCHRONIZED "extend" extend
       then Future {promised = false, task = task, group = group, result = result}
       else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
     end;
   477 
   478 
   479 (* promised futures -- fulfilled by external means *)
   480 
   (*promised future within the given group: a passive task is enqueued and
     the result cell is left unassigned*)
   fun promise_group group : 'a future =
     let
       val result = Single_Assignment.var "promise" : 'a result;
       fun enqueue () = Unsynchronized.change_result queue (Task_Queue.enqueue_passive group);
     in
       Future {promised = true, task = SYNCHRONIZED "enqueue" enqueue, group = group,
         result = result}
     end;

   (*promised future within a fresh group*)
   fun promise () =
     let val group = new_group ()
     in promise_group group end;
   489 
   (*Fulfill a promised future with the given result, by executing a
     one-job task in the current thread; the job assigns Interrupt instead
     if invoked with false.  Fails for futures that are not promised.*)
   fun fulfill_result (Future {promised, task, group, result}) res =
     if promised then
       let
         fun job ok = assign_result group result (if ok then res else Exn.Exn Exn.Interrupt);
       in execute (task, group, [job]) end
     else raise Fail "Not a promised future";

   (*fulfill with a proper value*)
   fun fulfill x res =
     let val outcome = Exn.Result res
     in fulfill_result x outcome end;
   498 
   499 
   500 (* cancellation *)
   501 
   (*Apply f to x in interruptible mode.  With multithreading available the
     interrupt attributes are private for worker threads and public
     otherwise; without it the plain interruptible combinator is used.*)
   fun interruptible_task f x =
     if not Multithreading.available then interruptible f x
     else
       let
         val attributes =
           if is_worker () then Multithreading.private_interrupts
           else Multithreading.public_interrupts;
       in Multithreading.with_attributes attributes (fn _ => f x) end;
   510 
   (*cancel: members of the group, present and future, are interrupted
     eventually -- either right now, or later via the scheduler*)
   fun cancel_group group =
     SYNCHRONIZED "cancel" (fn () =>
       if not (cancel_now group) then cancel_later group else ());

   (*cancel the group of a future*)
   fun cancel x =
     let val group = group_of x
     in cancel_group group end;
   515 
   516 
   517 (* shutdown *)
   518 
   (*Wait until the scheduler thread is no longer active, waking up workers
     and joiners after each scheduler event.  Without multithreading this
     does nothing.*)
   fun shutdown () =
     let
       fun await_scheduler () = (*requires SYNCHRONIZED*)
         if scheduler_active () then
           (wait scheduler_event; broadcast_work (); await_scheduler ())
         else ();
     in
       if Multithreading.available then SYNCHRONIZED "shutdown" await_scheduler
       else ()
     end;
   525 
   526 
   527 (* report markup *)
   528 
   (*Evaluate e between status messages "forked" and "joined".  If e raises
     an exception, "joined" is not emitted -- this is intentional.*)
   fun report e =
     let
       fun status markup = Output.status (Markup.markup markup "");
     in
       status Markup.forked;
       let val x = e ()  (*sic -- report "joined" only for success*)
       in status Markup.joined; x end
     end;
   535 
   536 
   537 (*final declarations of this structure! -- map must be bound last, because it hides the list operation map used by the definitions above*)
   538 val map = map_future;
   539 
   540 end;
   541 
   542 type 'a future = 'a Future.future;  (*unqualified abbreviation of the future type*)
   543