simplified Symbol based on lazy Symbol.Interpretation -- reduced odd "functorial style";
tuned implicit build/init messages;
1 /* Title: Pure/System/isabelle_process.scala
3 Options: :folding=explicit:collapseFolds=1:
5 Isabelle process management -- always reactive due to multi-threaded I/O.
10 import java.lang.System
11 import java.util.concurrent.LinkedBlockingQueue
12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
13 InputStream, OutputStream, BufferedOutputStream, IOException}
15 import scala.actors.Actor
19 object Isabelle_Process
25 val message_markup = Map(
26 ('A' : Int) -> Markup.INIT,
27 ('B' : Int) -> Markup.STATUS,
28 ('C' : Int) -> Markup.REPORT,
29 ('D' : Int) -> Markup.WRITELN,
30 ('E' : Int) -> Markup.TRACING,
31 ('F' : Int) -> Markup.WARNING,
32 ('G' : Int) -> Markup.ERROR)
35 class Result(val message: XML.Elem)
37 def kind = message.markup.name
38 def properties = message.markup.properties
39 def body = message.body
41 def is_init = kind == Markup.INIT
42 def is_exit = kind == Markup.EXIT
43 def is_stdout = kind == Markup.STDOUT
44 def is_system = kind == Markup.SYSTEM
45 def is_status = kind == Markup.STATUS
46 def is_report = kind == Markup.REPORT
47 def is_ready = Isar_Document.is_ready(message)
48 def is_syslog = is_init || is_exit || is_system || is_ready
50 override def toString: String =
53 if (is_status || is_report) message.body.map(_.toString).mkString
54 else Pretty.string_of(message.body)
55 if (properties.isEmpty)
56 kind.toString + " [[" + res + "]]"
59 (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
65 class Isabelle_Process(timeout: Time, receiver: Actor, args: String*)
67 import Isabelle_Process._
70 /* demo constructor */
72 def this(args: String*) =
73 this(Time.seconds(10), actor { loop { react { case res => Console.println(res) } } }, args: _*)
78 private def system_result(text: String)
80 receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
83 private val xml_cache = new XML.Cache(131071)
85 private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
87 if (kind == Markup.INIT) rm_fifos()
88 val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
89 xml_cache.cache_tree(msg)((message: XML.Tree) =>
90 receiver ! new Result(message.asInstanceOf[XML.Elem]))
93 private def put_result(kind: String, text: String)
95 put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
101 private case class Input_Text(text: String)
102 private case class Input_Chunks(chunks: List[Array[Byte]])
104 private case object Close
105 private def close(p: (Thread, Actor))
107 if (p != null && p._1.isAlive) {
113 @volatile private var standard_input: (Thread, Actor) = null
114 @volatile private var command_input: (Thread, Actor) = null
117 /** process manager **/
119 private val in_fifo = Isabelle_System.mk_fifo()
120 private val out_fifo = Isabelle_System.mk_fifo()
121 private def rm_fifos() { Isabelle_System.rm_fifo(in_fifo); Isabelle_System.rm_fifo(out_fifo) }
123 private val process =
126 List(Isabelle_System.getenv_strict("ISABELLE_PROCESS"), "-W",
127 in_fifo + ":" + out_fifo) ++ args
128 new Isabelle_System.Managed_Process(true, cmdline: _*)
130 catch { case e: IOException => rm_fifos(); throw(e) }
133 Simple_Thread.future("process_result") { process.join }
135 private def terminate_process()
137 try { process.terminate }
138 catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
141 private val process_manager = Simple_Thread.fork("process_manager")
143 val (startup_failed, startup_output) =
145 val expired = System.currentTimeMillis() + timeout.ms
146 val result = new StringBuilder(100)
148 var finished: Option[Boolean] = None
149 while (finished.isEmpty && System.currentTimeMillis() <= expired) {
150 while (finished.isEmpty && process.stdout.ready) {
151 val c = process.stdout.read
152 if (c == 2) finished = Some(true)
153 else result += c.toChar
155 if (process_result.is_finished) finished = Some(false)
156 else Thread.sleep(10)
158 (finished.isEmpty || !finished.get, result.toString.trim)
160 system_result(startup_output)
162 if (startup_failed) {
163 put_result(Markup.EXIT, "Return code: 127")
171 val command_stream = Isabelle_System.fifo_output_stream(in_fifo)
172 val message_stream = Isabelle_System.fifo_input_stream(out_fifo)
174 standard_input = stdin_actor()
175 val stdout = stdout_actor()
176 command_input = input_actor(command_stream)
177 val message = message_actor(message_stream)
179 val rc = process_result.join
180 system_result("process terminated")
181 for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
182 system_result("process_manager terminated")
183 put_result(Markup.EXIT, "Return code: " + rc.toString)
189 /* management methods */
191 def join() { process_manager.join() }
195 try { process.interrupt }
196 catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
202 system_result("Terminating Isabelle process")
208 /** stream actors **/
212 private def stdin_actor(): (Thread, Actor) =
214 val name = "standard_input"
215 Simple_Thread.actor(name) {
221 case Input_Text(text) =>
222 process.stdin.write(text)
227 case bad => System.err.println(name + ": ignoring bad message " + bad)
231 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
233 system_result(name + " terminated")
240 private def stdout_actor(): (Thread, Actor) =
242 val name = "standard_output"
243 Simple_Thread.actor(name) {
244 var result = new StringBuilder(100)
252 while (!done && (result.length == 0 || process.stdout.ready)) {
253 c = process.stdout.read
254 if (c >= 0) result.append(c.asInstanceOf[Char])
257 if (result.length > 0) {
258 put_result(Markup.STDOUT, result.toString)
267 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
269 system_result(name + " terminated")
276 private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
278 val name = "command_input"
279 Simple_Thread.actor(name) {
280 val stream = new BufferedOutputStream(raw_stream)
286 case Input_Chunks(chunks) =>
287 stream.write(Standard_System.string_bytes(
288 chunks.map(_.length).mkString("", ",", "\n")));
289 chunks.foreach(stream.write(_));
294 case bad => System.err.println(name + ": ignoring bad message " + bad)
298 catch { case e: IOException => system_result(name + ": " + e.getMessage) }
300 system_result(name + " terminated")
307 private def message_actor(stream: InputStream): (Thread, Actor) =
309 class EOF extends Exception
310 class Protocol_Error(msg: String) extends Exception(msg)
312 val name = "message_output"
313 Simple_Thread.actor(name) {
314 val default_buffer = new Array[Byte](65536)
317 def read_chunk(): XML.Body =
323 if (c == -1) throw new EOF
324 while (48 <= c && c <= 57) {
325 n = 10 * n + (c - 48)
328 if (c != 10) throw new Protocol_Error("bad message chunk header")
332 if (n <= default_buffer.size) default_buffer
333 else new Array[Byte](n)
338 m = stream.read(buf, i, n - i)
340 } while (m != -1 && n > i)
342 if (i != n) throw new Protocol_Error("bad message chunk content")
344 YXML.parse_body_failsafe(YXML.decode_chars(Symbol.decode, buf, 0, n))
350 val header = read_chunk()
351 val body = read_chunk()
353 case List(XML.Elem(Markup(name, props), Nil))
354 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
355 put_result(Kind.message_markup(name(0)), props, body)
356 case _ => throw new Protocol_Error("bad header: " + header.toString)
360 case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
361 case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
367 system_result(name + " terminated")
374 def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
376 def input_bytes(name: String, args: Array[Byte]*): Unit =
377 command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
379 def input(name: String, args: String*): Unit =
380 input_bytes(name, args.map(Standard_System.string_bytes): _*)
382 def close(): Unit = { close(command_input); close(standard_input) }