(* Extracted from a repository web view.
   path: root/portal/ws/portal.ml
   blob: 0d15e5a391b0d652bf061a729d2c801ceb9dea99
   The viewer's navigation bar and its line-number gutter (1-166) were page
   chrome, not part of the file, and are summarised by this comment. *)
open Lwt.Syntax
open Lwt.Infix
open Js_of_ocaml
open Markup
(** [jss s] is [Js.string s]: shorthand for turning an OCaml string into a
    JavaScript string. *)
let jss s = Js.string s

(** [sjs s] is [Js.to_string s]: shorthand for turning a JavaScript string back
    into an OCaml string. *)
let sjs s = Js.to_string s

(** A browser WebSocket object, as typed by Js_of_ocaml's [WebSockets]
    bindings. *)
type socket = WebSockets.webSocket Js.t

(** A connection handle, as built by [connect]. *)
type t = {
    (* Markup.ml signals parsed from the data received on the socket. *)
    mutable stream : (signal, async) stream;
    (* [push (Some signals)] queues [signals] for sending; [push None] starts
       closing the connection. *)
    mutable push : (signal, sync) stream option -> unit;
    (* The underlying WebSocket object. *)
    mutable _socket : socket;
  }

(** XML namespace used for the [<open/>] and [<close/>] framing elements built
    in this file. *)
let xmlns = "urn:ietf:params:xml:ns:xmpp-framing"

(** Carries the Markup.ml error handed to the [report] callback that [connect]
    installs on its XML parser. *)
exception MalformedStanza of Markup.Error.t

(** [well_known_of domain] is the URL of [domain]'s host-meta document:
    ["https://" ^ domain ^ "/.well-known/host-meta"].

    The scheme is always HTTPS (sic): XEP-0156 states that "host-meta files MUST
    be fetched only over HTTPS". I don't make the rules. *)
let well_known_of (domain : string) =
  Printf.sprintf "https://%s/.well-known/host-meta" domain

(** [header ?from domain t] opens the framed stream with the server.

    It pushes an [<open/>] element (in the framing namespace, with [to] set to
    [domain] and [version] set to ["1.0"]) through [t.push], then reads the
    server's answer from [t.stream].

    If [from] is specified, the outgoing [<open/>] element also carries a
    [from] attribute with that value.

    The promise resolves to the value of the [id] attribute of the server's own
    [<open/>] element. It fails with [Failure] when the next two signals read
    from the stream are not an [<open/>] start tag in the framing namespace
    directly followed by its end, or when that element has no [id]
    attribute. *)
let header ?from domain {stream; push; _} =
  let from_attribute =
    match from with
    | Some jid -> [(("", "from"), jid)]
    | None -> []
  in
  let attributes =
    [(("", "xmlns"), xmlns); (("", "to"), domain)]
    @ from_attribute
    @ [(("", "version"), "1.0")]
  in
  push (Some (of_list [`Start_element ((xmlns, "open"), attributes); `End_element]));
  (* Keeps the value of an attribute whose local name is "id", whatever its
     namespace. *)
  let id_of_attribute = function
    | ((_, "id"), value) -> Some value
    | _ -> None
  in
  let* first = Markup_lwt.next stream in
  let* id =
    match first with
    | Some (`Start_element ((ns, "open"), server_attributes)) when ns = xmlns ->
       (* The server's <open/> must be empty: the very next signal has to be
          its end tag. *)
       let+ second = Markup_lwt.next stream in
       (match second with
        | Some `End_element -> List.find_map id_of_attribute server_attributes
        | _ -> None)
    | _ -> Lwt.return_none
  in
  match id with
  | Some id -> Lwt.return id
  | None -> Lwt.fail_with "Invalid stream opening server-side."

(** [close] is the stream of signals obtained by parsing a [<close/>] element
    in the framing namespace.

    NOTE(review): Markup.ml streams are stateful, so this module-level value can
    presumably be consumed only once -- confirm before reusing it for more than
    one connection. *)
let close =
  let document = Printf.sprintf "<close xmlns=\"%s\" />" xmlns in
  signals (parse_xml (string document))

(** [ws_endpoint domain] is a promise containing the XMPP websocket endpoint
    associated with [domain], by using the domain's Web-host Metadata.

    The document is fetched from [well_known_of domain] and parsed as XML. The
    endpoint is the [href] of the first [<Link>] child of the root [<XRD>]
    element whose [rel] is ["urn:xmpp:alt-connections:websocket"]. Attributes
    are looked up by name, so neither their order nor the presence of other
    attributes on the [<Link>] matters.

    This function uses XMLHttpRequest, so while it should work fine in the
    browser, in environments that don't provide this constructor (Node.js),
    there should be some sort of polyfill.

    Lastly, if [domain] doesn't provide a Web-host Metadata file from which such
    a link can be extracted, the promise fails with [Failure]. *)
let ws_endpoint (domain : string) : string Lwt.t =
  let open Markup in
  let websocket_rel = "urn:xmpp:alt-connections:websocket" in
  (* [link_websocket attributes] is the href of a Link element if that element
     advertises a websocket. XML attribute order is not significant and a Link
     may carry other attributes, so both rel and href are searched for in the
     whole attribute list rather than matched positionally. *)
  let link_websocket attributes =
    let is_websocket_rel ((_, name), value) =
      name = "rel" && value = websocket_rel
    in
    let href_of ((_, name), value) =
      if name = "href" then Some value else None
    in
    if List.exists is_websocket_rel attributes
    then List.find_map href_of attributes
    else None
  in
  (* Folds the document into a [string option]: every Link becomes its
     websocket href (if any), the XRD root keeps the first one found among its
     children, and any other element contributes nothing. *)
  let parse_xrd xrd =
    string xrd
    |> parse_xml
    |> signals
    |> tree ~element:(fun (_, name) attributes children ->
           match name with
           | "Link" -> link_websocket attributes
           | "XRD" -> List.find_map Fun.id children
           | _ -> None)
    |> Option.join
  in
  let* host_meta =
    Js_of_ocaml_lwt.XmlHttpRequest.perform_raw_url (well_known_of domain)
  in
  match parse_xrd host_meta.content with
  | Some x -> Lwt.return x
  | None -> Lwt.fail_with (domain ^ ": no WebSocket endpoint in Web-host Metadata.")

(** [ws_stream url] is a promise to a framed Lwt stream (and its push function)
    communicating with the websocket located at [url] using the XMPP protocol.

    The promise resolves to [(stream, push, ws)] once the socket is open:
    - [stream] yields the data of every message received on the socket;
    - [push (Some msg)] sends [msg] on the socket, [push None] closes it;
    - [ws] is the underlying WebSocket object.

    Valid XMPP WebSocket subprotocol frames must be sent to the stream, because
    it directly exposes the websocket under.

    If the websocket is closed server-side, the stream closes.

    If the websocket closes before it was ever opened (for instance because the
    connection could not be established), the promise fails with [Failure]
    instead of staying pending forever. *)
let ws_stream (url : string) =
  let stream, message = Lwt_stream.create () in
  (* [opened] is resolved by onopen, or rejected by onclose when the socket
     never opened. *)
  let opened, resolver = Lwt.wait () in
  let (ws : WebSockets.webSocket Js.t) =
    new%js WebSockets.webSocket_withProtocol (jss url) (jss "xmpp")
  in
  let push = function
    | Some msg -> ws##send (jss msg)
    | None -> ws##close
  in
  ws##.onclose :=
    Dom.handler (fun _ ->
        message None;
        (* Only reject while nobody has settled the promise yet; after a
           successful open there is nothing left to wake up. *)
        begin match Lwt.state opened with
        | Lwt.Sleep ->
           Lwt.wakeup_later_exn resolver
             (Failure (url ^ ": WebSocket closed before the connection was established."))
        | Lwt.Return _ | Lwt.Fail _ -> ()
        end;
        Js._true);
  ws##.onmessage := Dom.handler (fun x -> Some (sjs x##.data) |> message; Js._false);
  ws##.onopen := Dom.handler (fun _ -> Lwt.wakeup_later resolver (); Js._false);
  let+ () = opened in
  stream, push, ws

(** [connect domain] is an Lwt stream (and its push function) communicating with the
    XMPP server running at [domain] via the Websocket subprotocol.

    This function is a complex wrapper around ws_stream, that accepts Markup.ml
    signals and sends framed XML stanzas to the underlying socket, with exactly one
    stanza per frame, according to RFC 7395.

    In essence, it (tries to) expose an identical interface to the original XMPP
    streamed protocol.

    The resulting record holds:
    - [stream]: the signals parsed from what the socket receives;
    - [push]: [push (Some signals)] queues [signals] for sending, [push None]
      queues a [<close/>] element and then drains [stream];
    - [_socket]: the underlying WebSocket object.

    Here's an ASCII rendered flow of the data through the various streams.
        / -> push -> mu_stream -> to_frames -> ws_push -> \
    function user                                     websocket
        \ <--- stream <--- markup_lwt <--- ws_stream <--- / *)
let connect domain =
  let+ ws_stream, ws_push, ws = ws_endpoint domain >>= ws_stream in
  let open Markup_lwt in
  (* [report] is installed on the parser of the *incoming* data: when Markup.ml
     reports an error on what the socket received, reading [stream] fails with
     MalformedStanza. *)
  let report _ err =
    Lwt.fail (MalformedStanza err)
  in
  (* Every signal pushed by the user goes through this intermediate stream
     before being cut into frames below. *)
  let fragment_stream, fragment_push = Lwt_stream.create () in
  let stream = ws_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
  in
  let forward signals = Markup.iter (fun f -> fragment_push (Some f)) signals in
  (* Markup.ml streams are consumed as they are read, so the [<close/>] signals
     are rebuilt for every call instead of sharing one module-level stream that
     would be empty after its first use. The [Markup.] qualification selects
     the synchronous functions, which [Markup_lwt] shadows here. *)
  let close_stanza () =
    {|<close xmlns="|} ^ xmlns ^ {|" />|}
    |> Markup.string
    |> Markup.parse_xml
    |> Markup.signals
  in
  let push = function
    | Some fs -> forward fs
    | None ->
       (* We need to send the [<close>] stanza in full to the WebSocket. *)
       forward (close_stanza ());
       Lwt.async
         (fun () ->
           (* We drain completely the stream when closing, so the socket can close. *)
           let+ () = Markup_lwt.drain stream
           in fragment_push None)
  in
  (* Elements filters all elements based on the `Start_element. By simply making it
     return true every time, we get a stream of stream of elements, with each sub-stream
     being a full frame. Each frame is serialised to a string and sent on its own. *)
  Lwt.async
    (fun () -> fragment_stream
               |> lwt_stream
               |> elements (fun _ _ -> true)
               |> map (fun x -> write_xml x |> to_string)
               |> to_lwt_stream
               |> Lwt_stream.iter (fun s -> ws_push (Some s)));
  {stream; push; _socket=ws}

(** [starttls _] is a promise that always fails with [Failure]: this
    WebSocket-based implementation does not provide STARTTLS. The argument is
    ignored. *)
let starttls _connection =
  Lwt.fail (Failure "STARTTLS is unimplemented in WebSockets.")