diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-26 21:29:30 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-27 08:56:42 +0200 |
commit | 96989d5ca5aa15035352dcd4e80fa944d9ebd708 (patch) | |
tree | aac4b0e59e1a73f1367222a35f881dec8d783be4 /portal/ws/portal.ml | |
parent | 7e83c196ca21aef56c42fd122f0d65cfa4becd6c (diff) |
fix(portal_ws): simplify frame grouping using Lwt_streams
Diffstat (limited to 'portal/ws/portal.ml')
-rw-r--r-- | portal/ws/portal.ml | 40 |
1 files changed, 17 insertions, 23 deletions
diff --git a/portal/ws/portal.ml b/portal/ws/portal.ml index f14a2ab..b2d21e1 100644 --- a/portal/ws/portal.ml +++ b/portal/ws/portal.ml @@ -134,26 +134,20 @@ let connect domain = in (* Consumes a stream of Markup.ml signals into a series of frames sent to the WebSocket. *) - let stanza = ref [] in - let total_depth = ref 0 in - let stanza_to_string stanza = - Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in - (* Consume a single fragment, and add it to the "stanza" ref if it's not complete -- - as soon as it's completed, send it. *) - let chomp_fragment depth fragment = - let depth = match fragment with - | `Start_element _ -> depth + 1 - | `End_element -> depth - 1 - | _ -> depth - in stanza := fragment :: !stanza; - total_depth := depth; - (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth - in let push = function - | None -> ws_push None - | Some fragment -> - Lwt.async - (fun () -> content fragment - |> Markup.transform chomp_fragment !total_depth - |> Markup_lwt.to_lwt_stream - |> Lwt_stream.iter (fun s -> ws_push (Some s))) - in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push + let fragment_stream, fragment_push = Lwt_stream.create () in + let stream = ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals + and push = function + | Some fs -> Markup.iter (fun f -> fragment_push (Some f)) fs + | None -> fragment_push None + in + (* Elements filters all elements based on the `Start_element. By simply making it + return true every time, we get a stream of stream of elements, with each sub-stream + being a full frame. *) + Lwt.async + (fun () -> fragment_stream + |> lwt_stream + |> elements (fun _ _ -> true) + |> map (fun x -> write_xml x |> to_string) + |> to_lwt_stream + |> Lwt_stream.iter (fun s -> ws_push (Some s))); + stream, push |