From 8839d911afb6404033d640845295539fb63fcdb2 Mon Sep 17 00:00:00 2001 From: Clombrong Date: Wed, 18 Jun 2025 00:39:27 +0200 Subject: feat(portal-websockets): make connect accept and return signals --- portal/lib/ws/portal.ml | 44 +++++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/portal/lib/ws/portal.ml b/portal/lib/ws/portal.ml index b6a9f00..1bffb6d 100644 --- a/portal/lib/ws/portal.ml +++ b/portal/lib/ws/portal.ml @@ -95,41 +95,31 @@ let connect domain = (** [connect domain] is an Lwt stream (and its push function) communicating with the XMPP server running at [domain] via the Websocket subprotocol. - This function is a complex wrapper around ws_stream, that accepts streamed XML and sends framed XML stanzas to - the underlying socket, with exactly one stanza per frame, according to RFC 7935. - - It also sends the stanza used by the WebSocket subprotocol to the underlying WebSocket. + This function is a complex wrapper around ws_stream, that accepts Markup.ml signals and sends framed XML stanzas + to the underlying socket, with exactly one stanza per frame, according to RFC 7935. In essence, it (tries to) expose an identical interface to the original XMPP streamed protocol. Here's an ASCII rendered flow of the data through the various streams. - / -> push -> streamed_stanzas -> to_frames -> ws_push -> \ + / -> push -> mu_stream -> to_frames -> ws_push -> \ function user websocket - \ <---- stream <---- filter_map <---- ws_stream <---- / *) + \ <--- stream <--- markup_lwt <--- ws_stream <--- / *) let open Lwt_stream in - let+ stream, ws_push = ws_endpoint domain >|= ws_stream in - let streamed_stanzas, push = create () in - (* Consumes a stream of stanzas fragments into a series of frames sent to the WebSocket. *) + let+ ws_stream, ws_push = ws_endpoint domain >|= ws_stream in + let mu_stream, mu_push = create () in + let open Markup_lwt in + (* When sending a malformed stanza (one that Markup.ml doesn't like), a MalformedStanza exception is raised. *) + let report loc err = + raise (MalformedStanza (loc, err)) + in + (* Consumes a stream of Markup.ml signals into a series of frames sent to the WebSocket. *) let to_frames fragments = let stanzas = - (* Convert a stream of strings to a stream of characters. *) - let spliced = - fragments - |> map (fun fragment -> String.to_seq fragment |> of_seq) - |> concat - in let open Markup in - (* When sending a malformed stanza (one that Markup.ml doesn't like), a MalformedStanza exception is raised. *) - let report loc err = - raise (MalformedStanza (loc, err)) - in - spliced - |> Markup_lwt.lwt_stream - |> Markup_lwt.parse_xml ~report - |> signals + (* Convert a stream of strings to a stream of characters. *) (* XML declarations are not to be transmitted to the underlying WebSocket, per IETF recommendation. https://datatracker.ietf.org/doc/html/rfc7395#section-3.3.3 *) - |> content + content fragments |> transform (* Parse well-formed signals. As soon as we have enough elements to send a well-formed stanza, we send it to the underlying WebSocket. *) @@ -143,6 +133,6 @@ let connect domain = else [], Some (depth, s :: signals)) (0, []) |> Markup_lwt.to_lwt_stream - in let+ _ = iter (fun x -> ws_push (Some x)) stanzas in ws_push None - in Lwt.async @@ (fun () -> to_frames streamed_stanzas); - stream, push + in let+ _ = Lwt_stream.iter (fun x -> ws_push (Some x)) stanzas in ws_push None + in Lwt.async @@ (fun () -> mu_stream |> Markup_lwt.lwt_stream |> to_frames); + ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report, mu_push -- cgit v1.2.3