src/Pure/Concurrent/future.ML
author wenzelm
Thu, 05 Nov 2009 13:01:11 +0100
changeset 33436 352fe8e9162d
parent 33434 cb409680dda8
child 33437 13d00799fe49
permissions -rw-r--r--
worker_next: plain signalling via work_available only, not scheduler_event;
scheduler: tuned worker pool adjustments;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    signature FUTURE =
    sig
      (* identifiers and the worker context of the current thread *)
      type task = Task_Queue.task
      type group = Task_Queue.group
      val is_worker: unit -> bool
      val worker_task: unit -> task option
      val worker_group: unit -> group option

      (* future values *)
      type 'a future
      val task_of: 'a future -> task
      val group_of: 'a future -> group
      val peek: 'a future -> 'a Exn.result option
      val is_finished: 'a future -> bool
      val value: 'a -> 'a future

      (* fork: explicit group, dependencies and/or priority *)
      val fork_group: group -> (unit -> 'a) -> 'a future
      val fork_deps_pri: 'b future list -> int -> (unit -> 'a) -> 'a future
      val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
      val fork_pri: int -> (unit -> 'a) -> 'a future
      val fork: (unit -> 'a) -> 'a future

      (* join and derived operations *)
      val join_results: 'a future list -> 'a Exn.result list
      val join_result: 'a future -> 'a Exn.result
      val join: 'a future -> 'a
      val map: ('a -> 'b) -> 'a future -> 'b future

      (* interrupts, cancellation, shutdown *)
      val interruptible_task: ('a -> 'b) -> 'a -> 'b
      val cancel_group: group -> unit
      val cancel: 'a future -> unit
      val shutdown: unit -> unit
    end;
    55 
    56 structure Future: FUTURE =
    57 struct
    58 
    59 (** future values **)
    60 
    61 (* identifiers *)
    62 
    type task = Task_Queue.task;
    type group = Task_Queue.group;

    (*Per-thread slot (via Thread.getLocal) recording the (task, group) that
      the current thread is executing as a worker; NONE when unset.*)
    local
      val worker_tag = Universal.tag () : (task * group) option Universal.tag;
    in
      fun thread_data () =
        (case Thread.getLocal worker_tag of
          SOME data => data
        | NONE => NONE);

      (*run f x with the slot set to SOME data; the current value is passed
        along, presumably so Library.setmp_thread_data can restore it*)
      fun setmp_thread_data data f x =
        let val previous = thread_data ()
        in Library.setmp_thread_data worker_tag previous (SOME data) f x end;
    end;

    fun is_worker () = is_some (thread_data ());
    fun worker_task () = Option.map #1 (thread_data ());
    fun worker_group () = Option.map #2 (thread_data ());
    77 
    78 
    79 (* datatype future *)
    80 
    datatype 'a future = Future of
     {task: task,
      group: group,
      result: 'a Exn.result option Synchronized.var};

    (* record accessors *)
    fun task_of (Future r) = #task r;
    fun group_of (Future r) = #group r;
    fun result_of (Future r) = #result r;

    (*current result without blocking: NONE as long as it is unassigned*)
    fun peek (Future {result, ...}) = Synchronized.value result;
    fun is_finished x = (case peek x of NONE => false | SOME _ => true);

    (*a future whose result is already the value x*)
    fun value x =
      let
        val task = Task_Queue.new_task 0;
        val group = Task_Queue.new_group NONE;
        val result = Synchronized.var "future" (SOME (Exn.Result x));
      in Future {task = task, group = group, result = result} end;
    97 
    98 
    99 
   100 (** scheduling **)
   101 
   102 (* synchronization *)
   103 
   val scheduler_event = ConditionVar.conditionVar ();
   val work_available = ConditionVar.conditionVar ();
   val work_finished = ConditionVar.conditionVar ();

   (*A single private lock; functions marked (*requires SYNCHRONIZED*) expect
     the caller to hold it, i.e. to run inside SYNCHRONIZED.*)
   local
     val lock = Mutex.mutex ();
   in

   fun SYNCHRONIZED name = SimpleThread.synchronized name lock;

   (*wait on cond, releasing the lock meanwhile; the result of
     Multithreading.sync_wait is returned to the caller*)
   fun wait cond = (*requires SYNCHRONIZED*)
     Multithreading.sync_wait NONE NONE cond lock;

   (*like wait, but give up after the relative timeout*)
   fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
     let val deadline = Time.+ (Time.now (), timeout)
     in Multithreading.sync_wait NONE (SOME deadline) cond lock end;

   fun signal cond = ConditionVar.signal cond; (*requires SYNCHRONIZED*)

   fun broadcast cond = ConditionVar.broadcast cond; (*requires SYNCHRONIZED*)

   (*wake every thread waiting for work, first on work_available, then on work_finished*)
   fun broadcast_work () = (*requires SYNCHRONIZED*)
     List.app broadcast [work_available, work_finished];

   end;
   131 
   132 
   133 (* global state *)
   134 
   (* mutable scheduler state -- accessed under SYNCHRONIZED by the code here *)
   val queue = Unsynchronized.ref Task_Queue.empty;
   val next: int Unsynchronized.ref = Unsynchronized.ref 0;   (*worker name counter*)
   val scheduler: Thread.thread option Unsynchronized.ref = Unsynchronized.ref NONE;
   val canceled: Task_Queue.group list Unsynchronized.ref = Unsynchronized.ref [];
   val do_shutdown: bool Unsynchronized.ref = Unsynchronized.ref false;
   val max_workers: int Unsynchronized.ref = Unsynchronized.ref 0;
   val max_active: int Unsynchronized.ref = Unsynchronized.ref 0;
   val worker_trend: int Unsynchronized.ref = Unsynchronized.ref 0;

   datatype worker_state = Working | Waiting | Sleeping;
   val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);

   (*number of registered workers whose current state equals the given one*)
   fun count_workers state = (*requires SYNCHRONIZED*)
     length (List.filter (fn (_, state_ref) => ! state_ref = state) (! workers));
   149 
   150 
   151 (* execute future jobs *)
   152 
   (*Create the result variable of a future together with the job that fills it.
     The job receives a validity flag: when false, the result becomes Interrupt
     without running e.  Any exception result cancels the group; the job then
     answers false, otherwise true.*)
   fun future_job group (e: unit -> 'a) =
     let
       val result = Synchronized.var "future" (NONE: 'a Exn.result option);
       fun run () =
         Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ());
       fun job ok =
         let
           val outcome = if not ok then Exn.Exn Exn.Interrupt else Exn.capture run ();
           val _ = Synchronized.assign result (fn _ => SOME outcome);
         in
           (case outcome of
             Exn.Result _ => true
           | Exn.Exn exn => (Task_Queue.cancel_group group exn; false))
         end;
     in (result, job) end;
   170 
   (*remember group as canceled (no duplicates) and wake up the scheduler*)
   fun do_cancel group = (*requires SYNCHRONIZED*)
     let
       val _ = Unsynchronized.change canceled (insert Task_Queue.eq_group group);
     in broadcast scheduler_event end;
   174 
   (*Run all jobs of a dequeued task in worker context, then mark the task as
     finished in the queue.  Failure of any job triggers group cancellation.*)
   fun execute (task, group, jobs) =
     let
       val valid = not (Task_Queue.is_canceled group);
       (*every job runs (each sees the same validity flag); the outcome is the conjunction*)
       fun run_jobs () =
         fold (fn job => fn all_ok => job valid andalso all_ok) jobs true;
       val ok = setmp_thread_data (task, group) run_jobs ();

       fun finish () =
         let
           val maximal = Unsynchronized.change_result queue (Task_Queue.finish task);
           val _ =
             if ok orelse Task_Queue.cancel (! queue) group then ()
             else do_cancel group;
           val _ = broadcast work_finished;
         in if maximal then () else signal work_available end;
     in SYNCHRONIZED "finish" finish end;
   191 
   192 
   193 (* worker threads *)
   194 
   (*Block the calling worker on cond, recording it as Waiting (active) or
     Sleeping (not active) meanwhile, and as Working afterwards.
     Raises Fail if the current thread is not a registered worker.*)
   fun worker_wait active cond = (*requires SYNCHRONIZED*)
     (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
       NONE => raise Fail "Unregistered worker thread"
     | SOME state_ref =>
        (state_ref := (if active then Waiting else Sleeping);
         ignore (wait cond);
         state_ref := Working));
   205 
   (*Obtain the next piece of work for the calling worker.  NONE means the worker
     has been removed from the pool (too many workers) and should terminate.
     Waits on work_available while too many workers are active or no task is ready.*)
   fun worker_next () = (*requires SYNCHRONIZED*)
     let
       fun sleep_and_retry () = (worker_wait false work_available; worker_next ());
     in
       if length (! workers) > ! max_workers then
         (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
          signal work_available;
          NONE)
       else if count_workers Working > ! max_active then sleep_and_retry ()
       else
         (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
           NONE => sleep_and_retry ()
         | some_work => (signal work_available; some_work))
     end;
   217 
   (*main loop of a worker thread: execute work until worker_next yields NONE*)
   fun worker_loop name =
     let
       fun next () = SYNCHRONIZED name worker_next;
       fun loop NONE = ()
         | loop (SOME work) = (execute work; loop (next ()));
     in loop (next ()) end;
   222 
   (*fork a new worker thread and register it in state Working*)
   fun worker_start name = (*requires SYNCHRONIZED*)
     let
       val thread = SimpleThread.fork false (fn () => worker_loop name);
       val state = Unsynchronized.ref Working;
     in Unsynchronized.change workers (fn registered => (thread, state) :: registered) end;
   226 
   227 
   228 (* scheduler *)
   229 
   (*tick counter modulo 10, used to throttle status tracing*)
   val status_ticks: int Unsynchronized.ref = Unsynchronized.ref 0;

   (*start time of the last scheduler tick, and the minimal interval between ticks*)
   val last_round: Time.time Unsynchronized.ref = Unsynchronized.ref Time.zeroTime;
   val next_round = Time.fromMilliseconds 50;
   234 
   235 fun scheduler_next () = (*requires SYNCHRONIZED*)
         (*One round of the scheduler.  Steps:
             - trace queue/worker status (every 10th tick);
             - drop dead worker threads;
             - adjust max_active/max_workers and start missing workers;
             - retry pending group cancellations;
             - wait up to next_round on scheduler_event;
             - decide about shutdown.
           Result: whether the scheduler loop should continue.  It is false once
           do_shutdown is set and no workers remain; then scheduler := NONE.
           On Exn.Interrupt all groups of the queue are canceled and the round
           is retried.*)
   236   let
   237     val now = Time.now ();
             (*tick: at least next_round has passed since the last recorded tick*)
   238     val tick = Time.<= (Time.+ (! last_round, next_round), now);
   239     val _ = if tick then last_round := now else ();
   240 
   241 
   242     (* queue and worker status *)
   243 
   244     val _ =
   245       if tick then Unsynchronized.change status_ticks (fn i => (i + 1) mod 10) else ();
   246     val _ =
   247       if tick andalso ! status_ticks = 0 then
   248         Multithreading.tracing 1 (fn () =>
   249           let
   250             val {ready, pending, running} = Task_Queue.status (! queue);
   251             val total = length (! workers);
   252             val active = count_workers Working;
   253             val waiting = count_workers Waiting;
   254           in
   255             "SCHEDULE " ^ Time.toString now ^ ": " ^
   256               string_of_int ready ^ " ready, " ^
   257               string_of_int pending ^ " pending, " ^
   258               string_of_int running ^ " running; " ^
   259               string_of_int total ^ " workers, " ^
   260               string_of_int active ^ " active, " ^
   261               string_of_int waiting ^ " waiting "
   262           end)
   263       else ();
   264 
             (*forget worker threads that are no longer active*)
   265     val _ =
   266       if forall (Thread.isActive o #1) (! workers) then ()
   267       else
   268         let
   269           val  (alive, dead) = List.partition (Thread.isActive o #1) (! workers);
   270           val _ = workers := alive;
   271         in
   272           Multithreading.tracing 0 (fn () =>
   273             "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")
   274         end;
   275 
   276 
   277     (* worker pool adjustments *)
   278 
             (*previous limits, to detect changes below*)
   279     val max_active0 = ! max_active;
   280     val max_workers0 = ! max_workers;
   281 
             (*m: permitted number of active (Working) workers; 0 during shutdown*)
   282     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   283     val _ = max_active := m;
   284 
             (*mm: target pool size -- Working + 2 * Waiting, clamped to [m, 4 * m].
               NOTE(review): m = 9999 yields a single worker while max_active stays
               9999; this looks like a special/debugging setting -- confirm intent.*)
   285     val mm =
   286       if ! do_shutdown then 0
   287       else if m = 9999 then 1
   288       else Int.min (Int.max (count_workers Working + 2 * count_workers Waiting, m), 4 * m);
             (*worker_trend counts consecutive ticks with the target above (positive)
               or below (negative) the current max_workers; a change of direction
               resets it to 0*)
   289     val _ =
   290       if tick andalso mm > ! max_workers then
   291         Unsynchronized.change worker_trend (fn w => if w < 0 then 0 else w + 1)
   292       else if tick andalso mm < ! max_workers then
   293         Unsynchronized.change worker_trend (fn w => if w > 0 then 0 else w - 1)
   294       else ();
             (*follow mm immediately for shutdown (mm = 0) or after a persistent trend
               (beyond +/-50 ticks); after a short upward trend (> 5 ticks) grow to
               at most 2 * m*)
   295     val _ =
   296       if mm = 0 orelse ! worker_trend > 50 orelse ! worker_trend < ~50 then
   297         max_workers := mm
   298       else if ! worker_trend > 5 andalso ! max_workers < 2 * m then
   299         max_workers := Int.min (mm, 2 * m)
   300       else ();
   301 
             (*start fresh workers up to max_workers; surplus workers remove
               themselves in worker_next*)
   302     val missing = ! max_workers - length (! workers);
   303     val _ =
   304       if missing > 0 then
   305         funpow missing (fn () =>
   306           ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
   307       else ();
   308 
             (*if any limit changed, wake a worker so that it re-checks them*)
   309     val _ =
   310       if ! max_active = max_active0 andalso ! max_workers = max_workers0 then ()
   311       else signal work_available;
   312 
   313 
   314     (* canceled groups *)
   315 
             (*retry cancellation; groups for which Task_Queue.cancel answers true
               are dropped from the list (presumably: cancellation complete)*)
   316     val _ =
   317       if null (! canceled) then ()
   318       else
   319        (Multithreading.tracing 1 (fn () =>
   320           string_of_int (length (! canceled)) ^ " canceled groups");
   321         Unsynchronized.change canceled (filter_out (Task_Queue.cancel (! queue)));
   322         broadcast_work ());
   323 
   324 
   325     (* delay loop *)
   326 
             (*sleep until scheduler_event or next_round elapses; an interrupt
               delivered here is re-raised by Exn.release and handled below*)
   327     val _ = Exn.release (wait_timeout next_round scheduler_event);
   328 
   329 
   330     (* shutdown *)
   331 
             (*an empty queue initiates shutdown: the next round sets both limits to 0*)
   332     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   333     val continue = not (! do_shutdown andalso null (! workers));
   334     val _ = if continue then () else scheduler := NONE;
   335 
   336     val _ = broadcast scheduler_event;
   337   in continue end
   338   handle Exn.Interrupt =>
   339    (Multithreading.tracing 1 (fn () => "Interrupt");
   340     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   341     scheduler_next ());
   342 
   (*body of the scheduler thread: repeat scheduler rounds while they answer true*)
   fun scheduler_loop () =
     let
       fun step () = SYNCHRONIZED "scheduler" scheduler_next;
       fun loop () = if step () then loop () else ();
     in
       Multithreading.with_attributes
         (Multithreading.sync_interrupts Multithreading.public_interrupts)
         (fn _ => loop ())
     end;
   347 
   (*true iff a scheduler thread is recorded and still active*)
   fun scheduler_active () = (*requires SYNCHRONIZED*)
     (case ! scheduler of
       SOME thread => Thread.isActive thread
     | NONE => false);
   350 
   (*revoke any pending shutdown and make sure a scheduler thread is running*)
   fun scheduler_check () = (*requires SYNCHRONIZED*)
     let
       val _ = do_shutdown := false;
     in
       if not (scheduler_active ()) then
         scheduler := SOME (SimpleThread.fork false scheduler_loop)
       else ()
     end;
   355 
   356 
   357 
   358 (** futures **)
   359 
   360 (* fork *)
   361 
   (*Enqueue evaluation of e as a new task.  Without an explicit group, a fresh
     group is created below the group of the current worker (if any).  Ensures
     that the scheduler is running.*)
   fun fork_future opt_group deps pri e =
     let
       val group =
         (case opt_group of
           NONE => Task_Queue.new_group (worker_group ())
         | SOME g => g);
       val (result, job) = future_job group e;
       fun enqueue () =
         let
           val (new_task, minimal) =
             Unsynchronized.change_result queue (Task_Queue.enqueue group deps pri job);
         in
           if minimal then signal work_available else ();
           scheduler_check ();
           new_task
         end;
       val task = SYNCHRONIZED "enqueue" enqueue;
     in Future {task = task, group = group, result = result} end;
   377 
   (*fork variants: explicit group, or implicit group with dependencies/priority;
     defaults are no dependencies and priority 0*)
   fun fork_group group e = fork_future (SOME group) [] 0 e;
   fun fork_deps_pri deps pri e = fork_future NONE (map task_of deps) pri e;
   fun fork_deps deps = fork_deps_pri deps 0;
   fun fork_pri pri = fork_deps_pri [] pri;
   fun fork e = fork_deps_pri [] 0 e;
   383 
   384 
   385 (* join *)
   386 
   local

   (*result of a finished future; an Interrupt result is replaced by the
     exceptions recorded for its group*)
   fun get_result x =
     (case peek x of
       SOME (Exn.Exn Exn.Interrupt) =>
         let val group_exns = Task_Queue.group_status (group_of x)
         in Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list group_exns)) end
     | SOME res => res
     | NONE => Exn.Exn (SYS_ERROR "unfinished future"));

   (*non-worker threads simply block until the result variable is assigned*)
   fun passive_wait x =
     Synchronized.readonly_access (result_of x) (Option.map (fn _ => ()));

   (*next work item on the way towards deps, waiting on work_finished while
     dependencies are still running elsewhere*)
   fun join_next deps = (*requires SYNCHRONIZED*)
     (case deps of
       [] => NONE
     | _ =>
         (case Unsynchronized.change_result queue
             (Task_Queue.dequeue_towards (Thread.self ()) deps) of
           (SOME work, deps') => SOME (work, deps')
         | (NONE, []) => NONE
         | (NONE, deps') => (worker_wait true work_finished; join_next deps')));

   (*execute work items until nothing is left to do for the remaining deps*)
   fun execute_work NONE = ()
     | execute_work (SOME (work, deps')) =
         (execute work; execute_work (SYNCHRONIZED "join" (fn () => join_next deps')));

   (*register deps as dependencies of task, then help evaluating them*)
   fun join_depend task deps =
     let
       fun register_and_next () =
         (Unsynchronized.change queue (Task_Queue.depend task deps); join_next deps);
     in execute_work (SYNCHRONIZED "join" register_and_next) end;

   in

   (*Wait for all futures and deliver their results.  Workers actively help
     with the dependencies; other threads wait passively.  Joining unfinished
     futures inside a critical section is an error.*)
   fun join_results xs =
     if forall is_finished xs then map get_result xs
     else if Multithreading.self_critical () then
       error "Cannot join future values within critical section"
     else
       let
         val _ =
           (case worker_task () of
             NONE => List.app passive_wait xs
           | SOME task => join_depend task (map task_of xs));
       in map get_result xs end;

   end;
   429 
   (*join a single future, delivering its outcome as an Exn.result*)
   fun join_result x = singleton join_results x;

   (*join a single future, re-raising any exception it produced*)
   fun join x = Exn.release (singleton join_results x);
   432 
   433 
   434 (* map *)
   435 
   (*Apply f to the value of x in a new subgroup of x's group.  The job is
     attached to x's own task when Task_Queue.extend permits it; otherwise a
     new task depending on x (with the same priority) is forked.*)
   fun map_future f x =
     let
       val task = task_of x;
       val group = Task_Queue.new_group (SOME (group_of x));
       fun body () = f (join x);
       val (result, job) = future_job group body;
       fun try_extend () =
         (case Task_Queue.extend task job (! queue) of
           NONE => false
         | SOME queue' => (queue := queue'; true));
     in
       if SYNCHRONIZED "extend" try_extend then
         Future {task = task, group = group, result = result}
       else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) body
     end;
   450 
   451 
   452 (* cancellation *)
   453 
   (*Evaluate f x with interrupts enabled: private interrupts inside workers,
     public interrupts elsewhere; falls back to interruptible without threads.*)
   fun interruptible_task f x =
     if not Multithreading.available then interruptible f x
     else
       let
         val attributes =
           if is_worker () then Multithreading.private_interrupts
           else Multithreading.public_interrupts;
       in Multithreading.with_attributes attributes (fn _ => f x) end;
   462 
   (*cancel: present and future members of the group are interrupted eventually;
     cancel x does the same for the group of future x*)
   fun cancel_group group =
     let fun request () = do_cancel group
     in SYNCHRONIZED "cancel" request end;

   fun cancel (Future {group, ...}) = cancel_group group;
   466 
   467 
   468 (* shutdown *)
   469 
   (*Block until the scheduler thread has terminated, repeatedly waking all
     threads waiting for work; no-op without multithreading.*)
   fun shutdown () =
     let
       fun await_scheduler () =
         if scheduler_active () then
           (wait scheduler_event; broadcast_work (); await_scheduler ())
         else ();
     in
       if Multithreading.available then SYNCHRONIZED "shutdown" await_scheduler
       else ()
     end;
   476 
   477 
   (*final declarations of this structure!  Binding "map" here shadows the list
     map used by earlier definitions (e.g. fork_deps_pri, join_results).*)
   fun map f x = map_future f x;
   480 
   481 end;
   482 
   (*make the future type constructor available at toplevel, outside structure Future*)
   483 type 'a future = 'a Future.future;
   484