1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML Sun Jul 29 19:46:03 2007 +0200
1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sun Jul 29 19:46:04 2007 +0200
1.3 @@ -24,16 +24,16 @@
1.4
1.5 local
1.6
1.7 -val critical_lock = Mutex.mutex ();
1.8 -val critical_thread = ref (NONE: Thread.thread option);
1.9 -val critical_name = ref "";
1.10 -
1.11 fun add_name "" = ""
1.12 | add_name name = " " ^ name;
1.13
1.14 fun add_name' "" = ""
1.15 | add_name' name = " [" ^ name ^ "]";
1.16
1.17 +val critical_lock = Mutex.mutex ();
1.18 +val critical_thread = ref (NONE: Thread.thread option);
1.19 +val critical_name = ref "";
1.20 +
1.21 in
1.22
1.23 fun self_critical () =
1.24 @@ -72,25 +72,28 @@
1.25
1.26 (* scheduling -- non-interruptible threads working on a queue of tasks *)
1.27
1.28 +fun inc i = (i := ! i + 1; ! i);
1.29 +fun dec i = (i := ! i - 1; ! i);
1.30 +
1.31 fun schedule n next_task tasks =
1.32 let
1.33 (*protected execution*)
1.34 val lock = Mutex.mutex ();
1.35 - fun PROTECTED k e =
1.36 + fun PROTECTED name e =
1.37 let
1.38 val _ =
1.39 if Mutex.trylock lock then ()
1.40 else
1.41 - (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
1.42 + (tracing (fn () => "PROTECTED " ^ name ^ ": waiting for lock");
1.43 Mutex.lock lock;
1.44 - tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
1.45 + tracing (fn () => "PROTECTED " ^ name ^ ": obtained lock"));
1.46 val res = Exn.capture e ();
1.47 val _ = Mutex.unlock lock;
1.48 in Exn.release res end;
1.49
1.50 (*the queue of tasks*)
1.51 val queue = ref tasks;
1.52 - fun dequeue k = PROTECTED k (fn () =>
1.53 + fun dequeue () = PROTECTED "dequeue" (fn () =>
1.54 let
1.55 val (next, tasks') = next_task (! queue);
1.56 val _ = queue := tasks';
1.57 @@ -98,35 +101,37 @@
1.58
1.59 (*worker threads*)
1.60 val running = ref 0;
1.61 + val active = ref 0;
1.62 val status = ref ([]: exn list);
1.63 val wakeup = ConditionVar.conditionVar ();
1.64 fun wait () = ConditionVar.wait (wakeup, lock);
1.65 fun continue cont k =
1.66 - (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
1.67 + (PROTECTED "cont" (fn () => queue := cont (! queue));
1.68 + ConditionVar.broadcast wakeup; work k ())
1.69 and work k () =
1.70 - (case dequeue k of
1.71 + (case dequeue () of
1.72 (Task.Task f, cont) =>
1.73 - (tracing (fn () => "TASK " ^ Int.toString k);
1.74 - case Exn.capture f () of
1.75 + (case Exn.capture f () of
1.76 Exn.Result () => continue cont k
1.77 | Exn.Exn exn =>
1.78 - (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
1.79 + (PROTECTED "status" (fn () => status := exn :: ! status); continue cont k))
1.80 | (Task.Running, _) =>
1.81 - (tracing (fn () => "WAITING " ^ Int.toString k);
1.82 - PROTECTED k wait; work k ())
1.83 + (PROTECTED "wait" (fn () => (dec active; wait (); inc active)); work k ())
1.84 | (Task.Finished, _) =>
1.85 - (tracing (fn () => "TERMINATING " ^ Int.toString k);
1.86 - PROTECTED k (fn () => running := ! running - 1);
1.87 + (PROTECTED "running" (fn () => (dec active; dec running));
1.88 ConditionVar.broadcast wakeup));
1.89
1.90 (*main control: fork and wait*)
1.91 fun fork 0 = ()
1.92 | fork k =
1.93 - (running := ! running + 1;
1.94 + (inc running; inc active;
1.95 Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
1.96 fork (k - 1));
1.97 - val _ = PROTECTED 0 (fn () =>
1.98 - (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
1.99 + val _ = PROTECTED "main" (fn () =>
1.100 + (fork (Int.max (n, 1));
1.101 + while ! running <> 0 do
1.102 + (tracing (fn () => "MAIN: " ^ Int.toString (! active) ^ " active");
1.103 + wait ())));
1.104
1.105 in ! status end;
1.106