1 /* Title: Pure/System/isabelle_process.scala
3 Options: :folding=explicit:collapseFolds=1:
5 Isabelle process management -- always reactive due to multi-threaded I/O.
10 import java.util.concurrent.LinkedBlockingQueue
11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
12 InputStream, OutputStream, BufferedOutputStream, IOException}
14 import scala.actors.Actor
// Companion object of class Isabelle_Process; the class imports its members
// via `import Isabelle_Process._`.
18 object Isabelle_Process
// Table from single-character message codes 'A'..'G' (widened to Int) to Markup
// names. message_actor consults it (as Kind.message_markup) with the first
// character of a one-character header element name.
24 val message_markup = Map(
25 ('A' : Int) -> Markup.INIT,
26 ('B' : Int) -> Markup.STATUS,
27 ('C' : Int) -> Markup.REPORT,
28 ('D' : Int) -> Markup.WRITELN,
29 ('E' : Int) -> Markup.TRACING,
30 ('F' : Int) -> Markup.WARNING,
31 ('G' : Int) -> Markup.ERROR)
// A single result message, wrapping one XML element. Instances are what
// system_result and put_result send to the receiver actor.
34 class Result(val message: XML.Elem)
// Accessors for the element's markup name, markup properties and body.
36 def kind = message.markup.name
37 def properties = message.markup.properties
38 def body = message.body
// Classification predicates: each compares `kind` with one Markup constant.
40 def is_init = kind == Markup.INIT
41 def is_exit = kind == Markup.EXIT
42 def is_stdout = kind == Markup.STDOUT
43 def is_system = kind == Markup.SYSTEM
44 def is_status = kind == Markup.STATUS
45 def is_report = kind == Markup.REPORT
// Delegates to Isar_Document.is_ready on the whole message element.
46 def is_ready = Isar_Document.is_ready(message)
// True for init, exit, system and ready messages.
47 def is_syslog = is_init || is_exit || is_system || is_ready
// Textual rendering: status/report bodies are the concatenated toString of
// their trees, any other body goes through Pretty.string_of. Without
// properties the form is `kind [[text]]`; otherwise the properties are
// rendered as {x=y,...} ahead of the [[text]] part.
49 override def toString: String =
52 if (is_status || is_report) message.body.map(_.toString).mkString
53 else Pretty.string_of(message.body)
54 if (properties.isEmpty)
55 kind.toString + " [[" + res + "]]"
58 (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
// Manages one Isabelle process.
//   system   - provides fifos, environment lookup, symbol decoding and process creation
//   timeout  - bounds the startup phase in process_manager
//   receiver - actor that is sent every Result
//   args     - appended to the process command line
64 class Isabelle_Process(system: Isabelle_System, timeout: Time, receiver: Actor, args: String*)
66 import Isabelle_Process._
69 /* demo constructor */
// Convenience constructor: default system, a 10 second timeout, and a receiver
// actor that prints every message it gets to the console.
71 def this(args: String*) =
72 this(Isabelle_System.default, Time.seconds(10),
73 actor { loop { react { case res => Console.println(res) } } }, args: _*)
// Sends `text` to the receiver as a Markup.SYSTEM result with no properties
// and a single XML.Text body.
78 private def system_result(text: String)
80 receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
// XML cache that put_result routes every outgoing message through.
83 private val xml_cache = new XML.Cache(131071)
// Builds an element of the given kind and properties around the cleaned body
// (Isar_Document.clean_message) and delivers it to the receiver as a Result.
85 private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
// An INIT message also triggers removal of both fifos.
87 if (kind == Markup.INIT) rm_fifos()
88 val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
// The callback receives an XML.Tree and casts it back to XML.Elem.
// NOTE(review): presumably this is the cached counterpart of msg; confirm in XML.Cache.
89 xml_cache.cache_tree(msg)((message: XML.Tree) =>
90 receiver ! new Result(message.asInstanceOf[XML.Elem]))
// Plain-text variant: decodes `text` with system.symbols.decode and emits it
// as a single XML.Text body with no properties.
93 private def put_result(kind: String, text: String)
95 put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
// Messages for the input actors: Input_Text is handled by stdin_actor,
// Input_Chunks by input_actor.
101 private case class Input_Text(text: String)
102 private case class Input_Chunks(chunks: List[Array[Byte]])
// NOTE(review): presumably the shutdown request for the input actors;
// confirm against close(p) and the actor loops.
104 private case object Close
// Acts on an input channel given as a (thread, actor) pair, and only when the
// pair is non-null and its thread is still alive.
105 private def close(p: (Thread, Actor))
107 if (p != null && p._1.isAlive) {
// (thread, actor) pairs of the two input channels. Both start as null and are
// assigned inside process_manager; the public input methods and close() read them.
// NOTE(review): presumably @volatile because of that cross-thread access; confirm.
113 @volatile private var standard_input: (Thread, Actor) = null
114 @volatile private var command_input: (Thread, Actor) = null
117 /** process manager **/
// Two fifos created via the system. They are passed to the process as
// "-W in_fifo:out_fifo"; process_manager later opens in_fifo as the command
// output stream and out_fifo as the message input stream.
119 private val in_fifo = system.mk_fifo()
120 private val out_fifo = system.mk_fifo()
// Removes both fifos; called on an INIT message and when process creation fails.
121 private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
// The managed process. Command line: the value of ISABELLE_PROCESS, then
// "-W" with the fifo pair, then the user-supplied args.
// NOTE(review): the meaning of the leading `true` argument is defined by
// Managed_Process; confirm there.
123 private val process =
126 List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
127 new system.Managed_Process(true, cmdline: _*)
// On IOException the fifos are removed before the exception is rethrown.
129 catch { case e: IOException => rm_fifos(); throw(e) }
// Wraps process.join in a Simple_Thread.future named "process_result".
// NOTE(review): presumably this is the value that process_manager polls with
// is_finished and awaits with join under the name process_result; confirm.
132 Simple_Thread.future("process_result") { process.join }
// Terminates the process; an IOException is reported to the receiver through
// system_result instead of being propagated.
134 private def terminate_process()
136 try { process.terminate }
137 catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
// Forked "process_manager" thread: handles startup, then runs the stream
// actors and reports the final exit status.
140 private val process_manager = Simple_Thread.fork("process_manager")
// Startup phase: collect stdout characters until one with code 2 is read
// (finished = Some(true)), the process result is already finished
// (finished = Some(false)), or the deadline passes (finished stays None).
142 val (startup_failed, startup_output) =
144 val expired = System.currentTimeMillis() + timeout.ms
145 val result = new StringBuilder(100)
147 var finished: Option[Boolean] = None
148 while (finished.isEmpty && System.currentTimeMillis() <= expired) {
// Drain whatever is currently available without blocking.
149 while (finished.isEmpty && process.stdout.ready) {
150 val c = process.stdout.read
151 if (c == 2) finished = Some(true)
152 else result += c.toChar
// Nothing more is ready: stop if the process has ended, otherwise wait 10 ms.
154 if (process_result.is_finished) finished = Some(false)
155 else Thread.sleep(10)
// Failed means timed out (None) or ended before the marker (Some(false));
// the collected output is returned trimmed.
157 (finished.isEmpty || !finished.get, result.toString.trim)
// The startup output is always forwarded as a system message.
159 system_result(startup_output)
// Startup failure is reported as an EXIT result with return code 127.
161 if (startup_failed) {
162 put_result(Markup.EXIT, "Return code: 127")
// NOTE(review): presumably the branch taken when startup succeeded; confirm.
// Opens both fifo streams and starts the four stream actors.
170 val command_stream = system.fifo_output_stream(in_fifo)
171 val message_stream = system.fifo_input_stream(out_fifo)
173 standard_input = stdin_actor()
174 val stdout = stdout_actor()
175 command_input = input_actor(command_stream)
176 val message = message_actor(message_stream)
// Wait for the process to end, then for every actor thread, and finally
// report the actual return code as an EXIT result.
178 val rc = process_result.join
179 system_result("process terminated")
180 for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
181 system_result("process_manager terminated")
182 put_result(Markup.EXIT, "Return code: " + rc.toString)
188 /* management methods */
// Blocks until the process_manager thread has finished.
190 def join() { process_manager.join() }
// Interrupts the process; an IOException is reported to the receiver through
// system_result instead of being propagated.
194 try { process.interrupt }
195 catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
// Announces termination to the receiver as a system message.
201 system_result("Terminating Isabelle process")
207 /** stream actors **/
// Creates the "standard_input" actor and returns it with its thread.
// It writes the text of each Input_Text message to process.stdin.
211 private def stdin_actor(): (Thread, Actor) =
213 val name = "standard_input"
214 Simple_Thread.actor(name) {
220 case Input_Text(text) =>
221 process.stdin.write(text)
// Unknown messages are reported on System.err and otherwise ignored.
226 case bad => System.err.println(name + ": ignoring bad message " + bad)
// I/O failures are reported to the receiver as system messages.
230 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
232 system_result(name + " terminated")
// Creates the "standard_output" actor and returns it with its thread.
// It reads characters from process.stdout and forwards them as STDOUT results.
239 private def stdout_actor(): (Thread, Actor) =
241 val name = "standard_output"
242 Simple_Thread.actor(name) {
243 var result = new StringBuilder(100)
// Keep reading while not done and either nothing has been collected yet or
// more input is ready. So the read is attempted unconditionally for the first
// character, and after that only while stdout reports ready.
251 while (!done && (result.length == 0 || process.stdout.ready)) {
252 c = process.stdout.read
// Only non-negative read results are appended to the buffer.
253 if (c >= 0) result.append(c.asInstanceOf[Char])
// A non-empty batch is emitted as one STDOUT result.
256 if (result.length > 0) {
257 put_result(Markup.STDOUT, result.toString)
// I/O failures are reported to the receiver as system messages.
266 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
268 system_result(name + " terminated")
// Creates the "command_input" actor and returns it with its thread.
// It writes Input_Chunks messages to raw_stream through a BufferedOutputStream.
275 private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
277 val name = "command_input"
278 Simple_Thread.actor(name) {
279 val stream = new BufferedOutputStream(raw_stream)
// Wire format: one header line with the comma-separated chunk lengths,
// terminated by a newline, followed by the bytes of each chunk in order.
285 case Input_Chunks(chunks) =>
286 stream.write(Standard_System.string_bytes(
287 chunks.map(_.length).mkString("", ",", "\n")));
288 chunks.foreach(stream.write(_));
// Unknown messages are reported on System.err and otherwise ignored.
293 case bad => System.err.println(name + ": ignoring bad message " + bad)
// I/O failures are reported to the receiver as system messages.
297 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
299 system_result(name + " terminated")
// Creates the "message_output" actor and returns it with its thread.
// It reads chunked messages from `stream` and forwards them through put_result.
306 private def message_actor(stream: InputStream): (Thread, Actor) =
// Local exceptions: EOF for end of stream, Protocol_Error for malformed input.
308 class EOF extends Exception
309 class Protocol_Error(msg: String) extends Exception(msg)
311 val name = "message_output"
312 Simple_Thread.actor(name) {
// 64 KiB buffer reused for every chunk that fits into it.
313 val default_buffer = new Array[Byte](65536)
// Reads one chunk: a decimal length terminated by a newline, then that many
// bytes, which are decoded and parsed as a YXML body.
316 def read_chunk(): XML.Body =
322 if (c == -1) throw new EOF
// Accumulate the ASCII digits '0'..'9' (48..57) into the length n.
323 while (48 <= c && c <= 57) {
324 n = 10 * n + (c - 48)
// The length must be followed by a newline (10).
327 if (c != 10) throw new Protocol_Error("bad message chunk header")
// Use the shared buffer when the chunk fits, otherwise allocate one of size n.
331 if (n <= default_buffer.size) default_buffer
332 else new Array[Byte](n)
// Keep reading until n bytes have arrived or the stream returns -1.
337 m = stream.read(buf, i, n - i)
339 } while (m != -1 && n > i)
// Fewer than n bytes is a protocol error.
341 if (i != n) throw new Protocol_Error("bad message chunk content")
343 YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
// Every message is a header chunk followed by a body chunk.
349 val header = read_chunk()
350 val body = read_chunk()
// A valid header is one element with an empty body whose one-character name
// is a key of Kind.message_markup; its properties become the result properties.
352 case List(XML.Elem(Markup(name, props), Nil))
353 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
354 put_result(Kind.message_markup(name(0)), props, body)
355 case _ => throw new Protocol_Error("bad header: " + header.toString)
// Read and protocol failures are reported to the receiver as system messages.
359 case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
360 case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
366 system_result(name + " terminated")
// Sends raw text to the standard-input actor as an Input_Text message.
// NOTE(review): standard_input is null until process_manager assigns it, so a
// call before that point would throw a NullPointerException; confirm callers.
373 def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
// Sends a command to the command-input actor as Input_Chunks: the first chunk
// is the bytes of `name`, followed by the argument byte arrays.
// NOTE(review): command_input is null until process_manager assigns it; confirm callers.
375 def input_bytes(name: String, args: Array[Byte]*): Unit =
376 command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
// String variant of input_bytes: each argument is converted with
// Standard_System.string_bytes before sending.
378 def input(name: String, args: String*): Unit =
379 input_bytes(name, args.map(Standard_System.string_bytes): _*)
// Applies close(p) to the command-input channel first, then to the
// standard-input channel.
381 def close(): Unit = { close(command_input); close(standard_input) }