aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_s2s_out.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_s2s_out.erl')
-rw-r--r--src/ejabberd_s2s_out.erl167
1 files changed, 48 insertions, 119 deletions
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index d940284ef..074952df0 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -21,10 +21,7 @@
%%%-------------------------------------------------------------------
-module(ejabberd_s2s_out).
-behaviour(xmpp_stream_out).
--behaviour(ejabberd_config).
-%% ejabberd_config callbacks
--export([opt_type/1, transform_options/1]).
%% xmpp_stream_out callbacks
-export([tls_options/1, tls_required/1, tls_verify/1, tls_enabled/1,
connect_timeout/1, address_families/1, default_port/1,
@@ -44,8 +41,9 @@
-include("xmpp.hrl").
-include("logger.hrl").
+-include("translate.hrl").
--type state() :: map().
+-type state() :: xmpp_stream_out:state().
-export_type([state/0]).
%%%===================================================================
@@ -77,8 +75,7 @@ connect(Ref) ->
close(Ref) ->
xmpp_stream_out:close(Ref).
--spec close(pid(), atom()) -> ok;
- (state(), atom()) -> state().
+-spec close(pid(), atom()) -> ok.
close(Ref, Reason) ->
xmpp_stream_out:close(Ref, Reason).
@@ -165,11 +162,11 @@ process_closed(#{server := LServer, remote_server := RServer} = State,
xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)).
handle_unexpected_info(State, Info) ->
- ?WARNING_MSG("got unexpected info: ~p", [Info]),
+ ?WARNING_MSG("Unexpected info: ~p", [Info]),
State.
handle_unexpected_cast(State, Msg) ->
- ?WARNING_MSG("got unexpected cast: ~p", [Msg]),
+ ?WARNING_MSG("Unexpected cast: ~p", [Msg]),
State.
process_downgraded(State, _StreamStart) ->
@@ -178,36 +175,32 @@ process_downgraded(State, _StreamStart) ->
%%%===================================================================
%%% xmpp_stream_out callbacks
%%%===================================================================
-tls_options(#{server := LServer}) ->
- ejabberd_s2s:tls_options(LServer, []).
+tls_options(#{server_host := ServerHost}) ->
+ ejabberd_s2s:tls_options(ServerHost, []).
-tls_required(#{server := LServer}) ->
- ejabberd_s2s:tls_required(LServer).
+tls_required(#{server_host := ServerHost}) ->
+ ejabberd_s2s:tls_required(ServerHost).
-tls_verify(#{server := LServer}) ->
- ejabberd_s2s:tls_verify(LServer).
+tls_verify(#{server_host := ServerHost} = State) ->
+ ejabberd_hooks:run_fold(s2s_out_tls_verify, ServerHost, true, [State]).
-tls_enabled(#{server := LServer}) ->
- ejabberd_s2s:tls_enabled(LServer).
+tls_enabled(#{server_host := ServerHost}) ->
+ ejabberd_s2s:tls_enabled(ServerHost).
-connect_timeout(#{server := LServer}) ->
- ejabberd_config:get_option(
- {outgoing_s2s_timeout, LServer},
- timer:seconds(10)).
+connect_timeout(#{server_host := ServerHost}) ->
+ ejabberd_option:outgoing_s2s_timeout(ServerHost).
-default_port(#{server := LServer}) ->
- ejabberd_config:get_option({outgoing_s2s_port, LServer}, 5269).
+default_port(#{server_host := ServerHost}) ->
+ ejabberd_option:outgoing_s2s_port(ServerHost).
-address_families(#{server := LServer}) ->
- ejabberd_config:get_option(
- {outgoing_s2s_families, LServer},
- [inet, inet6]).
+address_families(#{server_host := ServerHost}) ->
+ ejabberd_option:outgoing_s2s_families(ServerHost).
-dns_retries(#{server := LServer}) ->
- ejabberd_config:get_option({s2s_dns_retries, LServer}, 2).
+dns_retries(#{server_host := ServerHost}) ->
+ ejabberd_option:s2s_dns_retries(ServerHost).
-dns_timeout(#{server := LServer}) ->
- ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)).
+dns_timeout(#{server_host := ServerHost}) ->
+ ejabberd_option:s2s_dns_timeout(ServerHost).
handle_auth_success(Mech, #{socket := Socket, ip := IP,
remote_server := RServer,
@@ -257,23 +250,23 @@ handle_timeout(#{on_route := Action, lang := Lang} = State) ->
case Action of
bounce -> stop(State);
_ ->
- Txt = <<"Idle connection">>,
+ Txt = ?T("Idle connection"),
send(State, xmpp:serr_connection_timeout(Txt, Lang))
end.
init([#{server := LServer, remote_server := RServer} = State, Opts]) ->
ServerHost = ejabberd_router:host_of_route(LServer),
- QueueType = ejabberd_s2s:queue_type(LServer),
+ QueueType = ejabberd_s2s:queue_type(ServerHost),
QueueLimit = case lists:keyfind(
max_queue, 1, ejabberd_config:fsm_limit_opts([])) of
{_, N} -> N;
false -> unlimited
end,
- Timeout = ejabberd_config:negotiation_timeout(),
+ Timeout = ejabberd_option:negotiation_timeout(),
State1 = State#{on_route => queue,
queue => p1_queue:new(QueueType, QueueLimit),
xmlns => ?NS_SERVER,
- lang => ejabberd_config:get_mylang(),
+ lang => ejabberd_option:language(),
server_host => ServerHost,
shaper => none},
State2 = xmpp_stream_out:set_timeout(State1, Timeout),
@@ -314,8 +307,8 @@ terminate(Reason, #{server := LServer,
normal -> State;
_ -> State#{stop_reason => internal_failure}
end,
- bounce_queue(State1),
- bounce_message_queue(State1).
+ State2 = bounce_queue(State1),
+ bounce_message_queue({LServer, RServer}, State2).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -337,13 +330,22 @@ bounce_queue(State) ->
bounce_packet(Pkt, AccState)
end, State).
--spec bounce_message_queue(state()) -> state().
-bounce_message_queue(State) ->
- receive {route, Pkt} ->
- State1 = bounce_packet(Pkt, State),
- bounce_message_queue(State1)
- after 0 ->
- State
+-spec bounce_message_queue({binary(), binary()}, state()) -> state().
+bounce_message_queue({LServer, RServer} = FromTo, State) ->
+ Pids = ejabberd_s2s:get_connections_pids(FromTo),
+ case lists:member(self(), Pids) of
+ true ->
+ ?WARNING_MSG("Outgoing s2s connection ~s -> ~s is supposed "
+ "to be unregistered, but pid ~p still presents "
+ "in 's2s' table", [LServer, RServer, self()]),
+ State;
+ false ->
+ receive {route, Pkt} ->
+ State1 = bounce_packet(Pkt, State),
+ bounce_message_queue(FromTo, State1)
+ after 0 ->
+ State
+ end
end.
-spec bounce_packet(xmpp_element(), state()) -> state().
@@ -374,12 +376,12 @@ mk_bounce_error(_Lang, _State) ->
-spec get_delay() -> non_neg_integer().
get_delay() ->
- MaxDelay = ejabberd_config:get_option(s2s_max_retry_delay, 300),
+ MaxDelay = ejabberd_option:s2s_max_retry_delay(),
p1_rand:uniform(MaxDelay).
-spec set_idle_timeout(state()) -> state().
-set_idle_timeout(#{on_route := send, server := LServer} = State) ->
- Timeout = ejabberd_s2s:get_idle_timeout(LServer),
+set_idle_timeout(#{on_route := send, server_host := ServerHost} = State) ->
+ Timeout = ejabberd_s2s:get_idle_timeout(ServerHost),
xmpp_stream_out:set_timeout(State, Timeout);
set_idle_timeout(State) ->
State.
@@ -400,76 +402,3 @@ format_error(queue_full) ->
<<"Stream queue is overloaded">>;
format_error(Reason) ->
xmpp_stream_out:format_error(Reason).
-
-transform_options(Opts) ->
- lists:foldl(fun transform_options/2, [], Opts).
-
-transform_options({outgoing_s2s_options, Families, Timeout}, Opts) ->
- ?WARNING_MSG("Option 'outgoing_s2s_options' is deprecated. "
- "The option is still supported "
- "but it is better to fix your config: "
- "use 'outgoing_s2s_timeout' and "
- "'outgoing_s2s_families' instead.", []),
- maybe_report_huge_timeout(outgoing_s2s_timeout, Timeout),
- [{outgoing_s2s_families, Families},
- {outgoing_s2s_timeout, Timeout}
- | Opts];
-transform_options({s2s_dns_options, S2SDNSOpts}, AllOpts) ->
- ?WARNING_MSG("Option 's2s_dns_options' is deprecated. "
- "The option is still supported "
- "but it is better to fix your config: "
- "use 's2s_dns_timeout' and "
- "'s2s_dns_retries' instead", []),
- lists:foldr(
- fun({timeout, T}, AccOpts) ->
- maybe_report_huge_timeout(s2s_dns_timeout, T),
- [{s2s_dns_timeout, T}|AccOpts];
- ({retries, R}, AccOpts) ->
- [{s2s_dns_retries, R}|AccOpts];
- (_, AccOpts) ->
- AccOpts
- end, AllOpts, S2SDNSOpts);
-transform_options({Opt, T}, Opts)
- when Opt == outgoing_s2s_timeout; Opt == s2s_dns_timeout ->
- maybe_report_huge_timeout(Opt, T),
- [{Opt, T}|Opts];
-transform_options(Opt, Opts) ->
- [Opt|Opts].
-
-maybe_report_huge_timeout(Opt, T) when is_integer(T), T >= 1000 ->
- ?WARNING_MSG("value '~p' of option '~p' is too big, "
- "are you sure you have set seconds?",
- [T, Opt]);
-maybe_report_huge_timeout(_, _) ->
- ok.
-
--spec opt_type(atom()) -> fun((any()) -> any()) | [atom()].
-opt_type(outgoing_s2s_families) ->
- fun(Families) ->
- lists:map(
- fun(ipv4) -> inet;
- (ipv6) -> inet6
- end, Families)
- end;
-opt_type(outgoing_s2s_port) ->
- fun (I) when is_integer(I), I > 0, I < 65536 -> I end;
-opt_type(outgoing_s2s_timeout) ->
- fun(TimeOut) when is_integer(TimeOut), TimeOut > 0 ->
- timer:seconds(TimeOut);
- (unlimited) ->
- infinity;
- (infinity) ->
- infinity
- end;
-opt_type(s2s_dns_retries) ->
- fun (I) when is_integer(I), I >= 0 -> I end;
-opt_type(s2s_dns_timeout) ->
- fun(I) when is_integer(I), I>=0 -> timer:seconds(I);
- (infinity) -> infinity;
- (unlimited) -> infinity
- end;
-opt_type(s2s_max_retry_delay) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(_) ->
- [outgoing_s2s_families, outgoing_s2s_port, outgoing_s2s_timeout,
- s2s_dns_retries, s2s_dns_timeout, s2s_max_retry_delay].