diff options
Diffstat (limited to 'src/ejabberd_cluster.erl')
-rw-r--r-- | src/ejabberd_cluster.erl | 249 |
1 files changed, 186 insertions, 63 deletions
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index 1e3f02a9e..b29beeb2e 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -1,11 +1,9 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_cluster.erl -%%% Author : Christophe Romain <christophe.romain@process-one.net> -%%% Purpose : Ejabberd clustering management -%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net> +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% Created : 5 Jul 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -21,84 +19,209 @@ %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_cluster). +-behaviour(gen_server). %% API --export([get_nodes/0, call/4, multicall/3, multicall/4]). --export([join/1, leave/1]). +-export([start_link/0, call/4, call/5, multicall/3, multicall/4, multicall/5, + eval_everywhere/3, eval_everywhere/4]). +%% Backend dependent API +-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0, + subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +%% hooks +-export([set_ticktime/0]). --include("ejabberd.hrl"). -include("logger.hrl"). --spec get_nodes() -> [node()]. +-type dst() :: pid() | atom() | {atom(), node()}. -get_nodes() -> - mnesia:system_info(running_db_nodes). +-callback init() -> ok | {error, any()}. +-callback get_nodes() -> [node()]. +-callback get_known_nodes() -> [node()]. +-callback join(node()) -> ok | {error, any()}. +-callback leave(node()) -> ok | {error, any()}. +-callback node_id() -> binary(). +-callback get_node_by_id(binary()) -> node(). +-callback send({atom(), node()}, term()) -> boolean(). +-callback wait_for_sync(timeout()) -> ok | {error, any()}. +-callback subscribe(dst()) -> ok. --spec call(node(), module(), atom(), [any()]) -> any(). +-record(state, {}). + +%%%=================================================================== +%%% API +%%%=================================================================== +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +-spec call(node(), module(), atom(), [any()]) -> any(). call(Node, Module, Function, Args) -> - rpc:call(Node, Module, Function, Args, 5000). + call(Node, Module, Function, Args, rpc_timeout()). --spec multicall(module(), atom(), [any()]) -> {list(), [node()]}. +-spec call(node(), module(), atom(), [any()], timeout()) -> any(). +call(Node, Module, Function, Args, Timeout) -> + rpc:call(Node, Module, Function, Args, Timeout). +-spec multicall(module(), atom(), [any()]) -> {list(), [node()]}. multicall(Module, Function, Args) -> multicall(get_nodes(), Module, Function, Args). -spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}. - multicall(Nodes, Module, Function, Args) -> - rpc:multicall(Nodes, Module, Function, Args, 5000). + multicall(Nodes, Module, Function, Args, rpc_timeout()). --spec join(node()) -> ok | {error, any()}. +-spec multicall([node()], module(), atom(), list(), timeout()) -> {list(), [node()]}. +multicall(Nodes, Module, Function, Args, Timeout) -> + rpc:multicall(Nodes, Module, Function, Args, Timeout). +-spec eval_everywhere(module(), atom(), [any()]) -> ok. +eval_everywhere(Module, Function, Args) -> + eval_everywhere(get_nodes(), Module, Function, Args), + ok. + +-spec eval_everywhere([node()], module(), atom(), [any()]) -> ok. +eval_everywhere(Nodes, Module, Function, Args) -> + rpc:eval_everywhere(Nodes, Module, Function, Args), + ok. + +%%%=================================================================== +%%% Backend dependent API +%%%=================================================================== +-spec get_nodes() -> [node()]. +get_nodes() -> + Mod = get_mod(), + Mod:get_nodes(). + +-spec get_known_nodes() -> [node()]. +get_known_nodes() -> + Mod = get_mod(), + Mod:get_known_nodes(). + +-spec join(node()) -> ok | {error, any()}. join(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - {error, {not_master, Node}}; - {_, pong} -> - application:stop(ejabberd), - application:stop(mnesia), - mnesia:delete_schema([node()]), - application:start(mnesia), - mnesia:change_config(extra_db_nodes, [Node]), - mnesia:change_table_copy_type(schema, node(), disc_copies), - spawn(fun() -> - lists:foreach(fun(Table) -> - Type = call(Node, mnesia, table_info, [Table, storage_type]), - mnesia:add_table_copy(Table, node(), Type) - end, mnesia:system_info(tables)--[schema]) - end), - application:start(ejabberd); - _ -> - {error, {no_ping, Node}} - end. + Mod = get_mod(), + Mod:join(Node). -spec leave(node()) -> ok | {error, any()}. - leave(Node) -> - case {node(), net_adm:ping(Node)} of - {Node, _} -> - Cluster = get_nodes()--[Node], - leave(Cluster, Node); - {_, pong} -> - rpc:call(Node, ?MODULE, leave, [Node], 10000); - {_, pang} -> - case mnesia:del_table_copy(schema, Node) of - {atomic, ok} -> ok; - {aborted, Reason} -> {error, Reason} - end + Mod = get_mod(), + Mod:leave(Node). + +-spec node_id() -> binary(). +node_id() -> + Mod = get_mod(), + Mod:node_id(). + +-spec get_node_by_id(binary()) -> node(). +get_node_by_id(ID) -> + Mod = get_mod(), + Mod:get_node_by_id(ID). + +%% Note that false positive returns are possible, while false negatives are not. +%% In other words: positive return value (i.e. 'true') doesn't guarantee +%% successful delivery, while negative return value ('false') means +%% the delivery has definitely failed. +-spec send(dst(), term()) -> boolean(). +send({Name, Node}, Msg) when Node == node() -> + send(Name, Msg); +send(undefined, _Msg) -> + false; +send(Name, Msg) when is_atom(Name) -> + send(whereis(Name), Msg); +send(Pid, Msg) when is_pid(Pid) andalso node(Pid) == node() -> + case erlang:is_process_alive(Pid) of + true -> + erlang:send(Pid, Msg), + true; + false -> + false + end; +send(Dst, Msg) -> + Mod = get_mod(), + Mod:send(Dst, Msg). + +-spec wait_for_sync(timeout()) -> ok | {error, any()}. +wait_for_sync(Timeout) -> + Mod = get_mod(), + Mod:wait_for_sync(Timeout). + +-spec subscribe() -> ok. +subscribe() -> + subscribe(self()). + +-spec subscribe(dst()) -> ok. +subscribe(Proc) -> + Mod = get_mod(), + Mod:subscribe(Proc). + +%%%=================================================================== +%%% Hooks +%%%=================================================================== +set_ticktime() -> + Ticktime = ejabberd_option:net_ticktime() div 1000, + case net_kernel:set_net_ticktime(Ticktime) of + {ongoing_change_to, Time} when Time /= Ticktime -> + ?ERROR_MSG("Failed to set new net_ticktime because " + "the net kernel is busy changing it to the " + "previously configured value. Please wait for " + "~B seconds and retry", [Time]); + _ -> + ok end. -leave([], Node) -> - {error, {no_cluster, Node}}; -leave([Master|_], Node) -> - application:stop(ejabberd), - application:stop(mnesia), - call(Master, mnesia, del_table_copy, [schema, Node]), - spawn(fun() -> - mnesia:delete_schema([node()]), - erlang:halt(0) - end), - ok. + +%%%=================================================================== +%%% gen_server API +%%%=================================================================== +init([]) -> + set_ticktime(), + Nodes = ejabberd_option:cluster_nodes(), + lists:foreach(fun(Node) -> + net_kernel:connect_node(Node) + end, Nodes), + Mod = get_mod(), + case Mod:init() of + ok -> + ejabberd_hooks:add(config_reloaded, ?MODULE, set_ticktime, 50), + Mod:subscribe(?MODULE), + {ok, #state{}}; + {error, Reason} -> + {stop, Reason} + end. + +handle_call(Request, From, State) -> + ?WARNING_MSG("Unexpected call from ~p: ~p", [From, Request]), + {noreply, State}. + +handle_cast(Msg, State) -> + ?WARNING_MSG("Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info({node_up, Node}, State) -> + ?INFO_MSG("Node ~ts has joined", [Node]), + {noreply, State}; +handle_info({node_down, Node}, State) -> + ?INFO_MSG("Node ~ts has left", [Node]), + {noreply, State}; +handle_info(Info, State) -> + ?WARNING_MSG("Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, _State) -> + ejabberd_hooks:delete(config_reloaded, ?MODULE, set_ticktime, 50). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +get_mod() -> + Backend = ejabberd_option:cluster_backend(), + list_to_existing_atom("ejabberd_cluster_" ++ atom_to_list(Backend)). + +rpc_timeout() -> + ejabberd_option:rpc_timeout(). |