diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-26 00:24:25 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-26 00:24:25 +0200 |
commit | 1d54e3f65f661d8b5e8f12fa347caab27b968c16 (patch) | |
tree | e260f71088b154bce9fc62d248e3126db696ddc5 /portal/tcp/portal.ml | |
parent | fdeff689457eab5c77133930f27d45df826dce58 (diff) |
feat(portal): make portal_tcp an implementation of Portal
Diffstat (limited to 'portal/tcp/portal.ml')
-rw-r--r-- | portal/tcp/portal.ml | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml new file mode 100644 index 0000000..32ff507 --- /dev/null +++ b/portal/tcp/portal.ml @@ -0,0 +1,85 @@ +open Lwt.Syntax +open Lwt_unix +open Markup + +type t = (signal, async) stream * ((signal, sync) stream option -> unit) + +let xmlns = "jabber:client" + +exception MalformedStanza of Markup.location * Markup.Error.t + +let stanza_open ?from domain : (signal, sync) stream = + let stanza = + let attributes = + [(("", "to"), domain); (("", "version"), "1.0"); + (("http://www.w3.org/XML/1998/namespace", "lang"), "en"); + (("http://www.w3.org/2000/xmlns/", "xmlns"), xmlns); + (("http://www.w3.org/2000/xmlns/", "stream"), + "http://etherx.jabber.org/streams")] + in [`Start_element + (("http://etherx.jabber.org/streams", "stream"), + Option.fold + ~none:attributes + ~some:(fun jid -> (("", "from"), jid) :: attributes) + from); + (* Markup.ml is a streaming parser, but blocks on standalone [`Start_element] + because it doesn't know if this specific element should be self-closing or + not, so [write_xml] never spits out the start of the stream. Adding an empty + comment resolves the ambiguity. I'm not a fan of it. + + If you have Github, feel free to get the word out to aantron. *) + `Comment ""] + in stanza |> of_list + +let stanza_close : (signal, sync) stream = [`End_element] |> Markup.of_list + +(** [xmpp_port domain] is the port where [domain]'s XMPP server is hosted. + + Currently, it falls back to 5222 (always), but should use SRV records in the near + future. *) +let xmpp_port (_domain : string) : int = 5222 + +(** [tcp_stream domain] is a (stream, socket) tuple communicating with the XMPP server + hosted on [domain] via plaintext TCP. *) +let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t = + let get_socket {ai_addr; ai_family; _} = + let sock = socket ai_family SOCK_STREAM 0 + in let+ () = Lwt_unix.connect sock ai_addr + in sock + and port_number = xmpp_port domain |> string_of_int in + let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM] + in let+ sock = List.map get_socket addrinfos |> Lwt.pick + in let stream = + Lwt_stream.from (fun () -> + let bsize = 4096 in + let buffer = Bytes.create bsize in + let* len = read sock buffer 0 bsize + in match len with + | 0 -> Lwt.return_none + | len -> Lwt.return_some (Bytes.sub_string buffer 0 len)) + in (stream, sock) + +(** [connect domain] is a Portal.t communicating with the XMPP server located at + [domain] via plaintext TCP. + + This function is a comparatively simple wrapper around the original TCP stream, + simply converting to/from Markup.ml signals. + + TODO: right now it's possible to get parts of unfinished stanzas... *) +let connect (domain : string) : t Lwt.t = + let+ tcp_stream, tcp_socket = tcp_stream domain in + let send msg = + (* This is gross, but it doesn't matter because TCP does buffering. *) + let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in () + in + let xml_stream, xml_push = Lwt_stream.create () in + let push msg = + let none () = xml_push None; close tcp_socket + and some fragments () = + Markup.iter (fun f -> xml_push (Some f)) fragments |> Lwt.return + in Option.fold ~none ~some msg |> Lwt.async + and report loc err = raise (MalformedStanza (loc, err)) in + let open Markup_lwt in + let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals + in Lwt.async (fun () -> lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send); + stream, push |