aboutsummaryrefslogtreecommitdiff
path: root/portal/tcp/portal.ml
diff options
context:
space:
mode:
Diffstat (limited to 'portal/tcp/portal.ml')
-rw-r--r--portal/tcp/portal.ml58
1 files changed, 34 insertions, 24 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml
index 9afa318..18e08b9 100644
--- a/portal/tcp/portal.ml
+++ b/portal/tcp/portal.ml
@@ -1,4 +1,5 @@
open Lwt.Syntax
+open Lwt.Infix
open Markup
type socket = Plain of Lwt_unix.file_descr
@@ -74,11 +75,10 @@ let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t =
Markup signals. *)
let socket_to_stream (sock : socket) =
let raw_stream =
- let from_plain p =
- let recv_bytes = Bytes.create 4096 in
- fun () ->
+ let recv_buffer = Lwt_bytes.create 4096 in
+ let from_plain p () =
let* len =
- try%lwt Lwt_unix.read p recv_bytes 0 4096
+ try%lwt Lwt_bytes.read p recv_buffer 0 4096
with
| Unix.Unix_error (Unix.ECONNRESET, _, _)
| Unix.Unix_error (Unix.EPIPE, _, _)
@@ -87,35 +87,45 @@ let socket_to_stream (sock : socket) =
in match len with
| 0 -> Lwt.return_none
| len ->
- Lwt.return_some (Bytes.sub_string recv_bytes 0 len)
+ Lwt_bytes.proxy recv_buffer 0 len
+ |> Lwt_bytes.to_string
+ |> Lwt.return_some
in let from_socket = match sock with
| Plain p -> from_plain p
in Lwt_stream.from from_socket
in
- let buffer = Buffer.create 1024 in
- let flush_buffer =
- let flush_plain p () =
- let content = Buffer.to_bytes buffer in
- Buffer.clear buffer;
- let* _ =
- try%lwt Lwt_unix.write p content 0 (Bytes.length content)
- with
- | Unix.Unix_error (Unix.ECONNRESET, _, _)
- | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0
- | exn -> Lwt.fail exn
- in Lwt.return_unit
- in match sock with
- | Plain p -> flush_plain p
+ let send_buffer = Lwt_bytes.create 1024 in
+ let send_pos = ref 0 in
+ let flush_plain p len =
+ try%lwt Lwt_bytes.write p send_buffer 0 len >>= (fun _ -> Lwt.return_unit)
+ with
+ | Unix.Unix_error (Unix.ECONNRESET, _, _)
+ | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return_unit
+ | exn -> Lwt.fail exn
in
- let close_sock = match sock with
- | Plain p -> (fun () -> Lwt_unix.close p)
+ let flush_socket = match sock with
+ | Plain p -> flush_plain p
+ in
+ let flush_buffer () =
+ let len = !send_pos in
+ if len > 0 then
+ begin
+ send_pos := 0;
+ flush_socket len
+ end
+ else Lwt.return_unit
in
let chomp c =
- Buffer.add_char buffer c;
- if Buffer.length buffer >= 1024 || c = '>'
+ Lwt_bytes.set send_buffer !send_pos c;
+ incr send_pos;
+ if !send_pos >= 1024 || c = '>'
then flush_buffer ()
else Lwt.return_unit
- and outbound_stream, outbound_push = Lwt_stream.create ()
+ in
+ let close_sock = match sock with
+ | Plain p -> (fun () -> Lwt_unix.close p)
+ in
+ let outbound_stream, outbound_push = Lwt_stream.create ()
in let push = function
| None -> outbound_push None
| Some signals -> Markup.iter (fun f -> outbound_push (Some f)) signals