diff options
Diffstat (limited to 'src/ejabberd_s2s_out.erl')
-rw-r--r-- | src/ejabberd_s2s_out.erl | 167 |
1 files changed, 48 insertions, 119 deletions
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index d940284ef..074952df0 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -21,10 +21,7 @@ %%%------------------------------------------------------------------- -module(ejabberd_s2s_out). -behaviour(xmpp_stream_out). --behaviour(ejabberd_config). -%% ejabberd_config callbacks --export([opt_type/1, transform_options/1]). %% xmpp_stream_out callbacks -export([tls_options/1, tls_required/1, tls_verify/1, tls_enabled/1, connect_timeout/1, address_families/1, default_port/1, @@ -44,8 +41,9 @@ -include("xmpp.hrl"). -include("logger.hrl"). +-include("translate.hrl"). --type state() :: map(). +-type state() :: xmpp_stream_out:state(). -export_type([state/0]). %%%=================================================================== @@ -77,8 +75,7 @@ connect(Ref) -> close(Ref) -> xmpp_stream_out:close(Ref). --spec close(pid(), atom()) -> ok; - (state(), atom()) -> state(). +-spec close(pid(), atom()) -> ok. close(Ref, Reason) -> xmpp_stream_out:close(Ref, Reason). @@ -165,11 +162,11 @@ process_closed(#{server := LServer, remote_server := RServer} = State, xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)). handle_unexpected_info(State, Info) -> - ?WARNING_MSG("got unexpected info: ~p", [Info]), + ?WARNING_MSG("Unexpected info: ~p", [Info]), State. handle_unexpected_cast(State, Msg) -> - ?WARNING_MSG("got unexpected cast: ~p", [Msg]), + ?WARNING_MSG("Unexpected cast: ~p", [Msg]), State. process_downgraded(State, _StreamStart) -> @@ -178,36 +175,32 @@ process_downgraded(State, _StreamStart) -> %%%=================================================================== %%% xmpp_stream_out callbacks %%%=================================================================== -tls_options(#{server := LServer}) -> - ejabberd_s2s:tls_options(LServer, []). +tls_options(#{server_host := ServerHost}) -> + ejabberd_s2s:tls_options(ServerHost, []). -tls_required(#{server := LServer}) -> - ejabberd_s2s:tls_required(LServer). +tls_required(#{server_host := ServerHost}) -> + ejabberd_s2s:tls_required(ServerHost). -tls_verify(#{server := LServer}) -> - ejabberd_s2s:tls_verify(LServer). +tls_verify(#{server_host := ServerHost} = State) -> + ejabberd_hooks:run_fold(s2s_out_tls_verify, ServerHost, true, [State]). -tls_enabled(#{server := LServer}) -> - ejabberd_s2s:tls_enabled(LServer). +tls_enabled(#{server_host := ServerHost}) -> + ejabberd_s2s:tls_enabled(ServerHost). -connect_timeout(#{server := LServer}) -> - ejabberd_config:get_option( - {outgoing_s2s_timeout, LServer}, - timer:seconds(10)). +connect_timeout(#{server_host := ServerHost}) -> + ejabberd_option:outgoing_s2s_timeout(ServerHost). -default_port(#{server := LServer}) -> - ejabberd_config:get_option({outgoing_s2s_port, LServer}, 5269). +default_port(#{server_host := ServerHost}) -> + ejabberd_option:outgoing_s2s_port(ServerHost). -address_families(#{server := LServer}) -> - ejabberd_config:get_option( - {outgoing_s2s_families, LServer}, - [inet, inet6]). +address_families(#{server_host := ServerHost}) -> + ejabberd_option:outgoing_s2s_families(ServerHost). -dns_retries(#{server := LServer}) -> - ejabberd_config:get_option({s2s_dns_retries, LServer}, 2). +dns_retries(#{server_host := ServerHost}) -> + ejabberd_option:s2s_dns_retries(ServerHost). -dns_timeout(#{server := LServer}) -> - ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)). +dns_timeout(#{server_host := ServerHost}) -> + ejabberd_option:s2s_dns_timeout(ServerHost). handle_auth_success(Mech, #{socket := Socket, ip := IP, remote_server := RServer, @@ -257,23 +250,23 @@ handle_timeout(#{on_route := Action, lang := Lang} = State) -> case Action of bounce -> stop(State); _ -> - Txt = <<"Idle connection">>, + Txt = ?T("Idle connection"), send(State, xmpp:serr_connection_timeout(Txt, Lang)) end. init([#{server := LServer, remote_server := RServer} = State, Opts]) -> ServerHost = ejabberd_router:host_of_route(LServer), - QueueType = ejabberd_s2s:queue_type(LServer), + QueueType = ejabberd_s2s:queue_type(ServerHost), QueueLimit = case lists:keyfind( max_queue, 1, ejabberd_config:fsm_limit_opts([])) of {_, N} -> N; false -> unlimited end, - Timeout = ejabberd_config:negotiation_timeout(), + Timeout = ejabberd_option:negotiation_timeout(), State1 = State#{on_route => queue, queue => p1_queue:new(QueueType, QueueLimit), xmlns => ?NS_SERVER, - lang => ejabberd_config:get_mylang(), + lang => ejabberd_option:language(), server_host => ServerHost, shaper => none}, State2 = xmpp_stream_out:set_timeout(State1, Timeout), @@ -314,8 +307,8 @@ terminate(Reason, #{server := LServer, normal -> State; _ -> State#{stop_reason => internal_failure} end, - bounce_queue(State1), - bounce_message_queue(State1). + State2 = bounce_queue(State1), + bounce_message_queue({LServer, RServer}, State2). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -337,13 +330,22 @@ bounce_queue(State) -> bounce_packet(Pkt, AccState) end, State). --spec bounce_message_queue(state()) -> state(). -bounce_message_queue(State) -> - receive {route, Pkt} -> - State1 = bounce_packet(Pkt, State), - bounce_message_queue(State1) - after 0 -> - State +-spec bounce_message_queue({binary(), binary()}, state()) -> state(). +bounce_message_queue({LServer, RServer} = FromTo, State) -> + Pids = ejabberd_s2s:get_connections_pids(FromTo), + case lists:member(self(), Pids) of + true -> + ?WARNING_MSG("Outgoing s2s connection ~s -> ~s is supposed " + "to be unregistered, but pid ~p still presents " + "in 's2s' table", [LServer, RServer, self()]), + State; + false -> + receive {route, Pkt} -> + State1 = bounce_packet(Pkt, State), + bounce_message_queue(FromTo, State1) + after 0 -> + State + end end. -spec bounce_packet(xmpp_element(), state()) -> state(). @@ -374,12 +376,12 @@ mk_bounce_error(_Lang, _State) -> -spec get_delay() -> non_neg_integer(). get_delay() -> - MaxDelay = ejabberd_config:get_option(s2s_max_retry_delay, 300), + MaxDelay = ejabberd_option:s2s_max_retry_delay(), p1_rand:uniform(MaxDelay). -spec set_idle_timeout(state()) -> state(). -set_idle_timeout(#{on_route := send, server := LServer} = State) -> - Timeout = ejabberd_s2s:get_idle_timeout(LServer), +set_idle_timeout(#{on_route := send, server_host := ServerHost} = State) -> + Timeout = ejabberd_s2s:get_idle_timeout(ServerHost), xmpp_stream_out:set_timeout(State, Timeout); set_idle_timeout(State) -> State. @@ -400,76 +402,3 @@ format_error(queue_full) -> <<"Stream queue is overloaded">>; format_error(Reason) -> xmpp_stream_out:format_error(Reason). - -transform_options(Opts) -> - lists:foldl(fun transform_options/2, [], Opts). - -transform_options({outgoing_s2s_options, Families, Timeout}, Opts) -> - ?WARNING_MSG("Option 'outgoing_s2s_options' is deprecated. " - "The option is still supported " - "but it is better to fix your config: " - "use 'outgoing_s2s_timeout' and " - "'outgoing_s2s_families' instead.", []), - maybe_report_huge_timeout(outgoing_s2s_timeout, Timeout), - [{outgoing_s2s_families, Families}, - {outgoing_s2s_timeout, Timeout} - | Opts]; -transform_options({s2s_dns_options, S2SDNSOpts}, AllOpts) -> - ?WARNING_MSG("Option 's2s_dns_options' is deprecated. " - "The option is still supported " - "but it is better to fix your config: " - "use 's2s_dns_timeout' and " - "'s2s_dns_retries' instead", []), - lists:foldr( - fun({timeout, T}, AccOpts) -> - maybe_report_huge_timeout(s2s_dns_timeout, T), - [{s2s_dns_timeout, T}|AccOpts]; - ({retries, R}, AccOpts) -> - [{s2s_dns_retries, R}|AccOpts]; - (_, AccOpts) -> - AccOpts - end, AllOpts, S2SDNSOpts); -transform_options({Opt, T}, Opts) - when Opt == outgoing_s2s_timeout; Opt == s2s_dns_timeout -> - maybe_report_huge_timeout(Opt, T), - [{Opt, T}|Opts]; -transform_options(Opt, Opts) -> - [Opt|Opts]. - -maybe_report_huge_timeout(Opt, T) when is_integer(T), T >= 1000 -> - ?WARNING_MSG("value '~p' of option '~p' is too big, " - "are you sure you have set seconds?", - [T, Opt]); -maybe_report_huge_timeout(_, _) -> - ok. - --spec opt_type(atom()) -> fun((any()) -> any()) | [atom()]. -opt_type(outgoing_s2s_families) -> - fun(Families) -> - lists:map( - fun(ipv4) -> inet; - (ipv6) -> inet6 - end, Families) - end; -opt_type(outgoing_s2s_port) -> - fun (I) when is_integer(I), I > 0, I < 65536 -> I end; -opt_type(outgoing_s2s_timeout) -> - fun(TimeOut) when is_integer(TimeOut), TimeOut > 0 -> - timer:seconds(TimeOut); - (unlimited) -> - infinity; - (infinity) -> - infinity - end; -opt_type(s2s_dns_retries) -> - fun (I) when is_integer(I), I >= 0 -> I end; -opt_type(s2s_dns_timeout) -> - fun(I) when is_integer(I), I>=0 -> timer:seconds(I); - (infinity) -> infinity; - (unlimited) -> infinity - end; -opt_type(s2s_max_retry_delay) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(_) -> - [outgoing_s2s_families, outgoing_s2s_port, outgoing_s2s_timeout, - s2s_dns_retries, s2s_dns_timeout, s2s_max_retry_delay]. |