src/Pure/System/system_channel.ML
author wenzelm
Sun, 22 Jul 2012 23:31:57 +0200
changeset 49440 0d95980e9aae
parent 46031 db4bf4fb5492
child 51815 c0fb2839d1a9
permissions -rw-r--r--
parallel scheduling of jobs;
misc tuning;
     1 (*  Title:      Pure/System/system_channel.ML
     2     Author:     Makarius
     3 
     4 Portable system channel for inter-process communication, based on
     5 named pipes or sockets.
     6 *)
     7 
     8 signature SYSTEM_CHANNEL =
     9 sig
    10   type T
    11   val input_line: T -> string option
    12   val inputN: T -> int -> string
    13   val output: T -> string -> unit
    14   val flush: T -> unit
    15   val fifo_rendezvous: string -> string -> T
    16   val socket_rendezvous: string -> T
    17 end;
    18 
    19 structure System_Channel: SYSTEM_CHANNEL =
    20 struct
    21 
    22 datatype T = System_Channel of
    23  {input_line: unit -> string option,
    24   inputN: int -> string,
    25   output: string -> unit,
    26   flush: unit -> unit};
    27 
    28 fun input_line (System_Channel {input_line = f, ...}) = f ();
    29 fun inputN (System_Channel {inputN = f, ...}) n = f n;
    30 fun output (System_Channel {output = f, ...}) s = f s;
    31 fun flush (System_Channel {flush = f, ...}) = f ();
    32 
    33 
    34 (* named pipes *)
    35 
    36 fun fifo_rendezvous fifo1 fifo2 =
    37   let
    38     val in_stream = TextIO.openIn fifo1;
    39     val out_stream = TextIO.openOut fifo2;
    40     val _ = TextIO.StreamIO.setBufferMode (TextIO.getOutstream out_stream, IO.BLOCK_BUF);
    41   in
    42     System_Channel
    43      {input_line = fn () => TextIO.inputLine in_stream,
    44       inputN = fn n => TextIO.inputN (in_stream, n),
    45       output = fn s => TextIO.output (out_stream, s),
    46       flush = fn () => TextIO.flushOut out_stream}
    47   end;
    48 
    49 
    50 (* sockets *)  (* FIXME proper buffering via BinIO (after Poly/ML 5.4.1) *)
    51 
    52 local
    53 
    54 fun readN socket n =
    55   let
    56     fun read i buf =
    57       let
    58         val s = Byte.bytesToString (Socket.recvVec (socket, n - i));
    59         val m = size s;
    60         val i' = i + m;
    61         val buf' = Buffer.add s buf;
    62       in if m > 0 andalso n > i' then read i' buf' else Buffer.content buf' end;
    63   in read 0 Buffer.empty end;
    64 
    65 fun read_line socket =
    66   let
    67     fun result cs = implode (rev ("\n" :: cs));
    68     fun read cs =
    69       (case readN socket 1 of
    70         "" => if null cs then NONE else SOME (result cs)
    71       | "\n" => SOME (result cs)
    72       | c => read (c :: cs));
    73   in read [] end;
    74 
    75 fun write socket =
    76   let
    77     fun send buf =
    78       if Word8VectorSlice.isEmpty buf then ()
    79       else
    80         let
    81           val n = Int.min (Word8VectorSlice.length buf, 4096);
    82           val m = Socket.sendVec (socket, Word8VectorSlice.subslice (buf, 0, SOME n));
    83           val buf' = Word8VectorSlice.subslice (buf, m, NONE);
    84         in send buf' end;
    85   in fn s => send (Word8VectorSlice.full (Byte.stringToBytes s)) end;
    86 
    87 in
    88 
    89 fun socket_rendezvous name =
    90   let
    91     fun err () = error ("Bad socket name: " ^ quote name);
    92     val (host, port) =
    93       (case space_explode ":" name of
    94         [h, p] =>
    95          (case NetHostDB.getByName h of SOME host => host | NONE => err (),
    96           case Int.fromString p of SOME port => port | NONE => err ())
    97       | _ => err ());
    98     val socket: Socket.active INetSock.stream_sock = INetSock.TCP.socket ();
    99     val _ = Socket.connect (socket, INetSock.toAddr (NetHostDB.addr host, port));
   100   in
   101     System_Channel
   102      {input_line = fn () => read_line socket,
   103       inputN = fn n => readN socket n,
   104       output = fn s => write socket s,
   105       flush = fn () => ()}
   106   end;
   107 
   108 end;
   109 
   110 end;
   111