aboutsummaryrefslogtreecommitdiff
path: root/src/mod_stream_mgmt.erl
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2018-11-30 16:18:49 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2018-11-30 16:19:00 +0300
commit5d27c975dc2138e2b52177738b5eec3a9a36317c (patch)
tree2e74dd98d4a01085245d82dd45fb20d0a1d9cf8c /src/mod_stream_mgmt.erl
parentNew command unban_ip (#2620) (diff)
Keep last handled stanzas number in cache rather than session table
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r--src/mod_stream_mgmt.erl75
1 files changed, 49 insertions, 26 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 1927afa95..546c45a69 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -40,6 +40,8 @@
-include("logger.hrl").
-include("p1_queue.hrl").
+-define(STREAM_MGMT_CACHE, stream_mgmt_cache).
+
-define(is_sm_packet(Pkt),
is_record(Pkt, sm_enable) or
is_record(Pkt, sm_resume) or
@@ -51,7 +53,8 @@
%%%===================================================================
%%% API
%%%===================================================================
-start(Host, _Opts) ->
+start(Host, Opts) ->
+ init_cache(Opts),
ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50),
ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE,
c2s_stream_started, 50),
@@ -284,23 +287,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) ->
[jid:encode(JID)]),
bounce_message_queue(),
{stop, State};
-c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID,
- user := U, server := S, resource := R} = State, Reason) ->
- Result = case MgmtState of
- timeout ->
- Info = [{num_stanzas_in, In}],
- %% TODO: Usually, ejabberd_c2s:process_terminated/2 is
- %% called later in the hook chain. We swap the order so
- %% that the offline info won't be purged after we stored
- %% it. This should be fixed in a proper way.
- State1 = ejabberd_c2s:process_terminated(State, Reason),
- ejabberd_sm:set_offline_info(SID, U, S, R, Info),
- {stop, State1};
- _ ->
- State
- end,
+c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In,
+ sid := {Time, _}, jid := JID} = State, _Reason) ->
+ case MgmtState of
+ timeout ->
+ store_stanzas_in(jid:tolower(JID), Time, In);
+ _ ->
+ ok
+ end,
route_unacked_stanzas(State),
- Result;
+ State;
c2s_terminated(State, _Reason) ->
State.
@@ -641,16 +637,11 @@ inherit_session_state(#{user := U, server := S,
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
- case ejabberd_sm:get_offline_info(Time, U, S, R) of
- none ->
+ case pop_stanzas_in({U, S, R}, Time) of
+ error ->
{error, <<"Previous session PID not found">>};
- Info ->
- case proplists:get_value(num_stanzas_in, Info) of
- undefined ->
- {error, <<"Previous session timed out">>};
- H ->
- {error, <<"Previous session timed out">>, H}
- end
+ {ok, H} ->
+ {error, <<"Previous session timed out">>, H}
end;
OldPID ->
OldSID = {Time, OldPID},
@@ -751,6 +742,32 @@ need_to_enqueue(State, _) ->
{false, State}.
%%%===================================================================
+%%% Cache-like storage for last handled stanzas
+%%%===================================================================
+init_cache(Opts) ->
+ ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)).
+
+cache_opts(Opts) ->
+ [{max_size, gen_mod:get_opt(cache_size, Opts)},
+ {life_time, infinity}].
+
+-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean().
+store_stanzas_in(LJID, Time, Num) ->
+ ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num,
+ ejabberd_cluster:get_nodes()).
+
+-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error.
+pop_stanzas_in(LJID, Time) ->
+ case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of
+ {ok, Val} ->
+ ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time},
+ ejabberd_cluster:get_nodes()),
+ {ok, Val};
+ error ->
+ error
+ end.
+
+%%%===================================================================
%%% Configuration processing
%%%===================================================================
get_max_ack_queue(Host) ->
@@ -796,6 +813,11 @@ mod_opt_type(resend_on_timeout) ->
fun(B) when is_boolean(B) -> B;
(if_offline) -> if_offline
end;
+mod_opt_type(cache_size) ->
+ fun(I) when is_integer(I), I>0 -> I;
+ (unlimited) -> infinity;
+ (infinity) -> infinity
+ end;
mod_opt_type(queue_type) ->
fun(ram) -> ram; (file) -> file end.
@@ -804,5 +826,6 @@ mod_options(Host) ->
{resume_timeout, 300},
{max_resume_timeout, undefined},
{ack_timeout, 60},
+ {cache_size, ejabberd_config:cache_size(Host)},
{resend_on_timeout, false},
{queue_type, ejabberd_config:default_queue_type(Host)}].