eliminated slightly odd abstract type Task_Queue.deps;
authorwenzelm
Wed, 02 Feb 2011 20:32:50 +0100
changeset 42566afdbec23b92b
parent 42565 a96d43a54650
child 42567 f69bb9077b02
eliminated slightly odd abstract type Task_Queue.deps;
tuned signature;
tuned;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/task_queue.ML
     1.1 --- a/src/Pure/Concurrent/future.ML	Wed Feb 02 18:22:13 2011 +0100
     1.2 +++ b/src/Pure/Concurrent/future.ML	Wed Feb 02 20:32:50 2011 +0100
     1.3 @@ -440,12 +440,12 @@
     1.4        else res);
     1.5  
     1.6  fun join_next deps = (*requires SYNCHRONIZED*)
     1.7 -  if Task_Queue.finished_deps deps then NONE
     1.8 +  if null deps then NONE
     1.9    else
    1.10      (case Unsynchronized.change_result queue (Task_Queue.dequeue_deps (Thread.self ()) deps) of
    1.11 -      (NONE, deps') =>
    1.12 -        if Task_Queue.finished_deps deps' then NONE
    1.13 -        else (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
    1.14 +      (NONE, []) => NONE
    1.15 +    | (NONE, deps') =>
    1.16 +        (worker_waiting deps' (fn () => worker_wait true work_finished); join_next deps')
    1.17      | (SOME work, deps') => SOME (work, deps'));
    1.18  
    1.19  fun execute_work NONE = ()
    1.20 @@ -461,8 +461,7 @@
    1.21        if forall is_finished xs then ()
    1.22        else if Multithreading.self_critical () then
    1.23          error "Cannot join future values within critical section"
    1.24 -      else if is_some (worker_task ()) then
    1.25 -        join_work (Task_Queue.init_deps (map task_of xs))
    1.26 +      else if is_some (worker_task ()) then join_work (map task_of xs)
    1.27        else List.app (ignore o Single_Assignment.await o result_of) xs;
    1.28    in map get_result xs end;
    1.29  
    1.30 @@ -533,8 +532,8 @@
    1.31                    (Task_Queue.dequeue_passive (Thread.self ()) task));
    1.32            in if still_passive then execute (task, [job]) else () end);
    1.33        val _ =
    1.34 -        worker_waiting (Task_Queue.init_deps [task])
    1.35 -          (fn () => Single_Assignment.await result);
    1.36 +        if is_some (Single_Assignment.peek result) then ()
    1.37 +        else worker_waiting [task] (fn () => ignore (Single_Assignment.await result));
    1.38      in () end;
    1.39  
    1.40  fun fulfill x res = fulfill_result x (Exn.Result res);
     2.1 --- a/src/Pure/Concurrent/task_queue.ML	Wed Feb 02 18:22:13 2011 +0100
     2.2 +++ b/src/Pure/Concurrent/task_queue.ML	Wed Feb 02 20:32:50 2011 +0100
     2.3 @@ -21,6 +21,9 @@
     2.4    val pri_of_task: task -> int
     2.5    val str_of_task: task -> string
     2.6    val timing_of_task: task -> Time.time * Time.time * string list
     2.7 +  val running: task -> (unit -> 'a) -> 'a
     2.8 +  val joining: task -> (unit -> 'a) -> 'a
     2.9 +  val waiting: task -> task list -> (unit -> 'a) -> 'a
    2.10    type queue
    2.11    val empty: queue
    2.12    val all_passive: queue -> bool
    2.13 @@ -34,14 +37,8 @@
    2.14    val extend: task -> (bool -> bool) -> queue -> queue option
    2.15    val dequeue_passive: Thread.thread -> task -> queue -> bool * queue
    2.16    val dequeue: Thread.thread -> queue -> (task * (bool -> bool) list) option * queue
    2.17 -  type deps
    2.18 -  val init_deps: task list -> deps
    2.19 -  val finished_deps: deps -> bool
    2.20 -  val dequeue_deps: Thread.thread -> deps -> queue ->
    2.21 -    (((task * (bool -> bool) list) option * deps) * queue)
    2.22 -  val running: task -> (unit -> 'a) -> 'a
    2.23 -  val joining: task -> (unit -> 'a) -> 'a
    2.24 -  val waiting: task -> deps -> (unit -> 'a) -> 'a
    2.25 +  val dequeue_deps: Thread.thread -> task list -> queue ->
    2.26 +    (((task * (bool -> bool) list) option * task list) * queue)
    2.27  end;
    2.28  
    2.29  structure Task_Queue: TASK_QUEUE =
    2.30 @@ -140,6 +137,19 @@
    2.31  structure Task_Graph = Graph(type key = task val ord = task_ord);
    2.32  
    2.33  
    2.34 +(* timing *)
    2.35 +
    2.36 +fun running task =
    2.37 +  update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
    2.38 +
    2.39 +fun joining task =
    2.40 +  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
    2.41 +
    2.42 +fun waiting task deps =
    2.43 +  update_timing (fn t => fn (a, b, ds) =>
    2.44 +    (Time.- (a, t), Time.+ (b, t), fold (insert (op =) o name_of_task) deps ds)) task;
    2.45 +
    2.46 +
    2.47  
    2.48  (** queue of jobs and groups **)
    2.49  
    2.50 @@ -165,7 +175,7 @@
    2.51  (* queue *)
    2.52  
    2.53  datatype queue = Queue of
    2.54 - {groups: task list Inttab.table,   (*groups with presently known members*)
    2.55 + {groups: task list Inttab.table,   (*presently known group members*)
    2.56    jobs: jobs};                      (*job dependency graph*)
    2.57  
    2.58  fun make_queue groups jobs = Queue {groups = groups, jobs = jobs};
    2.59 @@ -291,15 +301,7 @@
    2.60  
    2.61  (* dequeue wrt. dynamic dependencies *)
    2.62  
    2.63 -abstype deps = Deps of task list
    2.64 -with
    2.65 -
    2.66 -fun init_deps tasks = Deps tasks;
    2.67 -fun finished_deps (Deps tasks) = null tasks;
    2.68 -
    2.69 -fun insert_deps (Deps tasks) = fold (insert (op =) o name_of_task) tasks;
    2.70 -
    2.71 -fun dequeue_deps thread (Deps deps) (queue as Queue {groups, jobs}) =
    2.72 +fun dequeue_deps thread deps (queue as Queue {groups, jobs}) =
    2.73    let
    2.74      fun ready [] rest = (NONE, rev rest)
    2.75        | ready (task :: tasks) rest =
    2.76 @@ -322,28 +324,14 @@
    2.77  
    2.78      fun result (res as (task, _)) deps' =
    2.79        let val jobs' = set_job task (Running thread) jobs
    2.80 -      in ((SOME res, Deps deps'), make_queue groups jobs') end;
    2.81 +      in ((SOME res, deps'), make_queue groups jobs') end;
    2.82    in
    2.83      (case ready deps [] of
    2.84        (SOME res, deps') => result res deps'
    2.85      | (NONE, deps') =>
    2.86          (case ready_dep [] deps' of
    2.87            SOME res => result res deps'
    2.88 -        | NONE => ((NONE, Deps deps'), queue)))
    2.89 +        | NONE => ((NONE, deps'), queue)))
    2.90    end;
    2.91  
    2.92  end;
    2.93 -
    2.94 -
    2.95 -(* timing *)
    2.96 -
    2.97 -fun running task =
    2.98 -  update_timing (fn t => fn (a, b, ds) => (Time.+ (a, t), b, ds)) task;
    2.99 -
   2.100 -fun joining task =
   2.101 -  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), b, ds)) task;
   2.102 -
   2.103 -fun waiting task deps =
   2.104 -  update_timing (fn t => fn (a, b, ds) => (Time.- (a, t), Time.+ (b, t), insert_deps deps ds)) task;
   2.105 -
   2.106 -end;