src/Pure/Concurrent/future.ML
changeset 32219 9a2566d1fdbd
parent 32186 8026b73cd357
child 32220 01ff6781dd18
     1.1 --- a/src/Pure/Concurrent/future.ML	Mon Jul 27 12:00:02 2009 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Mon Jul 27 12:11:18 2009 +0200
     1.3 @@ -114,20 +114,26 @@
     1.4  
     1.5  (* synchronization *)
     1.6  
     1.7 +val scheduler_event = ConditionVar.conditionVar ();
     1.8 +val work_available = ConditionVar.conditionVar ();
     1.9 +val work_finished = ConditionVar.conditionVar ();
    1.10 +
    1.11  local
    1.12    val lock = Mutex.mutex ();
    1.13 -  val cond = ConditionVar.conditionVar ();
    1.14  in
    1.15  
    1.16  fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
    1.17  
    1.18 -fun wait () = (*requires SYNCHRONIZED*)
    1.19 +fun wait cond = (*requires SYNCHRONIZED*)
    1.20    ConditionVar.wait (cond, lock);
    1.21  
    1.22 -fun wait_timeout timeout = (*requires SYNCHRONIZED*)
    1.23 +fun wait_timeout cond timeout = (*requires SYNCHRONIZED*)
    1.24    ignore (ConditionVar.waitUntil (cond, lock, Time.+ (Time.now (), timeout)));
    1.25  
    1.26 -fun notify_all () = (*requires SYNCHRONIZED*)
    1.27 +fun signal cond = (*requires SYNCHRONIZED*)
    1.28 +  ConditionVar.signal cond;
    1.29 +
    1.30 +fun broadcast cond = (*requires SYNCHRONIZED*)
    1.31    ConditionVar.broadcast cond;
    1.32  
    1.33  end;
    1.34 @@ -183,29 +189,35 @@
    1.35      val ok = setmp_thread_data (name, task, group) (fn () =>
    1.36        fold (fn job => fn ok => job valid andalso ok) jobs true) ();
    1.37      val _ = SYNCHRONIZED "execute" (fn () =>
    1.38 -     (change queue (Task_Queue.finish task);
    1.39 -      if ok then ()
    1.40 -      else if Task_Queue.cancel (! queue) group then ()
    1.41 -      else do_cancel group;
    1.42 -      notify_all ()));
    1.43 +      let
    1.44 +        val maximal = change_result queue (Task_Queue.finish task);
    1.45 +        val _ =
    1.46 +          if ok then ()
    1.47 +          else if Task_Queue.cancel (! queue) group then ()
    1.48 +          else do_cancel group;
    1.49 +        val _ = broadcast work_finished;
    1.50 +        val _ = if maximal then () else broadcast work_available;
    1.51 +      in () end);
    1.52    in () end;
    1.53  
    1.54  
    1.55  (* worker threads *)
    1.56  
    1.57 -fun worker_wait () = (*requires SYNCHRONIZED*)
    1.58 -  (change_active false; wait (); change_active true);
    1.59 +fun worker_wait cond = (*requires SYNCHRONIZED*)
    1.60 + (change_active false; broadcast scheduler_event;
    1.61 +  wait cond;
    1.62 +  change_active true; broadcast scheduler_event);
    1.63  
    1.64  fun worker_next () = (*requires SYNCHRONIZED*)
    1.65    if ! excessive > 0 then
    1.66      (dec excessive;
    1.67       change workers (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
    1.68 -     notify_all ();
    1.69 +     broadcast scheduler_event;
    1.70       NONE)
    1.71 -  else if overloaded () then (worker_wait (); worker_next ())
    1.72 +  else if overloaded () then (worker_wait scheduler_event; worker_next ())
    1.73    else
    1.74      (case change_result queue Task_Queue.dequeue of
    1.75 -      NONE => (worker_wait (); worker_next ())
    1.76 +      NONE => (worker_wait work_available; worker_next ())
    1.77      | some => some);
    1.78  
    1.79  fun worker_loop name =
    1.80 @@ -231,11 +243,10 @@
    1.81        end);
    1.82  
    1.83      (*worker threads*)
    1.84 -    val ws = ! workers;
    1.85      val _ =
    1.86 -      if forall (Thread.isActive o #1) ws then ()
    1.87 +      if forall (Thread.isActive o #1) (! workers) then ()
    1.88        else
    1.89 -        (case List.partition (Thread.isActive o #1) ws of
    1.90 +        (case List.partition (Thread.isActive o #1) (! workers) of
    1.91            (_, []) => ()
    1.92          | (active, inactive) =>
    1.93              (workers := active; Multithreading.tracing 0 (fn () =>
    1.94 @@ -244,24 +255,26 @@
    1.95  
    1.96      val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    1.97      val mm = (m * 3) div 2;
    1.98 -    val l = length ws;
    1.99 +    val l = length (! workers);
   1.100      val _ = excessive := l - mm;
   1.101      val _ =
   1.102        if mm > l then
   1.103 -        funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ()
   1.104 +       (funpow (mm - l) (fn () => worker_start ("worker " ^ string_of_int (inc next))) ();
   1.105 +        broadcast scheduler_event)
   1.106        else ();
   1.107  
   1.108      (*canceled groups*)
   1.109      val _ = change canceled (filter_out (Task_Queue.cancel (! queue)));
   1.110  
   1.111 +    val timeout =
   1.112 +      Time.fromMilliseconds (if not (! do_shutdown) andalso null (! canceled) then 500 else 50);
   1.113 +    val _ = interruptible (fn () => wait_timeout scheduler_event timeout) ()
   1.114 +      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   1.115 +
   1.116      (*shutdown*)
   1.117 -    val continue = not (! do_shutdown andalso null ws);
   1.118 +    val continue = not (! do_shutdown andalso null (! workers));
   1.119      val _ = if continue then () else scheduler := NONE;
   1.120 -
   1.121 -    val _ = notify_all ();
   1.122 -    val _ = interruptible (fn () =>
   1.123 -        wait_timeout (Time.fromMilliseconds (if null (! canceled) then 1000 else 50))) ()
   1.124 -      handle Exn.Interrupt => List.app do_cancel (Task_Queue.cancel_all (! queue));
   1.125 +    val _ = broadcast scheduler_event;
   1.126    in continue end;
   1.127  
   1.128  fun scheduler_loop () =
   1.129 @@ -272,7 +285,8 @@
   1.130  
   1.131  fun scheduler_check name = SYNCHRONIZED name (fn () =>
   1.132    if not (scheduler_active ()) then
   1.133 -    (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop))
   1.134 +   (do_shutdown := false; scheduler := SOME (SimpleThread.fork false scheduler_loop);
   1.135 +    broadcast scheduler_event)
   1.136    else if ! do_shutdown then error "Scheduler shutdown in progress"
   1.137    else ());
   1.138  
   1.139 @@ -292,7 +306,10 @@
   1.140        | NONE => Task_Queue.new_group (worker_group ()));
   1.141      val (result, job) = future_job group e;
   1.142      val task = SYNCHRONIZED "future" (fn () =>
   1.143 -      change_result queue (Task_Queue.enqueue group deps pri job) before notify_all ());
   1.144 +      let
   1.145 +        val (task, minimal) = change_result queue (Task_Queue.enqueue group deps pri job);
   1.146 +        val _ = if minimal then signal work_available else ();
   1.147 +      in task end);
   1.148    in Future {task = task, group = group, result = result} end;
   1.149  
   1.150  fun fork e = fork_future NONE [] 0 e;
   1.151 @@ -313,7 +330,7 @@
   1.152    | SOME res => res);
   1.153  
   1.154  fun join_next deps = (*requires SYNCHRONIZED*)
   1.155 -  if overloaded () then (worker_wait (); join_next deps)
   1.156 +  if overloaded () then (worker_wait scheduler_event; join_next deps)
   1.157    else change_result queue (Task_Queue.dequeue_towards deps);
   1.158  
   1.159  fun join_deps deps =
   1.160 @@ -336,7 +353,7 @@
   1.161  
   1.162        fun join_wait x =
   1.163          if SYNCHRONIZED "join_wait" (fn () =>
   1.164 -          is_finished x orelse (if worker then worker_wait () else wait (); false))
   1.165 +          is_finished x orelse ((if worker then worker_wait else wait) work_finished; false))
   1.166          then () else join_wait x;
   1.167  
   1.168        val _ = xs |> List.app (fn x =>
   1.169 @@ -390,7 +407,7 @@
   1.170  (*cancel: present and future group members will be interrupted eventually*)
   1.171  fun cancel_group group =
   1.172   (scheduler_check "cancel check";
   1.173 -  SYNCHRONIZED "cancel" (fn () => (do_cancel group; notify_all ())));
   1.174 +  SYNCHRONIZED "cancel" (fn () => (do_cancel group; broadcast scheduler_event)));
   1.175  
   1.176  fun cancel x = cancel_group (group_of x);
   1.177  
   1.178 @@ -401,13 +418,13 @@
   1.179    if Multithreading.available then
   1.180     (scheduler_check "shutdown check";
   1.181      SYNCHRONIZED "shutdown" (fn () =>
   1.182 -     (while not (scheduler_active ()) do wait ();
   1.183 -      while not (Task_Queue.is_empty (! queue)) do wait ();
   1.184 +     (while not (scheduler_active ()) do wait scheduler_event;
   1.185 +      while not (Task_Queue.is_empty (! queue)) do wait scheduler_event;
   1.186        do_shutdown := true;
   1.187 -      notify_all ();
   1.188 -      while not (null (! workers)) do wait ();
   1.189 -      while scheduler_active () do wait ();
   1.190 -      OS.Process.sleep (Time.fromMilliseconds 300))))
   1.191 +      while scheduler_active () do
   1.192 +       (broadcast work_available;
   1.193 +        broadcast scheduler_event;
   1.194 +        wait scheduler_event))))
   1.195    else ();
   1.196  
   1.197