diff options
author | Paweł Chmielowski <pchmielowski@process-one.net> | 2020-04-07 14:51:49 +0200 |
---|---|---|
committer | Paweł Chmielowski <pchmielowski@process-one.net> | 2020-04-07 14:51:49 +0200 |
commit | 9bb3aee0e2f1649c45404725675c09d9548cb1b9 (patch) | |
tree | f3632e34dffe9dcda90f205b6c92706a5941bc9d /src/mod_stream_mgmt.erl | |
parent | Log errors that happen when retrieving http headers in ejabberd_http (diff) |
Make resumed sessions try to deliver possibly queued messages to new session
Between receiving resume request and being closed by new session, it's
possible (even if not very likely) that new messages would arrive to
process that is resumed. In that case try to reroute messages that were
received after we sent resume reply to new process.
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r-- | src/mod_stream_mgmt.erl | 19 |
1 files changed, 17 insertions, 2 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index fb1a609b1..0c73eb7f7 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -192,7 +192,7 @@ c2s_handle_recv(State, _, _) -> c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, lang := Lang} = State, Pkt, SendResult) - when MgmtState == pending; MgmtState == active -> + when MgmtState == pending; MgmtState == active; MgmtState == resumed -> IsStanza = xmpp:is_stanza(Pkt), case Pkt of _ when IsStanza -> @@ -214,6 +214,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, end; #stream_error{} -> case MgmtState of + resumed -> + State; active -> State; pending -> @@ -230,7 +232,7 @@ c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, {resume_session, Time}, From) -> State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, Mod:reply(From, {resume, State1}), - {stop, State#{mgmt_state => resumed}}; + {stop, State#{mgmt_state => resumed, mgmt_queue => p1_queue:clear(Queue)}}; c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> Mod:reply(From, {error, session_not_found}), {stop, State}; @@ -282,6 +284,7 @@ c2s_terminated(#{mgmt_state := resumed, sid := SID, jid := JID} = State, _Reason [jid:encode(JID)]), {U, S, R} = jid:tolower(JID), ejabberd_sm:close_session(SID, U, S, R), + route_late_queue_after_resume(State), ejabberd_c2s:bounce_message_queue(SID, JID), {stop, State}; c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, @@ -544,6 +547,18 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> State end. +-spec route_late_queue_after_resume(state()) -> ok. +route_late_queue_after_resume(#{mgmt_queue := Queue, jid := JID}) + when ?qlen(Queue) > 0 -> + ?DEBUG("Re-routing ~B late queued packets to ~ts", + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foreach( + fun({_, _Time, Pkt}) -> + ejabberd_router:route(Pkt) + end, Queue); +route_late_queue_after_resume(_State) -> + ok. + -spec resend_unacked_stanzas(state()) -> state(). resend_unacked_stanzas(#{mgmt_state := MgmtState, mgmt_queue := Queue, |