1.1 --- a/src/Pure/Concurrent/future.ML Tue Nov 03 10:36:20 2009 +0100
1.2 +++ b/src/Pure/Concurrent/future.ML Tue Nov 03 19:52:09 2009 +0100
1.3 @@ -103,9 +103,10 @@
1.4
1.5 val queue = Unsynchronized.ref Task_Queue.empty;
1.6 val next = Unsynchronized.ref 0;
1.7 -val workers = Unsynchronized.ref ([]: (Thread.thread * bool) list);
1.8 +val max_workers = Unsynchronized.ref 0;
1.9 +val max_active = Unsynchronized.ref 0;
1.10 +val workers = Unsynchronized.ref ([]: (Thread.thread * bool Unsynchronized.ref) list);
1.11 val scheduler = Unsynchronized.ref (NONE: Thread.thread option);
1.12 -val excessive = Unsynchronized.ref 0;
1.13 val canceled = Unsynchronized.ref ([]: Task_Queue.group list);
1.14 val do_shutdown = Unsynchronized.ref false;
1.15
1.16 @@ -186,26 +187,30 @@
1.17 (* worker activity *)
1.18
1.19 fun count_active () = (*requires SYNCHRONIZED*)
1.20 - fold (fn (_, active) => fn i => if active then i + 1 else i) (! workers) 0;
1.21 + fold (fn (_, active) => fn i => if ! active then i + 1 else i) (! workers) 0;
1.22
1.23 -fun change_active active = (*requires SYNCHRONIZED*)
1.24 - Unsynchronized.change workers
1.25 - (AList.update Thread.equal (Thread.self (), active));
1.26 +fun find_active () = (*requires SYNCHRONIZED*)
1.27 + (case AList.lookup Thread.equal (! workers) (Thread.self ()) of
1.28 + SOME active => active
1.29 + | NONE => raise Fail "Unregistered worker thread");
1.30
1.31
1.32 (* worker threads *)
1.33
1.34 fun worker_wait cond = (*requires SYNCHRONIZED*)
1.35 - (change_active false; wait cond; change_active true);
1.36 + let
1.37 + val active = find_active ();
1.38 + val _ = active := false;
1.39 + val _ = wait cond;
1.40 + val _ = active := true;
1.41 + in () end;
1.42
1.43 fun worker_next () = (*requires SYNCHRONIZED*)
1.44 - if ! excessive > 0 then
1.45 - (Unsynchronized.dec excessive;
1.46 - Unsynchronized.change workers
1.47 - (filter_out (fn (thread, _) => Thread.equal (thread, Thread.self ())));
1.48 + if length (! workers) > ! max_workers then
1.49 + (Unsynchronized.change workers (AList.delete Thread.equal (Thread.self ()));
1.50 broadcast scheduler_event;
1.51 NONE)
1.52 - else if count_active () > Multithreading.max_threads_value () then
1.53 + else if count_active () > ! max_active then
1.54 (worker_wait scheduler_event; worker_next ())
1.55 else
1.56 (case Unsynchronized.change_result queue (Task_Queue.dequeue (Thread.self ())) of
1.57 @@ -217,9 +222,12 @@
1.58 NONE => ()
1.59 | SOME work => (execute name work; worker_loop name));
1.60
1.61 -fun worker_start name = (*requires SYNCHRONIZED*)
1.62 - Unsynchronized.change workers (cons (SimpleThread.fork false (fn () =>
1.63 - (broadcast scheduler_event; worker_loop name)), true));
1.64 +fun worker_start name =
1.65 + SimpleThread.fork false (fn () =>
1.66 + (SYNCHRONIZED name (fn () =>
1.67 + Unsynchronized.change workers (cons (Thread.self (), Unsynchronized.ref true)));
1.68 + broadcast scheduler_event;
1.69 + worker_loop name));
1.70
1.71
1.72 (* scheduler *)
1.73 @@ -261,13 +269,16 @@
1.74 "SCHEDULE: disposed " ^ string_of_int (length dead) ^ " dead worker threads")));
1.75
1.76 val m = if ! do_shutdown then 0 else Multithreading.max_threads_value ();
1.77 + val _ = max_active := m;
1.78 +
1.79 val mm = if m = 9999 then 1 else m * 2;
1.80 + val _ = max_workers := mm;
1.81 +
1.82 val l = length (! workers);
1.83 - val _ = excessive := l - mm;
1.84 val _ =
1.85 if mm > l then
1.86 funpow (mm - l) (fn () =>
1.87 - worker_start ("worker " ^ string_of_int (Unsynchronized.inc next))) ()
1.88 + ignore (worker_start ("worker " ^ string_of_int (Unsynchronized.inc next)))) ()
1.89 else ();
1.90
1.91 (*canceled groups*)