aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl421
1 files changed, 267 insertions, 154 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 76682d06c..07d71bfdc 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -77,12 +77,14 @@
-include("mod_offline.hrl").
+-include("translate.hrl").
+
-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
%% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity).
--define(EMPTY_SPOOL_CACHE, offline_empty_cache).
+-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache).
-type c2s_state() :: ejabberd_c2s:state().
@@ -95,23 +97,26 @@
-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
-callback remove_user(binary(), binary()) -> any().
-callback read_message_headers(binary(), binary()) ->
- [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}].
+ [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
-callback read_message(binary(), binary(), non_neg_integer()) ->
{ok, #offline_msg{}} | error.
-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
-callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
--callback count_messages(binary(), binary()) -> non_neg_integer().
+-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
--optional_callbacks([remove_expired_messages/1, remove_old_messages/2]).
+-optional_callbacks([remove_expired_messages/1, remove_old_messages/2,
+ use_cache/1, cache_nodes/1]).
depends(_Host, _Opts) ->
[].
start(Host, Opts) ->
- Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+ Mod = gen_mod:db_mod(Opts, ?MODULE),
Mod:init(Host, Opts),
- init_cache(Opts),
+ init_cache(Mod, Host, Opts),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
@@ -159,67 +164,88 @@ stop(Host) ->
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
reload(Host, NewOpts, OldOpts) ->
- NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE),
- OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE),
- init_cache(NewOpts),
+ NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
+ OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
+ init_cache(NewMod, Host, NewOpts),
if NewMod /= OldMod ->
NewMod:init(Host, NewOpts);
true ->
ok
end.
-init_cache(Opts) ->
- case gen_mod:get_opt(use_mam_for_storage, Opts) of
- true ->
- MaxSize = gen_mod:get_opt(cache_size, Opts),
- LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
- infinity -> infinity;
- I -> timer:seconds(I)
- end,
- COpts = [{max_size, MaxSize}, {cache_missed, false}, {life_time, LifeTime}],
- ets_cache:new(?EMPTY_SPOOL_CACHE, COpts);
- false ->
- ets_cache:delete(?EMPTY_SPOOL_CACHE)
+init_cache(Mod, Host, Opts) ->
+ CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)},
+ {life_time, mod_offline_opt:cache_life_time(Opts)},
+ {cache_missed, false}],
+ case use_cache(Mod, Host) of
+ true ->
+ ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts);
+ false ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE)
+ end.
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(Host);
+ false -> mod_offline_opt:use_cache(Host)
+ end.
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(Host);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
+-spec flush_cache(module(), binary(), binary()) -> ok.
+flush_cache(Mod, User, Server) ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE,
+ {User, Server},
+ cache_nodes(Mod, Server));
+ false ->
+ ok
end.
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
UseMam = use_mam_for_user(User, Server),
+ Mod = gen_mod:db_mod(Server, ?MODULE),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
- ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
- fun() ->
- case count_messages_in_db(User, Server) of
- 0 ->
- case Mod:store_message(Msg) of
- ok ->
- {cache, ok};
- Err ->
- {nocache, Err}
- end;
- _ ->
- {cache, ok}
+ case count_offline_messages(User, Server) of
+ 0 ->
+ store_message_in_db(Mod, Msg);
+ _ ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
end
- end);
+ end;
false ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
case get_max_user_messages(User, Server) of
infinity ->
- Mod:store_message(Msg);
+ store_message_in_db(Mod, Msg);
Limit ->
- Num = count_messages_in_db(User, Server),
+ Num = count_offline_messages(User, Server),
if Num < Limit ->
- Mod:store_message(Msg);
- true ->
+ store_message_in_db(Mod, Msg);
+ true ->
{error, full}
end
end
end.
get_max_user_messages(User, Server) ->
- Access = gen_mod:get_module_opt(Server, ?MODULE, access_max_user_messages),
- case acl:match_rule(Server, Access, jid:make(User, Server)) of
+ Access = mod_offline_opt:access_max_user_messages(Server),
+ case ejabberd_shaper:match(Server, Access, jid:make(User, Server)) of
Max when is_integer(Max) -> Max;
infinity -> infinity;
_ -> ?MAX_USER_MESSAGES
@@ -255,7 +281,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID,
?NS_FLEX_OFFLINE, _Lang) ->
ejabberd_sm:route(JID, {resend_offline, false}),
Mod = gen_mod:db_mod(S, ?MODULE),
- Hdrs = Mod:read_message_headers(U, S),
+ Hdrs = case Mod:read_message_headers(U, S) of
+ L when is_list(L) ->
+ L;
+ _ ->
+ []
+ end,
BareJID = jid:remove_resource(JID),
{result, lists:map(
fun({Seq, From, _To, _TS, _El}) ->
@@ -297,7 +328,7 @@ handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1},
lang = Lang,
sub_els = [#offline{}]} = IQ)
when {U1, S1} /= {U2, S2} ->
- Txt = <<"Query to another users is forbidden">>,
+ Txt = ?T("Query to another users is forbidden"),
xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang));
handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From,
to = #jid{luser = U, lserver = S} = _To,
@@ -318,7 +349,7 @@ handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From,
{atomic, ok} ->
xmpp:make_iq_result(IQ);
_Err ->
- Txt = <<"Database failure">>,
+ Txt = ?T("Database failure"),
xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
end;
{set, #offline{fetch = false, items = [_|_] = Items, purge = false}} ->
@@ -330,7 +361,7 @@ handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From,
xmpp:make_error(IQ, xmpp:err_bad_request())
end;
handle_offline_query(#iq{lang = Lang} = IQ) ->
- Txt = <<"No module is handling this query">>,
+ Txt = ?T("No module is handling this query"),
xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)).
-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
@@ -349,7 +380,7 @@ handle_offline_items_view(JID, Items) ->
NewEl = set_offline_tag(El, Node),
case ejabberd_sm:get_session_pid(U, S, R) of
Pid when is_pid(Pid) ->
- Pid ! {route, NewEl};
+ ejabberd_c2s:route(Pid, {route, NewEl});
none ->
ok
end,
@@ -408,6 +439,7 @@ remove_msg_by_node(To, Seq) ->
LServer = To#jid.lserver,
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:remove_message(LUser, LServer, I),
+ flush_cache(Mod, LUser, LServer),
true;
_ ->
false
@@ -432,15 +464,13 @@ need_to_store(LServer, #message{type = Type} = Packet) ->
none ->
Store = case Type of
groupchat ->
- gen_mod:get_module_opt(
- LServer, ?MODULE, store_groupchat);
+ mod_offline_opt:store_groupchat(LServer);
headline ->
false;
_ ->
true
end,
- case {Store, gen_mod:get_module_opt(
- LServer, ?MODULE, store_empty_body)} of
+ case {Store, mod_offline_opt:store_empty_body(LServer)} of
{false, _} ->
false;
{_, true} ->
@@ -484,9 +514,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
stop
end
end;
- _ -> Acc
+ _ ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end;
+ false ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end.
+
+-spec maybe_update_cache(jid(), message()) -> ok.
+maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
+ case xmpp:get_meta(Packet, mam_archived, false) of
+ true ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ _ ->
+ ok
end;
- false -> Acc
+ _ ->
+ ok
end.
-spec check_store_hint(message()) -> store | no_store | none.
@@ -567,8 +619,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State)
{ok, OffMsgs} ->
case use_mam_for_user(LUser, LServer) of
true ->
- ets_cache:delete(?EMPTY_SPOOL_CACHE, {LUser, LServer},
- ejabberd_cluster:get_nodes()),
+ flush_cache(Mod, LUser, LServer),
lists:map(
fun({_, #message{from = From, to = To} = Msg}) ->
#offline_msg{from = From, to = To,
@@ -576,6 +627,7 @@ route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State)
packet = Msg}
end, read_mam_messages(LUser, LServer, OffMsgs));
_ ->
+ flush_cache(Mod, LUser, LServer),
OffMsgs
end;
_ ->
@@ -622,16 +674,24 @@ remove_expired_messages(Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case erlang:function_exported(Mod, remove_expired_messages, 1) of
- true -> Mod:remove_expired_messages(LServer);
- false -> erlang:error(not_implemented)
+ true ->
+ Ret = Mod:remove_expired_messages(LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
end.
remove_old_messages(Days, Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
case erlang:function_exported(Mod, remove_old_messages, 2) of
- true -> Mod:remove_old_messages(Days, LServer);
- false -> erlang:error(not_implemented)
+ true ->
+ Ret = Mod:remove_old_messages(Days, LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
end.
-spec remove_user(binary(), binary()) -> ok.
@@ -640,7 +700,7 @@ remove_user(User, Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:remove_user(LUser, LServer),
- ok.
+ flush_cache(Mod, LUser, LServer).
%% Helper functions:
@@ -648,11 +708,11 @@ remove_user(User, Server) ->
check_if_message_should_be_bounced(Packet) ->
case Packet of
#message{type = groupchat, to = #jid{lserver = LServer}} ->
- gen_mod:get_module_opt(LServer, ?MODULE, bounce_groupchat);
+ mod_offline_opt:bounce_groupchat(LServer);
#message{to = #jid{lserver = LServer}} ->
case misc:is_mucsub_message(Packet) of
true ->
- gen_mod:get_module_opt(LServer, ?MODULE, bounce_groupchat);
+ mod_offline_opt:bounce_groupchat(LServer);
_ ->
true
end;
@@ -669,11 +729,11 @@ discard_warn_sender(Packet, Reason) ->
Lang = xmpp:get_lang(Packet),
Err = case Reason of
full ->
- ErrText = <<"Your contact offline message queue is "
- "full. The message has been discarded.">>,
+ ErrText = ?T("Your contact offline message queue is "
+ "full. The message has been discarded."),
xmpp:err_resource_constraint(ErrText, Lang);
_ ->
- ErrText = <<"Database failure">>,
+ ErrText = ?T("Database failure"),
xmpp:err_internal_server_error(ErrText, Lang)
end,
ejabberd_router:route_error(Packet, Err);
@@ -694,14 +754,14 @@ get_offline_els(LUser, LServer) ->
-spec offline_msg_to_route(binary(), #offline_msg{}) ->
{route, message()} | error.
offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
- CodecOpts = ejabberd_config:codec_options(LServer),
+ CodecOpts = ejabberd_config:codec_options(),
try xmpp:decode(R#offline_msg.packet, ?NS_CLIENT, CodecOpts) of
Pkt ->
Pkt1 = xmpp:set_from_to(Pkt, From, To),
Pkt2 = add_delay_info(Pkt1, LServer, R#offline_msg.timestamp),
{route, Pkt2}
catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("failed to decode packet ~p of user ~s: ~s",
+ ?ERROR_MSG("Failed to decode packet ~p of user ~s: ~s",
[R#offline_msg.packet, jid:encode(To),
xmpp:format_error(Why)]),
error
@@ -709,7 +769,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
- Res = read_db_messages(LUser, LServer),
+ Res = case read_db_messages(LUser, LServer) of
+ error ->
+ [];
+ L when is_list(L) ->
+ L
+ end,
case use_mam_for_user(LUser, LServer) of
true ->
read_mam_messages(LUser, LServer, Res);
@@ -717,27 +782,32 @@ read_messages(LUser, LServer) ->
Res
end.
--spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- CodecOpts = ejabberd_config:codec_options(LServer),
- lists:flatmap(
- fun({Seq, From, To, TS, El}) ->
- Node = integer_to_binary(Seq),
- try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
- Pkt ->
+ CodecOpts = ejabberd_config:codec_options(),
+ case Mod:read_message_headers(LUser, LServer) of
+ error ->
+ error;
+ L ->
+ lists:flatmap(
+ fun({Seq, From, To, TS, El}) ->
Node = integer_to_binary(Seq),
- Pkt1 = add_delay_info(Pkt, LServer, TS),
- Pkt2 = xmpp:set_from_to(Pkt1, From, To),
- [{Node, Pkt2}]
- catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("failed to decode packet ~p "
- "of user ~s: ~s",
- [El, jid:encode(To),
- xmpp:format_error(Why)]),
- []
- end
- end, Mod:read_message_headers(LUser, LServer)).
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode packet ~p "
+ "of user ~s: ~s",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, L)
+ end.
-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
{integer() | none, [message()]}.
@@ -745,7 +815,7 @@ parse_marker_messages(LServer, ReadMsgs) ->
{Timestamp, ExtraMsgs} = lists:foldl(
fun({_Node, #message{id = <<"ActivityMarker">>,
body = [], type = error} = Msg}, {T, E}) ->
- case xmpp:get_subtag(Msg, #delay{}) of
+ case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of
#delay{stamp = Time} ->
if T == none orelse T > Time ->
{Time, E};
@@ -760,7 +830,7 @@ parse_marker_messages(LServer, ReadMsgs) ->
body = [], type = error} = Msg ->
TS2 = case TS of
undefined ->
- case xmpp:get_subtag(Msg, #delay{}) of
+ case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of
#delay{stamp = TS0} ->
TS0;
_ ->
@@ -785,7 +855,7 @@ parse_marker_messages(LServer, ReadMsgs) ->
end, {none, []}, ReadMsgs),
Start = case {Timestamp, ExtraMsgs} of
{none, [First|_]} ->
- case xmpp:get_subtag(First, #delay{}) of
+ case xmpp:get_subtag(First, #delay{stamp = {0,0,0}}) of
#delay{stamp = {Mega, Sec, Micro}} ->
{Mega, Sec, Micro+1};
_ ->
@@ -807,9 +877,10 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
ExtraMsgs;
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
- Number when is_integer(Number) -> Number - length(ExtraMsgs);
- infinity -> undefined;
- _ -> 100 - length(ExtraMsgs)
+ Number when is_integer(Number) ->
+ max(0, Number - length(ExtraMsgs));
+ infinity ->
+ undefined
end,
JID = jid:make(LUser, LServer, <<>>),
{MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
@@ -826,20 +897,20 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
end,
AllMsgs2 = lists:sort(
fun(A, B) ->
- DA = case xmpp:get_subtag(A, #stanza_id{}) of
+ DA = case xmpp:get_subtag(A, #stanza_id{by = #jid{}}) of
#stanza_id{id = IDA} ->
IDA;
- _ -> case xmpp:get_subtag(A, #delay{}) of
+ _ -> case xmpp:get_subtag(A, #delay{stamp = {0,0,0}}) of
#delay{stamp = STA} ->
integer_to_binary(misc:now_to_usec(STA));
_ ->
<<"unknown">>
end
end,
- DB = case xmpp:get_subtag(B, #stanza_id{}) of
+ DB = case xmpp:get_subtag(B, #stanza_id{by = #jid{}}) of
#stanza_id{id = IDB} ->
IDB;
- _ -> case xmpp:get_subtag(B, #delay{}) of
+ _ -> case xmpp:get_subtag(B, #delay{stamp = {0,0,0}}) of
#delay{stamp = STB} ->
integer_to_binary(misc:now_to_usec(STB));
_ ->
@@ -854,18 +925,19 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
end, 1, AllMsgs2),
AllMsgs3.
--spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
- integer().
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
+ {cache, integer()} | {nocache, integer()}.
+count_mam_messages(_LUser, _LServer, error) ->
+ {nocache, 0};
count_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
case Start of
none ->
- length(ExtraMsgs);
+ {cache, length(ExtraMsgs)};
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
Number when is_integer(Number) -> Number - length(ExtraMsgs);
- infinity -> undefined;
- _ -> 100 - length(ExtraMsgs)
+ infinity -> undefined
end,
JID = jid:make(LUser, LServer, <<>>),
{_, _, Count} = mod_mam:select(LServer, JID, JID,
@@ -873,7 +945,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) ->
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
chat, only_count),
- Count + length(ExtraMsgs)
+ {cache, Count + length(ExtraMsgs)}
end.
format_user_queue(Hdrs) ->
@@ -915,28 +987,26 @@ user_queue(User, Server, Query, Lang) ->
LServer = jid:nameprep(Server),
US = {LUser, LServer},
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Res = user_queue_parse_query(LUser, LServer, Query),
- HdrsAll = Mod:read_message_headers(LUser, LServer),
+ user_queue_parse_query(LUser, LServer, Query),
+ HdrsAll = case Mod:read_message_headers(LUser, LServer) of
+ error -> [];
+ L -> L
+ end,
Hdrs = get_messages_subset(User, Server, HdrsAll),
FMsgs = format_user_queue(Hdrs),
[?XC(<<"h1">>,
- (str:format(?T(<<"~s's Offline Messages Queue">>),
- [us_to_list(US)])))]
- ++
- case Res of
- ok -> [?XREST(<<"Submitted">>)];
- nothing -> []
- end
- ++
+ (str:format(translate:translate(Lang, ?T("~s's Offline Messages Queue")),
+ [us_to_list(US)])))]
+ ++ [?XREST(?T("Submitted"))] ++
[?XAE(<<"form">>,
[{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
[?XE(<<"table">>,
[?XE(<<"thead">>,
[?XE(<<"tr">>,
- [?X(<<"td">>), ?XCT(<<"td">>, <<"Time">>),
- ?XCT(<<"td">>, <<"From">>),
- ?XCT(<<"td">>, <<"To">>),
- ?XCT(<<"td">>, <<"Packet">>)])]),
+ [?X(<<"td">>), ?XCT(<<"td">>, ?T("Time")),
+ ?XCT(<<"td">>, ?T("From")),
+ ?XCT(<<"td">>, ?T("To")),
+ ?XCT(<<"td">>, ?T("Packet"))])]),
?XE(<<"tbody">>,
if FMsgs == [] ->
[?XE(<<"tr">>,
@@ -946,29 +1016,35 @@ user_queue(User, Server, Query, Lang) ->
end)]),
?BR,
?INPUTT(<<"submit">>, <<"delete">>,
- <<"Delete Selected">>)])].
+ ?T("Delete Selected"))])].
user_queue_parse_query(LUser, LServer, Query) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case lists:keysearch(<<"delete">>, 1, Query) of
{value, _} ->
- user_queue_parse_query(LUser, LServer, Query, Mod);
+ case user_queue_parse_query(LUser, LServer, Query, Mod, false) of
+ true ->
+ flush_cache(Mod, LUser, LServer);
+ false ->
+ ok
+ end;
_ ->
- nothing
+ ok
end.
-user_queue_parse_query(LUser, LServer, Query, Mod) ->
+user_queue_parse_query(LUser, LServer, Query, Mod, Acc) ->
case lists:keytake(<<"selected">>, 1, Query) of
{value, {_, Seq}, Query2} ->
- case catch binary_to_integer(Seq) of
- I when is_integer(I), I>=0 ->
- Mod:remove_message(LUser, LServer, I);
- _ ->
- nothing
- end,
- user_queue_parse_query(LUser, LServer, Query2, Mod);
+ NewAcc = case catch binary_to_integer(Seq) of
+ I when is_integer(I), I>=0 ->
+ Mod:remove_message(LUser, LServer, I),
+ true;
+ _ ->
+ Acc
+ end,
+ user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc);
false ->
- nothing
+ Acc
end.
us_to_list({User, Server}) ->
@@ -1004,18 +1080,20 @@ webadmin_user(Acc, User, Server, Lang) ->
FQueueLen = [?AC(<<"queue/">>,
(integer_to_binary(QueueLen)))],
Acc ++
- [?XCT(<<"h3">>, <<"Offline Messages:">>)] ++
+ [?XCT(<<"h3">>, ?T("Offline Messages:"))] ++
FQueueLen ++
[?C(<<" ">>),
?INPUTT(<<"submit">>, <<"removealloffline">>,
- <<"Remove All Offline Messages">>)].
+ ?T("Remove All Offline Messages"))].
-spec delete_all_msgs(binary(), binary()) -> {atomic, any()}.
delete_all_msgs(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_all_messages(LUser, LServer).
+ Ret = Mod:remove_all_messages(LUser, LServer),
+ flush_cache(Mod, LUser, LServer),
+ Ret.
webadmin_user_parse_query(_, <<"removealloffline">>,
User, Server, _Query) ->
@@ -1038,18 +1116,50 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_mam_for_user(User, Server) of
true ->
- Res = read_db_messages(LUser, LServer),
- count_mam_messages(LUser, LServer, Res);
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Res = read_db_messages(LUser, LServer),
+ count_mam_messages(LUser, LServer, Res)
+ end);
+ false ->
+ Res = read_db_messages(LUser, LServer),
+ ets_cache:untag(count_mam_messages(LUser, LServer, Res))
+ end;
_ ->
- count_messages_in_db(LUser, LServer)
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Mod:count_messages(LUser, LServer)
+ end);
+ false ->
+ ets_cache:untag(Mod:count_messages(LUser, LServer))
+ end
end.
--spec count_messages_in_db(binary(), binary()) -> non_neg_integer().
-count_messages_in_db(LUser, LServer) ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:count_messages(LUser, LServer).
+-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.
+store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) ->
+ case Mod:store_message(Msg) of
+ ok ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
+ end;
+ Err ->
+ Err
+ end.
-spec add_delay_info(message(), binary(),
undefined | erlang:timestamp()) -> message().
@@ -1099,26 +1209,28 @@ import(LServer, {sql, _}, DBType, <<"spool">>,
Mod:import(OffMsg).
use_mam_for_user(_User, Server) ->
- gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage).
+ mod_offline_opt:use_mam_for_storage(Server).
mod_opt_type(access_max_user_messages) ->
- fun acl:shaper_rules_validator/1;
-mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
+ econf:shaper();
mod_opt_type(store_groupchat) ->
- fun(V) when is_boolean(V) -> V end;
+ econf:bool();
mod_opt_type(bounce_groupchat) ->
- fun(V) when is_boolean(V) -> V end;
+ econf:bool();
mod_opt_type(use_mam_for_storage) ->
- fun(V) when is_boolean(V) -> V end;
+ econf:bool();
mod_opt_type(store_empty_body) ->
- fun (V) when is_boolean(V) -> V;
- (unless_chat_state) -> unless_chat_state
- end;
-mod_opt_type(O) when O == cache_life_time; O == cache_size ->
- fun (I) when is_integer(I), I > 0 -> I;
- (infinity) -> infinity
- end.
-
+ econf:either(
+ unless_chat_state,
+ econf:bool());
+mod_opt_type(db_type) ->
+ econf:db_type(?MODULE);
+mod_opt_type(use_cache) ->
+ econf:bool();
+mod_opt_type(cache_size) ->
+ econf:pos_int(infinity);
+mod_opt_type(cache_life_time) ->
+ econf:timeout(second, infinity).
mod_options(Host) ->
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
@@ -1127,5 +1239,6 @@ mod_options(Host) ->
{use_mam_for_storage, false},
{bounce_groupchat, false},
{store_groupchat, false},
- {cache_size, ejabberd_config:cache_size(Host)},
- {cache_life_time, ejabberd_config:cache_life_time(Host)}].
+ {use_cache, ejabberd_option:use_cache(Host)},
+ {cache_size, ejabberd_option:cache_size(Host)},
+ {cache_life_time, ejabberd_option:cache_life_time(Host)}].