diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-18 16:26:25 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-18 16:27:03 +0200 |
commit | 290dbfcf467f5045e26e25c17dced6884a6e47f5 (patch) | |
tree | f031bf7d6df0318583fe954e3cb948a4e850914a | |
parent | ca2b815765a75afa77b7da31ce0f9565ee051c3c (diff) |
feat(portal-websockets): use Markup.ml signals in connect
-rw-r--r-- | portal/lib/ws/portal.ml | 52 |
1 files changed, 24 insertions, 28 deletions
diff --git a/portal/lib/ws/portal.ml b/portal/lib/ws/portal.ml index d8afab3..0be0d81 100644 --- a/portal/lib/ws/portal.ml +++ b/portal/lib/ws/portal.ml @@ -100,38 +100,34 @@ let connect domain = In essence, it (tries to) expose an identical interface to the original XMPP streamed protocol. Here's an ASCII rendered flow of the data through the various streams. - / -> push -> mu_stream -> to_frames -> ws_push -> \ + / -> push -> mu_stream -> to_frames -> ws_push -> \ function user websocket - \ <--- stream <--- markup_lwt <--- ws_stream <--- / *) - let open Lwt_stream in - let+ ws_stream, ws_push = ws_endpoint domain >|= ws_stream in - let mu_stream, mu_push = create () in + \ <--- stream <--- markup_lwt <--- ws_stream <--- / *) + let+ ws_stream, ws_push = ws_endpoint domain >>= ws_stream in let open Markup_lwt in (* When sending a malformed stanza (one that Markup.ml doesn't like), a MalformedStanza exception is raised. *) let report loc err = raise (MalformedStanza (loc, err)) in (* Consumes a stream of Markup.ml signals into a series of frames sent to the WebSocket. *) - let to_frames fragments = - let stanzas = - let open Markup in - (* Convert a stream of strings to a stream of characters. *) - (* XML declarations are not to be transmitted to the underlying WebSocket, - per IETF recommendation. https://datatracker.ietf.org/doc/html/rfc7395#section-3.3.3 *) - content fragments - |> transform - (* Parse well-formed signals. As soon as we have enough elements to send a well-formed stanza, we send it to - the underlying WebSocket. *) - (fun (depth, signals) s -> - let depth = match s with - | `Start_element _ -> depth + 1 - | `End_element -> depth - 1 - | _ -> depth - in if depth = 0 - then [List.rev (s :: signals) |> of_list |> write_xml |> to_string], Some (0, []) - else [], Some (depth, s :: signals)) - (0, []) - |> Markup_lwt.to_lwt_stream - in let+ _ = Lwt_stream.iter (fun x -> ws_push (Some x)) stanzas in ws_push None - in Lwt.async @@ (fun () -> Markup_lwt.(mu_stream |> lwt_stream |> iter to_frames)); - ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, mu_push + let stanza = ref [] in + let total_depth = ref 0 in + let stanza_to_string stanza = Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in + (* Consume a single fragment, and add it to the "stanza" ref if it's not complete -- as soon as it's completed, send it. *) + let chomp_fragment depth fragment = + let depth = match fragment with + | `Start_element _ -> depth + 1 + | `End_element -> depth - 1 + | _ -> depth + in stanza := fragment :: !stanza; + total_depth := depth; + (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth + in let push = function + | None -> ws_push None + | Some fragment -> + Lwt.async + (fun () -> content fragment + |> Markup.transform chomp_fragment !total_depth + |> Markup_lwt.to_lwt_stream + |> Lwt_stream.iter (fun s -> ws_push (Some s))) + in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push |