diff options
author | Evgeny Khramtsov <ekhramtsov@process-one.net> | 2018-11-30 16:18:49 +0300 |
---|---|---|
committer | Evgeny Khramtsov <ekhramtsov@process-one.net> | 2018-11-30 16:19:00 +0300 |
commit | 5d27c975dc2138e2b52177738b5eec3a9a36317c (patch) | |
tree | 2e74dd98d4a01085245d82dd45fb20d0a1d9cf8c /src/mod_stream_mgmt.erl | |
parent | New command unban_ip (#2620) (diff) |
Keep last handled stanzas number in cache rather than session table
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r-- | src/mod_stream_mgmt.erl | 75 |
1 files changed, 49 insertions, 26 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index 1927afa95..546c45a69 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -40,6 +40,8 @@ -include("logger.hrl"). -include("p1_queue.hrl"). +-define(STREAM_MGMT_CACHE, stream_mgmt_cache). + -define(is_sm_packet(Pkt), is_record(Pkt, sm_enable) or is_record(Pkt, sm_resume) or @@ -51,7 +53,8 @@ %%%=================================================================== %%% API %%%=================================================================== -start(Host, _Opts) -> +start(Host, Opts) -> + init_cache(Opts), ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50), ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, c2s_stream_started, 50), @@ -284,23 +287,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) -> [jid:encode(JID)]), bounce_message_queue(), {stop, State}; -c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID, - user := U, server := S, resource := R} = State, Reason) -> - Result = case MgmtState of - timeout -> - Info = [{num_stanzas_in, In}], - %% TODO: Usually, ejabberd_c2s:process_terminated/2 is - %% called later in the hook chain. We swap the order so - %% that the offline info won't be purged after we stored - %% it. This should be fixed in a proper way. - State1 = ejabberd_c2s:process_terminated(State, Reason), - ejabberd_sm:set_offline_info(SID, U, S, R, Info), - {stop, State1}; - _ -> - State - end, +c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, + sid := {Time, _}, jid := JID} = State, _Reason) -> + case MgmtState of + timeout -> + store_stanzas_in(jid:tolower(JID), Time, In); + _ -> + ok + end, route_unacked_stanzas(State), - Result; + State; c2s_terminated(State, _Reason) -> State. @@ -641,16 +637,11 @@ inherit_session_state(#{user := U, server := S, {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of none -> - case ejabberd_sm:get_offline_info(Time, U, S, R) of - none -> + case pop_stanzas_in({U, S, R}, Time) of + error -> {error, <<"Previous session PID not found">>}; - Info -> - case proplists:get_value(num_stanzas_in, Info) of - undefined -> - {error, <<"Previous session timed out">>}; - H -> - {error, <<"Previous session timed out">>, H} - end + {ok, H} -> + {error, <<"Previous session timed out">>, H} end; OldPID -> OldSID = {Time, OldPID}, @@ -751,6 +742,32 @@ need_to_enqueue(State, _) -> {false, State}. %%%=================================================================== +%%% Cache-like storage for last handled stanzas +%%%=================================================================== +init_cache(Opts) -> + ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)). + +cache_opts(Opts) -> + [{max_size, gen_mod:get_opt(cache_size, Opts)}, + {life_time, infinity}]. + +-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean(). +store_stanzas_in(LJID, Time, Num) -> + ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num, + ejabberd_cluster:get_nodes()). + +-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error. +pop_stanzas_in(LJID, Time) -> + case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of + {ok, Val} -> + ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time}, + ejabberd_cluster:get_nodes()), + {ok, Val}; + error -> + error + end. + +%%%=================================================================== %%% Configuration processing %%%=================================================================== get_max_ack_queue(Host) -> @@ -796,6 +813,11 @@ mod_opt_type(resend_on_timeout) -> fun(B) when is_boolean(B) -> B; (if_offline) -> if_offline end; +mod_opt_type(cache_size) -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; mod_opt_type(queue_type) -> fun(ram) -> ram; (file) -> file end. @@ -804,5 +826,6 @@ mod_options(Host) -> {resume_timeout, 300}, {max_resume_timeout, undefined}, {ack_timeout, 60}, + {cache_size, ejabberd_config:cache_size(Host)}, {resend_on_timeout, false}, {queue_type, ejabberd_config:default_queue_type(Host)}]. |