native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
authorwenzelm
Wed, 11 Aug 2010 00:44:48 +0200
changeset 3856871bb3c273dd1
parent 38567 cd6906d9199f
child 38569 36187e8443dd
native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
misc clarification of proc/pid state, eliminated closing flag;
misc tuning and simplification;
src/Pure/System/isabelle_process.ML
src/Pure/System/isabelle_process.scala
     1.1 --- a/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:42:40 2010 +0200
     1.2 +++ b/src/Pure/System/isabelle_process.ML	Wed Aug 11 00:44:48 2010 +0200
     1.3 @@ -11,6 +11,9 @@
     1.4  signature ISABELLE_PROCESS =
     1.5  sig
     1.6    val isabelle_processN: string
     1.7 +  val add_command: string -> (string list -> unit) -> unit
     1.8 +  val command: string -> string list -> unit
     1.9 +  val crashes: exn list Unsynchronized.ref
    1.10    val init: string -> string -> unit
    1.11  end;
    1.12  
    1.13 @@ -25,6 +28,28 @@
    1.14  val _ = Markup.add_mode isabelle_processN YXML.output_markup;
    1.15  
    1.16  
    1.17 +(* commands *)
    1.18 +
    1.19 +local
    1.20 +
    1.21 +val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table);
    1.22 +
    1.23 +in
    1.24 +
    1.25 +fun add_command name cmd = CRITICAL (fn () =>
    1.26 +  Unsynchronized.change global_commands (fn cmds =>
    1.27 +   (if not (Symtab.defined cmds name) then ()
    1.28 +    else warning ("Redefining Isabelle process command " ^ quote name);
    1.29 +    Symtab.update (name, cmd) cmds)));
    1.30 +
    1.31 +fun command name args =
    1.32 +  (case Symtab.lookup (! global_commands) name of
    1.33 +    NONE => error ("Undefined Isabelle process command " ^ quote name)
    1.34 +  | SOME cmd => cmd args);
    1.35 +
    1.36 +end;
    1.37 +
    1.38 +
    1.39  (* message markup *)
    1.40  
    1.41  local
    1.42 @@ -94,6 +119,53 @@
    1.43  end;
    1.44  
    1.45  
    1.46 +(* protocol loop *)
    1.47 +
    1.48 +val crashes = Unsynchronized.ref ([]: exn list);
    1.49 +
    1.50 +local
    1.51 +
    1.52 +fun recover crash =
    1.53 +  (CRITICAL (fn () => Unsynchronized.change crashes (cons crash));
    1.54 +    warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
    1.55 +
    1.56 +fun read_chunk stream len =
    1.57 +  let
    1.58 +    val n =
    1.59 +      (case Int.fromString len of
    1.60 +        SOME n => n
    1.61 +      | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
    1.62 +    val chunk = TextIO.inputN (stream, n);
    1.63 +    val m = size chunk;
    1.64 +  in
    1.65 +    if m = n then chunk
    1.66 +    else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
    1.67 +  end;
    1.68 +
    1.69 +fun read_command stream =
    1.70 +  (case TextIO.inputLine stream of
    1.71 +    NONE => raise Runtime.TERMINATE
    1.72 +  | SOME line => map (read_chunk stream) (space_explode "," line));
    1.73 +
    1.74 +fun run_command name args =
    1.75 +  Runtime.debugging (command name) args
    1.76 +    handle exn =>
    1.77 +      error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn);
    1.78 +
    1.79 +in
    1.80 +
    1.81 +fun loop stream =
    1.82 +  let val continue =
    1.83 +    (case read_command stream of
    1.84 +      [] => (Output.error_msg "Isabelle process: no input"; true)
    1.85 +    | name :: args => (run_command name args; true))
    1.86 +    handle Runtime.TERMINATE => false
    1.87 +      | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
    1.88 +  in if continue then loop stream else () end;
    1.89 +
    1.90 +end;
    1.91 +
    1.92 +
    1.93  (* init *)
    1.94  
    1.95  fun init in_fifo out_fifo =
    1.96 @@ -105,10 +177,8 @@
    1.97      val _ = quick_and_dirty := true;  (* FIXME !? *)
    1.98      val _ = Keyword.status ();
    1.99      val _ = Output.status (Markup.markup Markup.ready "");
   1.100 -    val _ =
   1.101 -      Simple_Thread.fork false (fn () =>
   1.102 -        (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
   1.103 -          quit ()));
   1.104 +    val _ = Context.set_thread_data NONE;
   1.105 +    val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
   1.106    in () end;
   1.107  
   1.108  end;
     2.1 --- a/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:42:40 2010 +0200
     2.2 +++ b/src/Pure/System/isabelle_process.scala	Wed Aug 11 00:44:48 2010 +0200
     2.3 @@ -9,7 +9,7 @@
     2.4  
     2.5  import java.util.concurrent.LinkedBlockingQueue
     2.6  import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
     2.7 -  InputStream, OutputStream, IOException}
     2.8 +  InputStream, OutputStream, BufferedOutputStream, IOException}
     2.9  
    2.10  import scala.actors.Actor
    2.11  import Actor._
    2.12 @@ -89,9 +89,8 @@
    2.13  
    2.14    /* process information */
    2.15  
    2.16 -  @volatile private var proc: Process = null
    2.17 -  @volatile private var closing = false
    2.18 -  @volatile private var pid: String = null
    2.19 +  @volatile private var proc: Option[Process] = None
    2.20 +  @volatile private var pid: Option[String] = None
    2.21  
    2.22  
    2.23    /* results */
    2.24 @@ -99,7 +98,7 @@
    2.25    private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
    2.26    {
    2.27      if (kind == Markup.INIT) {
    2.28 -      for ((Markup.PID, p) <- props) pid = p
    2.29 +      for ((Markup.PID, p) <- props) pid = Some(p)
    2.30      }
    2.31      receiver ! new Result(XML.Elem(Markup(kind, props), body))
    2.32    }
    2.33 @@ -112,42 +111,49 @@
    2.34  
    2.35    /* signals */
    2.36  
    2.37 -  def interrupt() = synchronized {  // FIXME avoid synchronized
    2.38 -    if (proc == null) error("Cannot interrupt Isabelle: no process")
    2.39 -    if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
    2.40 -    else {
    2.41 -      try {
    2.42 -        if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
    2.43 -          put_result(Markup.SIGNAL, "INT")
    2.44 -        else
    2.45 -          put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
    2.46 +  def interrupt()
    2.47 +  {
    2.48 +    if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
    2.49 +    else
    2.50 +      pid match {
    2.51 +        case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
    2.52 +        case Some(i) =>
    2.53 +          try {
    2.54 +            if (system.execute(true, "kill", "-INT", i).waitFor == 0)
    2.55 +              put_result(Markup.SIGNAL, "INT")
    2.56 +            else
    2.57 +              put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
    2.58 +          }
    2.59 +          catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
    2.60        }
    2.61 -      catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
    2.62 +  }
    2.63 +
    2.64 +  def kill()
    2.65 +  {
    2.66 +    proc match {
    2.67 +      case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
    2.68 +      case Some(p) =>
    2.69 +        close()
    2.70 +        Thread.sleep(500)  // FIXME !?
    2.71 +        put_result(Markup.SIGNAL, "KILL")
    2.72 +        p.destroy
    2.73 +        proc = None
    2.74 +        pid = None
    2.75      }
    2.76    }
    2.77  
    2.78 -  def kill() = synchronized {  // FIXME avoid synchronized
    2.79 -    if (proc == 0) error("Cannot kill Isabelle: no process")
    2.80 -    else {
    2.81 -      try_close()
    2.82 -      Thread.sleep(500)  // FIXME property!?
    2.83 -      put_result(Markup.SIGNAL, "KILL")
    2.84 -      proc.destroy
    2.85 -      proc = null
    2.86 -      pid = null
    2.87 -    }
    2.88 -  }
    2.89 -
    2.90  
    2.91  
    2.92    /** stream actors **/
    2.93  
    2.94 -  /* input */
    2.95 -
    2.96 -  case class Input(cmd: String)
    2.97 +  case class Input_Text(text: String)
    2.98 +  case class Input_Chunks(chunks: List[Array[Byte]])
    2.99    case object Close
   2.100  
   2.101 -  private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
   2.102 +
   2.103 +  /* raw stdin */
   2.104 +
   2.105 +  private def stdin_actor(name: String, stream: OutputStream): Actor =
   2.106      Library.thread_actor(name) {
   2.107        val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
   2.108        var finished = false
   2.109 @@ -155,8 +161,8 @@
   2.110          try {
   2.111            //{{{
   2.112            receive {
   2.113 -            case Input(text) =>
   2.114 -              put_result(kind, text)
   2.115 +            case Input_Text(text) =>
   2.116 +              // FIXME echo input?!
   2.117                writer.write(text)
   2.118                writer.flush
   2.119              case Close =>
   2.120 @@ -174,9 +180,9 @@
   2.121      }
   2.122  
   2.123  
   2.124 -  /* raw output */
   2.125 +  /* raw stdout */
   2.126  
   2.127 -  private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
   2.128 +  private def stdout_actor(name: String, stream: InputStream): Actor =
   2.129      Library.thread_actor(name) {
   2.130        val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
   2.131        var result = new StringBuilder(100)
   2.132 @@ -193,13 +199,43 @@
   2.133              else done = true
   2.134            }
   2.135            if (result.length > 0) {
   2.136 -            put_result(kind, result.toString)
   2.137 +            put_result(Markup.STDOUT, result.toString)
   2.138              result.length = 0
   2.139            }
   2.140            else {
   2.141              reader.close
   2.142              finished = true
   2.143 -            try_close()
   2.144 +            close()
   2.145 +          }
   2.146 +          //}}}
   2.147 +        }
   2.148 +        catch {
   2.149 +          case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
   2.150 +        }
   2.151 +      }
   2.152 +      put_result(Markup.SYSTEM, name + " terminated")
   2.153 +    }
   2.154 +
   2.155 +
   2.156 +  /* command input */
   2.157 +
   2.158 +  private def input_actor(name: String, raw_stream: OutputStream): Actor =
   2.159 +    Library.thread_actor(name) {
   2.160 +      val stream = new BufferedOutputStream(raw_stream)
   2.161 +      var finished = false
   2.162 +      while (!finished) {
   2.163 +        try {
   2.164 +          //{{{
   2.165 +          receive {
   2.166 +            case Input_Chunks(chunks) =>
   2.167 +              stream.write(Standard_System.string_bytes(
   2.168 +                  chunks.map(_.length).mkString("", ",", "\n")));
   2.169 +              chunks.foreach(stream.write(_));
   2.170 +              stream.flush
   2.171 +            case Close =>
   2.172 +              stream.close
   2.173 +              finished = true
   2.174 +            case bad => System.err.println(name + ": ignoring bad message " + bad)
   2.175            }
   2.176            //}}}
   2.177          }
   2.178 @@ -281,7 +317,7 @@
   2.179          }
   2.180        } while (c != -1)
   2.181        stream.close
   2.182 -      try_close()
   2.183 +      close()
   2.184  
   2.185        put_result(Markup.SYSTEM, name + " terminated")
   2.186      }
   2.187 @@ -299,7 +335,7 @@
   2.188    try {
   2.189      val cmdline =
   2.190        List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   2.191 -    proc = system.execute(true, cmdline: _*)
   2.192 +    proc = Some(system.execute(true, cmdline: _*))
   2.193    }
   2.194    catch {
   2.195      case e: IOException =>
   2.196 @@ -308,49 +344,37 @@
   2.197    }
   2.198  
   2.199  
   2.200 +  /* I/O actors */
   2.201 +
   2.202 +  private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
   2.203 +  stdout_actor("standard_output", proc.get.getInputStream)
   2.204 +
   2.205 +  private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
   2.206 +  message_actor("message_output", system.fifo_input_stream(out_fifo))
   2.207 +
   2.208 +
   2.209    /* exit process */
   2.210  
   2.211    Library.thread_actor("process_exit") {
   2.212 -    val rc = proc.waitFor()
   2.213 -    Thread.sleep(300)  // FIXME property!?
   2.214 -    put_result(Markup.SYSTEM, "process_exit terminated")
   2.215 -    put_result(Markup.EXIT, rc.toString)
   2.216 +    proc match {
   2.217 +      case None =>
   2.218 +      case Some(p) =>
   2.219 +        val rc = p.waitFor()
   2.220 +        Thread.sleep(300)  // FIXME property!?
   2.221 +        put_result(Markup.SYSTEM, "process_exit terminated")
   2.222 +        put_result(Markup.EXIT, rc.toString)
   2.223 +    }
   2.224      rm_fifos()
   2.225    }
   2.226  
   2.227  
   2.228 -  /* I/O actors */
   2.229 -
   2.230 -  private val standard_input =
   2.231 -    input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
   2.232 -
   2.233 -  private val command_input =
   2.234 -    input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
   2.235 -
   2.236 -  output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
   2.237 -  message_actor("message_output", system.fifo_input_stream(out_fifo))
   2.238 -
   2.239 -
   2.240  
   2.241    /** main methods **/
   2.242  
   2.243 -  def input_raw(text: String) = standard_input ! Input(text)
   2.244 +  def input_raw(text: String): Unit = standard_input ! Input_Text(text)
   2.245  
   2.246 -  def input(text: String) = synchronized {  // FIXME avoid synchronized
   2.247 -    if (proc == null) error("Cannot output to Isabelle: no process")
   2.248 -    if (closing) error("Cannot output to Isabelle: already closing")
   2.249 -    command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
   2.250 -  }
   2.251 +  def input(name: String, args: String*): Unit =
   2.252 +    command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes))
   2.253  
   2.254 -  def close() = synchronized {    // FIXME avoid synchronized
   2.255 -    command_input ! Close
   2.256 -    closing = true
   2.257 -  }
   2.258 -
   2.259 -  def try_close() = synchronized {
   2.260 -    if (proc != null && !closing) {
   2.261 -      try { close() }
   2.262 -      catch { case _: RuntimeException => }
   2.263 -    }
   2.264 -  }
   2.265 +  def close(): Unit = command_input ! Close
   2.266  }