diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 421 |
1 files changed, 267 insertions, 154 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 76682d06c..07d71bfdc 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -77,12 +77,14 @@ -include("mod_offline.hrl"). +-include("translate.hrl"). + -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). --define(EMPTY_SPOOL_CACHE, offline_empty_cache). +-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache). -type c2s_state() :: ejabberd_c2s:state(). @@ -95,23 +97,26 @@ -callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}. -callback remove_user(binary(), binary()) -> any(). -callback read_message_headers(binary(), binary()) -> - [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}]. + [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error. -callback read_message(binary(), binary(), non_neg_integer()) -> {ok, #offline_msg{}} | error. -callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}. -callback read_all_messages(binary(), binary()) -> [#offline_msg{}]. -callback remove_all_messages(binary(), binary()) -> {atomic, any()}. --callback count_messages(binary(), binary()) -> non_neg_integer(). +-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. --optional_callbacks([remove_expired_messages/1, remove_old_messages/2]). +-optional_callbacks([remove_expired_messages/1, remove_old_messages/2, + use_cache/1, cache_nodes/1]). depends(_Host, _Opts) -> []. start(Host, Opts) -> - Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + Mod = gen_mod:db_mod(Opts, ?MODULE), Mod:init(Host, Opts), - init_cache(Opts), + init_cache(Mod, Host, Opts), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), @@ -159,67 +164,88 @@ stop(Host) -> gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE). reload(Host, NewOpts, OldOpts) -> - NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), - OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), - init_cache(NewOpts), + NewMod = gen_mod:db_mod(NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(OldOpts, ?MODULE), + init_cache(NewMod, Host, NewOpts), if NewMod /= OldMod -> NewMod:init(Host, NewOpts); true -> ok end. -init_cache(Opts) -> - case gen_mod:get_opt(use_mam_for_storage, Opts) of - true -> - MaxSize = gen_mod:get_opt(cache_size, Opts), - LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of - infinity -> infinity; - I -> timer:seconds(I) - end, - COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}], - ets_cache:new(?EMPTY_SPOOL_CACHE, COpts); - false -> - ets_cache:delete(?EMPTY_SPOOL_CACHE) +init_cache(Mod, Host, Opts) -> + CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)}, + {life_time, mod_offline_opt:cache_life_time(Opts)}, + {cache_missed, false}], + case use_cache(Mod, Host) of + true -> + ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts); + false -> + ets_cache:delete(?SPOOL_COUNTER_CACHE) + end. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> mod_offline_opt:use_cache(Host) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec flush_cache(module(), binary(), binary()) -> ok. +flush_cache(Mod, User, Server) -> + case use_cache(Mod, Server) of + true -> + ets_cache:delete(?SPOOL_COUNTER_CACHE, + {User, Server}, + cache_nodes(Mod, Server)); + false -> + ok end. -spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> UseMam = use_mam_for_user(User, Server), + Mod = gen_mod:db_mod(Server, ?MODULE), case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of true -> - Mod = gen_mod:db_mod(Server, ?MODULE), - ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server}, - fun() -> - case count_messages_in_db(User, Server) of - 0 -> - case Mod:store_message(Msg) of - ok -> - {cache, ok}; - Err -> - {nocache, Err} - end; - _ -> - {cache, ok} + case count_offline_messages(User, Server) of + 0 -> + store_message_in_db(Mod, Msg); + _ -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok end - end); + end; false -> - Mod = gen_mod:db_mod(Server, ?MODULE), case get_max_user_messages(User, Server) of infinity -> - Mod:store_message(Msg); + store_message_in_db(Mod, Msg); Limit -> - Num = count_messages_in_db(User, Server), + Num = count_offline_messages(User, Server), if Num < Limit -> - Mod:store_message(Msg); - true -> + store_message_in_db(Mod, Msg); + true -> {error, full} end end end. get_max_user_messages(User, Server) -> - Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages), - case acl:match_rule(Server, Access, jid:make(User, Server)) of + Access = mod_offline_opt:access_max_user_messages(Server), + case ejabberd_shaper:match(Server, Access, jid:make(User, Server)) of Max when is_integer(Max) -> Max; infinity -> infinity; _ -> ?MAX_USER_MESSAGES @@ -255,7 +281,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID, ?NS_FLEX_OFFLINE, _Lang) -> ejabberd_sm:route(JID, {resend_offline, false}), Mod = gen_mod:db_mod(S, ?MODULE), - Hdrs = Mod:read_message_headers(U, S), + Hdrs = case Mod:read_message_headers(U, S) of + L when is_list(L) -> + L; + _ -> + [] + end, BareJID = jid:remove_resource(JID), {result, lists:map( fun({Seq, From, _To, _TS, _El}) -> @@ -297,7 +328,7 @@ handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1}, lang = Lang, sub_els = [#offline{}]} = IQ) when {U1, S1} /= {U2, S2} -> - Txt = <<"Query to another users is forbidden">>, + Txt = ?T("Query to another users is forbidden"), xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)); handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From, to = #jid{luser = U, lserver = S} = _To, @@ -318,7 +349,7 @@ handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From, {atomic, ok} -> xmpp:make_iq_result(IQ); _Err -> - Txt = <<"Database failure">>, + Txt = ?T("Database failure"), xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)) end; {set, #offline{fetch = false, items = [_|_] = Items, purge = false}} -> @@ -330,7 +361,7 @@ handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From, xmpp:make_error(IQ, xmpp:err_bad_request()) end; handle_offline_query(#iq{lang = Lang} = IQ) -> - Txt = <<"No module is handling this query">>, + Txt = ?T("No module is handling this query"), xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)). -spec handle_offline_items_view(jid(), [offline_item()]) -> boolean(). @@ -349,7 +380,7 @@ handle_offline_items_view(JID, Items) -> NewEl = set_offline_tag(El, Node), case ejabberd_sm:get_session_pid(U, S, R) of Pid when is_pid(Pid) -> - Pid ! {route, NewEl}; + ejabberd_c2s:route(Pid, {route, NewEl}); none -> ok end, @@ -408,6 +439,7 @@ remove_msg_by_node(To, Seq) -> LServer = To#jid.lserver, Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:remove_message(LUser, LServer, I), + flush_cache(Mod, LUser, LServer), true; _ -> false @@ -432,15 +464,13 @@ need_to_store(LServer, #message{type = Type} = Packet) -> none -> Store = case Type of groupchat -> - gen_mod:get_module_opt( - LServer, ?MODULE, store_groupchat); + mod_offline_opt:store_groupchat(LServer); headline -> false; _ -> true end, - case {Store, gen_mod:get_module_opt( - LServer, ?MODULE, store_empty_body)} of + case {Store, mod_offline_opt:store_empty_body(LServer)} of {false, _} -> false; {_, true} -> @@ -484,9 +514,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) -> stop end end; - _ -> Acc + _ -> + maybe_update_cache(To, Packet), + Acc + end; + false -> + maybe_update_cache(To, Packet), + Acc + end. + +-spec maybe_update_cache(jid(), message()) -> ok. +maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) -> + case xmpp:get_meta(Packet, mam_archived, false) of + true -> + Mod = gen_mod:db_mod(Server, ?MODULE), + case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + _ -> + ok end; - false -> Acc + _ -> + ok end. -spec check_store_hint(message()) -> store | no_store | none. @@ -567,8 +619,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) {ok, OffMsgs} -> case use_mam_for_user(LUser, LServer) of true -> - ets_cache:delete(?EMPTY_SPOOL_CACHE, {LUser, LServer}, - ejabberd_cluster:get_nodes()), + flush_cache(Mod, LUser, LServer), lists:map( fun({_, #message{from = From, to = To} = Msg}) -> #offline_msg{from = From, to = To, @@ -576,6 +627,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) packet = Msg} end, read_mam_messages(LUser, LServer, OffMsgs)); _ -> + flush_cache(Mod, LUser, LServer), OffMsgs end; _ -> @@ -622,16 +674,24 @@ remove_expired_messages(Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), case erlang:function_exported(Mod, remove_expired_messages, 1) of - true -> Mod:remove_expired_messages(LServer); - false -> erlang:error(not_implemented) + true -> + Ret = Mod:remove_expired_messages(LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) end. remove_old_messages(Days, Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), case erlang:function_exported(Mod, remove_old_messages, 2) of - true -> Mod:remove_old_messages(Days, LServer); - false -> erlang:error(not_implemented) + true -> + Ret = Mod:remove_old_messages(Days, LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) end. -spec remove_user(binary(), binary()) -> ok. @@ -640,7 +700,7 @@ remove_user(User, Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:remove_user(LUser, LServer), - ok. + flush_cache(Mod, LUser, LServer). %% Helper functions: @@ -648,11 +708,11 @@ remove_user(User, Server) -> check_if_message_should_be_bounced(Packet) -> case Packet of #message{type = groupchat, to = #jid{lserver = LServer}} -> - gen_mod:get_module_opt(LServer, ?MODULE, bounce_groupchat); + mod_offline_opt:bounce_groupchat(LServer); #message{to = #jid{lserver = LServer}} -> case misc:is_mucsub_message(Packet) of true -> - gen_mod:get_module_opt(LServer, ?MODULE, bounce_groupchat); + mod_offline_opt:bounce_groupchat(LServer); _ -> true end; @@ -669,11 +729,11 @@ discard_warn_sender(Packet, Reason) -> Lang = xmpp:get_lang(Packet), Err = case Reason of full -> - ErrText = <<"Your contact offline message queue is " - "full. The message has been discarded.">>, + ErrText = ?T("Your contact offline message queue is " + "full. The message has been discarded."), xmpp:err_resource_constraint(ErrText, Lang); _ -> - ErrText = <<"Database failure">>, + ErrText = ?T("Database failure"), xmpp:err_internal_server_error(ErrText, Lang) end, ejabberd_router:route_error(Packet, Err); @@ -694,14 +754,14 @@ get_offline_els(LUser, LServer) -> -spec offline_msg_to_route(binary(), #offline_msg{}) -> {route, message()} | error. offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) -> - CodecOpts = ejabberd_config:codec_options(LServer), + CodecOpts = ejabberd_config:codec_options(), try xmpp:decode(R#offline_msg.packet, ?NS_CLIENT, CodecOpts) of Pkt -> Pkt1 = xmpp:set_from_to(Pkt, From, To), Pkt2 = add_delay_info(Pkt1, LServer, R#offline_msg.timestamp), {route, Pkt2} catch _:{xmpp_codec, Why} -> - ?ERROR_MSG("failed to decode packet ~p of user ~s: ~s", + ?ERROR_MSG("Failed to decode packet ~p of user ~s: ~s", [R#offline_msg.packet, jid:encode(To), xmpp:format_error(Why)]), error @@ -709,7 +769,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) -> -spec read_messages(binary(), binary()) -> [{binary(), message()}]. read_messages(LUser, LServer) -> - Res = read_db_messages(LUser, LServer), + Res = case read_db_messages(LUser, LServer) of + error -> + []; + L when is_list(L) -> + L + end, case use_mam_for_user(LUser, LServer) of true -> read_mam_messages(LUser, LServer, Res); @@ -717,27 +782,32 @@ read_messages(LUser, LServer) -> Res end. --spec read_db_messages(binary(), binary()) -> [{binary(), message()}]. +-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error. read_db_messages(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - CodecOpts = ejabberd_config:codec_options(LServer), - lists:flatmap( - fun({Seq, From, To, TS, El}) -> - Node = integer_to_binary(Seq), - try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of - Pkt -> + CodecOpts = ejabberd_config:codec_options(), + case Mod:read_message_headers(LUser, LServer) of + error -> + error; + L -> + lists:flatmap( + fun({Seq, From, To, TS, El}) -> Node = integer_to_binary(Seq), - Pkt1 = add_delay_info(Pkt, LServer, TS), - Pkt2 = xmpp:set_from_to(Pkt1, From, To), - [{Node, Pkt2}] - catch _:{xmpp_codec, Why} -> - ?ERROR_MSG("failed to decode packet ~p " - "of user ~s: ~s", - [El, jid:encode(To), - xmpp:format_error(Why)]), - [] - end - end, Mod:read_message_headers(LUser, LServer)). + try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of + Pkt -> + Node = integer_to_binary(Seq), + Pkt1 = add_delay_info(Pkt, LServer, TS), + Pkt2 = xmpp:set_from_to(Pkt1, From, To), + [{Node, Pkt2}] + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("Failed to decode packet ~p " + "of user ~s: ~s", + [El, jid:encode(To), + xmpp:format_error(Why)]), + [] + end + end, L) + end. -spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) -> {integer() | none, [message()]}. @@ -745,7 +815,7 @@ parse_marker_messages(LServer, ReadMsgs) -> {Timestamp, ExtraMsgs} = lists:foldl( fun({_Node, #message{id = <<"ActivityMarker">>, body = [], type = error} = Msg}, {T, E}) -> - case xmpp:get_subtag(Msg, #delay{}) of + case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of #delay{stamp = Time} -> if T == none orelse T > Time -> {Time, E}; @@ -760,7 +830,7 @@ parse_marker_messages(LServer, ReadMsgs) -> body = [], type = error} = Msg -> TS2 = case TS of undefined -> - case xmpp:get_subtag(Msg, #delay{}) of + case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of #delay{stamp = TS0} -> TS0; _ -> @@ -785,7 +855,7 @@ parse_marker_messages(LServer, ReadMsgs) -> end, {none, []}, ReadMsgs), Start = case {Timestamp, ExtraMsgs} of {none, [First|_]} -> - case xmpp:get_subtag(First, #delay{}) of + case xmpp:get_subtag(First, #delay{stamp = {0,0,0}}) of #delay{stamp = {Mega, Sec, Micro}} -> {Mega, Sec, Micro+1}; _ -> @@ -807,9 +877,10 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> ExtraMsgs; _ -> MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of - Number when is_integer(Number) -> Number - length(ExtraMsgs); - infinity -> undefined; - _ -> 100 - length(ExtraMsgs) + Number when is_integer(Number) -> + max(0, Number - length(ExtraMsgs)); + infinity -> + undefined end, JID = jid:make(LUser, LServer, <<>>), {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID, @@ -826,20 +897,20 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> end, AllMsgs2 = lists:sort( fun(A, B) -> - DA = case xmpp:get_subtag(A, #stanza_id{}) of + DA = case xmpp:get_subtag(A, #stanza_id{by = #jid{}}) of #stanza_id{id = IDA} -> IDA; - _ -> case xmpp:get_subtag(A, #delay{}) of + _ -> case xmpp:get_subtag(A, #delay{stamp = {0,0,0}}) of #delay{stamp = STA} -> integer_to_binary(misc:now_to_usec(STA)); _ -> <<"unknown">> end end, - DB = case xmpp:get_subtag(B, #stanza_id{}) of + DB = case xmpp:get_subtag(B, #stanza_id{by = #jid{}}) of #stanza_id{id = IDB} -> IDB; - _ -> case xmpp:get_subtag(B, #delay{}) of + _ -> case xmpp:get_subtag(B, #delay{stamp = {0,0,0}}) of #delay{stamp = STB} -> integer_to_binary(misc:now_to_usec(STB)); _ -> @@ -854,18 +925,19 @@ read_mam_messages(LUser, LServer, ReadMsgs) -> end, 1, AllMsgs2), AllMsgs3. --spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> - integer(). +-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) -> + {cache, integer()} | {nocache, integer()}. +count_mam_messages(_LUser, _LServer, error) -> + {nocache, 0}; count_mam_messages(LUser, LServer, ReadMsgs) -> {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), case Start of none -> - length(ExtraMsgs); + {cache, length(ExtraMsgs)}; _ -> MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of Number when is_integer(Number) -> Number - length(ExtraMsgs); - infinity -> undefined; - _ -> 100 - length(ExtraMsgs) + infinity -> undefined end, JID = jid:make(LUser, LServer, <<>>), {_, _, Count} = mod_mam:select(LServer, JID, JID, @@ -873,7 +945,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) -> #rsm_set{max = MaxOfflineMsgs, before = <<"9999999999999999">>}, chat, only_count), - Count + length(ExtraMsgs) + {cache, Count + length(ExtraMsgs)} end. format_user_queue(Hdrs) -> @@ -915,28 +987,26 @@ user_queue(User, Server, Query, Lang) -> LServer = jid:nameprep(Server), US = {LUser, LServer}, Mod = gen_mod:db_mod(LServer, ?MODULE), - Res = user_queue_parse_query(LUser, LServer, Query), - HdrsAll = Mod:read_message_headers(LUser, LServer), + user_queue_parse_query(LUser, LServer, Query), + HdrsAll = case Mod:read_message_headers(LUser, LServer) of + error -> []; + L -> L + end, Hdrs = get_messages_subset(User, Server, HdrsAll), FMsgs = format_user_queue(Hdrs), [?XC(<<"h1">>, - (str:format(?T(<<"~s's Offline Messages Queue">>), - [us_to_list(US)])))] - ++ - case Res of - ok -> [?XREST(<<"Submitted">>)]; - nothing -> [] - end - ++ + (str:format(translate:translate(Lang, ?T("~s's Offline Messages Queue")), + [us_to_list(US)])))] + ++ [?XREST(?T("Submitted"))] ++ [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], [?XE(<<"table">>, [?XE(<<"thead">>, [?XE(<<"tr">>, - [?X(<<"td">>), ?XCT(<<"td">>, <<"Time">>), - ?XCT(<<"td">>, <<"From">>), - ?XCT(<<"td">>, <<"To">>), - ?XCT(<<"td">>, <<"Packet">>)])]), + [?X(<<"td">>), ?XCT(<<"td">>, ?T("Time")), + ?XCT(<<"td">>, ?T("From")), + ?XCT(<<"td">>, ?T("To")), + ?XCT(<<"td">>, ?T("Packet"))])]), ?XE(<<"tbody">>, if FMsgs == [] -> [?XE(<<"tr">>, @@ -946,29 +1016,35 @@ user_queue(User, Server, Query, Lang) -> end)]), ?BR, ?INPUTT(<<"submit">>, <<"delete">>, - <<"Delete Selected">>)])]. + ?T("Delete Selected"))])]. user_queue_parse_query(LUser, LServer, Query) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case lists:keysearch(<<"delete">>, 1, Query) of {value, _} -> - user_queue_parse_query(LUser, LServer, Query, Mod); + case user_queue_parse_query(LUser, LServer, Query, Mod, false) of + true -> + flush_cache(Mod, LUser, LServer); + false -> + ok + end; _ -> - nothing + ok end. -user_queue_parse_query(LUser, LServer, Query, Mod) -> +user_queue_parse_query(LUser, LServer, Query, Mod, Acc) -> case lists:keytake(<<"selected">>, 1, Query) of {value, {_, Seq}, Query2} -> - case catch binary_to_integer(Seq) of - I when is_integer(I), I>=0 -> - Mod:remove_message(LUser, LServer, I); - _ -> - nothing - end, - user_queue_parse_query(LUser, LServer, Query2, Mod); + NewAcc = case catch binary_to_integer(Seq) of + I when is_integer(I), I>=0 -> + Mod:remove_message(LUser, LServer, I), + true; + _ -> + Acc + end, + user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc); false -> - nothing + Acc end. us_to_list({User, Server}) -> @@ -1004,18 +1080,20 @@ webadmin_user(Acc, User, Server, Lang) -> FQueueLen = [?AC(<<"queue/">>, (integer_to_binary(QueueLen)))], Acc ++ - [?XCT(<<"h3">>, <<"Offline Messages:">>)] ++ + [?XCT(<<"h3">>, ?T("Offline Messages:"))] ++ FQueueLen ++ [?C(<<" ">>), ?INPUTT(<<"submit">>, <<"removealloffline">>, - <<"Remove All Offline Messages">>)]. + ?T("Remove All Offline Messages"))]. -spec delete_all_msgs(binary(), binary()) -> {atomic, any()}. delete_all_msgs(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_all_messages(LUser, LServer). + Ret = Mod:remove_all_messages(LUser, LServer), + flush_cache(Mod, LUser, LServer), + Ret. webadmin_user_parse_query(_, <<"removealloffline">>, User, Server, _Query) -> @@ -1038,18 +1116,50 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, count_offline_messages(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), + Mod = gen_mod:db_mod(LServer, ?MODULE), case use_mam_for_user(User, Server) of true -> - Res = read_db_messages(LUser, LServer), - count_mam_messages(LUser, LServer, Res); + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Res = read_db_messages(LUser, LServer), + count_mam_messages(LUser, LServer, Res) + end); + false -> + Res = read_db_messages(LUser, LServer), + ets_cache:untag(count_mam_messages(LUser, LServer, Res)) + end; _ -> - count_messages_in_db(LUser, LServer) + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Mod:count_messages(LUser, LServer) + end); + false -> + ets_cache:untag(Mod:count_messages(LUser, LServer)) + end end. --spec count_messages_in_db(binary(), binary()) -> non_neg_integer(). -count_messages_in_db(LUser, LServer) -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:count_messages(LUser, LServer). +-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}. +store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) -> + case Mod:store_message(Msg) of + ok -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok + end; + Err -> + Err + end. -spec add_delay_info(message(), binary(), undefined | erlang:timestamp()) -> message(). @@ -1099,26 +1209,28 @@ import(LServer, {sql, _}, DBType, <<"spool">>, Mod:import(OffMsg). use_mam_for_user(_User, Server) -> - gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage). + mod_offline_opt:use_mam_for_storage(Server). mod_opt_type(access_max_user_messages) -> - fun acl:shaper_rules_validator/1; -mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; + econf:shaper(); mod_opt_type(store_groupchat) -> - fun(V) when is_boolean(V) -> V end; + econf:bool(); mod_opt_type(bounce_groupchat) -> - fun(V) when is_boolean(V) -> V end; + econf:bool(); mod_opt_type(use_mam_for_storage) -> - fun(V) when is_boolean(V) -> V end; + econf:bool(); mod_opt_type(store_empty_body) -> - fun (V) when is_boolean(V) -> V; - (unless_chat_state) -> unless_chat_state - end; -mod_opt_type(O) when O == cache_life_time; O == cache_size -> - fun (I) when is_integer(I), I > 0 -> I; - (infinity) -> infinity - end. - + econf:either( + unless_chat_state, + econf:bool()); +mod_opt_type(db_type) -> + econf:db_type(?MODULE); +mod_opt_type(use_cache) -> + econf:bool(); +mod_opt_type(cache_size) -> + econf:pos_int(infinity); +mod_opt_type(cache_life_time) -> + econf:timeout(second, infinity). mod_options(Host) -> [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, @@ -1127,5 +1239,6 @@ mod_options(Host) -> {use_mam_for_storage, false}, {bounce_groupchat, false}, {store_groupchat, false}, - {cache_size, ejabberd_config:cache_size(Host)}, - {cache_life_time, ejabberd_config:cache_life_time(Host)}]. + {use_cache, ejabberd_option:use_cache(Host)}, + {cache_size, ejabberd_option:cache_size(Host)}, + {cache_life_time, ejabberd_option:cache_life_time(Host)}]. |