1.1 --- a/src/Pure/Concurrent/future.ML Thu Jul 30 23:50:11 2009 +0200
1.2 +++ b/src/Pure/Concurrent/future.ML Sat Aug 01 00:09:45 2009 +0200
1.3 @@ -120,11 +120,10 @@
1.4 fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
1.5
1.6 fun wait cond = (*requires SYNCHRONIZED*)
1.7 - Multithreading.sync_wait NONE cond lock;
1.8 + Multithreading.sync_wait NONE NONE cond lock;
1.9
1.10 -fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
1.11 - interruptible (fn () =>
1.12 - ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) ();
1.13 +fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
1.14 + Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
1.15
1.16 fun signal cond = (*requires SYNCHRONIZED*)
1.17 ConditionVar.signal cond;
1.18 @@ -149,11 +148,11 @@
1.19 val res =
1.20 if ok then
1.21 Exn.capture (fn () =>
1.22 - (Thread.testInterrupt ();
1.23 - Multithreading.with_attributes Multithreading.restricted_interrupts
1.24 - (fn _ => fn () => e ())) ()) ()
1.25 + Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
1.26 else Exn.Exn Exn.Interrupt;
1.27 - val _ = Synchronized.change result (K (SOME res));
1.28 + val _ = Synchronized.change result
1.29 + (fn NONE => SOME res
1.30 + | SOME _ => raise Fail "Duplicate assignment of future value");
1.31 in
1.32 (case res of
1.33 Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
1.34 @@ -276,20 +275,24 @@
1.35 broadcast_work ());
1.36
1.37 (*delay loop*)
1.38 - val _ = wait_interruptible next_round scheduler_event
1.39 - handle Exn.Interrupt =>
1.40 - (Multithreading.tracing 1 (fn () => "Interrupt");
1.41 - List.app do_cancel (Task_Queue.cancel_all (! queue)));
1.42 + val _ = Exn.release (wait_timeout next_round scheduler_event);
1.43
1.44 (*shutdown*)
1.45 val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
1.46 val continue = not (! do_shutdown andalso null (! workers));
1.47 val _ = if continue then () else scheduler := NONE;
1.48 val _ = broadcast scheduler_event;
1.49 - in continue end;
1.50 + in continue end
1.51 + handle Exn.Interrupt =>
1.52 + (Multithreading.tracing 1 (fn () => "Interrupt");
1.53 + List.app do_cancel (Task_Queue.cancel_all (! queue));
1.54 + scheduler_next ());
1.55 +
1.56
1.57 fun scheduler_loop () =
1.58 - while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
1.59 + Multithreading.with_attributes
1.60 + (Multithreading.sync_interrupts Multithreading.public_interrupts)
1.61 + (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
1.62
1.63 fun scheduler_active () = (*requires SYNCHRONIZED*)
1.64 (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
1.65 @@ -393,12 +396,11 @@
1.66
1.67 fun interruptible_task f x =
1.68 if Multithreading.available then
1.69 - (Thread.testInterrupt ();
1.70 Multithreading.with_attributes
1.71 (if is_worker ()
1.72 - then Multithreading.restricted_interrupts
1.73 - else Multithreading.regular_interrupts)
1.74 - (fn _ => fn x => f x) x)
1.75 + then Multithreading.private_interrupts
1.76 + else Multithreading.public_interrupts)
1.77 + (fn _ => f x)
1.78 else interruptible f x;
1.79
1.80 (*cancel: present and future group members will be interrupted eventually*)
2.1 --- a/src/Pure/Concurrent/simple_thread.ML Thu Jul 30 23:50:11 2009 +0200
2.2 +++ b/src/Pure/Concurrent/simple_thread.ML Sat Aug 01 00:09:45 2009 +0200
2.3 @@ -16,7 +16,7 @@
2.4
2.5 fun fork interrupts body =
2.6 Thread.fork (fn () => exception_trace (fn () => body ()),
2.7 - if interrupts then Multithreading.regular_interrupts else Multithreading.no_interrupts);
2.8 + if interrupts then Multithreading.public_interrupts else Multithreading.no_interrupts);
2.9
2.10 fun interrupt thread = Thread.interrupt thread handle Thread _ => ();
2.11
3.1 --- a/src/Pure/Concurrent/synchronized.ML Thu Jul 30 23:50:11 2009 +0200
3.2 +++ b/src/Pure/Concurrent/synchronized.ML Sat Aug 01 00:09:45 2009 +0200
3.3 @@ -48,9 +48,10 @@
3.4 (case f x of
3.5 SOME (y, x') => (var := x'; SOME y)
3.6 | NONE =>
3.7 - if Multithreading.sync_wait (time_limit x) cond lock
3.8 - then try_change ()
3.9 - else NONE)
3.10 + (case Multithreading.sync_wait NONE (time_limit x) cond lock of
3.11 + Exn.Result true => try_change ()
3.12 + | Exn.Result false => NONE
3.13 + | Exn.Exn exn => reraise exn))
3.14 end;
3.15 val res = try_change ();
3.16 val _ = ConditionVar.broadcast cond;
4.1 --- a/src/Pure/ML-Systems/multithreading.ML Thu Jul 30 23:50:11 2009 +0200
4.2 +++ b/src/Pure/ML-Systems/multithreading.ML Sat Aug 01 00:09:45 2009 +0200
4.3 @@ -13,20 +13,21 @@
4.4 signature MULTITHREADING =
4.5 sig
4.6 include BASIC_MULTITHREADING
4.7 - val trace: int ref
4.8 - val tracing: int -> (unit -> string) -> unit
4.9 - val tracing_time: bool -> Time.time -> (unit -> string) -> unit
4.10 - val real_time: ('a -> unit) -> 'a -> Time.time
4.11 val available: bool
4.12 val max_threads: int ref
4.13 val max_threads_value: unit -> int
4.14 val enabled: unit -> bool
4.15 val no_interrupts: Thread.threadAttribute list
4.16 - val regular_interrupts: Thread.threadAttribute list
4.17 - val restricted_interrupts: Thread.threadAttribute list
4.18 - val with_attributes: Thread.threadAttribute list ->
4.19 - (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b
4.20 - val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool
4.21 + val public_interrupts: Thread.threadAttribute list
4.22 + val private_interrupts: Thread.threadAttribute list
4.23 + val sync_interrupts: Thread.threadAttribute list -> Thread.threadAttribute list
4.24 + val with_attributes: Thread.threadAttribute list -> (Thread.threadAttribute list -> 'a) -> 'a
4.25 + val sync_wait: Thread.threadAttribute list option -> Time.time option ->
4.26 + ConditionVar.conditionVar -> Mutex.mutex -> bool Exn.result
4.27 + val trace: int ref
4.28 + val tracing: int -> (unit -> string) -> unit
4.29 + val tracing_time: bool -> Time.time -> (unit -> string) -> unit
4.30 + val real_time: ('a -> unit) -> 'a -> Time.time
4.31 val self_critical: unit -> bool
4.32 val serial: unit -> int
4.33 end;
4.34 @@ -34,14 +35,6 @@
4.35 structure Multithreading: MULTITHREADING =
4.36 struct
4.37
4.38 -(* tracing *)
4.39 -
4.40 -val trace = ref (0: int);
4.41 -fun tracing _ _ = ();
4.42 -fun tracing_time _ _ _ = ();
4.43 -fun real_time f x = (f x; Time.zeroTime);
4.44 -
4.45 -
4.46 (* options *)
4.47
4.48 val available = false;
4.49 @@ -52,18 +45,22 @@
4.50
4.51 (* attributes *)
4.52
4.53 -val no_interrupts =
4.54 - [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
4.55 +val no_interrupts = [];
4.56 +val public_interrupts = [];
4.57 +val private_interrupts = [];
4.58 +fun sync_interrupts _ = [];
4.59
4.60 -val regular_interrupts =
4.61 - [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
4.62 +fun with_attributes _ e = e [];
4.63
4.64 -val restricted_interrupts =
4.65 - [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
4.66 +fun sync_wait _ _ _ _ = Exn.Result true;
4.67
4.68 -fun with_attributes _ f x = f [] x;
4.69
4.70 -fun sync_wait _ _ _ = false;
4.71 +(* tracing *)
4.72 +
4.73 +val trace = ref (0: int);
4.74 +fun tracing _ _ = ();
4.75 +fun tracing_time _ _ _ = ();
4.76 +fun real_time f x = (f x; Time.zeroTime);
4.77
4.78
4.79 (* critical section *)
5.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML Thu Jul 30 23:50:11 2009 +0200
5.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML Sat Aug 01 00:09:45 2009 +0200
5.3 @@ -27,31 +27,6 @@
5.4 structure Multithreading: MULTITHREADING =
5.5 struct
5.6
5.7 -(* tracing *)
5.8 -
5.9 -val trace = ref 0;
5.10 -
5.11 -fun tracing level msg =
5.12 - if level > ! trace then ()
5.13 - else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
5.14 - handle _ (*sic*) => ();
5.15 -
5.16 -fun tracing_time detailed time =
5.17 - tracing
5.18 - (if not detailed then 5
5.19 - else if Time.>= (time, Time.fromMilliseconds 1000) then 1
5.20 - else if Time.>= (time, Time.fromMilliseconds 100) then 2
5.21 - else if Time.>= (time, Time.fromMilliseconds 10) then 3
5.22 - else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
5.23 -
5.24 -fun real_time f x =
5.25 - let
5.26 - val timer = Timer.startRealTimer ();
5.27 - val () = f x;
5.28 - val time = Timer.checkRealTimer timer;
5.29 - in time end;
5.30 -
5.31 -
5.32 (* options *)
5.33
5.34 val available = true;
5.35 @@ -91,57 +66,76 @@
5.36 val no_interrupts =
5.37 [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
5.38
5.39 -val regular_interrupts =
5.40 +val public_interrupts =
5.41 [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
5.42
5.43 -val restricted_interrupts =
5.44 +val private_interrupts =
5.45 [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
5.46
5.47 +val sync_interrupts = map
5.48 + (fn x as Thread.InterruptState Thread.InterruptDefer => x
5.49 + | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch
5.50 + | x => x);
5.51 +
5.52 val safe_interrupts = map
5.53 (fn Thread.InterruptState Thread.InterruptAsynch =>
5.54 Thread.InterruptState Thread.InterruptAsynchOnce
5.55 | x => x);
5.56
5.57 -fun with_attributes new_atts f x =
5.58 +fun with_attributes new_atts e =
5.59 let
5.60 val orig_atts = safe_interrupts (Thread.getAttributes ());
5.61 val result = Exn.capture (fn () =>
5.62 - (Thread.setAttributes (safe_interrupts new_atts); f orig_atts x)) ();
5.63 + (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) ();
5.64 val _ = Thread.setAttributes orig_atts;
5.65 in Exn.release result end;
5.66
5.67
5.68 -(* regular interruptibility *)
5.69 +(* portable wrappers *)
5.70
5.71 -fun interruptible f x =
5.72 - (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x);
5.73 +fun interruptible f x = with_attributes public_interrupts (fn _ => f x);
5.74
5.75 -fun uninterruptible f =
5.76 - with_attributes no_interrupts (fn atts => fn x =>
5.77 - f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
5.78 +fun uninterruptible f x =
5.79 + with_attributes no_interrupts (fn atts =>
5.80 + f (fn g => fn y => with_attributes atts (fn _ => g y)) x);
5.81
5.82
5.83 (* synchronous wait *)
5.84
5.85 -fun sync_attributes e =
5.86 +fun sync_wait opt_atts time cond lock =
5.87 + with_attributes
5.88 + (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ()))
5.89 + (fn _ =>
5.90 + (case time of
5.91 + SOME t => Exn.Result (ConditionVar.waitUntil (cond, lock, t))
5.92 + | NONE => (ConditionVar.wait (cond, lock); Exn.Result true))
5.93 + handle exn => Exn.Exn exn);
5.94 +
5.95 +
5.96 +(* tracing *)
5.97 +
5.98 +val trace = ref 0;
5.99 +
5.100 +fun tracing level msg =
5.101 + if level > ! trace then ()
5.102 + else uninterruptible (fn _ => fn () =>
5.103 + (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
5.104 + handle _ (*sic*) => ()) ();
5.105 +
5.106 +fun tracing_time detailed time =
5.107 + tracing
5.108 + (if not detailed then 5
5.109 + else if Time.>= (time, Time.fromMilliseconds 1000) then 1
5.110 + else if Time.>= (time, Time.fromMilliseconds 100) then 2
5.111 + else if Time.>= (time, Time.fromMilliseconds 10) then 3
5.112 + else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
5.113 +
5.114 +fun real_time f x =
5.115 let
5.116 - val orig_atts = Thread.getAttributes ();
5.117 - val broadcast =
5.118 - (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of
5.119 - NONE => Thread.EnableBroadcastInterrupt false
5.120 - | SOME att => att);
5.121 - val interrupt_state =
5.122 - (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of
5.123 - NONE => Thread.InterruptState Thread.InterruptDefer
5.124 - | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state
5.125 - | _ => Thread.InterruptState Thread.InterruptSynch);
5.126 - in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end;
5.127 -
5.128 -fun sync_wait time cond lock =
5.129 - sync_attributes (fn () =>
5.130 - (case time of
5.131 - SOME t => ConditionVar.waitUntil (cond, lock, t)
5.132 - | NONE => (ConditionVar.wait (cond, lock); true)));
5.133 + val timer = Timer.startRealTimer ();
5.134 + val () = f x;
5.135 + val time = Timer.checkRealTimer timer;
5.136 + in time end;
5.137
5.138
5.139 (* execution with time limit *)
5.140 @@ -169,7 +163,7 @@
5.141
5.142 (* system shell processes, with propagation of interrupts *)
5.143
5.144 -fun system_out script = uninterruptible (fn restore_attributes => fn () =>
5.145 +fun system_out script = with_attributes no_interrupts (fn orig_atts =>
5.146 let
5.147 val script_name = OS.FileSys.tmpName ();
5.148 val _ = write_file script_name script;
5.149 @@ -180,13 +174,12 @@
5.150 (*result state*)
5.151 datatype result = Wait | Signal | Result of int;
5.152 val result = ref Wait;
5.153 - val result_mutex = Mutex.mutex ();
5.154 - val result_cond = ConditionVar.conditionVar ();
5.155 + val lock = Mutex.mutex ();
5.156 + val cond = ConditionVar.conditionVar ();
5.157 fun set_result res =
5.158 - (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex;
5.159 - ConditionVar.signal result_cond);
5.160 + (Mutex.lock lock; result := res; ConditionVar.signal cond; Mutex.unlock lock);
5.161
5.162 - val _ = Mutex.lock result_mutex;
5.163 + val _ = Mutex.lock lock;
5.164
5.165 (*system thread*)
5.166 val system_thread = Thread.fork (fn () =>
5.167 @@ -216,11 +209,12 @@
5.168 handle OS.SysErr _ => () | IO.Io _ =>
5.169 (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ());
5.170
5.171 - val _ = while ! result = Wait do
5.172 - restore_attributes (fn () =>
5.173 - (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100)))
5.174 - result_cond result_mutex)
5.175 - handle Exn.Interrupt => kill 10)) ();
5.176 + val _ =
5.177 + while ! result = Wait do
5.178 + let val res =
5.179 + sync_wait (SOME orig_atts)
5.180 + (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100))) cond lock
5.181 + in case res of Exn.Exn Exn.Interrupt => kill 10 | _ => () end;
5.182
5.183 (*cleanup*)
5.184 val output = read_file output_name handle IO.Io _ => "";
5.185 @@ -229,7 +223,7 @@
5.186 val _ = OS.FileSys.remove output_name handle OS.SysErr _ => ();
5.187 val _ = Thread.interrupt system_thread handle Thread _ => ();
5.188 val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc);
5.189 - in (output, rc) end) ();
5.190 + in (output, rc) end);
5.191
5.192
5.193 (* critical section -- may be nested within the same thread *)