Scheduling -- multiple threads working on a queue of tasks.
authorwenzelm
Thu, 04 Sep 2008 16:03:43 +0200
changeset 281212303b4c53d3a
parent 28120 dd4297f5b495
child 28122 3d099ce624e7
Scheduling -- multiple threads working on a queue of tasks.
formerly in ML-Systems/multithreading_polyml.ML;
simplified -- less tracing;
use regular Isabelle/ML functions instead of NJ stuff;
src/Pure/Concurrent/schedule.ML
     1.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     1.2 +++ b/src/Pure/Concurrent/schedule.ML	Thu Sep 04 16:03:43 2008 +0200
     1.3 @@ -0,0 +1,92 @@
     1.4 +(*  Title:      Pure/Concurrent/schedule.ML
     1.5 +    ID:         $Id$
     1.6 +
     1.7 +Scheduling -- multiple threads working on a queue of tasks.
     1.8 +*)
     1.9 +
    1.10 +signature SCHEDULE =
    1.11 +sig
    1.12 +  datatype 'a task =
    1.13 +    Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
    1.14 +  val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
    1.15 +end;
    1.16 +
    1.17 +structure Schedule: SCHEDULE =
    1.18 +struct
    1.19 +
    1.20 +datatype 'a task =
    1.21 +  Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
    1.22 +
    1.23 +fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
    1.24 +  let
    1.25 +    (*protected execution*)
    1.26 +    val lock = Mutex.mutex ();
    1.27 +    fun PROTECTED e =
    1.28 +      let
    1.29 +        val _ = Mutex.lock lock;
    1.30 +        val res = Exn.capture e ();
    1.31 +        val _ = Mutex.unlock lock;
    1.32 +      in Exn.release res end;
    1.33 +
    1.34 +    (*wakeup condition*)
    1.35 +    val wakeup = ConditionVar.conditionVar ();
    1.36 +    fun wakeup_all () = ConditionVar.broadcast wakeup;
    1.37 +    fun wait () = ConditionVar.wait (wakeup, lock);
    1.38 +    fun wait_timeout () =
    1.39 +      ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
    1.40 +
    1.41 +    (*queue of tasks*)
    1.42 +    val queue = ref tasks;
    1.43 +    val active = ref 0;
    1.44 +    fun trace_active () = Multithreading.tracing 1 (fn () =>
    1.45 +      "SCHEDULE: " ^ string_of_int (! active) ^ " active");
    1.46 +    fun dequeue () =
    1.47 +      let
    1.48 +        val (next, tasks') = next_task (! queue);
    1.49 +        val _ = queue := tasks';
    1.50 +      in
    1.51 +        (case next of Wait =>
    1.52 +          (dec active; trace_active ();
    1.53 +            wait ();
    1.54 +            inc active; trace_active ();
    1.55 +            dequeue ())
    1.56 +        | _ => next)
    1.57 +      end;
    1.58 +
    1.59 +    (*pool of running threads*)
    1.60 +    val status = ref ([]: exn list);
    1.61 +    val running = ref ([]: Thread.thread list);
    1.62 +    fun start f =
    1.63 +      (inc active;
    1.64 +       change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
    1.65 +    fun stop () =
    1.66 +      (dec active;
    1.67 +       change running (filter (fn t => not (Thread.equal (t, Thread.self ())))));
    1.68 +
    1.69 +    (*worker thread*)
    1.70 +    fun worker () =
    1.71 +      (case PROTECTED dequeue of
    1.72 +        Task {body, cont, fail} =>
    1.73 +          (case Exn.capture (restore_attributes body) () of
    1.74 +            Exn.Result () =>
    1.75 +              (PROTECTED (fn () => (change queue cont; wakeup_all ())); worker ())
    1.76 +          | Exn.Exn exn =>
    1.77 +              PROTECTED (fn () =>
    1.78 +                (change status (cons exn); change queue fail; stop (); wakeup_all ())))
    1.79 +      | Terminate => PROTECTED (fn () => (stop (); wakeup_all ())));
    1.80 +
    1.81 +    (*main control: fork and wait*)
    1.82 +    fun fork 0 = ()
    1.83 +      | fork k = (start worker; fork (k - 1));
    1.84 +    val _ = PROTECTED (fn () =>
    1.85 +     (fork (Int.max (n, 1));
    1.86 +      while not (null (! running)) do
    1.87 +      (trace_active ();
    1.88 +       if not (null (! status))
    1.89 +       then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running))
    1.90 +       else ();
    1.91 +       wait_timeout ())));
    1.92 +
    1.93 +  in ! status end);
    1.94 +
    1.95 +end;