slightly more robust Isabelle_Process startup -- NB: opening fifo streams synchronizes with other end, which may fail to reach that point;
1 /* Title: Pure/System/isabelle_process.scala
3 Options: :folding=explicit:collapseFolds=1:
5 Isabelle process management -- always reactive due to multi-threaded I/O.
10 import java.util.concurrent.LinkedBlockingQueue
11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
12 InputStream, OutputStream, BufferedOutputStream, IOException}
14 import scala.actors.Actor
18 object Isabelle_Process
25 ('A' : Int) -> Markup.INIT,
26 ('B' : Int) -> Markup.STATUS,
27 ('C' : Int) -> Markup.REPORT,
28 ('D' : Int) -> Markup.WRITELN,
29 ('E' : Int) -> Markup.TRACING,
30 ('F' : Int) -> Markup.WARNING,
31 ('G' : Int) -> Markup.ERROR)
32 def is_raw(kind: String) =
34 def is_control(kind: String) =
35 kind == Markup.SYSTEM ||
36 kind == Markup.SIGNAL ||
38 def is_system(kind: String) =
39 kind == Markup.SYSTEM ||
40 kind == Markup.INPUT ||
41 kind == Markup.STDIN ||
42 kind == Markup.SIGNAL ||
43 kind == Markup.EXIT ||
47 class Result(val message: XML.Elem)
49 def kind = message.markup.name
50 def properties = message.markup.properties
51 def body = message.body
53 def is_raw = Kind.is_raw(kind)
54 def is_control = Kind.is_control(kind)
55 def is_system = Kind.is_system(kind)
56 def is_status = kind == Markup.STATUS
57 def is_report = kind == Markup.REPORT
58 def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
60 override def toString: String =
63 if (is_status || is_report) message.body.map(_.toString).mkString
64 else Pretty.string_of(message.body)
65 if (properties.isEmpty)
66 kind.toString + " [[" + res + "]]"
69 (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
75 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
77 import Isabelle_Process._
80 /* demo constructor */
// Convenience constructor: uses a fresh Isabelle_System and a receiver actor
// that prints every message it gets to the console.
def this(args: String*) =
  this(
    new Isabelle_System,
    actor {
      loop {
        react {
          case res => Console.println(res)
        }
      }
    },
    args: _*)
87 /* process information */
// handle of the external process; assigned from system.execute once the command line has been run
89 @volatile private var proc: Option[Process] = None
// process id; set by put_result when the first INIT message arrives
90 @volatile private var pid: Option[String] = None
95 private val xml_cache = new XML.Cache(131071)
/**
 * Wraps a message as an XML element and forwards it to the receiver actor.
 *
 * While no pid is known yet, an INIT message is additionally inspected for
 * its PID property, whose value is remembered in `pid`.
 *
 * @param kind  markup name of the message
 * @param props message properties as key/value pairs
 * @param body  message content; passed through Isar_Document.clean_message
 */
private def put_result(kind: String, props: List[(String, String)], body: XML.Body): Unit =
{
  if (pid.isEmpty && kind == Markup.INIT) {
    // the pid is the property *value* (second component); the first component
    // is merely the key Markup.PID that was searched for
    pid = props.find(_._1 == Markup.PID).map(_._2)
  }

  val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
  // the receiver gets the tree handed back by the cache, not `msg` directly
  xml_cache.cache_tree(msg)((message: XML.Tree) =>
    receiver ! new Result(message.asInstanceOf[XML.Elem]))
}
// Plain-text variant: decodes symbols in `text` and emits it as a single
// XML text node without properties.
private def put_result(kind: String, text: String): Unit =
{
  val decoded = system.symbols.decode(text)
  val body: XML.Body = List(XML.Text(decoded))
  put_result(kind, Nil, body)
}
117 if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
// nothing can be signalled without a known process id
case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknown pid")
123 if (system.execute(true, "kill", "-INT", i).waitFor == 0)
124 put_result(Markup.SIGNAL, "INT")
126 put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
128 catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
135 case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
138 Thread.sleep(500) // FIXME !?
139 put_result(Markup.SIGNAL, "KILL")
148 /** stream actors **/
150 private val in_fifo = system.mk_fifo()
151 private val out_fifo = system.mk_fifo()
// Removes both fifos, the input fifo first.
private def rm_fifos() =
{
  system.rm_fifo(in_fifo)
  system.rm_fifo(out_fifo)
}
// raw text for the process; sent by input_raw to the standard_input actor
154 private case class Input_Text(text: String)
// byte chunks of one command; sent by input_bytes to the command_input actor
155 private case class Input_Chunks(chunks: List[Array[Byte]])
// sent by close() to the command_input actor
156 private case object Close
161 private def stdin_actor(name: String, stream: OutputStream): Actor =
162 Simple_Thread.actor(name) {
163 val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
169 case Input_Text(text) =>
170 // FIXME echo input?!
176 case bad => System.err.println(name + ": ignoring bad message " + bad)
181 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
184 put_result(Markup.SYSTEM, name + " terminated")
190 private def stdout_actor(name: String, stream: InputStream): Actor =
191 Simple_Thread.actor(name) {
192 val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
193 var result = new StringBuilder(100)
201 while (!done && (result.length == 0 || reader.ready)) {
203 if (c >= 0) result.append(c.asInstanceOf[Char])
206 if (result.length > 0) {
207 put_result(Markup.STDOUT, result.toString)
218 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
221 put_result(Markup.SYSTEM, name + " terminated")
227 private def input_actor(name: String): Actor =
228 Simple_Thread.actor(name) {
229 val stream = new BufferedOutputStream(system.fifo_output_stream(in_fifo)) // FIXME potentially blocking forever
235 case Input_Chunks(chunks) =>
236 stream.write(Standard_System.string_bytes(
237 chunks.map(_.length).mkString("", ",", "\n")));
238 chunks.foreach(stream.write(_));
243 case bad => System.err.println(name + ": ignoring bad message " + bad)
248 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
251 put_result(Markup.SYSTEM, name + " terminated")
// Raised for a bad chunk header, bad chunk content or bad message header;
// message_actor reports it as a "Malformed message" system result.
257 private class Protocol_Error(msg: String) extends Exception(msg)
259 private def message_actor(name: String): Actor =
260 Simple_Thread.actor(name) {
261 val stream = system.fifo_input_stream(out_fifo) // FIXME potentially blocking forever
262 val default_buffer = new Array[Byte](65536)
// Reads one chunk (decimal length, newline, then that many bytes) and parses the bytes as YXML.
265 def read_chunk(): XML.Body =
// header: fold decimal digits into n; 48..57 are the ASCII codes of '0'..'9'
// NOTE(review): the statements that fetch `c` are not visible in this excerpt
271 while (48 <= c && c <= 57) {
272 n = 10 * n + (c - 48)
// the digit run must be terminated by a newline (ASCII 10)
275 if (c != 10) throw new Protocol_Error("bad message chunk header")
// reuse the preallocated buffer when the chunk fits, otherwise allocate exactly n bytes
279 if (n <= default_buffer.size) default_buffer
280 else new Array[Byte](n)
// keep reading into the remaining part of buf until n bytes are there or a read makes no progress (m <= 0)
285 m = stream.read(buf, i, n - i)
287 } while (m > 0 && n > i)
// the loop ended without collecting exactly n bytes
289 if (i != n) throw new Protocol_Error("bad message chunk content")
// only the first n bytes of buf belong to this chunk (buf may be the larger shared buffer)
291 YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
297 val header = read_chunk()
298 val body = read_chunk()
300 case List(XML.Elem(Markup(name, props), Nil))
301 if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
302 put_result(Kind.markup(name(0)), props, body)
303 case _ => throw new Protocol_Error("bad header: " + header.toString)
307 case e: IOException =>
308 put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
309 case e: Protocol_Error =>
310 put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
316 put_result(Markup.SYSTEM, name + " terminated")
327 List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
328 proc = Some(system.execute(true, cmdline: _*))
331 case e: IOException =>
333 error("Failed to execute Isabelle process: " + e.getMessage)
339 private val command_input = input_actor("command_input")
340 message_actor("message_output")
342 private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
343 stdout_actor("standard_output", proc.get.getInputStream)
348 Simple_Thread.actor("process_exit") {
353 Thread.sleep(300) // FIXME property!?
354 put_result(Markup.SYSTEM, "process_exit terminated")
355 put_result(Markup.EXIT, rc.toString)
// Hands raw text to the standard_input actor.
def input_raw(text: String): Unit =
{
  standard_input ! Input_Text(text)
}
// Sends a command to the command_input actor: the encoded name comes first,
// followed by the binary arguments in order.
def input_bytes(name: String, args: Array[Byte]*): Unit =
{
  val name_chunk = Standard_System.string_bytes(name)
  val chunks = name_chunk :: args.toList
  command_input ! Input_Chunks(chunks)
}
// String variant of input_bytes: every argument is converted with
// Standard_System.string_bytes before sending.
def input(name: String, args: String*): Unit =
{
  val encoded = for (arg <- args) yield Standard_System.string_bytes(arg)
  input_bytes(name, encoded: _*)
}
// Sends Close to the command_input actor.
def close(): Unit =
{
  command_input ! Close
}