diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-27 10:01:34 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-27 10:01:34 +0200 |
commit | 08f1534588695c295e84234082120b7065c8a324 (patch) | |
tree | 65510af378a5d4971bcfa05bdb0e18520460b78e /portal/tcp/portal.ml | |
parent | 3a1fb81fdb94405c52ef5a78104eabdd8dafce04 (diff) |
refactor(portal_tcp): split tcp_stream into two functions
Diffstat (limited to 'portal/tcp/portal.ml')
-rw-r--r-- | portal/tcp/portal.ml | 73 |
1 files changed, 38 insertions, 35 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml index 61e1b40..c7afd8e 100644 --- a/portal/tcp/portal.ml +++ b/portal/tcp/portal.ml @@ -1,4 +1,5 @@ open Lwt.Syntax +open Lwt.Infix open Lwt_unix open Markup @@ -60,47 +61,49 @@ let close = [`End_element] |> Markup.of_list future. *) let xmpp_port (_domain : string) : int = 5222 -(** [tcp_stream domain] is a (stream, socket) tuple communicating with the XMPP server - hosted on [domain] via plaintext TCP. *) -let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t = +(** [tcp_socket domain] is a plaintext TCP socket to the XMPP server [domain]. *) +let tcp_socket (domain : string) : file_descr Lwt.t = let get_socket {ai_addr; ai_family; _} = let sock = socket ai_family SOCK_STREAM 0 in let+ () = Lwt_unix.connect sock ai_addr in sock and port_number = xmpp_port domain |> string_of_int in let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM] - in let+ sock = List.map get_socket addrinfos |> Lwt.pick - in let stream = - Lwt_stream.from (fun () -> - let bsize = 4096 in - let buffer = Bytes.create bsize in - let* len = read sock buffer 0 bsize - in match len with - | 0 -> Lwt.return_none - | len -> Lwt.return_some (Bytes.sub_string buffer 0 len)) - in (stream, sock) + in List.map get_socket addrinfos |> Lwt.pick -(** [connect domain] is a Portal.t communicating with the XMPP server located at - [domain] via plaintext TCP. +(** [file_descr_to_portal sock] is a Portal wrapping the Unix socket [sock] in Markup + signals, for XML consumption. *) +let file_descr_to_portal (sock : file_descr) : t = + let raw_stream = + Lwt_stream.from (fun () -> + let bsize = 4096 in + let buffer = Bytes.create bsize in + let* len = read sock buffer 0 bsize + in match len with + | 0 -> Lwt.return_none + | len -> Lwt.return_some (Bytes.sub_string buffer 0 len)) + and send_char c = + (* This is gross, but it doesn't matter because TCP does buffering. *) + let+ _ = write_string sock (Char.escaped c) 0 1 in () + and xml_stream, xml_push = Lwt_stream.create () + in let push msg = + let none () = xml_push None + and some fragments () = + Markup.iter (fun f -> xml_push (Some f)) fragments + in Option.fold ~none ~some msg () + and report loc err = raise (MalformedStanza (loc, err)) in + let open Markup_lwt in + let stream = raw_stream + |> lwt_stream + |> strings_to_bytes + |> parse_xml ~report + |> signals + in Lwt.async (fun () -> + let* _ = lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send_char + in Lwt_unix.close sock); + {stream; push; _socket=sock} - This function is a comparatively simple wrapper around the original TCP stream, - simply converting to/from Markup.ml signals. *) +(** [connect domain] is a Portal.t communicating with the XMPP server located at + [domain] via plaintext TCP. It simply chains the two previous functions. *) let connect (domain : string) : t Lwt.t = - let+ tcp_stream, tcp_socket = tcp_stream domain in - let send msg = - (* This is gross, but it doesn't matter because TCP does buffering. *) - let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in () - in - let xml_stream, xml_push = Lwt_stream.create () in - let push msg = - let none () = xml_push None - and some fragments () = - Markup.iter (fun f -> xml_push (Some f)) fragments - in Option.fold ~none ~some msg () - and report loc err = raise (MalformedStanza (loc, err)) in - let open Markup_lwt in - let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals - in Lwt.async (fun () -> - let* _ = lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send - in Lwt_unix.close tcp_socket); - {stream; push; _socket=tcp_socket} + tcp_socket domain >|= file_descr_to_portal |