diff options
author | Clombrong <cromblong@egregore.fun> | 2025-06-28 09:08:24 +0200 |
---|---|---|
committer | Clombrong <cromblong@egregore.fun> | 2025-06-28 09:08:24 +0200 |
commit | a173831a5e5bd9c6c3527bd071db666dab4b6a16 (patch) | |
tree | c7e700b2d5cdf3e6dfc6a1ffe840a933887b916f | |
parent | 64891c28fc7b9700e049079da116278e0a94981d (diff) |
feat(portal_tcp): adapt socket_to_stream to variant type
-rw-r--r-- | portal/tcp/portal.ml | 64 |
1 files changed, 38 insertions, 26 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml index 73321bd..9afa318 100644 --- a/portal/tcp/portal.ml +++ b/portal/tcp/portal.ml @@ -72,32 +72,43 @@ let tcp_socket (domain : string) : Lwt_unix.file_descr Lwt.t = (** [socket_to_stream sock] is a [stream, push] tuple wrapping the Unix socket [sock] in Markup signals. *) -let socket_to_stream (sock : Lwt_unix.file_descr) = +let socket_to_stream (sock : socket) = let raw_stream = - let recv_bytes = Bytes.create 4096 in - Lwt_stream.from (fun () -> - let* len = - try%lwt Lwt_unix.read sock recv_bytes 0 4096 - with - | Unix.Unix_error (Unix.ECONNRESET, _, _) - | Unix.Unix_error (Unix.EPIPE, _, _) - | End_of_file -> Lwt.return 0 - | exn -> Lwt.fail exn - in match len with - | 0 -> Lwt.return_none - | len -> Lwt.return_some (Bytes.sub_string recv_bytes 0 len)) + let from_plain p = + let recv_bytes = Bytes.create 4096 in + fun () -> + let* len = + try%lwt Lwt_unix.read p recv_bytes 0 4096 + with + | Unix.Unix_error (Unix.ECONNRESET, _, _) + | Unix.Unix_error (Unix.EPIPE, _, _) + | End_of_file -> Lwt.return 0 + | exn -> Lwt.fail exn + in match len with + | 0 -> Lwt.return_none + | len -> + Lwt.return_some (Bytes.sub_string recv_bytes 0 len) + in let from_socket = match sock with + | Plain p -> from_plain p + in Lwt_stream.from from_socket in let buffer = Buffer.create 1024 in - let flush_buffer () = - let content = Buffer.to_bytes buffer in - Buffer.clear buffer; - let* _ = - try%lwt Lwt_unix.write sock content 0 (Bytes.length content) - with - | Unix.Unix_error (Unix.ECONNRESET, _, _) - | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0 - | exn -> Lwt.fail exn - in Lwt.return_unit + let flush_buffer = + let flush_plain p () = + let content = Buffer.to_bytes buffer in + Buffer.clear buffer; + let* _ = + try%lwt Lwt_unix.write p content 0 (Bytes.length content) + with + | Unix.Unix_error (Unix.ECONNRESET, _, _) + | Unix.Unix_error (Unix.EPIPE, _, _) -> Lwt.return 0 + | exn -> Lwt.fail exn + in Lwt.return_unit + in match sock with + | Plain p -> flush_plain p + in + let close_sock = match sock with + | Plain p -> (fun () -> Lwt_unix.close p) in let chomp c = Buffer.add_char buffer c; @@ -118,12 +129,13 @@ let socket_to_stream (sock : Lwt_unix.file_descr) = in Lwt.async (fun () -> let* _ = lwt_stream outbound_stream |> Markup_lwt.write_xml |> iter chomp in let* _ = flush_buffer () - in Lwt_unix.close sock); + in close_sock ()); (stream, push) (** [connect domain] is a Portal.t communicating with the XMPP server located at [domain] via plaintext TCP. It simply chains the two previous functions. *) let connect (domain : string) : t Lwt.t = - let+ _socket = tcp_socket domain + let+ s = tcp_socket domain in + let _socket = Plain s in let stream, push = socket_to_stream _socket - in {stream; push; _socket=Plain _socket} + in {stream; push; _socket=_socket} |