1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
open Lwt.Syntax
open Lwt_unix
open Markup
(** A connection to an XMPP server, as returned by [connect]: the stream of
    XML signals parsed from the server's output, paired with a push function
    for outgoing data. Pushing [Some signals] queues those signals for
    sending; pushing [None] ends the outgoing stream and closes the
    connection. *)
type t = (signal, async) stream * ((signal, sync) stream option -> unit)

(** Namespace declared as the default [xmlns] on the opening stream element
    built by [stanza_open]. *)
let xmlns = "jabber:client"

(** Raised by the parser error handler installed in [connect]; carries the
    location and the error reported by Markup.ml. *)
exception MalformedStanza of Markup.location * Markup.Error.t
(** [stanza_open ?from domain] is a stream of two signals: the start of a
    [stream] element in the [http://etherx.jabber.org/streams] namespace,
    addressed to [domain] (and carrying a [from] attribute when [from] is
    given), followed by an empty comment. *)
let stanza_open ?from domain : (signal, sync) stream =
  let streams_ns = "http://etherx.jabber.org/streams"
  and xmlns_ns = "http://www.w3.org/2000/xmlns/" in
  let base_attributes =
    [ (("", "to"), domain);
      (("", "version"), "1.0");
      (("http://www.w3.org/XML/1998/namespace", "lang"), "en");
      ((xmlns_ns, "xmlns"), xmlns);
      ((xmlns_ns, "stream"), streams_ns) ]
  in
  (* The optional sender JID, when present, goes in front of the fixed
     attributes. *)
  let attributes =
    match from with
    | None -> base_attributes
    | Some jid -> (("", "from"), jid) :: base_attributes
  in
  of_list
    [ `Start_element ((streams_ns, "stream"), attributes);
      (* Markup.ml parses and writes incrementally, yet it holds back a lone
         [`Start_element] because it cannot tell whether the element will be
         self-closing, so [write_xml] would never emit the stream header.
         Following it with an empty comment settles the question. It is a
         workaround rather than a proper fix; worth raising upstream with
         aantron. *)
      `Comment "" ]
(** A stream holding a single [`End_element] signal, presumably meant to
    close the element opened by [stanza_open].
    NOTE(review): this is one shared stream value rather than a function, so
    it can presumably be consumed only once — confirm against Markup.ml's
    stream semantics. *)
let stanza_close : (signal, sync) stream = Markup.of_list [ `End_element ]
(** [xmpp_port domain] is the TCP port on which [domain]'s XMPP server is
    expected to listen. The argument is ignored for now and the result is
    always 5222; SRV record lookup is planned but not implemented. *)
let xmpp_port (_domain : string) : int =
  let default_client_port = 5222 in
  default_client_port
(** [tcp_stream domain] is a (stream, socket) pair talking to the XMPP server
    hosted on [domain] over plaintext TCP.

    The addresses returned by [getaddrinfo] are tried one after another, in
    the order returned. A failed attempt closes its socket and moves on to the
    next address, so one unreachable address does not fail the whole
    connection and no file descriptor is leaked.

    The stream yields the raw chunks read from the socket (at most 4096 bytes
    each) and ends when a read returns 0 bytes.

    @raise Invalid_argument (as a rejected promise) if [getaddrinfo] returns
    no address for [domain]. If every connection attempt fails, the promise is
    rejected with the error of the last attempt. *)
let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t =
  let port_number = xmpp_port domain |> string_of_int in
  (* Best-effort close used on error paths: a failure to close must not mask
     the connection error that is about to be reported. *)
  let close_quietly sock =
    Lwt.catch (fun () -> Lwt_unix.close sock) (fun _ -> Lwt.return_unit)
  in
  (* Open a socket to one address; release it again if the connect fails. *)
  let connect_to {ai_addr; ai_family; _} =
    let sock = socket ai_family SOCK_STREAM 0 in
    Lwt.catch
      (fun () ->
        let+ () = Lwt_unix.connect sock ai_addr in
        sock)
      (fun exn ->
        let* () = close_quietly sock in
        Lwt.fail exn)
  in
  (* Walk the candidates in order, remembering the most recent failure so it
     can be reported if none of them works. *)
  let rec first_connected last_error = function
    | [] ->
      (match last_error with
       | Some exn -> Lwt.fail exn
       | None ->
         Lwt.fail_invalid_arg
           ("tcp_stream: no address found for " ^ domain))
    | addr :: rest ->
      Lwt.catch
        (fun () -> connect_to addr)
        (function
          (* Cancellation is a request to stop, not a reason to try the next
             address. *)
          | Lwt.Canceled -> Lwt.fail Lwt.Canceled
          | exn -> first_connected (Some exn) rest)
  in
  let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM] in
  let+ sock = first_connected None addrinfos in
  let read_chunk () =
    let bsize = 4096 in
    let buffer = Bytes.create bsize in
    let* len = read sock buffer 0 bsize in
    match len with
    | 0 -> Lwt.return_none
    | len -> Lwt.return_some (Bytes.sub_string buffer 0 len)
  in
  (Lwt_stream.from read_chunk, sock)
(** [connect domain] is a Portal.t communicating with the XMPP server located
    at [domain] via plaintext TCP.

    It is a thin wrapper around [tcp_stream]: incoming bytes are parsed into
    Markup.ml signals, and signals handed to the push function are serialised
    back to XML and written to the socket. Pushing [None] ends the outgoing
    stream; the socket is closed once everything already queued has been
    written.

    Parse errors raise [MalformedStanza] from the parser's error handler.

    TODO: right now it's possible to get parts of unfinished stanzas... *)
let connect (domain : string) : t Lwt.t =
  let+ tcp_stream, tcp_socket = tcp_stream domain in
  let send byte =
    (* [write_xml] produces one char at a time, so each byte is its own write.
       The byte must be sent verbatim: [Char.escaped] would turn quotes,
       backslashes, newlines and every non-ASCII byte into an escape sequence
       of which only the leading backslash would reach the wire. *)
    let+ _written = write_string tcp_socket (String.make 1 byte) 0 1 in
    ()
  in
  let xml_stream, xml_push = Lwt_stream.create () in
  let push msg =
    (* Ending the outgoing stream only signals the writer below; the writer
       closes the socket after draining, so queued output is not cut off. *)
    let none () = xml_push None; Lwt.return_unit
    and some fragments () =
      Markup.iter (fun f -> xml_push (Some f)) fragments |> Lwt.return
    in
    Option.fold ~none ~some msg |> Lwt.async
  and report loc err = raise (MalformedStanza (loc, err)) in
  let open Markup_lwt in
  let stream =
    tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
  in
  (* Background writer: serialise pushed signals and send them. The socket is
     closed when the outgoing stream ends or the writer fails. *)
  Lwt.async (fun () ->
      Lwt.finalize
        (fun () -> lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send)
        (fun () -> Lwt_unix.close tcp_socket));
  stream, push
|