summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-10 20:21:04 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-10 20:21:04 +0300
commit4b1bdb563ec30075f29f5558208c132a3753e915 (patch)
tree032518ca16e2c80130e15f20bd5628c4035cf04d
parentAdd support for file-based queues (diff)
Improve overloaded S2S queue processing
-rw-r--r--rebar.config2
-rw-r--r--src/ejabberd_s2s_out.erl26
2 files changed, 17 insertions, 11 deletions
diff --git a/rebar.config b/rebar.config
index 7088cfef..ca41b6ee 100644
--- a/rebar.config
+++ b/rebar.config
@@ -19,7 +19,7 @@
%%%----------------------------------------------------------------------
{deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}},
- {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "13b03e1c8c7a5777de728f759809142f997f8af3"}},
+ {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", "f677e61"}},
{cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.6"}}},
{fast_tls, ".*", {git, "https://github.com/processone/fast_tls", "afdd07811e0e6eff444c035ffeb2aa9efb4dbe6d"}},
{stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.7"}}},
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index 8c9f9d63..60c19b08 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -145,14 +145,14 @@ process_closed(#{server := LServer, remote_server := RServer,
on_route := send} = State,
Reason) ->
?INFO_MSG("Closing outbound s2s connection ~s -> ~s: ~s",
- [LServer, RServer, xmpp_stream_out:format_error(Reason)]),
+ [LServer, RServer, format_error(Reason)]),
stop(State);
process_closed(#{server := LServer, remote_server := RServer} = State,
Reason) ->
Delay = get_delay(),
?INFO_MSG("Failed to establish outbound s2s connection ~s -> ~s: ~s; "
"bouncing for ~p seconds",
- [LServer, RServer, xmpp_stream_out:format_error(Reason), Delay]),
+ [LServer, RServer, format_error(Reason), Delay]),
State1 = State#{on_route => bounce},
State2 = bounce_queue(State1),
xmpp_stream_out:set_timeout(State2, timer:seconds(Delay)).
@@ -309,11 +309,9 @@ handle_info({route, Pkt}, #{queue := Q, on_route := Action} = State) ->
queue ->
try State#{queue => p1_queue:in(Pkt, Q)}
catch error:full ->
- #{server := LServer, remote_server := RServer} = State,
- ?INFO_MSG("Failed to establish outbound s2s connection "
- "~s -> ~s: message queue is overloaded",
- [LServer, RServer]),
- stop(State#{stop_reason => queue_full})
+ Q1 = p1_queue:set_limit(Q, unlimited),
+ Q2 = p1_queue:in(Pkt, Q1),
+ handle_stream_end(queue_full, State#{queue => Q2})
end;
bounce -> bounce_packet(Pkt, State);
send -> set_idle_timeout(send(State, Pkt))
@@ -371,12 +369,12 @@ bounce_packet(_, State) ->
-spec mk_bounce_error(binary(), state()) -> stanza_error().
mk_bounce_error(Lang, #{stop_reason := Why}) ->
- Reason = xmpp_stream_out:format_error(Why),
+ Reason = format_error(Why),
case Why of
internal_failure ->
- xmpp:err_internal_server_error();
+ xmpp:err_internal_server_error(Reason, Lang);
queue_full ->
- xmpp:err_resource_constraint();
+ xmpp:err_resource_constraint(Reason, Lang);
{dns, _} ->
xmpp:err_remote_server_not_found(Reason, Lang);
_ ->
@@ -401,6 +399,7 @@ set_idle_timeout(#{on_route := send, server := LServer} = State) ->
set_idle_timeout(State) ->
State.
+-spec queue_fold(fun((xmpp_element(), state()) -> state()), state()) -> state().
queue_fold(F, #{queue := Q} = State) ->
case p1_queue:out(Q) of
{{value, Pkt}, Q1} ->
@@ -410,6 +409,13 @@ queue_fold(F, #{queue := Q} = State) ->
State#{queue => Q1}
end.
+format_error(internal_failure) ->
+ <<"Internal server error">>;
+format_error(queue_full) ->
+ <<"Stream queue is overloaded">>;
+format_error(Reason) ->
+ xmpp_stream_out:format_error(Reason).
+
transform_options(Opts) ->
lists:foldl(fun transform_options/2, [], Opts).