diff options
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r-- | src/mod_stream_mgmt.erl | 84 |
1 files changed, 54 insertions, 30 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index f897a2794..5adeaf8c2 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -40,6 +40,8 @@ -include("logger.hrl"). -include("p1_queue.hrl"). +-define(STREAM_MGMT_CACHE, stream_mgmt_cache). + -define(is_sm_packet(Pkt), is_record(Pkt, sm_enable) or is_record(Pkt, sm_resume) or @@ -51,7 +53,8 @@ %%%=================================================================== %%% API %%%=================================================================== -start(Host, _Opts) -> +start(Host, Opts) -> + init_cache(Opts), ejabberd_hooks:add(c2s_init, ?MODULE, c2s_stream_init, 50), ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, c2s_stream_started, 50), @@ -94,7 +97,8 @@ stop(Host) -> ejabberd_hooks:delete(c2s_closed, Host, ?MODULE, c2s_closed, 50), ejabberd_hooks:delete(c2s_terminated, Host, ?MODULE, c2s_terminated, 50). -reload(_Host, _NewOpts, _OldOpts) -> +reload(_Host, NewOpts, _OldOpts) -> + init_cache(NewOpts), ?WARNING_MSG("module ~s is reloaded, but new configuration will take " "effect for newly created client connections only", [?MODULE]). @@ -284,23 +288,16 @@ c2s_terminated(#{mgmt_state := resumed, jid := JID} = State, _Reason) -> [jid:encode(JID)]), bounce_message_queue(), {stop, State}; -c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, sid := SID, - user := U, server := S, resource := R} = State, Reason) -> - Result = case MgmtState of - timeout -> - Info = [{num_stanzas_in, In}], - %% TODO: Usually, ejabberd_c2s:process_terminated/2 is - %% called later in the hook chain. We swap the order so - %% that the offline info won't be purged after we stored - %% it. This should be fixed in a proper way. - State1 = ejabberd_c2s:process_terminated(State, Reason), - ejabberd_sm:set_offline_info(SID, U, S, R, Info), - {stop, State1}; - _ -> - State - end, +c2s_terminated(#{mgmt_state := MgmtState, mgmt_stanzas_in := In, + sid := {Time, _}, jid := JID} = State, _Reason) -> + case MgmtState of + timeout -> + store_stanzas_in(jid:tolower(JID), Time, In); + _ -> + ok + end, route_unacked_stanzas(State), - Result; + State; c2s_terminated(State, _Reason) -> State. @@ -446,8 +443,8 @@ handle_resume(#{user := User, lserver := LServer, [xmpp_socket:pp(Socket), jid:encode(JID)]), {ok, State5}; {error, El, Msg} -> - ?INFO_MSG("Cannot resume session for ~s@~s: ~s", - [User, LServer, Msg]), + ?WARNING_MSG("Cannot resume session for ~s@~s: ~s", + [User, LServer, Msg]), {error, send(State, El)} end. @@ -641,16 +638,11 @@ inherit_session_state(#{user := U, server := S, {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of none -> - case ejabberd_sm:get_offline_info(Time, U, S, R) of - none -> + case pop_stanzas_in({U, S, R}, Time) of + error -> {error, <<"Previous session PID not found">>}; - Info -> - case proplists:get_value(num_stanzas_in, Info) of - undefined -> - {error, <<"Previous session timed out">>}; - H -> - {error, <<"Previous session timed out">>, H} - end + {ok, H} -> + {error, <<"Previous session timed out">>, H} end; OldPID -> OldSID = {Time, OldPID}, @@ -706,7 +698,7 @@ make_resume_id(#{sid := {Time, _}, resource := Resource}) -> (state(), xmlel(), erlang:timestamp()) -> xmlel(). add_resent_delay_info(#{lserver := LServer}, El, Time) when is_record(El, message); is_record(El, presence) -> - xmpp_util:add_delay_info(El, jid:make(LServer), Time, <<"Resent">>); + misc:add_delay_info(El, jid:make(LServer), Time, <<"Resent">>); add_resent_delay_info(_State, El, _Time) -> %% TODO El. @@ -751,6 +743,32 @@ need_to_enqueue(State, _) -> {false, State}. %%%=================================================================== +%%% Cache-like storage for last handled stanzas +%%%=================================================================== +init_cache(Opts) -> + ets_cache:new(?STREAM_MGMT_CACHE, cache_opts(Opts)). + +cache_opts(Opts) -> + [{max_size, gen_mod:get_opt(cache_size, Opts)}, + {life_time, infinity}]. + +-spec store_stanzas_in(ljid(), erlang:timestamp(), non_neg_integer()) -> boolean(). +store_stanzas_in(LJID, Time, Num) -> + ets_cache:insert(?STREAM_MGMT_CACHE, {LJID, Time}, Num, + ejabberd_cluster:get_nodes()). + +-spec pop_stanzas_in(ljid(), erlang:timestamp()) -> {ok, non_neg_integer()} | error. +pop_stanzas_in(LJID, Time) -> + case ets_cache:lookup(?STREAM_MGMT_CACHE, {LJID, Time}) of + {ok, Val} -> + ets_cache:delete(?STREAM_MGMT_CACHE, {LJID, Time}, + ejabberd_cluster:get_nodes()), + {ok, Val}; + error -> + error + end. + +%%%=================================================================== %%% Configuration processing %%%=================================================================== get_max_ack_queue(Host) -> @@ -796,6 +814,11 @@ mod_opt_type(resend_on_timeout) -> fun(B) when is_boolean(B) -> B; (if_offline) -> if_offline end; +mod_opt_type(cache_size) -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; mod_opt_type(queue_type) -> fun(ram) -> ram; (file) -> file end. @@ -804,5 +827,6 @@ mod_options(Host) -> {resume_timeout, 300}, {max_resume_timeout, undefined}, {ack_timeout, 60}, + {cache_size, ejabberd_config:cache_size(Host)}, {resend_on_timeout, false}, {queue_type, ejabberd_config:default_queue_type(Host)}]. |