diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-25 10:11:34 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-25 10:11:34 +0200 |
commit | 43755d2956879e72056c900a00248be9f240fe39 (patch) | |
tree | 87f08368f85b148f8ef846301afe87e9e3c3ba6c /portal/ws | |
parent | d9457c4b845d1027075e39f40a4dbf6d401eebaa (diff) |
feat(portal): remove lib/ subdirectory
Diffstat (limited to 'portal/ws')
-rw-r--r-- | portal/ws/dune | 6 | ||||
-rw-r--r-- | portal/ws/portal.ml | 145 |
2 files changed, 151 insertions, 0 deletions
diff --git a/portal/ws/dune b/portal/ws/dune new file mode 100644 index 0000000..22de458 --- /dev/null +++ b/portal/ws/dune @@ -0,0 +1,6 @@ +(library + (name portal_ws) + (implements portal) + (public_name portal-websockets) + (libraries lwt js_of_ocaml js_of_ocaml-lwt markup markup-lwt) + (preprocess (pps js_of_ocaml-ppx))) diff --git a/portal/ws/portal.ml b/portal/ws/portal.ml new file mode 100644 index 0000000..3fd142a --- /dev/null +++ b/portal/ws/portal.ml @@ -0,0 +1,145 @@ +open Lwt.Syntax +open Lwt.Infix +open Js_of_ocaml +open Markup +let jss = Js.string +let sjs = Js.to_string + +type t = (signal, async) stream * ((signal, sync) stream option -> unit) + +let xmlns = "urn:ietf:params:xml:ns:xmpp-framing" + +(* sic. XEP-0156: "host-meta files MUST be fetched only over HTTPS". I don't make the + rules. *) +let well_known_of (domain : string) = "https://" ^ domain ^ "/.well-known/host-meta" + +let stanza_open ?from domain : (signal, sync) stream = + (** [open_stanza domain] is an <open /> stanza for [domain]. + + If [from] is specified, the <open /> stanza has the from parameter. + *) + let open Markup in + let stanza = + let attributes = + let open Option in + [(("", "xmlns"), xmlns); + (("", "to"), domain)] + @ (map (fun jid -> (("", "from"), jid)) from |> to_list) + @ [(("", "version"), "1.0")] + in + [`Start_element + ((xmlns, "open"), + attributes); + `End_element] + in stanza |> of_list + +let stanza_close = {|<close xmlns="|} ^ xmlns ^ {|" />|} |> string |> parse_xml |> signals + +exception MalformedStanza of Markup.location * Markup.Error.t + +let ws_endpoint (domain : string) = + (** [ws_endpoint domain] is a promise containing the XMPP websocket endpoint + associated with [domain], by using the domain's Web-host Metadata. + + This function uses XMLHttpRequest, so while it should work fine in the browser, in + environments that don't provide this constructor (Node.js), there should be some + sort of polyfill. + + Lastly, if [domain] doesn't provide a well-formed Web-host Metadata file, the + function throws an exception. *) + let open Markup in + (* This ugly function extracts the href element from a Link tag's attributes if it's a + websocket. *) + let link_websocket = function + | ((_, "rel"), "urn:xmpp:alt-connections:websocket") :: ((_, "href"), href) :: _ + | ((_, "href"), href) :: ((_, "rel"), "urn:xmpp:alt-connections:websocket") :: _ + -> Some href + | _ -> None + in let parse_xrd xrd = + string xrd + |> parse_xml + |> signals + |> tree ~element:(fun (_, name) attributes children -> + match name with + | "Link" -> link_websocket attributes + | "XRD" -> List.find_map (fun x -> x) children + | _ -> None + ) + |> Option.join + in let+ host_meta = + Js_of_ocaml_lwt.XmlHttpRequest.perform_raw_url (well_known_of domain) + in match parse_xrd host_meta.content with + | Some x -> x + | None -> failwith (domain ^ "doesn't advertise a WebSocket endpoint via Web-host Metadata.") + +let ws_stream (url : string) = + (** [ws_stream url] is a promise to a framed Lwt stream (and its push function) + communicating with the websocket located at [url] using the XMPP protocol. + + Valid XMPP WebSocket subprotocol frames must be sent to the stream, because it + directly exposes the websocket under. + + Pushing [None] closes the websocket. + + If the websocket is closed server-side, the stream closes. *) + let open Lwt_stream in + let stream, message = create () in + let open Lwt_condition in + let is_open = create () in + let (ws : WebSockets.webSocket Js.t) = + new%js WebSockets.webSocket_withProtocol (jss url) (jss "xmpp") + in let push = function + | Some msg -> ws##send (jss msg) + | None -> ws##close + in ws##.onclose := Dom.handler (fun _ -> message None; Js._true); + ws##.onmessage := Dom.handler (fun x -> Some (sjs x##.data) |> message; Js._false); + ws##.onopen := Dom.handler (fun _ -> signal is_open (); Js._false); + let+ () = wait is_open in stream, push + +let connect domain = + (** [connect domain] is an Lwt stream (and its push function) communicating with the + XMPP server running at [domain] via the Websocket subprotocol. + + This function is a complex wrapper around ws_stream, that accepts Markup.ml + signals and sends framed XML stanzas to the underlying socket, with exactly one + stanza per frame, according to RFC 7935. + + In essence, it (tries to) expose an identical interface to the original XMPP + streamed protocol. + + Here's an ASCII rendered flow of the data through the various streams. + / -> push -> mu_stream -> to_frames -> ws_push -> \ + function user websocket + \ <--- stream <--- markup_lwt <--- ws_stream <--- / *) + let+ ws_stream, ws_push = ws_endpoint domain >>= ws_stream in + let open Markup_lwt in + (* When sending a malformed stanza (one that Markup.ml doesn't like), a + MalformedStanza exception is raised. *) + let report loc err = + raise (MalformedStanza (loc, err)) + in + (* Consumes a stream of Markup.ml signals into a series of frames sent to the + WebSocket. *) + let stanza = ref [] in + let total_depth = ref 0 in + let stanza_to_string stanza = + Markup.(!stanza |> List.rev |> of_list |> write_xml |> to_string) in + (* Consume a single fragment, and add it to the "stanza" ref if it's not complete -- + as soon as it's completed, send it. *) + let chomp_fragment depth fragment = + let depth = match fragment with + | `Start_element _ -> depth + 1 + | `End_element -> depth - 1 + | _ -> depth + in stanza := fragment :: !stanza; + total_depth := depth; + (if depth = 0 then let s = [stanza_to_string stanza] in stanza := []; s else []), Some depth + in let push = function + | None -> ws_push None + | Some fragment -> + Lwt.async + (fun () -> content fragment + |> Markup.transform chomp_fragment !total_depth + |> Markup_lwt.to_lwt_stream + |> Lwt_stream.iter (fun s -> ws_push (Some s))) + in ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals, push |