src/Pure/ML-Systems/multithreading_polyml.ML
author wenzelm
Sun, 29 Jul 2007 19:46:04 +0200
changeset 24063 736c03ae92f5
parent 24060 b643ee118928
child 24066 fb455cb475df
permissions -rw-r--r--
more informative tracing;
     1 (*  Title:      Pure/ML-Systems/multithreading_polyml.ML
     2     ID:         $Id$
     3     Author:     Makarius
     4 
     5 Multithreading in Poly/ML (version 5.1).
     6 *)
     7 
     8 open Thread;
     9 
    10 structure Multithreading: MULTITHREADING =
    11 struct
    12 
    (*Global switch for diagnostic output: tracing stays silent while this is false.*)
    val trace = ref false;

    (*Emit one diagnostic line on stderr, prefixed by ">>> " and flushed at once.
      The message is supplied as a thunk, so its text is only computed when
      tracing is actually switched on.*)
    fun tracing msg =
      if not (! trace) then ()
      else
        let
          val line = ">>> " ^ msg () ^ "\n";
        in
          TextIO.output (TextIO.stdErr, line);
          TextIO.flushOut TextIO.stdErr
        end;

    (*Constant flag, true in this Poly/ML implementation.*)
    val available = true;
    (*Mutable setting, initially 1; it is not read anywhere in the code visible here.*)
    val max_threads = ref 1;
    21 
    22 
    23 (* critical section -- may be nested within the same thread *)
    24 
    local

    (*Render the name of the requesting section: " name", or nothing if unnamed.*)
    fun add_name "" = ""
      | add_name name = " " ^ name;

    (*Render the name of the section holding the lock: " [name]", or nothing if unnamed.*)
    fun add_name' "" = ""
      | add_name' name = " [" ^ name ^ "]";

    (*The single global lock, together with the identity and the name of its
      current holder.  Both references are only written while the lock is held.*)
    val critical_lock = Mutex.mutex ();
    val critical_thread = ref (NONE: Thread.thread option);
    val critical_name = ref "";

    in

    (*Is the calling thread the one recorded as holder of the critical lock?*)
    fun self_critical () =
      (case ! critical_thread of
        NONE => false
      | SOME id => Thread.equal (id, Thread.self ()));

    (*NAMED_CRITICAL name e: evaluate e () while holding the global critical lock.
      A thread that already holds the lock runs e () directly, which is what makes
      nesting within the same thread possible.  The result of e (), whether a value
      or an exception, is captured first and only released after the lock has been
      given back.  The name is used for trace messages only.*)
    fun NAMED_CRITICAL name e =
      if self_critical () then e ()
      else
        let
          val _ =
            if Mutex.trylock critical_lock then ()
            else
              let
                val timer = Timer.startRealTimer ();
                (*Snapshot the holder's name before blocking: the holder resets
                  critical_name to "" before unlocking, so reading it after
                  Mutex.lock has returned would always yield "".*)
                val holder = ! critical_name;
                fun prefix () = "CRITICAL" ^ add_name name ^ add_name' holder;
                val _ = tracing (fn () => prefix () ^ ": waiting for lock");
                val _ = Mutex.lock critical_lock;
                val _ = tracing (fn () =>
                  prefix () ^ ": obtained lock after " ^
                  Time.toString (Timer.checkRealTimer timer));
              in () end;
          (*Record ownership so that nested requests from this thread pass through.*)
          val _ = critical_thread := SOME (Thread.self ());
          val _ = critical_name := name;
          val result = Exn.capture e ();
          (*Clear ownership before unlocking, never after.*)
          val _ = critical_name := "";
          val _ = critical_thread := NONE;
          val _ = Mutex.unlock critical_lock;
        in Exn.release result end;

    (*Anonymous critical section.*)
    fun CRITICAL e = NAMED_CRITICAL "" e;

    end;
    71 
    72 
    73 (* scheduling -- non-interruptible threads working on a queue of tasks *)
    74 
    (*Add one to the content of an integer reference and return the new content.*)
    fun inc i =
      let val n = ! i + 1
      in i := n; n end;

    (*Subtract one from the content of an integer reference and return the new content.*)
    fun dec i =
      let val n = ! i - 1
      in i := n; n end;
    77 
    (*schedule n next_task tasks: work through a queue of tasks with worker threads.

      n          number of worker threads to fork; at least one is always forked
      next_task  applied to the current queue, yields ((item, cont), queue'), where
                 item is Task.Task f (run f ()), Task.Running (nothing to hand out
                 right now) or Task.Finished (this worker terminates); after a task
                 has run, cont is applied to the queue
      tasks      the initial queue

      Returns the exceptions raised by tasks, most recently recorded first.  The
      calling thread blocks until all workers have terminated.*)
    fun schedule n next_task tasks =
      let
        (*protected execution: e () is evaluated while holding the scheduler lock,
          its result is released only after unlocking*)
        val lock = Mutex.mutex ();
        fun PROTECTED name e =
          let
            val _ =
              if Mutex.trylock lock then ()
              else
               (tracing (fn () => "PROTECTED " ^ name ^ ": waiting for lock");
                Mutex.lock lock;
                tracing (fn () => "PROTECTED " ^ name ^ ": obtained lock"));
            val res = Exn.capture e ();
            val _ = Mutex.unlock lock;
          in Exn.release res end;

        (*worker state, only modified while holding the lock*)
        val running = ref 0;
        val active = ref 0;
        val status = ref ([]: exn list);
        val wakeup = ConditionVar.conditionVar ();
        fun wait () = ConditionVar.wait (wakeup, lock);

        (*the queue of tasks*)
        val queue = ref tasks;
        (*Fetch the next item.  While the queue answers Task.Running the worker
          waits right here, without letting go of the lock between looking at the
          queue and going to sleep.  Doing these two steps in separate protected
          sections would open a window in which the final broadcast of the other
          workers can be missed, leaving this worker, and thereby the main thread,
          blocked forever.*)
        fun dequeue () = PROTECTED "dequeue" (fn () =>
          let
            fun take () =
              let
                val (next, tasks') = next_task (! queue);
                val _ = queue := tasks';
              in
                (case next of
                  (Task.Running, _) => (dec active; wait (); inc active; take ())
                | _ => next)
              end;
          in take () end);

        (*worker threads*)
        fun continue cont k =
          (PROTECTED "cont" (fn () => queue := cont (! queue));
           ConditionVar.broadcast wakeup; work k ())
        and work k () =
          (case dequeue () of
            (Task.Task f, cont) =>
              (case Exn.capture f () of
                Exn.Result () => continue cont k
              | Exn.Exn exn =>
                  (PROTECTED "status" (fn () => status := exn :: ! status); continue cont k))
          | (Task.Running, _) =>
              (*no longer returned by dequeue, which waits internally; simply retry*)
              work k ()
          | (Task.Finished, _) =>
             (PROTECTED "running" (fn () => (dec active; dec running));
              ConditionVar.broadcast wakeup));

        (*main control: fork and wait*)
        fun fork 0 = ()
          | fork k =
             (inc running; inc active;
              Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
              fork (k - 1));
        (*The main thread keeps the lock while forking, so no worker can touch the
          queue before the main thread has started to wait.*)
        val _ = PROTECTED "main" (fn () =>
         (fork (Int.max (n, 1));
          while ! running <> 0 do
            (tracing (fn () => "MAIN: " ^ Int.toString (! active) ^ " active");
             wait ())));

      in ! status end;
   137 
   138 end;
   139 
   (*Top-level shorthands for the critical section combinators.*)
   val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL
   and CRITICAL = Multithreading.CRITICAL;