(*  Title:      scgi_echo.ML
    Author:     Timothy Bourke, NICTA

Simple SCGI server.
*)

(* NOTE(review): the Title above says scgi_echo.ML, but this file defines
   ScgiServer -- confirm the intended file name. *)

signature SCGI_SERVER =
sig
  (* Upper bound on concurrently active request threads; read each time a
     request thread is about to be launched. *)
  val max_threads : int Unsynchronized.ref

  (* A handler is called with the parsed request, the request content as a
     byte vector, and a function that writes a string to the client. *)
  type handler = ScgiReq.t * Word8Vector.vector * (string -> unit) -> unit

  (* register (name, mime, handler): add a handler under the given name. *)
  val register : (string * Mime.t option * handler) -> unit

  (* server prefix port: run the accept loop. *)
  val server : string -> int -> unit
  val server' : int -> string -> int -> unit (* keeps trying for port *)
end;

structure ScgiServer : SCGI_SERVER =
struct

val max_threads = Unsynchronized.ref 5;

type handler = ScgiReq.t * Word8Vector.vector * (string -> unit) -> unit;

local
val servers = Unsynchronized.ref (Symtab.empty : (Mime.t option * handler) Symtab.table);
in
(* Store (mime, f) under name.  Uses Symtab.update_new, so registering a name
   twice is presumably rejected -- confirm against Symtab. *)
fun register (name, mime, f) =
  Unsynchronized.change servers (Symtab.update_new (name, (mime, f)));

(* Look up the (mime, handler) pair registered under name, if any. *)
fun lookup name = Symtab.lookup (!servers) name;

(* Trace the names of all registered handlers. *)
fun dump_handlers () = (
    tracing(" with handlers:");
    app (fn (x, _) => tracing (" - " ^ x)) (Symtab.dest (!servers)))
end;

(* server server_prefix port

   Opens a passive socket via SocketUtil.init_server_socket (SOME "localhost")
   port, then accepts connections forever, handling each request in its own
   thread.  A request's path_info must start with server_prefix; the remainder
   selects the registered handler.  Requests with an unknown or non-matching
   path are answered with HttpStatus.not_found. *)
fun server server_prefix port =
  let
    val passive_sock = SocketUtil.init_server_socket (SOME "localhost") port;

    val thread_wait = ConditionVar.conditionVar ();
    val thread_wait_mutex = Mutex.mutex ();

    local
      val threads = Unsynchronized.ref ([] : Thread.thread list);

      (* Drop threads that have finished from the bookkeeping list. *)
      fun purge () = Unsynchronized.change threads (filter Thread.isActive);

      (* Upper bound on how long one wait may last before the thread list is
         re-examined.  A worker signals thread_wait just before it exits, so
         it may still count as active when the signal arrives, and a signal
         sent before we start waiting is lost; polling covers both cases. *)
      val poll_interval = Time.fromMilliseconds 500;

      (* Block until fewer than max_threads workers are active.
         Must be called with thread_wait_mutex held.  The "waiting" message
         is traced at most once per call chain (announced = already traced). *)
      fun wait_for_slot announced =
        (purge ();
         if length (!threads) < (!max_threads) then ()
         else
           (if announced then () else tracing ("Waiting for a free thread...");
            ignore
              (ConditionVar.waitUntil
                 (thread_wait, thread_wait_mutex,
                  Time.+ (Time.now (), poll_interval)));
            wait_for_slot true));
    in
      fun add_thread th = Unsynchronized.change threads (cons th);

      (* Fork threadf as a new worker once a slot is free.  The mutex is
         locked around the wait because ConditionVar.wait/waitUntil require
         the caller to hold it; it is released again on every exit path. *)
      fun launch_thread threadf =
        (Mutex.lock thread_wait_mutex;
         (wait_for_slot false
          handle e => (Mutex.unlock thread_wait_mutex; raise e));
         Mutex.unlock thread_wait_mutex;
         add_thread
           (Thread.fork
              (fn () => exception_trace threadf,
               [Thread.EnableBroadcastInterrupt true,
                Thread.InterruptState
                Thread.InterruptAsynchOnce])))
    end;

    (* Accept one connection, hand it to a worker thread, repeat.
       Never returns normally. *)
    fun loop () =
      let
        val (sock, _)= Socket.accept passive_sock;

        val (sin, sout) = SocketUtil.make_streams sock;

        fun send msg = BinIO.output (sout, Byte.stringToBytes msg);
        fun send_log msg = (tracing msg; send msg);

        (* Read the request content; a length of 0 yields an empty vector
           without touching the stream. *)
        fun get_content (st, 0) = Word8Vector.fromList []
          | get_content x = BinIO.inputN x;

        (* Parse the request and dispatch it to the matching handler. *)
        fun do_req () =
          let
            val (req as ScgiReq.Req {path_info, request_method, ...},
                 content_is) =
              ScgiReq.parse sin
              handle ScgiReq.InvalidReq s =>
                (send
                   (HttpUtil.reply_header (HttpStatus.bad_request, NONE, []));
                 raise Fail ("Invalid request: " ^ s));
            val () = tracing ("request: " ^ path_info);

            (* A path that does not start with server_prefix cannot name a
               handler: treat it like an unknown handler so the client gets
               a not_found reply instead of a silently closed connection. *)
            val target =
              if String.isPrefix server_prefix path_info
              then lookup (String.extract (path_info, size server_prefix, NONE))
              else NONE;
          in
            (case target of
               NONE => send (HttpUtil.reply_header (HttpStatus.not_found, NONE, []))
               (* No mime type registered: the handler sends its own header. *)
             | SOME (NONE, f) => f (req, get_content content_is, send)
             | SOME (t, f) =>
                (send (HttpUtil.reply_header (HttpStatus.ok, t, []));
                 if request_method = ScgiReq.Head then ()
                 else f (req, get_content content_is, send)))
          end;

        (* Worker body: serve the request, then close both streams and the
           socket.  Each step reports its own failure as a warning so the
           later clean-up steps still run.  Finally wake the accept loop. *)
        fun thread_req () =
          (do_req () handle e => (warning (exnMessage e));
           BinIO.closeOut sout handle e => warning (exnMessage e);
           BinIO.closeIn sin handle e => warning (exnMessage e);
           Socket.close sock handle e => warning (exnMessage e);
           tracing ("request done.");
           ConditionVar.signal thread_wait);
      in
        launch_thread thread_req;
        loop ()
      end;
  in
    tracing ("SCGI server started.");
    dump_handlers ();
    (* loop only ends by raising: release the listening socket before the
       exception propagates, without masking it by a failure to close. *)
    (loop ()
     handle e => ((Socket.close passive_sock handle _ => ()); raise e));
    (* Only reached if loop ever returns normally. *)
    Socket.close passive_sock
  end;

local
val delay = 5;
in
(* server' countdown server_prefix port

   Like server, but when starting fails with OS.SysErr ("bind failed", _)
   it waits delay seconds and tries again, at most countdown times; when
   countdown reaches 0 it warns and exits with status 1. *)
fun server' 0 server_prefix port = (warning "Giving up."; exit 1)
  | server' countdown server_prefix port =
      server server_prefix port
        handle OS.SysErr ("bind failed", _) =>
          (warning ("Could not acquire port "
                    ^ Int.toString port ^ ". Trying again in "
                    ^ Int.toString delay ^ " seconds...");
           OS.Process.sleep (Time.fromSeconds delay);
           server' (countdown - 1) server_prefix port);
end;

end;