native Isabelle_Process commands, based on efficient byte channel protocol for string lists;
misc clarification of proc/pid state, eliminated closing flag;
misc tuning and simplification;
1.1 --- a/src/Pure/System/isabelle_process.ML Wed Aug 11 00:42:40 2010 +0200
1.2 +++ b/src/Pure/System/isabelle_process.ML Wed Aug 11 00:44:48 2010 +0200
1.3 @@ -11,6 +11,9 @@
1.4 signature ISABELLE_PROCESS =
1.5 sig
1.6 val isabelle_processN: string
1.7 + val add_command: string -> (string list -> unit) -> unit
1.8 + val command: string -> string list -> unit
1.9 + val crashes: exn list Unsynchronized.ref
1.10 val init: string -> string -> unit
1.11 end;
1.12
1.13 @@ -25,6 +28,28 @@
1.14 val _ = Markup.add_mode isabelle_processN YXML.output_markup;
1.15
1.16
1.17 +(* commands *)
1.18 +
1.19 +local
1.20 +
1.21 +val global_commands = Unsynchronized.ref (Symtab.empty: (string list -> unit) Symtab.table);
1.22 +
1.23 +in
1.24 +
1.25 +fun add_command name cmd = CRITICAL (fn () =>
1.26 + Unsynchronized.change global_commands (fn cmds =>
1.27 + (if not (Symtab.defined cmds name) then ()
1.28 + else warning ("Redefining Isabelle process command " ^ quote name);
1.29 + Symtab.update (name, cmd) cmds)));
1.30 +
1.31 +fun command name args =
1.32 + (case Symtab.lookup (! global_commands) name of
1.33 + NONE => error ("Undefined Isabelle process command " ^ quote name)
1.34 + | SOME cmd => cmd args);
1.35 +
1.36 +end;
1.37 +
1.38 +
1.39 (* message markup *)
1.40
1.41 local
1.42 @@ -94,6 +119,53 @@
1.43 end;
1.44
1.45
1.46 +(* protocol loop *)
1.47 +
1.48 +val crashes = Unsynchronized.ref ([]: exn list);
1.49 +
1.50 +local
1.51 +
1.52 +fun recover crash =
1.53 + (CRITICAL (fn () => Unsynchronized.change crashes (cons crash));
1.54 + warning "Recovering from Isabelle process crash -- see also Isabelle_Process.crashes");
1.55 +
1.56 +fun read_chunk stream len =
1.57 + let
1.58 + val n =
1.59 + (case Int.fromString len of
1.60 + SOME n => n
1.61 + | NONE => error ("Isabelle process: malformed chunk header " ^ quote len));
1.62 + val chunk = TextIO.inputN (stream, n);
1.63 + val m = size chunk;
1.64 + in
1.65 + if m = n then chunk
1.66 + else error ("Isabelle process: bad chunk (" ^ string_of_int m ^ " vs. " ^ string_of_int n ^ ")")
1.67 + end;
1.68 +
1.69 +fun read_command stream =
1.70 + (case TextIO.inputLine stream of
1.71 + NONE => raise Runtime.TERMINATE
1.72 + | SOME line => map (read_chunk stream) (space_explode "," line));
1.73 +
1.74 +fun run_command name args =
1.75 + Runtime.debugging (command name) args
1.76 + handle exn =>
1.77 + error ("Isabelle process command failure: " ^ name ^ "\n" ^ ML_Compiler.exn_message exn);
1.78 +
1.79 +in
1.80 +
1.81 +fun loop stream =
1.82 + let val continue =
1.83 + (case read_command stream of
1.84 + [] => (Output.error_msg "Isabelle process: no input"; true)
1.85 + | name :: args => (run_command name args; true))
1.86 + handle Runtime.TERMINATE => false
1.87 + | exn => (Output.error_msg (ML_Compiler.exn_message exn) handle crash => recover crash; true);
1.88 + in if continue then loop stream else () end;
1.89 +
1.90 +end;
1.91 +
1.92 +
1.93 (* init *)
1.94
1.95 fun init in_fifo out_fifo =
1.96 @@ -105,10 +177,8 @@
1.97 val _ = quick_and_dirty := true; (* FIXME !? *)
1.98 val _ = Keyword.status ();
1.99 val _ = Output.status (Markup.markup Markup.ready "");
1.100 - val _ =
1.101 - Simple_Thread.fork false (fn () =>
1.102 - (Isar.toplevel_loop in_stream {init = true, welcome = false, sync = true, secure = true};
1.103 - quit ()));
1.104 + val _ = Context.set_thread_data NONE;
1.105 + val _ = Simple_Thread.fork false (fn () => (loop in_stream; quit ()));
1.106 in () end;
1.107
1.108 end;
2.1 --- a/src/Pure/System/isabelle_process.scala Wed Aug 11 00:42:40 2010 +0200
2.2 +++ b/src/Pure/System/isabelle_process.scala Wed Aug 11 00:44:48 2010 +0200
2.3 @@ -9,7 +9,7 @@
2.4
2.5 import java.util.concurrent.LinkedBlockingQueue
2.6 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
2.7 - InputStream, OutputStream, IOException}
2.8 + InputStream, OutputStream, BufferedOutputStream, IOException}
2.9
2.10 import scala.actors.Actor
2.11 import Actor._
2.12 @@ -89,9 +89,8 @@
2.13
2.14 /* process information */
2.15
2.16 - @volatile private var proc: Process = null
2.17 - @volatile private var closing = false
2.18 - @volatile private var pid: String = null
2.19 + @volatile private var proc: Option[Process] = None
2.20 + @volatile private var pid: Option[String] = None
2.21
2.22
2.23 /* results */
2.24 @@ -99,7 +98,7 @@
2.25 private def put_result(kind: String, props: List[(String, String)], body: List[XML.Tree])
2.26 {
2.27 if (kind == Markup.INIT) {
2.28 - for ((Markup.PID, p) <- props) pid = p
2.29 + for ((Markup.PID, p) <- props) pid = Some(p)
2.30 }
2.31 receiver ! new Result(XML.Elem(Markup(kind, props), body))
2.32 }
2.33 @@ -112,42 +111,49 @@
2.34
2.35 /* signals */
2.36
2.37 - def interrupt() = synchronized { // FIXME avoid synchronized
2.38 - if (proc == null) error("Cannot interrupt Isabelle: no process")
2.39 - if (pid == null) put_result(Markup.SYSTEM, "Cannot interrupt: unknown pid")
2.40 - else {
2.41 - try {
2.42 - if (system.execute(true, "kill", "-INT", pid).waitFor == 0)
2.43 - put_result(Markup.SIGNAL, "INT")
2.44 - else
2.45 - put_result(Markup.SYSTEM, "Cannot interrupt: kill command failed")
2.46 + def interrupt()
2.47 + {
2.48 + if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
2.49 + else
2.50 + pid match {
2.51 + case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknowd pid")
2.52 + case Some(i) =>
2.53 + try {
2.54 + if (system.execute(true, "kill", "-INT", i).waitFor == 0)
2.55 + put_result(Markup.SIGNAL, "INT")
2.56 + else
2.57 + put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
2.58 + }
2.59 + catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
2.60 }
2.61 - catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
2.62 + }
2.63 +
2.64 + def kill()
2.65 + {
2.66 + proc match {
2.67 + case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
2.68 + case Some(p) =>
2.69 + close()
2.70 + Thread.sleep(500) // FIXME !?
2.71 + put_result(Markup.SIGNAL, "KILL")
2.72 + p.destroy
2.73 + proc = None
2.74 + pid = None
2.75 }
2.76 }
2.77
2.78 - def kill() = synchronized { // FIXME avoid synchronized
2.79 - if (proc == 0) error("Cannot kill Isabelle: no process")
2.80 - else {
2.81 - try_close()
2.82 - Thread.sleep(500) // FIXME property!?
2.83 - put_result(Markup.SIGNAL, "KILL")
2.84 - proc.destroy
2.85 - proc = null
2.86 - pid = null
2.87 - }
2.88 - }
2.89 -
2.90
2.91
2.92 /** stream actors **/
2.93
2.94 - /* input */
2.95 -
2.96 - case class Input(cmd: String)
2.97 + case class Input_Text(text: String)
2.98 + case class Input_Chunks(chunks: List[Array[Byte]])
2.99 case object Close
2.100
2.101 - private def input_actor(name: String, kind: String, stream: => OutputStream): Actor =
2.102 +
2.103 + /* raw stdin */
2.104 +
2.105 + private def stdin_actor(name: String, stream: OutputStream): Actor =
2.106 Library.thread_actor(name) {
2.107 val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
2.108 var finished = false
2.109 @@ -155,8 +161,8 @@
2.110 try {
2.111 //{{{
2.112 receive {
2.113 - case Input(text) =>
2.114 - put_result(kind, text)
2.115 + case Input_Text(text) =>
2.116 + // FIXME echo input?!
2.117 writer.write(text)
2.118 writer.flush
2.119 case Close =>
2.120 @@ -174,9 +180,9 @@
2.121 }
2.122
2.123
2.124 - /* raw output */
2.125 + /* raw stdout */
2.126
2.127 - private def output_actor(name: String, kind: String, stream: => InputStream): Actor =
2.128 + private def stdout_actor(name: String, stream: InputStream): Actor =
2.129 Library.thread_actor(name) {
2.130 val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
2.131 var result = new StringBuilder(100)
2.132 @@ -193,13 +199,43 @@
2.133 else done = true
2.134 }
2.135 if (result.length > 0) {
2.136 - put_result(kind, result.toString)
2.137 + put_result(Markup.STDOUT, result.toString)
2.138 result.length = 0
2.139 }
2.140 else {
2.141 reader.close
2.142 finished = true
2.143 - try_close()
2.144 + close()
2.145 + }
2.146 + //}}}
2.147 + }
2.148 + catch {
2.149 + case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
2.150 + }
2.151 + }
2.152 + put_result(Markup.SYSTEM, name + " terminated")
2.153 + }
2.154 +
2.155 +
2.156 + /* command input */
2.157 +
2.158 + private def input_actor(name: String, raw_stream: OutputStream): Actor =
2.159 + Library.thread_actor(name) {
2.160 + val stream = new BufferedOutputStream(raw_stream)
2.161 + var finished = false
2.162 + while (!finished) {
2.163 + try {
2.164 + //{{{
2.165 + receive {
2.166 + case Input_Chunks(chunks) =>
2.167 + stream.write(Standard_System.string_bytes(
2.168 + chunks.map(_.length).mkString("", ",", "\n")));
2.169 + chunks.foreach(stream.write(_));
2.170 + stream.flush
2.171 + case Close =>
2.172 + stream.close
2.173 + finished = true
2.174 + case bad => System.err.println(name + ": ignoring bad message " + bad)
2.175 }
2.176 //}}}
2.177 }
2.178 @@ -281,7 +317,7 @@
2.179 }
2.180 } while (c != -1)
2.181 stream.close
2.182 - try_close()
2.183 + close()
2.184
2.185 put_result(Markup.SYSTEM, name + " terminated")
2.186 }
2.187 @@ -299,7 +335,7 @@
2.188 try {
2.189 val cmdline =
2.190 List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
2.191 - proc = system.execute(true, cmdline: _*)
2.192 + proc = Some(system.execute(true, cmdline: _*))
2.193 }
2.194 catch {
2.195 case e: IOException =>
2.196 @@ -308,49 +344,37 @@
2.197 }
2.198
2.199
2.200 + /* I/O actors */
2.201 +
2.202 + private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
2.203 + stdout_actor("standard_output", proc.get.getInputStream)
2.204 +
2.205 + private val command_input = input_actor("command_input", system.fifo_output_stream(in_fifo))
2.206 + message_actor("message_output", system.fifo_input_stream(out_fifo))
2.207 +
2.208 +
2.209 /* exit process */
2.210
2.211 Library.thread_actor("process_exit") {
2.212 - val rc = proc.waitFor()
2.213 - Thread.sleep(300) // FIXME property!?
2.214 - put_result(Markup.SYSTEM, "process_exit terminated")
2.215 - put_result(Markup.EXIT, rc.toString)
2.216 + proc match {
2.217 + case None =>
2.218 + case Some(p) =>
2.219 + val rc = p.waitFor()
2.220 + Thread.sleep(300) // FIXME property!?
2.221 + put_result(Markup.SYSTEM, "process_exit terminated")
2.222 + put_result(Markup.EXIT, rc.toString)
2.223 + }
2.224 rm_fifos()
2.225 }
2.226
2.227
2.228 - /* I/O actors */
2.229 -
2.230 - private val standard_input =
2.231 - input_actor("standard_input", Markup.STDIN, proc.getOutputStream)
2.232 -
2.233 - private val command_input =
2.234 - input_actor("command_input", Markup.INPUT, system.fifo_output_stream(in_fifo))
2.235 -
2.236 - output_actor("standard_output", Markup.STDOUT, proc.getInputStream)
2.237 - message_actor("message_output", system.fifo_input_stream(out_fifo))
2.238 -
2.239 -
2.240
2.241 /** main methods **/
2.242
2.243 - def input_raw(text: String) = standard_input ! Input(text)
2.244 + def input_raw(text: String): Unit = standard_input ! Input_Text(text)
2.245
2.246 - def input(text: String) = synchronized { // FIXME avoid synchronized
2.247 - if (proc == null) error("Cannot output to Isabelle: no process")
2.248 - if (closing) error("Cannot output to Isabelle: already closing")
2.249 - command_input ! Input(" \\<^sync>\n; " + text + " \\<^sync>;\n")
2.250 - }
2.251 + def input(name: String, args: String*): Unit =
2.252 + command_input ! Input_Chunks((name :: args.toList).map(Standard_System.string_bytes))
2.253
2.254 - def close() = synchronized { // FIXME avoid synchronized
2.255 - command_input ! Close
2.256 - closing = true
2.257 - }
2.258 -
2.259 - def try_close() = synchronized {
2.260 - if (proc != null && !closing) {
2.261 - try { close() }
2.262 - catch { case _: RuntimeException => }
2.263 - }
2.264 - }
2.265 + def close(): Unit = command_input ! Close
2.266 }