aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--portal/tcp/portal_tcp.ml16
1 files changed, 11 insertions, 5 deletions
diff --git a/portal/tcp/portal_tcp.ml b/portal/tcp/portal_tcp.ml
index a50450d..32ff507 100644
--- a/portal/tcp/portal_tcp.ml
+++ b/portal/tcp/portal_tcp.ml
@@ -68,12 +68,18 @@ let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t =
TODO: right now it's possible to get parts of unfinished stanzas... *)
let connect (domain : string) : t Lwt.t =
let+ tcp_stream, tcp_socket = tcp_stream domain in
+ let send msg =
+ (* This is gross, but it doesn't matter because TCP does buffering. *)
+ let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in ()
+ in
+ let xml_stream, xml_push = Lwt_stream.create () in
let push msg =
- let none () = close tcp_socket
- and some s () =
- let str = write_xml s |> to_string
- in write_string tcp_socket str 0 (String.length str) |> Lwt.map ignore
+ let none () = xml_push None; close tcp_socket
+ and some fragments () =
+ Markup.iter (fun f -> xml_push (Some f)) fragments |> Lwt.return
in Option.fold ~none ~some msg |> Lwt.async
and report loc err = raise (MalformedStanza (loc, err)) in
let open Markup_lwt in
- tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push
+ let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
+ in Lwt.async (fun () -> lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send);
+ stream, push