summaryrefslogtreecommitdiff
path: root/src/mod_carboncopy.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
commite40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch)
tree75d9fe880e8257ea9fd20c095c252d7940cea89d /src/mod_carboncopy.erl
parentBump xmpp dependency, it's required by previous commit (diff)
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/mod_carboncopy.erl')
-rw-r--r--src/mod_carboncopy.erl127
1 files changed, 119 insertions, 8 deletions
diff --git a/src/mod_carboncopy.erl b/src/mod_carboncopy.erl
index a7ae37f4..91c18aab 100644
--- a/src/mod_carboncopy.erl
+++ b/src/mod_carboncopy.erl
@@ -36,18 +36,23 @@
-export([user_send_packet/1, user_receive_packet/1,
iq_handler/1, remove_connection/4, disco_features/5,
- is_carbon_copy/1, mod_opt_type/1, depends/2]).
+ is_carbon_copy/1, mod_opt_type/1, depends/2, clean_cache/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
-include("xmpp.hrl").
+-include("mod_carboncopy.hrl").
-type direction() :: sent | received.
-callback init(binary(), gen_mod:opts()) -> any().
-callback enable(binary(), binary(), binary(), binary()) -> ok | {error, any()}.
-callback disable(binary(), binary(), binary()) -> ok | {error, any()}.
--callback list(binary(), binary()) -> [{binary(), binary()}].
+-callback list(binary(), binary()) -> [{binary(), binary(), node()}].
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
+
+-optional_callbacks([use_cache/1, cache_nodes/1]).
-spec is_carbon_copy(stanza()) -> boolean().
is_carbon_copy(#message{meta = #{carbon_copy := true}}) ->
@@ -59,7 +64,9 @@ start(Host, Opts) ->
IQDisc = gen_mod:get_opt(iqdisc, Opts,fun gen_iq_handler:check_type/1, one_queue),
ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 50),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
+ init_cache(Mod, Host, Opts),
Mod:init(Host, Opts),
+ clean_cache(),
ejabberd_hooks:add(unset_presence_hook,Host, ?MODULE, remove_connection, 10),
%% why priority 89: to define clearly that we must run BEFORE mod_logdb hook (90)
ejabberd_hooks:add(user_send_packet,Host, ?MODULE, user_send_packet, 89),
@@ -82,6 +89,12 @@ reload(Host, NewOpts, OldOpts) ->
true ->
ok
end,
+ case use_cache(NewMod, Host) of
+ true ->
+ ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, NewOpts));
+ false ->
+ ok
+ end,
case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts,
fun gen_iq_handler:check_type/1,
one_queue) of
@@ -247,13 +260,20 @@ build_forward_packet(JID, #message{type = T} = Msg, Sender, Dest, Direction) ->
enable(Host, U, R, CC)->
?DEBUG("enabling for ~p", [U]),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
- Mod:enable(U, Host, R, CC).
+ case Mod:enable(U, Host, R, CC) of
+ ok ->
+ delete_cache(Mod, U, Host);
+ {error, _} = Err ->
+ Err
+ end.
-spec disable(binary(), binary(), binary()) -> ok | {error, any()}.
disable(Host, U, R)->
?DEBUG("disabling for ~p", [U]),
Mod = gen_mod:ram_db_mod(Host, ?MODULE),
- Mod:disable(U, Host, R).
+ Res = Mod:disable(U, Host, R),
+ delete_cache(Mod, U, Host),
+ Res.
-spec complete_packet(jid(), message(), direction()) -> message().
complete_packet(From, #message{from = undefined} = Msg, sent) ->
@@ -276,15 +296,106 @@ is_muc_pm(#jid{lresource = <<>>}, _Packet) ->
is_muc_pm(_To, Packet) ->
xmpp:has_subtag(Packet, #muc_user{}).
--spec list(binary(), binary()) -> [{binary(), binary()}].
-%% list {resource, cc_version} with carbons enabled for given user and host
+-spec list(binary(), binary()) -> [{Resource :: binary(), Namespace :: binary()}].
list(User, Server) ->
Mod = gen_mod:ram_db_mod(Server, ?MODULE),
- Mod:list(User, Server).
+ case use_cache(Mod, Server) of
+ true ->
+ case ets_cache:lookup(
+ ?CARBONCOPY_CACHE, {User, Server},
+ fun() ->
+ case Mod:list(User, Server) of
+ {ok, L} when L /= [] -> {ok, L};
+ _ -> error
+ end
+ end) of
+ {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L];
+ error -> []
+ end;
+ false ->
+ case Mod:list(User, Server) of
+ {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L];
+ error -> []
+ end
+ end.
+
+-spec init_cache(module(), binary(), gen_mod:opts()) -> ok.
+init_cache(Mod, Host, Opts) ->
+ case use_cache(Mod, Host) of
+ true ->
+ ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, Opts));
+ false ->
+ ets_cache:delete(?CARBONCOPY_CACHE)
+ end.
+
+-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()].
+cache_opts(Host, Opts) ->
+ MaxSize = gen_mod:get_opt(
+ cache_size, Opts, mod_opt_type(cache_size),
+ ejabberd_config:cache_size(Host)),
+ CacheMissed = gen_mod:get_opt(
+ cache_missed, Opts, mod_opt_type(cache_missed),
+ ejabberd_config:cache_missed(Host)),
+ LifeTime = case gen_mod:get_opt(
+ cache_life_time, Opts, mod_opt_type(cache_life_time),
+ ejabberd_config:cache_life_time(Host)) of
+ infinity -> infinity;
+ I -> timer:seconds(I)
+ end,
+ [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(Host);
+ false ->
+ gen_mod:get_module_opt(
+ Host, ?MODULE, use_cache, mod_opt_type(use_cache),
+ ejabberd_config:use_cache(Host))
+ end.
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(Host);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
+-spec clean_cache(node()) -> ok.
+clean_cache(Node) ->
+ ets_cache:filter(
+ ?CARBONCOPY_CACHE,
+ fun(_, error) ->
+ false;
+ (_, {ok, L}) ->
+ not lists:any(fun({_, _, N}) -> N == Node end, L)
+ end).
+
+-spec clean_cache() -> ok.
+clean_cache() ->
+ ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]).
+
+-spec delete_cache(module(), binary(), binary()) -> ok.
+delete_cache(Mod, User, Server) ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:delete(?CARBONCOPY_CACHE, {User, Server},
+ cache_nodes(Mod, Server));
+ false ->
+ ok
+ end.
depends(_Host, _Opts) ->
[].
mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1;
mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
-mod_opt_type(_) -> [ram_db_type, iqdisc].
+mod_opt_type(O) when O == use_cache; O == cache_missed ->
+ fun(B) when is_boolean(B) -> B end;
+mod_opt_type(O) when O == cache_size; O == cache_life_time ->
+ fun(I) when is_integer(I), I>0 -> I;
+ (unlimited) -> infinity;
+ (infinity) -> infinity
+ end;
+mod_opt_type(_) ->
+ [ram_db_type, iqdisc, use_cache, cache_size, cache_missed, cache_life_time].