src/Pure/Concurrent/future.ML
author wenzelm
Thu, 27 Aug 2009 17:00:03 +0200
changeset 32420 6473d1952023
parent 32299 e6a8ed8aed3a
child 32592 e29c0b7dcf66
permissions -rw-r--r--
tuned tracing;
     1 (*  Title:      Pure/Concurrent/future.ML
     2     Author:     Makarius
     3 
     4 Future values, see also
     5 http://www4.in.tum.de/~wenzelm/papers/parallel-isabelle.pdf
     6 
     7 Notes:
     8 
     9   * Futures are similar to delayed evaluation, i.e. delay/force is
    10     generalized to fork/join (and variants).  The idea is to model
    11     parallel value-oriented computations, but *not* communicating
    12     processes.
    13 
    14   * Futures are grouped; failure of one group member causes the whole
    15     group to be interrupted eventually.  Groups are block-structured.
    16 
    17   * Forked futures are evaluated spontaneously by a farm of worker
    18     threads in the background; join resynchronizes the computation and
    19     delivers results (values or exceptions).
    20 
    21   * The pool of worker threads is limited, usually in correlation with
    22     the number of physical cores on the machine.  Note that allocation
    23     of runtime resources is distorted either if workers yield CPU time
    24     (e.g. via system sleep or wait operations), or if non-worker
    25     threads contend for significant runtime resources independently.
    26 *)
    27 
    28 signature FUTURE =
    29 sig
    30   type task = Task_Queue.task
    31   type group = Task_Queue.group
    32   val is_worker: unit -> bool
    33   val worker_group: unit -> Task_Queue.group option
    34   type 'a future
    35   val task_of: 'a future -> task
    36   val group_of: 'a future -> group
    37   val peek: 'a future -> 'a Exn.result option
    38   val is_finished: 'a future -> bool
    39   val value: 'a -> 'a future
    40   val fork: (unit -> 'a) -> 'a future
    41   val fork_group: group -> (unit -> 'a) -> 'a future
    42   val fork_deps: 'b future list -> (unit -> 'a) -> 'a future
    43   val fork_pri: int -> (unit -> 'a) -> 'a future
    44   val join_results: 'a future list -> 'a Exn.result list
    45   val join_result: 'a future -> 'a Exn.result
    46   val join: 'a future -> 'a
    47   val map: ('a -> 'b) -> 'a future -> 'b future
    48   val interruptible_task: ('a -> 'b) -> 'a -> 'b
    49   val cancel_group: group -> unit
    50   val cancel: 'a future -> unit
    51   val shutdown: unit -> unit
    52 end;
    53 
    54 structure Future: FUTURE =
    55 struct
    56 
    57 (** future values **)
    58 
    59 (* identifiers *)
    60 
    61 type task = Task_Queue.task;
    62 type group = Task_Queue.group;
    63 
    64 local
    65   val tag = Universal.tag () : (string * task * group) option Universal.tag;
    66 in
    67   fun thread_data () = the_default NONE (Thread.getLocal tag);
    68   fun setmp_thread_data data f x =
    69     Library.setmp_thread_data tag (thread_data ()) (SOME data) f x;
    70 end;
    71 
    72 val is_worker = is_some o thread_data;
    73 val worker_group = Option.map #3 o thread_data;
    74 
    75 
    76 (* datatype future *)
    77 
    78 datatype 'a future = Future of
    79  {task: task,
    80   group: group,
    81   result: 'a Exn.result option Synchronized.var};
    82 
    83 fun task_of (Future {task, ...}) = task;
    84 fun group_of (Future {group, ...}) = group;
    85 fun result_of (Future {result, ...}) = result;
    86 
    87 fun peek x = Synchronized.peek (result_of x);
    88 fun is_finished x = is_some (peek x);
    89 
    90 fun value x = Future
    91  {task = Task_Queue.new_task 0,
    92   group = Task_Queue.new_group NONE,
    93   result = Synchronized.var "future" (SOME (Exn.Result x))};
    94 
    95 
    96 
    97 (** scheduling **)
    98 
    99 (* global state *)
   100 
   101 val queue = ref Task_Queue.empty;
   102 val next = ref 0;
   103 val workers = ref ([]: (Thread.thread * bool) list);
   104 val scheduler = ref (NONE: Thread.thread option);
   105 val excessive = ref 0;
   106 val canceled = ref ([]: Task_Queue.group list);
   107 val do_shutdown = ref false;
   108 
   109 
   110 (* synchronization *)
   111 
   112 val scheduler_event = ConditionVar.conditionVar ();
   113 val work_available = ConditionVar.conditionVar ();
   114 val work_finished = ConditionVar.conditionVar ();
   115 
   116 local
   117   val lock = Mutex.mutex ();
   118 in
   119 
   120 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
   121 
   122 fun wait cond = (*requires SYNCHRONIZED*)
   123   Multithreading.sync_wait NONE NONE cond lock;
   124 
   125 fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
   126   Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
   127 
   128 fun signal cond = (*requires SYNCHRONIZED*)
   129   ConditionVar.signal cond;
   130 
   131 fun broadcast cond = (*requires SYNCHRONIZED*)
   132   ConditionVar.broadcast cond;
   133 
   134 fun broadcast_work () = (*requires SYNCHRONIZED*)
   135  (ConditionVar.broadcast work_available;
   136   ConditionVar.broadcast work_finished);
   137 
   138 end;
   139 
   140 
   141 (* execute future jobs *)
   142 
   143 fun future_job group (e: unit -> 'a) =
   144   let
   145     val result = Synchronized.var "future" (NONE: 'a Exn.result option);
   146     fun job ok =
   147       let
   148         val res =
   149           if ok then
   150             Exn.capture (fn () =>
   151               Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
   152           else Exn.Exn Exn.Interrupt;
   153         val _ = Synchronized.change result
   154           (fn NONE => SOME res
   155             | SOME _ => raise Fail "Duplicate assignment of future value");
   156       in
   157         (case res of
   158           Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
   159         | Exn.Result _ => true)
   160       end;
   161   in (result, job) end;
   162 
   163 fun do_cancel group = (*requires SYNCHRONIZED*)
   164  (change canceled (insert Task_Queue.eq_group group); broadcast scheduler_event);
   165 
   166 fun execute name (task, group, jobs) =
   167   let
   168     val valid = not (Task_Queue.is_canceled group);
   169     val ok = setmp_thread_data (name, task, group) (fn () =>
   170       fold (fn job => fn ok => job valid andalso ok) jobs true) ();
   171     val _ = SYNCHRONIZED "finish" (fn () =>
   172       let
   173         val maximal = change_result queue (Task_Queue.finish task);
   174         val _ =
   175           if ok then ()
   176           else if Task_Queue.cancel (! queue) group then ()
   177           else do_cancel group;
   178         val _ = broadcast work_finished;
   179         val _ = if maximal then () else broadcast work_available;
   180       in () end);
   181   in () end;
   182 
   183 
   184 (* worker activity *)
   185 
   186 fun count_active () = (*requires SYNCHRONIZED*)
   187   fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
   188 
   189 fun change_active active = (*requires SYNCHRONIZED*)
   190   change workers (AList.update Thread.equal (Thread.self (), active));
   191 
   192 
   193 (* worker threads *)
   194 
   195 fun worker_wait cond = (*requires SYNCHRONIZED*)
   196   (change_active false; wait cond; change_active true);
   197 
   198 fun worker_next () = (*requires SYNCHRONIZED*)
   199   if ! excessive > 0 then
   200     (dec excessive;
   201      change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
   202      broadcast scheduler_event;
   203      NONE)
   204   else if count_active () > Multithreading.max_threads_value () then
   205     (worker_wait scheduler_event; worker_next ())
   206   else
   207     (case change_result queue (Task_Queue.dequeue (Thread.self ())) of
   208       NONE => (worker_wait work_available; worker_next ())
   209     | some => some);
   210 
   211 fun worker_loop name =
   212   (case SYNCHRONIZED name (fn () => worker_next ()) of
   213     NONE => ()
   214   | SOME work => (execute name work; worker_loop name));
   215 
   216 fun worker_start name = (*requires SYNCHRONIZED*)
   217   change workers (cons (SimpleThread.fork false (fn () =>
   218      (broadcast scheduler_event; worker_loop name)), true));
   219 
   220 
   221 (* scheduler *)
   222 
   223 val last_status = ref Time.zeroTime;
   224 val next_status = Time.fromMilliseconds 500;
   225 val next_round = Time.fromMilliseconds 50;
   226 
   227 fun scheduler_next () = (*requires SYNCHRONIZED*)
   228   let
   229     (*queue and worker status*)
   230     val _ =
   231       let val now = Time.now () in
   232         if Time.> (Time.+ (! last_status, next_status), now) then ()
   233         else
   234          (last_status := now; Multithreading.tracing 1 (fn () =>
   235             let
   236               val {ready, pending, running} = Task_Queue.status (! queue);
   237               val total = length (! workers);
   238               val active = count_active ();
   239             in
   240               "SCHEDULE " ^ string_of_int (Time.toMilliseconds now) ^ ": " ^
   241                 string_of_int ready ^ " ready, " ^
   242                 string_of_int pending ^ " pending, " ^
   243                 string_of_int running ^ " running; " ^
   244                 string_of_int total ^ " workers, " ^
   245                 string_of_int active ^ " active"
   246             end))
   247       end;
   248 
   249     (*worker threads*)
   250     val _ =
   251       if forall (Thread.isActive o #1) (! workers) then ()
   252       else
   253         (case List.partition (Thread.isActive o #1) (! workers) of
   254           (_, []) => ()
   255         | (alive, dead) =>
   256             (workers := alive; Multithreading.tracing 0 (fn () =>
   257               "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
   258 
   259     val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
   260     val mm = (m * 3) div 2;
   261     val l = length (! workers);
   262     val _ = excessive := l - mm;
   263     val _ =
   264       if mm > l then
   265         funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   266       else ();
   267 
   268     (*canceled groups*)
   269     val _ =
   270       if null (! canceled) then ()
   271       else
   272        (Multithreading.tracing 1 (fn () =>
   273           string_of_int (length (! canceled)) ^ " canceled groups");
   274         change canceled (filter_out (Task_Queue.cancel (! queue)));
   275         broadcast_work ());
   276 
   277     (*delay loop*)
   278     val _ = Exn.release (wait_timeout next_round scheduler_event);
   279 
   280     (*shutdown*)
   281     val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
   282     val continue = not (! do_shutdown andalso null (! workers));
   283     val _ = if continue then () else scheduler := NONE;
   284     val _ = broadcast scheduler_event;
   285   in continue end
   286   handle Exn.Interrupt =>
   287    (Multithreading.tracing 1 (fn () => "Interrupt");
   288     uninterruptible (fn _ => fn () => List.app do_cancel (Task_Queue.cancel_all (! queue))) ();
   289     scheduler_next ());
   290 
   291 fun scheduler_loop () =
   292   Multithreading.with_attributes
   293     (Multithreading.sync_interrupts Multithreading.public_interrupts)
   294     (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
   295 
   296 fun scheduler_active () = (*requires SYNCHRONIZED*)
   297   (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
   298 
   299 fun scheduler_check () = (*requires SYNCHRONIZED*)
   300  (do_shutdown := false;
   301   if scheduler_active () then ()
   302   else scheduler := SOME (SimpleThread.fork false scheduler_loop));
   303 
   304 
   305 
   306 (** futures **)
   307 
   308 (* fork *)
   309 
   310 fun fork_future opt_group deps pri e =
   311   let
   312     val group =
   313       (case opt_group of
   314         SOME group => group
   315       | NONE => Task_Queue.new_group (worker_group ()));
   316     val (result, job) = future_job group e;
   317     val task = SYNCHRONIZED "enqueue" (fn () =>
   318       let
   319         val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   320         val _ = if minimal then signal work_available else ();
   321         val _ = scheduler_check ();
   322       in task end);
   323   in Future {task = task, group = group, result = result} end;
   324 
   325 fun fork e = fork_future NONE [] 0 e;
   326 fun fork_group group e = fork_future (SOME group) [] 0 e;
   327 fun fork_deps deps e = fork_future NONE (map task_of deps) 0 e;
   328 fun fork_pri pri e = fork_future NONE [] pri e;
   329 
   330 
   331 (* join *)
   332 
   333 local
   334 
   335 fun get_result x =
   336   (case peek x of
   337     NONE => Exn.Exn (SYS_ERROR "unfinished future")
   338   | SOME (Exn.Exn Exn.Interrupt) =>
   339       Exn.Exn (Exn.EXCEPTIONS (Exn.flatten_list (Task_Queue.group_status (group_of x))))
   340   | SOME res => res);
   341 
   342 fun join_wait x =
   343   Synchronized.guarded_access (result_of x) (fn NONE => NONE | some => SOME ((), some));
   344 
   345 fun join_next deps = (*requires SYNCHRONIZED*)
   346   if null deps then NONE
   347   else
   348     (case change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   349       (NONE, []) => NONE
   350     | (NONE, deps') => (worker_wait work_finished; join_next deps')
   351     | (SOME work, deps') => SOME (work, deps'));
   352 
   353 fun join_work deps =
   354   (case SYNCHRONIZED "join" (fn () => join_next deps) of
   355     NONE => ()
   356   | SOME (work, deps') => (execute "join" work; join_work deps'));
   357 
   358 in
   359 
   360 fun join_results xs =
   361   if forall is_finished xs then map get_result xs
   362   else if Multithreading.self_critical () then
   363     error "Cannot join future values within critical section"
   364   else uninterruptible (fn _ => fn () =>
   365      (if is_worker ()
   366       then join_work (map task_of xs)
   367       else List.app join_wait xs;
   368       map get_result xs)) ();
   369 
   370 end;
   371 
   372 fun join_result x = singleton join_results x;
   373 fun join x = Exn.release (join_result x);
   374 
   375 
   376 (* map *)
   377 
   378 fun map_future f x =
   379   let
   380     val task = task_of x;
   381     val group = Task_Queue.new_group (SOME (group_of x));
   382     val (result, job) = future_job group (fn () => f (join x));
   383 
   384     val extended = SYNCHRONIZED "extend" (fn () =>
   385       (case Task_Queue.extend task job (! queue) of
   386         SOME queue' => (queue := queue'; true)
   387       | NONE => false));
   388   in
   389     if extended then Future {task = task, group = group, result = result}
   390     else fork_future (SOME group) [task] (Task_Queue.pri_of_task task) (fn () => f (join x))
   391   end;
   392 
   393 
   394 (* cancellation *)
   395 
   396 fun interruptible_task f x =
   397   if Multithreading.available then
   398     Multithreading.with_attributes
   399       (if is_worker ()
   400        then Multithreading.private_interrupts
   401        else Multithreading.public_interrupts)
   402       (fn _ => f x)
   403   else interruptible f x;
   404 
   405 (*cancel: present and future group members will be interrupted eventually*)
   406 fun cancel_group group = SYNCHRONIZED "cancel" (fn () => do_cancel group);
   407 fun cancel x = cancel_group (group_of x);
   408 
   409 
   410 (* shutdown *)
   411 
   412 fun shutdown () =
   413   if Multithreading.available then
   414     SYNCHRONIZED "shutdown" (fn () =>
   415      while scheduler_active () do
   416       (wait scheduler_event; broadcast_work ()))
   417   else ();
   418 
   419 
   420 (*final declarations of this structure!*)
   421 val map = map_future;
   422 
   423 end;
   424 
   425 type 'a future = 'a Future.future;
   426