worker activity: distinguish between waiting (formerly active) and sleeping;
authorwenzelm
Wed, 04 Nov 2009 11:58:29 +0100
changeset 33431e351f4c1f18c
parent 33430 0a1c0c1209ec
child 33432 a07558eb5029
worker activity: distinguish between waiting (formerly active) and sleeping;
tuned;
src/Pure/Concurrent/future.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Wed Nov 04 11:37:06 2009 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Wed Nov 04 11:58:29 2009 +0100
     1.3 @@ -99,18 +99,6 @@
     1.4  
     1.5  (** scheduling **)
     1.6  
     1.7 -(* global state *)
     1.8 -
     1.9 -val queue = Unsynchronized.ref Task_Queue.empty;
    1.10 -val next = Unsynchronized.ref 0;
    1.11 -val max_workers = Unsynchronized.ref 0;
    1.12 -val max_active = Unsynchronized.ref 0;
    1.13 -val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
    1.14 -val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
    1.15 -val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
    1.16 -val do_shutdown = Unsynchronized.ref false;
    1.17 -
    1.18 -
    1.19  (* synchronization *)
    1.20  
    1.21  val scheduler_event = ConditionVar.conditionVar ();
    1.22 @@ -142,6 +130,23 @@
    1.23  end;
    1.24  
    1.25  
    1.26 +(* global state *)
    1.27 +
    1.28 +val queue = Unsynchronized.ref Task_Queue.empty;
    1.29 +val next = Unsynchronized.ref 0;
    1.30 +val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
    1.31 +val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
    1.32 +val do_shutdown = Unsynchronized.ref false;
    1.33 +val max_workers = Unsynchronized.ref 0;
    1.34 +val max_active = Unsynchronized.ref 0;
    1.35 +
    1.36 +datatype worker_state = Working | Waiting | Sleeping;
    1.37 +val workers = Unsynchronized.ref ([]: (Thread.thread * worker_state Unsynchronized.ref) list);
    1.38 +
    1.39 +fun count_workers state = (*requires SYNCHRONIZED*)
    1.40 +  fold (fn (_, state_ref) => fn i => if ! state_ref = state then i + 1 else i) (! workers) 0;
    1.41 +
    1.42 +
    1.43  (* execute future jobs *)
    1.44  
    1.45  fun future_job group (e: unit -> 'a) =
    1.46 @@ -184,25 +189,17 @@
    1.47    in () end;
    1.48  
    1.49  
    1.50 -(* worker activity *)
    1.51 -
    1.52 -fun count_active () = (*requires SYNCHRONIZED*)
    1.53 -  fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
    1.54 -
    1.55 -fun find_active () = (*requires SYNCHRONIZED*)
    1.56 -  (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
    1.57 -    SOME active => active
    1.58 -  | NONE => raise Fail "Unregistered worker thread");
    1.59 -
    1.60 -
    1.61  (* worker threads *)
    1.62  
    1.63 -fun worker_wait cond = (*requires SYNCHRONIZED*)
    1.64 +fun worker_wait active cond = (*requires SYNCHRONIZED*)
    1.65    let
    1.66 -    val active = find_active ();
    1.67 -    val _ = active := false;
    1.68 +    val state =
    1.69 +      (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
    1.70 +        SOME state => state
    1.71 +      | NONE => raise Fail "Unregistered worker thread");
    1.72 +    val _ = state := (if active then Waiting else Sleeping);
    1.73      val _ = wait cond;
    1.74 -    val _ = active := true;
    1.75 +    val _ = state := Working;
    1.76    in () end;
    1.77  
    1.78  fun worker_next have_work = (*requires SYNCHRONIZED*)
    1.79 @@ -211,13 +208,13 @@
    1.80       if have_work then signal work_available else ();
    1.81       broadcast scheduler_event;
    1.82       NONE)
    1.83 -  else if count_active () > ! max_active then
    1.84 +  else if count_workers Working > ! max_active then
    1.85      (if have_work then signal work_available else ();
    1.86 -     worker_wait scheduler_event;
    1.87 +     worker_wait false scheduler_event;
    1.88       worker_next false)
    1.89    else
    1.90      (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
    1.91 -      NONE => (worker_wait work_available; worker_next true)
    1.92 +      NONE => (worker_wait true work_available; worker_next true)
    1.93      | some => some);
    1.94  
    1.95  fun worker_loop name =
    1.96 @@ -227,7 +224,7 @@
    1.97  
    1.98  fun worker_start name = (*requires SYNCHRONIZED*)
    1.99    Unsynchronized.change workers (cons (SimpleThread.fork false (fn () => worker_loop name),
   1.100 -    Unsynchronized.ref true));
   1.101 +    Unsynchronized.ref Working));
   1.102  
   1.103  
   1.104  (* scheduler *)
   1.105 @@ -252,14 +249,16 @@
   1.106            let
   1.107              val {ready, pending, running} = Task_Queue.status (! queue);
   1.108              val total = length (! workers);
   1.109 -            val active = count_active ();
   1.110 +            val active = count_workers Working;
   1.111 +            val waiting = count_workers Waiting;
   1.112            in
   1.113              "SCHEDULE " ^ Time.toString now ^ ": " ^
   1.114                string_of_int ready ^ " ready, " ^
   1.115                string_of_int pending ^ " pending, " ^
   1.116                string_of_int running ^ " running; " ^
   1.117                string_of_int total ^ " workers, " ^
   1.118 -              string_of_int active ^ " active "
   1.119 +              string_of_int active ^ " active, " ^
   1.120 +              string_of_int waiting ^ " waiting "
   1.121            end)
   1.122        else ();
   1.123  
   1.124 @@ -373,7 +372,7 @@
   1.125    else
   1.126      (case Unsynchronized.change_result queue (Task_Queue.dequeue_towards (Thread.self ()) deps) of
   1.127        (NONE, []) => NONE
   1.128 -    | (NONE, deps') => (worker_wait work_finished; join_next deps')
   1.129 +    | (NONE, deps') => (worker_wait true work_finished; join_next deps')
   1.130      | (SOME work, deps') => SOME (work, deps'));
   1.131  
   1.132  fun execute_work NONE = ()