aboutsummaryrefslogtreecommitdiff
path: root/portal/ws/portal.ml
diff options
context:
space:
mode:
Diffstat (limited to 'portal/ws/portal.ml')
-rw-r--r--portal/ws/portal.ml40
1 files changed, 17 insertions, 23 deletions
diff --git a/portal/ws/portal.ml b/portal/ws/portal.ml
index f14a2ab..b2d21e1 100644
--- a/portal/ws/portal.ml
+++ b/portal/ws/portal.ml
@@ -134,26 +134,20 @@ let connect domain =
in
(* Consumes a stream of Markup.ml signals into a series of frames sent to the
WebSocket. *)
- let stanza = ref [] in
- let total_depth = ref 0 in
- let stanza_to_string stanza =
- Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in
- (* Consume a single fragment, and add it to the "stanza" ref if it's not complete --
- as soon as it's completed, send it. *)
- let chomp_fragment depth fragment =
- let depth = match fragment with
- | `Start_element _ -> depth + 1
- | `End_element -> depth - 1
- | _ -> depth
- in stanza := fragment :: !stanza;
- total_depth := depth;
- (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth
- in let push = function
- | None -> ws_push None
- | Some fragment ->
- Lwt.async
- (fun () -> content fragment
- |> Markup.transform chomp_fragment !total_depth
- |> Markup_lwt.to_lwt_stream
- |> Lwt_stream.iter (fun s -> ws_push (Some s)))
- in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push
+ let fragment_stream, fragment_push = Lwt_stream.create () in
+ let stream = ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
+ and push = function
+ | Some fs -> Markup.iter (fun f -> fragment_push (Some f)) fs
+ | None -> fragment_push None
+ in
+ (* Elements filters all elements based on the `Start_element. By simply making it
+ return true every time, we get a stream of stream of elements, with each sub-stream
+ being a full frame. *)
+ Lwt.async
+ (fun () -> fragment_stream
+ |> lwt_stream
+ |> elements (fun _ _ -> true)
+ |> map (fun x -> write_xml x |> to_string)
+ |> to_lwt_stream
+ |> Lwt_stream.iter (fun s -> ws_push (Some s)));
+ stream, push