diff options
Diffstat (limited to 'portal')
-rw-r--r-- | portal/ws/portal.ml | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/portal/ws/portal.ml b/portal/ws/portal.ml index f14a2ab..b2d21e1 100644 --- a/portal/ws/portal.ml +++ b/portal/ws/portal.ml @@ -134,26 +134,20 @@ let connect domain = in (* Consumes a stream of Markup.ml signals into a series of frames sent to the WebSocket. *) - let stanza = ref [] in - let total_depth = ref 0 in - let stanza_to_string stanza = - Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in - (* Consume a single fragment, and add it to the "stanza" ref if it's not complete -- - as soon as it's completed, send it. *) - let chomp_fragment depth fragment = - let depth = match fragment with - | `Start_element _ -> depth + 1 - | `End_element -> depth - 1 - | _ -> depth - in stanza := fragment :: !stanza; - total_depth := depth; - (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth - in let push = function - | None -> ws_push None - | Some fragment -> - Lwt.async - (fun () -> content fragment - |> Markup.transform chomp_fragment !total_depth - |> Markup_lwt.to_lwt_stream - |> Lwt_stream.iter (fun s -> ws_push (Some s))) - in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push + let fragment_stream, fragment_push = Lwt_stream.create () in + let stream = ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals + and push = function + | Some fs -> Markup.iter (fun f -> fragment_push (Some f)) fs + | None -> fragment_push None + in + (* Elements filters all elements based on the `Start_element. By simply making it + return true every time, we get a stream of stream of elements, with each sub-stream + being a full frame. *) + Lwt.async + (fun () -> fragment_stream + |> lwt_stream + |> elements (fun _ _ -> true) + |> map (fun x -> write_xml x |> to_string) + |> to_lwt_stream + |> Lwt_stream.iter (fun s -> ws_push (Some s))); + stream, push |