aboutsummaryrefslogtreecommitdiff
path: root/src/xmpp_stream_out.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-12-26 18:55:57 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-12-26 18:55:57 +0300
commit2d43c07c624d6c85f0d7c8b3274c9d30e458d95e (patch)
tree41f4a748ac66d9dd42b8e532259e3d5911650a45 /src/xmpp_stream_out.erl
parentRevert "Don't set twice" (diff)
Get rid of ejabberd receiver
ejabberd receivers were meant to serve connections from frontends to backends. However, this approach was not popular and frontend related code was removed in previous releases. Now, ejabberd receiver's code was also removed, making the code shorter and cleaner. Also, in stress tests ejabberd now handles load more robustly, without c2s processes overload (even with disabled shapers). ejabberd_socket.erl is renamed to xmpp_socket.erl: it's supposed to be finally moved into stand-alone xmpp library.
Diffstat (limited to 'src/xmpp_stream_out.erl')
-rw-r--r--src/xmpp_stream_out.erl88
1 files changed, 51 insertions, 37 deletions
diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl
index 8f4fa5c84..b5851b0b8 100644
--- a/src/xmpp_stream_out.erl
+++ b/src/xmpp_stream_out.erl
@@ -191,16 +191,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
set_timeout(_, _) ->
erlang:error(badarg).
-get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner})
+get_transport(#{socket := Socket, owner := Owner})
when Owner == self() ->
- SockMod:get_transport(Socket);
+ xmpp_socket:get_transport(Socket);
get_transport(_) ->
erlang:error(badarg).
--spec change_shaper(state(), shaper:shaper()) -> ok.
-change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper)
+-spec change_shaper(state(), shaper:shaper()) -> state().
+change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() ->
- SockMod:change_shaper(Socket, Shaper);
+ Socket1 = xmpp_socket:change_shaper(Socket, Shaper),
+ State#{socket => Socket1};
change_shaper(_, _) ->
erlang:error(badarg).
@@ -233,11 +234,10 @@ format_error(Err) ->
%%% gen_server callbacks
%%%===================================================================
-spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore.
-init([Mod, SockMod, From, To, Opts]) ->
+init([Mod, _SockMod, From, To, Opts]) ->
Time = p1_time_compat:monotonic_time(milli_seconds),
State = #{owner => self(),
mod => Mod,
- sockmod => SockMod,
server => From,
user => <<"">>,
resource => <<"">>,
@@ -272,7 +272,6 @@ handle_call(Call, From, #{mod := Mod} = State) ->
-spec handle_cast(term(), state()) -> noreply().
handle_cast(connect, #{remote_server := RemoteServer,
- sockmod := SockMod,
stream_state := connecting} = State) ->
noreply(
case idna_to_ascii(RemoteServer) of
@@ -283,7 +282,7 @@ handle_cast(connect, #{remote_server := RemoteServer,
{ok, AddrPorts} ->
case connect(AddrPorts, State) of
{ok, Socket, {Addr, Port, Encrypted}} ->
- SocketMonitor = SockMod:monitor(Socket),
+ SocketMonitor = xmpp_socket:monitor(Socket),
State1 = State#{ip => {Addr, Port},
socket => Socket,
stream_encrypted => Encrypted,
@@ -388,6 +387,21 @@ handle_info(timeout, #{mod := Mod} = State) ->
handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State));
+handle_info({tcp, _, Data}, #{socket := Socket} = State) ->
+ noreply(
+ case xmpp_socket:recv(Socket, Data) of
+ {ok, NewSocket} ->
+ State#{socket => NewSocket};
+ {error, Reason} when is_atom(Reason) ->
+ process_stream_end({socket, Reason}, State);
+ {error, Reason} ->
+ %% TODO: make fast_tls return atoms
+ process_stream_end({tls, Reason}, State)
+ end);
+handle_info({tcp_closed, _}, State) ->
+ handle_info({'$gen_event', closed}, State);
+handle_info({tcp_error, _, Reason}, State) ->
+ noreply(process_stream_end({socket, Reason}, State));
handle_info(Info, #{mod := Mod} = State) ->
noreply(try Mod:handle_info(Info, State)
catch _:undef -> State
@@ -638,13 +652,13 @@ process_cert_verification(State) ->
-spec process_sasl_success(state()) -> state().
process_sasl_success(#{mod := Mod,
- sockmod := SockMod,
socket := Socket} = State) ->
- SockMod:reset_stream(Socket),
- State1 = State#{stream_id => new_id(),
- stream_restarted => true,
- stream_state => wait_for_stream,
- stream_authenticated => true},
+ Socket1 = xmpp_socket:reset_stream(Socket),
+ State0 = State#{socket => Socket1},
+ State1 = State0#{stream_id => new_id(),
+ stream_restarted => true,
+ stream_state => wait_for_stream,
+ stream_authenticated => true},
State2 = send_header(State1),
case is_disconnected(State2) of
true -> State2;
@@ -745,15 +759,15 @@ send_error(State, Pkt, Err) ->
end.
-spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}.
-socket_send(#{sockmod := SockMod, socket := Socket, xmlns := NS,
+socket_send(#{socket := Socket, xmlns := NS,
stream_state := StateName}, Pkt) ->
case Pkt of
trailer ->
- SockMod:send_trailer(Socket);
+ xmpp_socket:send_trailer(Socket);
#stream_start{} when StateName /= disconnected ->
- SockMod:send_header(Socket, xmpp:encode(Pkt));
+ xmpp_socket:send_header(Socket, xmpp:encode(Pkt));
_ when StateName /= disconnected ->
- SockMod:send_element(Socket, xmpp:encode(Pkt, NS));
+ xmpp_socket:send_element(Socket, xmpp:encode(Pkt, NS));
_ ->
{error, closed}
end;
@@ -768,8 +782,8 @@ send_trailer(State) ->
-spec close_socket(state()) -> state().
close_socket(State) ->
case State of
- #{sockmod := SockMod, socket := Socket} ->
- SockMod:close(Socket);
+ #{socket := Socket} ->
+ xmpp_socket:close(Socket);
_ ->
ok
end,
@@ -777,8 +791,8 @@ close_socket(State) ->
stream_state => disconnected}.
-spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}.
-starttls(Socket, #{sockmod := SockMod, mod := Mod,
- xmlns := NS, remote_server := RemoteServer} = State) ->
+starttls(Socket, #{mod := Mod, xmlns := NS,
+ remote_server := RemoteServer} = State) ->
TLSOpts = try Mod:tls_options(State)
catch _:undef -> []
end,
@@ -787,7 +801,7 @@ starttls(Socket, #{sockmod := SockMod, mod := Mod,
?NS_SERVER -> <<"xmpp-server">>;
?NS_CLIENT -> <<"xmpp-client">>
end,
- SockMod:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]).
+ xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]).
-spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang;
@@ -1020,9 +1034,9 @@ host_entry_to_addr_ports(#hostent{h_addr_list = AddrList}, Port, TLS) ->
-spec connect([ip_port()], state()) -> {ok, term(), ip_port()} |
{error, {socket, socket_error_reason()}} |
{error, {tls, tls_error_reason()}}.
-connect(AddrPorts, #{sockmod := SockMod} = State) ->
+connect(AddrPorts, State) ->
Timeout = get_connect_timeout(State),
- case connect(AddrPorts, SockMod, Timeout, {error, nxdomain}) of
+ case connect(AddrPorts, Timeout, {error, nxdomain}) of
{ok, Socket, {Addr, Port, TLS = true}} ->
case starttls(Socket, State) of
{ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}};
@@ -1034,24 +1048,24 @@ connect(AddrPorts, #{sockmod := SockMod} = State) ->
{error, {socket, Why}}
end.
--spec connect([ip_port()], module(), timeout(), network_error()) ->
+-spec connect([ip_port()], timeout(), network_error()) ->
{ok, term(), ip_port()} | network_error().
-connect([{Addr, Port, TLS}|AddrPorts], SockMod, Timeout, _) ->
+connect([{Addr, Port, TLS}|AddrPorts], Timeout, _) ->
Type = get_addr_type(Addr),
- try SockMod:connect(Addr, Port,
- [binary, {packet, 0},
- {send_timeout, ?TCP_SEND_TIMEOUT},
- {send_timeout_close, true},
- {active, false}, Type],
- Timeout) of
+ try xmpp_socket:connect(Addr, Port,
+ [binary, {packet, 0},
+ {send_timeout, ?TCP_SEND_TIMEOUT},
+ {send_timeout_close, true},
+ {active, false}, Type],
+ Timeout) of
{ok, Socket} ->
{ok, Socket, {Addr, Port, TLS}};
Err ->
- connect(AddrPorts, SockMod, Timeout, Err)
+ connect(AddrPorts, Timeout, Err)
catch _:badarg ->
- connect(AddrPorts, SockMod, Timeout, {error, einval})
+ connect(AddrPorts, Timeout, {error, einval})
end;
-connect([], _SockMod, _Timeout, Err) ->
+connect([], _Timeout, Err) ->
Err.
-spec get_addr_type(inet:ip_address()) -> inet:address_family().