Scheduling -- multiple threads working on a queue of tasks.
formerly in ML-Systems/multithreading_polyml.ML;
simplified -- less tracing;
use regular Isabelle/ML functions instead of NJ stuff;
1.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
1.2 +++ b/src/Pure/Concurrent/schedule.ML Thu Sep 04 16:03:43 2008 +0200
1.3 @@ -0,0 +1,92 @@
1.4 +(* Title: Pure/Concurrent/schedule.ML
1.5 + ID: $Id$
1.6 +
1.7 +Scheduling -- multiple threads working on a queue of tasks.
1.8 +*)
1.9 +
1.10 +signature SCHEDULE =
1.11 +sig
1.12 + datatype 'a task =
1.13 + Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
1.14 + val schedule: int -> ('a -> 'a task * 'a) -> 'a -> exn list
1.15 +end;
1.16 +
1.17 +structure Schedule: SCHEDULE =
1.18 +struct
1.19 +
1.20 +datatype 'a task =
1.21 + Task of {body: unit -> unit, cont: 'a -> 'a, fail: 'a -> 'a} | Wait | Terminate;
1.22 +
1.23 +fun schedule n next_task = uninterruptible (fn restore_attributes => fn tasks =>
1.24 + let
1.25 + (*protected execution*)
1.26 + val lock = Mutex.mutex ();
1.27 + fun PROTECTED e =
1.28 + let
1.29 + val _ = Mutex.lock lock;
1.30 + val res = Exn.capture e ();
1.31 + val _ = Mutex.unlock lock;
1.32 + in Exn.release res end;
1.33 +
1.34 + (*wakeup condition*)
1.35 + val wakeup = ConditionVar.conditionVar ();
1.36 + fun wakeup_all () = ConditionVar.broadcast wakeup;
1.37 + fun wait () = ConditionVar.wait (wakeup, lock);
1.38 + fun wait_timeout () =
1.39 + ConditionVar.waitUntil (wakeup, lock, Time.+ (Time.now (), Time.fromSeconds 1));
1.40 +
1.41 + (*queue of tasks*)
1.42 + val queue = ref tasks;
1.43 + val active = ref 0;
1.44 + fun trace_active () = Multithreading.tracing 1 (fn () =>
1.45 + "SCHEDULE: " ^ string_of_int (! active) ^ " active");
1.46 + fun dequeue () =
1.47 + let
1.48 + val (next, tasks') = next_task (! queue);
1.49 + val _ = queue := tasks';
1.50 + in
1.51 + (case next of Wait =>
1.52 + (dec active; trace_active ();
1.53 + wait ();
1.54 + inc active; trace_active ();
1.55 + dequeue ())
1.56 + | _ => next)
1.57 + end;
1.58 +
1.59 + (*pool of running threads*)
1.60 + val status = ref ([]: exn list);
1.61 + val running = ref ([]: Thread.thread list);
1.62 + fun start f =
1.63 + (inc active;
1.64 + change running (cons (Thread.fork (f, [Thread.InterruptState Thread.InterruptDefer]))));
1.65 + fun stop () =
1.66 + (dec active;
1.67 + change running (filter (fn t => not (Thread.equal (t, Thread.self ())))));
1.68 +
1.69 + (*worker thread*)
1.70 + fun worker () =
1.71 + (case PROTECTED dequeue of
1.72 + Task {body, cont, fail} =>
1.73 + (case Exn.capture (restore_attributes body) () of
1.74 + Exn.Result () =>
1.75 + (PROTECTED (fn () => (change queue cont; wakeup_all ())); worker ())
1.76 + | Exn.Exn exn =>
1.77 + PROTECTED (fn () =>
1.78 + (change status (cons exn); change queue fail; stop (); wakeup_all ())))
1.79 + | Terminate => PROTECTED (fn () => (stop (); wakeup_all ())));
1.80 +
1.81 + (*main control: fork and wait*)
1.82 + fun fork 0 = ()
1.83 + | fork k = (start worker; fork (k - 1));
1.84 + val _ = PROTECTED (fn () =>
1.85 + (fork (Int.max (n, 1));
1.86 + while not (null (! running)) do
1.87 + (trace_active ();
1.88 + if not (null (! status))
1.89 + then (List.app (fn t => Thread.interrupt t handle Thread _ => ()) (! running))
1.90 + else ();
1.91 + wait_timeout ())));
1.92 +
1.93 + in ! status end);
1.94 +
1.95 +end;