aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_app.erl1
-rw-r--r--src/ejabberd_hooks.erl169
-rw-r--r--src/ejabberd_sm.erl261
-rw-r--r--src/ejabberd_sm_mnesia.erl148
-rw-r--r--src/ejabberd_sm_odbc.erl179
-rw-r--r--src/ejabberd_sup.erl8
-rw-r--r--src/odbc_queries.erl2
7 files changed, 508 insertions, 260 deletions
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl
index 957aa5d46..15170daee 100644
--- a/src/ejabberd_app.erl
+++ b/src/ejabberd_app.erl
@@ -58,6 +58,7 @@ start(normal, _Args) ->
Sup = ejabberd_sup:start_link(),
ejabberd_rdbms:start(),
ejabberd_riak_sup:start(),
+ ejabberd_sm:start(),
ejabberd_auth:start(),
cyrsasl:start(),
% Profiling
diff --git a/src/ejabberd_hooks.erl b/src/ejabberd_hooks.erl
index c1cdefcb2..d136ba705 100644
--- a/src/ejabberd_hooks.erl
+++ b/src/ejabberd_hooks.erl
@@ -32,18 +32,21 @@
-export([start_link/0,
add/3,
add/4,
+ add/5,
add_dist/5,
+ add_dist/6,
delete/3,
delete/4,
- delete_dist/5,
- run/2,
- run_fold/3,
- add/5,
- add_dist/6,
delete/5,
+ delete_dist/5,
delete_dist/6,
+ run/2,
run/3,
- run_fold/4]).
+ run_fold/3,
+ run_fold/4,
+ get_handlers/2]).
+
+-export([delete_all_hooks/0]).
%% gen_server callbacks
-export([init/1,
@@ -53,13 +56,14 @@
handle_info/2,
terminate/2]).
--include("ejabberd.hrl").
-include("logger.hrl").
%% Timeout of 5 seconds in calls to distributed hooks
-define(TIMEOUT_DISTRIBUTED_HOOK, 5000).
-record(state, {}).
+-type local_hook() :: { Seq :: integer(), Module :: atom(), Function :: atom()}.
+-type distributed_hook() :: { Seq :: integer(), Node :: atom(), Module :: atom(), Function :: atom()}.
%%%----------------------------------------------------------------------
%%% API
@@ -67,13 +71,13 @@
start_link() ->
gen_server:start_link({local, ejabberd_hooks}, ejabberd_hooks, [], []).
--spec add(atom(), fun(), number()) -> any().
+-spec add(atom(), fun(), number()) -> ok.
%% @doc See add/4.
add(Hook, Function, Seq) when is_function(Function) ->
add(Hook, global, undefined, Function, Seq).
--spec add(atom(), binary() | atom(), fun() | atom() , number()) -> any().
+-spec add(atom(), HostOrModule :: binary() | atom(), fun() | atom() , number()) -> ok.
add(Hook, Host, Function, Seq) when is_function(Function) ->
add(Hook, Host, undefined, Function, Seq);
@@ -82,17 +86,17 @@ add(Hook, Host, Function, Seq) when is_function(Function) ->
add(Hook, Module, Function, Seq) ->
add(Hook, global, Module, Function, Seq).
--spec add(atom(), binary() | global, atom(), atom() | fun(), number()) -> any().
+-spec add(atom(), binary() | global, atom(), atom() | fun(), number()) -> ok.
add(Hook, Host, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, Host, Module, Function, Seq}).
--spec add_dist(atom(), atom(), atom(), atom() | fun(), number()) -> any().
+-spec add_dist(atom(), atom(), atom(), atom() | fun(), number()) -> ok.
add_dist(Hook, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, global, Node, Module, Function, Seq}).
--spec add_dist(atom(), binary() | global, atom(), atom(), atom() | fun(), number()) -> any().
+-spec add_dist(atom(), binary() | global, atom(), atom(), atom() | fun(), number()) -> ok.
add_dist(Hook, Host, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {add, Hook, Host, Node, Module, Function, Seq}).
@@ -128,6 +132,17 @@ delete_dist(Hook, Node, Module, Function, Seq) ->
delete_dist(Hook, Host, Node, Module, Function, Seq) ->
gen_server:call(ejabberd_hooks, {delete, Hook, Host, Node, Module, Function, Seq}).
+-spec delete_all_hooks() -> true.
+
+%% @doc Primarily for testing / instrumentation
+delete_all_hooks() ->
+ gen_server:call(ejabberd_hooks, {delete_all}).
+
+-spec get_handlers(atom(), binary() | global) -> [local_hook() | distributed_hook()].
+%% @doc Returns currently set handler for hook name
+get_handlers(Hookname, Host) ->
+ gen_server:call(ejabberd_hooks, {get_handlers, Hookname, Host}).
+
-spec run(atom(), list()) -> ok.
%% @doc Run the calls of this hook in order, don't care about function results.
@@ -190,65 +205,70 @@ init([]) ->
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({add, Hook, Host, Module, Function, Seq}, _From, State) ->
- Reply = case ets:lookup(hooks, {Hook, Host}) of
- [{_, Ls}] ->
- El = {Seq, Module, Function},
- case lists:member(El, Ls) of
- true ->
- ok;
- false ->
- NewLs = lists:merge(Ls, [El]),
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok
- end;
- [] ->
- NewLs = [{Seq, Module, Function}],
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok
- end,
+ HookFormat = {Seq, Module, Function},
+ Reply = handle_add(Hook, Host, HookFormat),
{reply, Reply, State};
handle_call({add, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
- Reply = case ets:lookup(hooks, {Hook, Host}) of
- [{_, Ls}] ->
- El = {Seq, Node, Module, Function},
- case lists:member(El, Ls) of
- true ->
- ok;
- false ->
- NewLs = lists:merge(Ls, [El]),
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok
- end;
- [] ->
- NewLs = [{Seq, Node, Module, Function}],
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok
- end,
+ HookFormat = {Seq, Node, Module, Function},
+ Reply = handle_add(Hook, Host, HookFormat),
{reply, Reply, State};
+
handle_call({delete, Hook, Host, Module, Function, Seq}, _From, State) ->
- Reply = case ets:lookup(hooks, {Hook, Host}) of
- [{_, Ls}] ->
- NewLs = lists:delete({Seq, Module, Function}, Ls),
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok;
- [] ->
- ok
- end,
+ HookFormat = {Seq, Module, Function},
+ Reply = handle_delete(Hook, Host, HookFormat),
{reply, Reply, State};
handle_call({delete, Hook, Host, Node, Module, Function, Seq}, _From, State) ->
+ HookFormat = {Seq, Node, Module, Function},
+ Reply = handle_delete(Hook, Host, HookFormat),
+ {reply, Reply, State};
+
+handle_call({get_handlers, Hook, Host}, _From, State) ->
Reply = case ets:lookup(hooks, {Hook, Host}) of
- [{_, Ls}] ->
- NewLs = lists:delete({Seq, Node, Module, Function}, Ls),
- ets:insert(hooks, {{Hook, Host}, NewLs}),
- ok;
- [] ->
- ok
- end,
+ [{_, Handlers}] -> Handlers;
+ [] -> []
+ end,
+ {reply, Reply, State};
+
+handle_call({delete_all}, _From, State) ->
+ Reply = ets:delete_all_objects(hooks),
{reply, Reply, State};
+
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
+-spec handle_add(atom(), atom(), local_hook() | distributed_hook()) -> ok.
+%% in-memory storage operation: Handle adding hook in ETS table
+handle_add(Hook, Host, El) ->
+ case ets:lookup(hooks, {Hook, Host}) of
+ [{_, Ls}] ->
+ case lists:member(El, Ls) of
+ true ->
+ ok;
+ false ->
+ NewLs = lists:merge(Ls, [El]),
+ ets:insert(hooks, {{Hook, Host}, NewLs}),
+ ok
+ end;
+ [] ->
+ NewLs = [El],
+ ets:insert(hooks, {{Hook, Host}, NewLs}),
+ ok
+ end.
+
+
+-spec handle_delete(atom(), atom(), local_hook() | distributed_hook()) -> ok.
+%% in-memory storage operation: Handle deleting hook from ETS table
+handle_delete(Hook, Host, El) ->
+ case ets:lookup(hooks, {Hook, Host}) of
+ [{_, Ls}] ->
+ NewLs = lists:delete(El, Ls),
+ ets:insert(hooks, {{Hook, Host}, NewLs}),
+ ok;
+ [] ->
+ ok
+ end.
+
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
@@ -275,7 +295,6 @@ handle_info(_Info, State) ->
terminate(_Reason, _State) ->
ok.
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -283,9 +302,14 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%----------------------------------------------------------------------
+-spec run1([local_hook()|distributed_hook()], atom(), list()) -> ok.
+
run1([], _Hook, _Args) ->
ok;
+%% Run distributed hook on target node.
+%% It is not attempted again in case of failure. Next hook will be executed
run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
+ %% MR: Should we have a safe rpc, like we have a safe apply or is bad_rpc enough ?
case rpc:call(Node, Module, Function, Args, ?TIMEOUT_DISTRIBUTED_HOOK) of
timeout ->
?ERROR_MSG("Timeout on RPC to ~p~nrunning hook: ~p",
@@ -305,15 +329,10 @@ run1([{_Seq, Node, Module, Function} | Ls], Hook, Args) ->
run1(Ls, Hook, Args)
end;
run1([{_Seq, Module, Function} | Ls], Hook, Args) ->
- Res = if is_function(Function) ->
- catch apply(Function, Args);
- true ->
- catch apply(Module, Function, Args)
- end,
+ Res = safe_apply(Module, Function, Args),
case Res of
{'EXIT', Reason} ->
- ?ERROR_MSG("~p~nrunning hook: ~p",
- [Reason, {Hook, Args}]),
+ ?ERROR_MSG("~p~nrunning hook: ~p", [Reason, {Hook, Args}]),
run1(Ls, Hook, Args);
stop ->
ok;
@@ -346,15 +365,10 @@ run_fold1([{_Seq, Node, Module, Function} | Ls], Hook, Val, Args) ->
run_fold1(Ls, Hook, NewVal, Args)
end;
run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
- Res = if is_function(Function) ->
- catch apply(Function, [Val | Args]);
- true ->
- catch apply(Module, Function, [Val | Args])
- end,
+ Res = safe_apply(Module, Function, [Val | Args]),
case Res of
{'EXIT', Reason} ->
- ?ERROR_MSG("~p~nrunning hook: ~p",
- [Reason, {Hook, Args}]),
+ ?ERROR_MSG("~p~nrunning hook: ~p", [Reason, {Hook, Args}]),
run_fold1(Ls, Hook, Val, Args);
stop ->
stopped;
@@ -363,3 +377,10 @@ run_fold1([{_Seq, Module, Function} | Ls], Hook, Val, Args) ->
NewVal ->
run_fold1(Ls, Hook, NewVal, Args)
end.
+
+safe_apply(Module, Function, Args) ->
+ if is_function(Function) ->
+ catch apply(Function, Args);
+ true ->
+ catch apply(Module, Function, Args)
+ end.
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index b3a46ba2d..67a82d024 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -30,7 +30,8 @@
-behaviour(gen_server).
%% API
--export([start_link/0,
+-export([start/0,
+ start_link/0,
route/3,
open_session/5,
open_session/6,
@@ -73,11 +74,18 @@
-include("jlib.hrl").
-include("ejabberd_commands.hrl").
--include_lib("stdlib/include/ms_transform.hrl").
-include("mod_privacy.hrl").
+-include("ejabberd_sm.hrl").
+
+-callback init() -> ok | {error, any()}.
+-callback get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
+-callback set_session(#session{}) -> ok.
+-callback delete_session(binary(), sid()) -> ok.
+-callback get_sessions() -> [#session{}].
+-callback get_sessions(binary()) -> [#session{}].
+-callback get_sessions(binary(), binary()) -> [#session{}].
+-callback get_sessions(binary(), binary(), binary()) -> [#session{}].
--record(session, {sid, usr, us, priority, info}).
--record(session_counter, {vhost, count}).
-record(state, {}).
%% default value for the maximum number of user connections
@@ -90,14 +98,13 @@
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
--type sid() :: {erlang:timestamp(), pid()}.
--type ip() :: {inet:ip_address(), inet:port_number()} | undefined.
--type info() :: [{conn, atom()} | {ip, ip()} | {node, atom()}
- | {oor, boolean()} | {auth_module, atom()}].
--type prio() :: undefined | integer().
-
-export_type([sid/0]).
+start() ->
+ ChildSpec = {?MODULE, {?MODULE, start_link, []},
+ transient, 1000, worker, [?MODULE]},
+ supervisor:start_child(ejabberd_sup, ChildSpec).
+
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [],
[]).
@@ -116,8 +123,6 @@ route(From, To, Packet) ->
open_session(SID, User, Server, Resource, Priority, Info) ->
set_session(SID, User, Server, Resource, Priority, Info),
- mnesia:dirty_update_counter(session_counter,
- jlib:nameprep(Server), 1),
check_for_sessions_to_replace(User, Server, Resource),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_register_connection_hook,
@@ -131,16 +136,13 @@ open_session(SID, User, Server, Resource, Info) ->
-spec close_session(sid(), binary(), binary(), binary()) -> ok.
close_session(SID, User, Server, Resource) ->
- Info = case mnesia:dirty_read({session, SID}) of
- [] -> [];
- [#session{info=I}] -> I
- end,
- F = fun() ->
- mnesia:delete({session, SID}),
- mnesia:dirty_update_counter(session_counter,
- jlib:nameprep(Server), -1)
- end,
- mnesia:sync_dirty(F),
+ Mod = get_sm_backend(),
+ LServer = jlib:nameprep(Server),
+ Info = case Mod:get_session(LServer, SID) of
+ {ok, #session{info = I}} -> I;
+ {error, notfound} -> []
+ end,
+ Mod:delete_session(LServer, SID),
JID = jlib:make_jid(User, Server, Resource),
ejabberd_hooks:run(sm_remove_connection_hook,
JID#jid.lserver, [SID, JID, Info]).
@@ -169,27 +171,17 @@ disconnect_removed_user(User, Server) ->
get_user_resources(User, Server) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
- US = {LUser, LServer},
- case catch mnesia:dirty_index_read(session, US, #session.us) of
- {'EXIT', _Reason} ->
- [];
- Ss ->
- [element(3, S#session.usr) || S <- clean_session_list(Ss)]
- end.
+ Mod = get_sm_backend(),
+ Ss = Mod:get_sessions(LUser, LServer),
+ [element(3, S#session.usr) || S <- clean_session_list(Ss)].
-spec get_user_present_resources(binary(), binary()) -> [tuple()].
get_user_present_resources(LUser, LServer) ->
- US = {LUser, LServer},
- case catch mnesia:dirty_index_read(session, US,
- #session.us)
- of
- {'EXIT', _Reason} -> [];
- Ss ->
- [{S#session.priority, element(3, S#session.usr)}
- || S <- clean_session_list(Ss),
- is_integer(S#session.priority)]
- end.
+ Mod = get_sm_backend(),
+ Ss = Mod:get_sessions(LUser, LServer),
+ [{S#session.priority, element(3, S#session.usr)}
+ || S <- clean_session_list(Ss), is_integer(S#session.priority)].
-spec get_user_ip(binary(), binary(), binary()) -> ip().
@@ -197,8 +189,8 @@ get_user_ip(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
- USR = {LUser, LServer, LResource},
- case mnesia:dirty_index_read(session, USR, #session.usr) of
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
undefined;
Ss ->
@@ -212,8 +204,8 @@ get_user_info(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
- USR = {LUser, LServer, LResource},
- case mnesia:dirty_index_read(session, USR, #session.usr) of
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
offline;
Ss ->
@@ -262,8 +254,8 @@ get_session_pid(User, Server, Resource) ->
LUser = jlib:nodeprep(User),
LServer = jlib:nameprep(Server),
LResource = jlib:resourceprep(Resource),
- USR = {LUser, LServer, LResource},
- case catch mnesia:dirty_index_read(session, USR, #session.usr) of
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(LUser, LServer, LResource) of
[#session{sid = {_, Pid}}] -> Pid;
_ -> none
end.
@@ -271,49 +263,30 @@ get_session_pid(User, Server, Resource) ->
-spec dirty_get_sessions_list() -> [ljid()].
dirty_get_sessions_list() ->
- mnesia:dirty_select(
- session,
- [{#session{usr = '$1', _ = '_'},
- [],
- ['$1']}]).
+ Mod = get_sm_backend(),
+ [S#session.usr || S <- Mod:get_sessions()].
dirty_get_my_sessions_list() ->
- mnesia:dirty_select(
- session,
- [{#session{sid = {'_', '$1'}, _ = '_'},
- [{'==', {node, '$1'}, node()}],
- ['$_']}]).
+ Mod = get_sm_backend(),
+ [S || S <- Mod:get_sessions(), node(element(2, S#session.sid)) == node()].
-spec get_vh_session_list(binary()) -> [ljid()].
get_vh_session_list(Server) ->
LServer = jlib:nameprep(Server),
- mnesia:dirty_select(session,
- [{#session{usr = '$1', _ = '_'},
- [{'==', {element, 2, '$1'}, LServer}], ['$1']}]).
+ Mod = get_sm_backend(),
+ [S#session.usr || S <- Mod:get_sessions(LServer)].
-spec get_all_pids() -> [pid()].
get_all_pids() ->
- mnesia:dirty_select(
- session,
- ets:fun2ms(
- fun(#session{sid = {_, Pid}}) ->
- Pid
- end)).
+ Mod = get_sm_backend(),
+ [element(2, S#session.sid) || S <- Mod:get_sessions()].
get_vh_session_number(Server) ->
LServer = jlib:nameprep(Server),
- Query = mnesia:dirty_select(
- session_counter,
- [{#session_counter{vhost = LServer, count = '$1'},
- [],
- ['$1']}]),
- case Query of
- [Count] ->
- Count;
- _ -> 0
- end.
+ Mod = get_sm_backend(),
+ length(Mod:get_sessions(LServer)).
register_iq_handler(Host, XMLNS, Module, Fun) ->
ejabberd_sm !
@@ -343,18 +316,8 @@ unregister_iq_handler(Host, XMLNS) ->
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
- update_tables(),
- mnesia:create_table(session,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, session)}]),
- mnesia:create_table(session_counter,
- [{ram_copies, [node()]},
- {attributes, record_info(fields, session_counter)}]),
- mnesia:add_table_index(session, usr),
- mnesia:add_table_index(session, us),
- mnesia:add_table_copy(session, node(), ram_copies),
- mnesia:add_table_copy(session_counter, node(), ram_copies),
- mnesia:subscribe(system),
+ Mod = get_sm_backend(),
+ Mod:init(),
ets:new(sm_iqtable, [named_table]),
lists:foreach(
fun(Host) ->
@@ -366,7 +329,6 @@ init([]) ->
ejabberd_sm, disconnect_removed_user, 100)
end, ?MYHOSTS),
ejabberd_commands:register_commands(commands()),
-
{ok, #state{}}.
%%--------------------------------------------------------------------
@@ -404,9 +366,6 @@ handle_info({route, From, To, Packet}, State) ->
ok
end,
{noreply, State};
-handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
- recount_session_table(Node),
- {noreply, State};
handle_info({register_iq_handler, Host, XMLNS, Module, Function}, State) ->
ets:insert(sm_iqtable, {{XMLNS, Host}, Module, Function}),
{noreply, State};
@@ -454,38 +413,9 @@ set_session(SID, User, Server, Resource, Priority, Info) ->
LResource = jlib:resourceprep(Resource),
US = {LUser, LServer},
USR = {LUser, LServer, LResource},
- F = fun () ->
- mnesia:write(#session{sid = SID, usr = USR, us = US,
- priority = Priority, info = Info})
- end,
- mnesia:sync_dirty(F).
-
-%% Recalculates alive sessions when Node goes down
-%% and updates session and session_counter tables
-recount_session_table(Node) ->
- F = fun() ->
- Es = mnesia:select(
- session,
- [{#session{sid = {'_', '$1'}, _ = '_'},
- [{'==', {node, '$1'}, Node}],
- ['$_']}]),
- lists:foreach(fun(E) ->
- mnesia:delete({session, E#session.sid})
- end, Es),
- %% reset session_counter table with active sessions
- mnesia:clear_table(session_counter),
- lists:foreach(fun(Server) ->
- LServer = jlib:nameprep(Server),
- Hs = mnesia:select(session,
- [{#session{usr = '$1', _ = '_'},
- [{'==', {element, 2, '$1'}, LServer}],
- ['$1']}]),
- mnesia:write(
- #session_counter{vhost = LServer,
- count = length(Hs)})
- end, ?MYHOSTS)
- end,
- mnesia:async_dirty(F).
+ Mod = get_sm_backend(),
+ Mod:set_session(#session{sid = SID, usr = USR, us = US,
+ priority = Priority, info = Info}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -499,8 +429,9 @@ do_route(From, To, {broadcast, _} = Packet) ->
end,
get_user_resources(To#jid.user, To#jid.server));
_ ->
- USR = jlib:jid_tolower(To),
- case mnesia:dirty_index_read(session, USR, #session.usr) of
+ {U, S, R} = jlib:jid_tolower(To),
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(U, S, R) of
[] ->
?DEBUG("packet dropped~n", []);
Ss ->
@@ -589,9 +520,8 @@ do_route(From, To, #xmlel{} = Packet) ->
_ -> ok
end;
_ ->
- USR = {LUser, LServer, LResource},
- case mnesia:dirty_index_read(session, USR, #session.usr)
- of
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(LUser, LServer, LResource) of
[] ->
case Name of
<<"message">> -> route_message(From, To, Packet);
@@ -646,10 +576,9 @@ route_message(From, To, Packet) ->
when is_integer(Priority), Priority >= 0 ->
lists:foreach(fun ({P, R}) when P == Priority ->
LResource = jlib:resourceprep(R),
- USR = {LUser, LServer, LResource},
- case mnesia:dirty_index_read(session, USR,
- #session.usr)
- of
+ Mod = get_sm_backend(),
+ case Mod:get_sessions(LUser, LServer,
+ LResource) of
[] ->
ok; % Race condition
Ss ->
@@ -730,17 +659,15 @@ is_existing_resource(LUser, LServer, LResource) ->
[] /= get_resource_sessions(LUser, LServer, LResource).
get_resource_sessions(User, Server, Resource) ->
- USR = {jlib:nodeprep(User), jlib:nameprep(Server),
- jlib:resourceprep(Resource)},
- mnesia:dirty_select(session,
- [{#session{sid = '$1', usr = USR, _ = '_'}, [],
- ['$1']}]).
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ LResource = jlib:resourceprep(Resource),
+ Mod = get_sm_backend(),
+ [S#session.sid || S <- Mod:get_sessions(LUser, LServer, LResource)].
check_max_sessions(LUser, LServer) ->
- SIDs = mnesia:dirty_select(session,
- [{#session{sid = '$1', us = {LUser, LServer},
- _ = '_'},
- [], ['$1']}]),
+ Mod = get_sm_backend(),
+ SIDs = [S#session.sid || S <- Mod:get_sessions(LUser, LServer)],
MaxSessions = get_max_user_sessions(LUser, LServer),
if length(SIDs) =< MaxSessions -> ok;
true -> {_, Pid} = lists:min(SIDs), Pid ! replaced
@@ -790,17 +717,23 @@ process_iq(From, To, Packet) ->
-spec force_update_presence({binary(), binary()}) -> any().
-force_update_presence({LUser, _LServer} = US) ->
- case catch mnesia:dirty_index_read(session, US,
- #session.us)
- of
- {'EXIT', _Reason} -> ok;
- Ss ->
- lists:foreach(fun (#session{sid = {_, Pid}}) ->
- Pid ! {force_update_presence, LUser}
- end,
- Ss)
- end.
+force_update_presence({LUser, LServer}) ->
+ Mod = get_sm_backend(),
+ Ss = Mod:get_sessions(LUser, LServer),
+ lists:foreach(fun (#session{sid = {_, Pid}}) ->
+ Pid ! {force_update_presence, LUser}
+ end,
+ Ss).
+
+-spec get_sm_backend() -> module().
+
+get_sm_backend() ->
+ DBType = ejabberd_config:get_option(sm_db_type,
+ fun(mnesia) -> mnesia;
+ (internal) -> mnesia;
+ (odbc) -> odbc
+ end, mnesia),
+ list_to_atom("ejabberd_sm_" ++ atom_to_list(DBType)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% ejabberd commands
@@ -852,29 +785,3 @@ kick_user(User, Server) ->
PID ! kick
end, Resources),
length(Resources).
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%%% Update Mnesia tables
-
-update_tables() ->
- case catch mnesia:table_info(session, attributes) of
- [ur, user, node] -> mnesia:delete_table(session);
- [ur, user, pid] -> mnesia:delete_table(session);
- [usr, us, pid] -> mnesia:delete_table(session);
- [usr, us, sid, priority, info] -> mnesia:delete_table(session);
- [sid, usr, us, priority] ->
- mnesia:delete_table(session);
- [sid, usr, us, priority, info] -> ok;
- {'EXIT', _} -> ok
- end,
- case lists:member(presence, mnesia:system_info(tables))
- of
- true -> mnesia:delete_table(presence);
- false -> ok
- end,
- case lists:member(local_session, mnesia:system_info(tables)) of
- true ->
- mnesia:delete_table(local_session);
- false ->
- ok
- end.
diff --git a/src/ejabberd_sm_mnesia.erl b/src/ejabberd_sm_mnesia.erl
new file mode 100644
index 000000000..59a6c64f6
--- /dev/null
+++ b/src/ejabberd_sm_mnesia.erl
@@ -0,0 +1,148 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2015, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 9 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(ejabberd_sm_mnesia).
+
+-behaviour(gen_server).
+-behaviour(ejabberd_sm).
+
+%% API
+-export([init/0,
+ get_session/2,
+ set_session/1,
+ delete_session/2,
+ get_sessions/0,
+ get_sessions/1,
+ get_sessions/2,
+ get_sessions/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("ejabberd.hrl").
+-include("ejabberd_sm.hrl").
+-include("jlib.hrl").
+-include_lib("stdlib/include/ms_transform.hrl").
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec init() -> ok | {error, any()}.
+init() ->
+ case gen_server:start_link({local, ?MODULE}, ?MODULE, [], []) of
+ {ok, _Pid} ->
+ ok;
+ Err ->
+ Err
+ end.
+
+-spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
+get_session(_LServer, SID) ->
+ case mnesia:dirty_read(session, SID) of
+ [] ->
+ {error, notfound};
+ [Session] ->
+ {ok, Session}
+ end.
+
+-spec set_session(#session{}) -> ok.
+set_session(Session) ->
+ mnesia:dirty_write(Session).
+
+-spec delete_session(binary(), sid()) -> ok.
+delete_session(_LServer, SID) ->
+ mnesia:dirty_delete(session, SID).
+
+-spec get_sessions() -> [#session{}].
+get_sessions() ->
+ ets:tab2list(session).
+
+-spec get_sessions(binary()) -> [#session{}].
+get_sessions(LServer) ->
+ mnesia:dirty_select(session,
+ [{#session{usr = '$1', _ = '_'},
+ [{'==', {element, 2, '$1'}, LServer}], ['$_']}]).
+
+-spec get_sessions(binary(), binary()) -> [#session{}].
+get_sessions(LUser, LServer) ->
+ mnesia:dirty_index_read(session, {LUser, LServer}, #session.us).
+
+-spec get_sessions(binary(), binary(), binary()) -> [#session{}].
+get_sessions(LUser, LServer, LResource) ->
+ mnesia:dirty_index_read(session, {LUser, LServer, LResource}, #session.usr).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+ update_tables(),
+ mnesia:create_table(session,
+ [{ram_copies, [node()]},
+ {attributes, record_info(fields, session)}]),
+ mnesia:create_table(session_counter,
+ [{ram_copies, [node()]},
+ {attributes, record_info(fields, session_counter)}]),
+ mnesia:add_table_index(session, usr),
+ mnesia:add_table_index(session, us),
+ mnesia:add_table_copy(session, node(), ram_copies),
+ mnesia:add_table_copy(session_counter, node(), ram_copies),
+ mnesia:subscribe(system),
+ {ok, #state{}}.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
+ ets:select_delete(
+ session,
+ ets:fun2ms(
+ fun(#session{sid = {_, Pid}}) ->
+ node(Pid) == Node
+ end)),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+update_tables() ->
+ case catch mnesia:table_info(session, attributes) of
+ [ur, user, node] -> mnesia:delete_table(session);
+ [ur, user, pid] -> mnesia:delete_table(session);
+ [usr, us, pid] -> mnesia:delete_table(session);
+ [usr, us, sid, priority, info] -> mnesia:delete_table(session);
+ [sid, usr, us, priority] ->
+ mnesia:delete_table(session);
+ [sid, usr, us, priority, info] -> ok;
+ {'EXIT', _} -> ok
+ end,
+ case lists:member(presence, mnesia:system_info(tables))
+ of
+ true -> mnesia:delete_table(presence);
+ false -> ok
+ end,
+ case lists:member(local_session, mnesia:system_info(tables)) of
+ true ->
+ mnesia:delete_table(local_session);
+ false ->
+ ok
+ end.
diff --git a/src/ejabberd_sm_odbc.erl b/src/ejabberd_sm_odbc.erl
new file mode 100644
index 000000000..55bbc74fb
--- /dev/null
+++ b/src/ejabberd_sm_odbc.erl
@@ -0,0 +1,179 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2015, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 9 Mar 2015 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(ejabberd_sm_odbc).
+
+-behaviour(ejabberd_sm).
+
+%% API
+-export([init/0,
+ get_session/2,
+ set_session/1,
+ delete_session/2,
+ get_sessions/0,
+ get_sessions/1,
+ get_sessions/2,
+ get_sessions/3]).
+
+-include("ejabberd.hrl").
+-include("ejabberd_sm.hrl").
+-include("logger.hrl").
+-include("jlib.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec init() -> ok | {error, any()}.
+init() ->
+ Node = ejabberd_odbc:escape(jlib:atom_to_binary(node())),
+ lists:foldl(
+ fun(Host, ok) ->
+ case ejabberd_odbc:sql_query(
+ Host, [<<"delete from sm where node='">>, Node, <<"'">>]) of
+ {updated, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to clean 'sm' table: ~p", [Err]),
+ Err
+ end;
+ (_, Err) ->
+ Err
+ end, ok, ?MYHOSTS).
+
+-spec get_session(binary(), sid()) -> {ok, #session{}} | {error, notfound}.
+get_session(LServer, {Now, Pid} = SID) ->
+ Host = ejabberd_odbc:escape(LServer),
+ PidS = list_to_binary(erlang:pid_to_list(Pid)),
+ TS = now_to_timestamp(Now),
+ case ejabberd_odbc:sql_query(
+ Host, [<<"select username, resource, priority, info from sm ">>,
+ <<"where usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of
+ {selected, _, [[User, Resource, Priority, Info]|_]} ->
+ {ok, #session{sid = SID, us = {User, Resource},
+ usr = {User, Resource, LServer},
+ priority = dec_priority(Priority),
+ info = ejabberd_odbc:decode_term(Info)}};
+ {selected, _, []} ->
+ {error, notfound}
+ end.
+
+set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R},
+ priority = Priority, info = Info}) ->
+ Username = ejabberd_odbc:escape(U),
+ Resource = ejabberd_odbc:escape(R),
+ InfoS = ejabberd_odbc:encode_term(Info),
+ PrioS = enc_priority(Priority),
+ TS = now_to_timestamp(Now),
+ PidS = list_to_binary(erlang:pid_to_list(Pid)),
+ Node = ejabberd_odbc:escape(jlib:atom_to_binary(node(Pid))),
+ case odbc_queries:update(
+ LServer,
+ <<"sm">>,
+ [<<"usec">>, <<"pid">>, <<"node">>, <<"username">>,
+ <<"resource">>, <<"priority">>, <<"info">>],
+ [TS, PidS, Node, Username, Resource, PrioS, InfoS],
+ [<<"usec='">>, TS, <<"' and pid='">>, PidS, <<"'">>]) of
+ ok ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to update 'sm' table: ~p", [Err])
+ end.
+
+delete_session(LServer, {Now, Pid}) ->
+ TS = now_to_timestamp(Now),
+ PidS = list_to_binary(erlang:pid_to_list(Pid)),
+ case ejabberd_odbc:sql_query(
+ LServer, [<<"delete from sm where usec='">>,
+ TS, <<"' and pid='">>, PidS, <<"'">>]) of
+ {updated, _} ->
+ ok;
+ Err ->
+ ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err])
+ end.
+
+get_sessions() ->
+ lists:flatmap(
+ fun(LServer) ->
+ get_sessions(LServer)
+ end, ?MYHOSTS).
+
+get_sessions(LServer) ->
+ case ejabberd_odbc:sql_query(
+ LServer, [<<"select usec, pid, username, ">>,
+ <<"resource, priority, info from sm">>]) of
+ {selected, _, Rows} ->
+ [row_to_session(LServer, Row) || Row <- Rows];
+ Err ->
+ ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]),
+ []
+ end.
+
+get_sessions(LUser, LServer) ->
+ Username = ejabberd_odbc:escape(LUser),
+ case ejabberd_odbc:sql_query(
+ LServer, [<<"select usec, pid, username, ">>,
+ <<"resource, priority, info from sm where ">>,
+ <<"username='">>, Username, <<"'">>]) of
+ {selected, _, Rows} ->
+ [row_to_session(LServer, Row) || Row <- Rows];
+ Err ->
+ ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]),
+ []
+ end.
+
+get_sessions(LUser, LServer, LResource) ->
+ Username = ejabberd_odbc:escape(LUser),
+ Resource = ejabberd_odbc:escape(LResource),
+ case ejabberd_odbc:sql_query(
+ LServer, [<<"select usec, pid, username, ">>,
+ <<"resource, priority, info from sm where ">>,
+ <<"username='">>, Username, <<"' and resource='">>,
+ Resource, <<"'">>]) of
+ {selected, _, Rows} ->
+ [row_to_session(LServer, Row) || Row <- Rows];
+ Err ->
+ ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]),
+ []
+ end.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+now_to_timestamp({MSec, Sec, USec}) ->
+ jlib:integer_to_binary((MSec * 1000000 + Sec) * 1000000 + USec).
+
+timestamp_to_now(TS) ->
+ I = jlib:binary_to_integer(TS),
+ Head = I div 1000000,
+ USec = I rem 1000000,
+ MSec = Head div 1000000,
+ Sec = Head div 1000000,
+ {MSec, Sec, USec}.
+
+dec_priority(Prio) ->
+ case catch jlib:binary_to_integer(Prio) of
+ {'EXIT', _} ->
+ undefined;
+ Int ->
+ Int
+ end.
+
+enc_priority(undefined) ->
+ <<"">>;
+enc_priority(Int) when is_integer(Int) ->
+ jlib:integer_to_binary(Int).
+
+row_to_session(LServer, [USec, PidS, User, Resource, PrioS, InfoS]) ->
+ Now = timestamp_to_now(USec),
+ Pid = erlang:list_to_pid(binary_to_list(PidS)),
+ Priority = dec_priority(PrioS),
+ Info = ejabberd_odbc:decode_term(InfoS),
+ #session{sid = {Now, Pid}, us = {User, LServer},
+ usr = {User, LServer, Resource},
+ priority = Priority,
+ info = Info}.
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index 35c79f429..ecedfa502 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -62,13 +62,6 @@ init([]) ->
brutal_kill,
worker,
[ejabberd_router]},
- SM =
- {ejabberd_sm,
- {ejabberd_sm, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [ejabberd_sm]},
S2S =
{ejabberd_s2s,
{ejabberd_s2s, start_link, []},
@@ -173,7 +166,6 @@ init([]) ->
NodeGroups,
SystemMonitor,
Router,
- SM,
S2S,
Local,
Captcha,
diff --git a/src/odbc_queries.erl b/src/odbc_queries.erl
index 1fa16b896..f2771e52f 100644
--- a/src/odbc_queries.erl
+++ b/src/odbc_queries.erl
@@ -27,7 +27,7 @@
-author("mremond@process-one.net").
--export([get_db_type/0, update_t/4, sql_transaction/2,
+-export([get_db_type/0, update/5, update_t/4, sql_transaction/2,
get_last/2, set_last_t/4, del_last/2, get_password/2,
set_password_t/3, add_user/3, del_user/2,
del_user_return_password/3, list_users/1, list_users/2,