summaryrefslogtreecommitdiff
path: root/src/ejabberd_sm.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
commite40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch)
tree75d9fe880e8257ea9fd20c095c252d7940cea89d /src/ejabberd_sm.erl
parentBump xmpp dependency, it's required by previous commit (diff)
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/ejabberd_sm.erl')
-rw-r--r--src/ejabberd_sm.erl234
1 files changed, 196 insertions, 38 deletions
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 7c63292f..1cd911e1 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -76,7 +76,9 @@
c2s_handle_info/2,
host_up/1,
host_down/1,
- make_sid/0
+ make_sid/0,
+ clean_cache/1,
+ config_reloaded/0
]).
-export([init/1, handle_call/3, handle_cast/2,
@@ -91,13 +93,15 @@
-include("ejabberd_sm.hrl").
-callback init() -> ok | {error, any()}.
--callback set_session(#session{}) -> ok.
--callback delete_session(binary(), binary(), binary(), sid()) ->
- {ok, #session{}} | {error, notfound}.
+-callback set_session(#session{}) -> ok | {error, any()}.
+-callback delete_session(#session{}) -> ok | {error, any()}.
-callback get_sessions() -> [#session{}].
-callback get_sessions(binary()) -> [#session{}].
--callback get_sessions(binary(), binary()) -> [#session{}].
--callback get_sessions(binary(), binary(), binary()) -> [#session{}].
+-callback get_sessions(binary(), binary()) -> {ok, [#session{}]} | {error, any()}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
+
+-optional_callbacks([use_cache/1, cache_nodes/1]).
-record(state, {}).
@@ -158,9 +162,12 @@ close_session(SID, User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- Info = case Mod:delete_session(LUser, LServer, LResource, SID) of
- {ok, #session{info = I}} -> I;
- {error, notfound} -> []
+ Info = case get_sessions(Mod, LUser, LServer, LResource) of
+ [#session{info = I} = Session|_] ->
+ delete_session(Mod, Session),
+ I;
+ [] ->
+ []
end,
JID = jid:make(User, Server, Resource),
ejabberd_hooks:run(sm_remove_connection_hook,
@@ -196,14 +203,14 @@ get_user_resources(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Ss = online(Mod:get_sessions(LUser, LServer)),
+ Ss = online(get_sessions(Mod, LUser, LServer)),
[element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
Mod = get_sm_backend(LServer),
- Ss = online(Mod:get_sessions(LUser, LServer)),
+ Ss = online(get_sessions(Mod, LUser, LServer)),
[{S#session.priority, element(3, S#session.usr)}
|| S <- clean_session_list(Ss), is_integer(S#session.priority)].
@@ -214,7 +221,7 @@ get_user_ip(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(Mod:get_sessions(LUser, LServer, LResource)) of
+ case online(get_sessions(Mod, LUser, LServer, LResource)) of
[] ->
undefined;
Ss ->
@@ -227,7 +234,7 @@ get_user_info(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- Ss = online(Mod:get_sessions(LUser, LServer)),
+ Ss = online(get_sessions(Mod, LUser, LServer)),
[{LResource, [{node, node(Pid)}|Info]}
|| #session{usr = {_, _, LResource},
info = Info,
@@ -240,7 +247,7 @@ get_user_info(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(Mod:get_sessions(LUser, LServer, LResource)) of
+ case online(get_sessions(Mod, LUser, LServer, LResource)) of
[] ->
offline;
Ss ->
@@ -288,7 +295,7 @@ get_session_pid(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case online(Mod:get_sessions(LUser, LServer, LResource)) of
+ case online(get_sessions(Mod, LUser, LServer, LResource)) of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
@@ -309,7 +316,7 @@ get_offline_info(Time, User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- case Mod:get_sessions(LUser, LServer, LResource) of
+ case get_sessions(Mod, LUser, LServer, LResource) of
[#session{sid = {Time, _}, info = Info}] ->
case proplists:get_bool(offline, Info) of
true ->
@@ -326,7 +333,7 @@ get_offline_info(Time, User, Server, Resource) ->
dirty_get_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S#session.usr || S <- online(Mod:get_sessions())]
+ [S#session.usr || S <- online(get_sessions(Mod))]
end, get_sm_backends()).
-spec dirty_get_my_sessions_list() -> [#session{}].
@@ -334,7 +341,7 @@ dirty_get_sessions_list() ->
dirty_get_my_sessions_list() ->
lists:flatmap(
fun(Mod) ->
- [S || S <- online(Mod:get_sessions()),
+ [S || S <- online(get_sessions(Mod)),
node(element(2, S#session.sid)) == node()]
end, get_sm_backends()).
@@ -343,14 +350,14 @@ dirty_get_my_sessions_list() ->
get_vh_session_list(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- [S#session.usr || S <- online(Mod:get_sessions(LServer))].
+ [S#session.usr || S <- online(get_sessions(Mod, LServer))].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
lists:flatmap(
fun(Mod) ->
- [element(2, S#session.sid) || S <- online(Mod:get_sessions())]
+ [element(2, S#session.sid) || S <- online(get_sessions(Mod))]
end, get_sm_backends()).
-spec get_vh_session_number(binary()) -> non_neg_integer().
@@ -358,7 +365,7 @@ get_all_pids() ->
get_vh_session_number(Server) ->
LServer = jid:nameprep(Server),
Mod = get_sm_backend(LServer),
- length(online(Mod:get_sessions(LServer))).
+ length(online(get_sessions(Mod, LServer))).
-spec register_iq_handler(binary(), binary(), atom(), atom(), list()) -> ok.
@@ -387,16 +394,23 @@ c2s_handle_info(#{lang := Lang} = State, {exit, Reason}) ->
c2s_handle_info(State, _) ->
State.
+-spec config_reloaded() -> ok.
+config_reloaded() ->
+ init_cache().
+
%%====================================================================
%% gen_server callbacks
%%====================================================================
init([]) ->
process_flag(trap_exit, true),
+ init_cache(),
lists:foreach(fun(Mod) -> Mod:init() end, get_sm_backends()),
+ clean_cache(),
ets:new(sm_iqtable, [named_table, public, {read_concurrency, true}]),
ejabberd_hooks:add(host_up, ?MODULE, host_up, 50),
ejabberd_hooks:add(host_down, ?MODULE, host_down, 60),
+ ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50),
lists:foreach(fun host_up/1, ?MYHOSTS),
ejabberd_commands:register_commands(get_commands_spec()),
{ok, #state{}}.
@@ -432,6 +446,7 @@ terminate(_Reason, _State) ->
lists:foreach(fun host_down/1, ?MYHOSTS),
ejabberd_hooks:delete(host_up, ?MODULE, host_up, 50),
ejabberd_hooks:delete(host_down, ?MODULE, host_down, 60),
+ ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 50),
ejabberd_commands:unregister_commands(get_commands_spec()),
ok.
@@ -460,7 +475,7 @@ host_down(Host) ->
ejabberd_c2s:send(Pid, xmpp:serr_system_shutdown());
(_) ->
ok
- end, Mod:get_sessions(Host)),
+ end, get_sessions(Mod, Host)),
ejabberd_hooks:delete(c2s_handle_info, Host,
ejabberd_sm, c2s_handle_info, 50),
ejabberd_hooks:delete(roster_in_subscription, Host,
@@ -472,7 +487,7 @@ host_down(Host) ->
ejabberd_c2s:host_down(Host).
-spec set_session(sid(), binary(), binary(), binary(),
- prio(), info()) -> ok.
+ prio(), info()) -> ok | {error, any()}.
set_session(SID, User, Server, Resource, Priority, Info) ->
LUser = jid:nodeprep(User),
@@ -481,8 +496,69 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
US = {LUser, LServer},
USR = {LUser, LServer, LResource},
Mod = get_sm_backend(LServer),
- Mod:set_session(#session{sid = SID, usr = USR, us = US,
- priority = Priority, info = Info}).
+ case Mod:set_session(#session{sid = SID, usr = USR, us = US,
+ priority = Priority, info = Info}) of
+ ok ->
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:delete(?SM_CACHE, {LUser, LServer},
+ cache_nodes(Mod, LServer));
+ false ->
+ ok
+ end;
+ {error, _} = Err ->
+ Err
+ end.
+
+-spec get_sessions(module()) -> [#session{}].
+get_sessions(Mod) ->
+ Mod:get_sessions().
+
+-spec get_sessions(module(), binary()) -> [#session{}].
+get_sessions(Mod, LServer) ->
+ Mod:get_sessions(LServer).
+
+-spec get_sessions(module(), binary(), binary()) -> [#session{}].
+get_sessions(Mod, LUser, LServer) ->
+ case use_cache(Mod, LServer) of
+ true ->
+ case ets_cache:lookup(
+ ?SM_CACHE, {LUser, LServer},
+ fun() ->
+ case Mod:get_sessions(LUser, LServer) of
+ {ok, Ss} when Ss /= [] ->
+ {ok, Ss};
+ _ ->
+ error
+ end
+ end) of
+ {ok, Sessions} ->
+ Sessions;
+ error ->
+ []
+ end;
+ false ->
+ case Mod:get_sessions(LUser, LServer) of
+ {ok, Ss} -> Ss;
+ _ -> []
+ end
+ end.
+
+-spec get_sessions(module(), binary(), binary(), binary()) -> [#session{}].
+get_sessions(Mod, LUser, LServer, LResource) ->
+ Sessions = get_sessions(Mod, LUser, LServer),
+ [S || S <- Sessions, element(3, S#session.usr) == LResource].
+
+-spec delete_session(module(), #session{}) -> ok.
+delete_session(Mod, #session{usr = {LUser, LServer, _}} = Session) ->
+ Mod:delete_session(Session),
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:delete(?SM_CACHE, {LUser, LServer},
+ cache_nodes(Mod, LServer));
+ false ->
+ ok
+ end.
-spec online([#session{}]) -> [#session{}].
@@ -505,7 +581,7 @@ do_route(To, Term) ->
?DEBUG("broadcasting ~p to ~s", [Term, jid:encode(To)]),
{U, S, R} = jid:tolower(To),
Mod = get_sm_backend(S),
- case online(Mod:get_sessions(U, S, R)) of
+ case online(get_sessions(Mod, U, S, R)) of
[] ->
?DEBUG("dropping broadcast to unavailable resourse: ~p", [Term]);
Ss ->
@@ -541,7 +617,7 @@ do_route(#presence{from = From, to = To, type = T, status = Status} = Packet)
ejabberd_c2s:route(Pid, {route, Packet1});
(_) ->
ok
- end, online(Mod:get_sessions(LUser, LServer)));
+ end, online(get_sessions(Mod, LUser, LServer)));
false ->
ok
end;
@@ -570,7 +646,7 @@ do_route(Packet) ->
To = xmpp:get_to(Packet),
{LUser, LServer, LResource} = jid:tolower(To),
Mod = get_sm_backend(LServer),
- case online(Mod:get_sessions(LUser, LServer, LResource)) of
+ case online(get_sessions(Mod, LUser, LServer, LResource)) of
[] ->
case Packet of
#message{type = T} when T == chat; T == normal ->
@@ -618,7 +694,7 @@ route_message(#message{to = To, type = Type} = Packet) ->
(P >= 0) and (Type == headline) ->
LResource = jid:resourceprep(R),
Mod = get_sm_backend(LServer),
- case online(Mod:get_sessions(LUser, LServer,
+ case online(get_sessions(Mod, LUser, LServer,
LResource)) of
[] ->
ok; % Race condition
@@ -689,10 +765,10 @@ check_for_sessions_to_replace(User, Server, Resource) ->
-spec check_existing_resources(binary(), binary(), binary()) -> ok.
check_existing_resources(LUser, LServer, LResource) ->
Mod = get_sm_backend(LServer),
- Ss = Mod:get_sessions(LUser, LServer, LResource),
+ Ss = get_sessions(Mod, LUser, LServer, LResource),
{OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
- lists:foreach(fun(#session{sid = S}) ->
- Mod:delete_session(LUser, LServer, LResource, S)
+ lists:foreach(fun(S) ->
+ delete_session(Mod, S)
end, OfflineSs),
if OnlineSs == [] -> ok;
true ->
@@ -716,12 +792,12 @@ get_resource_sessions(User, Server, Resource) ->
LServer = jid:nameprep(Server),
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
- [S#session.sid || S <- online(Mod:get_sessions(LUser, LServer, LResource))].
+ [S#session.sid || S <- online(get_sessions(Mod, LUser, LServer, LResource))].
-spec check_max_sessions(binary(), binary()) -> ok | replaced.
check_max_sessions(LUser, LServer) ->
Mod = get_sm_backend(LServer),
- Ss = Mod:get_sessions(LUser, LServer),
+ Ss = get_sessions(Mod, LUser, LServer),
{OnlineSs, OfflineSs} = lists:partition(fun is_online/1, Ss),
MaxSessions = get_max_user_sessions(LUser, LServer),
if length(OnlineSs) =< MaxSessions -> ok;
@@ -731,8 +807,7 @@ check_max_sessions(LUser, LServer) ->
end,
if length(OfflineSs) =< MaxSessions -> ok;
true ->
- #session{sid = SID, usr = {_, _, R}} = lists:min(OfflineSs),
- Mod:delete_session(LUser, LServer, R, SID)
+ delete_session(Mod, lists:min(OfflineSs))
end.
%% Get the user_max_session setting
@@ -779,7 +854,7 @@ process_iq(#iq{}) ->
force_update_presence({LUser, LServer}) ->
Mod = get_sm_backend(LServer),
- Ss = online(Mod:get_sessions(LUser, LServer)),
+ Ss = online(get_sessions(Mod, LUser, LServer)),
lists:foreach(fun (#session{sid = {_, Pid}}) ->
ejabberd_c2s:route(Pid, force_update_presence)
end,
@@ -811,6 +886,80 @@ get_vh_by_backend(Mod) ->
get_sm_backend(Host) == Mod
end, ?MYHOSTS).
+%%--------------------------------------------------------------------
+%%% Cache stuff
+%%--------------------------------------------------------------------
+-spec init_cache() -> ok.
+init_cache() ->
+ case use_cache() of
+ true ->
+ ets_cache:new(?SM_CACHE, cache_opts());
+ false ->
+ ets_cache:delete(?SM_CACHE)
+ end.
+
+-spec cache_opts() -> [proplists:property()].
+cache_opts() ->
+ MaxSize = ejabberd_config:get_option(
+ sm_cache_size,
+ opt_type(sm_cache_size),
+ ejabberd_config:cache_size(global)),
+ CacheMissed = ejabberd_config:get_option(
+ sm_cache_missed,
+ opt_type(sm_cache_missed),
+ ejabberd_config:cache_missed(global)),
+ LifeTime = case ejabberd_config:get_option(
+ sm_cache_life_time,
+ opt_type(sm_cache_life_time),
+ ejabberd_config:cache_life_time(global)) of
+ infinity -> infinity;
+ I -> timer:seconds(I)
+ end,
+ [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}].
+
+-spec clean_cache(node()) -> ok.
+clean_cache(Node) ->
+ ets_cache:filter(
+ ?SM_CACHE,
+ fun(_, error) ->
+ false;
+ (_, {ok, Ss}) ->
+ not lists:any(
+ fun(#session{sid = {_, Pid}}) ->
+ node(Pid) == Node
+ end, Ss)
+ end).
+
+-spec clean_cache() -> ok.
+clean_cache() ->
+ ejabberd_cluster:eval_everywhere(?MODULE, clean_cache, [node()]).
+
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, LServer) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(LServer);
+ false ->
+ ejabberd_config:get_option(
+ {sm_use_cache, LServer},
+ ejabberd_sm:opt_type(sm_use_cache),
+ ejabberd_config:use_cache(LServer))
+ end.
+
+-spec use_cache() -> boolean().
+use_cache() ->
+ lists:any(
+ fun(Host) ->
+ Mod = get_sm_backend(Host),
+ use_cache(Mod, Host)
+ end, ?MYHOSTS).
+
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, LServer) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(LServer);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% ejabberd commands
@@ -869,4 +1018,13 @@ make_sid() ->
{p1_time_compat:unique_timestamp(), self()}.
opt_type(sm_db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
-opt_type(_) -> [sm_db_type].
+opt_type(O) when O == sm_use_cache; O == sm_cache_missed ->
+ fun(B) when is_boolean(B) -> B end;
+opt_type(O) when O == sm_cache_size; O == sm_cache_life_time ->
+ fun(I) when is_integer(I), I>0 -> I;
+ (unlimited) -> infinity;
+ (infinity) -> infinity
+ end;
+opt_type(_) ->
+ [sm_db_type, sm_use_cache, sm_cache_size, sm_cache_missed,
+ sm_cache_life_time].