aboutsummaryrefslogtreecommitdiff
path: root/portal
diff options
context:
space:
mode:
Diffstat (limited to 'portal')
-rw-r--r--portal/tcp/portal.ml41
1 files changed, 25 insertions, 16 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml
index c4c5e55..31c45c0 100644
--- a/portal/tcp/portal.ml
+++ b/portal/tcp/portal.ml
@@ -112,22 +112,31 @@ let socket_to_stream (sock : socket) =
| Plain p -> (fun () -> Lwt_unix.close p)
| Tls t -> (fun () -> Tls_lwt.Unix.close t)
in
- let outbound_stream, outbound_push = Lwt_stream.create ()
- in let push = function
- | None -> outbound_push None
- | Some signals -> Markup.iter (fun f -> outbound_push (Some f)) signals
- and report _ err = Lwt.fail (MalformedStanza err) in
- let open Markup_lwt in
- let stream = raw_stream
- |> lwt_stream
- |> strings_to_bytes
- |> parse_xml ~report
- |> signals
- in Lwt.async (fun () ->
- let* _ = lwt_stream outbound_stream |> write_xml |> iter chomp
- in let* _ = flush_buffer ()
- in close_sock ());
- (stream, push)
+ let stream =
+ let open Markup_lwt in
+ let report _ err = Lwt.fail (MalformedStanza err) in
+ raw_stream
+ |> lwt_stream
+ |> strings_to_bytes
+ |> parse_xml ~report
+ |> signals
+ in
+ let outbound_stream, outbound_push = Lwt_stream.create () in
+ let push = function
+ | None -> outbound_push None
+ | Some signals -> Markup.iter (fun f -> outbound_push (Some f)) signals
+ in Lwt.async begin
+ fun () ->
+ let* _ =
+ outbound_stream
+ |> Markup_lwt.lwt_stream
+ |> write_xml
+ |> Markup_lwt.iter chomp
+ in
+ let* _ = flush_buffer ()
+ in close_sock ()
+ end;
+ (stream, push)
(** [connect domain] is a Portal.t communicating with the XMPP server located at
[domain] via plaintext TCP. It simply chains the two previous functions. *)