diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
commit | e40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch) | |
tree | 75d9fe880e8257ea9fd20c095c252d7940cea89d | |
parent | Bump xmpp dependency, it's required by previous commit (diff) |
Use cache in front of Redis/SQL RAM backends
-rw-r--r-- | include/bosh.hrl | 2 | ||||
-rw-r--r-- | include/ejabberd_router.hrl | 2 | ||||
-rw-r--r-- | include/ejabberd_sm.hrl | 2 | ||||
-rw-r--r-- | include/mod_carboncopy.hrl | 2 | ||||
-rw-r--r-- | rebar.config | 2 | ||||
-rw-r--r-- | src/ejabberd_cluster.erl | 15 | ||||
-rw-r--r-- | src/ejabberd_config.erl | 37 | ||||
-rw-r--r-- | src/ejabberd_redis.erl | 117 | ||||
-rw-r--r-- | src/ejabberd_redis_sup.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_router.erl | 184 | ||||
-rw-r--r-- | src/ejabberd_router_mnesia.erl | 44 | ||||
-rw-r--r-- | src/ejabberd_router_redis.erl | 93 | ||||
-rw-r--r-- | src/ejabberd_router_sql.erl | 72 | ||||
-rw-r--r-- | src/ejabberd_sm.erl | 234 | ||||
-rw-r--r-- | src/ejabberd_sm_mnesia.erl | 31 | ||||
-rw-r--r-- | src/ejabberd_sm_redis.erl | 143 | ||||
-rw-r--r-- | src/ejabberd_sm_sql.erl | 60 | ||||
-rw-r--r-- | src/mod_bosh.erl | 125 | ||||
-rw-r--r-- | src/mod_bosh_mnesia.erl | 8 | ||||
-rw-r--r-- | src/mod_bosh_redis.erl | 81 | ||||
-rw-r--r-- | src/mod_bosh_sql.erl | 19 | ||||
-rw-r--r-- | src/mod_carboncopy.erl | 127 | ||||
-rw-r--r-- | src/mod_carboncopy_mnesia.erl | 31 | ||||
-rw-r--r-- | src/mod_carboncopy_redis.erl | 88 | ||||
-rw-r--r-- | src/mod_carboncopy_sql.erl | 13 | ||||
-rw-r--r-- | src/randoms.erl | 5 |
26 files changed, 1155 insertions, 384 deletions
diff --git a/include/bosh.hrl b/include/bosh.hrl index 3f9095e58..d95784c08 100644 --- a/include/bosh.hrl +++ b/include/bosh.hrl @@ -47,3 +47,5 @@ -define(HEADER(CType), [CType, ?AC_ALLOW_ORIGIN, ?AC_ALLOW_HEADERS]). + +-define(BOSH_CACHE, bosh_cache). diff --git a/include/ejabberd_router.hrl b/include/ejabberd_router.hrl index f22bd723b..04ea6e304 100644 --- a/include/ejabberd_router.hrl +++ b/include/ejabberd_router.hrl @@ -1,3 +1,5 @@ +-define(ROUTES_CACHE, routes_cache). + -type local_hint() :: integer() | {apply, atom(), atom()}. -record(route, {domain :: binary() | '_', diff --git a/include/ejabberd_sm.hrl b/include/ejabberd_sm.hrl index 71cfc9ee9..377c98a4d 100644 --- a/include/ejabberd_sm.hrl +++ b/include/ejabberd_sm.hrl @@ -21,6 +21,8 @@ -ifndef(EJABBERD_SM_HRL). -define(EJABBERD_SM_HRL, true). +-define(SM_CACHE, sm_cache). + -record(session, {sid, usr, us, priority, info = []}). -record(session_counter, {vhost, count}). -type sid() :: {erlang:timestamp(), pid()}. diff --git a/include/mod_carboncopy.hrl b/include/mod_carboncopy.hrl index b58a5044e..1da76ffbc 100644 --- a/include/mod_carboncopy.hrl +++ b/include/mod_carboncopy.hrl @@ -22,3 +22,5 @@ -record(carboncopy, {us :: {binary(), binary()} | matchspec_atom(), resource :: binary() | matchspec_atom(), version :: binary() | matchspec_atom()}). + +-define(CARBONCOPY_CACHE, carboncopy_cache). diff --git a/rebar.config b/rebar.config index 03cf910b1..05d5d29a2 100644 --- a/rebar.config +++ b/rebar.config @@ -20,7 +20,7 @@ {deps, [{lager, ".*", {git, "https://github.com/basho/lager", {tag, "3.2.1"}}}, {p1_utils, ".*", {git, "https://github.com/processone/p1_utils", {tag, "1.0.8"}}}, - {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", {tag, "1.0.7"}}}, + {cache_tab, ".*", {git, "https://github.com/processone/cache_tab", "35cc9904fde"}}, {fast_tls, ".*", {git, "https://github.com/processone/fast_tls", {tag, "1.0.11"}}}, {stringprep, ".*", {git, "https://github.com/processone/stringprep", {tag, "1.0.8"}}}, {fast_xml, ".*", {git, "https://github.com/processone/fast_xml", {tag, "1.1.21"}}}, diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index a331a0084..aeae294b0 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -26,7 +26,8 @@ -module(ejabberd_cluster). %% API --export([get_nodes/0, call/4, multicall/3, multicall/4]). +-export([get_nodes/0, call/4, multicall/3, multicall/4, + eval_everywhere/3, eval_everywhere/4]). -export([join/1, leave/1, get_known_nodes/0]). -export([node_id/0, get_node_by_id/1]). @@ -59,6 +60,18 @@ multicall(Module, Function, Args) -> multicall(Nodes, Module, Function, Args) -> rpc:multicall(Nodes, Module, Function, Args, 5000). +-spec eval_everywhere(module(), atom(), [any()]) -> ok. + +eval_everywhere(Module, Function, Args) -> + eval_everywhere(get_nodes(), Module, Function, Args), + ok. + +-spec eval_everywhere([node()], module(), atom(), [any()]) -> ok. + +eval_everywhere(Nodes, Module, Function, Args) -> + rpc:eval_everywhere(Nodes, Module, Function, Args), + ok. + -spec join(node()) -> ok | {error, any()}. join(Node) -> diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl index 03b893ac3..1f357c294 100644 --- a/src/ejabberd_config.erl +++ b/src/ejabberd_config.erl @@ -37,7 +37,8 @@ env_binary_to_list/2, opt_type/1, may_hide_data/1, is_elixir_enabled/0, v_dbs/1, v_dbs_mods/1, default_db/1, default_db/2, default_ram_db/1, default_ram_db/2, - default_queue_type/1, queue_dir/0, fsm_limit_opts/1]). + default_queue_type/1, queue_dir/0, fsm_limit_opts/1, + use_cache/1, cache_size/1, cache_missed/1, cache_life_time/1]). -export([start/2]). @@ -1460,9 +1461,24 @@ opt_type(queue_dir) -> fun iolist_to_binary/1; opt_type(queue_type) -> fun(ram) -> ram; (file) -> file end; +opt_type(use_cache) -> + fun(B) when is_boolean(B) -> B end; +opt_type(cache_size) -> + fun(I) when is_integer(I), I>0 -> I; + (infinity) -> infinity; + (unlimited) -> infinity + end; +opt_type(cache_missed) -> + fun(B) when is_boolean(B) -> B end; +opt_type(cache_life_time) -> + fun(I) when is_integer(I), I>0 -> I; + (infinity) -> infinity; + (unlimited) -> infinity + end; opt_type(_) -> [hide_sensitive_log_data, hosts, language, max_fsm_queue, - default_db, default_ram_db, queue_type, queue_dir, loglevel]. + default_db, default_ram_db, queue_type, queue_dir, loglevel, + use_cache, cache_size, cache_missed, cache_life_time]. -spec may_hide_data(any()) -> any(). may_hide_data(Data) -> @@ -1499,3 +1515,20 @@ queue_dir() -> -spec default_queue_type(binary()) -> ram | file. default_queue_type(Host) -> get_option({queue_type, Host}, opt_type(queue_type), ram). + +-spec use_cache(binary() | global) -> boolean(). +use_cache(Host) -> + get_option({use_cache, Host}, opt_type(use_cache), true). + +-spec cache_size(binary() | global) -> pos_integer() | infinity. +cache_size(Host) -> + get_option({cache_size, Host}, opt_type(cache_size), 1000). + +-spec cache_missed(binary() | global) -> boolean(). +cache_missed(Host) -> + get_option({cache_missed, Host}, opt_type(cache_missed), true). + +-spec cache_life_time(binary() | global) -> pos_integer() | infinity. +%% NOTE: the integer value returned is in *seconds* +cache_life_time(Host) -> + get_option({cache_life_time, Host}, opt_type(cache_life_time), 3600). diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index e7cc74d98..bd85f0ee5 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -31,11 +31,12 @@ -compile({no_auto_import, [get/1, put/2]}). %% API --export([start_link/1, get_proc/1, q/1, qp/1, format_error/1]). +-export([start_link/1, get_proc/1, get_connection/1, q/1, qp/1, format_error/1]). %% Commands -export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, sismember/2, scard/1, - hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]). + hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1, + subscribe/1, publish/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -53,14 +54,18 @@ -record(state, {connection :: pid() | undefined, num :: pos_integer(), + subscriptions = #{} :: map(), pending_q :: p1_queue:queue()}). --type redis_error() :: {error, binary() | timeout | disconnected | overloaded}. +-type error_reason() :: binary() | timeout | disconnected | overloaded. +-type redis_error() :: {error, error_reason()}. -type redis_reply() :: binary() | [binary()]. -type redis_command() :: [binary()]. -type redis_pipeline() :: [redis_command()]. -type state() :: #state{}. +-export_type([error_reason/0]). + %%%=================================================================== %%% API %%%=================================================================== @@ -79,11 +84,11 @@ get_connection(I) -> -spec q(redis_command()) -> {ok, redis_reply()} | redis_error(). q(Command) -> - call(get_worker(), {q, Command}, ?MAX_RETRIES). + call(get_rnd_id(), {q, Command}, ?MAX_RETRIES). -spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error(). qp(Pipeline) -> - call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES). + call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES). -spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error(). multi(F) -> @@ -288,6 +293,30 @@ hkeys(Key) -> erlang:error(transaction_unsupported) end. +-spec subscribe([binary()]) -> ok | redis_error(). +subscribe(Channels) -> + try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT) + catch exit:{Why, {?GEN_SERVER, call, _}} -> + Reason = case Why of + timeout -> timeout; + _ -> disconnected + end, + {error, Reason} + end. + +-spec publish(iodata(), iodata()) -> {ok, non_neg_integer()} | redis_error() | queued. +publish(Channel, Data) -> + Cmd = [<<"PUBLISH">>, Channel, Data], + case erlang:get(?TR_STACK) of + undefined -> + case q(Cmd) of + {ok, N} -> {ok, binary_to_integer(N)}; + {error, _} = Err -> Err + end; + Stack -> + tr_enq(Cmd, Stack) + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -315,6 +344,15 @@ handle_call(connect, From, #state{connection = Pid} = State) -> self() ! connect, handle_call(connect, From, State#state{connection = undefined}) end; +handle_call({subscribe, Caller, Channels}, _From, + #state{connection = Pid, subscriptions = Subs} = State) -> + Subs1 = lists:foldl( + fun(Channel, Acc) -> + Callers = maps:get(Channel, Acc, []) -- [Caller], + maps:put(Channel, [Caller|Callers], Acc) + end, Subs, Channels), + eredis_subscribe(Pid, Channels), + {reply, ok, State#state{subscriptions = Subs1}}; handle_call(Request, _From, State) -> ?WARNING_MSG("unexepected call: ~p", [Request]), {noreply, State}. @@ -326,6 +364,7 @@ handle_info(connect, #state{connection = undefined} = State) -> NewState = case connect(State) of {ok, Connection} -> Q1 = flush_queue(State#state.pending_q), + re_subscribe(Connection, State#state.subscriptions), State#state{connection = Connection, pending_q = Q1}; {error, _} -> State @@ -342,6 +381,31 @@ handle_info({'EXIT', Pid, _}, State) -> _ -> {noreply, State} end; +handle_info({subscribed, Channel, Pid}, State) -> + case State#state.connection of + Pid -> + case maps:is_key(Channel, State#state.subscriptions) of + true -> eredis_sub:ack_message(Pid); + false -> + ?WARNING_MSG("got subscription ack for unknown channel ~s", + [Channel]) + end; + _ -> + ok + end, + {noreply, State}; +handle_info({message, Channel, Data, Pid}, State) -> + case State#state.connection of + Pid -> + lists:foreach( + fun(Subscriber) -> + erlang:send(Subscriber, {redis_message, Channel, Data}) + end, maps:get(Channel, State#state.subscriptions, [])), + eredis_sub:ack_message(Pid); + _ -> + ok + end, + {noreply, State}; handle_info(Info, State) -> ?WARNING_MSG("unexpected info = ~p", [Info]), {noreply, State}. @@ -377,8 +441,7 @@ connect(#state{num = Num}) -> redis_connect_timeout, fun(I) when is_integer(I), I>0 -> I end, 1)), - try case eredis:start_link(Server, Port, DB, Pass, - no_reconnect, ConnTimeout) of + try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of {ok, Client} -> ?DEBUG("Connection #~p established to Redis at ~s:~p", [Num, Server, Port]), @@ -397,12 +460,24 @@ connect(#state{num = Num}) -> {error, Reason} end. --spec call({atom(), atom()}, {q, redis_command()}, integer()) -> +do_connect(1, Server, Port, Pass, _DB, _ConnTimeout) -> + %% First connection in the pool is always a subscriber + Res = eredis_sub:start_link(Server, Port, Pass, no_reconnect, infinity, drop), + case Res of + {ok, Pid} -> eredis_sub:controlling_process(Pid); + _ -> ok + end, + Res; +do_connect(_, Server, Port, Pass, DB, ConnTimeout) -> + eredis:start_link(Server, Port, DB, Pass, no_reconnect, ConnTimeout). + +-spec call(pos_integer(), {q, redis_command()}, integer()) -> {ok, redis_reply()} | redis_error(); - ({atom(), atom()}, {qp, redis_pipeline()}, integer()) -> + (pos_integer(), {qp, redis_pipeline()}, integer()) -> {ok, [redis_reply()]} | redis_error(). -call({Conn, Parent}, {F, Cmd}, Retries) -> +call(I, {F, Cmd}, Retries) -> ?DEBUG("redis query: ~p", [Cmd]), + Conn = get_connection(I), Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of {error, Reason} when is_atom(Reason) -> try exit(whereis(Conn), kill) catch _:_ -> ok end, @@ -414,8 +489,8 @@ call({Conn, Parent}, {F, Cmd}, Retries) -> end, case Res of {error, disconnected} when Retries > 0 -> - try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of - ok -> call({Conn, Parent}, {F, Cmd}, Retries-1); + try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of + ok -> call(I, {F, Cmd}, Retries-1); {error, _} = Err -> Err catch exit:{Why, {?GEN_SERVER, call, _}} -> Reason1 = case Why of @@ -439,11 +514,9 @@ log_error(Cmd, Reason) -> "** response = ~s", [Cmd, format_error(Reason)]). --spec get_worker() -> {atom(), atom()}. -get_worker() -> - Time = p1_time_compat:system_time(), - I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1, - {get_connection(I), get_proc(I)}. +-spec get_rnd_id() -> pos_integer(). +get_rnd_id() -> + randoms:uniform(2, ejabberd_redis_sup:get_pool_size()). -spec get_result([{error, atom() | binary()} | {ok, iodata()}]) -> {ok, [redis_reply()]} | {error, binary()}. @@ -531,3 +604,13 @@ clean_queue(Q, CurrTime) -> true -> Q1 end. + +re_subscribe(Pid, Subs) -> + case maps:keys(Subs) of + [] -> ok; + Channels -> eredis_subscribe(Pid, Channels) + end. + +eredis_subscribe(Pid, Channels) -> + ?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]), + eredis_sub:subscribe(Pid, Channels). diff --git a/src/ejabberd_redis_sup.erl b/src/ejabberd_redis_sup.erl index 23330f87c..7e2953c11 100644 --- a/src/ejabberd_redis_sup.erl +++ b/src/ejabberd_redis_sup.erl @@ -136,7 +136,7 @@ get_pool_size() -> ejabberd_config:get_option( redis_pool_size, fun(N) when is_integer(N), N >= 1 -> N end, - ?DEFAULT_POOL_SIZE). + ?DEFAULT_POOL_SIZE) + 1. iolist_to_list(IOList) -> binary_to_list(iolist_to_binary(IOList)). diff --git a/src/ejabberd_router.erl b/src/ejabberd_router.erl index 7474f9a67..30654a03b 100644 --- a/src/ejabberd_router.erl +++ b/src/ejabberd_router.erl @@ -49,7 +49,8 @@ get_all_routes/0, is_my_route/1, is_my_host/1, - find_routes/0, + clean_cache/1, + config_reloaded/0, get_backend/0]). -export([start_link/0]). @@ -70,12 +71,8 @@ -callback register_route(binary(), binary(), local_hint(), undefined | pos_integer(), pid()) -> ok | {error, term()}. -callback unregister_route(binary(), undefined | pos_integer(), pid()) -> ok | {error, term()}. --callback find_routes(binary()) -> [#route{}]. --callback find_routes() -> [#route{}]. --callback host_of_route(binary()) -> {ok, binary()} | error. --callback is_my_route(binary()) -> boolean(). --callback is_my_host(binary()) -> boolean(). --callback get_all_routes() -> [binary()]. +-callback find_routes(binary()) -> {ok, [#route{}]} | {error, any()}. +-callback get_all_routes() -> {ok, [binary()]} | {error, any()}. -record(state, {}). @@ -159,7 +156,8 @@ register_route(Domain, ServerHost, LocalHint, Pid) -> case Mod:register_route(LDomain, LServerHost, LocalHint, get_component_number(LDomain), Pid) of ok -> - ?DEBUG("Route registered: ~s", [LDomain]); + ?DEBUG("Route registered: ~s", [LDomain]), + delete_cache(Mod, LDomain); {error, Err} -> ?ERROR_MSG("Failed to register route ~s: ~p", [LDomain, Err]) @@ -186,7 +184,8 @@ unregister_route(Domain, Pid) -> case Mod:unregister_route( LDomain, get_component_number(LDomain), Pid) of ok -> - ?DEBUG("Route unregistered: ~s", [LDomain]); + ?DEBUG("Route unregistered: ~s", [LDomain]), + delete_cache(Mod, LDomain); {error, Err} -> ?ERROR_MSG("Failed to unregister route ~s: ~p", [LDomain, Err]) @@ -199,15 +198,55 @@ unregister_routes(Domains) -> end, Domains). --spec get_all_routes() -> [binary()]. -get_all_routes() -> +-spec find_routes(binary()) -> [#route{}]. +find_routes(Domain) -> Mod = get_backend(), - Mod:get_all_routes(). + case use_cache(Mod) of + true -> + case ets_cache:lookup( + ?ROUTES_CACHE, {route, Domain}, + fun() -> + case Mod:find_routes(Domain) of + {ok, Rs} when Rs /= [] -> + {ok, Rs}; + _ -> + error + end + end) of + {ok, Rs} -> Rs; + error -> [] + end; + false -> + case Mod:find_routes(Domain) of + {ok, Rs} -> Rs; + _ -> [] + end + end. --spec find_routes() -> [#route{}]. -find_routes() -> +-spec get_all_routes() -> [binary()]. +get_all_routes() -> Mod = get_backend(), - Mod:find_routes(). + case use_cache(Mod) of + true -> + case ets_cache:lookup( + ?ROUTES_CACHE, routes, + fun() -> + case Mod:get_all_routes() of + {ok, Rs} when Rs /= [] -> + {ok, Rs}; + _ -> + error + end + end) of + {ok, Rs} -> Rs; + error -> [] + end; + false -> + case Mod:get_all_routes() of + {ok, Rs} -> Rs; + _ -> [] + end + end. -spec host_of_route(binary()) -> binary(). host_of_route(Domain) -> @@ -215,10 +254,11 @@ host_of_route(Domain) -> error -> erlang:error({invalid_domain, Domain}); LDomain -> - Mod = get_backend(), - case Mod:host_of_route(LDomain) of - {ok, ServerHost} -> ServerHost; - error -> erlang:error({unregistered_route, Domain}) + case find_routes(LDomain) of + [#route{server_host = ServerHost}|_] -> + ServerHost; + _ -> + erlang:error({unregistered_route, Domain}) end end. @@ -228,8 +268,7 @@ is_my_route(Domain) -> error -> erlang:error({invalid_domain, Domain}); LDomain -> - Mod = get_backend(), - Mod:is_my_route(LDomain) + lists:member(LDomain, get_all_routes()) end. -spec is_my_host(binary()) -> boolean(). @@ -238,8 +277,10 @@ is_my_host(Domain) -> error -> erlang:error({invalid_domain, Domain}); LDomain -> - Mod = get_backend(), - Mod:is_my_host(LDomain) + case find_routes(LDomain) of + [#route{server_host = LDomain}|_] -> true; + _ -> false + end end. -spec process_iq(iq()) -> any(). @@ -250,12 +291,20 @@ process_iq(#iq{to = To} = IQ) -> ejabberd_sm:process_iq(IQ) end. +-spec config_reloaded() -> ok. +config_reloaded() -> + Mod = get_backend(), + init_cache(Mod). + %%==================================================================== %% gen_server callbacks %%==================================================================== init([]) -> + ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50), Mod = get_backend(), + init_cache(Mod), Mod:init(), + clean_cache(), {ok, #state{}}. handle_call(_Request, _From, State) -> @@ -273,7 +322,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, _State) -> - ok. + ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -290,8 +339,7 @@ do_route(OrigPacket) -> Packet -> To = xmpp:get_to(Packet), LDstDomain = To#jid.lserver, - Mod = get_backend(), - case Mod:find_routes(LDstDomain) of + case find_routes(LDstDomain) of [] -> ejabberd_s2s:route(Packet); [Route] -> @@ -366,6 +414,80 @@ get_backend() -> end, list_to_atom("ejabberd_router_" ++ atom_to_list(DBType)). +-spec cache_nodes(module()) -> [node()]. +cache_nodes(Mod) -> + case erlang:function_exported(Mod, cache_nodes, 0) of + true -> Mod:cache_nodes(); + false -> ejabberd_cluster:get_nodes() + end. + +-spec use_cache(module()) -> boolean(). +use_cache(Mod) -> + case erlang:function_exported(Mod, use_cache, 0) of + true -> Mod:use_cache(); + false -> + ejabberd_config:get_option( + router_use_cache, opt_type(router_use_cache), + ejabberd_config:use_cache(global)) + end. + +-spec delete_cache(module(), binary()) -> ok. +delete_cache(Mod, Domain) -> + case use_cache(Mod) of + true -> + ets_cache:delete(?ROUTES_CACHE, {route, Domain}, cache_nodes(Mod)), + ets_cache:delete(?ROUTES_CACHE, routes, cache_nodes(Mod)); + false -> + ok + end. + +-spec init_cache(module()) -> ok. +init_cache(Mod) -> + case use_cache(Mod) of + true -> + ets_cache:new(?ROUTES_CACHE, cache_opts()); + false -> + ets_cache:delete(?ROUTES_CACHE) + end. + +-spec cache_opts() -> [proplists:property()]. +cache_opts() -> + MaxSize = ejabberd_config:get_option( + router_cache_size, + opt_type(router_cache_size), + ejabberd_config:cache_size(global)), + CacheMissed = ejabberd_config:get_option( + router_cache_missed, + opt_type(router_cache_missed), + ejabberd_config:cache_missed(global)), + LifeTime = case ejabberd_config:get_option( + router_cache_life_time, + opt_type(router_cache_life_time), + ejabberd_config:cache_life_time(global)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?ROUTES_CACHE, + fun(_, error) -> + false; + (routes, _) -> + false; + ({route, _}, {ok, Rs}) -> + not lists:any( + fun(#route{pid = Pid}) -> + node(Pid) == Node + end, Rs) + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). + opt_type(domain_balancing) -> fun (random) -> random; (source) -> source; @@ -376,6 +498,14 @@ opt_type(domain_balancing) -> opt_type(domain_balancing_component_number) -> fun (N) when is_integer(N), N > 1 -> N end; opt_type(router_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +opt_type(O) when O == router_use_cache; O == router_cache_missed -> + fun(B) when is_boolean(B) -> B end; +opt_type(O) when O == router_cache_size; O == router_cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; opt_type(_) -> [domain_balancing, domain_balancing_component_number, - router_db_type]. + router_db_type, router_use_cache, router_cache_size, + router_cache_missed, router_cache_life_time]. diff --git a/src/ejabberd_router_mnesia.erl b/src/ejabberd_router_mnesia.erl index e3b550a75..d8664fee9 100644 --- a/src/ejabberd_router_mnesia.erl +++ b/src/ejabberd_router_mnesia.erl @@ -25,8 +25,7 @@ %% API -export([init/0, register_route/5, unregister_route/3, find_routes/1, - host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0, - find_routes/0]). + get_all_routes/0, use_cache/0]). %% gen_server callbacks -export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3, start_link/0]). @@ -54,6 +53,9 @@ init() -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +use_cache() -> + false. + register_route(Domain, ServerHost, LocalHint, undefined, Pid) -> F = fun () -> mnesia:write(#route{domain = Domain, @@ -124,37 +126,15 @@ unregister_route(Domain, _, Pid) -> transaction(F). find_routes(Domain) -> - mnesia:dirty_read(route, Domain). - -host_of_route(Domain) -> - case mnesia:dirty_read(route, Domain) of - [#route{server_host = ServerHost}|_] -> - {ok, ServerHost}; - [] -> - error - end. - -is_my_route(Domain) -> - mnesia:dirty_read(route, Domain) /= []. - -is_my_host(Domain) -> - case mnesia:dirty_read(route, Domain) of - [#route{server_host = Host}|_] -> - Host == Domain; - [] -> - false - end. + {ok, mnesia:dirty_read(route, Domain)}. get_all_routes() -> - mnesia:dirty_select( - route, - ets:fun2ms( - fun(#route{domain = Domain, server_host = ServerHost}) - when Domain /= ServerHost -> Domain - end)). - -find_routes() -> - ets:tab2list(route). + {ok, mnesia:dirty_select( + route, + ets:fun2ms( + fun(#route{domain = Domain, server_host = ServerHost}) + when Domain /= ServerHost -> Domain + end))}. %%%=================================================================== %%% gen_server callbacks @@ -227,7 +207,7 @@ transaction(F) -> ok; {aborted, Reason} -> ?ERROR_MSG("Mnesia transaction failed: ~p", [Reason]), - {error, Reason} + {error, db_failure} end. -spec update_tables() -> ok. diff --git a/src/ejabberd_router_redis.erl b/src/ejabberd_router_redis.erl index 58c1fca4a..2b02a7595 100644 --- a/src/ejabberd_router_redis.erl +++ b/src/ejabberd_router_redis.erl @@ -22,23 +22,37 @@ %%%------------------------------------------------------------------- -module(ejabberd_router_redis). -behaviour(ejabberd_router). +-behaviour(gen_server). %% API -export([init/0, register_route/5, unregister_route/3, find_routes/1, - host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0, - find_routes/0]). + get_all_routes/0]). +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3, start_link/0]). -include("ejabberd.hrl"). -include("logger.hrl"). -include("ejabberd_router.hrl"). --define(ROUTES_KEY, "ejabberd:routes"). +-record(state, {}). + +-define(ROUTES_KEY, <<"ejabberd:routes">>). %%%=================================================================== %%% API %%%=================================================================== init() -> - clean_table(). + Spec = {?MODULE, {?MODULE, start_link, []}, + transient, 5000, worker, [?MODULE]}, + case supervisor:start_child(ejabberd_backend_sup, Spec) of + {ok, _Pid} -> ok; + Err -> Err + end. + +-spec start_link() -> {ok, pid()} | {error, any()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). register_route(Domain, ServerHost, LocalHint, _, Pid) -> DomKey = domain_key(Domain), @@ -83,47 +97,48 @@ find_routes(Domain) -> DomKey = domain_key(Domain), case ejabberd_redis:hgetall(DomKey) of {ok, Vals} -> - decode_routes(Domain, Vals); - {error, _} -> - [] - end. - -host_of_route(Domain) -> - DomKey = domain_key(Domain), - case ejabberd_redis:hgetall(DomKey) of - {ok, [{_Pid, Data}|_]} -> - {ServerHost, _} = binary_to_term(Data), - {ok, ServerHost}; + {ok, decode_routes(Domain, Vals)}; _ -> - error - end. - -is_my_route(Domain) -> - case ejabberd_redis:sismember(?ROUTES_KEY, Domain) of - {ok, Bool} -> - Bool; - {error, _} -> - false + {error, db_failure} end. -is_my_host(Domain) -> - {ok, Domain} == host_of_route(Domain). - get_all_routes() -> case ejabberd_redis:smembers(?ROUTES_KEY) of {ok, Routes} -> - Routes; - {error, _} -> - [] + {ok, Routes}; + _ -> + {error, db_failure} end. -find_routes() -> - lists:flatmap(fun find_routes/1, get_all_routes()). +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + clean_table(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== clean_table() -> + ?INFO_MSG("Cleaning Redis route entries...", []), lists:foreach( fun(#route{domain = Domain, pid = Pid}) when node(Pid) == node() -> unregister_route(Domain, undefined, Pid); @@ -131,6 +146,20 @@ clean_table() -> ok end, find_routes()). +find_routes() -> + case get_all_routes() of + {ok, Domains} -> + lists:flatmap( + fun(Domain) -> + case find_routes(Domain) of + {ok, Routes} -> Routes; + {error, _} -> [] + end + end, Domains); + {error, _} -> + [] + end. + domain_key(Domain) -> <<"ejabberd:route:", Domain/binary>>. diff --git a/src/ejabberd_router_sql.erl b/src/ejabberd_router_sql.erl index 0747d0396..b354eb212 100644 --- a/src/ejabberd_router_sql.erl +++ b/src/ejabberd_router_sql.erl @@ -27,8 +27,7 @@ %% API -export([init/0, register_route/5, unregister_route/3, find_routes/1, - host_of_route/1, is_my_route/1, is_my_host/1, get_all_routes/0, - find_routes/0]). + get_all_routes/0]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -64,80 +63,47 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) -> ok; Err -> ?ERROR_MSG("failed to update 'route' table: ~p", [Err]), - {error, Err} + {error, db_failure} end. unregister_route(Domain, _, Pid) -> PidS = misc:encode_pid(Pid), Node = erlang:atom_to_binary(node(Pid), latin1), - ejabberd_sql:sql_query( - ?MYNAME, - ?SQL("delete from route where domain=%(Domain)s " - "and pid=%(PidS)s and node=%(Node)s")), - %% TODO: return meaningful error - ok. - -find_routes(Domain) -> case ejabberd_sql:sql_query( ?MYNAME, - ?SQL("select @(server_host)s, @(node)s, @(pid)s, @(local_hint)s " - "from route where domain=%(Domain)s")) of - {selected, Rows} -> - lists:flatmap( - fun(Row) -> - row_to_route(Domain, Row) - end, Rows); + ?SQL("delete from route where domain=%(Domain)s " + "and pid=%(PidS)s and node=%(Node)s")) of + {updated, _} -> + ok; Err -> - ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), - {error, Err} + ?ERROR_MSG("failed to delete from 'route' table: ~p", [Err]), + {error, db_failure} end. -host_of_route(Domain) -> +find_routes(Domain) -> case ejabberd_sql:sql_query( ?MYNAME, - ?SQL("select @(server_host)s from route where domain=%(Domain)s")) of - {selected, [{ServerHost}|_]} -> - {ok, ServerHost}; - {selected, []} -> - error; + ?SQL("select @(server_host)s, @(node)s, @(pid)s, @(local_hint)s " + "from route where domain=%(Domain)s")) of + {selected, Rows} -> + {ok, lists:flatmap( + fun(Row) -> + row_to_route(Domain, Row) + end, Rows)}; Err -> ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), - error + {error, db_failure} end. -is_my_route(Domain) -> - case host_of_route(Domain) of - {ok, _} -> true; - _ -> false - end. - -is_my_host(Domain) -> - {ok, Domain} == host_of_route(Domain). - get_all_routes() -> case ejabberd_sql:sql_query( ?MYNAME, ?SQL("select @(domain)s from route where domain <> server_host")) of {selected, Domains} -> - [Domain || {Domain} <- Domains]; - Err -> - ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), - [] - end. - -find_routes() -> - case ejabberd_sql:sql_query( - ?MYNAME, - ?SQL("select @(domain)s, @(server_host)s, @(node)s, @(pid)s, " - "@(local_hint)s from route")) of - {selected, Rows} -> - lists:flatmap( - fun({Domain, ServerHost, Node, Pid, LocalHint}) -> - row_to_route(Domain, {ServerHost, Node, Pid, LocalHint}) - end, Rows); + {ok, [Domain || {Domain} <- Domains]}; Err -> ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), - [] + {error, db_failure} end. %%%=================================================================== diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 7c63292fc..1cd911e11 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -76,7 +76,9 @@ c2s_handle_info/2, host_up/1, host_down/1, - make_sid/0 + make_sid/0, + clean_cache/1, + config_reloaded/0 ]). -export([init/1, handle_call/3, handle_cast/2, @@ -91,13 +93,15 @@ -include("ejabberd_sm.hrl"). -callback init() -> ok | {error, any()}. --callback set_session(#session{}) -> ok. --callback delete_session(binary(), binary(), binary(), sid()) -> - {ok, #session{}} | {error, notfound}. +-callback set_session(#session{}) -> ok | {error, any()}. +-callback delete_session(#session{}) -> ok | {error, any()}. -callback get_sessions() -> [#session{}]. -callback get_sessions(binary()) -> [#session{}]. --callback get_sessions(binary(), binary()) -> [#session{}]. --callback get_sessions(binary(), binary(), binary()) -> [#session{}]. +-callback get_sessions(binary(), binary()) -> {ok, [#session{}]} | {error, any()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). -record(state, {}). @@ -158,9 +162,12 @@ close_session(SID, User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - Info = case Mod:delete_session(LUser, LServer, LResource, SID) of - {ok, #session{info = I}} -> I; - {error, notfound} -> [] + Info = case get_sessions(Mod, LUser, LServer, LResource) of + [#session{info = I} = Session|_] -> + delete_session(Mod, Session), + I; + [] -> + [] end, JID = jid:make(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, @@ -196,14 +203,14 @@ get_user_resources(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [element(3, S#session.usr) || S <- clean_session_list(Ss)]. -spec get_user_present_resources(binary(), binary()) -> [tuple()]. get_user_present_resources(LUser, LServer) -> Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [{S#session.priority, element(3, S#session.usr)} || S <- clean_session_list(Ss), is_integer(S#session.priority)]. @@ -214,7 +221,7 @@ get_user_ip(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> undefined; Ss -> @@ -227,7 +234,7 @@ get_user_info(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [{LResource, [{node, node(Pid)}|Info]} || #session{usr = {_, _, LResource}, info = Info, @@ -240,7 +247,7 @@ get_user_info(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> offline; Ss -> @@ -288,7 +295,7 @@ get_session_pid(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [#session{sid = {_, Pid}}] -> Pid; _ -> none end. @@ -309,7 +316,7 @@ get_offline_info(Time, User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case Mod:get_sessions(LUser, LServer, LResource) of + case get_sessions(Mod, LUser, LServer, LResource) of [#session{sid = {Time, _}, info = Info}] -> case proplists:get_bool(offline, Info) of true -> @@ -326,7 +333,7 @@ get_offline_info(Time, User, Server, Resource) -> dirty_get_sessions_list() -> lists:flatmap( fun(Mod) -> - [S#session.usr || S <- online(Mod:get_sessions())] + [S#session.usr || S <- online(get_sessions(Mod))] end, get_sm_backends()). -spec dirty_get_my_sessions_list() -> [#session{}]. @@ -334,7 +341,7 @@ dirty_get_sessions_list() -> dirty_get_my_sessions_list() -> lists:flatmap( fun(Mod) -> - [S || S <- online(Mod:get_sessions()), + [S || S <- online(get_sessions(Mod)), node(element(2, S#session.sid)) == node()] end, get_sm_backends()). @@ -343,14 +350,14 @@ dirty_get_my_sessions_list() -> get_vh_session_list(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - [S#session.usr || S <- online(Mod:get_sessions(LServer))]. + [S#session.usr || S <- online(get_sessions(Mod, LServer))]. -spec get_all_pids() -> [pid()]. get_all_pids() -> lists:flatmap( fun(Mod) -> - [element(2, S#session.sid) || S <- online(Mod:get_sessions())] + [element(2, S#session.sid) || S <- online(get_sessions(Mod))] end, get_sm_backends()). -spec get_vh_session_number(binary()) -> non_neg_integer(). @@ -358,7 +365,7 @@ get_all_pids() -> get_vh_session_number(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - length(online(Mod:get_sessions(LServer))). + length(online(get_sessions(Mod, LServer))). -spec register_iq_handler(binary(), binary(), atom(), atom(), list()) -> ok. @@ -387,16 +394,23 @@ c2s_handle_info(#{lang := Lang} = State, {exit, Reason}) -> c2s_handle_info(State, _) -> State. +-spec config_reloaded() -> ok. +config_reloaded() -> + init_cache(). + %%==================================================================== %% gen_server callbacks %%==================================================================== init([]) -> process_flag(trap_exit, true), + init_cache(), lists:foreach(fun(Mod) -> Mod:init() end, get_sm_backends()), + clean_cache(), ets:new(sm_iqtable, [named_table, public, {read_concurrency, true}]), ejabberd_hooks:add(host_up, ?MODULE, host_up, 50), ejabberd_hooks:add(host_down, ?MODULE, host_down, 60), + ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50), lists:foreach(fun host_up/1, ?MYHOSTS), ejabberd_commands:register_commands(get_commands_spec()), {ok, #state{}}. @@ -432,6 +446,7 @@ terminate(_Reason, _State) -> lists:foreach(fun host_down/1, ?MYHOSTS), ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50), ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60), + ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 50), ejabberd_commands:unregister_commands(get_commands_spec()), ok. @@ -460,7 +475,7 @@ host_down(Host) -> ejabberd_c2s:send(Pid, xmpp:serr_system_shutdown()); (_) -> ok - end, Mod:get_sessions(Host)), + end, get_sessions(Mod, Host)), ejabberd_hooks:delete(c2s_handle_info, Host, ejabberd_sm, c2s_handle_info, 50), ejabberd_hooks:delete(roster_in_subscription, Host, @@ -472,7 +487,7 @@ host_down(Host) -> ejabberd_c2s:host_down(Host). -spec set_session(sid(), binary(), binary(), binary(), - prio(), info()) -> ok. + prio(), info()) -> ok | {error, any()}. set_session(SID, User, Server, Resource, Priority, Info) -> LUser = jid:nodeprep(User), @@ -481,8 +496,69 @@ set_session(SID, User, Server, Resource, Priority, Info) -> US = {LUser, LServer}, USR = {LUser, LServer, LResource}, Mod = get_sm_backend(LServer), - Mod:set_session(#session{sid = SID, usr = USR, us = US, - priority = Priority, info = Info}). + case Mod:set_session(#session{sid = SID, usr = USR, us = US, + priority = Priority, info = Info}) of + ok -> + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?SM_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end; + {error, _} = Err -> + Err + end. + +-spec get_sessions(module()) -> [#session{}]. +get_sessions(Mod) -> + Mod:get_sessions(). + +-spec get_sessions(module(), binary()) -> [#session{}]. +get_sessions(Mod, LServer) -> + Mod:get_sessions(LServer). + +-spec get_sessions(module(), binary(), binary()) -> [#session{}]. +get_sessions(Mod, LUser, LServer) -> + case use_cache(Mod, LServer) of + true -> + case ets_cache:lookup( + ?SM_CACHE, {LUser, LServer}, + fun() -> + case Mod:get_sessions(LUser, LServer) of + {ok, Ss} when Ss /= [] -> + {ok, Ss}; + _ -> + error + end + end) of + {ok, Sessions} -> + Sessions; + error -> + [] + end; + false -> + case Mod:get_sessions(LUser, LServer) of + {ok, Ss} -> Ss; + _ -> [] + end + end. + +-spec get_sessions(module(), binary(), binary(), binary()) -> [#session{}]. +get_sessions(Mod, LUser, LServer, LResource) -> + Sessions = get_sessions(Mod, LUser, LServer), + [S || S <- Sessions, element(3, S#session.usr) == LResource]. + +-spec delete_session(module(), #session{}) -> ok. +delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) -> + Mod:delete_session(Session), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?SM_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end. -spec online([#session{}]) -> [#session{}]. @@ -505,7 +581,7 @@ do_route(To, Term) -> ?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]), {U, S, R} = jid:tolower(To), Mod = get_sm_backend(S), - case online(Mod:get_sessions(U, S, R)) of + case online(get_sessions(Mod, U, S, R)) of [] -> ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]); Ss -> @@ -541,7 +617,7 @@ do_route(#presence{from = From, to = To, type = T, status = Status} = Packet) ejabberd_c2s:route(Pid, {route, Packet1}); (_) -> ok - end, online(Mod:get_sessions(LUser, LServer))); + end, online(get_sessions(Mod, LUser, LServer))); false -> ok end; @@ -570,7 +646,7 @@ do_route(Packet) -> To = xmpp:get_to(Packet), {LUser, LServer, LResource} = jid:tolower(To), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> case Packet of #message{type = T} when T == chat; T == normal -> @@ -618,7 +694,7 @@ route_message(#message{to = To, type = Type} = Packet) -> (P >= 0) and (Type == headline) -> LResource = jid:resourceprep(R), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> ok; % Race condition @@ -689,10 +765,10 @@ check_for_sessions_to_replace(User, Server, Resource) -> -spec check_existing_resources(binary(), binary(), binary()) -> ok. check_existing_resources(LUser, LServer, LResource) -> Mod = get_sm_backend(LServer), - Ss = Mod:get_sessions(LUser, LServer, LResource), + Ss = get_sessions(Mod, LUser, LServer, LResource), {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), - lists:foreach(fun(#session{sid = S}) -> - Mod:delete_session(LUser, LServer, LResource, S) + lists:foreach(fun(S) -> + delete_session(Mod, S) end, OfflineSs), if OnlineSs == [] -> ok; true -> @@ -716,12 +792,12 @@ get_resource_sessions(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer, LResource))]. + [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))]. -spec check_max_sessions(binary(), binary()) -> ok | replaced. check_max_sessions(LUser, LServer) -> Mod = get_sm_backend(LServer), - Ss = Mod:get_sessions(LUser, LServer), + Ss = get_sessions(Mod, LUser, LServer), {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), MaxSessions = get_max_user_sessions(LUser, LServer), if length(OnlineSs) =< MaxSessions -> ok; @@ -731,8 +807,7 @@ check_max_sessions(LUser, LServer) -> end, if length(OfflineSs) =< MaxSessions -> ok; true -> - #session{sid = SID, usr = {_, _, R}} = lists:min(OfflineSs), - Mod:delete_session(LUser, LServer, R, SID) + delete_session(Mod, lists:min(OfflineSs)) end. %% Get the user_max_session setting @@ -779,7 +854,7 @@ process_iq(#iq{}) -> force_update_presence({LUser, LServer}) -> Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), lists:foreach(fun (#session{sid = {_, Pid}}) -> ejabberd_c2s:route(Pid, force_update_presence) end, @@ -811,6 +886,80 @@ get_vh_by_backend(Mod) -> get_sm_backend(Host) == Mod end, ?MYHOSTS). +%%-------------------------------------------------------------------- +%%% Cache stuff +%%-------------------------------------------------------------------- +-spec init_cache() -> ok. +init_cache() -> + case use_cache() of + true -> + ets_cache:new(?SM_CACHE, cache_opts()); + false -> + ets_cache:delete(?SM_CACHE) + end. + +-spec cache_opts() -> [proplists:property()]. +cache_opts() -> + MaxSize = ejabberd_config:get_option( + sm_cache_size, + opt_type(sm_cache_size), + ejabberd_config:cache_size(global)), + CacheMissed = ejabberd_config:get_option( + sm_cache_missed, + opt_type(sm_cache_missed), + ejabberd_config:cache_missed(global)), + LifeTime = case ejabberd_config:get_option( + sm_cache_life_time, + opt_type(sm_cache_life_time), + ejabberd_config:cache_life_time(global)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?SM_CACHE, + fun(_, error) -> + false; + (_, {ok, Ss}) -> + not lists:any( + fun(#session{sid = {_, Pid}}) -> + node(Pid) == Node + end, Ss) + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, LServer) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(LServer); + false -> + ejabberd_config:get_option( + {sm_use_cache, LServer}, + ejabberd_sm:opt_type(sm_use_cache), + ejabberd_config:use_cache(LServer)) + end. + +-spec use_cache() -> boolean(). +use_cache() -> + lists:any( + fun(Host) -> + Mod = get_sm_backend(Host), + use_cache(Mod, Host) + end, ?MYHOSTS). + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, LServer) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(LServer); + false -> ejabberd_cluster:get_nodes() + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% ejabberd commands @@ -869,4 +1018,13 @@ make_sid() -> {p1_time_compat:unique_timestamp(), self()}. opt_type(sm_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; -opt_type(_) -> [sm_db_type]. +opt_type(O) when O == sm_use_cache; O == sm_cache_missed -> + fun(B) when is_boolean(B) -> B end; +opt_type(O) when O == sm_cache_size; O == sm_cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; +opt_type(_) -> + [sm_db_type, sm_use_cache, sm_cache_size, sm_cache_missed, + sm_cache_life_time]. diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl index 35fc42e9d..99e53fa12 100644 --- a/src/ejabberd_sm_mnesia.erl +++ b/src/ejabberd_sm_mnesia.erl @@ -29,12 +29,12 @@ %% API -export([init/0, + use_cache/1, set_session/1, - delete_session/4, + delete_session/1, get_sessions/0, get_sessions/1, - get_sessions/2, - get_sessions/3]). + get_sessions/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -62,20 +62,17 @@ init() -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec use_cache(binary()) -> boolean(). +use_cache(_LServer) -> + false. + -spec set_session(#session{}) -> ok. set_session(Session) -> mnesia:dirty_write(Session). --spec delete_session(binary(), binary(), binary(), sid()) -> - {ok, #session{}} | {error, notfound}. -delete_session(_LUser, _LServer, _LResource, SID) -> - case mnesia:dirty_read(session, SID) of - [Session] -> - mnesia:dirty_delete(session, SID), - {ok, Session}; - [] -> - {error, notfound} - end. +-spec delete_session(#session{}) -> ok. +delete_session(#session{sid = SID}) -> + mnesia:dirty_delete(session, SID). -spec get_sessions() -> [#session{}]. get_sessions() -> @@ -87,13 +84,9 @@ get_sessions(LServer) -> [{#session{usr = '$1', _ = '_'}, [{'==', {element, 2, '$1'}, LServer}], ['$_']}]). --spec get_sessions(binary(), binary()) -> [#session{}]. +-spec get_sessions(binary(), binary()) -> {ok, [#session{}]}. get_sessions(LUser, LServer) -> - mnesia:dirty_index_read(session, {LUser, LServer}, #session.us). - --spec get_sessions(binary(), binary(), binary()) -> [#session{}]. -get_sessions(LUser, LServer, LResource) -> - mnesia:dirty_index_read(session, {LUser, LServer, LResource}, #session.usr). + {ok, mnesia:dirty_index_read(session, {LUser, LServer}, #session.us)}. %%%=================================================================== %%% gen_server callbacks diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl index 4854bf8a6..4314f8d27 100644 --- a/src/ejabberd_sm_redis.erl +++ b/src/ejabberd_sm_redis.erl @@ -23,63 +23,86 @@ %%%---------------------------------------------------------------------- -module(ejabberd_sm_redis). - +-ifndef(GEN_SERVER). +-define(GEN_SERVER, p1_server). +-endif. +-behaviour(?GEN_SERVER). -behaviour(ejabberd_config). -behaviour(ejabberd_sm). --export([init/0, set_session/1, delete_session/4, +-export([init/0, set_session/1, delete_session/1, get_sessions/0, get_sessions/1, get_sessions/2, - get_sessions/3, opt_type/1]). + cache_nodes/1, opt_type/1]). +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3, start_link/0]). -include("ejabberd.hrl"). -include("ejabberd_sm.hrl"). -include("logger.hrl"). +-define(SM_KEY, <<"ejabberd:sm">>). +-record(state, {}). + %%%=================================================================== %%% API %%%=================================================================== -spec init() -> ok | {error, any()}. init() -> - clean_table(). + Spec = {?MODULE, {?MODULE, start_link, []}, + transient, 5000, worker, [?MODULE]}, + case supervisor:start_child(ejabberd_backend_sup, Spec) of + {ok, _Pid} -> ok; + Err -> Err + end. + +-spec start_link() -> {ok, pid()} | {error, any()}. +start_link() -> + ?GEN_SERVER:start_link({local, ?MODULE}, ?MODULE, [], []). --spec set_session(#session{}) -> ok. +-spec cache_nodes(binary()) -> [node()]. +cache_nodes(_LServer) -> + [node()]. + +-spec set_session(#session{}) -> ok | {error, ejabberd_redis:error_reason()}. set_session(Session) -> T = term_to_binary(Session), USKey = us_to_key(Session#session.us), SIDKey = sid_to_key(Session#session.sid), ServKey = server_to_key(element(2, Session#session.us)), USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid), - ejabberd_redis:multi( - fun() -> - ejabberd_redis:hset(USKey, SIDKey, T), - ejabberd_redis:hset(ServKey, USSIDKey, T) - end), - ok. + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hset(USKey, SIDKey, T), + ejabberd_redis:hset(ServKey, USSIDKey, T), + ejabberd_redis:publish( + ?SM_KEY, term_to_binary({delete, Session#session.us})) + end) of + {ok, _} -> + ok; + Err -> + Err + end. --spec delete_session(binary(), binary(), binary(), sid()) -> - {ok, #session{}} | {error, notfound}. -delete_session(LUser, LServer, _LResource, SID) -> - USKey = us_to_key({LUser, LServer}), - case ejabberd_redis:hgetall(USKey) of - {ok, Vals} -> - Ss = decode_session_list(Vals), - case lists:keyfind(SID, #session.sid, Ss) of - false -> - {error, notfound}; - Session -> - SIDKey = sid_to_key(SID), - ServKey = server_to_key(element(2, Session#session.us)), - USSIDKey = us_sid_to_key(Session#session.us, SID), - ejabberd_redis:multi( - fun() -> - ejabberd_redis:hdel(USKey, [SIDKey]), - ejabberd_redis:hdel(ServKey, [USSIDKey]) - end), - {ok, Session} - end; - {error, _} -> - {error, notfound} +-spec delete_session(#session{}) -> ok | {error, ejabberd_redis:error_reason()}. +delete_session(#session{sid = SID} = Session) -> + USKey = us_to_key(Session#session.us), + SIDKey = sid_to_key(SID), + ServKey = server_to_key(element(2, Session#session.us)), + USSIDKey = us_sid_to_key(Session#session.us, SID), + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hdel(USKey, [SIDKey]), + ejabberd_redis:hdel(ServKey, [USSIDKey]), + ejabberd_redis:publish( + ?SM_KEY, + term_to_binary({delete, Session#session.us})) + end) of + {ok, _} -> + ok; + Err -> + Err end. -spec get_sessions() -> [#session{}]. @@ -99,27 +122,49 @@ get_sessions(LServer) -> [] end. --spec get_sessions(binary(), binary()) -> [#session{}]. +-spec get_sessions(binary(), binary()) -> {ok, [#session{}]} | + {error, ejabberd_redis:error_reason()}. get_sessions(LUser, LServer) -> USKey = us_to_key({LUser, LServer}), case ejabberd_redis:hgetall(USKey) of {ok, Vals} -> - decode_session_list(Vals); - {error, _} -> - [] + {ok, decode_session_list(Vals)}; + Err -> + Err end. --spec get_sessions(binary(), binary(), binary()) -> - [#session{}]. -get_sessions(LUser, LServer, LResource) -> - USKey = us_to_key({LUser, LServer}), - case ejabberd_redis:hgetall(USKey) of - {ok, Vals} -> - [S || S <- decode_session_list(Vals), - element(3, S#session.usr) == LResource]; - {error, _} -> - [] - end. +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + ejabberd_redis:subscribe([?SM_KEY]), + clean_table(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({redis_message, ?SM_KEY, Data}, State) -> + case binary_to_term(Data) of + {delete, Key} -> + ets_cache:delete(?SM_CACHE, Key); + Msg -> + ?WARNING_MSG("unexpected redis message: ~p", [Msg]) + end, + {noreply, State}; +handle_info(Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions diff --git a/src/ejabberd_sm_sql.erl b/src/ejabberd_sm_sql.erl index 04f03f750..3fda1f5e5 100644 --- a/src/ejabberd_sm_sql.erl +++ b/src/ejabberd_sm_sql.erl @@ -31,11 +31,10 @@ %% API -export([init/0, set_session/1, - delete_session/4, + delete_session/1, get_sessions/0, get_sessions/1, - get_sessions/2, - get_sessions/3]). + get_sessions/2]). -include("ejabberd.hrl"). -include("ejabberd_sm.hrl"). @@ -81,30 +80,21 @@ set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R}, ok -> ok; Err -> - ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]) + ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]), + {error, db_failure} end. -delete_session(_LUser, LServer, _LResource, {Now, Pid}) -> +delete_session(#session{usr = {_, LServer, _}, sid = {Now, Pid}}) -> TS = now_to_timestamp(Now), PidS = list_to_binary(erlang:pid_to_list(Pid)), case ejabberd_sql:sql_query( LServer, - ?SQL("select @(usec)d, @(pid)s, @(node)s, @(username)s," - " @(resource)s, @(priority)s, @(info)s " - "from sm where usec=%(TS)d and pid=%(PidS)s")) of - {selected, [Row]} -> - ejabberd_sql:sql_query( - LServer, - ?SQL("delete from sm" - " where usec=%(TS)d and pid=%(PidS)s")), - try {ok, row_to_session(LServer, Row)} - catch _:{bad_node, _} -> {error, notfound} - end; - {selected, []} -> - {error, notfound}; + ?SQL("delete from sm where usec=%(TS)d and pid=%(PidS)s")) of + {updated, _} -> + ok; Err -> ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]), - {error, notfound} + {error, db_failure} end. get_sessions() -> @@ -137,33 +127,15 @@ get_sessions(LUser, LServer) -> " @(resource)s, @(priority)s, @(info)s from sm" " where username=%(LUser)s")) of {selected, Rows} -> - lists:flatmap( - fun(Row) -> - try [row_to_session(LServer, Row)] - catch _:{bad_node, _} -> [] - end - end, Rows); - Err -> - ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), - [] - end. - -get_sessions(LUser, LServer, LResource) -> - case ejabberd_sql:sql_query( - LServer, - ?SQL("select @(usec)d, @(pid)s, @(node)s, @(username)s," - " @(resource)s, @(priority)s, @(info)s from sm" - " where username=%(LUser)s and resource=%(LResource)s")) of - {selected, Rows} -> - lists:flatmap( - fun(Row) -> - try [row_to_session(LServer, Row)] - catch _:{bad_node, _} -> [] - end - end, Rows); + {ok, lists:flatmap( + fun(Row) -> + try [row_to_session(LServer, Row)] + catch _:{bad_node, _} -> [] + end + end, Rows)}; Err -> ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), - [] + {error, db_failure} end. %%%=================================================================== diff --git a/src/mod_bosh.erl b/src/mod_bosh.erl index 57c819537..c2bf7600e 100644 --- a/src/mod_bosh.erl +++ b/src/mod_bosh.erl @@ -34,7 +34,7 @@ -export([start_link/0]). -export([start/2, stop/1, reload/3, process/2, open_session/2, - close_session/1, find_session/1]). + close_session/1, find_session/1, clean_cache/1]). -export([depends/2, mod_opt_type/1]). @@ -46,9 +46,13 @@ -include("bosh.hrl"). -callback init() -> any(). --callback open_session(binary(), pid()) -> any(). --callback close_session(binary()) -> any(). --callback find_session(binary()) -> {ok, pid()} | error. +-callback open_session(binary(), pid()) -> ok | {error, any()}. +-callback close_session(binary()) -> ok | {error, any()}. +-callback find_session(binary()) -> {ok, pid()} | {error, any()}. +-callback use_cache() -> boolean(). +-callback cache_nodes() -> [node()]. + +-optional_callbacks([use_cache/0, cache_nodes/0]). %%%---------------------------------------------------------------------- %%% API @@ -76,22 +80,48 @@ process(_Path, _Request) -> #xmlel{name = <<"h1">>, attrs = [], children = [{xmlcdata, <<"400 Bad Request">>}]}}. +-spec open_session(binary(), pid()) -> ok | {error, any()}. open_session(SID, Pid) -> Mod = gen_mod:ram_db_mod(global, ?MODULE), - Mod:open_session(SID, Pid). + case Mod:open_session(SID, Pid) of + ok -> + delete_cache(Mod, SID); + {error, _} = Err -> + Err + end. +-spec close_session(binary()) -> ok. close_session(SID) -> Mod = gen_mod:ram_db_mod(global, ?MODULE), - Mod:close_session(SID). + Mod:close_session(SID), + delete_cache(Mod, SID). +-spec find_session(binary()) -> {ok, pid()} | error. find_session(SID) -> Mod = gen_mod:ram_db_mod(global, ?MODULE), - Mod:find_session(SID). + case use_cache(Mod) of + true -> + ets_cache:lookup( + ?BOSH_CACHE, SID, + fun() -> + case Mod:find_session(SID) of + {ok, Pid} -> {ok, Pid}; + {error, _} -> error + end + end); + false -> + case Mod:find_session(SID) of + {ok, Pid} -> {ok, Pid}; + {error, _} -> error + end + end. start(Host, Opts) -> start_jiffy(Opts), Mod = gen_mod:ram_db_mod(global, ?MODULE), + init_cache(Mod), Mod:init(), + clean_cache(), TmpSup = gen_mod:get_module_proc(Host, ?MODULE), TmpSupSpec = {TmpSup, {ejabberd_tmp_sup, start_link, [TmpSup, ejabberd_bosh]}, @@ -106,6 +136,7 @@ stop(Host) -> reload(_Host, NewOpts, _OldOpts) -> start_jiffy(NewOpts), Mod = gen_mod:ram_db_mod(global, ?MODULE), + init_cache(Mod), Mod:init(), ok. @@ -160,9 +191,87 @@ mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; mod_opt_type(queue_type) -> fun(ram) -> ram; (file) -> file end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(O) when O == cache_size; O == cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; mod_opt_type(_) -> [json, max_concat, max_inactivity, max_pause, prebind, ram_db_type, - queue_type]. + queue_type, use_cache, cache_size, cache_missed, cache_life_time]. + +%%%---------------------------------------------------------------------- +%%% Cache stuff +%%%---------------------------------------------------------------------- +-spec init_cache(module()) -> ok. +init_cache(Mod) -> + case use_cache(Mod) of + true -> + ets_cache:new(?BOSH_CACHE, cache_opts()); + false -> + ets_cache:delete(?BOSH_CACHE) + end. + +-spec use_cache(module()) -> boolean(). +use_cache(Mod) -> + case erlang:function_exported(Mod, use_cache, 0) of + true -> Mod:use_cache(); + false -> + gen_mod:get_module_opt( + global, ?MODULE, use_cache, mod_opt_type(use_cache), + ejabberd_config:use_cache(global)) + end. + +-spec cache_nodes(module()) -> [node()]. +cache_nodes(Mod) -> + case erlang:function_exported(Mod, cache_nodes, 0) of + true -> Mod:cache_nodes(); + false -> ejabberd_cluster:get_nodes() + end. + +-spec delete_cache(module(), binary()) -> ok. +delete_cache(Mod, SID) -> + case use_cache(Mod) of + true -> + ets_cache:delete(?BOSH_CACHE, SID, cache_nodes(Mod)); + false -> + ok + end. + +-spec cache_opts() -> [proplists:property()]. +cache_opts() -> + MaxSize = gen_mod:get_module_opt( + global, ?MODULE, cache_size, + mod_opt_type(cache_size), + ejabberd_config:cache_size(global)), + CacheMissed = gen_mod:get_module_opt( + global, ?MODULE, cache_missed, + mod_opt_type(cache_missed), + ejabberd_config:cache_missed(global)), + LifeTime = case gen_mod:get_module_opt( + global, ?MODULE, cache_life_time, + mod_opt_type(cache_life_time), + ejabberd_config:cache_life_time(global)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?BOSH_CACHE, + fun(_, error) -> + false; + (_, {ok, Pid}) -> + node(Pid) /= Node + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). %%%---------------------------------------------------------------------- %%% Help Web Page diff --git a/src/mod_bosh_mnesia.erl b/src/mod_bosh_mnesia.erl index b96d88d14..5954cbe49 100644 --- a/src/mod_bosh_mnesia.erl +++ b/src/mod_bosh_mnesia.erl @@ -25,7 +25,8 @@ -behaviour(mod_bosh). %% mod_bosh API --export([init/0, open_session/2, close_session/1, find_session/1]). +-export([init/0, open_session/2, close_session/1, find_session/1, + use_cache/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -55,6 +56,9 @@ init() -> start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +use_cache() -> + false. + open_session(SID, Pid) -> Session = #bosh{sid = SID, timestamp = p1_time_compat:timestamp(), pid = Pid}, lists:foreach( @@ -82,7 +86,7 @@ find_session(SID) -> [#bosh{pid = Pid}] -> {ok, Pid}; [] -> - error + {error, notfound} end. %%%=================================================================== diff --git a/src/mod_bosh_redis.erl b/src/mod_bosh_redis.erl index 156df368b..194d220a1 100644 --- a/src/mod_bosh_redis.erl +++ b/src/mod_bosh_redis.erl @@ -8,24 +8,45 @@ %%%------------------------------------------------------------------- -module(mod_bosh_redis). -behaviour(mod_bosh). +-behaviour(gen_server). %% API --export([init/0, open_session/2, close_session/1, find_session/1]). +-export([init/0, open_session/2, close_session/1, find_session/1, + cache_nodes/0]). +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3, start_link/0]). -include("ejabberd.hrl"). -include("logger.hrl"). +-include("bosh.hrl"). --define(BOSH_KEY, "ejabberd:bosh"). +-record(state, {}). + +-define(BOSH_KEY, <<"ejabberd:bosh">>). %%%=================================================================== %%% API %%%=================================================================== init() -> - clean_table(). + Spec = {?MODULE, {?MODULE, start_link, []}, + transient, 5000, worker, [?MODULE]}, + case supervisor:start_child(ejabberd_backend_sup, Spec) of + {ok, _Pid} -> ok; + Err -> Err + end. + +-spec start_link() -> {ok, pid()} | {error, any()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). open_session(SID, Pid) -> PidBin = term_to_binary(Pid), - case ejabberd_redis:hset(?BOSH_KEY, SID, PidBin) of + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hset(?BOSH_KEY, SID, PidBin), + ejabberd_redis:publish(?BOSH_KEY, SID) + end) of {ok, _} -> ok; {error, _} -> @@ -33,23 +54,63 @@ open_session(SID, Pid) -> end. close_session(SID) -> - ejabberd_redis:hdel(?BOSH_KEY, [SID]), - ok. + case ejabberd_redis:multi( + fun() -> + ejabberd_redis:hdel(?BOSH_KEY, [SID]), + ejabberd_redis:publish(?BOSH_KEY, SID) + end) of + {ok, _} -> + ok; + {error, _} -> + {error, db_failure} + end. find_session(SID) -> case ejabberd_redis:hget(?BOSH_KEY, SID) of - {ok, Pid} when is_binary(Pid) -> + {ok, undefined} -> + {error, notfound}; + {ok, Pid} -> try {ok, binary_to_term(Pid)} catch _:badarg -> ?ERROR_MSG("malformed data in redis (key = '~s'): ~p", [SID, Pid]), - error + {error, db_failure} end; - _ -> - error + {error, _} -> + {error, db_failure} end. +cache_nodes() -> + [node()]. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + clean_table(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({redis_message, ?BOSH_KEY, SID}, State) -> + ets_cache:delete(?BOSH_CACHE, SID), + {noreply, State}; +handle_info(Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %%%=================================================================== %%% Internal functions %%%=================================================================== diff --git a/src/mod_bosh_sql.erl b/src/mod_bosh_sql.erl index 0171ad8f4..73e42868d 100644 --- a/src/mod_bosh_sql.erl +++ b/src/mod_bosh_sql.erl @@ -44,13 +44,18 @@ open_session(SID, Pid) -> ok; Err -> ?ERROR_MSG("failed to update 'bosh' table: ~p", [Err]), - {error, Err} + {error, db_failure} end. close_session(SID) -> - %% TODO: report errors - ejabberd_sql:sql_query( - ?MYNAME, ?SQL("delete from bosh where sid=%(SID)s")). + case ejabberd_sql:sql_query( + ?MYNAME, ?SQL("delete from bosh where sid=%(SID)s")) of + {updated, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete from 'bosh' table: ~p", [Err]), + {error, db_failure} + end. find_session(SID) -> case ejabberd_sql:sql_query( @@ -58,13 +63,13 @@ find_session(SID) -> ?SQL("select @(pid)s, @(node)s from bosh where sid=%(SID)s")) of {selected, [{Pid, Node}]} -> try {ok, misc:decode_pid(Pid, Node)} - catch _:{bad_node, _} -> error + catch _:{bad_node, _} -> {error, notfound} end; {selected, []} -> - error; + {error, notfound}; Err -> ?ERROR_MSG("failed to select 'bosh' table: ~p", [Err]), - error + {error, db_failure} end. %%%=================================================================== diff --git a/src/mod_carboncopy.erl b/src/mod_carboncopy.erl index a7ae37f45..91c18aabf 100644 --- a/src/mod_carboncopy.erl +++ b/src/mod_carboncopy.erl @@ -36,18 +36,23 @@ -export([user_send_packet/1, user_receive_packet/1, iq_handler/1, remove_connection/4, disco_features/5, - is_carbon_copy/1, mod_opt_type/1, depends/2]). + is_carbon_copy/1, mod_opt_type/1, depends/2, clean_cache/1]). -include("ejabberd.hrl"). -include("logger.hrl"). -include("xmpp.hrl"). +-include("mod_carboncopy.hrl"). -type direction() :: sent | received. -callback init(binary(), gen_mod:opts()) -> any(). -callback enable(binary(), binary(), binary(), binary()) -> ok | {error, any()}. -callback disable(binary(), binary(), binary()) -> ok | {error, any()}. --callback list(binary(), binary()) -> [{binary(), binary()}]. +-callback list(binary(), binary()) -> [{binary(), binary(), node()}]. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). -spec is_carbon_copy(stanza()) -> boolean(). is_carbon_copy(#message{meta = #{carbon_copy := true}}) -> @@ -59,7 +64,9 @@ start(Host, Opts) -> IQDisc = gen_mod:get_opt(iqdisc, Opts,fun gen_iq_handler:check_type/1, one_queue), ejabberd_hooks:add(disco_local_features, Host, ?MODULE, disco_features, 50), Mod = gen_mod:ram_db_mod(Host, ?MODULE), + init_cache(Mod, Host, Opts), Mod:init(Host, Opts), + clean_cache(), ejabberd_hooks:add(unset_presence_hook,Host, ?MODULE, remove_connection, 10), %% why priority 89: to define clearly that we must run BEFORE mod_logdb hook (90) ejabberd_hooks:add(user_send_packet,Host, ?MODULE, user_send_packet, 89), @@ -82,6 +89,12 @@ reload(Host, NewOpts, OldOpts) -> true -> ok end, + case use_cache(NewMod, Host) of + true -> + ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, NewOpts)); + false -> + ok + end, case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, fun gen_iq_handler:check_type/1, one_queue) of @@ -247,13 +260,20 @@ build_forward_packet(JID, #message{type = T} = Msg, Sender, Dest, Direction) -> enable(Host, U, R, CC)-> ?DEBUG("enabling for ~p", [U]), Mod = gen_mod:ram_db_mod(Host, ?MODULE), - Mod:enable(U, Host, R, CC). + case Mod:enable(U, Host, R, CC) of + ok -> + delete_cache(Mod, U, Host); + {error, _} = Err -> + Err + end. -spec disable(binary(), binary(), binary()) -> ok | {error, any()}. disable(Host, U, R)-> ?DEBUG("disabling for ~p", [U]), Mod = gen_mod:ram_db_mod(Host, ?MODULE), - Mod:disable(U, Host, R). + Res = Mod:disable(U, Host, R), + delete_cache(Mod, U, Host), + Res. -spec complete_packet(jid(), message(), direction()) -> message(). complete_packet(From, #message{from = undefined} = Msg, sent) -> @@ -276,15 +296,106 @@ is_muc_pm(#jid{lresource = <<>>}, _Packet) -> is_muc_pm(_To, Packet) -> xmpp:has_subtag(Packet, #muc_user{}). --spec list(binary(), binary()) -> [{binary(), binary()}]. -%% list {resource, cc_version} with carbons enabled for given user and host +-spec list(binary(), binary()) -> [{Resource :: binary(), Namespace :: binary()}]. list(User, Server) -> Mod = gen_mod:ram_db_mod(Server, ?MODULE), - Mod:list(User, Server). + case use_cache(Mod, Server) of + true -> + case ets_cache:lookup( + ?CARBONCOPY_CACHE, {User, Server}, + fun() -> + case Mod:list(User, Server) of + {ok, L} when L /= [] -> {ok, L}; + _ -> error + end + end) of + {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L]; + error -> [] + end; + false -> + case Mod:list(User, Server) of + {ok, L} -> [{Resource, NS} || {Resource, NS, _} <- L]; + error -> [] + end + end. + +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + ets_cache:new(?CARBONCOPY_CACHE, cache_opts(Host, Opts)); + false -> + ets_cache:delete(?CARBONCOPY_CACHE) + end. + +-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()]. +cache_opts(Host, Opts) -> + MaxSize = gen_mod:get_opt( + cache_size, Opts, mod_opt_type(cache_size), + ejabberd_config:cache_size(Host)), + CacheMissed = gen_mod:get_opt( + cache_missed, Opts, mod_opt_type(cache_missed), + ejabberd_config:cache_missed(Host)), + LifeTime = case gen_mod:get_opt( + cache_life_time, Opts, mod_opt_type(cache_life_time), + ejabberd_config:cache_life_time(Host)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> + gen_mod:get_module_opt( + Host, ?MODULE, use_cache, mod_opt_type(use_cache), + ejabberd_config:use_cache(Host)) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?CARBONCOPY_CACHE, + fun(_, error) -> + false; + (_, {ok, L}) -> + not lists:any(fun({_, _, N}) -> N == Node end, L) + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). + +-spec delete_cache(module(), binary(), binary()) -> ok. +delete_cache(Mod, User, Server) -> + case use_cache(Mod, Server) of + true -> + ets_cache:delete(?CARBONCOPY_CACHE, {User, Server}, + cache_nodes(Mod, Server)); + false -> + ok + end. depends(_Host, _Opts) -> []. mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1; mod_opt_type(ram_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; -mod_opt_type(_) -> [ram_db_type, iqdisc]. +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun(B) when is_boolean(B) -> B end; +mod_opt_type(O) when O == cache_size; O == cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; +mod_opt_type(_) -> + [ram_db_type, iqdisc, use_cache, cache_size, cache_missed, cache_life_time]. diff --git a/src/mod_carboncopy_mnesia.erl b/src/mod_carboncopy_mnesia.erl index 9c6a2ffaf..62355165e 100644 --- a/src/mod_carboncopy_mnesia.erl +++ b/src/mod_carboncopy_mnesia.erl @@ -27,7 +27,7 @@ -behaviour(mod_carboncopy). %% API --export([init/2, enable/4, disable/3, list/2]). +-export([init/2, enable/4, disable/3, list/2, use_cache/1]). -include("mod_carboncopy.hrl"). @@ -53,31 +53,26 @@ init(_Host, _Opts) -> mnesia:add_table_copy(carboncopy, node(), ram_copies). enable(LUser, LServer, LResource, NS) -> - try mnesia:dirty_write( - #carboncopy{us = {LUser, LServer}, - resource = LResource, - version = NS}) of - ok -> ok - catch _:Error -> - {error, Error} - end. + mnesia:dirty_write( + #carboncopy{us = {LUser, LServer}, + resource = LResource, + version = NS}). disable(LUser, LServer, LResource) -> ToDelete = mnesia:dirty_match_object( #carboncopy{us = {LUser, LServer}, resource = LResource, version = '_'}), - try lists:foreach(fun mnesia:dirty_delete_object/1, ToDelete) of - ok -> ok - catch _:Error -> - {error, Error} - end. + lists:foreach(fun mnesia:dirty_delete_object/1, ToDelete). list(LUser, LServer) -> - mnesia:dirty_select( - carboncopy, - [{#carboncopy{us = {LUser, LServer}, resource = '$2', version = '$3'}, - [], [{{'$2','$3'}}]}]). + {ok, mnesia:dirty_select( + carboncopy, + [{#carboncopy{us = {LUser, LServer}, resource = '$2', version = '$3'}, + [], [{{'$2','$3', node()}}]}])}. + +use_cache(_LServer) -> + false. %%%=================================================================== %%% Internal functions diff --git a/src/mod_carboncopy_redis.erl b/src/mod_carboncopy_redis.erl index 8ed33468b..b72755f4e 100644 --- a/src/mod_carboncopy_redis.erl +++ b/src/mod_carboncopy_redis.erl @@ -22,27 +22,52 @@ %%%------------------------------------------------------------------- -module(mod_carboncopy_redis). -behaviour(mod_carboncopy). +-behaviour(gen_server). %% API --export([init/2, enable/4, disable/3, list/2]). +-export([init/2, enable/4, disable/3, list/2, cache_nodes/1]). +%% gen_server callbacks +-export([init/1, handle_cast/2, handle_call/3, handle_info/2, + terminate/2, code_change/3, start_link/0]). -include("ejabberd.hrl"). -include("logger.hrl"). +-include("mod_carboncopy.hrl"). + +-define(CARBONCOPY_KEY, <<"ejabberd:carboncopy">>). + +-record(state, {}). %%%=================================================================== %%% API %%%=================================================================== init(_Host, _Opts) -> - clean_table(). + Spec = {?MODULE, {?MODULE, start_link, []}, + transient, 5000, worker, [?MODULE]}, + case supervisor:start_child(ejabberd_backend_sup, Spec) of + {ok, _Pid} -> ok; + Err -> Err + end. + +-spec start_link() -> {ok, pid()} | {error, any()}. +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +cache_nodes(_LServer) -> + [node()]. enable(LUser, LServer, LResource, NS) -> USKey = us_key(LUser, LServer), NodeKey = node_key(), JID = jid:encode({LUser, LServer, LResource}), + Data = term_to_binary({NS, node()}), case ejabberd_redis:multi( fun() -> - ejabberd_redis:hset(USKey, LResource, NS), - ejabberd_redis:sadd(NodeKey, [JID]) + ejabberd_redis:hset(USKey, LResource, Data), + ejabberd_redis:sadd(NodeKey, [JID]), + ejabberd_redis:publish( + ?CARBONCOPY_KEY, + term_to_binary({delete, {LUser, LServer}})) end) of {ok, _} -> ok; @@ -57,7 +82,10 @@ disable(LUser, LServer, LResource) -> case ejabberd_redis:multi( fun() -> ejabberd_redis:hdel(USKey, [LResource]), - ejabberd_redis:srem(NodeKey, [JID]) + ejabberd_redis:srem(NodeKey, [JID]), + ejabberd_redis:publish( + ?CARBONCOPY_KEY, + term_to_binary({delete, {LUser, LServer}})) end) of {ok, _} -> ok; @@ -68,13 +96,57 @@ disable(LUser, LServer, LResource) -> list(LUser, LServer) -> USKey = us_key(LUser, LServer), case ejabberd_redis:hgetall(USKey) of - {ok, Vals} -> - Vals; + {ok, Pairs} -> + {ok, lists:flatmap( + fun({Resource, Data}) -> + try + {NS, Node} = binary_to_term(Data), + [{Resource, NS, Node}] + catch _:_ -> + ?ERROR_MSG("invalid term stored in Redis " + "(key = ~s): ~p", + [USKey, Data]), + [] + end + end, Pairs)}; {error, _} -> - [] + {error, db_failure} end. %%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + ejabberd_redis:subscribe([?CARBONCOPY_KEY]), + clean_table(), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({redis_message, ?CARBONCOPY_KEY, Data}, State) -> + case binary_to_term(Data) of + {delete, Key} -> + ets_cache:delete(?CARBONCOPY_CACHE, Key); + Msg -> + ?WARNING_MSG("unexpected redis message: ~p", [Msg]) + end, + {noreply, State}; +handle_info(Info, State) -> + ?ERROR_MSG("unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== %%% Internal functions %%%=================================================================== clean_table() -> diff --git a/src/mod_carboncopy_sql.erl b/src/mod_carboncopy_sql.erl index 2770d40aa..41d9a0632 100644 --- a/src/mod_carboncopy_sql.erl +++ b/src/mod_carboncopy_sql.erl @@ -49,7 +49,7 @@ enable(LUser, LServer, LResource, NS) -> ok; Err -> ?ERROR_MSG("failed to update 'carboncopy' table: ~p", [Err]), - Err + {error, db_failure} end. disable(LUser, LServer, LResource) -> @@ -61,19 +61,20 @@ disable(LUser, LServer, LResource) -> ok; Err -> ?ERROR_MSG("failed to delete from 'carboncopy' table: ~p", [Err]), - Err + {error, db_failure} end. list(LUser, LServer) -> case ejabberd_sql:sql_query( LServer, - ?SQL("select @(resource)s, @(namespace)s from carboncopy " + ?SQL("select @(resource)s, @(namespace)s, @(node)s from carboncopy " "where username=%(LUser)s")) of {selected, Rows} -> - Rows; + {ok, [{Resource, NS, binary_to_atom(Node, latin1)} + || {Resource, NS, Node} <- Rows]}; Err -> ?ERROR_MSG("failed to select from 'carboncopy' table: ~p", [Err]), - [] + {error, db_failure} end. %%%=================================================================== @@ -89,5 +90,5 @@ clean_table(LServer) -> ok; Err -> ?ERROR_MSG("failed to clean 'carboncopy' table: ~p", [Err]), - Err + {error, db_failure} end. diff --git a/src/randoms.erl b/src/randoms.erl index a5e33becd..ea21b4a1d 100644 --- a/src/randoms.erl +++ b/src/randoms.erl @@ -27,7 +27,7 @@ -author('alexey@process-one.net'). --export([get_string/0, uniform/0, uniform/1, bytes/1]). +-export([get_string/0, uniform/0, uniform/1, uniform/2, bytes/1]). -define(THRESHOLD, 16#10000000000000000). @@ -41,6 +41,9 @@ uniform() -> uniform(N) -> crypto:rand_uniform(1, N+1). +uniform(N, M) -> + crypto:rand_uniform(N, M+1). + -ifdef(STRONG_RAND_BYTES). bytes(N) -> crypto:strong_rand_bytes(N). |