explicit management of Session.Protocol_Handlers, with protocol state and functions;
authorwenzelm
Wed, 22 May 2013 14:10:45 +0200
changeset 532481fd184eaa310
parent 53245 06db08182c4b
child 53249 3610ae73cfdb
explicit management of Session.Protocol_Handlers, with protocol state and functions;
more self-contained ML/Scala module Invoke_Scala;
src/Pure/PIDE/markup.ML
src/Pure/PIDE/markup.scala
src/Pure/PIDE/protocol.ML
src/Pure/PIDE/protocol.scala
src/Pure/System/invoke_scala.ML
src/Pure/System/invoke_scala.scala
src/Pure/System/isabelle_process.ML
src/Pure/System/session.ML
src/Pure/System/session.scala
     1.1 --- a/src/Pure/PIDE/markup.ML	Wed May 22 08:46:39 2013 +0200
     1.2 +++ b/src/Pure/PIDE/markup.ML	Wed May 22 14:10:45 2013 +0200
     1.3 @@ -142,6 +142,7 @@
     1.4    val functionN: string
     1.5    val assign_execs: Properties.T
     1.6    val removed_versions: Properties.T
     1.7 +  val protocol_handler: string -> Properties.T
     1.8    val invoke_scala: string -> string -> Properties.T
     1.9    val cancel_scala: string -> Properties.T
    1.10    val ML_statistics: Properties.entry
    1.11 @@ -461,6 +462,8 @@
    1.12  val assign_execs = [(functionN, "assign_execs")];
    1.13  val removed_versions = [(functionN, "removed_versions")];
    1.14  
    1.15 +fun protocol_handler name = [(functionN, "protocol_handler"), (nameN, name)];
    1.16 +
    1.17  fun invoke_scala name id = [(functionN, "invoke_scala"), (nameN, name), (idN, id)];
    1.18  fun cancel_scala id = [(functionN, "cancel_scala"), (idN, id)];
    1.19  
     2.1 --- a/src/Pure/PIDE/markup.scala	Wed May 22 08:46:39 2013 +0200
     2.2 +++ b/src/Pure/PIDE/markup.scala	Wed May 22 14:10:45 2013 +0200
     2.3 @@ -316,19 +316,31 @@
     2.4    val Assign_Execs: Properties.T = List((FUNCTION, "assign_execs"))
     2.5    val Removed_Versions: Properties.T = List((FUNCTION, "removed_versions"))
     2.6  
     2.7 +  object Protocol_Handler
     2.8 +  {
     2.9 +    def unapply(props: Properties.T): Option[(String)] =
    2.10 +      props match {
    2.11 +        case List((FUNCTION, "protocol_handler"), (NAME, name)) => Some(name)
    2.12 +        case _ => None
    2.13 +      }
    2.14 +  }
    2.15 +
    2.16 +  val INVOKE_SCALA = "invoke_scala"
    2.17    object Invoke_Scala
    2.18    {
    2.19      def unapply(props: Properties.T): Option[(String, String)] =
    2.20        props match {
    2.21 -        case List((FUNCTION, "invoke_scala"), (NAME, name), (ID, id)) => Some((name, id))
    2.22 +        case List((FUNCTION, INVOKE_SCALA), (NAME, name), (ID, id)) => Some((name, id))
    2.23          case _ => None
    2.24        }
    2.25    }
    2.26 +
    2.27 +  val CANCEL_SCALA = "cancel_scala"
    2.28    object Cancel_Scala
    2.29    {
    2.30      def unapply(props: Properties.T): Option[String] =
    2.31        props match {
    2.32 -        case List((FUNCTION, "cancel_scala"), (ID, id)) => Some(id)
    2.33 +        case List((FUNCTION, CANCEL_SCALA), (ID, id)) => Some(id)
    2.34          case _ => None
    2.35        }
    2.36    }
     3.1 --- a/src/Pure/PIDE/protocol.ML	Wed May 22 08:46:39 2013 +0200
     3.2 +++ b/src/Pure/PIDE/protocol.ML	Wed May 22 14:10:45 2013 +0200
     3.3 @@ -78,11 +78,5 @@
     3.4        Active.dialog_result (Markup.parse_int serial) result
     3.5          handle exn => if Exn.is_interrupt exn then () else reraise exn);
     3.6  
     3.7 -val _ =
     3.8 -  Isabelle_Process.protocol_command "Document.invoke_scala"
     3.9 -    (fn [id, tag, res] =>
    3.10 -      Invoke_Scala.fulfill_method id tag res
    3.11 -        handle exn => if Exn.is_interrupt exn then () else reraise exn);
    3.12 -
    3.13  end;
    3.14  
     4.1 --- a/src/Pure/PIDE/protocol.scala	Wed May 22 08:46:39 2013 +0200
     4.2 +++ b/src/Pure/PIDE/protocol.scala	Wed May 22 14:10:45 2013 +0200
     4.3 @@ -364,12 +364,4 @@
     4.4    {
     4.5      input("Document.dialog_result", Properties.Value.Long(serial), result)
     4.6    }
     4.7 -
     4.8 -
     4.9 -  /* method invocation service */
    4.10 -
    4.11 -  def invoke_scala(id: String, tag: Invoke_Scala.Tag.Value, res: String)
    4.12 -  {
    4.13 -    input("Document.invoke_scala", id, tag.toString, res)
    4.14 -  }
    4.15  }
     5.1 --- a/src/Pure/System/invoke_scala.ML	Wed May 22 08:46:39 2013 +0200
     5.2 +++ b/src/Pure/System/invoke_scala.ML	Wed May 22 14:10:45 2013 +0200
     5.3 @@ -6,16 +6,15 @@
     5.4  
     5.5  signature INVOKE_SCALA =
     5.6  sig
     5.7 -  exception Null
     5.8    val method: string -> string -> string
     5.9    val promise_method: string -> string -> string future
    5.10 -  val fulfill_method: string -> string -> string -> unit
    5.11 +  exception Null
    5.12  end;
    5.13  
    5.14  structure Invoke_Scala: INVOKE_SCALA =
    5.15  struct
    5.16  
    5.17 -exception Null;
    5.18 +val _ = Session.protocol_handler "isabelle.Invoke_Scala";
    5.19  
    5.20  
    5.21  (* pending promises *)
    5.22 @@ -40,9 +39,11 @@
    5.23  fun method name arg = Future.join (promise_method name arg);
    5.24  
    5.25  
    5.26 -(* fulfill method *)
    5.27 +(* fulfill *)
    5.28  
    5.29 -fun fulfill_method id tag res =
    5.30 +exception Null;
    5.31 +
    5.32 +fun fulfill id tag res =
    5.33    let
    5.34      val result =
    5.35        (case tag of
    5.36 @@ -58,5 +59,11 @@
    5.37      val _ = Future.fulfill_result promise result;
    5.38    in () end;
    5.39  
    5.40 +val _ =
    5.41 +  Isabelle_Process.protocol_command "Invoke_Scala.fulfill"
    5.42 +    (fn [id, tag, res] =>
    5.43 +      fulfill id tag res
    5.44 +        handle exn => if Exn.is_interrupt exn then () else reraise exn);
    5.45 +
    5.46  end;
    5.47  
     6.1 --- a/src/Pure/System/invoke_scala.scala	Wed May 22 08:46:39 2013 +0200
     6.2 +++ b/src/Pure/System/invoke_scala.scala	Wed May 22 14:10:45 2013 +0200
     6.3 @@ -64,3 +64,69 @@
     6.4        case Exn.Exn(e) => (Tag.FAIL, Exn.message(e))
     6.5      }
     6.6  }
     6.7 +
     6.8 +
     6.9 +/* protocol handler */
    6.10 +
    6.11 +class Invoke_Scala extends Session.Protocol_Handler
    6.12 +{
    6.13 +  private var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
    6.14 +
    6.15 +  private def fulfill(prover: Session.Prover,
    6.16 +    id: String, tag: Invoke_Scala.Tag.Value, res: String): Unit = synchronized
    6.17 +  {
    6.18 +    if (futures.isDefinedAt(id)) {
    6.19 +      prover.input("Invoke_Scala.fulfill", id, tag.toString, res)
    6.20 +      futures -= id
    6.21 +    }
    6.22 +  }
    6.23 +
    6.24 +  private def cancel(prover: Session.Prover,
    6.25 +    id: String, future: java.util.concurrent.Future[Unit])
    6.26 +  {
    6.27 +    future.cancel(true)
    6.28 +    fulfill(prover, id, Invoke_Scala.Tag.INTERRUPT, "")
    6.29 +  }
    6.30 +
    6.31 +  private def invoke_scala(
    6.32 +    prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized
    6.33 +  {
    6.34 +    output.properties match {
    6.35 +      case Markup.Invoke_Scala(name, id) =>
    6.36 +        futures += (id ->
    6.37 +          default_thread_pool.submit(() =>
    6.38 +            {
    6.39 +              val arg = XML.content(output.body)
    6.40 +              val (tag, result) = Invoke_Scala.method(name, arg)
    6.41 +              fulfill(prover, id, tag, result)
    6.42 +            }))
    6.43 +        true
    6.44 +      case _ => false
    6.45 +    }
    6.46 +  }
    6.47 +
    6.48 +  private def cancel_scala(
    6.49 +    prover: Session.Prover, output: Isabelle_Process.Output): Boolean = synchronized
    6.50 +  {
    6.51 +    output.properties match {
    6.52 +      case Markup.Cancel_Scala(id) =>
    6.53 +        futures.get(id) match {
    6.54 +          case Some(future) => cancel(prover, id, future)
    6.55 +          case None =>
    6.56 +        }
    6.57 +        true
    6.58 +      case _ => false
    6.59 +    }
    6.60 +  }
    6.61 +
    6.62 +  override def stop(prover: Session.Prover): Unit = synchronized
    6.63 +  {
    6.64 +    for ((id, future) <- futures) cancel(prover, id, future)
    6.65 +    futures = Map.empty
    6.66 +  }
    6.67 +
    6.68 +  val functions = Map(
    6.69 +    Markup.INVOKE_SCALA -> invoke_scala _,
    6.70 +    Markup.CANCEL_SCALA -> cancel_scala _)
    6.71 +}
    6.72 +
     7.1 --- a/src/Pure/System/isabelle_process.ML	Wed May 22 08:46:39 2013 +0200
     7.2 +++ b/src/Pure/System/isabelle_process.ML	Wed May 22 14:10:45 2013 +0200
     7.3 @@ -221,6 +221,7 @@
     7.4  
     7.5      val channel = rendezvous ();
     7.6      val _ = init_channels channel;
     7.7 +    val _ = Session.init_protocol_handlers ();
     7.8    in loop channel end));
     7.9  
    7.10  fun init_fifos fifo1 fifo2 = init (fn () => System_Channel.fifo_rendezvous fifo1 fifo2);
     8.1 --- a/src/Pure/System/session.ML	Wed May 22 08:46:39 2013 +0200
     8.2 +++ b/src/Pure/System/session.ML	Wed May 22 14:10:45 2013 +0200
     8.3 @@ -1,7 +1,7 @@
     8.4  (*  Title:      Pure/System/session.ML
     8.5      Author:     Makarius
     8.6  
     8.7 -Session management -- internal state of logic images (not thread-safe).
     8.8 +Session management -- internal state of logic images.
     8.9  *)
    8.10  
    8.11  signature SESSION =
    8.12 @@ -11,12 +11,14 @@
    8.13    val init: bool -> bool -> Path.T -> string -> bool -> string -> (string * string) list ->
    8.14      string -> string * string -> bool * string -> bool -> unit
    8.15    val finish: unit -> unit
    8.16 +  val protocol_handler: string -> unit
    8.17 +  val init_protocol_handlers: unit -> unit
    8.18  end;
    8.19  
    8.20  structure Session: SESSION =
    8.21  struct
    8.22  
    8.23 -(* session state *)
    8.24 +(** session identification -- not thread-safe **)
    8.25  
    8.26  val session = Unsynchronized.ref {chapter = "Pure", name = "Pure"};
    8.27  val session_finished = Unsynchronized.ref false;
    8.28 @@ -58,4 +60,20 @@
    8.29    Event_Timer.shutdown ();
    8.30    session_finished := true);
    8.31  
    8.32 +
    8.33 +(** protocol handlers **)
    8.34 +
    8.35 +val protocol_handlers = Synchronized.var "protocol_handlers" ([]: string list);
    8.36 +
    8.37 +fun protocol_handler name =
    8.38 +  Synchronized.change protocol_handlers (fn handlers =>
    8.39 +   (Output.try_protocol_message (Markup.protocol_handler name) "";
    8.40 +    if not (member (op =) handlers name) then ()
    8.41 +    else warning ("Redefining protocol handler: " ^ quote name);
    8.42 +    update (op =) name handlers));
    8.43 +
    8.44 +fun init_protocol_handlers () =
    8.45 +  Synchronized.value protocol_handlers
    8.46 +  |> List.app (fn name => Output.try_protocol_message (Markup.protocol_handler name) "");
    8.47 +
    8.48  end;
     9.1 --- a/src/Pure/System/session.scala	Wed May 22 08:46:39 2013 +0200
     9.2 +++ b/src/Pure/System/session.scala	Wed May 22 14:10:45 2013 +0200
     9.3 @@ -37,6 +37,68 @@
     9.4    case object Ready extends Phase
     9.5    case object Shutdown extends Phase  // transient
     9.6    //}}}
     9.7 +
     9.8 +
     9.9 +  /* protocol handlers */
    9.10 +
    9.11 +  type Prover = Isabelle_Process with Protocol
    9.12 +
    9.13 +  abstract class Protocol_Handler
    9.14 +  {
    9.15 +    def stop(prover: Prover): Unit = {}
    9.16 +    val functions: Map[String, (Prover, Isabelle_Process.Output) => Boolean]
    9.17 +  }
    9.18 +
    9.19 +  class Protocol_Handlers(
    9.20 +    handlers: Map[String, Session.Protocol_Handler] = Map.empty,
    9.21 +    functions: Map[String, Isabelle_Process.Output => Boolean] = Map.empty)
    9.22 +  {
    9.23 +    def add(prover: Prover, name: String): Protocol_Handlers =
    9.24 +    {
    9.25 +      val (handlers1, functions1) =
    9.26 +        handlers.get(name) match {
    9.27 +          case Some(old_handler) =>
    9.28 +            System.err.println("Redefining protocol handler: " + name)
    9.29 +            old_handler.stop(prover)
    9.30 +            (handlers - name, functions -- old_handler.functions.keys)
    9.31 +          case None => (handlers, functions)
    9.32 +        }
    9.33 +
    9.34 +      val (handlers2, functions2) =
    9.35 +        try {
    9.36 +          val new_handler = Class.forName(name).newInstance.asInstanceOf[Protocol_Handler]
    9.37 +          val new_functions =
    9.38 +            for ((a, f) <- new_handler.functions.toList) yield
    9.39 +              (a, (output: Isabelle_Process.Output) => f(prover, output))
    9.40 +
    9.41 +          val dups = for ((a, _) <- new_functions if functions1.isDefinedAt(a)) yield a
    9.42 +          if (!dups.isEmpty) error("Duplicate protocol functions: " + commas_quote(dups))
    9.43 +
    9.44 +          (handlers1 + (name -> new_handler), functions1 ++ new_functions)
    9.45 +        }
    9.46 +        catch {
    9.47 +          case exn: Throwable =>
    9.48 +            System.err.println("Failed to initialize protocol handler: " +
    9.49 +              name + "\n" + Exn.message(exn))
    9.50 +            (handlers1, functions1)
    9.51 +        }
    9.52 +
    9.53 +      new Protocol_Handlers(handlers2, functions2)
    9.54 +    }
    9.55 +
    9.56 +    def invoke(output: Isabelle_Process.Output): Boolean =
    9.57 +      output.properties match {
    9.58 +        case Markup.Function(a) if functions.isDefinedAt(a) =>
    9.59 +          try { functions(a)(output) }
    9.60 +          catch {
    9.61 +            case exn: Throwable =>
    9.62 +              System.err.println("Failed invocation of protocol function: " +
    9.63 +                quote(a) + "\n" + Exn.message(exn))
    9.64 +            false
    9.65 +          }
    9.66 +        case _ => false
    9.67 +      }
    9.68 +  }
    9.69  }
    9.70  
    9.71  
    9.72 @@ -176,17 +238,16 @@
    9.73      previous: Document.Version,
    9.74      version: Document.Version)
    9.75    private case class Messages(msgs: List[Isabelle_Process.Message])
    9.76 -  private case class Finished_Scala(id: String, tag: Invoke_Scala.Tag.Value, result: String)
    9.77    private case class Update_Options(options: Options)
    9.78  
    9.79    private val (_, session_actor) = Simple_Thread.actor("session_actor", daemon = true)
    9.80    {
    9.81      val this_actor = self
    9.82  
    9.83 +    var protocol_handlers = new Session.Protocol_Handlers()
    9.84 +
    9.85      var prune_next = System.currentTimeMillis() + prune_delay.ms
    9.86  
    9.87 -    var futures = Map.empty[String, java.util.concurrent.Future[Unit]]
    9.88 -
    9.89  
    9.90      /* buffered prover messages */
    9.91  
    9.92 @@ -222,7 +283,7 @@
    9.93        def cancel() { timer.cancel() }
    9.94      }
    9.95  
    9.96 -    var prover: Option[Isabelle_Process with Protocol] = None
    9.97 +    var prover: Option[Session.Prover] = None
    9.98  
    9.99  
   9.100      /* delayed command changes */
   9.101 @@ -318,73 +379,68 @@
   9.102          }
   9.103        }
   9.104  
   9.105 -      output.properties match {
   9.106 +      if (output.is_protocol) {
   9.107 +        val handled = protocol_handlers.invoke(output)
   9.108 +        if (!handled) {
   9.109 +          output.properties match {
   9.110 +            case Markup.Protocol_Handler(name) =>
   9.111 +              protocol_handlers = protocol_handlers.add(prover.get, name)
   9.112  
   9.113 -        case Position.Id(state_id) if !output.is_protocol =>
   9.114 -          accumulate(state_id, output.message)
   9.115 +            case Protocol.Command_Timing(state_id, timing) =>
   9.116 +              val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   9.117 +              accumulate(state_id, prover.get.xml_cache.elem(message))
   9.118  
   9.119 -        case Protocol.Command_Timing(state_id, timing) if output.is_protocol =>
   9.120 -          val message = XML.elem(Markup.STATUS, List(XML.Elem(Markup.Timing(timing), Nil)))
   9.121 -          accumulate(state_id, prover.get.xml_cache.elem(message))
   9.122 +            case Markup.Assign_Execs =>
   9.123 +              XML.content(output.body) match {
   9.124 +                case Protocol.Assign(id, assign) =>
   9.125 +                  try {
   9.126 +                    val cmds = global_state >>> (_.assign(id, assign))
   9.127 +                    delay_commands_changed.invoke(true, cmds)
   9.128 +                  }
   9.129 +                  catch { case _: Document.State.Fail => bad_output() }
   9.130 +                case _ => bad_output()
   9.131 +              }
   9.132 +              // FIXME separate timeout event/message!?
   9.133 +              if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   9.134 +                val old_versions = global_state >>> (_.prune_history(prune_size))
   9.135 +                if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   9.136 +                prune_next = System.currentTimeMillis() + prune_delay.ms
   9.137 +              }
   9.138  
   9.139 -        case Markup.Assign_Execs if output.is_protocol =>
   9.140 -          XML.content(output.body) match {
   9.141 -            case Protocol.Assign(id, assign) =>
   9.142 -              try {
   9.143 -                val cmds = global_state >>> (_.assign(id, assign))
   9.144 -                delay_commands_changed.invoke(true, cmds)
   9.145 +            case Markup.Removed_Versions =>
   9.146 +              XML.content(output.body) match {
   9.147 +                case Protocol.Removed(removed) =>
   9.148 +                  try {
   9.149 +                    global_state >> (_.removed_versions(removed))
   9.150 +                  }
   9.151 +                  catch { case _: Document.State.Fail => bad_output() }
   9.152 +                case _ => bad_output()
   9.153                }
   9.154 -              catch { case _: Document.State.Fail => bad_output() }
   9.155 +
   9.156 +            case Markup.ML_Statistics(props) =>
   9.157 +              statistics.event(Session.Statistics(props))
   9.158 +
   9.159 +            case Markup.Task_Statistics(props) =>
   9.160 +              // FIXME
   9.161 +
   9.162              case _ => bad_output()
   9.163            }
   9.164 -          // FIXME separate timeout event/message!?
   9.165 -          if (prover.isDefined && System.currentTimeMillis() > prune_next) {
   9.166 -            val old_versions = global_state >>> (_.prune_history(prune_size))
   9.167 -            if (!old_versions.isEmpty) prover.get.remove_versions(old_versions)
   9.168 -            prune_next = System.currentTimeMillis() + prune_delay.ms
   9.169 -          }
   9.170 +        }
   9.171 +      }
   9.172 +      else {
   9.173 +        output.properties match {
   9.174 +          case Position.Id(state_id) =>
   9.175 +            accumulate(state_id, output.message)
   9.176  
   9.177 -        case Markup.Removed_Versions if output.is_protocol =>
   9.178 -          XML.content(output.body) match {
   9.179 -            case Protocol.Removed(removed) =>
   9.180 -              try {
   9.181 -                global_state >> (_.removed_versions(removed))
   9.182 -              }
   9.183 -              catch { case _: Document.State.Fail => bad_output() }
   9.184 -            case _ => bad_output()
   9.185 -          }
   9.186 +          case _ if output.is_init =>
   9.187 +            phase = Session.Ready
   9.188  
   9.189 -        case Markup.Invoke_Scala(name, id) if output.is_protocol =>
   9.190 -          futures += (id ->
   9.191 -            default_thread_pool.submit(() =>
   9.192 -              {
   9.193 -                val arg = XML.content(output.body)
   9.194 -                val (tag, result) = Invoke_Scala.method(name, arg)
   9.195 -                this_actor ! Finished_Scala(id, tag, result)
   9.196 -              }))
   9.197 +          case Markup.Return_Code(rc) if output.is_exit =>
   9.198 +            if (rc == 0) phase = Session.Inactive
   9.199 +            else phase = Session.Failed
   9.200  
   9.201 -        case Markup.Cancel_Scala(id) if output.is_protocol =>
   9.202 -          futures.get(id) match {
   9.203 -            case Some(future) =>
   9.204 -              future.cancel(true)
   9.205 -              this_actor ! Finished_Scala(id, Invoke_Scala.Tag.INTERRUPT, "")
   9.206 -            case None =>
   9.207 -          }
   9.208 -
   9.209 -        case Markup.ML_Statistics(props) if output.is_protocol =>
   9.210 -          statistics.event(Session.Statistics(props))
   9.211 -
   9.212 -        case Markup.Task_Statistics(props) if output.is_protocol =>
   9.213 -          // FIXME
   9.214 -
   9.215 -        case _ if output.is_init =>
   9.216 -          phase = Session.Ready
   9.217 -
   9.218 -        case Markup.Return_Code(rc) if output.is_exit =>
   9.219 -          if (rc == 0) phase = Session.Inactive
   9.220 -          else phase = Session.Failed
   9.221 -
   9.222 -        case _ => bad_output()
   9.223 +          case _ => bad_output()
   9.224 +        }
   9.225        }
   9.226      }
   9.227      //}}}
   9.228 @@ -455,12 +511,6 @@
   9.229          if prover.isDefined && global_state().is_assigned(change.previous) =>
   9.230            handle_change(change)
   9.231  
   9.232 -        case Finished_Scala(id, tag, result) if prover.isDefined =>
   9.233 -          if (futures.isDefinedAt(id)) {
   9.234 -            prover.get.invoke_scala(id, tag, result)
   9.235 -            futures -= id
   9.236 -          }
   9.237 -
   9.238          case bad if !bad.isInstanceOf[Change] =>
   9.239            System.err.println("session_actor: ignoring bad message " + bad)
   9.240        }