aboutsummaryrefslogtreecommitdiff
path: root/portal
diff options
context:
space:
mode:
Diffstat (limited to 'portal')
-rw-r--r--portal/tcp/portal.ml64
1 files changed, 38 insertions, 26 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml
index 73321bd..9afa318 100644
--- a/portal/tcp/portal.ml
+++ b/portal/tcp/portal.ml
@@ -72,32 +72,43 @@ let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t =
(** [socket_to_stream sock] is a [stream, push] tuple wrapping the Unix socket [sock] in
Markup signals. *)
-let socket_to_stream (sock : Lwt_unix.file_descr) =
+let socket_to_stream (sock : socket) =
let raw_stream =
- let recv_bytes = Bytes.create 4096 in
- Lwt_stream.from (fun () ->
- let* len =
- try%lwt Lwt_unix.read sock recv_bytes 0 4096
- with
- | Unix.Unix_error (Unix.ECONNRESET, _, _)
- | Unix.Unix_error (Unix.EPIPE, _, _)
- | End_of_file -> Lwt.return 0
- | exn -> Lwt.fail exn
- in match len with
- | 0 -> Lwt.return_none
- | len -> Lwt.return_some (Bytes.sub_string recv_bytes 0 len))
+ let from_plain p =
+ let recv_bytes = Bytes.create 4096 in
+ fun () ->
+ let* len =
+ try%lwt Lwt_unix.read p recv_bytes 0 4096
+ with
+ | Unix.Unix_error (Unix.ECONNRESET, _, _)
+ | Unix.Unix_error (Unix.EPIPE, _, _)
+ | End_of_file -> Lwt.return 0
+ | exn -> Lwt.fail exn
+ in match len with
+ | 0 -> Lwt.return_none
+ | len ->
+ Lwt.return_some (Bytes.sub_string recv_bytes 0 len)
+ in let from_socket = match sock with
+ | Plain p -> from_plain p
+ in Lwt_stream.from from_socket
in
let buffer = Buffer.create 1024 in
- let flush_buffer () =
- let content = Buffer.to_bytes buffer in
- Buffer.clear buffer;
- let* _ =
- try%lwt Lwt_unix.write sock content 0 (Bytes.length content)
- with
- | Unix.Unix_error (Unix.ECONNRESET, _, _)
- | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0
- | exn -> Lwt.fail exn
- in Lwt.return_unit
+ let flush_buffer =
+ let flush_plain p () =
+ let content = Buffer.to_bytes buffer in
+ Buffer.clear buffer;
+ let* _ =
+ try%lwt Lwt_unix.write p content 0 (Bytes.length content)
+ with
+ | Unix.Unix_error (Unix.ECONNRESET, _, _)
+ | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0
+ | exn -> Lwt.fail exn
+ in Lwt.return_unit
+ in match sock with
+ | Plain p -> flush_plain p
+ in
+ let close_sock = match sock with
+ | Plain p -> (fun () -> Lwt_unix.close p)
in
let chomp c =
Buffer.add_char buffer c;
@@ -118,12 +129,13 @@ let socket_to_stream (sock : Lwt_unix.file_descr) =
in Lwt.async (fun () ->
let* _ = lwt_stream outbound_stream |> Markup_lwt.write_xml |> iter chomp
in let* _ = flush_buffer ()
- in Lwt_unix.close sock);
+ in close_sock ());
(stream, push)
(** [connect domain] is a Portal.t communicating with the XMPP server located at
[domain] via plaintext TCP. It simply chains the two previous functions. *)
let connect (domain : string) : t Lwt.t =
- let+ _socket = tcp_socket domain
+ let+ s = tcp_socket domain in
+ let _socket = Plain s
in let stream, push = socket_to_stream _socket
- in {stream; push; _socket=Plain _socket}
+ in {stream; push; _socket=_socket}