diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ejabberd_app.erl | 1 | ||||
-rw-r--r-- | src/ejabberd_hooks.erl | 169 | ||||
-rw-r--r-- | src/ejabberd_sm.erl | 261 | ||||
-rw-r--r-- | src/ejabberd_sm_mnesia.erl | 148 | ||||
-rw-r--r-- | src/ejabberd_sm_odbc.erl | 179 | ||||
-rw-r--r-- | src/ejabberd_sup.erl | 8 | ||||
-rw-r--r-- | src/odbc_queries.erl | 2 |
7 files changed, 508 insertions, 260 deletions
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 957aa5d46..15170daee 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -58,6 +58,7 @@ start(normal, _Args) -> Sup = ejabberd_sup:start_link(), ejabberd_rdbms:start(), ejabberd_riak_sup:start(), + ejabberd_sm:start(), ejabberd_auth:start(), cyrsasl:start(), % Profiling diff --git a/src/ejabberd_hooks.erl b/src/ejabberd_hooks.erl index c1cdefcb2..d136ba705 100644 --- a/src/ejabberd_hooks.erl +++ b/src/ejabberd_hooks.erl @@ -32,18 +32,21 @@ -export([start_link/0, add/3, add/4, + add/5, add_dist/5, + add_dist/6, delete/3, delete/4, - delete_dist/5, - run/2, - run_fold/3, - add/5, - add_dist/6, delete/5, + delete_dist/5, delete_dist/6, + run/2, run/3, - run_fold/4]). + run_fold/3, + run_fold/4, + get_handlers/2]). + +-export([delete_all_hooks/0]). %% gen_server callbacks -export([init/1, @@ -53,13 +56,14 @@ handle_info/2, terminate/2]). --include("ejabberd.hrl"). -include("logger.hrl"). %% Timeout of 5 seconds in calls to distributed hooks -define(TIMEOUT_DISTRIBUTED_HOOK, 5000). -record(state, {}). +-type local_hook() :: { Seq :: integer(), Module :: atom(), Function :: atom()}. +-type distributed_hook() :: { Seq :: integer(), Node :: atom(), Module :: atom(), Function :: atom()}. %%%---------------------------------------------------------------------- %%% API @@ -67,13 +71,13 @@ start_link() -> gen_server:start_link({local, ejabberd_hooks}, ejabberd_hooks, [], []). --spec add(atom(), fun(), number()) -> any(). +-spec add(atom(), fun(), number()) -> ok. %% @doc See add/4. add(Hook, Function, Seq) when is_function(Function) -> add(Hook, global, undefined, Function, Seq). --spec add(atom(), binary() | atom(), fun() | atom() , number()) -> any(). +-spec add(atom(), HostOrModule :: binary() | atom(), fun() | atom() , number()) -> ok. add(Hook, Host, Function, Seq) when is_function(Function) -> add(Hook, Host, undefined, Function, Seq); @@ -82,17 +86,17 @@ add(Hook, Host, Function, Seq) when is_function(Function) -> add(Hook, Module, Function, Seq) -> add(Hook, global, Module, Function, Seq). --spec add(atom(), binary() | global, atom(), atom() | fun(), number()) -> any(). +-spec add(atom(), binary() | global, atom(), atom() | fun(), number()) -> ok. add(Hook, Host, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}). --spec add_dist(atom(), atom(), atom(), atom() | fun(), number()) -> any(). +-spec add_dist(atom(), atom(), atom(), atom() | fun(), number()) -> ok. add_dist(Hook, Node, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {add, Hook, global, Node, Module, Function, Seq}). --spec add_dist(atom(), binary() | global, atom(), atom(), atom() | fun(), number()) -> any(). +-spec add_dist(atom(), binary() | global, atom(), atom(), atom() | fun(), number()) -> ok. add_dist(Hook, Host, Node, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {add, Hook, Host, Node, Module, Function, Seq}). @@ -128,6 +132,17 @@ delete_dist(Hook, Node, Module, Function, Seq) -> delete_dist(Hook, Host, Node, Module, Function, Seq) -> gen_server:call(ejabberd_hooks, {delete, Hook, Host, Node, Module, Function, Seq}). +-spec delete_all_hooks() -> true. + +%% @doc Primarily for testing / instrumentation +delete_all_hooks() -> + gen_server:call(ejabberd_hooks, {delete_all}). + +-spec get_handlers(atom(), binary() | global) -> [local_hook() | distributed_hook()]. +%% @doc Returns currently set handler for hook name +get_handlers(Hookname, Host) -> + gen_server:call(ejabberd_hooks, {get_handlers, Hookname, Host}). + -spec run(atom(), list()) -> ok. %% @doc Run the calls of this hook in order, don't care about function results. @@ -190,65 +205,70 @@ init([]) -> %% {stop, Reason, State} (terminate/2 is called) %%---------------------------------------------------------------------- handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) -> - Reply = case ets:lookup(hooks, {Hook, Host}) of - [{_, Ls}] -> - El = {Seq, Module, Function}, - case lists:member(El, Ls) of - true -> - ok; - false -> - NewLs = lists:merge(Ls, [El]), - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok - end; - [] -> - NewLs = [{Seq, Module, Function}], - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok - end, + HookFormat = {Seq, Module, Function}, + Reply = handle_add(Hook, Host, HookFormat), {reply, Reply, State}; handle_call({add, Hook, Host, Node, Module, Function, Seq}, _From, State) -> - Reply = case ets:lookup(hooks, {Hook, Host}) of - [{_, Ls}] -> - El = {Seq, Node, Module, Function}, - case lists:member(El, Ls) of - true -> - ok; - false -> - NewLs = lists:merge(Ls, [El]), - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok - end; - [] -> - NewLs = [{Seq, Node, Module, Function}], - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok - end, + HookFormat = {Seq, Node, Module, Function}, + Reply = handle_add(Hook, Host, HookFormat), {reply, Reply, State}; + handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) -> - Reply = case ets:lookup(hooks, {Hook, Host}) of - [{_, Ls}] -> - NewLs = lists:delete({Seq, Module, Function}, Ls), - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok; - [] -> - ok - end, + HookFormat = {Seq, Module, Function}, + Reply = handle_delete(Hook, Host, HookFormat), {reply, Reply, State}; handle_call({delete, Hook, Host, Node, Module, Function, Seq}, _From, State) -> + HookFormat = {Seq, Node, Module, Function}, + Reply = handle_delete(Hook, Host, HookFormat), + {reply, Reply, State}; + +handle_call({get_handlers, Hook, Host}, _From, State) -> Reply = case ets:lookup(hooks, {Hook, Host}) of - [{_, Ls}] -> - NewLs = lists:delete({Seq, Node, Module, Function}, Ls), - ets:insert(hooks, {{Hook, Host}, NewLs}), - ok; - [] -> - ok - end, + [{_, Handlers}] -> Handlers; + [] -> [] + end, + {reply, Reply, State}; + +handle_call({delete_all}, _From, State) -> + Reply = ets:delete_all_objects(hooks), {reply, Reply, State}; + handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. +-spec handle_add(atom(), atom(), local_hook() | distributed_hook()) -> ok. +%% in-memory storage operation: Handle adding hook in ETS table +handle_add(Hook, Host, El) -> + case ets:lookup(hooks, {Hook, Host}) of + [{_, Ls}] -> + case lists:member(El, Ls) of + true -> + ok; + false -> + NewLs = lists:merge(Ls, [El]), + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok + end; + [] -> + NewLs = [El], + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok + end. + + +-spec handle_delete(atom(), atom(), local_hook() | distributed_hook()) -> ok. +%% in-memory storage operation: Handle deleting hook from ETS table +handle_delete(Hook, Host, El) -> + case ets:lookup(hooks, {Hook, Host}) of + [{_, Ls}] -> + NewLs = lists:delete(El, Ls), + ets:insert(hooks, {{Hook, Host}, NewLs}), + ok; + [] -> + ok + end. + %%---------------------------------------------------------------------- %% Func: handle_cast/2 %% Returns: {noreply, State} | @@ -275,7 +295,6 @@ handle_info(_Info, State) -> terminate(_Reason, _State) -> ok. - code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -283,9 +302,14 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%---------------------------------------------------------------------- +-spec run1([local_hook()|distributed_hook()], atom(), list()) -> ok. + run1([], _Hook, _Args) -> ok; +%% Run distributed hook on target node. +%% It is not attempted again in case of failure. Next hook will be executed run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) -> + %% MR: Should we have a safe rpc, like we have a safe apply or is bad_rpc enough ? case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of timeout -> ?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p", @@ -305,15 +329,10 @@ run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) -> run1(Ls, Hook, Args) end; run1([{_Seq, Module, Function} | Ls], Hook, Args) -> - Res = if is_function(Function) -> - catch apply(Function, Args); - true -> - catch apply(Module, Function, Args) - end, + Res = safe_apply(Module, Function, Args), case Res of {'EXIT', Reason} -> - ?ERROR_MSG("~p~nrunning hook: ~p", - [Reason, {Hook, Args}]), + ?ERROR_MSG("~p~nrunning hook: ~p", [Reason, {Hook, Args}]), run1(Ls, Hook, Args); stop -> ok; @@ -346,15 +365,10 @@ run_fold1([{_Seq, Node, Module, Function} | Ls], Hook, Val, Args) -> run_fold1(Ls, Hook, NewVal, Args) end; run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) -> - Res = if is_function(Function) -> - catch apply(Function, [Val | Args]); - true -> - catch apply(Module, Function, [Val | Args]) - end, + Res = safe_apply(Module, Function, [Val | Args]), case Res of {'EXIT', Reason} -> - ?ERROR_MSG("~p~nrunning hook: ~p", - [Reason, {Hook, Args}]), + ?ERROR_MSG("~p~nrunning hook: ~p", [Reason, {Hook, Args}]), run_fold1(Ls, Hook, Val, Args); stop -> stopped; @@ -363,3 +377,10 @@ run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) -> NewVal -> run_fold1(Ls, Hook, NewVal, Args) end. + +safe_apply(Module, Function, Args) -> + if is_function(Function) -> + catch apply(Function, Args); + true -> + catch apply(Module, Function, Args) + end. diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index b3a46ba2d..67a82d024 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -30,7 +30,8 @@ -behaviour(gen_server). %% API --export([start_link/0, +-export([start/0, + start_link/0, route/3, open_session/5, open_session/6, @@ -73,11 +74,18 @@ -include("jlib.hrl"). -include("ejabberd_commands.hrl"). --include_lib("stdlib/include/ms_transform.hrl"). -include("mod_privacy.hrl"). +-include("ejabberd_sm.hrl"). + +-callback init() -> ok | {error, any()}. +-callback get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. +-callback set_session(#session{}) -> ok. +-callback delete_session(binary(), sid()) -> ok. +-callback get_sessions() -> [#session{}]. +-callback get_sessions(binary()) -> [#session{}]. +-callback get_sessions(binary(), binary()) -> [#session{}]. +-callback get_sessions(binary(), binary(), binary()) -> [#session{}]. --record(session, {sid, usr, us, priority, info}). --record(session_counter, {vhost, count}). -record(state, {}). %% default value for the maximum number of user connections @@ -90,14 +98,13 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- --type sid() :: {erlang:timestamp(), pid()}. --type ip() :: {inet:ip_address(), inet:port_number()} | undefined. --type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()} - | {oor, boolean()} | {auth_module, atom()}]. --type prio() :: undefined | integer(). - -export_type([sid/0]). +start() -> + ChildSpec = {?MODULE, {?MODULE, start_link, []}, + transient, 1000, worker, [?MODULE]}, + supervisor:start_child(ejabberd_sup, ChildSpec). + start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). @@ -116,8 +123,6 @@ route(From, To, Packet) -> open_session(SID, User, Server, Resource, Priority, Info) -> set_session(SID, User, Server, Resource, Priority, Info), - mnesia:dirty_update_counter(session_counter, - jlib:nameprep(Server), 1), check_for_sessions_to_replace(User, Server, Resource), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_register_connection_hook, @@ -131,16 +136,13 @@ open_session(SID, User, Server, Resource, Info) -> -spec close_session(sid(), binary(), binary(), binary()) -> ok. close_session(SID, User, Server, Resource) -> - Info = case mnesia:dirty_read({session, SID}) of - [] -> []; - [#session{info=I}] -> I - end, - F = fun() -> - mnesia:delete({session, SID}), - mnesia:dirty_update_counter(session_counter, - jlib:nameprep(Server), -1) - end, - mnesia:sync_dirty(F), + Mod = get_sm_backend(), + LServer = jlib:nameprep(Server), + Info = case Mod:get_session(LServer, SID) of + {ok, #session{info = I}} -> I; + {error, notfound} -> [] + end, + Mod:delete_session(LServer, SID), JID = jlib:make_jid(User, Server, Resource), ejabberd_hooks:run(sm_remove_connection_hook, JID#jid.lserver, [SID, JID, Info]). @@ -169,27 +171,17 @@ disconnect_removed_user(User, Server) -> get_user_resources(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), - US = {LUser, LServer}, - case catch mnesia:dirty_index_read(session, US, #session.us) of - {'EXIT', _Reason} -> - []; - Ss -> - [element(3, S#session.usr) || S <- clean_session_list(Ss)] - end. + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + [element(3, S#session.usr) || S <- clean_session_list(Ss)]. -spec get_user_present_resources(binary(), binary()) -> [tuple()]. get_user_present_resources(LUser, LServer) -> - US = {LUser, LServer}, - case catch mnesia:dirty_index_read(session, US, - #session.us) - of - {'EXIT', _Reason} -> []; - Ss -> - [{S#session.priority, element(3, S#session.usr)} - || S <- clean_session_list(Ss), - is_integer(S#session.priority)] - end. + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + [{S#session.priority, element(3, S#session.usr)} + || S <- clean_session_list(Ss), is_integer(S#session.priority)]. -spec get_user_ip(binary(), binary(), binary()) -> ip(). @@ -197,8 +189,8 @@ get_user_ip(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> undefined; Ss -> @@ -212,8 +204,8 @@ get_user_info(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> offline; Ss -> @@ -262,8 +254,8 @@ get_session_pid(User, Server, Resource) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), LResource = jlib:resourceprep(Resource), - USR = {LUser, LServer, LResource}, - case catch mnesia:dirty_index_read(session, USR, #session.usr) of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [#session{sid = {_, Pid}}] -> Pid; _ -> none end. @@ -271,49 +263,30 @@ get_session_pid(User, Server, Resource) -> -spec dirty_get_sessions_list() -> [ljid()]. dirty_get_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{usr = '$1', _ = '_'}, - [], - ['$1']}]). + Mod = get_sm_backend(), + [S#session.usr || S <- Mod:get_sessions()]. dirty_get_my_sessions_list() -> - mnesia:dirty_select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, node()}], - ['$_']}]). + Mod = get_sm_backend(), + [S || S <- Mod:get_sessions(), node(element(2, S#session.sid)) == node()]. -spec get_vh_session_list(binary()) -> [ljid()]. get_vh_session_list(Server) -> LServer = jlib:nameprep(Server), - mnesia:dirty_select(session, - [{#session{usr = '$1', _ = '_'}, - [{'==', {element, 2, '$1'}, LServer}], ['$1']}]). + Mod = get_sm_backend(), + [S#session.usr || S <- Mod:get_sessions(LServer)]. -spec get_all_pids() -> [pid()]. get_all_pids() -> - mnesia:dirty_select( - session, - ets:fun2ms( - fun(#session{sid = {_, Pid}}) -> - Pid - end)). + Mod = get_sm_backend(), + [element(2, S#session.sid) || S <- Mod:get_sessions()]. get_vh_session_number(Server) -> LServer = jlib:nameprep(Server), - Query = mnesia:dirty_select( - session_counter, - [{#session_counter{vhost = LServer, count = '$1'}, - [], - ['$1']}]), - case Query of - [Count] -> - Count; - _ -> 0 - end. + Mod = get_sm_backend(), + length(Mod:get_sessions(LServer)). register_iq_handler(Host, XMLNS, Module, Fun) -> ejabberd_sm ! @@ -343,18 +316,8 @@ unregister_iq_handler(Host, XMLNS) -> %% Description: Initiates the server %%-------------------------------------------------------------------- init([]) -> - update_tables(), - mnesia:create_table(session, - [{ram_copies, [node()]}, - {attributes, record_info(fields, session)}]), - mnesia:create_table(session_counter, - [{ram_copies, [node()]}, - {attributes, record_info(fields, session_counter)}]), - mnesia:add_table_index(session, usr), - mnesia:add_table_index(session, us), - mnesia:add_table_copy(session, node(), ram_copies), - mnesia:add_table_copy(session_counter, node(), ram_copies), - mnesia:subscribe(system), + Mod = get_sm_backend(), + Mod:init(), ets:new(sm_iqtable, [named_table]), lists:foreach( fun(Host) -> @@ -366,7 +329,6 @@ init([]) -> ejabberd_sm, disconnect_removed_user, 100) end, ?MYHOSTS), ejabberd_commands:register_commands(commands()), - {ok, #state{}}. %%-------------------------------------------------------------------- @@ -404,9 +366,6 @@ handle_info({route, From, To, Packet}, State) -> ok end, {noreply, State}; -handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> - recount_session_table(Node), - {noreply, State}; handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) -> ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}), {noreply, State}; @@ -454,38 +413,9 @@ set_session(SID, User, Server, Resource, Priority, Info) -> LResource = jlib:resourceprep(Resource), US = {LUser, LServer}, USR = {LUser, LServer, LResource}, - F = fun () -> - mnesia:write(#session{sid = SID, usr = USR, us = US, - priority = Priority, info = Info}) - end, - mnesia:sync_dirty(F). - -%% Recalculates alive sessions when Node goes down -%% and updates session and session_counter tables -recount_session_table(Node) -> - F = fun() -> - Es = mnesia:select( - session, - [{#session{sid = {'_', '$1'}, _ = '_'}, - [{'==', {node, '$1'}, Node}], - ['$_']}]), - lists:foreach(fun(E) -> - mnesia:delete({session, E#session.sid}) - end, Es), - %% reset session_counter table with active sessions - mnesia:clear_table(session_counter), - lists:foreach(fun(Server) -> - LServer = jlib:nameprep(Server), - Hs = mnesia:select(session, - [{#session{usr = '$1', _ = '_'}, - [{'==', {element, 2, '$1'}, LServer}], - ['$1']}]), - mnesia:write( - #session_counter{vhost = LServer, - count = length(Hs)}) - end, ?MYHOSTS) - end, - mnesia:async_dirty(F). + Mod = get_sm_backend(), + Mod:set_session(#session{sid = SID, usr = USR, us = US, + priority = Priority, info = Info}). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -499,8 +429,9 @@ do_route(From, To, {broadcast, _} = Packet) -> end, get_user_resources(To#jid.user, To#jid.server)); _ -> - USR = jlib:jid_tolower(To), - case mnesia:dirty_index_read(session, USR, #session.usr) of + {U, S, R} = jlib:jid_tolower(To), + Mod = get_sm_backend(), + case Mod:get_sessions(U, S, R) of [] -> ?DEBUG("packet dropped~n", []); Ss -> @@ -589,9 +520,8 @@ do_route(From, To, #xmlel{} = Packet) -> _ -> ok end; _ -> - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, #session.usr) - of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, LResource) of [] -> case Name of <<"message">> -> route_message(From, To, Packet); @@ -646,10 +576,9 @@ route_message(From, To, Packet) -> when is_integer(Priority), Priority >= 0 -> lists:foreach(fun ({P, R}) when P == Priority -> LResource = jlib:resourceprep(R), - USR = {LUser, LServer, LResource}, - case mnesia:dirty_index_read(session, USR, - #session.usr) - of + Mod = get_sm_backend(), + case Mod:get_sessions(LUser, LServer, + LResource) of [] -> ok; % Race condition Ss -> @@ -730,17 +659,15 @@ is_existing_resource(LUser, LServer, LResource) -> [] /= get_resource_sessions(LUser, LServer, LResource). get_resource_sessions(User, Server, Resource) -> - USR = {jlib:nodeprep(User), jlib:nameprep(Server), - jlib:resourceprep(Resource)}, - mnesia:dirty_select(session, - [{#session{sid = '$1', usr = USR, _ = '_'}, [], - ['$1']}]). + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + LResource = jlib:resourceprep(Resource), + Mod = get_sm_backend(), + [S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)]. check_max_sessions(LUser, LServer) -> - SIDs = mnesia:dirty_select(session, - [{#session{sid = '$1', us = {LUser, LServer}, - _ = '_'}, - [], ['$1']}]), + Mod = get_sm_backend(), + SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)], MaxSessions = get_max_user_sessions(LUser, LServer), if length(SIDs) =< MaxSessions -> ok; true -> {_, Pid} = lists:min(SIDs), Pid ! replaced @@ -790,17 +717,23 @@ process_iq(From, To, Packet) -> -spec force_update_presence({binary(), binary()}) -> any(). -force_update_presence({LUser, _LServer} = US) -> - case catch mnesia:dirty_index_read(session, US, - #session.us) - of - {'EXIT', _Reason} -> ok; - Ss -> - lists:foreach(fun (#session{sid = {_, Pid}}) -> - Pid ! {force_update_presence, LUser} - end, - Ss) - end. +force_update_presence({LUser, LServer}) -> + Mod = get_sm_backend(), + Ss = Mod:get_sessions(LUser, LServer), + lists:foreach(fun (#session{sid = {_, Pid}}) -> + Pid ! {force_update_presence, LUser} + end, + Ss). + +-spec get_sm_backend() -> module(). + +get_sm_backend() -> + DBType = ejabberd_config:get_option(sm_db_type, + fun(mnesia) -> mnesia; + (internal) -> mnesia; + (odbc) -> odbc + end, mnesia), + list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% ejabberd commands @@ -852,29 +785,3 @@ kick_user(User, Server) -> PID ! kick end, Resources), length(Resources). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%% Update Mnesia tables - -update_tables() -> - case catch mnesia:table_info(session, attributes) of - [ur, user, node] -> mnesia:delete_table(session); - [ur, user, pid] -> mnesia:delete_table(session); - [usr, us, pid] -> mnesia:delete_table(session); - [usr, us, sid, priority, info] -> mnesia:delete_table(session); - [sid, usr, us, priority] -> - mnesia:delete_table(session); - [sid, usr, us, priority, info] -> ok; - {'EXIT', _} -> ok - end, - case lists:member(presence, mnesia:system_info(tables)) - of - true -> mnesia:delete_table(presence); - false -> ok - end, - case lists:member(local_session, mnesia:system_info(tables)) of - true -> - mnesia:delete_table(local_session); - false -> - ok - end. diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl new file mode 100644 index 000000000..59a6c64f6 --- /dev/null +++ b/src/ejabberd_sm_mnesia.erl @@ -0,0 +1,148 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2015, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 9 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(ejabberd_sm_mnesia). + +-behaviour(gen_server). +-behaviour(ejabberd_sm). + +%% API +-export([init/0, + get_session/2, + set_session/1, + delete_session/2, + get_sessions/0, + get_sessions/1, + get_sessions/2, + get_sessions/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("ejabberd.hrl"). +-include("ejabberd_sm.hrl"). +-include("jlib.hrl"). +-include_lib("stdlib/include/ms_transform.hrl"). + +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec init() -> ok | {error, any()}. +init() -> + case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of + {ok, _Pid} -> + ok; + Err -> + Err + end. + +-spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. +get_session(_LServer, SID) -> + case mnesia:dirty_read(session, SID) of + [] -> + {error, notfound}; + [Session] -> + {ok, Session} + end. + +-spec set_session(#session{}) -> ok. +set_session(Session) -> + mnesia:dirty_write(Session). + +-spec delete_session(binary(), sid()) -> ok. +delete_session(_LServer, SID) -> + mnesia:dirty_delete(session, SID). + +-spec get_sessions() -> [#session{}]. +get_sessions() -> + ets:tab2list(session). + +-spec get_sessions(binary()) -> [#session{}]. +get_sessions(LServer) -> + mnesia:dirty_select(session, + [{#session{usr = '$1', _ = '_'}, + [{'==', {element, 2, '$1'}, LServer}], ['$_']}]). + +-spec get_sessions(binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer) -> + mnesia:dirty_index_read(session, {LUser, LServer}, #session.us). + +-spec get_sessions(binary(), binary(), binary()) -> [#session{}]. +get_sessions(LUser, LServer, LResource) -> + mnesia:dirty_index_read(session, {LUser, LServer, LResource}, #session.usr). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== +init([]) -> + update_tables(), + mnesia:create_table(session, + [{ram_copies, [node()]}, + {attributes, record_info(fields, session)}]), + mnesia:create_table(session_counter, + [{ram_copies, [node()]}, + {attributes, record_info(fields, session_counter)}]), + mnesia:add_table_index(session, usr), + mnesia:add_table_index(session, us), + mnesia:add_table_copy(session, node(), ram_copies), + mnesia:add_table_copy(session_counter, node(), ram_copies), + mnesia:subscribe(system), + {ok, #state{}}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({mnesia_system_event, {mnesia_down, Node}}, State) -> + ets:select_delete( + session, + ets:fun2ms( + fun(#session{sid = {_, Pid}}) -> + node(Pid) == Node + end)), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +update_tables() -> + case catch mnesia:table_info(session, attributes) of + [ur, user, node] -> mnesia:delete_table(session); + [ur, user, pid] -> mnesia:delete_table(session); + [usr, us, pid] -> mnesia:delete_table(session); + [usr, us, sid, priority, info] -> mnesia:delete_table(session); + [sid, usr, us, priority] -> + mnesia:delete_table(session); + [sid, usr, us, priority, info] -> ok; + {'EXIT', _} -> ok + end, + case lists:member(presence, mnesia:system_info(tables)) + of + true -> mnesia:delete_table(presence); + false -> ok + end, + case lists:member(local_session, mnesia:system_info(tables)) of + true -> + mnesia:delete_table(local_session); + false -> + ok + end. diff --git a/src/ejabberd_sm_odbc.erl b/src/ejabberd_sm_odbc.erl new file mode 100644 index 000000000..55bbc74fb --- /dev/null +++ b/src/ejabberd_sm_odbc.erl @@ -0,0 +1,179 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2015, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 9 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(ejabberd_sm_odbc). + +-behaviour(ejabberd_sm). + +%% API +-export([init/0, + get_session/2, + set_session/1, + delete_session/2, + get_sessions/0, + get_sessions/1, + get_sessions/2, + get_sessions/3]). + +-include("ejabberd.hrl"). +-include("ejabberd_sm.hrl"). +-include("logger.hrl"). +-include("jlib.hrl"). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec init() -> ok | {error, any()}. +init() -> + Node = ejabberd_odbc:escape(jlib:atom_to_binary(node())), + lists:foldl( + fun(Host, ok) -> + case ejabberd_odbc:sql_query( + Host, [<<"delete from sm where node='">>, Node, <<"'">>]) of + {updated, _} -> + ok; + Err -> + ?ERROR_MSG("failed to clean 'sm' table: ~p", [Err]), + Err + end; + (_, Err) -> + Err + end, ok, ?MYHOSTS). + +-spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}. +get_session(LServer, {Now, Pid} = SID) -> + Host = ejabberd_odbc:escape(LServer), + PidS = list_to_binary(erlang:pid_to_list(Pid)), + TS = now_to_timestamp(Now), + case ejabberd_odbc:sql_query( + Host, [<<"select username, resource, priority, info from sm ">>, + <<"where usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of + {selected, _, [[User, Resource, Priority, Info]|_]} -> + {ok, #session{sid = SID, us = {User, Resource}, + usr = {User, Resource, LServer}, + priority = dec_priority(Priority), + info = ejabberd_odbc:decode_term(Info)}}; + {selected, _, []} -> + {error, notfound} + end. + +set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R}, + priority = Priority, info = Info}) -> + Username = ejabberd_odbc:escape(U), + Resource = ejabberd_odbc:escape(R), + InfoS = ejabberd_odbc:encode_term(Info), + PrioS = enc_priority(Priority), + TS = now_to_timestamp(Now), + PidS = list_to_binary(erlang:pid_to_list(Pid)), + Node = ejabberd_odbc:escape(jlib:atom_to_binary(node(Pid))), + case odbc_queries:update( + LServer, + <<"sm">>, + [<<"usec">>, <<"pid">>, <<"node">>, <<"username">>, + <<"resource">>, <<"priority">>, <<"info">>], + [TS, PidS, Node, Username, Resource, PrioS, InfoS], + [<<"usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of + ok -> + ok; + Err -> + ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]) + end. + +delete_session(LServer, {Now, Pid}) -> + TS = now_to_timestamp(Now), + PidS = list_to_binary(erlang:pid_to_list(Pid)), + case ejabberd_odbc:sql_query( + LServer, [<<"delete from sm where usec='">>, + TS, <<"' and pid='">>, PidS, <<"'">>]) of + {updated, _} -> + ok; + Err -> + ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]) + end. + +get_sessions() -> + lists:flatmap( + fun(LServer) -> + get_sessions(LServer) + end, ?MYHOSTS). + +get_sessions(LServer) -> + case ejabberd_odbc:sql_query( + LServer, [<<"select usec, pid, username, ">>, + <<"resource, priority, info from sm">>]) of + {selected, _, Rows} -> + [row_to_session(LServer, Row) || Row <- Rows]; + Err -> + ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), + [] + end. + +get_sessions(LUser, LServer) -> + Username = ejabberd_odbc:escape(LUser), + case ejabberd_odbc:sql_query( + LServer, [<<"select usec, pid, username, ">>, + <<"resource, priority, info from sm where ">>, + <<"username='">>, Username, <<"'">>]) of + {selected, _, Rows} -> + [row_to_session(LServer, Row) || Row <- Rows]; + Err -> + ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), + [] + end. + +get_sessions(LUser, LServer, LResource) -> + Username = ejabberd_odbc:escape(LUser), + Resource = ejabberd_odbc:escape(LResource), + case ejabberd_odbc:sql_query( + LServer, [<<"select usec, pid, username, ">>, + <<"resource, priority, info from sm where ">>, + <<"username='">>, Username, <<"' and resource='">>, + Resource, <<"'">>]) of + {selected, _, Rows} -> + [row_to_session(LServer, Row) || Row <- Rows]; + Err -> + ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), + [] + end. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +now_to_timestamp({MSec, Sec, USec}) -> + jlib:integer_to_binary((MSec * 1000000 + Sec) * 1000000 + USec). + +timestamp_to_now(TS) -> + I = jlib:binary_to_integer(TS), + Head = I div 1000000, + USec = I rem 1000000, + MSec = Head div 1000000, + Sec = Head div 1000000, + {MSec, Sec, USec}. + +dec_priority(Prio) -> + case catch jlib:binary_to_integer(Prio) of + {'EXIT', _} -> + undefined; + Int -> + Int + end. + +enc_priority(undefined) -> + <<"">>; +enc_priority(Int) when is_integer(Int) -> + jlib:integer_to_binary(Int). + +row_to_session(LServer, [USec, PidS, User, Resource, PrioS, InfoS]) -> + Now = timestamp_to_now(USec), + Pid = erlang:list_to_pid(binary_to_list(PidS)), + Priority = dec_priority(PrioS), + Info = ejabberd_odbc:decode_term(InfoS), + #session{sid = {Now, Pid}, us = {User, LServer}, + usr = {User, LServer, Resource}, + priority = Priority, + info = Info}. diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 35c79f429..ecedfa502 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -62,13 +62,6 @@ init([]) -> brutal_kill, worker, [ejabberd_router]}, - SM = - {ejabberd_sm, - {ejabberd_sm, start_link, []}, - permanent, - brutal_kill, - worker, - [ejabberd_sm]}, S2S = {ejabberd_s2s, {ejabberd_s2s, start_link, []}, @@ -173,7 +166,6 @@ init([]) -> NodeGroups, SystemMonitor, Router, - SM, S2S, Local, Captcha, diff --git a/src/odbc_queries.erl b/src/odbc_queries.erl index 1fa16b896..f2771e52f 100644 --- a/src/odbc_queries.erl +++ b/src/odbc_queries.erl @@ -27,7 +27,7 @@ -author("mremond@process-one.net"). --export([get_db_type/0, update_t/4, sql_transaction/2, +-export([get_db_type/0, update/5, update_t/4, sql_transaction/2, get_last/2, set_last_t/4, del_last/2, get_password/2, set_password_t/3, add_user/3, del_user/2, del_user_return_password/3, list_users/1, list_users/2, |