src/Pure/ML-Systems/multithreading_polyml.ML
changeset 24063 736c03ae92f5
parent 24060 b643ee118928
child 24066 fb455cb475df
     1.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Sun Jul 29 19:46:03 2007 +0200
     1.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Sun Jul 29 19:46:04 2007 +0200
     1.3 @@ -24,16 +24,16 @@
     1.4  
     1.5  local
     1.6  
     1.7 -val critical_lock = Mutex.mutex ();
     1.8 -val critical_thread = ref (NONE: Thread.thread option);
     1.9 -val critical_name = ref "";
    1.10 -
    1.11  fun add_name "" = ""
    1.12    | add_name name = " " ^ name;
    1.13  
    1.14  fun add_name' "" = ""
    1.15    | add_name' name = " [" ^ name ^ "]";
    1.16  
    1.17 +val critical_lock = Mutex.mutex ();
    1.18 +val critical_thread = ref (NONE: Thread.thread option);
    1.19 +val critical_name = ref "";
    1.20 +
    1.21  in
    1.22  
    1.23  fun self_critical () =
    1.24 @@ -72,25 +72,28 @@
    1.25  
    1.26  (* scheduling -- non-interruptible threads working on a queue of tasks *)
    1.27  
    1.28 +fun inc i = (i := ! i + 1; ! i);
    1.29 +fun dec i = (i := ! i - 1; ! i);
    1.30 +
    1.31  fun schedule n next_task tasks =
    1.32    let
    1.33      (*protected execution*)
    1.34      val lock = Mutex.mutex ();
    1.35 -    fun PROTECTED k e =
    1.36 +    fun PROTECTED name e =
    1.37        let
    1.38          val _ =
    1.39            if Mutex.trylock lock then ()
    1.40            else
    1.41 -           (tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": waiting for lock");
    1.42 +           (tracing (fn () => "PROTECTED " ^ name ^ ": waiting for lock");
    1.43              Mutex.lock lock;
    1.44 -            tracing (fn () => "PROTECTED " ^ Int.toString k ^ ": obtained lock"));
    1.45 +            tracing (fn () => "PROTECTED " ^ name ^ ": obtained lock"));
    1.46          val res = Exn.capture e ();
    1.47          val _ = Mutex.unlock lock;
    1.48        in Exn.release res end;
    1.49  
    1.50      (*the queue of tasks*)
    1.51      val queue = ref tasks;
    1.52 -    fun dequeue k = PROTECTED k (fn () =>
    1.53 +    fun dequeue () = PROTECTED "dequeue" (fn () =>
    1.54        let
    1.55          val (next, tasks') = next_task (! queue);
    1.56          val _ = queue := tasks';
    1.57 @@ -98,35 +101,37 @@
    1.58  
    1.59      (*worker threads*)
    1.60      val running = ref 0;
    1.61 +    val active = ref 0;
    1.62      val status = ref ([]: exn list);
    1.63      val wakeup = ConditionVar.conditionVar ();
    1.64      fun wait () = ConditionVar.wait (wakeup, lock);
    1.65      fun continue cont k =
    1.66 -      (PROTECTED k (fn () => queue := cont (! queue)); ConditionVar.broadcast wakeup; work k ())
    1.67 +      (PROTECTED "cont" (fn () => queue := cont (! queue));
    1.68 +       ConditionVar.broadcast wakeup; work k ())
    1.69      and work k () =
    1.70 -      (case dequeue k of
    1.71 +      (case dequeue () of
    1.72          (Task.Task f, cont) =>
    1.73 -          (tracing (fn () => "TASK " ^ Int.toString k);
    1.74 -           case Exn.capture f () of
    1.75 +          (case Exn.capture f () of
    1.76              Exn.Result () => continue cont k
    1.77            | Exn.Exn exn =>
    1.78 -              (PROTECTED k (fn () => status := exn :: ! status); continue cont k))
    1.79 +              (PROTECTED "status" (fn () => status := exn :: ! status); continue cont k))
    1.80        | (Task.Running, _) =>
    1.81 -          (tracing (fn () => "WAITING " ^ Int.toString k);
    1.82 -           PROTECTED k wait; work k ())
    1.83 +          (PROTECTED "wait" (fn () => (dec active; wait (); inc active)); work k ())
    1.84        | (Task.Finished, _) =>
    1.85 -         (tracing (fn () => "TERMINATING " ^ Int.toString k);
    1.86 -          PROTECTED k (fn () => running := ! running - 1);
    1.87 +         (PROTECTED "running" (fn () => (dec active; dec running));
    1.88            ConditionVar.broadcast wakeup));
    1.89  
    1.90      (*main control: fork and wait*)
    1.91      fun fork 0 = ()
    1.92        | fork k =
    1.93 -         (running := ! running + 1;
    1.94 +         (inc running; inc active;
    1.95            Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
    1.96            fork (k - 1));
    1.97 -    val _ = PROTECTED 0 (fn () =>
    1.98 -     (fork (Int.max (n, 1)); while ! running <> 0 do (tracing (fn () => "MAIN WAIT"); wait ())));
    1.99 +    val _ = PROTECTED "main" (fn () =>
   1.100 +     (fork (Int.max (n, 1));
   1.101 +      while ! running <> 0 do
   1.102 +        (tracing (fn () => "MAIN: " ^ Int.toString (! active) ^ " active");
   1.103 +         wait ())));
   1.104  
   1.105    in ! status end;
   1.106