aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPaweł Chmielowski <pchmielowski@process-one.net>2020-04-01 14:35:49 +0200
committerPaweł Chmielowski <pchmielowski@process-one.net>2020-04-01 14:36:01 +0200
commit1bd560f3f25d0a644bac3d06904ca97e20a6f7d9 (patch)
tree3d74c9bc22f95409ebda8cc230169283096e9188 /src
parentUse different username than other tests, but still include the test chars (diff)
Fix potential message loss in terminating c2s sessions
Calling sync version of xmpp_stream_in/out:stop could lead to messages never being processed by c2s process if they were queued in p1_server. This could be reproduced by when having messages in offline storage, starting sessions, enabling stream_mgmt, sending initial presence, and then immediately </stream:stream>, messages that mod_offline would send process would not be bounced back by stream_mgmt.
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_c2s.erl12
-rw-r--r--src/ejabberd_s2s.erl10
-rw-r--r--src/ejabberd_s2s_in.erl10
-rw-r--r--src/ejabberd_s2s_out.erl16
-rw-r--r--src/ejabberd_service.erl11
-rw-r--r--src/ejabberd_sm.erl2
-rw-r--r--src/mod_s2s_dialback.erl3
-rw-r--r--src/mod_stream_mgmt.erl15
8 files changed, 43 insertions, 36 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 8f069bcbe..f533fbed3 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -43,7 +43,7 @@
process_closed/2, process_terminated/2, process_info/2]).
%% API
-export([get_presence/1, set_presence/2, resend_presence/1, resend_presence/2,
- open_session/1, call/3, cast/2, send/2, close/1, close/2, stop/1,
+ open_session/1, call/3, cast/2, send/2, close/1, close/2, stop_async/1,
reply/2, copy_state/2, set_timeout/2, route/2, format_reason/2,
host_up/1, host_down/1, send_ws_ping/1, bounce_message_queue/2]).
@@ -110,10 +110,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
--spec stop(pid()) -> ok;
- (state()) -> no_return().
-stop(Ref) ->
- xmpp_stream_in:stop(Ref).
+-spec stop_async(pid()) -> ok.
+stop_async(Pid) ->
+ xmpp_stream_in:stop_async(Pid).
-spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state().
@@ -285,7 +284,8 @@ process_auth_result(#{sasl_mech := Mech,
State.
process_closed(State, Reason) ->
- stop(State#{stop_reason => Reason}).
+ stop_async(self()),
+ State#{stop_reason => Reason}.
process_terminated(#{sid := SID, socket := Socket,
jid := JID, user := U, server := S, resource := R} = State,
diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl
index afe941f9a..3d2302f95 100644
--- a/src/ejabberd_s2s.erl
+++ b/src/ejabberd_s2s.erl
@@ -319,7 +319,7 @@ host_down(Host) ->
case ejabberd_router:host_of_route(From) of
Host ->
ejabberd_s2s_out:send(Pid, Err),
- ejabberd_s2s_out:stop(Pid);
+ ejabberd_s2s_out:stop_async(Pid);
_ ->
ok
end;
@@ -473,14 +473,14 @@ new_connection(MyServer, Server, From, FromTo,
if Pid1 == Pid ->
ejabberd_s2s_out:connect(Pid);
true ->
- ejabberd_s2s_out:stop(Pid)
+ ejabberd_s2s_out:stop_async(Pid)
end,
[Pid1];
{aborted, Reason} ->
?ERROR_MSG("Failed to register s2s connection ~ts -> ~ts: "
"Mnesia failure: ~p",
[MyServer, Server, Reason]),
- ejabberd_s2s_out:stop(Pid),
+ ejabberd_s2s_out:stop_async(Pid),
[]
end.
@@ -553,13 +553,13 @@ stop_s2s_connections(Err) ->
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
ejabberd_s2s_in:send(Pid, Err),
- ejabberd_s2s_in:stop(Pid),
+ ejabberd_s2s_in:stop_async(Pid),
supervisor:terminate_child(ejabberd_s2s_in_sup, Pid)
end, supervisor:which_children(ejabberd_s2s_in_sup)),
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
ejabberd_s2s_out:send(Pid, Err),
- ejabberd_s2s_out:stop(Pid),
+ ejabberd_s2s_out:stop_async(Pid),
supervisor:terminate_child(ejabberd_s2s_out_sup, Pid)
end, supervisor:which_children(ejabberd_s2s_out_sup)),
_ = mnesia:clear_table(s2s),
diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl
index 2c838f76e..187869cfb 100644
--- a/src/ejabberd_s2s_in.erl
+++ b/src/ejabberd_s2s_in.erl
@@ -38,7 +38,7 @@
-export([handle_unexpected_info/2, handle_unexpected_cast/2,
reject_unauthenticated_packet/2, process_closed/2]).
%% API
--export([stop/1, close/1, close/2, send/2, update_state/2, establish/1,
+-export([stop_async/1, close/1, close/2, send/2, update_state/2, establish/1,
host_up/1, host_down/1]).
-include("xmpp.hrl").
@@ -64,8 +64,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
-stop(Ref) ->
- xmpp_stream_in:stop(Ref).
+-spec stop_async(pid()) -> ok.
+stop_async(Pid) ->
+ xmpp_stream_in:stop_async(Pid).
accept(Ref) ->
xmpp_stream_in:accept(Ref).
@@ -130,7 +131,8 @@ process_closed(#{server := LServer} = State, Reason) ->
end,
?INFO_MSG("Closing inbound s2s connection ~ts -> ~ts: ~ts",
[RServer, LServer, xmpp_stream_out:format_error(Reason)]),
- stop(State).
+ stop_async(self()),
+ State.
%%%===================================================================
%%% xmpp_stream_in callbacks
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index 7bbaf870c..0b44eb132 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -36,7 +36,7 @@
-export([process_auth_result/2, process_closed/2, handle_unexpected_info/2,
handle_unexpected_cast/2, process_downgraded/2]).
%% API
--export([start/3, start_link/3, connect/1, close/1, close/2, stop/1, send/2,
+-export([start/3, start_link/3, connect/1, close/1, close/2, stop_async/1, send/2,
route/2, establish/1, update_state/2, host_up/1, host_down/1]).
-include("xmpp.hrl").
@@ -79,10 +79,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_out:close(Ref, Reason).
--spec stop(pid()) -> ok;
- (state()) -> no_return().
-stop(Ref) ->
- xmpp_stream_out:stop(Ref).
+-spec stop_async(pid()) -> ok.
+stop_async(Pid) ->
+ xmpp_stream_out:stop_async(Pid).
-spec send(pid(), xmpp_element()) -> ok;
(state(), xmpp_element()) -> state().
@@ -150,7 +149,8 @@ process_closed(#{server := LServer, remote_server := RServer,
Reason) ->
?INFO_MSG("Closing outbound s2s connection ~ts -> ~ts: ~ts",
[LServer, RServer, format_error(Reason)]),
- stop(State);
+ stop_async(self()),
+ State;
process_closed(#{server := LServer, remote_server := RServer} = State,
Reason) ->
Delay = get_delay(),
@@ -248,7 +248,9 @@ handle_send(El, Pkt, #{server_host := ServerHost} = State) ->
handle_timeout(#{on_route := Action, lang := Lang} = State) ->
case Action of
- bounce -> stop(State);
+ bounce ->
+ stop_async(self()),
+ State;
_ ->
Txt = ?T("Idle connection"),
send(State, xmpp:serr_connection_timeout(Txt, Lang))
diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl
index 92350956d..a5270b54d 100644
--- a/src/ejabberd_service.erl
+++ b/src/ejabberd_service.erl
@@ -33,7 +33,7 @@
-export([handle_stream_start/2, handle_auth_success/4, handle_auth_failure/4,
handle_authenticated_packet/2, get_password_fun/1, tls_options/1]).
%% API
--export([send/2, close/1, close/2, stop/1]).
+-export([send/2, close/1, close/2, stop_async/1]).
-include("xmpp.hrl").
-include("logger.hrl").
@@ -59,7 +59,7 @@ stop() ->
lists:foreach(
fun({_Id, Pid, _Type, _Module}) ->
send(Pid, Err),
- stop(Pid),
+ stop_async(Pid),
supervisor:terminate_child(ejabberd_service_sup, Pid)
end, supervisor:which_children(ejabberd_service_sup)),
_ = supervisor:terminate_child(ejabberd_sup, ejabberd_service_sup),
@@ -83,10 +83,9 @@ close(Ref) ->
close(Ref, Reason) ->
xmpp_stream_in:close(Ref, Reason).
--spec stop(pid()) -> ok;
- (state()) -> no_return().
-stop(Ref) ->
- xmpp_stream_in:stop(Ref).
+-spec stop_async(pid()) -> ok.
+stop_async(Pid) ->
+ xmpp_stream_in:stop_async(Pid).
%%%===================================================================
%%% xmpp_stream_in callbacks
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 6202614db..79839b4aa 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -538,7 +538,7 @@ host_down(Host) ->
lists:foreach(
fun(#session{sid = {_, Pid}}) when node(Pid) == node() ->
ejabberd_c2s:send(Pid, Err),
- ejabberd_c2s:stop(Pid);
+ ejabberd_c2s:stop_async(Pid);
(_) ->
ok
end, get_sessions(Mod, Host)),
diff --git a/src/mod_s2s_dialback.erl b/src/mod_s2s_dialback.erl
index bf241bf52..91e2554ad 100644
--- a/src/mod_s2s_dialback.erl
+++ b/src/mod_s2s_dialback.erl
@@ -265,7 +265,8 @@ s2s_out_packet(#{server := LServer,
ejabberd_s2s_in:update_state(
Pid, fun(S) -> send_db_result(S, Response) end),
%% At this point the connection is no longer needed and we can terminate it
- ejabberd_s2s_out:stop(State);
+ ejabberd_s2s_out:stop_async(self()),
+ State;
s2s_out_packet(#{server := LServer, remote_server := RServer} = State,
#db_result{to = LServer, from = RServer,
type = Type} = Result) when Type /= undefined ->
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 4c9ff3c8b..fb1a609b1 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -217,7 +217,8 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
active ->
State;
pending ->
- Mod:stop(State#{stop_reason => {stream, {out, Pkt}}})
+ Mod:stop_async(self()),
+ {stop, State#{stop_reason => {stream, {out, Pkt}}}}
end;
_ ->
State
@@ -250,8 +251,9 @@ c2s_handle_info(#{mgmt_state := pending, lang := Lang,
[jid:encode(JID)]),
Txt = ?T("Timed out waiting for stream resumption"),
Err = xmpp:serr_connection_timeout(Txt, Lang),
- Mod:stop(State#{mgmt_state => timeout,
- stop_reason => {stream, {out, Err}}});
+ Mod:stop_async(self()),
+ {stop, State#{mgmt_state => timeout,
+ stop_reason => {stream, {out, Err}}}};
c2s_handle_info(State, {_Ref, {resume, #{jid := JID} = OldState}}) ->
%% This happens if the resume_session/1 request timed out; the new session
%% now receives the late response.
@@ -444,7 +446,8 @@ handle_resume(#{user := User, lserver := LServer,
-spec transition_to_pending(state(), _) -> state().
transition_to_pending(#{mgmt_state := active, mod := Mod,
mgmt_timeout := 0} = State, _Reason) ->
- Mod:stop(State);
+ Mod:stop_async(self()),
+ State;
transition_to_pending(#{mgmt_state := active, jid := JID, socket := Socket,
lserver := LServer, mgmt_timeout := Timeout} = State,
Reason) ->
@@ -660,7 +663,7 @@ inherit_session_state(#{user := U, server := S,
mgmt_stanzas_out => NumStanzasOut,
mgmt_state => active},
State3 = ejabberd_c2s:open_session(State2),
- ejabberd_c2s:stop(OldPID),
+ ejabberd_c2s:stop_async(OldPID),
{ok, State3};
{error, Msg} ->
{error, Msg}
@@ -674,7 +677,7 @@ inherit_session_state(#{user := U, server := S,
{error, session_was_killed};
exit:{timeout, _} ->
ejabberd_sm:close_session(OldSID, U, S, R),
- ejabberd_c2s:stop(OldPID),
+ ejabberd_c2s:stop_async(OldPID),
{error, session_copy_timed_out}
end
end;