diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
commit | e40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch) | |
tree | 75d9fe880e8257ea9fd20c095c252d7940cea89d /src/ejabberd_sm.erl | |
parent | Bump xmpp dependency, it's required by previous commit (diff) |
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/ejabberd_sm.erl')
-rw-r--r-- | src/ejabberd_sm.erl | 234 |
1 files changed, 196 insertions, 38 deletions
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 7c63292f..1cd911e1 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -76,7 +76,9 @@ c2s_handle_info/2, host_up/1, host_down/1, - make_sid/0 + make_sid/0, + clean_cache/1, + config_reloaded/0 ]). -export([init/1, handle_call/3, handle_cast/2, @@ -91,13 +93,15 @@ -include("ejabberd_sm.hrl"). -callback init() -> ok | {error, any()}. --callback set_session(#session{}) -> ok. --callback delete_session(binary(), binary(), binary(), sid()) -> - {ok, #session{}} | {error, notfound}. +-callback set_session(#session{}) -> ok | {error, any()}. +-callback delete_session(#session{}) -> ok | {error, any()}. -callback get_sessions() -> [#session{}]. -callback get_sessions(binary()) -> [#session{}]. --callback get_sessions(binary(), binary()) -> [#session{}]. --callback get_sessions(binary(), binary(), binary()) -> [#session{}]. +-callback get_sessions(binary(), binary()) -> {ok, [#session{}]} | {error, any()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). -record(state, {}). @@ -158,9 +162,12 @@ close_session(SID, User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - Info = case Mod:delete_session(LUser, LServer, LResource, SID) of - {ok, #session{info = I}} -> I; - {error, notfound} -> [] + Info = case get_sessions(Mod, LUser, LServer, LResource) of + [#session{info = I} = Session|_] -> + delete_session(Mod, Session), + I; + [] -> + [] end, JID = jid:make(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, @@ -196,14 +203,14 @@ get_user_resources(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [element(3, S#session.usr) || S <- clean_session_list(Ss)]. -spec get_user_present_resources(binary(), binary()) -> [tuple()]. get_user_present_resources(LUser, LServer) -> Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [{S#session.priority, element(3, S#session.usr)} || S <- clean_session_list(Ss), is_integer(S#session.priority)]. @@ -214,7 +221,7 @@ get_user_ip(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> undefined; Ss -> @@ -227,7 +234,7 @@ get_user_info(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), [{LResource, [{node, node(Pid)}|Info]} || #session{usr = {_, _, LResource}, info = Info, @@ -240,7 +247,7 @@ get_user_info(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> offline; Ss -> @@ -288,7 +295,7 @@ get_session_pid(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [#session{sid = {_, Pid}}] -> Pid; _ -> none end. @@ -309,7 +316,7 @@ get_offline_info(Time, User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - case Mod:get_sessions(LUser, LServer, LResource) of + case get_sessions(Mod, LUser, LServer, LResource) of [#session{sid = {Time, _}, info = Info}] -> case proplists:get_bool(offline, Info) of true -> @@ -326,7 +333,7 @@ get_offline_info(Time, User, Server, Resource) -> dirty_get_sessions_list() -> lists:flatmap( fun(Mod) -> - [S#session.usr || S <- online(Mod:get_sessions())] + [S#session.usr || S <- online(get_sessions(Mod))] end, get_sm_backends()). -spec dirty_get_my_sessions_list() -> [#session{}]. @@ -334,7 +341,7 @@ dirty_get_sessions_list() -> dirty_get_my_sessions_list() -> lists:flatmap( fun(Mod) -> - [S || S <- online(Mod:get_sessions()), + [S || S <- online(get_sessions(Mod)), node(element(2, S#session.sid)) == node()] end, get_sm_backends()). @@ -343,14 +350,14 @@ dirty_get_my_sessions_list() -> get_vh_session_list(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - [S#session.usr || S <- online(Mod:get_sessions(LServer))]. + [S#session.usr || S <- online(get_sessions(Mod, LServer))]. -spec get_all_pids() -> [pid()]. get_all_pids() -> lists:flatmap( fun(Mod) -> - [element(2, S#session.sid) || S <- online(Mod:get_sessions())] + [element(2, S#session.sid) || S <- online(get_sessions(Mod))] end, get_sm_backends()). -spec get_vh_session_number(binary()) -> non_neg_integer(). @@ -358,7 +365,7 @@ get_all_pids() -> get_vh_session_number(Server) -> LServer = jid:nameprep(Server), Mod = get_sm_backend(LServer), - length(online(Mod:get_sessions(LServer))). + length(online(get_sessions(Mod, LServer))). -spec register_iq_handler(binary(), binary(), atom(), atom(), list()) -> ok. @@ -387,16 +394,23 @@ c2s_handle_info(#{lang := Lang} = State, {exit, Reason}) -> c2s_handle_info(State, _) -> State. +-spec config_reloaded() -> ok. +config_reloaded() -> + init_cache(). + %%==================================================================== %% gen_server callbacks %%==================================================================== init([]) -> process_flag(trap_exit, true), + init_cache(), lists:foreach(fun(Mod) -> Mod:init() end, get_sm_backends()), + clean_cache(), ets:new(sm_iqtable, [named_table, public, {read_concurrency, true}]), ejabberd_hooks:add(host_up, ?MODULE, host_up, 50), ejabberd_hooks:add(host_down, ?MODULE, host_down, 60), + ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50), lists:foreach(fun host_up/1, ?MYHOSTS), ejabberd_commands:register_commands(get_commands_spec()), {ok, #state{}}. @@ -432,6 +446,7 @@ terminate(_Reason, _State) -> lists:foreach(fun host_down/1, ?MYHOSTS), ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50), ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60), + ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 50), ejabberd_commands:unregister_commands(get_commands_spec()), ok. @@ -460,7 +475,7 @@ host_down(Host) -> ejabberd_c2s:send(Pid, xmpp:serr_system_shutdown()); (_) -> ok - end, Mod:get_sessions(Host)), + end, get_sessions(Mod, Host)), ejabberd_hooks:delete(c2s_handle_info, Host, ejabberd_sm, c2s_handle_info, 50), ejabberd_hooks:delete(roster_in_subscription, Host, @@ -472,7 +487,7 @@ host_down(Host) -> ejabberd_c2s:host_down(Host). -spec set_session(sid(), binary(), binary(), binary(), - prio(), info()) -> ok. + prio(), info()) -> ok | {error, any()}. set_session(SID, User, Server, Resource, Priority, Info) -> LUser = jid:nodeprep(User), @@ -481,8 +496,69 @@ set_session(SID, User, Server, Resource, Priority, Info) -> US = {LUser, LServer}, USR = {LUser, LServer, LResource}, Mod = get_sm_backend(LServer), - Mod:set_session(#session{sid = SID, usr = USR, us = US, - priority = Priority, info = Info}). + case Mod:set_session(#session{sid = SID, usr = USR, us = US, + priority = Priority, info = Info}) of + ok -> + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?SM_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end; + {error, _} = Err -> + Err + end. + +-spec get_sessions(module()) -> [#session{}]. +get_sessions(Mod) -> + Mod:get_sessions(). + +-spec get_sessions(module(), binary()) -> [#session{}]. +get_sessions(Mod, LServer) -> + Mod:get_sessions(LServer). + +-spec get_sessions(module(), binary(), binary()) -> [#session{}]. +get_sessions(Mod, LUser, LServer) -> + case use_cache(Mod, LServer) of + true -> + case ets_cache:lookup( + ?SM_CACHE, {LUser, LServer}, + fun() -> + case Mod:get_sessions(LUser, LServer) of + {ok, Ss} when Ss /= [] -> + {ok, Ss}; + _ -> + error + end + end) of + {ok, Sessions} -> + Sessions; + error -> + [] + end; + false -> + case Mod:get_sessions(LUser, LServer) of + {ok, Ss} -> Ss; + _ -> [] + end + end. + +-spec get_sessions(module(), binary(), binary(), binary()) -> [#session{}]. +get_sessions(Mod, LUser, LServer, LResource) -> + Sessions = get_sessions(Mod, LUser, LServer), + [S || S <- Sessions, element(3, S#session.usr) == LResource]. + +-spec delete_session(module(), #session{}) -> ok. +delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) -> + Mod:delete_session(Session), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?SM_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end. -spec online([#session{}]) -> [#session{}]. @@ -505,7 +581,7 @@ do_route(To, Term) -> ?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]), {U, S, R} = jid:tolower(To), Mod = get_sm_backend(S), - case online(Mod:get_sessions(U, S, R)) of + case online(get_sessions(Mod, U, S, R)) of [] -> ?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]); Ss -> @@ -541,7 +617,7 @@ do_route(#presence{from = From, to = To, type = T, status = Status} = Packet) ejabberd_c2s:route(Pid, {route, Packet1}); (_) -> ok - end, online(Mod:get_sessions(LUser, LServer))); + end, online(get_sessions(Mod, LUser, LServer))); false -> ok end; @@ -570,7 +646,7 @@ do_route(Packet) -> To = xmpp:get_to(Packet), {LUser, LServer, LResource} = jid:tolower(To), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, LResource)) of + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> case Packet of #message{type = T} when T == chat; T == normal -> @@ -618,7 +694,7 @@ route_message(#message{to = To, type = Type} = Packet) -> (P >= 0) and (Type == headline) -> LResource = jid:resourceprep(R), Mod = get_sm_backend(LServer), - case online(Mod:get_sessions(LUser, LServer, + case online(get_sessions(Mod, LUser, LServer, LResource)) of [] -> ok; % Race condition @@ -689,10 +765,10 @@ check_for_sessions_to_replace(User, Server, Resource) -> -spec check_existing_resources(binary(), binary(), binary()) -> ok. check_existing_resources(LUser, LServer, LResource) -> Mod = get_sm_backend(LServer), - Ss = Mod:get_sessions(LUser, LServer, LResource), + Ss = get_sessions(Mod, LUser, LServer, LResource), {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), - lists:foreach(fun(#session{sid = S}) -> - Mod:delete_session(LUser, LServer, LResource, S) + lists:foreach(fun(S) -> + delete_session(Mod, S) end, OfflineSs), if OnlineSs == [] -> ok; true -> @@ -716,12 +792,12 @@ get_resource_sessions(User, Server, Resource) -> LServer = jid:nameprep(Server), LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), - [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer, LResource))]. + [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))]. -spec check_max_sessions(binary(), binary()) -> ok | replaced. check_max_sessions(LUser, LServer) -> Mod = get_sm_backend(LServer), - Ss = Mod:get_sessions(LUser, LServer), + Ss = get_sessions(Mod, LUser, LServer), {OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss), MaxSessions = get_max_user_sessions(LUser, LServer), if length(OnlineSs) =< MaxSessions -> ok; @@ -731,8 +807,7 @@ check_max_sessions(LUser, LServer) -> end, if length(OfflineSs) =< MaxSessions -> ok; true -> - #session{sid = SID, usr = {_, _, R}} = lists:min(OfflineSs), - Mod:delete_session(LUser, LServer, R, SID) + delete_session(Mod, lists:min(OfflineSs)) end. %% Get the user_max_session setting @@ -779,7 +854,7 @@ process_iq(#iq{}) -> force_update_presence({LUser, LServer}) -> Mod = get_sm_backend(LServer), - Ss = online(Mod:get_sessions(LUser, LServer)), + Ss = online(get_sessions(Mod, LUser, LServer)), lists:foreach(fun (#session{sid = {_, Pid}}) -> ejabberd_c2s:route(Pid, force_update_presence) end, @@ -811,6 +886,80 @@ get_vh_by_backend(Mod) -> get_sm_backend(Host) == Mod end, ?MYHOSTS). +%%-------------------------------------------------------------------- +%%% Cache stuff +%%-------------------------------------------------------------------- +-spec init_cache() -> ok. +init_cache() -> + case use_cache() of + true -> + ets_cache:new(?SM_CACHE, cache_opts()); + false -> + ets_cache:delete(?SM_CACHE) + end. + +-spec cache_opts() -> [proplists:property()]. +cache_opts() -> + MaxSize = ejabberd_config:get_option( + sm_cache_size, + opt_type(sm_cache_size), + ejabberd_config:cache_size(global)), + CacheMissed = ejabberd_config:get_option( + sm_cache_missed, + opt_type(sm_cache_missed), + ejabberd_config:cache_missed(global)), + LifeTime = case ejabberd_config:get_option( + sm_cache_life_time, + opt_type(sm_cache_life_time), + ejabberd_config:cache_life_time(global)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec clean_cache(node()) -> ok. +clean_cache(Node) -> + ets_cache:filter( + ?SM_CACHE, + fun(_, error) -> + false; + (_, {ok, Ss}) -> + not lists:any( + fun(#session{sid = {_, Pid}}) -> + node(Pid) == Node + end, Ss) + end). + +-spec clean_cache() -> ok. +clean_cache() -> + ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]). + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, LServer) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(LServer); + false -> + ejabberd_config:get_option( + {sm_use_cache, LServer}, + ejabberd_sm:opt_type(sm_use_cache), + ejabberd_config:use_cache(LServer)) + end. + +-spec use_cache() -> boolean(). +use_cache() -> + lists:any( + fun(Host) -> + Mod = get_sm_backend(Host), + use_cache(Mod, Host) + end, ?MYHOSTS). + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, LServer) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(LServer); + false -> ejabberd_cluster:get_nodes() + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% ejabberd commands @@ -869,4 +1018,13 @@ make_sid() -> {p1_time_compat:unique_timestamp(), self()}. opt_type(sm_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; -opt_type(_) -> [sm_db_type]. +opt_type(O) when O == sm_use_cache; O == sm_cache_missed -> + fun(B) when is_boolean(B) -> B end; +opt_type(O) when O == sm_cache_size; O == sm_cache_life_time -> + fun(I) when is_integer(I), I>0 -> I; + (unlimited) -> infinity; + (infinity) -> infinity + end; +opt_type(_) -> + [sm_db_type, sm_use_cache, sm_cache_size, sm_cache_missed, + sm_cache_life_time]. |