diff options
Diffstat (limited to 'src/ejabberd_cluster.erl')
-rw-r--r-- | src/ejabberd_cluster.erl | 232 |
1 files changed, 143 insertions, 89 deletions
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index aeae294b0..d9429a108 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -1,8 +1,6 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_cluster.erl -%%% Author : Christophe Romain <christophe.romain@process-one.net> -%%% Purpose : Ejabberd clustering management -%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net> +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% Created : 5 Jul 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net> %%% %%% %%% ejabberd, Copyright (C) 2002-2017 ProcessOne @@ -21,132 +19,188 @@ %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_cluster). +-behaviour(ejabberd_config). +-behaviour(gen_server). %% API --export([get_nodes/0, call/4, multicall/3, multicall/4, - eval_everywhere/3, eval_everywhere/4]). --export([join/1, leave/1, get_known_nodes/0]). --export([node_id/0, get_node_by_id/1]). +-export([start_link/0, call/4, multicall/3, multicall/4, eval_everywhere/3, + eval_everywhere/4]). +%% Backend dependent API +-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0, + subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([opt_type/1]). --include("ejabberd.hrl"). -include("logger.hrl"). --spec get_nodes() -> [node()]. +-type dst() :: pid() | atom() | {atom(), node()}. -get_nodes() -> - mnesia:system_info(running_db_nodes). +-callback init() -> ok | {error, any()}. +-callback get_nodes() -> [node()]. +-callback get_known_nodes() -> [node()]. +-callback join(node()) -> ok | {error, any()}. +-callback leave(node()) -> ok | {error, any()}. +-callback node_id() -> binary(). +-callback get_node_by_id(binary()) -> node(). +-callback send({atom(), node()}, term()) -> boolean(). +-callback wait_for_sync(timeout()) -> ok | {error, any()}. +-callback subscribe(dst()) -> ok. --spec get_known_nodes() -> [node()]. +-record(state, {}). -get_known_nodes() -> - lists:usort(mnesia:system_info(db_nodes) - ++ mnesia:system_info(extra_db_nodes)). +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). -spec call(node(), module(), atom(), [any()]) -> any(). - call(Node, Module, Function, Args) -> - rpc:call(Node, Module, Function, Args, 5000). + rpc:call(Node, Module, Function, Args, rpc_timeout()). -spec multicall(module(), atom(), [any()]) -> {list(), [node()]}. - multicall(Module, Function, Args) -> multicall(get_nodes(), Module, Function, Args). -spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}. - multicall(Nodes, Module, Function, Args) -> - rpc:multicall(Nodes, Module, Function, Args, 5000). + rpc:multicall(Nodes, Module, Function, Args, rpc_timeout()). -spec eval_everywhere(module(), atom(), [any()]) -> ok. - eval_everywhere(Module, Function, Args) -> eval_everywhere(get_nodes(), Module, Function, Args), ok. -spec eval_everywhere([node()], module(), atom(), [any()]) -> ok. - eval_everywhere(Nodes, Module, Function, Args) -> rpc:eval_everywhere(Nodes, Module, Function, Args), ok. --spec join(node()) -> ok | {error, any()}. +%%%=================================================================== +%%% Backend dependent API +%%%=================================================================== +-spec get_nodes() -> [node()]. +get_nodes() -> + Mod = get_mod(), + Mod:get_nodes(). + +-spec get_known_nodes() -> [node()]. +get_known_nodes() -> + Mod = get_mod(), + Mod:get_known_nodes(). +-spec join(node()) -> ok | {error, any()}. join(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - {error, {not_master, Node}}; - {_, pong} -> - application:stop(ejabberd), - application:stop(mnesia), - mnesia:delete_schema([node()]), - application:start(mnesia), - mnesia:change_config(extra_db_nodes, [Node]), - mnesia:change_table_copy_type(schema, node(), disc_copies), - spawn(fun() -> - lists:foreach(fun(Table) -> - Type = call(Node, mnesia, table_info, [Table, storage_type]), - mnesia:add_table_copy(Table, node(), Type) - end, mnesia:system_info(tables)--[schema]) - end), - application:start(ejabberd); - _ -> - {error, {no_ping, Node}} - end. + Mod = get_mod(), + Mod:join(Node). -spec leave(node()) -> ok | {error, any()}. - leave(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - Cluster = get_nodes()--[Node], - leave(Cluster, Node); - {_, pong} -> - rpc:call(Node, ?MODULE, leave, [Node], 10000); - {_, pang} -> - case mnesia:del_table_copy(schema, Node) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end - end. -leave([], Node) -> - {error, {no_cluster, Node}}; -leave([Master|_], Node) -> - application:stop(ejabberd), - application:stop(mnesia), - call(Master, mnesia, del_table_copy, [schema, Node]), - spawn(fun() -> - mnesia:delete_schema([node()]), - erlang:halt(0) - end), - ok. + Mod = get_mod(), + Mod:leave(Node). -spec node_id() -> binary(). node_id() -> - integer_to_binary(erlang:phash2(node())). + Mod = get_mod(), + Mod:node_id(). -spec get_node_by_id(binary()) -> node(). -get_node_by_id(Hash) -> - try binary_to_integer(Hash) of - I -> match_node_id(I) - catch _:_ -> - node() +get_node_by_id(ID) -> + Mod = get_mod(), + Mod:get_node_by_id(ID). + +-spec send(dst(), term()) -> boolean(). +send(Dst, Msg) -> + IsLocal = case Dst of + {_, Node} -> Node == node(); + Pid when is_pid(Pid) -> node(Pid) == node(); + Name when is_atom(Name) -> true; + _ -> false + end, + if IsLocal -> + erlang:send(Dst, Msg), + true; + true -> + Mod = get_mod(), + Mod:send(Dst, Msg) end. +-spec wait_for_sync(timeout()) -> ok | {error, any()}. +wait_for_sync(Timeout) -> + Mod = get_mod(), + Mod:wait_for_sync(Timeout). + +-spec subscribe() -> ok. +subscribe() -> + subscribe(self()). + +-spec subscribe(dst()) -> ok. +subscribe(Proc) -> + Mod = get_mod(), + Mod:subscribe(Proc). + +%%%=================================================================== +%%% gen_server API +%%%=================================================================== +init([]) -> + Ticktime = ejabberd_config:get_option(net_ticktime, 60), + Nodes = ejabberd_config:get_option(cluster_nodes, []), + net_kernel:set_net_ticktime(Ticktime), + lists:foreach(fun(Node) -> + net_kernel:connect_node(Node) + end, Nodes), + Mod = get_mod(), + case Mod:init() of + ok -> + Mod:subscribe(?MODULE), + {ok, #state{}}; + {error, Reason} -> + {stop, Reason} + end. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({node_up, Node}, State) -> + ?INFO_MSG("Node ~s has joined", [Node]), + {noreply, State}; +handle_info({node_down, Node}, State) -> + ?INFO_MSG("Node ~s has left", [Node]), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + %%%=================================================================== %%% Internal functions %%%=================================================================== --spec match_node_id(integer()) -> node(). -match_node_id(I) -> - match_node_id(I, get_nodes()). - --spec match_node_id(integer(), [node()]) -> node(). -match_node_id(I, [Node|Nodes]) -> - case erlang:phash2(Node) of - I -> Node; - _ -> match_node_id(I, Nodes) - end; -match_node_id(_I, []) -> - node(). +get_mod() -> + Backend = ejabberd_config:get_option(cluster_backend, mnesia), + list_to_atom("ejabberd_cluster_" ++ atom_to_list(Backend)). + +rpc_timeout() -> + timer:seconds(ejabberd_config:get_option(rpc_timeout, 5)). + +opt_type(net_ticktime) -> + fun (P) when is_integer(P), P > 0 -> P end; +opt_type(cluster_nodes) -> + fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end; +opt_type(rpc_timeout) -> + fun (T) when is_integer(T), T > 0 -> T end; +opt_type(cluster_backend) -> + fun (T) -> ejabberd_config:v_db(?MODULE, T) end; +opt_type(_) -> + [rpc_timeout, cluster_backend, cluster_nodes, net_ticktime]. |