Future.promise: explicit abort operation (like uninterruptible future job);
authorwenzelm
Fri, 19 Aug 2011 14:01:20 +0200
changeset 45173b8f8488704e2
parent 45172 b3bd26fd22d3
child 45174 061599cb6eb0
Future.promise: explicit abort operation (like uninterruptible future job);
explicit cancel_scala message -- still unused;
src/Pure/Concurrent/future.ML
src/Pure/Concurrent/lazy.ML
src/Pure/General/markup.ML
src/Pure/General/markup.scala
src/Pure/System/invoke_scala.ML
src/Pure/System/session.scala
     1.1 --- a/src/Pure/Concurrent/future.ML	Fri Aug 19 13:55:32 2011 +0200
     1.2 +++ b/src/Pure/Concurrent/future.ML	Fri Aug 19 14:01:20 2011 +0200
     1.3 @@ -62,8 +62,8 @@
     1.4    val value: 'a -> 'a future
     1.5    val map: ('a -> 'b) -> 'a future -> 'b future
     1.6    val cond_forks: fork_params -> (unit -> 'a) list -> 'a future list
     1.7 -  val promise_group: Task_Queue.group -> 'a future
     1.8 -  val promise: unit -> 'a future
     1.9 +  val promise_group: Task_Queue.group -> (unit -> unit) -> 'a future
    1.10 +  val promise: (unit -> unit) -> 'a future
    1.11    val fulfill_result: 'a future -> 'a Exn.result -> unit
    1.12    val fulfill: 'a future -> 'a -> unit
    1.13    val shutdown: unit -> unit
    1.14 @@ -561,19 +561,23 @@
    1.15  
    1.16  (* promised futures -- fulfilled by external means *)
    1.17  
    1.18 -fun promise_group group : 'a future =
    1.19 +fun promise_group group abort : 'a future =
    1.20    let
    1.21      val result = Single_Assignment.var "promise" : 'a result;
    1.22 -    fun abort () = assign_result group result Exn.interrupt_exn
    1.23 +    fun assign () = assign_result group result Exn.interrupt_exn
    1.24        handle Fail _ => true
    1.25          | exn =>
    1.26 -            if Exn.is_interrupt exn then raise Fail "Concurrent attempt to fulfill promise"
    1.27 +            if Exn.is_interrupt exn
    1.28 +            then raise Fail "Concurrent attempt to fulfill promise"
    1.29              else reraise exn;
    1.30 +    fun job () =
    1.31 +      Multithreading.with_attributes Multithreading.no_interrupts
    1.32 +        (fn _ => assign () before abort ());
    1.33      val task = SYNCHRONIZED "enqueue_passive" (fn () =>
    1.34 -      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group abort));
    1.35 +      Unsynchronized.change_result queue (Task_Queue.enqueue_passive group job));
    1.36    in Future {promised = true, task = task, result = result} end;
    1.37  
    1.38 -fun promise () = promise_group (worker_subgroup ());
    1.39 +fun promise abort = promise_group (worker_subgroup ()) abort;
    1.40  
    1.41  fun fulfill_result (Future {promised, task, result}) res =
    1.42    if not promised then raise Fail "Not a promised future"
     2.1 --- a/src/Pure/Concurrent/lazy.ML	Fri Aug 19 13:55:32 2011 +0200
     2.2 +++ b/src/Pure/Concurrent/lazy.ML	Fri Aug 19 14:01:20 2011 +0200
     2.3 @@ -57,7 +57,7 @@
     2.4            val (expr, x) =
     2.5              Synchronized.change_result var
     2.6                (fn Expr e =>
     2.7 -                    let val x = Future.promise ()
     2.8 +                    let val x = Future.promise I
     2.9                      in ((SOME e, x), Result x) end
    2.10                  | Result x => ((NONE, x), Result x));
    2.11          in
     3.1 --- a/src/Pure/General/markup.ML	Fri Aug 19 13:55:32 2011 +0200
     3.2 +++ b/src/Pure/General/markup.ML	Fri Aug 19 14:01:20 2011 +0200
     3.3 @@ -119,6 +119,7 @@
     3.4    val badN: string val bad: T
     3.5    val functionN: string
     3.6    val invoke_scala: string -> string -> Properties.T
     3.7 +  val cancel_scala: string -> Properties.T
     3.8    val no_output: Output.output * Output.output
     3.9    val default_output: T -> Output.output * Output.output
    3.10    val add_mode: string -> (T -> Output.output * Output.output) -> unit
    3.11 @@ -377,6 +378,7 @@
    3.12  val functionN = "function"
    3.13  
    3.14  fun invoke_scala name id = [(functionN, "invoke_scala"), (nameN, name), (idN, id)];
    3.15 +fun cancel_scala id = [(functionN, "cancel_scala"), (idN, id)];
    3.16  
    3.17  
    3.18  
     4.1 --- a/src/Pure/General/markup.scala	Fri Aug 19 13:55:32 2011 +0200
     4.2 +++ b/src/Pure/General/markup.scala	Fri Aug 19 14:01:20 2011 +0200
     4.3 @@ -283,6 +283,16 @@
     4.4        }
     4.5    }
     4.6  
     4.7 +  val CANCEL_SCALA = "cancel_scala"
     4.8 +  object Cancel_Scala
     4.9 +  {
    4.10 +    def unapply(props: Properties.T): Option[String] =
    4.11 +      props match {
    4.12 +        case List((FUNCTION, CANCEL_SCALA), (ID, id)) => Some(id)
    4.13 +        case _ => None
    4.14 +      }
    4.15 +  }
    4.16 +
    4.17  
    4.18    /* system data */
    4.19  
     5.1 --- a/src/Pure/System/invoke_scala.ML	Fri Aug 19 13:55:32 2011 +0200
     5.2 +++ b/src/Pure/System/invoke_scala.ML	Fri Aug 19 14:01:20 2011 +0200
     5.3 @@ -33,7 +33,8 @@
     5.4  fun promise_method name arg =
     5.5    let
     5.6      val id = new_id ();
     5.7 -    val promise = Future.promise () : string future;
     5.8 +    fun abort () = Output.raw_message (Markup.cancel_scala id) "";
     5.9 +    val promise = Future.promise abort : string future;
    5.10      val _ = Synchronized.change promises (Symtab.update (id, promise));
    5.11      val _ = Output.raw_message (Markup.invoke_scala name id) arg;
    5.12    in promise end;
     6.1 --- a/src/Pure/System/session.scala	Fri Aug 19 13:55:32 2011 +0200
     6.2 +++ b/src/Pure/System/session.scala	Fri Aug 19 14:01:20 2011 +0200
     6.3 @@ -275,6 +275,8 @@
     6.4              val (tag, res) = Invoke_Scala.method(name, arg)
     6.5              prover.get.invoke_scala(id, tag, res)
     6.6            }
     6.7 +        case Markup.Cancel_Scala(id) =>
     6.8 +          System.err.println("cancel_scala " + id)  // FIXME cancel JVM task
     6.9          case _ =>
    6.10            if (result.is_syslog) {
    6.11              reverse_syslog ::= result.message