aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-02-21 12:38:03 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-02-21 12:38:03 +0300
commit6ffd5ffd0ce3577988618a1198905c4d9faab9cf (patch)
treea087b437caad93af8c8067608f6efc3040f51b12
parentImprove stream management tests (diff)
Test stream management queue overload
-rw-r--r--src/mod_stream_mgmt.erl24
-rw-r--r--test/ejabberd_SUITE.erl1
-rw-r--r--test/ejabberd_SUITE_data/ejabberd.yml3
-rw-r--r--test/sm_tests.erl66
4 files changed, 80 insertions, 14 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 3ec3eb72a..981b357b7 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -180,22 +180,17 @@ c2s_handle_recv(State, _, _) ->
c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
lang := Lang} = State, Pkt, SendResult)
when MgmtState == pending; MgmtState == active ->
- case xmpp:is_stanza(Pkt) of
- true ->
+ case Pkt of
+ _ when ?is_stanza(Pkt) ->
Meta = xmpp:get_meta(Pkt),
case maps:get(mgmt_is_resent, Meta, false) of
false ->
case mgmt_queue_add(State, Pkt) of
#{mgmt_max_queue := exceeded} = State1 ->
State2 = State1#{mgmt_resend => false},
- case MgmtState of
- active ->
- Err = xmpp:serr_policy_violation(
- <<"Too many unacked stanzas">>, Lang),
- send(State2, Err);
- _ ->
- Mod:stop(State2)
- end;
+ Err = xmpp:serr_policy_violation(
+ <<"Too many unacked stanzas">>, Lang),
+ send(State2, Err);
State1 when SendResult == ok ->
send_rack(State1);
State1 ->
@@ -204,7 +199,14 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
true ->
State
end;
- false ->
+ #stream_error{} ->
+ case MgmtState of
+ active ->
+ State;
+ pending ->
+ Mod:stop(State#{stop_reason => {stream, {out, Pkt}}})
+ end;
+ _ ->
State
end;
c2s_handle_send(State, _Pkt, _Result) ->
diff --git a/test/ejabberd_SUITE.erl b/test/ejabberd_SUITE.erl
index 089d9d6a6..175a6e69a 100644
--- a/test/ejabberd_SUITE.erl
+++ b/test/ejabberd_SUITE.erl
@@ -372,6 +372,7 @@ no_db_tests() ->
s2s_required,
s2s_required_trusted]},
sm_tests:single_cases(),
+ sm_tests:master_slave_cases(),
muc_tests:single_cases(),
muc_tests:master_slave_cases(),
proxy65_tests:single_cases(),
diff --git a/test/ejabberd_SUITE_data/ejabberd.yml b/test/ejabberd_SUITE_data/ejabberd.yml
index 1eede5331..e766d0cc3 100644
--- a/test/ejabberd_SUITE_data/ejabberd.yml
+++ b/test/ejabberd_SUITE_data/ejabberd.yml
@@ -464,7 +464,8 @@ Welcome to this XMPP server."
mod_stats: []
mod_s2s_dialback: []
mod_legacy_auth: []
- mod_stream_mgmt: []
+ mod_stream_mgmt:
+ max_ack_queue: 10
mod_time: []
mod_version: []
registration_timeout: infinity
diff --git a/test/sm_tests.erl b/test/sm_tests.erl
index 37cecf5b7..134a2f951 100644
--- a/test/sm_tests.erl
+++ b/test/sm_tests.erl
@@ -26,7 +26,8 @@
%% API
-compile(export_all).
-import(suite, [send/2, recv/1, close_socket/1, set_opt/3, my_jid/1,
- recv_message/1, disconnect/1]).
+ recv_message/1, disconnect/1, send_recv/2,
+ put_event/2, get_event/1]).
-include("suite.hrl").
@@ -109,7 +110,51 @@ resume_failed(Config) ->
%%% Master-slave tests
%%%===================================================================
master_slave_cases() ->
- {sm_master_slave, [sequence], []}.
+ {sm_master_slave, [sequence],
+ [master_slave_test(queue_limit),
+ master_slave_test(queue_limit_detached)]}.
+
+queue_limit_master(Config) ->
+ ct:comment("Waiting for 'send' command from the peer"),
+ send = get_event(Config),
+ send_recv_messages(Config),
+ ct:comment("Waiting for peer to disconnect"),
+ peer_down = get_event(Config),
+ disconnect(Config).
+
+queue_limit_slave(Config) ->
+ ct:comment("Enable the session management without resumption"),
+ send(Config, #sm_enable{xmlns = ?NS_STREAM_MGMT_3}),
+ #sm_enabled{resume = false} = recv(Config),
+ put_event(Config, send),
+ ct:comment("Receiving all messages"),
+ lists:foreach(
+ fun(I) ->
+ ID = integer_to_binary(I),
+ Body = xmpp:mk_text(ID),
+ #message{id = ID, body = Body} = recv_message(Config)
+ end, lists:seq(1, 11)),
+ ct:comment("Receiving request ACK"),
+ #sm_r{} = recv(Config),
+ ct:comment("Receiving policy-violation stream error"),
+ #stream_error{reason = 'policy-violation'} = recv(Config),
+ {xmlstreamend, <<"stream:stream">>} = recv(Config),
+ ct:comment("Closing socket"),
+ close_socket(Config).
+
+queue_limit_detached_master(Config) ->
+ ct:comment("Waiting for the peer to disconnect"),
+ peer_down = get_event(Config),
+ send_recv_messages(Config),
+ disconnect(Config).
+
+queue_limit_detached_slave(Config) ->
+ #presence{} = send_recv(Config, #presence{}),
+ ct:comment("Enable the session management with resumption enabled"),
+ send(Config, #sm_enable{resume = true, xmlns = ?NS_STREAM_MGMT_3}),
+ #sm_enabled{resume = true} = recv(Config),
+ ct:comment("Closing socket"),
+ close_socket(Config).
%%%===================================================================
%%% Internal functions
@@ -121,3 +166,20 @@ master_slave_test(T) ->
{list_to_atom("sm_" ++ atom_to_list(T)), [parallel],
[list_to_atom("sm_" ++ atom_to_list(T) ++ "_master"),
list_to_atom("sm_" ++ atom_to_list(T) ++ "_slave")]}.
+
+send_recv_messages(Config) ->
+ PeerJID = ?config(peer, Config),
+ Msg = #message{to = PeerJID},
+ ct:comment("Sending messages to peer"),
+ lists:foreach(
+ fun(I) ->
+ ID = integer_to_binary(I),
+ send(Config, Msg#message{id = ID, body = xmpp:mk_text(ID)})
+ end, lists:seq(1, 11)),
+ ct:comment("Receiving bounced messages from the peer"),
+ lists:foreach(
+ fun(I) ->
+ ID = integer_to_binary(I),
+ Err = #message{id = ID, type = error} = recv_message(Config),
+ #stanza_error{reason = 'service-unavailable'} = xmpp:get_error(Err)
+ end, lists:seq(1, 11)).