wenzelm@23961
|
1 |
(* Title: Pure/ML-Systems/multithreading_polyml.ML
|
wenzelm@23961
|
2 |
Author: Makarius
|
wenzelm@23961
|
3 |
|
wenzelm@31311
|
4 |
Multithreading in Poly/ML 5.2.1 or later (cf. polyml/basis/Thread.sml).
|
wenzelm@23961
|
5 |
*)
|
wenzelm@23961
|
6 |
|
wenzelm@25704
|
7 |
(*Poly/ML-specific operations provided on top of the generic interface*)
signature MULTITHREADING_POLYML =
sig
  (*apply a function with the regular interrupt attributes in effect*)
  val interruptible: ('a -> 'b) -> 'a -> 'b
  (*apply a function with interrupts deferred; the function receives an
    operation that re-installs the caller's attributes around a nested
    function application*)
  val uninterruptible: ((('c -> 'd) -> 'c -> 'd) -> 'a -> 'b) -> 'a -> 'b
  (*run a shell script; yields its output together with the return code*)
  val system_out: string -> string * int
  (*evaluation with time limit*)
  structure TimeLimit: TIME_LIMIT
end;
|
wenzelm@25704
|
14 |
|
wenzelm@25704
|
15 |
(*re-binds BASIC_MULTITHREADING: the previously visible signature of that name
  (declared outside this file) extended by the Poly/ML-specific part*)
signature BASIC_MULTITHREADING =
sig
  include BASIC_MULTITHREADING
  include MULTITHREADING_POLYML
end;
|
wenzelm@25704
|
20 |
|
wenzelm@24208
|
21 |
(*re-binds MULTITHREADING: the previously visible signature of that name
  (declared outside this file) extended by the Poly/ML-specific part*)
signature MULTITHREADING =
sig
  include MULTITHREADING
  include MULTITHREADING_POLYML
end;
|
wenzelm@24208
|
26 |
|
wenzelm@23961
|
27 |
structure Multithreading: MULTITHREADING =
|
wenzelm@23961
|
28 |
struct
|
wenzelm@23961
|
29 |
|
wenzelm@24072
|
30 |
(* options *)
|
wenzelm@24069
|
31 |
|
wenzelm@24119
|
32 |
(*current trace level: a message is printed iff its level does not exceed it*)
val trace = ref 0;

(*Print a lazily produced message on stderr, prefixed by ">>> ", provided
  that level <= ! trace.  Any exception raised while producing or writing
  the message is deliberately swallowed.*)
fun tracing level msg =
  if level <= ! trace then
    ((TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
      handle _ (*sic*) => ())
  else ();
|
wenzelm@23961
|
38 |
|
wenzelm@23981
|
39 |
(*constant flag exported to clients; always true in this Poly/ML implementation*)
val available = true;
|
wenzelm@25775
|
40 |
|
wenzelm@23973
|
41 |
(*configured thread limit; a value <= 0 lets max_threads_value choose automatically*)
val max_threads = ref 1;
|
wenzelm@23973
|
42 |
|
wenzelm@31636
|
43 |
(*true iff the ML_PLATFORM setting ends in "linux" or "darwin"*)
val tested_platform =
  let
    val ml_platform = getenv "ML_PLATFORM";
    fun has_suffix suffix = String.isSuffix suffix ml_platform;
  in has_suffix "linux" orelse has_suffix "darwin" end;
|
wenzelm@31636
|
46 |
|
wenzelm@25775
|
47 |
(*Effective thread limit: a positive max_threads is taken literally;
  otherwise 1 on untested platforms, and the processor count clamped to the
  range 1..8 on tested ones.*)
fun max_threads_value () =
  let
    val requested = ! max_threads;
    fun automatic () =
      if tested_platform then Int.min (Int.max (Thread.numProcessors (), 1), 8)
      else 1;
  in if requested <= 0 then automatic () else requested end;
|
wenzelm@25775
|
53 |
|
wenzelm@28555
|
54 |
(*multithreading counts as enabled iff the effective thread limit exceeds 1*)
fun enabled () = max_threads_value () > 1;
|
wenzelm@28555
|
55 |
|
wenzelm@23973
|
56 |
|
wenzelm@24069
|
57 |
(* misc utils *)
|
wenzelm@24069
|
58 |
|
wenzelm@24208
|
59 |
(*non-empty names are rendered with a leading space, empty ones vanish*)
fun show name = if name = "" then "" else " " ^ name;
|
wenzelm@24208
|
60 |
(*non-empty names are rendered as " [name]", empty ones vanish*)
fun show' name = if name = "" then "" else " [" ^ name ^ "]";
|
wenzelm@24208
|
61 |
|
wenzelm@26098
|
62 |
(*Read the whole content of a file.  The stream is closed even if reading
  fails; a read failure is re-raised after closing.*)
fun read_file name =
  let
    val stream = TextIO.openIn name;
    val content = Exn.capture TextIO.inputAll stream;
    val _ = TextIO.closeIn stream;
  in Exn.release content end;
|
wenzelm@26098
|
65 |
|
wenzelm@26098
|
66 |
(*Write text to a file via TextIO.openOut.  The stream is closed even if
  writing fails; a write failure is re-raised after closing.*)
fun write_file name txt =
  let
    val stream = TextIO.openOut name;
    val written = Exn.capture TextIO.output (stream, txt);
    val _ = TextIO.closeOut stream;
  in Exn.release written end;
|
wenzelm@26098
|
69 |
|
wenzelm@24208
|
70 |
|
wenzelm@24208
|
71 |
(* thread attributes *)
|
wenzelm@24208
|
72 |
|
wenzelm@28161
|
73 |
(*attributes: broadcast interrupts disabled, interrupt state InterruptDefer*)
val no_interrupts =
  [Thread.EnableBroadcastInterrupt false,
   Thread.InterruptState Thread.InterruptDefer];
|
wenzelm@28161
|
75 |
|
wenzelm@28161
|
76 |
(*attributes: broadcast interrupts enabled, interrupt state InterruptAsynchOnce*)
val regular_interrupts =
  [Thread.EnableBroadcastInterrupt true,
   Thread.InterruptState Thread.InterruptAsynchOnce];
|
wenzelm@28161
|
78 |
|
wenzelm@30612
|
79 |
(*attributes: broadcast interrupts disabled, interrupt state InterruptAsynchOnce*)
val restricted_interrupts =
  [Thread.EnableBroadcastInterrupt false,
   Thread.InterruptState Thread.InterruptAsynchOnce];
|
wenzelm@30612
|
81 |
|
wenzelm@28466
|
82 |
(*replace every InterruptAsynch state by InterruptAsynchOnce; all other
  attributes pass through unchanged*)
fun safe_interrupts atts =
  let
    fun safe (Thread.InterruptState Thread.InterruptAsynch) =
          Thread.InterruptState Thread.InterruptAsynchOnce
      | safe att = att;
  in List.map safe atts end;
|
wenzelm@28466
|
86 |
|
wenzelm@24208
|
87 |
(*Evaluate f saved_atts x with new_atts installed for the current thread,
  where saved_atts are the (safe) attributes found on entry.  The saved
  attributes are re-installed afterwards, whether f returned or raised; the
  outcome of f is then returned or re-raised.*)
fun with_attributes new_atts f x =
  let
    val saved_atts = safe_interrupts (Thread.getAttributes ());
    fun body () = (Thread.setAttributes (safe_interrupts new_atts); f saved_atts x);
    val outcome = Exn.capture body ();
  in Thread.setAttributes saved_atts; Exn.release outcome end;
|
wenzelm@24208
|
94 |
|
wenzelm@28161
|
95 |
(*apply f to x with regular_interrupts in effect; the attributes found on entry are ignored by f*)
fun interruptible f x = with_attributes regular_interrupts (fn _ => f) x;
|
wenzelm@28150
|
96 |
|
wenzelm@26083
|
97 |
(*Run f with no_interrupts in effect.  f is handed restore_attributes, which
  re-installs the attributes of the calling context around the application
  of a given function.*)
fun uninterruptible f =
  let
    fun body orig_atts =
      let fun restore_attributes g = with_attributes orig_atts (fn _ => g)
      in f restore_attributes end;
  in with_attributes no_interrupts body end;
|
wenzelm@24668
|
99 |
|
wenzelm@24688
|
100 |
|
wenzelm@24688
|
101 |
(* execution with time limit *)
|
wenzelm@24688
|
102 |
|
wenzelm@24688
|
103 |
structure TimeLimit =
struct

(*raised by timeLimit when the watchdog has interrupted the evaluation*)
exception TimeOut;

(*Evaluate f x in the calling thread, under the caller's original interrupt
  attributes.  A forked watchdog thread sleeps for the given time, then
  records the timeout and interrupts the caller.  An Interrupt outcome counts
  as TimeOut only if the watchdog has fired; any other outcome is returned or
  re-raised as is.*)
fun timeLimit time f x = uninterruptible (fn restore_attributes => fn () =>
  let
    val caller = Thread.self ();
    val expired = ref false;

    fun watch () = (OS.Process.sleep time; expired := true; Thread.interrupt caller);
    val watchdog = Thread.fork (watch, []);

    val outcome = Exn.capture (restore_attributes f) x;
    val timed_out =
      (case outcome of
        Exn.Exn Exn.Interrupt => ! expired
      | _ => false);

    (*stop the watchdog; a Thread exception from this attempt is ignored*)
    val _ = Thread.interrupt watchdog handle Thread _ => ();
  in if timed_out then raise TimeOut else Exn.release outcome end) ();

end;
|
wenzelm@24668
|
122 |
|
wenzelm@24069
|
123 |
|
wenzelm@26098
|
124 |
(* system shell processes, with propagation of interrupts *)
|
wenzelm@26098
|
125 |
|
wenzelm@28254
|
126 |
(*Run a shell script through the perl wrapper lib/scripts/system.pl in a
  separate thread and return (output, return code).  The calling thread waits
  for the result; an interrupt received while waiting is forwarded as SIGINT
  to the process group whose id the wrapper left in the pid file.  If the
  process ends due to that signal, Interrupt is raised.*)
fun system_out script = uninterruptible (fn restore_attributes => fn () =>
  let
    (*temporary files exchanged with the wrapper*)
    val script_name = OS.FileSys.tmpName ();
    val _ = write_file script_name script;

    val pid_name = OS.FileSys.tmpName ();
    val output_name = OS.FileSys.tmpName ();

    (*result state*)
    datatype result = Wait | Signal | Result of int;
    val result = ref Wait;
    val result_mutex = Mutex.mutex ();
    val result_cond = ConditionVar.conditionVar ();
    fun set_result res =
     (Mutex.lock result_mutex;
      result := res;
      Mutex.unlock result_mutex;
      ConditionVar.signal result_cond);

    val _ = Mutex.lock result_mutex;

    (*system thread*)
    fun signal_code offset s = Result (offset + LargeWord.toInt (Posix.Signal.toWord s));
    fun decode Posix.Process.W_EXITED = Result 0
      | decode (Posix.Process.W_EXITSTATUS 0wx82) = Signal
      | decode (Posix.Process.W_EXITSTATUS w) = Result (Word8.toInt w)
      | decode (Posix.Process.W_SIGNALED s) =
          if s = Posix.Signal.int then Signal else signal_code 256 s
      | decode (Posix.Process.W_STOPPED s) = signal_code 512 s;
    fun run_system () =
      let
        val status =
          OS.Process.system ("perl -w \"$ISABELLE_HOME/lib/scripts/system.pl\" group " ^
            script_name ^ " " ^ pid_name ^ " " ^ output_name);
      in set_result (decode (Posix.Process.fromStatus status)) end
      handle _ (*sic*) => set_result (Result 2);
    val system_thread = Thread.fork (run_system, []);

    (*main thread -- proxy for interrupts*)
    fun kill n =
      (case Int.fromString (read_file pid_name) of
        NONE => ()
      | SOME pid =>
          Posix.Process.kill
            (Posix.Process.K_GROUP (Posix.Process.wordToPid (LargeWord.fromInt pid)),
              Posix.Signal.int))
      handle OS.SysErr _ => ()
        | IO.Io _ =>
            (*pid file not readable: retry after a short delay, while attempts remain*)
            (OS.Process.sleep (Time.fromMilliseconds 100);
             if n > 0 then kill (n - 1) else ());

    fun wait_interruptible () =
      (ConditionVar.waitUntil
        (result_cond, result_mutex, Time.now () + Time.fromMilliseconds 100); ())
      handle Exn.Interrupt => kill 10;

    val _ = while ! result = Wait do restore_attributes wait_interruptible ();

    (*cleanup*)
    val output = read_file output_name handle IO.Io _ => "";
    fun remove_quietly path = OS.FileSys.remove path handle OS.SysErr _ => ();
    val _ = List.app remove_quietly [script_name, pid_name, output_name];
    val _ = Thread.interrupt system_thread handle Thread _ => ();
    val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc);
  in (output, rc) end) ();
|
wenzelm@26098
|
186 |
|
wenzelm@26098
|
187 |
|
wenzelm@23961
|
188 |
(* critical section -- may be nested within the same thread *)
|
wenzelm@23961
|
189 |
|
wenzelm@23961
|
190 |
local
|
wenzelm@23961
|
191 |
|
wenzelm@24063
|
192 |
(*mutex held for the duration of an outermost critical section*)
val critical_lock = Mutex.mutex ();
|
wenzelm@24063
|
193 |
(*thread currently inside the critical section, if any*)
val critical_thread = ref (NONE: Thread.thread option);
|
wenzelm@24063
|
194 |
(*name of the critical section currently being executed; "" if none or unnamed*)
val critical_name = ref "";
|
wenzelm@24063
|
195 |
|
wenzelm@23961
|
196 |
in
|
wenzelm@23961
|
197 |
|
wenzelm@23961
|
198 |
(*true iff the current thread is the one recorded in critical_thread*)
fun self_critical () =
  (case ! critical_thread of
    SOME owner => Thread.equal (owner, Thread.self ())
  | NONE => false);
|
wenzelm@23961
|
202 |
|
wenzelm@23991
|
203 |
(*Evaluate e () within the critical section.  A thread that is already inside
  evaluates e directly (nesting).  Otherwise critical_lock is acquired with
  interrupts deferred, the owner and name are recorded, e runs under the
  caller's original interrupt attributes, and the state is reset and the lock
  released before the outcome of e is returned or re-raised.  Waiting for the
  lock is traced; the trace level depends on the time spent waiting.*)
fun NAMED_CRITICAL name e =
  if self_critical () then e ()
  else
    uninterruptible (fn restore_attributes => fn () =>
      let
        (*name of the section observed on entry, before the lock is taken*)
        val name' = ! critical_name;
        fun label () = "CRITICAL" ^ show name ^ show' name';

        (*longer waits are reported at lower (more visible) trace levels*)
        fun trace_level elapsed =
          if Time.>= (elapsed, Time.fromMilliseconds 1000) then 1
          else if Time.>= (elapsed, Time.fromMilliseconds 100) then 2
          else if Time.>= (elapsed, Time.fromMilliseconds 10) then 3
          else 4;

        fun wait_for_lock () =
          let
            val timer = Timer.startRealTimer ();
            val _ = tracing 4 (fn () => label () ^ ": waiting");
            val _ = Mutex.lock critical_lock;
            val elapsed = Timer.checkRealTimer timer;
          in
            tracing (trace_level elapsed) (fn () =>
              label () ^ ": passed after " ^ Time.toString elapsed)
          end;

        val _ = if Mutex.trylock critical_lock then () else wait_for_lock ();
        val _ = critical_thread := SOME (Thread.self ());
        val _ = critical_name := name;
        val outcome = Exn.capture (restore_attributes e) ();
        val _ = critical_name := "";
        val _ = critical_thread := NONE;
        val _ = Mutex.unlock critical_lock;
      in Exn.release outcome end) ();
|
wenzelm@23961
|
231 |
|
wenzelm@23991
|
232 |
(*critical section without a name*)
fun CRITICAL e = NAMED_CRITICAL "" e;
|
wenzelm@23981
|
233 |
|
wenzelm@23961
|
234 |
end;
|
wenzelm@23961
|
235 |
|
wenzelm@23973
|
236 |
|
wenzelm@25704
|
237 |
(* serial numbers *)
|
wenzelm@25704
|
238 |
|
wenzelm@25704
|
239 |
local
|
wenzelm@25704
|
240 |
|
wenzelm@25704
|
241 |
(*mutex protecting serial_count*)
val serial_lock = Mutex.mutex ();
|
wenzelm@25704
|
242 |
(*most recently issued serial number; 0 before the first call of serial*)
val serial_count = ref 0;
|
wenzelm@25704
|
243 |
|
wenzelm@25704
|
244 |
in
|
wenzelm@25704
|
245 |
|
wenzelm@25704
|
246 |
(*issue the next serial number (1, 2, ...); the counter is updated under
  serial_lock with interrupts deferred*)
val serial = uninterruptible (fn _ => fn () =>
  let
    val _ = Mutex.lock serial_lock;
    val next = ! serial_count + 1;
    val _ = serial_count := next;
    val _ = Mutex.unlock serial_lock;
  in next end);
|
wenzelm@25704
|
253 |
|
wenzelm@23961
|
254 |
end;
|
wenzelm@23961
|
255 |
|
wenzelm@25704
|
256 |
end;
|
wenzelm@24688
|
257 |
|
wenzelm@25704
|
258 |
(*view of Multithreading constrained to the BASIC_MULTITHREADING signature*)
structure BasicMultithreading: BASIC_MULTITHREADING = Multithreading;
|
wenzelm@25704
|
259 |
open BasicMultithreading;
|