wenzelm@23961
|
1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML
|
wenzelm@23961
|
2 |
ID: $Id$
|
wenzelm@23961
|
3 |
Author: Makarius
|
wenzelm@23961
|
4 |
|
wenzelm@25704
|
5 |
Multithreading in Poly/ML 5.1 (cf. polyml/basis/Thread.sml).
|
wenzelm@23961
|
6 |
*)
|
wenzelm@23961
|
7 |
|
wenzelm@23961
|
8 |
open Thread;
|
wenzelm@23961
|
9 |
|
wenzelm@25704
|
10 |
signature MULTITHREADING_POLYML =
|
wenzelm@25704
|
11 |
sig
|
wenzelm@25704
|
12 |
val ignore_interrupt: ('a -> 'b) -> 'a -> 'b
|
wenzelm@25704
|
13 |
val raise_interrupt: ('a -> 'b) -> 'a -> 'b
|
wenzelm@25704
|
14 |
structure TimeLimit: TIME_LIMIT
|
wenzelm@25704
|
15 |
end;
|
wenzelm@25704
|
16 |
|
wenzelm@25704
|
17 |
signature BASIC_MULTITHREADING =
|
wenzelm@25704
|
18 |
sig
|
wenzelm@25704
|
19 |
include BASIC_MULTITHREADING
|
wenzelm@25704
|
20 |
include MULTITHREADING_POLYML
|
wenzelm@25704
|
21 |
end;
|
wenzelm@25704
|
22 |
|
wenzelm@24208
|
23 |
signature MULTITHREADING =
|
wenzelm@24208
|
24 |
sig
|
wenzelm@24208
|
25 |
include MULTITHREADING
|
wenzelm@25704
|
26 |
include MULTITHREADING_POLYML
|
wenzelm@24208
|
27 |
end;
|
wenzelm@24208
|
28 |
|
wenzelm@23961
|
29 |
structure Multithreading: MULTITHREADING =
|
wenzelm@23961
|
30 |
struct
|
wenzelm@23961
|
31 |
|
wenzelm@24072
|
32 |
(* options *)
|
wenzelm@24069
|
33 |
|
wenzelm@24119
|
34 |
val trace = ref 0;
|
wenzelm@24119
|
35 |
fun tracing level msg =
|
wenzelm@24119
|
36 |
if level <= ! trace
|
wenzelm@23981
|
37 |
then (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
|
wenzelm@23981
|
38 |
else ();
|
wenzelm@23961
|
39 |
|
wenzelm@23981
|
40 |
val available = true;
|
wenzelm@23973
|
41 |
val max_threads = ref 1;
|
wenzelm@23973
|
42 |
|
wenzelm@23973
|
43 |
|
wenzelm@24069
|
44 |
(* misc utils *)
|
wenzelm@24069
|
45 |
|
wenzelm@24208
|
46 |
fun cons x xs = x :: xs;
|
wenzelm@24069
|
47 |
|
wenzelm@24208
|
48 |
fun change r f = r := f (! r);
|
wenzelm@24069
|
49 |
|
wenzelm@24069
|
50 |
fun inc i = (i := ! i + 1; ! i);
|
wenzelm@24069
|
51 |
fun dec i = (i := ! i - 1; ! i);
|
wenzelm@24069
|
52 |
|
wenzelm@24208
|
53 |
fun show "" = "" | show name = " " ^ name;
|
wenzelm@24208
|
54 |
fun show' "" = "" | show' name = " [" ^ name ^ "]";
|
wenzelm@24208
|
55 |
|
wenzelm@24208
|
56 |
|
wenzelm@24208
|
57 |
(* thread attributes *)
|
wenzelm@24208
|
58 |
|
wenzelm@24208
|
59 |
fun with_attributes new_atts f x =
|
wenzelm@24208
|
60 |
let
|
wenzelm@24208
|
61 |
val orig_atts = Thread.getAttributes ();
|
wenzelm@24208
|
62 |
fun restore () = Thread.setAttributes orig_atts;
|
wenzelm@24208
|
63 |
in
|
wenzelm@24208
|
64 |
Exn.release
|
wenzelm@24214
|
65 |
(*RACE for fully asynchronous interrupts!*)
|
wenzelm@24214
|
66 |
(let
|
wenzelm@24208
|
67 |
val _ = Thread.setAttributes new_atts;
|
wenzelm@24214
|
68 |
val result = Exn.capture (f orig_atts) x;
|
wenzelm@24208
|
69 |
val _ = restore ();
|
wenzelm@24208
|
70 |
in result end
|
wenzelm@24208
|
71 |
handle Interrupt => (restore (); Exn.Exn Interrupt))
|
wenzelm@24208
|
72 |
end;
|
wenzelm@24208
|
73 |
|
wenzelm@24668
|
74 |
|
wenzelm@24668
|
75 |
(* interrupt handling *)
|
wenzelm@24668
|
76 |
|
wenzelm@24297
|
77 |
fun uninterruptible f x = with_attributes
|
wenzelm@24297
|
78 |
[Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer] f x;
|
wenzelm@24208
|
79 |
|
wenzelm@24297
|
80 |
fun interruptible f x = with_attributes
|
wenzelm@24297
|
81 |
[Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce] f x;
|
wenzelm@24208
|
82 |
|
wenzelm@24668
|
83 |
fun ignore_interrupt f = uninterruptible (fn _ => f);
|
wenzelm@24668
|
84 |
fun raise_interrupt f = interruptible (fn _ => f);
|
wenzelm@24668
|
85 |
|
wenzelm@24688
|
86 |
|
wenzelm@24688
|
87 |
(* execution with time limit *)
|
wenzelm@24688
|
88 |
|
wenzelm@24688
|
89 |
structure TimeLimit =
|
wenzelm@24688
|
90 |
struct
|
wenzelm@24688
|
91 |
|
wenzelm@24688
|
92 |
exception TimeOut;
|
wenzelm@24688
|
93 |
|
wenzelm@24688
|
94 |
fun timeLimit time f x =
|
wenzelm@24668
|
95 |
uninterruptible (fn atts => fn () =>
|
wenzelm@24668
|
96 |
let
|
wenzelm@24668
|
97 |
val worker = Thread.self ();
|
wenzelm@24688
|
98 |
val timeout = ref false;
|
wenzelm@24668
|
99 |
val watchdog = Thread.fork (interruptible (fn _ => fn () =>
|
wenzelm@24688
|
100 |
(OS.Process.sleep time; timeout := true; Thread.interrupt worker)), []);
|
wenzelm@24688
|
101 |
|
wenzelm@24688
|
102 |
(*RACE! timeout signal vs. external Interrupt*)
|
wenzelm@24668
|
103 |
val result = Exn.capture (with_attributes atts (fn _ => f)) x;
|
wenzelm@24688
|
104 |
val was_timeout = (case result of Exn.Exn Interrupt => ! timeout | _ => false);
|
wenzelm@24688
|
105 |
|
wenzelm@24668
|
106 |
val _ = Thread.interrupt watchdog handle Thread _ => ();
|
wenzelm@24688
|
107 |
in if was_timeout then raise TimeOut else Exn.release result end) ();
|
wenzelm@24688
|
108 |
|
wenzelm@24688
|
109 |
end;
|
wenzelm@24668
|
110 |
|
wenzelm@24069
|
111 |
|
wenzelm@23961
|
112 |
(* critical section -- may be nested within the same thread *)
|
wenzelm@23961
|
113 |
|
wenzelm@23961
|
114 |
local
|
wenzelm@23961
|
115 |
|
wenzelm@24063
|
116 |
val critical_lock = Mutex.mutex ();
|
wenzelm@24063
|
117 |
val critical_thread = ref (NONE: Thread.thread option);
|
wenzelm@24063
|
118 |
val critical_name = ref "";
|
wenzelm@24063
|
119 |
|
wenzelm@23961
|
120 |
in
|
wenzelm@23961
|
121 |
|
wenzelm@23961
|
122 |
fun self_critical () =
|
wenzelm@23961
|
123 |
(case ! critical_thread of
|
wenzelm@23961
|
124 |
NONE => false
|
wenzelm@23961
|
125 |
| SOME id => Thread.equal (id, Thread.self ()));
|
wenzelm@23961
|
126 |
|
wenzelm@23991
|
127 |
fun NAMED_CRITICAL name e =
|
wenzelm@23961
|
128 |
if self_critical () then e ()
|
wenzelm@23961
|
129 |
else
|
wenzelm@24214
|
130 |
uninterruptible (fn atts => fn () =>
|
wenzelm@24208
|
131 |
let
|
wenzelm@24208
|
132 |
val name' = ! critical_name;
|
wenzelm@24208
|
133 |
val _ =
|
wenzelm@24208
|
134 |
if Mutex.trylock critical_lock then ()
|
wenzelm@24208
|
135 |
else
|
wenzelm@24208
|
136 |
let
|
wenzelm@24208
|
137 |
val timer = Timer.startRealTimer ();
|
wenzelm@24208
|
138 |
val _ = tracing 4 (fn () => "CRITICAL" ^ show name ^ show' name' ^ ": waiting");
|
wenzelm@24208
|
139 |
val _ = Mutex.lock critical_lock;
|
wenzelm@24208
|
140 |
val time = Timer.checkRealTimer timer;
|
wenzelm@24208
|
141 |
val _ = tracing (if Time.> (time, Time.fromMilliseconds 10) then 3 else 4) (fn () =>
|
wenzelm@24208
|
142 |
"CRITICAL" ^ show name ^ show' name' ^ ": passed after " ^ Time.toString time);
|
wenzelm@24208
|
143 |
in () end;
|
wenzelm@24208
|
144 |
val _ = critical_thread := SOME (Thread.self ());
|
wenzelm@24208
|
145 |
val _ = critical_name := name;
|
wenzelm@24214
|
146 |
val result = Exn.capture (with_attributes atts (fn _ => e)) ();
|
wenzelm@24208
|
147 |
val _ = critical_name := "";
|
wenzelm@24208
|
148 |
val _ = critical_thread := NONE;
|
wenzelm@24208
|
149 |
val _ = Mutex.unlock critical_lock;
|
wenzelm@24208
|
150 |
in Exn.release result end) ();
|
wenzelm@23961
|
151 |
|
wenzelm@23991
|
152 |
fun CRITICAL e = NAMED_CRITICAL "" e;
|
wenzelm@23981
|
153 |
|
wenzelm@23961
|
154 |
end;
|
wenzelm@23961
|
155 |
|
wenzelm@23973
|
156 |
|
wenzelm@24208
|
157 |
(* scheduling -- multiple threads working on a queue of tasks *)
|
wenzelm@24208
|
158 |
|
wenzelm@24208
|
159 |
datatype 'a task =
|
wenzelm@24208
|
160 |
Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
|
wenzelm@23973
|
161 |
|
wenzelm@24214
|
162 |
fun schedule n next_task = uninterruptible (fn _ => fn tasks =>
|
wenzelm@23973
|
163 |
let
|
wenzelm@23973
|
164 |
(*protected execution*)
|
wenzelm@23973
|
165 |
val lock = Mutex.mutex ();
|
wenzelm@24672
|
166 |
val protected_name = ref "";
|
wenzelm@24063
|
167 |
fun PROTECTED name e =
|
wenzelm@23973
|
168 |
let
|
wenzelm@24144
|
169 |
val name' = ! protected_name;
|
wenzelm@23981
|
170 |
val _ =
|
wenzelm@23981
|
171 |
if Mutex.trylock lock then ()
|
wenzelm@23981
|
172 |
else
|
wenzelm@24144
|
173 |
let
|
wenzelm@24144
|
174 |
val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": waiting");
|
wenzelm@24144
|
175 |
val _ = Mutex.lock lock;
|
wenzelm@24144
|
176 |
val _ = tracing 2 (fn () => "PROTECTED" ^ show name ^ show' name' ^ ": passed");
|
wenzelm@24144
|
177 |
in () end;
|
wenzelm@24069
|
178 |
val _ = protected_name := name;
|
wenzelm@23973
|
179 |
val res = Exn.capture e ();
|
wenzelm@24069
|
180 |
val _ = protected_name := "";
|
wenzelm@23973
|
181 |
val _ = Mutex.unlock lock;
|
wenzelm@23973
|
182 |
in Exn.release res end;
|
wenzelm@23973
|
183 |
|
wenzelm@24072
|
184 |
(*wakeup condition*)
|
wenzelm@24072
|
185 |
val wakeup = ConditionVar.conditionVar ();
|
wenzelm@24072
|
186 |
fun wakeup_all () = ConditionVar.broadcast wakeup;
|
wenzelm@24072
|
187 |
fun wait () = ConditionVar.wait (wakeup, lock);
|
wenzelm@24291
|
188 |
fun wait_timeout () = ConditionVar.waitUntil (wakeup, lock, Time.now () + Time.fromSeconds 1);
|
wenzelm@24072
|
189 |
|
wenzelm@24214
|
190 |
(*queue of tasks*)
|
wenzelm@23973
|
191 |
val queue = ref tasks;
|
wenzelm@24072
|
192 |
val active = ref 0;
|
wenzelm@24119
|
193 |
fun trace_active () = tracing 1 (fn () => "SCHEDULE: " ^ Int.toString (! active) ^ " active");
|
wenzelm@24072
|
194 |
fun dequeue () =
|
wenzelm@23973
|
195 |
let
|
wenzelm@23981
|
196 |
val (next, tasks') = next_task (! queue);
|
wenzelm@23973
|
197 |
val _ = queue := tasks';
|
wenzelm@24072
|
198 |
in
|
wenzelm@24208
|
199 |
(case next of Wait =>
|
wenzelm@24208
|
200 |
(dec active; trace_active ();
|
wenzelm@24208
|
201 |
wait ();
|
wenzelm@24208
|
202 |
inc active; trace_active ();
|
wenzelm@24208
|
203 |
dequeue ())
|
wenzelm@24208
|
204 |
| _ => next)
|
wenzelm@24072
|
205 |
end;
|
wenzelm@23973
|
206 |
|
wenzelm@24208
|
207 |
(*pool of running threads*)
|
wenzelm@23973
|
208 |
val status = ref ([]: exn list);
|
wenzelm@24208
|
209 |
val running = ref ([]: Thread.thread list);
|
wenzelm@24208
|
210 |
fun start f =
|
wenzelm@24208
|
211 |
(inc active;
|
wenzelm@24208
|
212 |
change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
|
wenzelm@24208
|
213 |
fun stop () =
|
wenzelm@24208
|
214 |
(dec active;
|
wenzelm@24208
|
215 |
change running (List.filter (fn t => not (Thread.equal (t, Thread.self ())))));
|
wenzelm@24208
|
216 |
|
wenzelm@24208
|
217 |
(*worker thread*)
|
wenzelm@24208
|
218 |
fun worker () =
|
wenzelm@24072
|
219 |
(case PROTECTED "dequeue" dequeue of
|
wenzelm@24208
|
220 |
Task {body, cont, fail} =>
|
wenzelm@24214
|
221 |
(case Exn.capture (interruptible (fn _ => body)) () of
|
wenzelm@24208
|
222 |
Exn.Result () =>
|
wenzelm@24208
|
223 |
(PROTECTED "cont" (fn () => (change queue cont; wakeup_all ())); worker ())
|
wenzelm@23981
|
224 |
| Exn.Exn exn =>
|
wenzelm@24208
|
225 |
PROTECTED "fail" (fn () =>
|
wenzelm@24208
|
226 |
(change status (cons exn); change queue fail; stop (); wakeup_all ())))
|
wenzelm@24208
|
227 |
| Terminate => PROTECTED "terminate" (fn () => (stop (); wakeup_all ())));
|
wenzelm@23973
|
228 |
|
wenzelm@23973
|
229 |
(*main control: fork and wait*)
|
wenzelm@23973
|
230 |
fun fork 0 = ()
|
wenzelm@24208
|
231 |
| fork k = (start worker; fork (k - 1));
|
wenzelm@24063
|
232 |
val _ = PROTECTED "main" (fn () =>
|
wenzelm@24063
|
233 |
(fork (Int.max (n, 1));
|
wenzelm@24208
|
234 |
while not (List.null (! running)) do
|
wenzelm@24208
|
235 |
(trace_active ();
|
wenzelm@24208
|
236 |
if not (List.null (! status)) then (List.app Thread.interrupt (! running)) else ();
|
wenzelm@24291
|
237 |
wait_timeout ())));
|
wenzelm@23973
|
238 |
|
wenzelm@24208
|
239 |
in ! status end);
|
wenzelm@23973
|
240 |
|
wenzelm@25704
|
241 |
|
wenzelm@25704
|
242 |
(* serial numbers *)
|
wenzelm@25704
|
243 |
|
wenzelm@25704
|
244 |
local
|
wenzelm@25704
|
245 |
|
wenzelm@25704
|
246 |
val serial_lock = Mutex.mutex ();
|
wenzelm@25704
|
247 |
val serial_count = ref 0;
|
wenzelm@25704
|
248 |
|
wenzelm@25704
|
249 |
in
|
wenzelm@25704
|
250 |
|
wenzelm@25704
|
251 |
val serial = uninterruptible (fn _ => fn () =>
|
wenzelm@25704
|
252 |
let
|
wenzelm@25704
|
253 |
val _ = Mutex.lock serial_lock;
|
wenzelm@25704
|
254 |
val res = inc serial_count;
|
wenzelm@25704
|
255 |
val _ = Mutex.unlock serial_lock;
|
wenzelm@25704
|
256 |
in res end);
|
wenzelm@25704
|
257 |
|
wenzelm@23961
|
258 |
end;
|
wenzelm@23961
|
259 |
|
wenzelm@25704
|
260 |
end;
|
wenzelm@24688
|
261 |
|
wenzelm@25704
|
262 |
structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
|
wenzelm@25704
|
263 |
open BasicMultithreading;
|