open Lwt.Syntax
open Markup

(** The file descriptor type used for the underlying connection. *)
type socket = Lwt_unix.file_descr

(** A connection to an XMPP server.

    - [stream] yields the XML signals parsed from what is read on the socket.
    - [push] queues every signal of a synchronous Markup stream for writing;
      [push None] ends the outgoing stream.
    - [_socket] is the descriptor both of the above are built on. *)
type t = {
  mutable stream : (signal, async) stream;
  mutable push : (signal, sync) stream option -> unit;
  mutable _socket : socket;
}

(** Namespace of the [<stream:stream>] element. *)
let xmlns = "http://etherx.jabber.org/streams"

(** Raised by the XML parser's error reporter when the incoming data is not
    well-formed; carries the location and the Markup error. *)
exception MalformedStanza of Markup.location * Markup.Error.t

(** [header ?from domain portal] pushes an XML declaration and an opening
    [<stream:stream>] element addressed to [domain] (with a [from] attribute
    when [?from] is given), then reads the server's own stream opening and
    resolves to the value of its [id] attribute.

    The server's XML declaration is optional and its contents are not
    checked: only the opening [stream] element in the {!xmlns} namespace is
    required.

    @raise Failure (as a rejected promise) with
      ["Invalid stream opening server-side."] when the first element read is
      not such a [stream] element or carries no [id] attribute. *)
let header ?from domain ({stream; push; _} : t) =
  let stanza =
    let attributes =
      [ (("", "to"), domain);
        (("", "version"), "1.0");
        (("http://www.w3.org/XML/1998/namespace", "lang"), "en");
        (("http://www.w3.org/2000/xmlns/", "xmlns"), "jabber:client");
        (("http://www.w3.org/2000/xmlns/", "stream"), xmlns) ]
    in
    [ `Xml {version = "1.0"; encoding = None; standalone = None};
      `Start_element
        ( (xmlns, "stream"),
          Option.fold ~none:attributes
            ~some:(fun jid -> (("", "from"), jid) :: attributes)
            from );
      (* Markup.ml is a streaming parser, but it blocks on a standalone
         [`Start_element] because it cannot tell yet whether that element
         should be written self-closing, so [write_xml] never emits the start
         of the stream. Following it with an empty comment resolves the
         ambiguity. Not pretty; worth raising with the Markup.ml
         maintainer. *)
      `Comment "" ]
  in
  push (Some (of_list stanza));
  let some_id ((_, name), value) = if name = "id" then Some value else None in
  (* The [id] attribute of an opening [stream] element, if that is what the
     signal is. *)
  let stream_id = function
    | Some (`Start_element ((ns, "stream"), attributes)) when ns = xmlns ->
      List.find_map some_id attributes
    | _ -> None
  in
  let* first = Markup_lwt.next stream in
  let* id =
    match first with
    | Some (`Xml _) ->
      (* A declaration was sent: the stream opening is the next signal. *)
      let+ stream_open = Markup_lwt.next stream in
      stream_id stream_open
    | other ->
      (* No declaration: the first signal must already be the opening. *)
      Lwt.return (stream_id other)
  in
  match id with
  | Some id -> Lwt.return id
  | None -> Lwt.fail_with "Invalid stream opening server-side."

(** [close] is a Markup stream holding a single [`End_element] signal,
    presumably meant to be pushed to close the [stream] element opened by
    {!header}.

    NOTE(review): this is one shared stream value; Markup streams appear to
    be consumed as they are read, so it may only be usable once per program.
    Confirm before reusing it across connections. *)
let close = [`End_element] |> Markup.of_list

(** [xmpp_port domain] is the port where [domain]'s XMPP server is hosted.
    Currently it always answers 5222; it should use SRV records in the near
    future. *)
let xmpp_port (_domain : string) : int = 5222

(** [tcp_socket domain] is a plaintext TCP socket connected to the XMPP
    server [domain].

    The addresses returned by [getaddrinfo] are tried one after the other, in
    order; the first one that connects wins. The socket of every failed
    attempt is closed before moving on, so no descriptor is leaked.

    The promise is rejected with the error of the last failed attempt, or
    with [Failure] when [domain] resolves to no address at all. *)
let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t =
  let port_number = xmpp_port domain |> string_of_int in
  (* Open a socket for one address; close it again if connecting fails. *)
  let try_connect {Lwt_unix.ai_addr; ai_family; _} =
    let sock = Lwt_unix.socket ai_family Lwt_unix.SOCK_STREAM 0 in
    Lwt.catch
      (fun () ->
        let+ () = Lwt_unix.connect sock ai_addr in
        sock)
      (fun exn ->
        (* The connection error is the informative one: a failure to close
           the half-open socket must not replace it. *)
        let* () =
          Lwt.catch
            (fun () -> Lwt_unix.close sock)
            (fun _close_error -> Lwt.return_unit)
        in
        Lwt.fail exn)
  in
  let rec first_reachable last_error = function
    | [] ->
      begin match last_error with
        | Some exn -> Lwt.fail exn
        | None -> Lwt.fail_with ("No address found for " ^ domain)
      end
    | info :: rest ->
      Lwt.catch
        (fun () -> try_connect info)
        (function
          (* A cancellation is a request to stop, not a reason to retry. *)
          | Lwt.Canceled as exn -> Lwt.fail exn
          | exn -> first_reachable (Some exn) rest)
  in
  let* addrinfos =
    Lwt_unix.getaddrinfo domain port_number
      [Lwt_unix.AI_SOCKTYPE Lwt_unix.SOCK_STREAM]
  in
  first_reachable None addrinfos

(** [socket_to_stream sock] is a [(stream, push)] pair wrapping the Unix
    socket [sock] in Markup signals.

    - [stream] parses whatever is read from [sock] as XML; a read of zero
      bytes ends it.
    - [push (Some signals)] queues every signal of [signals] to be serialised
      and written to [sock]; [push None] ends the outgoing stream.

    A background task writes the queued signals; [sock] is closed when that
    task finishes, whether it succeeded or failed.

    @raise MalformedStanza from the parser's error reporter when the data
      read from [sock] is not well-formed XML. *)
let socket_to_stream (sock : socket) =
  let raw_stream =
    let buffer = Bytes.create 4096 in
    Lwt_stream.from (fun () ->
      let* len = Lwt_unix.read sock buffer 0 (Bytes.length buffer) in
      match len with
      | 0 -> Lwt.return_none
      | len -> Lwt.return_some (Bytes.sub_string buffer 0 len))
  and send_char c =
    (* One write per byte is gross, but harmless: TCP does the buffering.
       The byte must be sent verbatim: [Char.escaped] would turn newlines,
       quotes and every non-ASCII byte into a backslash sequence. *)
    let+ _written = Lwt_unix.write_string sock (String.make 1 c) 0 1 in
    ()
  and xml_stream, xml_push = Lwt_stream.create () in
  let push = function
    | None -> xml_push None
    | Some signals -> Markup.iter (fun f -> xml_push (Some f)) signals
  and report loc err = raise (MalformedStanza (loc, err)) in
  let open Markup_lwt in
  let stream =
    raw_stream
    |> lwt_stream
    |> strings_to_bytes
    |> parse_xml ~report
    |> signals
  in
  Lwt.async (fun () ->
    (* [finalize] releases the descriptor even when a write fails. *)
    Lwt.finalize
      (fun () ->
        lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send_char)
      (fun () -> Lwt_unix.close sock));
  (stream, push)

(** [connect domain] is a Portal.t communicating with the XMPP server located
    at [domain] via plaintext TCP. It simply chains {!tcp_socket} and
    {!socket_to_stream}. *)
let connect (domain : string) : t Lwt.t =
  let+ _socket = tcp_socket domain in
  let stream, push = socket_to_stream _socket in
  {stream; push; _socket}