src/Pure/System/isabelle_process.scala
author wenzelm
Wed, 22 Jun 2011 21:35:48 +0200
changeset 44385 45cf8d5e109a
parent 44163 446e6621762d
child 44400 cec9b95fa35d
permissions -rw-r--r--
lazy Isabelle_System.default supports implicit boot;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.util.concurrent.LinkedBlockingQueue
    11 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    12   InputStream, OutputStream, BufferedOutputStream, IOException}
    13 
    14 import scala.actors.Actor
    15 import Actor._
    16 
    17 
    18 object Isabelle_Process
    19 {
    20   /* results */
    21 
    22   object Kind
    23   {
    24     val message_markup = Map(
    25       ('A' : Int) -> Markup.INIT,
    26       ('B' : Int) -> Markup.STATUS,
    27       ('C' : Int) -> Markup.REPORT,
    28       ('D' : Int) -> Markup.WRITELN,
    29       ('E' : Int) -> Markup.TRACING,
    30       ('F' : Int) -> Markup.WARNING,
    31       ('G' : Int) -> Markup.ERROR)
    32   }
    33 
    34   class Result(val message: XML.Elem)
    35   {
    36     def kind = message.markup.name
    37     def properties = message.markup.properties
    38     def body = message.body
    39 
    40     def is_init = kind == Markup.INIT
    41     def is_exit = kind == Markup.EXIT
    42     def is_stdout = kind == Markup.STDOUT
    43     def is_system = kind == Markup.SYSTEM
    44     def is_status = kind == Markup.STATUS
    45     def is_report = kind == Markup.REPORT
    46     def is_ready = Isar_Document.is_ready(message)
    47     def is_syslog = is_init || is_exit || is_system || is_ready
    48 
    49     override def toString: String =
    50     {
    51       val res =
    52         if (is_status || is_report) message.body.map(_.toString).mkString
    53         else Pretty.string_of(message.body)
    54       if (properties.isEmpty)
    55         kind.toString + " [[" + res + "]]"
    56       else
    57         kind.toString + " " +
    58           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    59     }
    60   }
    61 }
    62 
    63 
    64 class Isabelle_Process(system: Isabelle_System, timeout: Time, receiver: Actor, args: String*)
    65 {
    66   import Isabelle_Process._
    67 
    68 
    69   /* demo constructor */
    70 
    71   def this(args: String*) =
    72     this(Isabelle_System.default, Time.seconds(10),
    73       actor { loop { react { case res => Console.println(res) } } }, args: _*)
    74 
    75 
    76   /* results */
    77 
    78   private def system_result(text: String)
    79   {
    80     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    81   }
    82 
    83   private val xml_cache = new XML.Cache(131071)
    84 
    85   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    86   {
    87     if (kind == Markup.INIT) rm_fifos()
    88     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
    89     xml_cache.cache_tree(msg)((message: XML.Tree) =>
    90       receiver ! new Result(message.asInstanceOf[XML.Elem]))
    91   }
    92 
    93   private def put_result(kind: String, text: String)
    94   {
    95     put_result(kind, Nil, List(XML.Text(system.symbols.decode(text))))
    96   }
    97 
    98 
    99   /* input actors */
   100 
   101   private case class Input_Text(text: String)
   102   private case class Input_Chunks(chunks: List[Array[Byte]])
   103 
   104   private case object Close
   105   private def close(p: (Thread, Actor))
   106   {
   107     if (p != null && p._1.isAlive) {
   108       p._2 ! Close
   109       p._1.join
   110     }
   111   }
   112 
   113   @volatile private var standard_input: (Thread, Actor) = null
   114   @volatile private var command_input: (Thread, Actor) = null
   115 
   116 
   117   /** process manager **/
   118 
   119   private val in_fifo = system.mk_fifo()
   120   private val out_fifo = system.mk_fifo()
   121   private def rm_fifos() { system.rm_fifo(in_fifo); system.rm_fifo(out_fifo) }
   122 
   123   private val process =
   124     try {
   125       val cmdline =
   126         List(system.getenv_strict("ISABELLE_PROCESS"), "-W", in_fifo + ":" + out_fifo) ++ args
   127       new system.Managed_Process(true, cmdline: _*)
   128     }
   129     catch { case e: IOException => rm_fifos(); throw(e) }
   130 
   131   val process_result =
   132     Simple_Thread.future("process_result") { process.join }
   133 
   134   private def terminate_process()
   135   {
   136     try { process.terminate }
   137     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   138   }
   139 
   140   private val process_manager = Simple_Thread.fork("process_manager")
   141   {
   142     val (startup_failed, startup_output) =
   143     {
   144       val expired = System.currentTimeMillis() + timeout.ms
   145       val result = new StringBuilder(100)
   146 
   147       var finished: Option[Boolean] = None
   148       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   149         while (finished.isEmpty && process.stdout.ready) {
   150           val c = process.stdout.read
   151           if (c == 2) finished = Some(true)
   152           else result += c.toChar
   153         }
   154         if (process_result.is_finished) finished = Some(false)
   155         else Thread.sleep(10)
   156       }
   157       (finished.isEmpty || !finished.get, result.toString.trim)
   158     }
   159     system_result(startup_output)
   160 
   161     if (startup_failed) {
   162       put_result(Markup.EXIT, "Return code: 127")
   163       process.stdin.close
   164       Thread.sleep(300)
   165       terminate_process()
   166       process_result.join
   167     }
   168     else {
   169       // rendezvous
   170       val command_stream = system.fifo_output_stream(in_fifo)
   171       val message_stream = system.fifo_input_stream(out_fifo)
   172 
   173       standard_input = stdin_actor()
   174       val stdout = stdout_actor()
   175       command_input = input_actor(command_stream)
   176       val message = message_actor(message_stream)
   177 
   178       val rc = process_result.join
   179       system_result("process terminated")
   180       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   181       system_result("process_manager terminated")
   182       put_result(Markup.EXIT, "Return code: " + rc.toString)
   183     }
   184     rm_fifos()
   185   }
   186 
   187 
   188   /* management methods */
   189 
   190   def join() { process_manager.join() }
   191 
   192   def interrupt()
   193   {
   194     try { process.interrupt }
   195     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   196   }
   197 
   198   def terminate()
   199   {
   200     close()
   201     system_result("Terminating Isabelle process")
   202     terminate_process()
   203   }
   204 
   205 
   206 
   207   /** stream actors **/
   208 
   209   /* raw stdin */
   210 
   211   private def stdin_actor(): (Thread, Actor) =
   212   {
   213     val name = "standard_input"
   214     Simple_Thread.actor(name) {
   215       var finished = false
   216       while (!finished) {
   217         try {
   218           //{{{
   219           receive {
   220             case Input_Text(text) =>
   221               process.stdin.write(text)
   222               process.stdin.flush
   223             case Close =>
   224               process.stdin.close
   225               finished = true
   226             case bad => System.err.println(name + ": ignoring bad message " + bad)
   227           }
   228           //}}}
   229         }
   230         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   231       }
   232       system_result(name + " terminated")
   233     }
   234   }
   235 
   236 
   237   /* raw stdout */
   238 
   239   private def stdout_actor(): (Thread, Actor) =
   240   {
   241     val name = "standard_output"
   242     Simple_Thread.actor(name) {
   243       var result = new StringBuilder(100)
   244 
   245       var finished = false
   246       while (!finished) {
   247         try {
   248           //{{{
   249           var c = -1
   250           var done = false
   251           while (!done && (result.length == 0 || process.stdout.ready)) {
   252             c = process.stdout.read
   253             if (c >= 0) result.append(c.asInstanceOf[Char])
   254             else done = true
   255           }
   256           if (result.length > 0) {
   257             put_result(Markup.STDOUT, result.toString)
   258             result.length = 0
   259           }
   260           else {
   261             process.stdout.close
   262             finished = true
   263           }
   264           //}}}
   265         }
   266         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   267       }
   268       system_result(name + " terminated")
   269     }
   270   }
   271 
   272 
   273   /* command input */
   274 
   275   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   276   {
   277     val name = "command_input"
   278     Simple_Thread.actor(name) {
   279       val stream = new BufferedOutputStream(raw_stream)
   280       var finished = false
   281       while (!finished) {
   282         try {
   283           //{{{
   284           receive {
   285             case Input_Chunks(chunks) =>
   286               stream.write(Standard_System.string_bytes(
   287                   chunks.map(_.length).mkString("", ",", "\n")));
   288               chunks.foreach(stream.write(_));
   289               stream.flush
   290             case Close =>
   291               stream.close
   292               finished = true
   293             case bad => System.err.println(name + ": ignoring bad message " + bad)
   294           }
   295           //}}}
   296         }
   297         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   298       }
   299       system_result(name + " terminated")
   300     }
   301   }
   302 
   303 
   304   /* message output */
   305 
   306   private def message_actor(stream: InputStream): (Thread, Actor) =
   307   {
   308     class EOF extends Exception
   309     class Protocol_Error(msg: String) extends Exception(msg)
   310 
   311     val name = "message_output"
   312     Simple_Thread.actor(name) {
   313       val default_buffer = new Array[Byte](65536)
   314       var c = -1
   315 
   316       def read_chunk(): XML.Body =
   317       {
   318         //{{{
   319         // chunk size
   320         var n = 0
   321         c = stream.read
   322         if (c == -1) throw new EOF
   323         while (48 <= c && c <= 57) {
   324           n = 10 * n + (c - 48)
   325           c = stream.read
   326         }
   327         if (c != 10) throw new Protocol_Error("bad message chunk header")
   328 
   329         // chunk content
   330         val buf =
   331           if (n <= default_buffer.size) default_buffer
   332           else new Array[Byte](n)
   333 
   334         var i = 0
   335         var m = 0
   336         do {
   337           m = stream.read(buf, i, n - i)
   338           if (m != -1) i += m
   339         } while (m != -1 && n > i)
   340 
   341         if (i != n) throw new Protocol_Error("bad message chunk content")
   342 
   343         YXML.parse_body_failsafe(YXML.decode_chars(system.symbols.decode, buf, 0, n))
   344         //}}}
   345       }
   346 
   347       do {
   348         try {
   349           val header = read_chunk()
   350           val body = read_chunk()
   351           header match {
   352             case List(XML.Elem(Markup(name, props), Nil))
   353                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   354               put_result(Kind.message_markup(name(0)), props, body)
   355             case _ => throw new Protocol_Error("bad header: " + header.toString)
   356           }
   357         }
   358         catch {
   359           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   360           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   361           case _: EOF =>
   362         }
   363       } while (c != -1)
   364       stream.close
   365 
   366       system_result(name + " terminated")
   367     }
   368   }
   369 
   370 
   371   /** main methods **/
   372 
   373   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   374 
   375   def input_bytes(name: String, args: Array[Byte]*): Unit =
   376     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   377 
   378   def input(name: String, args: String*): Unit =
   379     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   380 
   381   def close(): Unit = { close(command_input); close(standard_input) }
   382 }