diff options
Diffstat (limited to 'portal/tcp/portal.ml')
-rw-r--r-- | portal/tcp/portal.ml | 64 |
1 files changed, 38 insertions, 26 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml index 73321bd..9afa318 100644 --- a/portal/tcp/portal.ml +++ b/portal/tcp/portal.ml @@ -72,32 +72,43 @@ let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t = (** [socket_to_stream sock] is a [stream, push] tuple wrapping the Unix socket [sock] in Markup signals. *) -let socket_to_stream (sock : Lwt_unix.file_descr) = +let socket_to_stream (sock : socket) = let raw_stream = - let recv_bytes = Bytes.create 4096 in - Lwt_stream.from (fun () -> - let* len = - try%lwt Lwt_unix.read sock recv_bytes 0 4096 - with - | Unix.Unix_error (Unix.ECONNRESET, _, _) - | Unix.Unix_error (Unix.EPIPE, _, _) - | End_of_file -> Lwt.return 0 - | exn -> Lwt.fail exn - in match len with - | 0 -> Lwt.return_none - | len -> Lwt.return_some (Bytes.sub_string recv_bytes 0 len)) + let from_plain p = + let recv_bytes = Bytes.create 4096 in + fun () -> + let* len = + try%lwt Lwt_unix.read p recv_bytes 0 4096 + with + | Unix.Unix_error (Unix.ECONNRESET, _, _) + | Unix.Unix_error (Unix.EPIPE, _, _) + | End_of_file -> Lwt.return 0 + | exn -> Lwt.fail exn + in match len with + | 0 -> Lwt.return_none + | len -> + Lwt.return_some (Bytes.sub_string recv_bytes 0 len) + in let from_socket = match sock with + | Plain p -> from_plain p + in Lwt_stream.from from_socket in let buffer = Buffer.create 1024 in - let flush_buffer () = - let content = Buffer.to_bytes buffer in - Buffer.clear buffer; - let* _ = - try%lwt Lwt_unix.write sock content 0 (Bytes.length content) - with - | Unix.Unix_error (Unix.ECONNRESET, _, _) - | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0 - | exn -> Lwt.fail exn - in Lwt.return_unit + let flush_buffer = + let flush_plain p () = + let content = Buffer.to_bytes buffer in + Buffer.clear buffer; + let* _ = + try%lwt Lwt_unix.write p content 0 (Bytes.length content) + with + | Unix.Unix_error (Unix.ECONNRESET, _, _) + | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0 + | exn -> Lwt.fail exn + in Lwt.return_unit + in match sock with + | Plain p -> flush_plain p + in + let close_sock = match sock with + | Plain p -> (fun () -> Lwt_unix.close p) in let chomp c = Buffer.add_char buffer c; @@ -118,12 +129,13 @@ let socket_to_stream (sock : Lwt_unix.file_descr) = in Lwt.async (fun () -> let* _ = lwt_stream outbound_stream |> Markup_lwt.write_xml |> iter chomp in let* _ = flush_buffer () - in Lwt_unix.close sock); + in close_sock ()); (stream, push) (** [connect domain] is a Portal.t communicating with the XMPP server located at [domain] via plaintext TCP. It simply chains the two previous functions. *) let connect (domain : string) : t Lwt.t = - let+ _socket = tcp_socket domain + let+ s = tcp_socket domain in + let _socket = Plain s in let stream, push = socket_to_stream _socket - in {stream; push; _socket=Plain _socket} + in {stream; push; _socket=_socket} |