diff options
-rw-r--r-- | portal/tcp/portal_tcp.ml | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/portal/tcp/portal_tcp.ml b/portal/tcp/portal_tcp.ml index a50450d..32ff507 100644 --- a/portal/tcp/portal_tcp.ml +++ b/portal/tcp/portal_tcp.ml @@ -68,12 +68,18 @@ let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t = TODO: right now it's possible to get parts of unfinished stanzas... *) let connect (domain : string) : t Lwt.t = let+ tcp_stream, tcp_socket = tcp_stream domain in + let send msg = + (* This is gross, but it doesn't matter because TCP does buffering. *) + let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in () + in + let xml_stream, xml_push = Lwt_stream.create () in let push msg = - let none () = close tcp_socket - and some s () = - let str = write_xml s |> to_string - in write_string tcp_socket str 0 (String.length str) |> Lwt.map ignore + let none () = xml_push None; close tcp_socket + and some fragments () = + Markup.iter (fun f -> xml_push (Some f)) fragments |> Lwt.return in Option.fold ~none ~some msg |> Lwt.async and report loc err = raise (MalformedStanza (loc, err)) in let open Markup_lwt in - tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push + let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals + in Lwt.async (fun () -> lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send); + stream, push |