src/Pure/System/isabelle_process.scala
author wenzelm
Thu, 07 Jul 2011 13:48:30 +0200
changeset 44569 5130dfe1b7be
parent 44532 39fdbd814c7f
child 44603 fad8634cee62
permissions -rw-r--r--
simplified Symbol based on lazy Symbol.Interpretation -- reduced odd "functorial style";
tuned implicit build/init messages;
     1 /*  Title:      Pure/System/isabelle_process.scala
     2     Author:     Makarius
     3     Options:    :folding=explicit:collapseFolds=1:
     4 
     5 Isabelle process management -- always reactive due to multi-threaded I/O.
     6 */
     7 
     8 package isabelle
     9 
    10 import java.lang.System
    11 import java.util.concurrent.LinkedBlockingQueue
    12 import java.io.{BufferedReader, BufferedWriter, InputStreamReader, OutputStreamWriter,
    13   InputStream, OutputStream, BufferedOutputStream, IOException}
    14 
    15 import scala.actors.Actor
    16 import Actor._
    17 
    18 
    19 object Isabelle_Process
    20 {
    21   /* results */
    22 
    23   object Kind
    24   {
    25     val message_markup = Map(
    26       ('A' : Int) -> Markup.INIT,
    27       ('B' : Int) -> Markup.STATUS,
    28       ('C' : Int) -> Markup.REPORT,
    29       ('D' : Int) -> Markup.WRITELN,
    30       ('E' : Int) -> Markup.TRACING,
    31       ('F' : Int) -> Markup.WARNING,
    32       ('G' : Int) -> Markup.ERROR)
    33   }
    34 
    35   class Result(val message: XML.Elem)
    36   {
    37     def kind = message.markup.name
    38     def properties = message.markup.properties
    39     def body = message.body
    40 
    41     def is_init = kind == Markup.INIT
    42     def is_exit = kind == Markup.EXIT
    43     def is_stdout = kind == Markup.STDOUT
    44     def is_system = kind == Markup.SYSTEM
    45     def is_status = kind == Markup.STATUS
    46     def is_report = kind == Markup.REPORT
    47     def is_ready = Isar_Document.is_ready(message)
    48     def is_syslog = is_init || is_exit || is_system || is_ready
    49 
    50     override def toString: String =
    51     {
    52       val res =
    53         if (is_status || is_report) message.body.map(_.toString).mkString
    54         else Pretty.string_of(message.body)
    55       if (properties.isEmpty)
    56         kind.toString + " [[" + res + "]]"
    57       else
    58         kind.toString + " " +
    59           (for ((x, y) <- properties) yield x + "=" + y).mkString("{", ",", "}") + " [[" + res + "]]"
    60     }
    61   }
    62 }
    63 
    64 
    65 class Isabelle_Process(timeout: Time, receiver: Actor, args: String*)
    66 {
    67   import Isabelle_Process._
    68 
    69 
    70   /* demo constructor */
    71 
    72   def this(args: String*) =
    73     this(Time.seconds(10), actor { loop { react { case res => Console.println(res) } } }, args: _*)
    74 
    75 
    76   /* results */
    77 
    78   private def system_result(text: String)
    79   {
    80     receiver ! new Result(XML.Elem(Markup(Markup.SYSTEM, Nil), List(XML.Text(text))))
    81   }
    82 
    83   private val xml_cache = new XML.Cache(131071)
    84 
    85   private def put_result(kind: String, props: List[(String, String)], body: XML.Body)
    86   {
    87     if (kind == Markup.INIT) rm_fifos()
    88     val msg = XML.Elem(Markup(kind, props), Isar_Document.clean_message(body))
    89     xml_cache.cache_tree(msg)((message: XML.Tree) =>
    90       receiver ! new Result(message.asInstanceOf[XML.Elem]))
    91   }
    92 
    93   private def put_result(kind: String, text: String)
    94   {
    95     put_result(kind, Nil, List(XML.Text(Symbol.decode(text))))
    96   }
    97 
    98 
    99   /* input actors */
   100 
   101   private case class Input_Text(text: String)
   102   private case class Input_Chunks(chunks: List[Array[Byte]])
   103 
   104   private case object Close
   105   private def close(p: (Thread, Actor))
   106   {
   107     if (p != null && p._1.isAlive) {
   108       p._2 ! Close
   109       p._1.join
   110     }
   111   }
   112 
   113   @volatile private var standard_input: (Thread, Actor) = null
   114   @volatile private var command_input: (Thread, Actor) = null
   115 
   116 
   117   /** process manager **/
   118 
   119   private val in_fifo = Isabelle_System.mk_fifo()
   120   private val out_fifo = Isabelle_System.mk_fifo()
   121   private def rm_fifos() { Isabelle_System.rm_fifo(in_fifo); Isabelle_System.rm_fifo(out_fifo) }
   122 
   123   private val process =
   124     try {
   125       val cmdline =
   126         List(Isabelle_System.getenv_strict("ISABELLE_PROCESS"), "-W",
   127           in_fifo + ":" + out_fifo) ++ args
   128       new Isabelle_System.Managed_Process(true, cmdline: _*)
   129     }
   130     catch { case e: IOException => rm_fifos(); throw(e) }
   131 
   132   val process_result =
   133     Simple_Thread.future("process_result") { process.join }
   134 
   135   private def terminate_process()
   136   {
   137     try { process.terminate }
   138     catch { case e: IOException => system_result("Failed to terminate Isabelle: " + e.getMessage) }
   139   }
   140 
   141   private val process_manager = Simple_Thread.fork("process_manager")
   142   {
   143     val (startup_failed, startup_output) =
   144     {
   145       val expired = System.currentTimeMillis() + timeout.ms
   146       val result = new StringBuilder(100)
   147 
   148       var finished: Option[Boolean] = None
   149       while (finished.isEmpty && System.currentTimeMillis() <= expired) {
   150         while (finished.isEmpty && process.stdout.ready) {
   151           val c = process.stdout.read
   152           if (c == 2) finished = Some(true)
   153           else result += c.toChar
   154         }
   155         if (process_result.is_finished) finished = Some(false)
   156         else Thread.sleep(10)
   157       }
   158       (finished.isEmpty || !finished.get, result.toString.trim)
   159     }
   160     system_result(startup_output)
   161 
   162     if (startup_failed) {
   163       put_result(Markup.EXIT, "Return code: 127")
   164       process.stdin.close
   165       Thread.sleep(300)
   166       terminate_process()
   167       process_result.join
   168     }
   169     else {
   170       // rendezvous
   171       val command_stream = Isabelle_System.fifo_output_stream(in_fifo)
   172       val message_stream = Isabelle_System.fifo_input_stream(out_fifo)
   173 
   174       standard_input = stdin_actor()
   175       val stdout = stdout_actor()
   176       command_input = input_actor(command_stream)
   177       val message = message_actor(message_stream)
   178 
   179       val rc = process_result.join
   180       system_result("process terminated")
   181       for ((thread, _) <- List(standard_input, stdout, command_input, message)) thread.join
   182       system_result("process_manager terminated")
   183       put_result(Markup.EXIT, "Return code: " + rc.toString)
   184     }
   185     rm_fifos()
   186   }
   187 
   188 
   189   /* management methods */
   190 
   191   def join() { process_manager.join() }
   192 
   193   def interrupt()
   194   {
   195     try { process.interrupt }
   196     catch { case e: IOException => system_result("Failed to interrupt Isabelle: " + e.getMessage) }
   197   }
   198 
   199   def terminate()
   200   {
   201     close()
   202     system_result("Terminating Isabelle process")
   203     terminate_process()
   204   }
   205 
   206 
   207 
   208   /** stream actors **/
   209 
   210   /* raw stdin */
   211 
   212   private def stdin_actor(): (Thread, Actor) =
   213   {
   214     val name = "standard_input"
   215     Simple_Thread.actor(name) {
   216       var finished = false
   217       while (!finished) {
   218         try {
   219           //{{{
   220           receive {
   221             case Input_Text(text) =>
   222               process.stdin.write(text)
   223               process.stdin.flush
   224             case Close =>
   225               process.stdin.close
   226               finished = true
   227             case bad => System.err.println(name + ": ignoring bad message " + bad)
   228           }
   229           //}}}
   230         }
   231         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   232       }
   233       system_result(name + " terminated")
   234     }
   235   }
   236 
   237 
   238   /* raw stdout */
   239 
   240   private def stdout_actor(): (Thread, Actor) =
   241   {
   242     val name = "standard_output"
   243     Simple_Thread.actor(name) {
   244       var result = new StringBuilder(100)
   245 
   246       var finished = false
   247       while (!finished) {
   248         try {
   249           //{{{
   250           var c = -1
   251           var done = false
   252           while (!done && (result.length == 0 || process.stdout.ready)) {
   253             c = process.stdout.read
   254             if (c >= 0) result.append(c.asInstanceOf[Char])
   255             else done = true
   256           }
   257           if (result.length > 0) {
   258             put_result(Markup.STDOUT, result.toString)
   259             result.length = 0
   260           }
   261           else {
   262             process.stdout.close
   263             finished = true
   264           }
   265           //}}}
   266         }
   267         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   268       }
   269       system_result(name + " terminated")
   270     }
   271   }
   272 
   273 
   274   /* command input */
   275 
   276   private def input_actor(raw_stream: OutputStream): (Thread, Actor) =
   277   {
   278     val name = "command_input"
   279     Simple_Thread.actor(name) {
   280       val stream = new BufferedOutputStream(raw_stream)
   281       var finished = false
   282       while (!finished) {
   283         try {
   284           //{{{
   285           receive {
   286             case Input_Chunks(chunks) =>
   287               stream.write(Standard_System.string_bytes(
   288                   chunks.map(_.length).mkString("", ",", "\n")));
   289               chunks.foreach(stream.write(_));
   290               stream.flush
   291             case Close =>
   292               stream.close
   293               finished = true
   294             case bad => System.err.println(name + ": ignoring bad message " + bad)
   295           }
   296           //}}}
   297         }
   298         catch { case e: IOException => system_result(name + ": " + e.getMessage) }
   299       }
   300       system_result(name + " terminated")
   301     }
   302   }
   303 
   304 
   305   /* message output */
   306 
   307   private def message_actor(stream: InputStream): (Thread, Actor) =
   308   {
   309     class EOF extends Exception
   310     class Protocol_Error(msg: String) extends Exception(msg)
   311 
   312     val name = "message_output"
   313     Simple_Thread.actor(name) {
   314       val default_buffer = new Array[Byte](65536)
   315       var c = -1
   316 
   317       def read_chunk(): XML.Body =
   318       {
   319         //{{{
   320         // chunk size
   321         var n = 0
   322         c = stream.read
   323         if (c == -1) throw new EOF
   324         while (48 <= c && c <= 57) {
   325           n = 10 * n + (c - 48)
   326           c = stream.read
   327         }
   328         if (c != 10) throw new Protocol_Error("bad message chunk header")
   329 
   330         // chunk content
   331         val buf =
   332           if (n <= default_buffer.size) default_buffer
   333           else new Array[Byte](n)
   334 
   335         var i = 0
   336         var m = 0
   337         do {
   338           m = stream.read(buf, i, n - i)
   339           if (m != -1) i += m
   340         } while (m != -1 && n > i)
   341 
   342         if (i != n) throw new Protocol_Error("bad message chunk content")
   343 
   344         YXML.parse_body_failsafe(YXML.decode_chars(Symbol.decode, buf, 0, n))
   345         //}}}
   346       }
   347 
   348       do {
   349         try {
   350           val header = read_chunk()
   351           val body = read_chunk()
   352           header match {
   353             case List(XML.Elem(Markup(name, props), Nil))
   354                 if name.size == 1 && Kind.message_markup.isDefinedAt(name(0)) =>
   355               put_result(Kind.message_markup(name(0)), props, body)
   356             case _ => throw new Protocol_Error("bad header: " + header.toString)
   357           }
   358         }
   359         catch {
   360           case e: IOException => system_result("Cannot read message:\n" + e.getMessage)
   361           case e: Protocol_Error => system_result("Malformed message:\n" + e.getMessage)
   362           case _: EOF =>
   363         }
   364       } while (c != -1)
   365       stream.close
   366 
   367       system_result(name + " terminated")
   368     }
   369   }
   370 
   371 
   372   /** main methods **/
   373 
   374   def input_raw(text: String): Unit = standard_input._2 ! Input_Text(text)
   375 
   376   def input_bytes(name: String, args: Array[Byte]*): Unit =
   377     command_input._2 ! Input_Chunks(Standard_System.string_bytes(name) :: args.toList)
   378 
   379   def input(name: String, args: String*): Unit =
   380     input_bytes(name, args.map(Standard_System.string_bytes): _*)
   381 
   382   def close(): Unit = { close(command_input); close(standard_input) }
   383 }