diff options
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r-- | src/mod_stream_mgmt.erl | 851 |
1 files changed, 851 insertions, 0 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl new file mode 100644 index 000000000..92b9e4020 --- /dev/null +++ b/src/mod_stream_mgmt.erl @@ -0,0 +1,851 @@ +%%%------------------------------------------------------------------- +%%% Author : Holger Weiss <holger@zedat.fu-berlin.de> +%%% Created : 25 Dec 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%------------------------------------------------------------------- +-module(mod_stream_mgmt). +-behaviour(gen_mod). +-author('holger@zedat.fu-berlin.de'). +-protocol({xep, 198, '1.5.2'}). + +%% gen_mod API +-export([start/2, stop/1, reload/3, depends/2, mod_opt_type/1, mod_options/1]). +%% hooks +-export([c2s_stream_started/2, c2s_stream_features/2, + c2s_authenticated_packet/2, c2s_unauthenticated_packet/2, + c2s_unbinded_packet/2, c2s_closed/2, c2s_terminated/2, + c2s_handle_send/3, c2s_handle_info/2, c2s_handle_call/3, + c2s_handle_recv/3]). +%% adjust pending session timeout / access queue +-export([get_resume_timeout/1, set_resume_timeout/2, queue_find/2]). + +-include("xmpp.hrl"). +-include("logger.hrl"). +-include("p1_queue.hrl"). +-include("translate.hrl"). + +-define(STREAM_MGMT_CACHE, stream_mgmt_cache). + +-define(is_sm_packet(Pkt), + is_record(Pkt, sm_enable) or + is_record(Pkt, sm_resume) or + is_record(Pkt, sm_a) or + is_record(Pkt, sm_r)). + +-type state() :: ejabberd_c2s:state(). +-type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}). +-type error_reason() :: session_not_found | session_timed_out | + session_is_dead | session_has_exited | + session_was_killed | session_copy_timed_out | + invalid_previd. + +%%%=================================================================== +%%% API +%%%=================================================================== +start(Host, Opts) -> + init_cache(Opts), + ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), + ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, + c2s_stream_features, 50), + ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE, + c2s_unauthenticated_packet, 50), + ejabberd_hooks:add(c2s_unbinded_packet, Host, ?MODULE, + c2s_unbinded_packet, 50), + ejabberd_hooks:add(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:add(c2s_handle_send, Host, ?MODULE, c2s_handle_send, 50), + ejabberd_hooks:add(c2s_handle_recv, Host, ?MODULE, c2s_handle_recv, 50), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), + ejabberd_hooks:add(c2s_handle_call, Host, ?MODULE, c2s_handle_call, 50), + ejabberd_hooks:add(c2s_closed, Host, ?MODULE, c2s_closed, 50), + ejabberd_hooks:add(c2s_terminated, Host, ?MODULE, c2s_terminated, 50). + +stop(Host) -> + ejabberd_hooks:delete(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), + ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, + c2s_stream_features, 50), + ejabberd_hooks:delete(c2s_unauthenticated_packet, Host, ?MODULE, + c2s_unauthenticated_packet, 50), + ejabberd_hooks:delete(c2s_unbinded_packet, Host, ?MODULE, + c2s_unbinded_packet, 50), + ejabberd_hooks:delete(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:delete(c2s_handle_send, Host, ?MODULE, c2s_handle_send, 50), + ejabberd_hooks:delete(c2s_handle_recv, Host, ?MODULE, c2s_handle_recv, 50), + ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), + ejabberd_hooks:delete(c2s_handle_call, Host, ?MODULE, c2s_handle_call, 50), + ejabberd_hooks:delete(c2s_closed, Host, ?MODULE, c2s_closed, 50), + ejabberd_hooks:delete(c2s_terminated, Host, ?MODULE, c2s_terminated, 50). + +reload(_Host, NewOpts, _OldOpts) -> + init_cache(NewOpts), + ?WARNING_MSG("Module ~ts is reloaded, but new configuration will take " + "effect for newly created client connections only", [?MODULE]). + +depends(_Host, _Opts) -> + []. + +c2s_stream_started(#{lserver := LServer} = State, _StreamStart) -> + State1 = maps:remove(mgmt_options, State), + ResumeTimeout = get_configured_resume_timeout(LServer), + MaxResumeTimeout = get_max_resume_timeout(LServer, ResumeTimeout), + State1#{mgmt_state => inactive, + mgmt_queue_type => get_queue_type(LServer), + mgmt_max_queue => get_max_ack_queue(LServer), + mgmt_timeout => ResumeTimeout, + mgmt_max_timeout => MaxResumeTimeout, + mgmt_ack_timeout => get_ack_timeout(LServer), + mgmt_resend => get_resend_on_timeout(LServer), + mgmt_stanzas_in => 0, + mgmt_stanzas_out => 0, + mgmt_stanzas_req => 0}; +c2s_stream_started(State, _StreamStart) -> + State. + +c2s_stream_features(Acc, Host) -> + case gen_mod:is_loaded(Host, ?MODULE) of + true -> + [#feature_sm{xmlns = ?NS_STREAM_MGMT_2}, + #feature_sm{xmlns = ?NS_STREAM_MGMT_3}|Acc]; + false -> + Acc + end. + +c2s_unauthenticated_packet(#{lang := Lang} = State, Pkt) when ?is_sm_packet(Pkt) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding unless it is resuming a previous session". However, it also + %% says: "Stream management errors SHOULD be considered recoverable", so we + %% won't bail out. + Err = #sm_failed{reason = 'not-authorized', + text = xmpp:mk_text(?T("Unauthorized"), Lang), + xmlns = ?NS_STREAM_MGMT_3}, + {stop, send(State, Err)}; +c2s_unauthenticated_packet(State, _Pkt) -> + State. + +c2s_unbinded_packet(State, #sm_resume{} = Pkt) -> + case handle_resume(State, Pkt) of + {ok, ResumedState} -> + {stop, ResumedState}; + {error, State1} -> + {stop, State1} + end; +c2s_unbinded_packet(State, Pkt) when ?is_sm_packet(Pkt) -> + c2s_unauthenticated_packet(State, Pkt); +c2s_unbinded_packet(State, _Pkt) -> + State. + +c2s_authenticated_packet(#{mgmt_state := MgmtState} = State, Pkt) + when ?is_sm_packet(Pkt) -> + if MgmtState == pending; MgmtState == active -> + {stop, perform_stream_mgmt(Pkt, State)}; + true -> + {stop, negotiate_stream_mgmt(Pkt, State)} + end; +c2s_authenticated_packet(State, Pkt) -> + update_num_stanzas_in(State, Pkt). + +c2s_handle_recv(#{mgmt_state := MgmtState, + lang := Lang} = State, El, {error, Why}) -> + Xmlns = xmpp:get_ns(El), + IsStanza = xmpp:is_stanza(El), + if Xmlns == ?NS_STREAM_MGMT_2; Xmlns == ?NS_STREAM_MGMT_3 -> + Txt = xmpp:io_format_error(Why), + Err = #sm_failed{reason = 'bad-request', + text = xmpp:mk_text(Txt, Lang), + xmlns = Xmlns}, + send(State, Err); + IsStanza andalso (MgmtState == pending orelse MgmtState == active) -> + State1 = update_num_stanzas_in(State, El), + case xmpp:get_type(El) of + <<"result">> -> State1; + <<"error">> -> State1; + _ -> + State1#{mgmt_force_enqueue => true} + end; + true -> + State + end; +c2s_handle_recv(State, _, _) -> + State. + +c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, + lang := Lang} = State, Pkt, SendResult) + when MgmtState == pending; MgmtState == active -> + IsStanza = xmpp:is_stanza(Pkt), + case Pkt of + _ when IsStanza -> + case need_to_enqueue(State, Pkt) of + {true, State1} -> + case mgmt_queue_add(State1, Pkt) of + #{mgmt_max_queue := exceeded} = State2 -> + State3 = State2#{mgmt_resend => false}, + Err = xmpp:serr_policy_violation( + ?T("Too many unacked stanzas"), Lang), + send(State3, Err); + State2 when SendResult == ok -> + send_rack(State2); + State2 -> + State2 + end; + {false, State1} -> + State1 + end; + #stream_error{} -> + case MgmtState of + active -> + State; + pending -> + Mod:stop(State#{stop_reason => {stream, {out, Pkt}}}) + end; + _ -> + State + end; +c2s_handle_send(State, _Pkt, _Result) -> + State. + +c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, + {resume_session, Time}, From) -> + State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, + Mod:reply(From, {resume, State1}), + {stop, State#{mgmt_state => resumed}}; +c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> + Mod:reply(From, {error, session_not_found}), + {stop, State}; +c2s_handle_call(State, _Call, _From) -> + State. + +c2s_handle_info(#{mgmt_ack_timer := TRef, jid := JID, mod := Mod} = State, + {timeout, TRef, ack_timeout}) -> + ?DEBUG("Timed out waiting for stream management acknowledgement of ~ts", + [jid:encode(JID)]), + State1 = Mod:close(State), + State2 = State1#{stop_reason => {socket, ack_timeout}}, + {stop, transition_to_pending(State2, ack_timeout)}; +c2s_handle_info(#{mgmt_state := pending, lang := Lang, + mgmt_pending_timer := TRef, jid := JID, mod := Mod} = State, + {timeout, TRef, pending_timeout}) -> + ?DEBUG("Timed out waiting for resumption of stream for ~ts", + [jid:encode(JID)]), + Txt = ?T("Timed out waiting for stream resumption"), + Err = xmpp:serr_connection_timeout(Txt, Lang), + Mod:stop(State#{mgmt_state => timeout, + stop_reason => {stream, {out, Err}}}); +c2s_handle_info(State, {_Ref, {resume, #{jid := JID} = OldState}}) -> + %% This happens if the resume_session/1 request timed out; the new session + %% now receives the late response. + ?DEBUG("Received old session state for ~ts after failed resumption", + [jid:encode(JID)]), + route_unacked_stanzas(OldState#{mgmt_resend => false}), + {stop, State}; +c2s_handle_info(State, {timeout, _, Timeout}) when Timeout == ack_timeout; + Timeout == pending_timeout -> + %% Late arrival of an already cancelled timer: we just ignore it. + %% This might happen because misc:cancel_timer/1 doesn't guarantee + %% timer cancelation in the case when p1_server is used. + {stop, State}; +c2s_handle_info(State, _) -> + State. + +c2s_closed(State, {stream, _}) -> + State; +c2s_closed(#{mgmt_state := active} = State, Reason) -> + {stop, transition_to_pending(State, Reason)}; +c2s_closed(State, _Reason) -> + State. + +c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason) -> + ?DEBUG("Closing former stream of resumed session for ~ts", + [jid:encode(JID)]), + {U, S, R} = jid:tolower(JID), + ejabberd_sm:close_session(SID, U, S, R), + ejabberd_c2s:bounce_message_queue(SID, JID), + {stop, State}; +c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, + sid := {Time, _}, jid := JID} = State, _Reason) -> + case MgmtState of + timeout -> + store_stanzas_in(jid:tolower(JID), Time, In); + _ -> + ok + end, + route_unacked_stanzas(State), + State; +c2s_terminated(State, _Reason) -> + State. + +%%%=================================================================== +%%% Adjust pending session timeout / access queue +%%%=================================================================== +-spec get_resume_timeout(state()) -> non_neg_integer(). +get_resume_timeout(#{mgmt_timeout := Timeout}) -> + Timeout. + +-spec set_resume_timeout(state(), non_neg_integer()) -> state(). +set_resume_timeout(#{mgmt_timeout := Timeout} = State, Timeout) -> + State; +set_resume_timeout(State, Timeout) -> + State1 = restart_pending_timer(State, Timeout), + State1#{mgmt_timeout => Timeout}. + +-spec queue_find(fun((stanza()) -> boolean()), queue()) + -> stanza() | none. +queue_find(Pred, Queue) -> + case p1_queue:out(Queue) of + {{value, {_, _, Pkt}}, Queue1} -> + case Pred(Pkt) of + true -> + Pkt; + false -> + queue_find(Pred, Queue1) + end; + {empty, _Queue1} -> + none + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +-spec negotiate_stream_mgmt(xmpp_element(), state()) -> state(). +negotiate_stream_mgmt(Pkt, #{lang := Lang} = State) -> + Xmlns = xmpp:get_ns(Pkt), + case Pkt of + #sm_enable{} -> + handle_enable(State#{mgmt_xmlns => Xmlns}, Pkt); + _ when is_record(Pkt, sm_a); + is_record(Pkt, sm_r); + is_record(Pkt, sm_resume) -> + Txt = ?T("Stream management is not enabled"), + Err = #sm_failed{reason = 'unexpected-request', + text = xmpp:mk_text(Txt, Lang), + xmlns = Xmlns}, + send(State, Err) + end. + +-spec perform_stream_mgmt(xmpp_element(), state()) -> state(). +perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns, lang := Lang} = State) -> + case xmpp:get_ns(Pkt) of + Xmlns -> + case Pkt of + #sm_r{} -> + handle_r(State); + #sm_a{} -> + handle_a(State, Pkt); + _ when is_record(Pkt, sm_enable); + is_record(Pkt, sm_resume) -> + Txt = ?T("Stream management is already enabled"), + send(State, #sm_failed{reason = 'unexpected-request', + text = xmpp:mk_text(Txt, Lang), + xmlns = Xmlns}) + end; + _ -> + Txt = ?T("Unsupported version"), + send(State, #sm_failed{reason = 'unexpected-request', + text = xmpp:mk_text(Txt, Lang), + xmlns = Xmlns}) + end. + +-spec handle_enable(state(), sm_enable()) -> state(). +handle_enable(#{mgmt_timeout := DefaultTimeout, + mgmt_queue_type := QueueType, + mgmt_max_timeout := MaxTimeout, + mgmt_xmlns := Xmlns, jid := JID} = State, + #sm_enable{resume = Resume, max = Max}) -> + Timeout = if Resume == false -> + 0; + Max /= undefined, Max > 0, Max*1000 =< MaxTimeout -> + Max*1000; + true -> + DefaultTimeout + end, + Res = if Timeout > 0 -> + ?DEBUG("Stream management with resumption enabled for ~ts", + [jid:encode(JID)]), + #sm_enabled{xmlns = Xmlns, + id = make_resume_id(State), + resume = true, + max = Timeout div 1000}; + true -> + ?DEBUG("Stream management without resumption enabled for ~ts", + [jid:encode(JID)]), + #sm_enabled{xmlns = Xmlns} + end, + State1 = State#{mgmt_state => active, + mgmt_queue => p1_queue:new(QueueType), + mgmt_timeout => Timeout}, + send(State1, Res). + +-spec handle_r(state()) -> state(). +handle_r(#{mgmt_xmlns := Xmlns, mgmt_stanzas_in := H} = State) -> + Res = #sm_a{xmlns = Xmlns, h = H}, + send(State, Res). + +-spec handle_a(state(), sm_a()) -> state(). +handle_a(State, #sm_a{h = H}) -> + State1 = check_h_attribute(State, H), + resend_rack(State1). + +-spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}. +handle_resume(#{user := User, lserver := LServer, + lang := Lang, socket := Socket} = State, + #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> + R = case inherit_session_state(State, PrevID) of + {ok, InheritedState} -> + {ok, InheritedState, H}; + {error, Err, InH} -> + {error, #sm_failed{reason = 'item-not-found', + text = xmpp:mk_text(format_error(Err), Lang), + h = InH, xmlns = Xmlns}, Err}; + {error, Err} -> + {error, #sm_failed{reason = 'item-not-found', + text = xmpp:mk_text(format_error(Err), Lang), + xmlns = Xmlns}, Err} + end, + case R of + {ok, #{jid := JID} = ResumedState, NumHandled} -> + State1 = check_h_attribute(ResumedState, NumHandled), + #{mgmt_xmlns := AttrXmlns, mgmt_stanzas_in := AttrH} = State1, + AttrId = make_resume_id(State1), + State2 = send(State1, #sm_resumed{xmlns = AttrXmlns, + h = AttrH, + previd = AttrId}), + State3 = resend_unacked_stanzas(State2), + State4 = send(State3, #sm_r{xmlns = AttrXmlns}), + State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []), + ?INFO_MSG("(~ts) Resumed session for ~ts", + [xmpp_socket:pp(Socket), jid:encode(JID)]), + {ok, State5}; + {error, El, Reason} -> + log_resumption_error(User, LServer, Reason), + {error, send(State, El)} + end. + +-spec transition_to_pending(state(), _) -> state(). +transition_to_pending(#{mgmt_state := active, mod := Mod, + mgmt_timeout := 0} = State, _Reason) -> + Mod:stop(State); +transition_to_pending(#{mgmt_state := active, jid := JID, socket := Socket, + lserver := LServer, mgmt_timeout := Timeout} = State, + Reason) -> + State1 = cancel_ack_timer(State), + ?INFO_MSG("(~ts) Closing c2s connection for ~ts: ~ts; " + "waiting ~B seconds for stream resumption", + [xmpp_socket:pp(Socket), jid:encode(JID), + format_reason(State, Reason), Timeout div 1000]), + TRef = erlang:start_timer(Timeout, self(), pending_timeout), + State2 = State1#{mgmt_state => pending, mgmt_pending_timer => TRef}, + ejabberd_hooks:run_fold(c2s_session_pending, LServer, State2, []); +transition_to_pending(State, _Reason) -> + State. + +-spec check_h_attribute(state(), non_neg_integer()) -> state(). +check_h_attribute(#{mgmt_stanzas_out := NumStanzasOut, jid := JID, + lang := Lang} = State, H) + when H > NumStanzasOut -> + ?WARNING_MSG("~ts acknowledged ~B stanzas, but only ~B were sent", + [jid:encode(JID), H, NumStanzasOut]), + State1 = State#{mgmt_resend => false}, + Err = xmpp:serr_undefined_condition( + ?T("Client acknowledged more stanzas than sent by server"), Lang), + send(State1, Err); +check_h_attribute(#{mgmt_stanzas_out := NumStanzasOut, jid := JID} = State, H) -> + ?DEBUG("~ts acknowledged ~B of ~B stanzas", + [jid:encode(JID), H, NumStanzasOut]), + mgmt_queue_drop(State, H). + +-spec update_num_stanzas_in(state(), xmpp_element() | xmlel()) -> state(). +update_num_stanzas_in(#{mgmt_state := MgmtState, + mgmt_stanzas_in := NumStanzasIn} = State, El) + when MgmtState == active; MgmtState == pending -> + NewNum = case {xmpp:is_stanza(El), NumStanzasIn} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + State#{mgmt_stanzas_in => NewNum}; +update_num_stanzas_in(State, _El) -> + State. + +-spec send_rack(state()) -> state(). +send_rack(#{mgmt_ack_timer := _} = State) -> + State; +send_rack(#{mgmt_xmlns := Xmlns, + mgmt_stanzas_out := NumStanzasOut, + mgmt_ack_timeout := AckTimeout} = State) -> + TRef = erlang:start_timer(AckTimeout, self(), ack_timeout), + State1 = State#{mgmt_ack_timer => TRef, mgmt_stanzas_req => NumStanzasOut}, + send(State1, #sm_r{xmlns = Xmlns}). + +-spec resend_rack(state()) -> state(). +resend_rack(#{mgmt_ack_timer := _, + mgmt_queue := Queue, + mgmt_stanzas_out := NumStanzasOut, + mgmt_stanzas_req := NumStanzasReq} = State) -> + State1 = cancel_ack_timer(State), + case NumStanzasReq < NumStanzasOut andalso not p1_queue:is_empty(Queue) of + true -> send_rack(State1); + false -> State1 + end; +resend_rack(State) -> + State. + +-spec mgmt_queue_add(state(), xmlel() | xmpp_element()) -> state(). +mgmt_queue_add(#{mgmt_stanzas_out := NumStanzasOut, + mgmt_queue := Queue} = State, Pkt) -> + NewNum = case NumStanzasOut of + 4294967295 -> 0; + Num -> Num + 1 + end, + Queue1 = p1_queue:in({NewNum, erlang:timestamp(), Pkt}, Queue), + State1 = State#{mgmt_queue => Queue1, mgmt_stanzas_out => NewNum}, + check_queue_length(State1). + +-spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). +mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) -> + NewQueue = p1_queue:dropwhile( + fun({N, _T, _E}) -> N =< NumHandled end, Queue), + State#{mgmt_queue => NewQueue}. + +-spec check_queue_length(state()) -> state(). +check_queue_length(#{mgmt_max_queue := Limit} = State) + when Limit == infinity; Limit == exceeded -> + State; +check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> + case p1_queue:len(Queue) > Limit of + true -> + State#{mgmt_max_queue => exceeded}; + false -> + State + end. + +-spec resend_unacked_stanzas(state()) -> state(). +resend_unacked_stanzas(#{mgmt_state := MgmtState, + mgmt_queue := Queue, + jid := JID} = State) + when (MgmtState == active orelse + MgmtState == pending orelse + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> + ?DEBUG("Resending ~B unacknowledged stanza(s) to ~ts", + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foldl( + fun({_, Time, Pkt}, AccState) -> + Pkt1 = add_resent_delay_info(AccState, Pkt, Time), + Pkt2 = if ?is_stanza(Pkt1) -> + xmpp:put_meta(Pkt1, mgmt_is_resent, true); + true -> + Pkt1 + end, + send(AccState, Pkt2) + end, State, Queue); +resend_unacked_stanzas(State) -> + State. + +-spec route_unacked_stanzas(state()) -> ok. +route_unacked_stanzas(#{mgmt_state := MgmtState, + mgmt_resend := MgmtResend, + lang := Lang, user := User, + jid := JID, lserver := LServer, + mgmt_queue := Queue, + resource := Resource} = State) + when (MgmtState == active orelse + MgmtState == pending orelse + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> + ResendOnTimeout = case MgmtResend of + Resend when is_boolean(Resend) -> + Resend; + if_offline -> + case ejabberd_sm:get_user_resources(User, LServer) of + [Resource] -> + %% Same resource opened new session + true; + [] -> true; + _ -> false + end + end, + ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~ts", + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foreach( + fun({_, _Time, #presence{from = From}}) -> + ?DEBUG("Dropping presence stanza from ~ts", [jid:encode(From)]); + ({_, _Time, #iq{} = El}) -> + Txt = ?T("User session terminated"), + ejabberd_router:route_error( + El, xmpp:err_service_unavailable(Txt, Lang)); + ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) -> + %% XEP-0280 says: "When a receiving server attempts to deliver a + %% forked message, and that message bounces with an error for + %% any reason, the receiving server MUST NOT forward that error + %% back to the original sender." Resending such a stanza could + %% easily lead to unexpected results as well. + ?DEBUG("Dropping forwarded message stanza from ~ts", + [jid:encode(From)]); + ({_, Time, #message{} = Msg}) -> + case ejabberd_hooks:run_fold(message_is_archived, + LServer, false, + [State, Msg]) of + true -> + ?DEBUG("Dropping archived message stanza from ~ts", + [jid:encode(xmpp:get_from(Msg))]); + false when ResendOnTimeout -> + NewEl = add_resent_delay_info(State, Msg, Time), + ejabberd_router:route(NewEl); + false -> + Txt = ?T("User session terminated"), + ejabberd_router:route_error( + Msg, xmpp:err_service_unavailable(Txt, Lang)) + end; + ({_, _Time, El}) -> + %% Raw element of type 'error' resulting from a validation error + %% We cannot pass it to the router, it will generate an error + ?DEBUG("Do not route raw element from ack queue: ~p", [El]) + end, Queue); +route_unacked_stanzas(_State) -> + ok. + +-spec inherit_session_state(state(), binary()) -> {ok, state()} | + {error, error_reason()} | + {error, error_reason(), non_neg_integer()}. +inherit_session_state(#{user := U, server := S, + mgmt_queue_type := QueueType} = State, ResumeID) -> + case misc:base64_to_term(ResumeID) of + {term, {R, Time}} -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + case pop_stanzas_in({U, S, R}, Time) of + error -> + {error, session_not_found}; + {ok, H} -> + {error, session_timed_out, H} + end; + OldPID -> + OldSID = {Time, OldPID}, + try resume_session(OldSID, State) of + {resume, #{mgmt_xmlns := Xmlns, + mgmt_queue := Queue, + mgmt_timeout := Timeout, + mgmt_stanzas_in := NumStanzasIn, + mgmt_stanzas_out := NumStanzasOut} = OldState} -> + State1 = ejabberd_c2s:copy_state(State, OldState), + Queue1 = case QueueType of + ram -> Queue; + _ -> p1_queue:ram_to_file(Queue) + end, + State2 = State1#{mgmt_xmlns => Xmlns, + mgmt_queue => Queue1, + mgmt_timeout => Timeout, + mgmt_stanzas_in => NumStanzasIn, + mgmt_stanzas_out => NumStanzasOut, + mgmt_state => active}, + State3 = ejabberd_c2s:open_session(State2), + ejabberd_c2s:stop(OldPID), + {ok, State3}; + {error, Msg} -> + {error, Msg} + catch exit:{noproc, _} -> + {error, session_is_dead}; + exit:{normal, _} -> + {error, session_has_exited}; + exit:{shutdown, _} -> + {error, session_has_exited}; + exit:{killed, _} -> + {error, session_was_killed}; + exit:{timeout, _} -> + ejabberd_sm:close_session(OldSID, U, S, R), + ejabberd_c2s:stop(OldPID), + {error, session_copy_timed_out} + end + end; + _ -> + {error, invalid_previd} + end. + +-spec resume_session({erlang:timestamp(), pid()}, state()) -> {resume, state()} | + {error, error_reason()}. +resume_session({Time, Pid}, _State) -> + ejabberd_c2s:call(Pid, {resume_session, Time}, timer:seconds(15)). + +-spec make_resume_id(state()) -> binary(). +make_resume_id(#{sid := {Time, _}, resource := Resource}) -> + misc:term_to_base64({Resource, Time}). + +-spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza(); + (state(), xmlel(), erlang:timestamp()) -> xmlel(). +add_resent_delay_info(#{lserver := LServer}, El, Time) + when is_record(El, message); is_record(El, presence) -> + misc:add_delay_info(El, jid:make(LServer), Time, <<"Resent">>); +add_resent_delay_info(_State, El, _Time) -> + %% TODO + El. + +-spec send(state(), xmpp_element()) -> state(). +send(#{mod := Mod} = State, Pkt) -> + Mod:send(State, Pkt). + +-spec restart_pending_timer(state(), non_neg_integer()) -> state(). +restart_pending_timer(#{mgmt_pending_timer := TRef} = State, NewTimeout) -> + misc:cancel_timer(TRef), + NewTRef = erlang:start_timer(NewTimeout, self(), pending_timeout), + State#{mgmt_pending_timer => NewTRef}; +restart_pending_timer(State, _NewTimeout) -> + State. + +-spec cancel_ack_timer(state()) -> state(). +cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) -> + misc:cancel_timer(TRef), + maps:remove(mgmt_ack_timer, State); +cancel_ack_timer(State) -> + State. + +-spec need_to_enqueue(state(), xmlel() | stanza()) -> {boolean(), state()}. +need_to_enqueue(State, Pkt) when ?is_stanza(Pkt) -> + {not xmpp:get_meta(Pkt, mgmt_is_resent, false), State}; +need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) -> + State1 = maps:remove(mgmt_force_enqueue, State), + State2 = maps:remove(mgmt_is_resent, State1), + {true, State2}; +need_to_enqueue(State, _) -> + {false, State}. + +%%%=================================================================== +%%% Formatters and Logging +%%%=================================================================== +-spec format_error(error_reason()) -> binary(). +format_error(session_not_found) -> + ?T("Previous session not found"); +format_error(session_timed_out) -> + ?T("Previous session timed out"); +format_error(session_is_dead) -> + ?T("Previous session PID is dead"); +format_error(session_has_exited) -> + ?T("Previous session PID has exited"); +format_error(session_was_killed) -> + ?T("Previous session PID has been killed"); +format_error(session_copy_timed_out) -> + ?T("Session state copying timed out"); +format_error(invalid_previd) -> + ?T("Invalid 'previd' value"). + +-spec format_reason(state(), term()) -> binary(). +format_reason(_, ack_timeout) -> + <<"Timed out waiting for stream acknowledgement">>; +format_reason(#{stop_reason := {socket, ack_timeout}} = State, _) -> + format_reason(State, ack_timeout); +format_reason(State, Reason) -> + ejabberd_c2s:format_reason(State, Reason). + +-spec log_resumption_error(binary(), binary(), error_reason()) -> ok. +log_resumption_error(User, Server, Reason) + when Reason == invalid_previd -> + ?WARNING_MSG("Cannot resume session for ~ts@~ts: ~ts", + [User, Server, format_error(Reason)]); +log_resumption_error(User, Server, Reason) -> + ?INFO_MSG("Cannot resume session for ~ts@~ts: ~ts", + [User, Server, format_error(Reason)]). + +%%%=================================================================== +%%% Cache-like storage for last handled stanzas +%%%=================================================================== +init_cache(Opts) -> + ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)). + +cache_opts(Opts) -> + [{max_size, mod_stream_mgmt_opt:cache_size(Opts)}, + {life_time, mod_stream_mgmt_opt:cache_life_time(Opts)}, + {type, ordered_set}]. + +-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean(). +store_stanzas_in(LJID, Time, Num) -> + ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num, + ejabberd_cluster:get_nodes()). + +-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error. +pop_stanzas_in(LJID, Time) -> + case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of + {ok, Val} -> + ets_cache:match_delete(?STREAM_MGMT_CACHE, {LJID, '_'}, + ejabberd_cluster:get_nodes()), + {ok, Val}; + error -> + error + end. + +%%%=================================================================== +%%% Configuration processing +%%%=================================================================== +get_max_ack_queue(Host) -> + mod_stream_mgmt_opt:max_ack_queue(Host). + +get_configured_resume_timeout(Host) -> + mod_stream_mgmt_opt:resume_timeout(Host). + +get_max_resume_timeout(Host, ResumeTimeout) -> + case mod_stream_mgmt_opt:max_resume_timeout(Host) of + undefined -> ResumeTimeout; + Max when Max >= ResumeTimeout -> Max; + _ -> ResumeTimeout + end. + +get_ack_timeout(Host) -> + mod_stream_mgmt_opt:ack_timeout(Host). + +get_resend_on_timeout(Host) -> + mod_stream_mgmt_opt:resend_on_timeout(Host). + +get_queue_type(Host) -> + mod_stream_mgmt_opt:queue_type(Host). + +mod_opt_type(max_ack_queue) -> + econf:pos_int(infinity); +mod_opt_type(resume_timeout) -> + econf:either( + econf:int(0, 0), + econf:timeout(second)); +mod_opt_type(max_resume_timeout) -> + econf:either( + econf:int(0, 0), + econf:timeout(second)); +mod_opt_type(ack_timeout) -> + econf:timeout(second, infinity); +mod_opt_type(resend_on_timeout) -> + econf:either( + if_offline, + econf:bool()); +mod_opt_type(cache_size) -> + econf:pos_int(infinity); +mod_opt_type(cache_life_time) -> + econf:timeout(second, infinity); +mod_opt_type(queue_type) -> + econf:queue_type(). + +mod_options(Host) -> + [{max_ack_queue, 5000}, + {resume_timeout, timer:seconds(300)}, + {max_resume_timeout, undefined}, + {ack_timeout, timer:seconds(60)}, + {cache_size, ejabberd_option:cache_size(Host)}, + {cache_life_time, timer:hours(48)}, + {resend_on_timeout, false}, + {queue_type, ejabberd_option:queue_type(Host)}]. |