1 /* Title: Pure/System/isabelle_process.scala
3 Options: :folding=explicit:collapseFolds=1:
5 Isabelle process management -- always reactive due to multi-threaded I/O.
10 import java.util.concurrent.LinkedBlockingQueue
11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
12 InputStream, OutputStream, BufferedOutputStream, IOException}
14 import scala.actors.Actor
// Companion object of class Isabelle_Process (the class imports its members).
// NOTE(review): this excerpt omits several lines of the object, including the
// opening of the kind table and parts of the predicate bodies below.
18 object Isabelle_Process
// Character codes 'A'..'G' paired with markup names. Presumably these are the
// entries of Kind.markup, which message_actor indexes with the one-character
// header name -- confirm against the full file.
25 ('A' : Int) -> Markup.INIT,
26 ('B' : Int) -> Markup.STATUS,
27 ('C' : Int) -> Markup.REPORT,
28 ('D' : Int) -> Markup.WRITELN,
29 ('E' : Int) -> Markup.TRACING,
30 ('F' : Int) -> Markup.WARNING,
31 ('G' : Int) -> Markup.ERROR)
// Predicate on a message kind; its body is not visible in this excerpt.
32 def is_raw(kind: String) =
// Holds for SYSTEM and SIGNAL; the trailing `||` shows at least one more
// alternative that is not visible here.
34 def is_control(kind: String) =
35 kind == Markup.SYSTEM ||
36 kind == Markup.SIGNAL ||
// Holds for SYSTEM, INPUT, STDIN, SIGNAL and EXIT; again at least one further
// alternative follows the last `||` outside this excerpt.
38 def is_system(kind: String) =
39 kind == Markup.SYSTEM ||
40 kind == Markup.INPUT ||
41 kind == Markup.STDIN ||
42 kind == Markup.SIGNAL ||
43 kind == Markup.EXIT ||
// One message from the process, wrapped for delivery to the receiver actor.
// NOTE(review): some lines of this class (among them the binding of `res`
// used in toString) are not visible in this excerpt.
47 class Result(val message: XML.Elem)
// Shorthands for the parts of the wrapped element.
49 def kind = message.markup.name
50 def properties = message.markup.properties
51 def body = message.body
// Classification by kind: the first three delegate to the Kind predicates,
// the next two compare against Markup constants directly.
53 def is_raw = Kind.is_raw(kind)
54 def is_control = Kind.is_control(kind)
55 def is_system = Kind.is_system(kind)
56 def is_status = kind == Markup.STATUS
57 def is_report = kind == Markup.REPORT
// A status message whose body is exactly one Markup.Ready element without children.
58 def is_ready = is_status && body == List(XML.Elem(Markup.Ready, Nil))
60 override def toString: String =
// Status and report bodies are concatenated tree by tree via toString; any
// other body goes through Pretty.string_of.
63 if (is_status || is_report) message.body.map(_.toString).mkString
64 else Pretty.string_of(message.body)
// Without properties: the kind followed by the text in double square brackets.
65 if (properties.isEmpty)
66 kind.toString + " [[" + res + "]]"
// With properties: they are rendered as {name=value,...} ahead of the bracketed text.
69 (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
75 class Isabelle_Process(system: Isabelle_System, receiver: Actor, args: String*)
77 import Isabelle_Process._
80 /* demo constructor */
/**
 * Demo constructor: uses a freshly created Isabelle_System and a receiver
 * actor that prints every message it gets to the console.
 *
 * @param args arguments passed on to the primary constructor
 */
def this(args: String*) =
  this(
    new Isabelle_System,
    actor {
      loop {
        react { case result => Console.println(result) }
      }
    },
    args: _*)
87 /* process information */
// Handle of the external process; assigned where the process is started.
89 @volatile private var proc: Option[Process] = None
// Process id as text; assigned in put_result when an INIT message arrives
// while no pid has been recorded yet.
90 @volatile private var pid: Option[String] = None
// Cache through which put_result passes every message tree before delivery.
// NOTE(review): 131071 is presumably the cache size -- confirm against XML.Cache.
95 private val xml_cache = new XML.Cache(131071)
/**
 * Wraps a message as a Result and sends it to the receiver actor.
 *
 * While no pid has been recorded, an INIT message also sets `pid` from its
 * PID property (or leaves it empty when the property is absent).
 *
 * @param kind  markup name of the result
 * @param props markup properties of the result
 * @param body  XML body; passed through Isar_Document.clean_message before wrapping
 */
private def put_result(kind: String, props: List[(String, String)], body: XML.Body): Unit = {
  // Record the VALUE of the PID property. The previous `.map(_._1)` stored the
  // property name itself, so `pid` never held the actual process id.
  if (pid.isEmpty && kind == Markup.INIT)
    pid = props.find(_._1 == Markup.PID).map(_._2)

  val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
  // cache_tree hands a tree to the callback, which forwards it as a Result.
  xml_cache.cache_tree(msg)((message: XML.Tree) =>
    receiver ! new Result(message.asInstanceOf[XML.Elem]))
}
/**
 * Convenience overload for plain text: passes `text` through
 * system.symbols.decode and sends it as a single XML text node with no
 * properties.
 *
 * @param kind markup name of the result
 * @param text message text
 */
private def put_result(kind: String, text: String): Unit = {
  val decoded = system.symbols.decode(text)
  val body = List(XML.Text(decoded))
  put_result(kind, Nil, body)
}
// Visible part of the interrupt logic. The enclosing definition's header and
// some connecting lines are not shown in this excerpt, so the code is kept
// line for line; the only change is the corrected message text below.
// Nothing to interrupt when no process handle is recorded.
117 if (proc.isEmpty) put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: no process")
// No pid recorded (message typo "unknowd" corrected to "unknown").
120 case None => put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: unknown pid")
// Runs `kill -INT` through system.execute; `i` is bound on a line not visible
// here (presumably the recorded pid). Exit status 0 is reported as an INT signal.
123 if (system.execute(true, "kill", "-INT", i).waitFor == 0)
124 put_result(Markup.SIGNAL, "INT")
// Any other exit status is reported as a SYSTEM result.
126 put_result(Markup.SYSTEM, "Cannot interrupt Isabelle: kill command failed")
// An IOException while running kill is passed to error(...) with its message.
128 catch { case e: IOException => error("Cannot interrupt Isabelle: " + e.getMessage) }
// Visible part of the kill logic; the enclosing definition's header and the
// remaining lines are not shown in this excerpt.
// Without a process handle only a SYSTEM result is produced.
135 case None => put_result(Markup.SYSTEM, "Cannot kill Isabelle: no process")
// Fixed half-second pause before the KILL signal result is reported; the
// existing FIXME marks this delay as questionable.
138 Thread.sleep(500) // FIXME !?
139 put_result(Markup.SIGNAL, "KILL")
148 /** stream actors **/
// Messages exchanged with the input actors.
// Text message; sent to the standard_input actor by input_raw.
150 private case class Input_Text(text: String)
// List of byte chunks; sent to the command_input actor by input_bytes.
151 private case class Input_Chunks(chunks: List[Array[Byte]])
// Sent to both input actors by close().
152 private case object Close
// Builds the actor that handles Input_Text messages for the given output stream.
// NOTE(review): several interior lines of this definition are not visible in
// this excerpt, including what is done with the received text.
157 private def stdin_actor(name: String, stream: OutputStream): Actor =
158 Simple_Thread.actor(name) {
// Characters are encoded with Standard_System.charset on the way out.
159 val writer = new BufferedWriter(new OutputStreamWriter(stream, Standard_System.charset))
165 case Input_Text(text) =>
// Any other message is logged on stderr together with the actor name.
171 case bad => System.err.println(name + ": ignoring bad message " + bad)
// I/O failures are reported to the receiver as SYSTEM results.
176 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
// Announces that this actor has finished.
179 put_result(Markup.SYSTEM, name + " terminated")
// Builds the actor that reads the given input stream and reports what it
// collects as STDOUT results.
// NOTE(review): several interior lines of this definition are not visible in
// this excerpt (for instance where `c` and `done` are assigned).
185 private def stdout_actor(name: String, stream: InputStream): Actor =
186 Simple_Thread.actor(name) {
// Bytes are decoded with Standard_System.charset.
187 val reader = new BufferedReader(new InputStreamReader(stream, Standard_System.charset))
// Collects characters until they are emitted as one result.
188 var result = new StringBuilder(100)
// Continue while not done and either nothing has been collected yet or the
// reader reports more input as ready.
196 while (!done && (result.length == 0 || reader.ready)) {
// Negative values of `c` are not appended.
198 if (c >= 0) result.append(c.asInstanceOf[Char])
// Emit the collected text, if any, as a STDOUT result.
201 if (result.length > 0) {
202 put_result(Markup.STDOUT, result.toString)
// I/O failures are reported to the receiver as SYSTEM results.
213 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
// Announces that this actor has finished.
216 put_result(Markup.SYSTEM, name + " terminated")
// Builds the actor that writes Input_Chunks messages to the given fifo.
// NOTE(review): several interior lines of this definition are not visible in
// this excerpt.
222 private def input_actor(name: String, fifo: String): Actor =
223 Simple_Thread.actor(name) {
224 val stream = new BufferedOutputStream(system.fifo_output_stream(fifo)) // FIXME potentially blocking forever
230 case Input_Chunks(chunks) =>
// Header line first: the chunk lengths, comma-separated and newline-terminated ...
231 stream.write(Standard_System.string_bytes(
232 chunks.map(_.length).mkString("", ",", "\n")));
// ... then the bytes of every chunk, in list order.
233 chunks.foreach(stream.write(_));
// Any other message is logged on stderr together with the actor name.
238 case bad => System.err.println(name + ": ignoring bad message " + bad)
// I/O failures are reported to the receiver as SYSTEM results.
243 case e: IOException => put_result(Markup.SYSTEM, name + ": " + e.getMessage)
// Announces that this actor has finished.
246 put_result(Markup.SYSTEM, name + " terminated")
// Thrown by message_actor for a malformed chunk header, chunk content or
// message header; caught there and reported as a SYSTEM result.
252 private class Protocol_Error(msg: String) extends Exception(msg)
// Builds the actor that reads messages from the given fifo and forwards them
// through put_result.
// NOTE(review): several interior lines of this definition are not visible in
// this excerpt (for instance where `c`, `n`, `i`, `m` and `buf` are declared
// and advanced).
254 private def message_actor(name: String, fifo: String): Actor =
255 Simple_Thread.actor(name) {
256 val stream = system.fifo_input_stream(fifo) // FIXME potentially blocking forever
// Shared buffer, used for every chunk of at most 65536 bytes.
257 val default_buffer = new Array[Byte](65536)
// Reads one chunk: a decimal length, a line feed, then that many bytes,
// which are decoded into an XML body.
260 def read_chunk(): XML.Body =
// Accumulate ASCII digits (codes 48..57) into the length n.
266 while (48 <= c && c <= 57) {
267 n = 10 * n + (c - 48)
// The length must be followed by a line feed (code 10).
270 if (c != 10) throw new Protocol_Error("bad message chunk header")
// Chunks larger than the shared buffer get a buffer of their own.
274 if (n <= default_buffer.size) default_buffer
275 else new Array[Byte](n)
// Keep reading while data arrives and fewer than n bytes have been collected.
280 m = stream.read(buf, i, n - i)
282 } while (m > 0 && n > i)
// Fewer bytes than announced is a protocol error.
284 if (i != n) throw new Protocol_Error("bad message chunk content")
// Decode the first n bytes with symbol decoding and parse them as YXML.
286 YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
// A message is a header chunk followed by a body chunk.
292 val header = read_chunk()
293 val body = read_chunk()
// Accepted header: a single element without children whose one-character
// name is defined in Kind.markup; its properties accompany the body.
295 case List(XML.Elem(Markup(name, props), Nil))
296 if name.size == 1 && Kind.markup.isDefinedAt(name(0)) =>
297 put_result(Kind.markup(name(0)), props, body)
298 case _ => throw new Protocol_Error("bad header: " + header.toString)
// Read failures and protocol violations are reported as SYSTEM results.
302 case e: IOException =>
303 put_result(Markup.SYSTEM, "Cannot read message:\n" + e.getMessage)
304 case e: Protocol_Error =>
305 put_result(Markup.SYSTEM, "Malformed message:\n" + e.getMessage)
// Announces that this actor has finished.
311 put_result(Markup.SYSTEM, name + " terminated")
// Two fifos obtained from system.mk_fifo(): in_fifo is handed to input_actor
// (command_input), out_fifo to message_actor (message_output). Both names are
// also passed to the process on its command line.
320 private val in_fifo = system.mk_fifo()
321 private val out_fifo = system.mk_fifo()
/** Removes both fifos through system.rm_fifo, in_fifo first and out_fifo second. */
private def rm_fifos(): Unit = {
  for (fifo <- List(in_fifo, out_fifo))
    system.rm_fifo(fifo)
}
// Command line: the value of ISABELLE_PROCESS, the -W option with both fifo
// names joined by ':', then the caller-supplied args.
// NOTE(review): the line binding this list (presumably to `cmdline`) is not
// visible in this excerpt.
326 List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
// Start the external process and remember its handle.
327 proc = Some(system.execute(true, cmdline: _*))
// On an I/O error the fifos are removed again and the exception is rethrown.
329 catch { case e: IOException => rm_fifos(); throw(e) }
// Actors for the two fifos: command_input writes to in_fifo, message_output
// reads from out_fifo.
334 private val command_input = input_actor("command_input", in_fifo)
335 message_actor("message_output", out_fifo)
// Actors for the process's own output and input streams.
// NOTE(review): proc.get relies on proc having been assigned by the startup
// code above -- confirm nothing can reach this point with proc still empty.
337 private val standard_input = stdin_actor("standard_input", proc.get.getOutputStream)
338 stdout_actor("standard_output", proc.get.getInputStream)
// Actor named "process_exit": reports termination of the process.
// NOTE(review): the line that binds `rc` is not visible in this excerpt.
343 Simple_Thread.actor("process_exit") {
// Fixed 300 ms pause before the final results are sent; the existing FIXME
// suggests it should become configurable.
348 Thread.sleep(300) // FIXME property!?
349 put_result(Markup.SYSTEM, "Isabelle process terminated")
// The EXIT result carries rc rendered as text.
350 put_result(Markup.EXIT, rc.toString)
/**
 * Hands raw text to the standard_input actor as an Input_Text message.
 *
 * @param text text to send
 */
def input_raw(text: String): Unit = {
  val message = Input_Text(text)
  standard_input ! message
}
/**
 * Sends a command to the command_input actor as one Input_Chunks message:
 * the bytes of `name` first, followed by the argument chunks in order.
 *
 * @param name command name, converted with Standard_System.string_bytes
 * @param args argument chunks, sent unchanged
 */
def input_bytes(name: String, args: Array[Byte]*): Unit = {
  val header = Standard_System.string_bytes(name)
  val chunks = header :: args.toList
  command_input ! Input_Chunks(chunks)
}
/**
 * String variant of input_bytes: converts every argument with
 * Standard_System.string_bytes and delegates.
 *
 * @param name command name
 * @param args textual arguments
 */
def input(name: String, args: String*): Unit = {
  val encoded = for (arg <- args) yield Standard_System.string_bytes(arg)
  input_bytes(name, encoded: _*)
}
/** Sends Close to the standard_input actor and then to the command_input actor. */
def close(): Unit = {
  standard_input ! Close
  command_input ! Close
}