aboutsummaryrefslogtreecommitdiff
path: root/portal/tcp/portal.ml
diff options
context:
space:
mode:
Diffstat (limited to 'portal/tcp/portal.ml')
-rw-r--r--portal/tcp/portal.ml85
1 files changed, 85 insertions, 0 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml
new file mode 100644
index 0000000..32ff507
--- /dev/null
+++ b/portal/tcp/portal.ml
@@ -0,0 +1,85 @@
+open Lwt.Syntax
+open Lwt_unix
+open Markup
+
+type t = (signal, async) stream * ((signal, sync) stream option -> unit)
+
+let xmlns = "jabber:client"
+
+exception MalformedStanza of Markup.location * Markup.Error.t
+
+let stanza_open ?from domain : (signal, sync) stream =
+ let stanza =
+ let attributes =
+ [(("", "to"), domain); (("", "version"), "1.0");
+ (("http://www.w3.org/XML/1998/namespace", "lang"), "en");
+ (("http://www.w3.org/2000/xmlns/", "xmlns"), xmlns);
+ (("http://www.w3.org/2000/xmlns/", "stream"),
+ "http://etherx.jabber.org/streams")]
+ in [`Start_element
+ (("http://etherx.jabber.org/streams", "stream"),
+ Option.fold
+ ~none:attributes
+ ~some:(fun jid -> (("", "from"), jid) :: attributes)
+ from);
+ (* Markup.ml is a streaming parser, but blocks on standalone [`Start_element]
+ because it doesn't know if this specific element should be self-closing or
+ not, so [write_xml] never spits out the start of the stream. Adding an empty
+ comment resolves the ambiguity. I'm not a fan of it.
+
+ If you have Github, feel free to get the word out to aantron. *)
+ `Comment ""]
+ in stanza |> of_list
+
+let stanza_close : (signal, sync) stream = [`End_element] |> Markup.of_list
+
+(** [xmpp_port domain] is the port where [domain]'s XMPP server is hosted.
+
+ Currently, it falls back to 5222 (always), but should use SRV records in the near
+ future. *)
+let xmpp_port (_domain : string) : int = 5222
+
+(** [tcp_stream domain] is a (stream, socket) tuple communicating with the XMPP server
+ hosted on [domain] via plaintext TCP. *)
+let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t =
+ let get_socket {ai_addr; ai_family; _} =
+ let sock = socket ai_family SOCK_STREAM 0
+ in let+ () = Lwt_unix.connect sock ai_addr
+ in sock
+ and port_number = xmpp_port domain |> string_of_int in
+ let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM]
+ in let+ sock = List.map get_socket addrinfos |> Lwt.pick
+ in let stream =
+ Lwt_stream.from (fun () ->
+ let bsize = 4096 in
+ let buffer = Bytes.create bsize in
+ let* len = read sock buffer 0 bsize
+ in match len with
+ | 0 -> Lwt.return_none
+ | len -> Lwt.return_some (Bytes.sub_string buffer 0 len))
+ in (stream, sock)
+
+(** [connect domain] is a Portal.t communicating with the XMPP server located at
+ [domain] via plaintext TCP.
+
+ This function is a comparatively simple wrapper around the original TCP stream,
+ simply converting to/from Markup.ml signals.
+
+ TODO: right now it's possible to get parts of unfinished stanzas... *)
+let connect (domain : string) : t Lwt.t =
+ let+ tcp_stream, tcp_socket = tcp_stream domain in
+ let send msg =
+ (* This is gross, but it doesn't matter because TCP does buffering. *)
+ let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in ()
+ in
+ let xml_stream, xml_push = Lwt_stream.create () in
+ let push msg =
+ let none () = xml_push None; close tcp_socket
+ and some fragments () =
+ Markup.iter (fun f -> xml_push (Some f)) fragments |> Lwt.return
+ in Option.fold ~none ~some msg |> Lwt.async
+ and report loc err = raise (MalformedStanza (loc, err)) in
+ let open Markup_lwt in
+ let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
+ in Lwt.async (fun () -> lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send);
+ stream, push