1 (* Title: Pure/ML-Systems/multithreading_polyml.ML
5 Multithreading in Poly/ML (version 5.1).
10 structure Multithreading: MULTITHREADING =
13 val trace = ref false;
16 then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
20 val max_threads = ref 1;
23 (* critical section -- may be nested within the same thread *)
28 | add_name name = " " ^ name;
31 | add_name' name = " [" ^ name ^ "]";
33 val critical_lock = Mutex.mutex ();
34 val critical_thread = ref (NONE: Thread.thread option);
35 val critical_name = ref "";
39 fun self_critical () =
40 (case ! critical_thread of
42 | SOME id => Thread.equal (id, Thread.self ()));
44 fun NAMED_CRITICAL name e =
45 if self_critical () then e ()
49 if Mutex.trylock critical_lock then ()
52 val timer = Timer.startRealTimer ();
53 val _ = tracing (fn () =>
54 "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": waiting for lock");
55 val _ = Mutex.lock critical_lock;
56 val _ = tracing (fn () =>
57 "CRITICAL" ^ add_name name ^ add_name' (! critical_name) ^ ": obtained lock after " ^
58 Time.toString (Timer.checkRealTimer timer));
60 val _ = critical_thread := SOME (Thread.self ());
61 val _ = critical_name := name;
62 val result = Exn.capture e ();
63 val _ = critical_name := "";
64 val _ = critical_thread := NONE;
65 val _ = Mutex.unlock critical_lock;
66 in Exn.release result end;
68 fun CRITICAL e = NAMED_CRITICAL "" e;
73 (* scheduling -- non-interruptible threads working on a queue of tasks *)
75 fun inc i = (i := ! i + 1; ! i);
76 fun dec i = (i := ! i - 1; ! i);
78 fun schedule n next_task tasks =
80 (*protected execution*)
81 val lock = Mutex.mutex ();
82 fun PROTECTED name e =
85 if Mutex.trylock lock then ()
87 (tracing (fn () => "PROTECTED " ^ name ^ ": waiting for lock");
89 tracing (fn () => "PROTECTED " ^ name ^ ": obtained lock"));
90 val res = Exn.capture e ();
91 val _ = Mutex.unlock lock;
92 in Exn.release res end;
94 (*the queue of tasks*)
95 val queue = ref tasks;
96 fun dequeue () = PROTECTED "dequeue" (fn () =>
98 val (next, tasks') = next_task (! queue);
99 val _ = queue := tasks';
105 val status = ref ([]: exn list);
106 val wakeup = ConditionVar.conditionVar ();
107 fun wait () = ConditionVar.wait (wakeup, lock);
108 fun continue cont k =
109 (PROTECTED "cont" (fn () => queue := cont (! queue));
110 ConditionVar.broadcast wakeup; work k ())
113 (Task.Task f, cont) =>
114 (case Exn.capture f () of
115 Exn.Result () => continue cont k
117 (PROTECTED "status" (fn () => status := exn :: ! status); continue cont k))
118 | (Task.Running, _) =>
119 (PROTECTED "wait" (fn () => (dec active; wait (); inc active)); work k ())
120 | (Task.Finished, _) =>
121 (PROTECTED "running" (fn () => (dec active; dec running));
122 ConditionVar.broadcast wakeup));
124 (*main control: fork and wait*)
127 (inc running; inc active;
128 Thread.fork (work k, [Thread.InterruptState Thread.InterruptDefer]);
130 val _ = PROTECTED "main" (fn () =>
131 (fork (Int.max (n, 1));
132 while ! running <> 0 do
133 (tracing (fn () => "MAIN: " ^ Int.toString (! active) ^ " active");
140 val NAMED_CRITICAL = Multithreading.NAMED_CRITICAL;
141 val CRITICAL = Multithreading.CRITICAL;