renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
authorwenzelm
Sat, 01 Aug 2009 00:09:45 +0200
changeset 32298400cc493d466
parent 32294 d00238af17b6
child 32299 e6a8ed8aed3a
renamed Multithreading.regular_interrupts to Multithreading.public_interrupts;
renamed Multithreading.restricted_interrupts to Multithreading.private_interrupts;
added Multithreading.sync_interrupts;
Multithreading.sync_wait: more careful treatment of attributes;
Multithreading.tracing: uninterruptible;
Multithreading.system_out: signal within critical region, more careful sync_wait;
eliminated redundant Thread.testInterrupt;
Future.wait_timeout: uniform Multithreading.sync_wait;
future scheduler: interruptible body (sync!), to improve reactivity;
future_job: reject duplicate assignments -- system error;
misc tuning;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/simple_thread.ML
src/Pure/Concurrent/synchronized.ML
src/Pure/ML-Systems/multithreading.ML
src/Pure/ML-Systems/multithreading_polyml.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Thu Jul 30 23:50:11 2009 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Sat Aug 01 00:09:45 2009 +0200
     1.3 @@ -120,11 +120,10 @@
     1.4  fun SYNCHRONIZED name = SimpleThread.synchronized name lock;
     1.5  
     1.6  fun wait cond = (*requires SYNCHRONIZED*)
     1.7 -  Multithreading.sync_wait NONE cond lock;
     1.8 +  Multithreading.sync_wait NONE NONE cond lock;
     1.9  
    1.10 -fun wait_interruptible timeout cond = (*requires SYNCHRONIZED*)
    1.11 -  interruptible (fn () =>
    1.12 -    ignore (Multithreading.sync_wait (SOME (Time.+ (Time.now (), timeout))) cond lock)) ();
    1.13 +fun wait_timeout timeout cond = (*requires SYNCHRONIZED*)
    1.14 +  Multithreading.sync_wait NONE (SOME (Time.+ (Time.now (), timeout))) cond lock;
    1.15  
    1.16  fun signal cond = (*requires SYNCHRONIZED*)
    1.17    ConditionVar.signal cond;
    1.18 @@ -149,11 +148,11 @@
    1.19          val res =
    1.20            if ok then
    1.21              Exn.capture (fn () =>
    1.22 -             (Thread.testInterrupt ();
    1.23 -              Multithreading.with_attributes Multithreading.restricted_interrupts
    1.24 -                (fn _ => fn () => e ())) ()) ()
    1.25 +              Multithreading.with_attributes Multithreading.private_interrupts (fn _ => e ())) ()
    1.26            else Exn.Exn Exn.Interrupt;
    1.27 -        val _ = Synchronized.change result (K (SOME res));
    1.28 +        val _ = Synchronized.change result
    1.29 +          (fn NONE => SOME res
    1.30 +            | SOME _ => raise Fail "Duplicate assignment of future value");
    1.31        in
    1.32          (case res of
    1.33            Exn.Exn exn => (Task_Queue.cancel_group group exn; false)
    1.34 @@ -276,20 +275,24 @@
    1.35          broadcast_work ());
    1.36  
    1.37      (*delay loop*)
    1.38 -    val _ = wait_interruptible next_round scheduler_event
    1.39 -      handle Exn.Interrupt =>
    1.40 -        (Multithreading.tracing 1 (fn () => "Interrupt");
    1.41 -          List.app do_cancel (Task_Queue.cancel_all (! queue)));
    1.42 +    val _ = Exn.release (wait_timeout next_round scheduler_event);
    1.43  
    1.44      (*shutdown*)
    1.45      val _ = if Task_Queue.is_empty (! queue) then do_shutdown := true else ();
    1.46      val continue = not (! do_shutdown andalso null (! workers));
    1.47      val _ = if continue then () else scheduler := NONE;
    1.48      val _ = broadcast scheduler_event;
    1.49 -  in continue end;
    1.50 +  in continue end
    1.51 +  handle Exn.Interrupt =>
    1.52 +   (Multithreading.tracing 1 (fn () => "Interrupt");
    1.53 +    List.app do_cancel (Task_Queue.cancel_all (! queue));
    1.54 +    scheduler_next ());
    1.55 +
    1.56  
    1.57  fun scheduler_loop () =
    1.58 -  while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ();
    1.59 +  Multithreading.with_attributes
    1.60 +    (Multithreading.sync_interrupts Multithreading.public_interrupts)
    1.61 +    (fn _ => while SYNCHRONIZED "scheduler" (fn () => scheduler_next ()) do ());
    1.62  
    1.63  fun scheduler_active () = (*requires SYNCHRONIZED*)
    1.64    (case ! scheduler of NONE => false | SOME thread => Thread.isActive thread);
    1.65 @@ -393,12 +396,11 @@
    1.66  
    1.67  fun interruptible_task f x =
    1.68    if Multithreading.available then
    1.69 -   (Thread.testInterrupt ();
    1.70      Multithreading.with_attributes
    1.71        (if is_worker ()
    1.72 -       then Multithreading.restricted_interrupts
    1.73 -       else Multithreading.regular_interrupts)
    1.74 -      (fn _ => fn x => f x) x)
    1.75 +       then Multithreading.private_interrupts
    1.76 +       else Multithreading.public_interrupts)
    1.77 +      (fn _ => f x)
    1.78    else interruptible f x;
    1.79  
    1.80  (*cancel: present and future group members will be interrupted eventually*)
     2.1 --- a/src/Pure/Concurrent/simple_thread.ML	Thu Jul 30 23:50:11 2009 +0200
     2.2 +++ b/src/Pure/Concurrent/simple_thread.ML	Sat Aug 01 00:09:45 2009 +0200
     2.3 @@ -16,7 +16,7 @@
     2.4  
     2.5  fun fork interrupts body =
     2.6    Thread.fork (fn () => exception_trace (fn () => body ()),
     2.7 -    if interrupts then Multithreading.regular_interrupts else Multithreading.no_interrupts);
     2.8 +    if interrupts then Multithreading.public_interrupts else Multithreading.no_interrupts);
     2.9  
    2.10  fun interrupt thread = Thread.interrupt thread handle Thread _ => ();
    2.11  
     3.1 --- a/src/Pure/Concurrent/synchronized.ML	Thu Jul 30 23:50:11 2009 +0200
     3.2 +++ b/src/Pure/Concurrent/synchronized.ML	Sat Aug 01 00:09:45 2009 +0200
     3.3 @@ -48,9 +48,10 @@
     3.4            (case f x of
     3.5              SOME (y, x') => (var := x'; SOME y)
     3.6            | NONE =>
     3.7 -              if Multithreading.sync_wait (time_limit x) cond lock
     3.8 -              then try_change ()
     3.9 -              else NONE)
    3.10 +              (case Multithreading.sync_wait NONE (time_limit x) cond lock of
    3.11 +                Exn.Result true => try_change ()
    3.12 +              | Exn.Result false => NONE
    3.13 +              | Exn.Exn exn => reraise exn))
    3.14          end;
    3.15        val res = try_change ();
    3.16        val _ = ConditionVar.broadcast cond;
     4.1 --- a/src/Pure/ML-Systems/multithreading.ML	Thu Jul 30 23:50:11 2009 +0200
     4.2 +++ b/src/Pure/ML-Systems/multithreading.ML	Sat Aug 01 00:09:45 2009 +0200
     4.3 @@ -13,20 +13,21 @@
     4.4  signature MULTITHREADING =
     4.5  sig
     4.6    include BASIC_MULTITHREADING
     4.7 -  val trace: int ref
     4.8 -  val tracing: int -> (unit -> string) -> unit
     4.9 -  val tracing_time: bool -> Time.time -> (unit -> string) -> unit
    4.10 -  val real_time: ('a -> unit) -> 'a -> Time.time
    4.11    val available: bool
    4.12    val max_threads: int ref
    4.13    val max_threads_value: unit -> int
    4.14    val enabled: unit -> bool
    4.15    val no_interrupts: Thread.threadAttribute list
    4.16 -  val regular_interrupts: Thread.threadAttribute list
    4.17 -  val restricted_interrupts: Thread.threadAttribute list
    4.18 -  val with_attributes: Thread.threadAttribute list ->
    4.19 -    (Thread.threadAttribute list -> 'a -> 'b) -> 'a -> 'b
    4.20 -  val sync_wait: Time.time option -> ConditionVar.conditionVar -> Mutex.mutex -> bool
    4.21 +  val public_interrupts: Thread.threadAttribute list
    4.22 +  val private_interrupts: Thread.threadAttribute list
    4.23 +  val sync_interrupts: Thread.threadAttribute list -> Thread.threadAttribute list
    4.24 +  val with_attributes: Thread.threadAttribute list -> (Thread.threadAttribute list -> 'a) -> 'a
    4.25 +  val sync_wait: Thread.threadAttribute list option -> Time.time option ->
    4.26 +    ConditionVar.conditionVar -> Mutex.mutex -> bool Exn.result
    4.27 +  val trace: int ref
    4.28 +  val tracing: int -> (unit -> string) -> unit
    4.29 +  val tracing_time: bool -> Time.time -> (unit -> string) -> unit
    4.30 +  val real_time: ('a -> unit) -> 'a -> Time.time
    4.31    val self_critical: unit -> bool
    4.32    val serial: unit -> int
    4.33  end;
    4.34 @@ -34,14 +35,6 @@
    4.35  structure Multithreading: MULTITHREADING =
    4.36  struct
    4.37  
    4.38 -(* tracing *)
    4.39 -
    4.40 -val trace = ref (0: int);
    4.41 -fun tracing _ _ = ();
    4.42 -fun tracing_time _ _ _ = ();
    4.43 -fun real_time f x = (f x; Time.zeroTime);
    4.44 -
    4.45 -
    4.46  (* options *)
    4.47  
    4.48  val available = false;
    4.49 @@ -52,18 +45,22 @@
    4.50  
    4.51  (* attributes *)
    4.52  
    4.53 -val no_interrupts =
    4.54 -  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
    4.55 +val no_interrupts = [];
    4.56 +val public_interrupts = [];
    4.57 +val private_interrupts = [];
    4.58 +fun sync_interrupts _ = [];
    4.59  
    4.60 -val regular_interrupts =
    4.61 -  [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
    4.62 +fun with_attributes _ e = e [];
    4.63  
    4.64 -val restricted_interrupts =
    4.65 -  [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
    4.66 +fun sync_wait _ _ _ _ = Exn.Result true;
    4.67  
    4.68 -fun with_attributes _ f x = f [] x;
    4.69  
    4.70 -fun sync_wait _ _ _ = false;
    4.71 +(* tracing *)
    4.72 +
    4.73 +val trace = ref (0: int);
    4.74 +fun tracing _ _ = ();
    4.75 +fun tracing_time _ _ _ = ();
    4.76 +fun real_time f x = (f x; Time.zeroTime);
    4.77  
    4.78  
    4.79  (* critical section *)
     5.1 --- a/src/Pure/ML-Systems/multithreading_polyml.ML	Thu Jul 30 23:50:11 2009 +0200
     5.2 +++ b/src/Pure/ML-Systems/multithreading_polyml.ML	Sat Aug 01 00:09:45 2009 +0200
     5.3 @@ -27,31 +27,6 @@
     5.4  structure Multithreading: MULTITHREADING =
     5.5  struct
     5.6  
     5.7 -(* tracing *)
     5.8 -
     5.9 -val trace = ref 0;
    5.10 -
    5.11 -fun tracing level msg =
    5.12 -  if level > ! trace then ()
    5.13 -  else (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
    5.14 -    handle _ (*sic*) => ();
    5.15 -
    5.16 -fun tracing_time detailed time =
    5.17 -  tracing
    5.18 -   (if not detailed then 5
    5.19 -    else if Time.>= (time, Time.fromMilliseconds 1000) then 1
    5.20 -    else if Time.>= (time, Time.fromMilliseconds 100) then 2
    5.21 -    else if Time.>= (time, Time.fromMilliseconds 10) then 3
    5.22 -    else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
    5.23 -
    5.24 -fun real_time f x =
    5.25 -  let
    5.26 -    val timer = Timer.startRealTimer ();
    5.27 -    val () = f x;
    5.28 -    val time = Timer.checkRealTimer timer;
    5.29 -  in time end;
    5.30 -
    5.31 -
    5.32  (* options *)
    5.33  
    5.34  val available = true;
    5.35 @@ -91,57 +66,76 @@
    5.36  val no_interrupts =
    5.37    [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptDefer];
    5.38  
    5.39 -val regular_interrupts =
    5.40 +val public_interrupts =
    5.41    [Thread.EnableBroadcastInterrupt true, Thread.InterruptState Thread.InterruptAsynchOnce];
    5.42  
    5.43 -val restricted_interrupts =
    5.44 +val private_interrupts =
    5.45    [Thread.EnableBroadcastInterrupt false, Thread.InterruptState Thread.InterruptAsynchOnce];
    5.46  
    5.47 +val sync_interrupts = map
    5.48 +  (fn x as Thread.InterruptState Thread.InterruptDefer => x
    5.49 +    | Thread.InterruptState _ => Thread.InterruptState Thread.InterruptSynch
    5.50 +    | x => x);
    5.51 +
    5.52  val safe_interrupts = map
    5.53    (fn Thread.InterruptState Thread.InterruptAsynch =>
    5.54        Thread.InterruptState Thread.InterruptAsynchOnce
    5.55      | x => x);
    5.56  
    5.57 -fun with_attributes new_atts f x =
    5.58 +fun with_attributes new_atts e =
    5.59    let
    5.60      val orig_atts = safe_interrupts (Thread.getAttributes ());
    5.61      val result = Exn.capture (fn () =>
    5.62 -      (Thread.setAttributes (safe_interrupts new_atts); f orig_atts x)) ();
    5.63 +      (Thread.setAttributes (safe_interrupts new_atts); e orig_atts)) ();
    5.64      val _ = Thread.setAttributes orig_atts;
    5.65    in Exn.release result end;
    5.66  
    5.67  
    5.68 -(* regular interruptibility *)
    5.69 +(* portable wrappers *)
    5.70  
    5.71 -fun interruptible f x =
    5.72 -  (Thread.testInterrupt (); with_attributes regular_interrupts (fn _ => fn x => f x) x);
    5.73 +fun interruptible f x = with_attributes public_interrupts (fn _ => f x);
    5.74  
    5.75 -fun uninterruptible f =
    5.76 -  with_attributes no_interrupts (fn atts => fn x =>
    5.77 -    f (fn g => with_attributes atts (fn _ => fn y => g y)) x);
    5.78 +fun uninterruptible f x =
    5.79 +  with_attributes no_interrupts (fn atts =>
    5.80 +    f (fn g => fn y => with_attributes atts (fn _ => g y)) x);
    5.81  
    5.82  
    5.83  (* synchronous wait *)
    5.84  
    5.85 -fun sync_attributes e =
    5.86 +fun sync_wait opt_atts time cond lock =
    5.87 +  with_attributes
    5.88 +    (sync_interrupts (case opt_atts of SOME atts => atts | NONE => Thread.getAttributes ()))
    5.89 +    (fn _ =>
    5.90 +      (case time of
    5.91 +        SOME t => Exn.Result (ConditionVar.waitUntil (cond, lock, t))
    5.92 +      | NONE => (ConditionVar.wait (cond, lock); Exn.Result true))
    5.93 +      handle exn => Exn.Exn exn);
    5.94 +
    5.95 +
    5.96 +(* tracing *)
    5.97 +
    5.98 +val trace = ref 0;
    5.99 +
   5.100 +fun tracing level msg =
   5.101 +  if level > ! trace then ()
   5.102 +  else uninterruptible (fn _ => fn () =>
   5.103 +    (TextIO.output (TextIO.stdErr, (">>> " ^ msg () ^ "\n")); TextIO.flushOut TextIO.stdErr)
   5.104 +      handle _ (*sic*) => ()) ();
   5.105 +
   5.106 +fun tracing_time detailed time =
   5.107 +  tracing
   5.108 +   (if not detailed then 5
   5.109 +    else if Time.>= (time, Time.fromMilliseconds 1000) then 1
   5.110 +    else if Time.>= (time, Time.fromMilliseconds 100) then 2
   5.111 +    else if Time.>= (time, Time.fromMilliseconds 10) then 3
   5.112 +    else if Time.>= (time, Time.fromMilliseconds 1) then 4 else 5);
   5.113 +
   5.114 +fun real_time f x =
   5.115    let
   5.116 -    val orig_atts = Thread.getAttributes ();
   5.117 -    val broadcast =
   5.118 -      (case List.find (fn Thread.EnableBroadcastInterrupt _ => true | _ => false) orig_atts of
   5.119 -        NONE => Thread.EnableBroadcastInterrupt false
   5.120 -      | SOME att => att);
   5.121 -    val interrupt_state =
   5.122 -      (case List.find (fn Thread.InterruptState _ => true | _ => false) orig_atts of
   5.123 -        NONE => Thread.InterruptState Thread.InterruptDefer
   5.124 -      | SOME (state as Thread.InterruptState Thread.InterruptDefer) => state
   5.125 -      | _ => Thread.InterruptState Thread.InterruptSynch);
   5.126 -  in with_attributes [broadcast, interrupt_state] (fn _ => fn () => e ()) () end;
   5.127 -
   5.128 -fun sync_wait time cond lock =
   5.129 -  sync_attributes (fn () =>
   5.130 -    (case time of
   5.131 -      SOME t => ConditionVar.waitUntil (cond, lock, t)
   5.132 -    | NONE => (ConditionVar.wait (cond, lock); true)));
   5.133 +    val timer = Timer.startRealTimer ();
   5.134 +    val () = f x;
   5.135 +    val time = Timer.checkRealTimer timer;
   5.136 +  in time end;
   5.137  
   5.138  
   5.139  (* execution with time limit *)
   5.140 @@ -169,7 +163,7 @@
   5.141  
   5.142  (* system shell processes, with propagation of interrupts *)
   5.143  
   5.144 -fun system_out script = uninterruptible (fn restore_attributes => fn () =>
   5.145 +fun system_out script = with_attributes no_interrupts (fn orig_atts =>
   5.146    let
   5.147      val script_name = OS.FileSys.tmpName ();
   5.148      val _ = write_file script_name script;
   5.149 @@ -180,13 +174,12 @@
   5.150      (*result state*)
   5.151      datatype result = Wait | Signal | Result of int;
   5.152      val result = ref Wait;
   5.153 -    val result_mutex = Mutex.mutex ();
   5.154 -    val result_cond = ConditionVar.conditionVar ();
   5.155 +    val lock = Mutex.mutex ();
   5.156 +    val cond = ConditionVar.conditionVar ();
   5.157      fun set_result res =
   5.158 -      (Mutex.lock result_mutex; result := res; Mutex.unlock result_mutex;
   5.159 -        ConditionVar.signal result_cond);
   5.160 +      (Mutex.lock lock; result := res; ConditionVar.signal cond; Mutex.unlock lock);
   5.161  
   5.162 -    val _ = Mutex.lock result_mutex;
   5.163 +    val _ = Mutex.lock lock;
   5.164  
   5.165      (*system thread*)
   5.166      val system_thread = Thread.fork (fn () =>
   5.167 @@ -216,11 +209,12 @@
   5.168        handle OS.SysErr _ => () | IO.Io _ =>
   5.169          (OS.Process.sleep (Time.fromMilliseconds 100); if n > 0 then kill (n - 1) else ());
   5.170  
   5.171 -    val _ = while ! result = Wait do
   5.172 -      restore_attributes (fn () =>
   5.173 -        (ignore (sync_wait (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100)))
   5.174 -            result_cond result_mutex)
   5.175 -          handle Exn.Interrupt => kill 10)) ();
   5.176 +    val _ =
   5.177 +      while ! result = Wait do
   5.178 +        let val res =
   5.179 +          sync_wait (SOME orig_atts)
   5.180 +            (SOME (Time.+ (Time.now (), Time.fromMilliseconds 100))) cond lock
   5.181 +        in case res of Exn.Exn Exn.Interrupt => kill 10 | _ => () end;
   5.182  
   5.183      (*cleanup*)
   5.184      val output = read_file output_name handle IO.Io _ => "";
   5.185 @@ -229,7 +223,7 @@
   5.186      val _ = OS.FileSys.remove output_name handle OS.SysErr _ => ();
   5.187      val _ = Thread.interrupt system_thread handle Thread _ => ();
   5.188      val rc = (case ! result of Signal => raise Exn.Interrupt | Result rc => rc);
   5.189 -  in (output, rc) end) ();
   5.190 +  in (output, rc) end);
   5.191  
   5.192  
   5.193  (* critical section -- may be nested within the same thread *)