diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
commit | e40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch) | |
tree | 75d9fe880e8257ea9fd20c095c252d7940cea89d /src/mod_carboncopy.erl | |
parent | Bump xmpp dependency, it's required by previous commit (diff) |
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/mod_carboncopy.erl')
-rw-r--r-- | src/mod_carboncopy.erl | 127 |
1 files changed, 119 insertions, 8 deletions
diff --git a/src/mod_carboncopy.erl b/src/mod_carboncopy.erl index a7ae37f4..91c18aab 100644 --- a/src/mod_carboncopy.erl +++ b/src/mod_carboncopy.erl @@ -36,18 +36,23 @@ -export([user_send_packet/1, user_receive_packet/1, iq_handler/1, remove_connection/4, disco_features/5, - is_carbon_copy/1, mod_opt_type/1, depends/2]). + is_carbon_copy/1, mod_opt_type/1, depends/2, clean_cache/1]). -include("ejabberd.hrl"). -include("logger.hrl"). -include("xmpp.hrl"). +-include("mod_carboncopy.hrl"). -type direction() :: sent | received. -callback init(binary(), gen_mod:opts()) -> any(). -callback enable(binary(), binary(), binary(), binary()) -> ok | {error, any()}. -callback disable(binary(), binary(), binary()) -> ok | {error, any()}. --callback list(binary(), binary()) -> [{binary(), binary()}]. +-callback list(binary(), binary()) -> [{binary(), binary(), node()}]. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). -spec is_carbon_copy(stanza()) -> boolean(). is_carbon_copy(#message{meta = #{carbon_copy := true}}) -> @@ -59,7 +64,9 @@ start(Host, Opts) -> IQDisc = gen_mod:get_opt(iqdisc, Opts,fun gen_iq_handler:check_type/1, one_queue), ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 50), Mod = gen_mod:ram_db_mod(Host, ?MODULE), + init_cache(Mod, Host, Opts), Mod:init(Host, Opts), + clean_cache(), ejabberd_hooks:add(unset_presence_hook,Host, ?MODULE, remove_connection, 10), %% why priority 89: to define clearly that we must run BEFORE mod_logdb hook (90) ejabberd_hooks:add(user_send_packet,Host, ?MODULE, user_send_packet, 89), @@ -82,6 +89,12 @@ reload(Host, NewOpts, OldOpts) -> true -> ok end, + case use_cache(NewMod, Host) of + true -> + ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, NewOpts)); + false -> + ok + end, case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, fun gen_iq_handler:check_type/1, one_queue) of @@ -247,13 +260,20 @@ build_forward_packet(JID, #message{type = T} = Msg, Sender, Dest, Direction) -> enable(Host, U, R, CC)-> ?DEBUG("enabling for ~p", [U]), Mod = gen_mod:ram_db_mod(Host, ?MODULE), - Mod:enable(U, Host, R, CC). + case Mod:enable(U, Host, R, CC) of + ok -> + delete_cache(Mod, U, Host); + {error, _} = Err -> + Err + end. -spec disable(binary(), binary(), binary()) -> ok | {error, any()}. disable(Host, U, R)-> ?DEBUG("disabling for ~p", [U]), Mod = gen_mod:ram_db_mod(Host, ?MODULE), - Mod:disable(U, Host, R). + Res = Mod:disable(U, Host, R), + delete_cache(Mod, U, Host), + Res. -spec complete_packet(jid(), message(), direction()) -> message(). complete_packet(From, #message{from = undefined} = Msg, sent) -> @@ -276,15 +296,106 @@ is_muc_pm(#jid{lresource = <<>>}, _Packet) -> is_muc_pm(_To, Packet) -> xmpp:has_subtag(Packet, #muc_user{}). --spec list(binary(), binary()) -> [{binary(), binary()}]. -%% list {resource, cc_version} with carbons enabled for given user and host +-spec list(binary(), binary()) -> [{Resource :: binary(), Namespace :: binary()}]. list(User, Server) -> Mod = gen_mod:ram_db_mod(Server, ?MODULE), - Mod:list(User, Server). + case use_cache(Mod, Server) of + true -> + case ets_cache:lookup( + ?CARBONCOPY_CACHE, {User, Server}, + fun() -> + case Mod:list(User, Server) of + {ok, L} when L /= [] -> {ok, L}; + _ -> error + end + end) of + {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L]; + error -> [] + end; + false -> + case Mod:list(User, Server) of + {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L]; + error -> [] + end + end. + +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, Opts)); + false -> + ets_cache:delete(?CARBONCOPY_CACHE) + end. + +-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()]. +cache_opts(Host, Opts) -> + MaxSize = gen_mod:get_opt( + cache_size, Opts, mod_opt_type(cache_size), + ejabberd_config:cache_size(Host)), + CacheMissed = gen_mod:get_opt( + cache_missed, Opts, mod_opt_type(cache_missed), + ejabberd_config:cache_missed(Host)), + LifeTime = case gen_mod:get_opt( + cache_life_time, Opts, mod_opt_type(cache_life_time), + ejabberd_config:cache_life_time(Host)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> + gen_mod:get_module_opt( + Host, ?MODULE, use_cache, mod_opt_type(use_cache), + ejabberd_config:use_cache(Host)) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?CARBONCOPY_CACHE, + fun(_, error) -> + false; + (_, {ok, L}) -> + not lists:any(fun({_, _, N}) -> N == Node end, L) + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). + +-spec delete_cache(module(), binary(), binary()) -> ok. +delete_cache(Mod, User, Server) -> + case use_cache(Mod, Server) of + true -> + ets_cache:delete(?CARBONCOPY_CACHE, {User, Server}, + cache_nodes(Mod, Server)); + false -> + ok + end. depends(_Host, _Opts) -> []. mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1; mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; -mod_opt_type(_) -> [ram_db_type, iqdisc]. +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(O) when O == cache_size; O == cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; +mod_opt_type(_) -> + [ram_db_type, iqdisc, use_cache, cache_size, cache_missed, cache_life_time]. |