summaryrefslogtreecommitdiff
path: root/portal/lib/ws/portal.ml
blob: 6e4a2319e308fc15b975751e3bb729c16e856777 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
open Lwt.Syntax
open Lwt.Infix
open Js_of_ocaml
(** Shorthand for [Js.string]: turns an OCaml string into a JavaScript string. *)
let jss s = Js.string s
(** Shorthand for [Js.to_string]: turns a JavaScript string into an OCaml string. *)
let sjs s = Js.to_string s

(** A connection: a stream of received strings paired with a push function for strings to send.
    This is the shape of the pair returned by [ws_stream], where pushing [None] closes the websocket. *)
type t = string Lwt_stream.t * (string option -> unit)

(** [well_known_of domain] is the URL of the host-meta file of [domain].

    The scheme is hard-coded on purpose. XEP-0156: "host-meta files MUST be fetched only over HTTPS". *)
let well_known_of (domain : string) =
  Printf.sprintf "https://%s/.well-known/host-meta" domain

(** [stanza_open domain] is an <open /> stanza for [domain].

    [domain] ends up inside a double-quoted XML attribute, so the characters that are special there (ampersand,
    angle brackets and the double quote) are replaced by their entities. A plain domain name is left untouched. *)
let stanza_open domain =
  (* Escapes [value] for use inside a double-quoted XML attribute. *)
  let escape_attribute value =
    let buffer = Buffer.create (String.length value) in
    String.iter
      (function
        | '&' -> Buffer.add_string buffer "&amp;"
        | '<' -> Buffer.add_string buffer "&lt;"
        | '>' -> Buffer.add_string buffer "&gt;"
        | '"' -> Buffer.add_string buffer "&quot;"
        | c -> Buffer.add_char buffer c)
      value;
    Buffer.contents buffer
  in
  Printf.sprintf
    {|<open xmlns="urn:ietf:params:xml:ns:xmpp-framing" to="%s" version="1.0" />|}
    (escape_attribute domain)

(** The <close /> element of the XMPP framing namespace, serialized as a string. *)
let stanza_close = {|<close xmlns="urn:ietf:params:xml:ns:xmpp-framing" />|}

(** Raised from the XML parser's error callback in [connect] when the data pushed by the caller is rejected by
    Markup.ml. Carries the location and the error that the parser reported. *)
exception MalformedStanza of Markup.location * Markup.Error.t

(** [ws_endpoint domain] is a promise containing the XMPP websocket endpoint associated with [domain], by using the
    domain's Web-host Metadata.

    This function uses XMLHttpRequest, so while it should work fine in the browser, in environments that don't provide
    this constructor (Node.js), there should be some sort of polyfill.

    The endpoint is the [href] of the first [Link] child of the [XRD] element whose [rel] is
    [urn:xmpp:alt-connections:websocket].

    @raise Failure (as a rejected promise) if the host-meta file of [domain] doesn't advertise such a link, which
    includes the case where the file isn't well-formed enough to find one. *)
let ws_endpoint (domain : string) =
  (* Value of the first attribute whose local name is [name], whatever its namespace. *)
  let attribute name attributes =
    List.find_map
      (fun ((_, local), value) -> if String.equal local name then Some value else None)
      attributes
  in
  (* The href of a Link element, provided the link advertises a websocket. Attributes are looked up by name because
     XML attaches no meaning to their order, and a Link may carry other attributes as well. *)
  let link_websocket attributes =
    match attribute "rel" attributes, attribute "href" attributes with
    | Some "urn:xmpp:alt-connections:websocket", Some href -> Some href
    | _ -> None
  in
  let parse_xrd xrd =
    Markup.string xrd
    |> Markup.parse_xml
    |> Markup.signals
    |> Markup.tree ~element:(fun (_, name) attributes children ->
           match name with
           | "Link" -> link_websocket attributes
           (* Every child has already been reduced to an option: keep the first advertised endpoint. *)
           | "XRD" -> List.find_map Fun.id children
           | _ -> None)
    (* [tree] yields [None] when there is no root element; the root itself may also have yielded [None]. *)
    |> Option.join
  in
  let+ host_meta = Js_of_ocaml_lwt.XmlHttpRequest.perform_raw_url (well_known_of domain) in
  match parse_xrd host_meta.content with
  | Some endpoint -> endpoint
  | None -> failwith (domain ^ " doesn't advertise a WebSocket endpoint via Web-host Metadata.")

(** [ws_stream url] opens a websocket to [url] with the ["xmpp"] subprotocol and returns a pair [(stream, push)]:
    [stream] yields the data of every message received, and [push] queues a frame to be sent.

    Frames are handed to the websocket as they are, so only valid XMPP WebSocket subprotocol frames must be pushed.

    Pushing [None] closes the websocket.

    When the websocket gets closed, [stream] ends, but it's still up to the caller to close the pushing side. *)
let ws_stream (url : string) =
  let inbound, deliver = Lwt_stream.create () in (* websocket -> user *)
  let outbound, push = Lwt_stream.create () in (* user -> websocket *)
  let (socket : WebSockets.webSocket Js.t) =
    new%js WebSockets.webSocket_withProtocol (jss url) (jss "xmpp")
  in
  (* Sends every queued frame to the socket, then closes the socket once the queue itself has been closed. *)
  let forward_outbound () =
    Lwt_stream.iter (fun frame -> socket##send (jss frame)) outbound
    >|= fun () -> socket##close
  in
  socket##.onmessage :=
    Dom.handler (fun event -> deliver (Some (sjs event##.data)); Js._false);
  (* Forwarding only starts once the socket is open; frames pushed before that wait in the queue. *)
  socket##.onopen :=
    Dom.handler (fun _ -> Lwt.async forward_outbound; Js._false);
  socket##.onclose :=
    Dom.handler (fun _ -> deliver None; Js._true);
  inbound, push


(** [connect domain] is a promise of an Lwt stream (and its push function) communicating with the XMPP server running
    at [domain] via the WebSocket subprotocol.

    This function is a wrapper around [ws_stream] that accepts streamed XML and sends framed XML to the underlying
    socket, with exactly one top-level element per frame, according to RFC 7395.

    Text found between top-level elements (whitespace, typically) is dropped instead of being sent as a frame of its
    own. XML declarations, comments and the like are not transmitted either.

    Pushing [None] ends the outgoing XML and closes the websocket. This function does not generate a <close/>
    element by itself: the caller pushes it like any other element.

    Here's an ASCII rendered flow of the data through the various streams.
      / -> push -> streamed_stanzas -> to_frames -> ws_push -> \
      function user                                     websocket
      \  <-------------------- stream <----------------------- /

    When Markup.ml rejects the pushed XML, the error callback raises [MalformedStanza]. The framing task runs under
    [Lwt.async], so that exception is not delivered to the caller of [connect]; the websocket is closed on the way
    out whenever the failure reaches the task's promise. *)
let connect domain =
  let+ stream, ws_push = ws_endpoint domain >|= ws_stream in
  let streamed_stanzas, push = Lwt_stream.create () in
  (* Text of a frame, from the signals of one complete top-level element (most recent signal first). *)
  let frame_of pending =
    List.rev pending |> Markup.of_list |> Markup.write_xml |> Markup.to_string
  in
  (* One step of the framing state machine. The state is the current element depth together with the signals
     gathered since the last frame, most recent first. A frame is emitted exactly when an end tag brings the depth
     back to zero. *)
  let step (depth, pending) signal =
    match signal with
    | `Start_element _ -> [], Some (depth + 1, signal :: pending)
    | `End_element ->
      let depth = depth - 1 and pending = signal :: pending in
      if depth = 0
      then [frame_of pending], Some (0, [])
      else [], Some (depth, pending)
    (* Anything else outside of an element cannot be part of a frame: skip it. *)
    | _ when depth = 0 -> [], Some (depth, pending)
    | _ -> [], Some (depth, signal :: pending)
  in
  (* Consumes a stream of stanzas fragments into a series of frames sent to the WebSocket. *)
  let to_frames fragments =
    (* When sending a malformed stanza (one that Markup.ml doesn't like), a MalformedStanza exception is raised. *)
    let report loc err = raise (MalformedStanza (loc, err)) in
    let stanzas =
      fragments
      (* Convert a stream of strings to a stream of characters. *)
      |> Lwt_stream.map (fun fragment -> String.to_seq fragment |> Lwt_stream.of_seq)
      |> Lwt_stream.concat
      |> Markup_lwt.lwt_stream
      |> Markup_lwt.parse_xml ~report
      |> Markup.signals
      (* XML declarations are not to be transmitted to the underlying WebSocket,
         per IETF recommendation. https://datatracker.ietf.org/doc/html/rfc7395#section-3.3.3 *)
      |> Markup.content
      |> Markup.transform step (0, [])
      |> Markup_lwt.to_lwt_stream
    in
    (* The outgoing side of the websocket is closed whether framing ran to completion or failed, so that a
       malformed stanza doesn't leave the socket open with nothing left to feed it. *)
    Lwt.finalize
      (fun () -> Lwt_stream.iter (fun frame -> ws_push (Some frame)) stanzas)
      (fun () -> ws_push None; Lwt.return_unit)
  in
  Lwt.async (fun () -> to_frames streamed_stanzas);
  stream, push