open Lwt.Syntax
open Lwt_unix
open Markup

(** A portal: the stream of XML signals received from the server, paired with
    a function that queues outgoing signals. Pushing [Some fragments] queues
    every signal of [fragments]; pushing [None] ends the outgoing stream. *)
type t = (signal, async) stream * ((signal, sync) stream option -> unit)

(** Alias for [Lwt_unix.file_descr]. *)
type socket = file_descr

(** Namespace URI used for the [stream] element in {!header}. *)
let xmlns = "http://etherx.jabber.org/streams"

(** Raised by the parser error callback installed in {!connect}, carrying the
    location and the error reported by Markup.ml. *)
exception MalformedStanza of Markup.location * Markup.Error.t

(** [header ?from domain portal] pushes an XML declaration and the opening
    [stream] element addressed to [domain] (with a [from] attribute when
    [from] is given), then reads the server's reply from the portal.

    The reply may start with an XML 1.0 declaration, with or without
    [encoding] / [standalone], or directly with the [stream] start element.

    @return the value of the [id] attribute of the server's [stream] element.
    @raise Failure (as a rejected promise) with
    ["Invalid stream opening server-side."] when the reply is not a [stream]
    element in the {!xmlns} namespace or carries no [id] attribute. *)
let header ?from domain ((stream, push) : t) =
  let base_attributes =
    [ (("", "to"), domain);
      (("", "version"), "1.0");
      (("http://www.w3.org/XML/1998/namespace", "lang"), "en");
      (("http://www.w3.org/2000/xmlns/", "xmlns"), "jabber:client");
      (("http://www.w3.org/2000/xmlns/", "stream"), xmlns) ]
  in
  let attributes =
    match from with
    | Some jid -> (("", "from"), jid) :: base_attributes
    | None -> base_attributes
  in
  let stanza =
    [ `Xml {version = "1.0"; encoding = None; standalone = None};
      `Start_element ((xmlns, "stream"), attributes);
      (* Note from the original author: Markup.ml holds back a lone
         [`Start_element] because it cannot yet tell whether the element
         will be self-closing, so [write_xml] would never emit the start of
         the stream. An empty comment right after it resolves the
         ambiguity. Worth reporting upstream to aantron. *)
      `Comment "" ]
  in
  push (Some (of_list stanza));
  let some_id ((_, name), value) = if name = "id" then Some value else None in
  (* Extracts the [id] attribute when the signal opens a [stream] element in
     the expected namespace; anything else yields [None]. *)
  let stream_id = function
    | Some (`Start_element ((ns, "stream"), attributes)) when ns = xmlns ->
      List.find_map some_id attributes
    | _ -> None
  in
  let* first = Markup_lwt.next stream in
  let* id =
    match first with
    | Some (`Xml {version = "1.0"; _}) ->
      (* The declaration is optional and may name an encoding; either way
         the stream element is the next signal. *)
      let+ stream_open = Markup_lwt.next stream in
      stream_id stream_open
    | other -> Lwt.return (stream_id other)
  in
  match id with
  | Some id -> Lwt.return id
  | None -> Lwt.fail_with "Invalid stream opening server-side."

(** [close portal] pushes a single [`End_element] signal, intended to close
    the [stream] element opened by {!header}. It does not push [None], so the
    outgoing signal stream itself stays open. *)
let close (_, push) = push (Some (Markup.of_list [`End_element]))

(** [xmpp_port domain] is the port where [domain]'s XMPP server is hosted.
    Currently, it falls back to 5222 (always), but should use SRV records in
    the near future. *)
let xmpp_port (_domain : string) : int = 5222

(** [tcp_stream domain] is a (stream, socket) tuple communicating with the XMPP
    server hosted on [domain] via plaintext TCP.

    The addresses returned by [getaddrinfo] are tried in order. A socket whose
    connection attempt fails is closed before the next address is tried. The
    stream yields chunks of at most 4096 bytes and ends when [read] returns 0.

    @raise Unix.Unix_error (as a rejected promise) with the error of the last
    attempt when no address accepts the connection.
    @raise Failure (as a rejected promise) when [getaddrinfo] returns no
    address at all. *)
let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t =
  (* Closing is best effort here: a failure to close must not mask the
     connection error that led to it. *)
  let close_quietly sock =
    Lwt.catch
      (fun () -> Lwt_unix.close sock)
      (function Unix.Unix_error _ -> Lwt.return_unit | exn -> Lwt.fail exn)
  in
  let try_address {ai_addr; ai_family; _} =
    let sock = Lwt_unix.socket ai_family SOCK_STREAM 0 in
    Lwt.catch
      (fun () ->
        let+ () = Lwt_unix.connect sock ai_addr in
        sock)
      (fun exn ->
        let* () = close_quietly sock in
        Lwt.fail exn)
  in
  let rec first_reachable last_error = function
    | [] ->
      (match last_error with
       | Some exn -> Lwt.fail exn
       | None -> Lwt.fail_with ("No address found for XMPP server " ^ domain))
    | addrinfo :: rest ->
      Lwt.catch
        (fun () -> try_address addrinfo)
        (function
          | Unix.Unix_error _ as exn -> first_reachable (Some exn) rest
          | exn -> Lwt.fail exn)
  in
  let port_number = xmpp_port domain |> string_of_int in
  let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM] in
  let+ sock = first_reachable None addrinfos in
  let bsize = 4096 in
  let stream =
    Lwt_stream.from (fun () ->
      let buffer = Bytes.create bsize in
      let+ len = read sock buffer 0 bsize in
      if len = 0 then None else Some (Bytes.sub_string buffer 0 len))
  in
  (stream, sock)

(** [connect domain] is a Portal.t communicating with the XMPP server located
    at [domain] via plaintext TCP.

    This function is a comparatively simple wrapper around the original TCP
    stream, simply converting to/from Markup.ml signals. A background task
    serialises pushed signals with [write_xml] and writes them to the socket;
    the socket is closed when that task ends, whether normally (after [None]
    is pushed) or because writing failed.

    Parse errors are raised as {!MalformedStanza}.

    TODO: right now it's possible to get parts of unfinished stanzas... *)
let connect (domain : string) : t Lwt.t =
  let+ (incoming, tcp_socket) = tcp_stream domain in
  let send byte =
    (* [String.make 1], not [Char.escaped]: the latter renders bytes such as
       '\n', '\'' or any non-ASCII byte as a backslash escape, so writing its
       first byte would send a backslash instead of the real byte.
       One write per byte is wasteful; the original author relied on TCP
       buffering to make it tolerable. *)
    let+ (_ : int) = write_string tcp_socket (String.make 1 byte) 0 1 in
    ()
  in
  let xml_stream, xml_push = Lwt_stream.create () in
  let push = function
    | None -> xml_push None
    | Some fragments ->
      Markup.iter (fun signal -> xml_push (Some signal)) fragments
  in
  let report loc err = raise (MalformedStanza (loc, err)) in
  let open Markup_lwt in
  let stream =
    incoming
    |> lwt_stream
    |> strings_to_bytes
    |> parse_xml ~report
    |> signals
  in
  Lwt.async (fun () ->
    Lwt.finalize
      (fun () -> lwt_stream xml_stream |> write_xml |> iter send)
      (fun () -> Lwt_unix.close tcp_socket));
  (stream, push)