slightly leaner and more direct control of worker activity etc.;
authorwenzelm
Tue, 03 Nov 2009 19:52:09 +0100
changeset 334271ddcb8472bd2
parent 33405 5c1928d5db38
child 33428 1427333220bc
slightly leaner and more direct control of worker activity etc.;
src/Pure/Concurrent/future.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Tue Nov 03 10:36:20 2009 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Tue Nov 03 19:52:09 2009 +0100
     1.3 @@ -103,9 +103,10 @@
     1.4  
     1.5  val queue = Unsynchronized.ref Task_Queue.empty;
     1.6  val next = Unsynchronized.ref 0;
     1.7 -val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
     1.8 +val max_workers = Unsynchronized.ref 0;
     1.9 +val max_active = Unsynchronized.ref 0;
    1.10 +val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
    1.11  val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
    1.12 -val excessive = Unsynchronized.ref 0;
    1.13  val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
    1.14  val do_shutdown = Unsynchronized.ref false;
    1.15  
    1.16 @@ -186,26 +187,30 @@
    1.17  (* worker activity *)
    1.18  
    1.19  fun count_active () = (*requires SYNCHRONIZED*)
    1.20 -  fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
    1.21 +  fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
    1.22  
    1.23 -fun change_active active = (*requires SYNCHRONIZED*)
    1.24 -  Unsynchronized.change workers
    1.25 -    (AList.update Thread.equal (Thread.self (), active));
    1.26 +fun find_active () = (*requires SYNCHRONIZED*)
    1.27 +  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
    1.28 +    SOME active => active
    1.29 +  | NONE => raise Fail "Unregistered worker thread");
    1.30  
    1.31  
    1.32  (* worker threads *)
    1.33  
    1.34  fun worker_wait cond = (*requires SYNCHRONIZED*)
    1.35 -  (change_active false; wait cond; change_active true);
    1.36 +  let
    1.37 +    val active = find_active ();
    1.38 +    val _ = active := false;
    1.39 +    val _ = wait cond;
    1.40 +    val _ = active := true;
    1.41 +  in () end;
    1.42  
    1.43  fun worker_next () = (*requires SYNCHRONIZED*)
    1.44 -  if ! excessive > 0 then
    1.45 -    (Unsynchronized.dec excessive;
    1.46 -     Unsynchronized.change workers
    1.47 -      (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
    1.48 +  if length (! workers) > ! max_workers then
    1.49 +    (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
    1.50       broadcast scheduler_event;
    1.51       NONE)
    1.52 -  else if count_active () > Multithreading.max_threads_value () then
    1.53 +  else if count_active () > ! max_active then
    1.54      (worker_wait scheduler_event; worker_next ())
    1.55    else
    1.56      (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
    1.57 @@ -217,9 +222,12 @@
    1.58      NONE => ()
    1.59    | SOME work => (execute name work; worker_loop name));
    1.60  
    1.61 -fun worker_start name = (*requires SYNCHRONIZED*)
    1.62 -  Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
    1.63 -     (broadcast scheduler_event; worker_loop name)), true));
    1.64 +fun worker_start name =
    1.65 +  SimpleThread.fork false (fn () =>
    1.66 +   (SYNCHRONIZED name (fn () =>
    1.67 +      Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
    1.68 +    broadcast scheduler_event;
    1.69 +    worker_loop name));
    1.70  
    1.71  
    1.72  (* scheduler *)
    1.73 @@ -261,13 +269,16 @@
    1.74                "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
    1.75  
    1.76      val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
    1.77 +    val _ = max_active := m;
    1.78 +
    1.79      val mm = if m = 9999 then 1 else m * 2;
    1.80 +    val _ = max_workers := mm;
    1.81 +
    1.82      val l = length (! workers);
    1.83 -    val _ = excessive := l - mm;
    1.84      val _ =
    1.85        if mm > l then
    1.86          funpow (mm - l) (fn () =>
    1.87 -          worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
    1.88 +          ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
    1.89        else ();
    1.90  
    1.91      (*canceled groups*)