aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
authorPaweł Chmielowski <pchmielowski@process-one.net>2019-07-01 13:36:05 +0200
committerPaweł Chmielowski <pchmielowski@process-one.net>2019-07-01 13:36:41 +0200
commit3e8f3573a380a210af966ec04c25af92999188f7 (patch)
tree75181432932a2ec9c2a6dfc7b2fa7907c2d050db /src/mod_offline.erl
parentGet rid of useless dialyzer instructions (diff)
Make count_offline_messages cache work when offline uses mam for storage
This also replace existing cache for checking if spool is empty with this cache.
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl164
1 files changed, 106 insertions, 58 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 27035e840..3b41ac97d 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -98,7 +98,7 @@
-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
-callback remove_user(binary(), binary()) -> any().
-callback read_message_headers(binary(), binary()) ->
- [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}].
+ [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
-callback read_message(binary(), binary(), non_neg_integer()) ->
{ok, #offline_msg{}} | error.
-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
@@ -222,26 +222,26 @@ store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
Mod = gen_mod:db_mod(Server, ?MODULE),
case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
true ->
- ets_cache:lookup(?EMPTY_SPOOL_CACHE, {User, Server},
- fun() ->
- case count_messages_in_db(Mod, User, Server) of
- 0 ->
- case Mod:store_message(Msg) of
- ok ->
- {cache, ok};
- Err ->
- {nocache, Err}
- end;
- _ ->
- {cache, ok}
+ case count_offline_messages(User, Server) of
+ 0 ->
+ store_message_in_db(Mod, Msg);
+ _ ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
end
- end);
+ end;
false ->
case get_max_user_messages(User, Server) of
infinity ->
store_message_in_db(Mod, Msg);
Limit ->
- Num = count_messages_in_db(Mod, User, Server),
+ Num = count_offline_messages(User, Server),
if Num < Limit ->
store_message_in_db(Mod, Msg);
true ->
@@ -288,7 +288,12 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID,
?NS_FLEX_OFFLINE, _Lang) ->
ejabberd_sm:route(JID, {resend_offline, false}),
Mod = gen_mod:db_mod(S, ?MODULE),
- Hdrs = Mod:read_message_headers(U, S),
+ Hdrs = case Mod:read_message_headers(U, S) of
+ L when is_list(L) ->
+ L;
+ _ ->
+ []
+ end,
BareJID = jid:remove_resource(JID),
{result, lists:map(
fun({Seq, From, _To, _TS, _El}) ->
@@ -516,9 +521,31 @@ store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
stop
end
end;
- _ -> Acc
+ _ ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end;
+ false ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end.
+
+-spec maybe_update_cache(jid(), message()) -> ok.
+maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
+ case xmpp:get_meta(Packet, mam_archived, false) of
+ true ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ _ ->
+ ok
end;
- false -> Acc
+ _ ->
+ ok
end.
-spec check_store_hint(message()) -> store | no_store | none.
@@ -750,7 +777,12 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
- Res = read_db_messages(LUser, LServer),
+ Res = case read_db_messages(LUser, LServer) of
+ error ->
+ [];
+ L when is_list(L) ->
+ L
+ end,
case use_mam_for_user(LUser, LServer) of
true ->
read_mam_messages(LUser, LServer, Res);
@@ -758,27 +790,32 @@ read_messages(LUser, LServer) ->
Res
end.
--spec read_db_messages(binary(), binary()) -> [{binary(), message()}].
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(),
- lists:flatmap(
- fun({Seq, From, To, TS, El}) ->
- Node = integer_to_binary(Seq),
- try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
- Pkt ->
+ case Mod:read_message_headers(LUser, LServer) of
+ error ->
+ error;
+ L ->
+ lists:flatmap(
+ fun({Seq, From, To, TS, El}) ->
Node = integer_to_binary(Seq),
- Pkt1 = add_delay_info(Pkt, LServer, TS),
- Pkt2 = xmpp:set_from_to(Pkt1, From, To),
- [{Node, Pkt2}]
- catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("Failed to decode packet ~p "
- "of user ~s: ~s",
- [El, jid:encode(To),
- xmpp:format_error(Why)]),
- []
- end
- end, Mod:read_message_headers(LUser, LServer)).
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode packet ~p "
+ "of user ~s: ~s",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, L)
+ end.
-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
{integer() | none, [message()]}.
@@ -896,13 +933,15 @@ read_mam_messages(LUser, LServer, ReadMsgs) ->
end, 1, AllMsgs2),
AllMsgs3.
--spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
- integer().
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
+ {cache, integer()} | {nocache, integer()}.
+count_mam_messages(_LUser, _LServer, error) ->
+ {nocache, 0};
count_mam_messages(LUser, LServer, ReadMsgs) ->
{Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
case Start of
none ->
- length(ExtraMsgs);
+ {cache, length(ExtraMsgs)};
_ ->
MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
Number when is_integer(Number) -> Number - length(ExtraMsgs);
@@ -914,7 +953,7 @@ count_mam_messages(LUser, LServer, ReadMsgs) ->
#rsm_set{max = MaxOfflineMsgs,
before = <<"9999999999999999">>},
chat, only_count),
- Count + length(ExtraMsgs)
+ {cache, Count + length(ExtraMsgs)}
end.
format_user_queue(Hdrs) ->
@@ -957,7 +996,10 @@ user_queue(User, Server, Query, Lang) ->
US = {LUser, LServer},
Mod = gen_mod:db_mod(LServer, ?MODULE),
user_queue_parse_query(LUser, LServer, Query),
- HdrsAll = Mod:read_message_headers(LUser, LServer),
+ HdrsAll = case Mod:read_message_headers(LUser, LServer) of
+ error -> [];
+ L -> L
+ end,
Hdrs = get_messages_subset(User, Server, HdrsAll),
FMsgs = format_user_queue(Hdrs),
[?XC(<<"h1">>,
@@ -1082,26 +1124,32 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
case use_mam_for_user(User, Server) of
true ->
- Res = read_db_messages(LUser, LServer),
- count_mam_messages(LUser, LServer, Res);
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Res = read_db_messages(LUser, LServer),
+ count_mam_messages(LUser, LServer, Res)
+ end);
+ false ->
+ Res = read_db_messages(LUser, LServer),
+ ets_cache:untag(count_mam_messages(LUser, LServer, Res))
+ end;
_ ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- count_messages_in_db(Mod, LUser, LServer)
- end.
-
--spec count_messages_in_db(module(), binary(), binary()) -> non_neg_integer().
-count_messages_in_db(Mod, LUser, LServer) ->
- case use_cache(Mod, LServer) of
- true ->
- ets_cache:lookup(
- ?SPOOL_COUNTER_CACHE, {LUser, LServer},
- fun() ->
- Mod:count_messages(LUser, LServer)
- end);
- false ->
- ets_cache:untag(Mod:count_messages(LUser, LServer))
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Mod:count_messages(LUser, LServer)
+ end);
+ false ->
+ ets_cache:untag(Mod:count_messages(LUser, LServer))
+ end
end.
-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.