aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorClombrong <clombrong@egregore.fun>2025-05-09 14:01:01 +0200
committerClombrong <cromblong@egregore.fun>2025-05-09 14:01:01 +0200
commit5ade97f7b87c982b1acdcd5cb66a04cc171d5bed (patch)
treea2ffbda04f2e1b1c49d493ce521bf20627bf7b84
parent5758525fe2cf55b8b713197424d88118543ccfc6 (diff)
feat!(portal_ws): to_frames function relatively complete
-rw-r--r--portal/lib/portal_ws.ml32
-rw-r--r--portal/test/js/websockets_hello.ml7
2 files changed, 33 insertions, 6 deletions
diff --git a/portal/lib/portal_ws.ml b/portal/lib/portal_ws.ml
index cd9c08f..ac2f0de 100644
--- a/portal/lib/portal_ws.ml
+++ b/portal/lib/portal_ws.ml
@@ -91,11 +91,35 @@ let connect domain =
let+ stream, ws_push = ws_endpoint domain >|= ws_stream in
let streamed_stanzas, push = create () in
(* Consumes a stream of stanzas fragments into a series of frames sent to the WebSocket. *)
- (* Right now... This doesn't do much. *)
let to_frames fragments =
let stanzas =
- let open Markup_lwt in
- lwt_stream fragments |> to_lwt_stream
- in let+ _ = Lwt_stream.iter (fun x -> ws_push (Some x)) stanzas in ws_push None
+ (* Convert a stream of strings to a stream of characters. *)
+ let spliced =
+ fragments
+ |> map (fun fragment -> String.to_seq fragment |> of_seq)
+ |> concat
+ in
+ let open Markup in
+ spliced
+ |> Markup_lwt.lwt_stream
+ |> Markup_lwt.parse_xml
+ |> signals
+ (* XML declarations are not to be transmitted to the underlying WebSocket,
+ per IETF recommendation. https://datatracker.ietf.org/doc/html/rfc7395#section-3.3.3 *)
+ |> content
+ |> transform
+ (* Parse well-formed signals. As soon as we have enough elements to send a well-formed signal, we send it to
+ the underlying WebSocket. Currently, sending a non-well-formed signal is... undefined behavior? *)
+ (fun (depth, signals) s ->
+ let depth = match s with
+ | `Start_element _ -> depth + 1
+ | `End_element -> depth - 1
+ | _ -> depth
+ in if depth = 0
+ then [List.rev (s :: signals) |> of_list |> write_xml |> to_string], Some (0, [])
+ else [], Some (depth, s :: signals))
+ (0, [])
+ |> Markup_lwt.to_lwt_stream
+ in let+ _ = iter (fun x -> ws_push (Some x)) stanzas in ws_push None
in Lwt.async @@ (fun () -> to_frames streamed_stanzas);
stream, push
diff --git a/portal/test/js/websockets_hello.ml b/portal/test/js/websockets_hello.ml
index 1e8b364..73024e5 100644
--- a/portal/test/js/websockets_hello.ml
+++ b/portal/test/js/websockets_hello.ml
@@ -17,9 +17,12 @@ let rec run t =
let () =
run @@
- let domain = "squarebowl.club" in
+ let domain = "egregore.fun" in
let* stream, push =
Portal_ws.connect domain in
- push (Some {|<malformed/>|});
+ push (Some (Portal_ws.stanza_open domain));
+ push (Some "<iq>malformed");
+ push (Some "</iq>");
+ push (Some Portal_ws.stanza_close);
let+ _ = Lwt_stream.iter (fun f -> print_endline f) stream
in push None