diff options
Diffstat (limited to 'portal/tcp')
-rw-r--r-- | portal/tcp/portal.ml | 58 |
1 files changed, 34 insertions, 24 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml index 9afa318..18e08b9 100644 --- a/portal/tcp/portal.ml +++ b/portal/tcp/portal.ml @@ -1,4 +1,5 @@ open Lwt.Syntax +open Lwt.Infix open Markup type socket = Plain of Lwt_unix.file_descr @@ -74,11 +75,10 @@ let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t = Markup signals. *) let socket_to_stream (sock : socket) = let raw_stream = - let from_plain p = - let recv_bytes = Bytes.create 4096 in - fun () -> + let recv_buffer = Lwt_bytes.create 4096 in + let from_plain p () = let* len = - try%lwt Lwt_unix.read p recv_bytes 0 4096 + try%lwt Lwt_bytes.read p recv_buffer 0 4096 with | Unix.Unix_error (Unix.ECONNRESET, _, _) | Unix.Unix_error (Unix.EPIPE, _, _) @@ -87,35 +87,45 @@ let socket_to_stream (sock : socket) = in match len with | 0 -> Lwt.return_none | len -> - Lwt.return_some (Bytes.sub_string recv_bytes 0 len) + Lwt_bytes.proxy recv_buffer 0 len + |> Lwt_bytes.to_string + |> Lwt.return_some in let from_socket = match sock with | Plain p -> from_plain p in Lwt_stream.from from_socket in - let buffer = Buffer.create 1024 in - let flush_buffer = - let flush_plain p () = - let content = Buffer.to_bytes buffer in - Buffer.clear buffer; - let* _ = - try%lwt Lwt_unix.write p content 0 (Bytes.length content) - with - | Unix.Unix_error (Unix.ECONNRESET, _, _) - | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0 - | exn -> Lwt.fail exn - in Lwt.return_unit - in match sock with - | Plain p -> flush_plain p + let send_buffer = Lwt_bytes.create 1024 in + let send_pos = ref 0 in + let flush_plain p len = + try%lwt Lwt_bytes.write p send_buffer 0 len >>= (fun _ -> Lwt.return_unit) + with + | Unix.Unix_error (Unix.ECONNRESET, _, _) + | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return_unit + | exn -> Lwt.fail exn in - let close_sock = match sock with - | Plain p -> (fun () -> Lwt_unix.close p) + let flush_socket = match sock with + | Plain p -> flush_plain p + in + let flush_buffer () = + let len = !send_pos in + if len > 0 then + begin + send_pos := 0; + flush_socket len + end + else Lwt.return_unit in let chomp c = - Buffer.add_char buffer c; - if Buffer.length buffer >= 1024 || c = '>' + Lwt_bytes.set send_buffer !send_pos c; + incr send_pos; + if !send_pos >= 1024 || c = '>' then flush_buffer () else Lwt.return_unit - and outbound_stream, outbound_push = Lwt_stream.create () + in + let close_sock = match sock with + | Plain p -> (fun () -> Lwt_unix.close p) + in + let outbound_stream, outbound_push = Lwt_stream.create () in let push = function | None -> outbound_push None | Some signals -> Markup.iter (fun f -> outbound_push (Some f)) signals |