aboutsummaryrefslogtreecommitdiff
path: root/portal
diff options
context:
space:
mode:
Diffstat (limited to 'portal')
-rw-r--r--portal/tcp/portal.ml73
1 files changed, 38 insertions, 35 deletions
diff --git a/portal/tcp/portal.ml b/portal/tcp/portal.ml
index 61e1b40..c7afd8e 100644
--- a/portal/tcp/portal.ml
+++ b/portal/tcp/portal.ml
@@ -1,4 +1,5 @@
open Lwt.Syntax
+open Lwt.Infix
open Lwt_unix
open Markup
@@ -60,47 +61,49 @@ let close = [`End_element] |> Markup.of_list
future. *)
let xmpp_port (_domain : string) : int = 5222
-(** [tcp_stream domain] is a (stream, socket) tuple communicating with the XMPP server
- hosted on [domain] via plaintext TCP. *)
-let tcp_stream (domain : string) : (string Lwt_stream.t * file_descr) Lwt.t =
+(** [tcp_socket domain] is a plaintext TCP socket to the XMPP server [domain]. *)
+let tcp_socket (domain : string) : file_descr Lwt.t =
let get_socket {ai_addr; ai_family; _} =
let sock = socket ai_family SOCK_STREAM 0
in let+ () = Lwt_unix.connect sock ai_addr
in sock
and port_number = xmpp_port domain |> string_of_int in
let* addrinfos = getaddrinfo domain port_number [AI_SOCKTYPE SOCK_STREAM]
- in let+ sock = List.map get_socket addrinfos |> Lwt.pick
- in let stream =
- Lwt_stream.from (fun () ->
- let bsize = 4096 in
- let buffer = Bytes.create bsize in
- let* len = read sock buffer 0 bsize
- in match len with
- | 0 -> Lwt.return_none
- | len -> Lwt.return_some (Bytes.sub_string buffer 0 len))
- in (stream, sock)
+ in List.map get_socket addrinfos |> Lwt.pick
-(** [connect domain] is a Portal.t communicating with the XMPP server located at
- [domain] via plaintext TCP.
+(** [file_descr_to_portal sock] is a Portal wrapping the Unix socket [sock] in Markup
+ signals, for XML consumption. *)
+let file_descr_to_portal (sock : file_descr) : t =
+ let raw_stream =
+ Lwt_stream.from (fun () ->
+ let bsize = 4096 in
+ let buffer = Bytes.create bsize in
+ let* len = read sock buffer 0 bsize
+ in match len with
+ | 0 -> Lwt.return_none
+ | len -> Lwt.return_some (Bytes.sub_string buffer 0 len))
+ and send_char c =
+ (* This is gross, but it doesn't matter because TCP does buffering. *)
+ let+ _ = write_string sock (Char.escaped c) 0 1 in ()
+ and xml_stream, xml_push = Lwt_stream.create ()
+ in let push msg =
+ let none () = xml_push None
+ and some fragments () =
+ Markup.iter (fun f -> xml_push (Some f)) fragments
+ in Option.fold ~none ~some msg ()
+ and report loc err = raise (MalformedStanza (loc, err)) in
+ let open Markup_lwt in
+ let stream = raw_stream
+ |> lwt_stream
+ |> strings_to_bytes
+ |> parse_xml ~report
+ |> signals
+ in Lwt.async (fun () ->
+ let* _ = lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send_char
+ in Lwt_unix.close sock);
+ {stream; push; _socket=sock}
- This function is a comparatively simple wrapper around the original TCP stream,
- simply converting to/from Markup.ml signals. *)
+(** [connect domain] is a Portal.t communicating with the XMPP server located at
+ [domain] via plaintext TCP. It simply chains the two previous functions. *)
let connect (domain : string) : t Lwt.t =
- let+ tcp_stream, tcp_socket = tcp_stream domain in
- let send msg =
- (* This is gross, but it doesn't matter because TCP does buffering. *)
- let+ _ = write_string tcp_socket (Char.escaped msg) 0 1 in ()
- in
- let xml_stream, xml_push = Lwt_stream.create () in
- let push msg =
- let none () = xml_push None
- and some fragments () =
- Markup.iter (fun f -> xml_push (Some f)) fragments
- in Option.fold ~none ~some msg ()
- and report loc err = raise (MalformedStanza (loc, err)) in
- let open Markup_lwt in
- let stream = tcp_stream |> lwt_stream |> strings_to_bytes |> parse_xml ~report |> signals
- in Lwt.async (fun () ->
- let* _ = lwt_stream xml_stream |> Markup_lwt.write_xml |> iter send
- in Lwt_unix.close tcp_socket);
- {stream; push; _socket=tcp_socket}
+ tcp_socket domain >|= file_descr_to_portal