diff options
author | Holger Weiss <holger@zedat.fu-berlin.de> | 2020-06-01 21:33:55 +0200 |
---|---|---|
committer | Holger Weiss <holger@zedat.fu-berlin.de> | 2020-06-01 21:33:55 +0200 |
commit | cd336369a5691da8289574f402fa2311b6dc027c (patch) | |
tree | 07da7d2552d60b7ba4f43e60e608e8e2e790bbce /src/mod_stream_mgmt.erl | |
parent | Test 23.0 version (diff) |
mod_stream_mgmt: Don't kill new PID on resumption
During XEP-0198 resumption, the ejabberd_c2s process that handles the
new connection reopens the ejabberd_sm session of the old one. Since
commit b4770815c0b0416c21d01507d2908f94c25b3097, the new process adds
the new session table entry before the old process removes the old one.
While adding the new one, ejabberd_sm checks for old sessions to
replace. This check assumes old SIDs compare lower than new ones. This
assumption didn't necessarily hold for the session resumption case,
where the old SID's timestamp was copied over to the new SID and only
the PID was updated. Therefore, the new process was killed if the new
PID happened to be smaller than the old one.
Fix this by having mod_stream_mgmt use its own SM-ID rather than copying
over the old SID's timestamp to the new SID.
Thanks to Thilo Molitor and Friedrich Altheide for reporting the issue,
and to Thomas Leister for his help with debugging it.
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r-- | src/mod_stream_mgmt.erl | 86 |
1 files changed, 51 insertions, 35 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 45d52ccb0..ee72e152f 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -52,6 +52,7 @@ -type state() :: ejabberd_c2s:state(). -type queue() :: p1_queue:queue({non_neg_integer(), erlang:timestamp(), xmpp_element() | xmlel()}). +-type id() :: binary(). -type error_reason() :: session_not_found | session_timed_out | session_is_dead | session_has_exited | session_was_killed | session_copy_timed_out | @@ -228,8 +229,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, c2s_handle_send(State, _Pkt, _Result) -> State. -c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, - {resume_session, Time}, From) -> +c2s_handle_call(#{mgmt_id := MgmtID, mgmt_queue := Queue, mod := Mod} = State, + {resume_session, MgmtID}, From) -> State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, Mod:reply(From, {resume, State1}), {stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}}; @@ -288,10 +289,10 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason ejabberd_c2s:bounce_message_queue(SID, JID), {stop, State}; c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, - sid := {Time, _}, jid := JID} = State, _Reason) -> + mgmt_id := MgmtID, jid := JID} = State, _Reason) -> case MgmtState of timeout -> - store_stanzas_in(jid:tolower(JID), Time, In); + store_stanzas_in(jid:tolower(JID), MgmtID, In); _ -> ok end, @@ -377,6 +378,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout, mgmt_max_timeout := MaxTimeout, mgmt_xmlns := Xmlns, jid := JID} = State, #sm_enable{resume = Resume, max = Max}) -> + State1 = State#{mgmt_id => make_id()}, Timeout = if Resume == false -> 0; Max /= undefined, Max > 0, Max*1000 =< MaxTimeout -> @@ -388,7 +390,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout, ?DEBUG("Stream management with resumption enabled for ~ts", [jid:encode(JID)]), #sm_enabled{xmlns = Xmlns, - id = make_resume_id(State), + id = encode_id(State1), resume = true, max = Timeout div 1000}; true -> @@ -396,10 +398,10 @@ handle_enable(#{mgmt_timeout := DefaultTimeout, [jid:encode(JID)]), #sm_enabled{xmlns = Xmlns} end, - State1 = State#{mgmt_state => active, - mgmt_queue => p1_queue:new(QueueType), - mgmt_timeout => Timeout}, - send(State1, Res). + State2 = State1#{mgmt_state => active, + mgmt_queue => p1_queue:new(QueueType), + mgmt_timeout => Timeout}, + send(State2, Res). -spec handle_r(state()) -> state(). handle_r(#{mgmt_xmlns := Xmlns, mgmt_stanzas_in := H} = State) -> @@ -431,10 +433,9 @@ handle_resume(#{user := User, lserver := LServer, {ok, #{jid := JID} = ResumedState, NumHandled} -> State1 = check_h_attribute(ResumedState, NumHandled), #{mgmt_xmlns := AttrXmlns, mgmt_stanzas_in := AttrH} = State1, - AttrId = make_resume_id(State1), State2 = send(State1, #sm_resumed{xmlns = AttrXmlns, h = AttrH, - previd = AttrId}), + previd = PrevID}), State3 = resend_unacked_stanzas(State2), State4 = send(State3, #sm_r{xmlns = AttrXmlns}), State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []), @@ -649,20 +650,19 @@ route_unacked_stanzas(_State) -> {error, error_reason()} | {error, error_reason(), non_neg_integer()}. inherit_session_state(#{user := U, server := S, - mgmt_queue_type := QueueType} = State, ResumeID) -> - case misc:base64_to_term(ResumeID) of - {term, {R, Time}} -> - case ejabberd_sm:get_session_pid(U, S, R) of + mgmt_queue_type := QueueType} = State, PrevID) -> + case decode_id(PrevID) of + {ok, {R, MgmtID}} -> + case ejabberd_sm:get_session_sid(U, S, R) of none -> - case pop_stanzas_in({U, S, R}, Time) of + case pop_stanzas_in({U, S, R}, MgmtID) of error -> {error, session_not_found}; {ok, H} -> {error, session_timed_out, H} end; - OldPID -> - OldSID = {Time, OldPID}, - try resume_session(OldSID, State) of + {_, OldPID} = OldSID -> + try resume_session(OldPID, MgmtID, State) of {resume, #{mgmt_xmlns := Xmlns, mgmt_queue := Queue, mgmt_timeout := Timeout, @@ -673,7 +673,9 @@ inherit_session_state(#{user := U, server := S, ram -> Queue; _ -> p1_queue:ram_to_file(Queue) end, - State2 = State1#{mgmt_xmlns => Xmlns, + State2 = State1#{sid => ejabberd_sm:make_sid(), + mgmt_id => MgmtID, + mgmt_xmlns => Xmlns, mgmt_queue => Queue1, mgmt_timeout => Timeout, mgmt_stanzas_in => NumStanzasIn, @@ -698,18 +700,14 @@ inherit_session_state(#{user := U, server := S, {error, session_copy_timed_out} end end; - _ -> + error -> {error, invalid_previd} end. --spec resume_session({erlang:timestamp(), pid()}, state()) -> {resume, state()} | - {error, error_reason()}. -resume_session({Time, Pid}, _State) -> - ejabberd_c2s:call(Pid, {resume_session, Time}, timer:seconds(15)). - --spec make_resume_id(state()) -> binary(). -make_resume_id(#{sid := {Time, _}, resource := Resource}) -> - misc:term_to_base64({Resource, Time}). +-spec resume_session(pid(), id(), state()) -> {resume, state()} | + {error, error_reason()}. +resume_session(PID, MgmtID, _State) -> + ejabberd_c2s:call(PID, {resume_session, MgmtID}, timer:seconds(15)). -spec add_resent_delay_info(state(), stanza(), erlang:timestamp()) -> stanza(); (state(), xmlel(), erlang:timestamp()) -> xmlel(). @@ -756,6 +754,24 @@ need_to_enqueue(#{mgmt_force_enqueue := true} = State, #xmlel{}) -> need_to_enqueue(State, _) -> {false, State}. +-spec make_id() -> id(). +make_id() -> + p1_rand:bytes(8). + +-spec encode_id(state()) -> binary(). +encode_id(#{mgmt_id := MgmtID, resource := Resource}) -> + misc:term_to_base64({Resource, MgmtID}). + +-spec decode_id(binary()) -> {ok, {binary(), id()}} | error. +decode_id(Encoded) -> + case misc:base64_to_term(Encoded) of + {term, {Resource, MgmtID}} when is_binary(Resource), + is_binary(MgmtID) -> + {ok, {Resource, MgmtID}}; + _ -> + error + end. + %%%=================================================================== %%% Formatters and Logging %%%=================================================================== @@ -803,14 +819,14 @@ cache_opts(Opts) -> {life_time, mod_stream_mgmt_opt:cache_life_time(Opts)}, {type, ordered_set}]. --spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean(). -store_stanzas_in(LJID, Time, Num) -> - ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num, +-spec store_stanzas_in(ljid(), id(), non_neg_integer()) -> boolean(). +store_stanzas_in(LJID, MgmtID, Num) -> + ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, MgmtID}, Num, ejabberd_cluster:get_nodes()). --spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error. -pop_stanzas_in(LJID, Time) -> - case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of +-spec pop_stanzas_in(ljid(), id()) -> {ok, non_neg_integer()} | error. +pop_stanzas_in(LJID, MgmtID) -> + case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, MgmtID}) of {ok, Val} -> ets_cache:match_delete(?STREAM_MGMT_CACHE, {LJID, '_'}, ejabberd_cluster:get_nodes()), |