aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--portal/lib/ws/portal.ml52
1 files changed, 24 insertions, 28 deletions
diff --git a/portal/lib/ws/portal.ml b/portal/lib/ws/portal.ml
index d8afab3..0be0d81 100644
--- a/portal/lib/ws/portal.ml
+++ b/portal/lib/ws/portal.ml
@@ -100,38 +100,34 @@ let connect domain =
In essence, it (tries to) expose an identical interface to the original XMPP streamed protocol.
Here's an ASCII rendered flow of the data through the various streams.
- / -> push -> mu_stream -> to_frames -> ws_push -> \
+ / -> push -> mu_stream -> to_frames -> ws_push -> \
function user websocket
- \ <--- stream <--- markup_lwt <--- ws_stream <--- / *)
- let open Lwt_stream in
- let+ ws_stream, ws_push = ws_endpoint domain >|= ws_stream in
- let mu_stream, mu_push = create () in
+ \ <--- stream <--- markup_lwt <--- ws_stream <--- / *)
+ let+ ws_stream, ws_push = ws_endpoint domain >>= ws_stream in
let open Markup_lwt in
(* When sending a malformed stanza (one that Markup.ml doesn't like), a MalformedStanza exception is raised. *)
let report loc err =
raise (MalformedStanza (loc, err))
in
(* Consumes a stream of Markup.ml signals into a series of frames sent to the WebSocket. *)
- let to_frames fragments =
- let stanzas =
- let open Markup in
- (* Convert a stream of strings to a stream of characters. *)
- (* XML declarations are not to be transmitted to the underlying WebSocket,
- per IETF recommendation. https://datatracker.ietf.org/doc/html/rfc7395#section-3.3.3 *)
- content fragments
- |> transform
- (* Parse well-formed signals. As soon as we have enough elements to send a well-formed stanza, we send it to
- the underlying WebSocket. *)
- (fun (depth, signals) s ->
- let depth = match s with
- | `Start_element _ -> depth + 1
- | `End_element -> depth - 1
- | _ -> depth
- in if depth = 0
- then [List.rev (s :: signals) |> of_list |> write_xml |> to_string], Some (0, [])
- else [], Some (depth, s :: signals))
- (0, [])
- |> Markup_lwt.to_lwt_stream
- in let+ _ = Lwt_stream.iter (fun x -> ws_push (Some x)) stanzas in ws_push None
- in Lwt.async @@ (fun () -> Markup_lwt.(mu_stream |> lwt_stream |> iter to_frames));
- ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, mu_push
+ let stanza = ref [] in
+ let total_depth = ref 0 in
+ let stanza_to_string stanza = Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in
+ (* Consume a single fragment, and add it to the "stanza" ref if it's not complete -- as soon as it's completed, send it. *)
+ let chomp_fragment depth fragment =
+ let depth = match fragment with
+ | `Start_element _ -> depth + 1
+ | `End_element -> depth - 1
+ | _ -> depth
+ in stanza := fragment :: !stanza;
+ total_depth := depth;
+ (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth
+ in let push = function
+ | None -> ws_push None
+ | Some fragment ->
+ Lwt.async
+ (fun () -> content fragment
+ |> Markup.transform chomp_fragment !total_depth
+ |> Markup_lwt.to_lwt_stream
+ |> Lwt_stream.iter (fun s -> ws_push (Some s)))
+ in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push