aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_cluster.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_cluster.erl')
-rw-r--r--src/ejabberd_cluster.erl232
1 files changed, 143 insertions, 89 deletions
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl
index aeae294b0..d9429a108 100644
--- a/src/ejabberd_cluster.erl
+++ b/src/ejabberd_cluster.erl
@@ -1,8 +1,6 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_cluster.erl
-%%% Author : Christophe Romain <christophe.romain@process-one.net>
-%%% Purpose : Ejabberd clustering management
-%%% Created : 7 Oct 2015 by Christophe Romain <christophe.romain@process-one.net>
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% Created : 5 Jul 2017 by Evgeny Khramtsov <ekhramtsov@process-one.net>
%%%
%%%
%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
@@ -21,132 +19,188 @@
%%% with this program; if not, write to the Free Software Foundation, Inc.,
%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
%%%
-%%%----------------------------------------------------------------------
-
+%%%-------------------------------------------------------------------
-module(ejabberd_cluster).
+-behaviour(ejabberd_config).
+-behaviour(gen_server).
%% API
--export([get_nodes/0, call/4, multicall/3, multicall/4,
- eval_everywhere/3, eval_everywhere/4]).
--export([join/1, leave/1, get_known_nodes/0]).
--export([node_id/0, get_node_by_id/1]).
+-export([start_link/0, call/4, multicall/3, multicall/4, eval_everywhere/3,
+ eval_everywhere/4]).
+%% Backend dependent API
+-export([get_nodes/0, get_known_nodes/0, join/1, leave/1, subscribe/0,
+ subscribe/1, node_id/0, get_node_by_id/1, send/2, wait_for_sync/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export([opt_type/1]).
--include("ejabberd.hrl").
-include("logger.hrl").
--spec get_nodes() -> [node()].
+-type dst() :: pid() | atom() | {atom(), node()}.
-get_nodes() ->
- mnesia:system_info(running_db_nodes).
+-callback init() -> ok | {error, any()}.
+-callback get_nodes() -> [node()].
+-callback get_known_nodes() -> [node()].
+-callback join(node()) -> ok | {error, any()}.
+-callback leave(node()) -> ok | {error, any()}.
+-callback node_id() -> binary().
+-callback get_node_by_id(binary()) -> node().
+-callback send({atom(), node()}, term()) -> boolean().
+-callback wait_for_sync(timeout()) -> ok | {error, any()}.
+-callback subscribe(dst()) -> ok.
--spec get_known_nodes() -> [node()].
+-record(state, {}).
-get_known_nodes() ->
- lists:usort(mnesia:system_info(db_nodes)
- ++ mnesia:system_info(extra_db_nodes)).
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec call(node(), module(), atom(), [any()]) -> any().
-
call(Node, Module, Function, Args) ->
- rpc:call(Node, Module, Function, Args, 5000).
+ rpc:call(Node, Module, Function, Args, rpc_timeout()).
-spec multicall(module(), atom(), [any()]) -> {list(), [node()]}.
-
multicall(Module, Function, Args) ->
multicall(get_nodes(), Module, Function, Args).
-spec multicall([node()], module(), atom(), list()) -> {list(), [node()]}.
-
multicall(Nodes, Module, Function, Args) ->
- rpc:multicall(Nodes, Module, Function, Args, 5000).
+ rpc:multicall(Nodes, Module, Function, Args, rpc_timeout()).
-spec eval_everywhere(module(), atom(), [any()]) -> ok.
-
eval_everywhere(Module, Function, Args) ->
eval_everywhere(get_nodes(), Module, Function, Args),
ok.
-spec eval_everywhere([node()], module(), atom(), [any()]) -> ok.
-
eval_everywhere(Nodes, Module, Function, Args) ->
rpc:eval_everywhere(Nodes, Module, Function, Args),
ok.
--spec join(node()) -> ok | {error, any()}.
+%%%===================================================================
+%%% Backend dependent API
+%%%===================================================================
+-spec get_nodes() -> [node()].
+get_nodes() ->
+ Mod = get_mod(),
+ Mod:get_nodes().
+
+-spec get_known_nodes() -> [node()].
+get_known_nodes() ->
+ Mod = get_mod(),
+ Mod:get_known_nodes().
+-spec join(node()) -> ok | {error, any()}.
join(Node) ->
- case {node(), net_adm:ping(Node)} of
- {Node, _} ->
- {error, {not_master, Node}};
- {_, pong} ->
- application:stop(ejabberd),
- application:stop(mnesia),
- mnesia:delete_schema([node()]),
- application:start(mnesia),
- mnesia:change_config(extra_db_nodes, [Node]),
- mnesia:change_table_copy_type(schema, node(), disc_copies),
- spawn(fun() ->
- lists:foreach(fun(Table) ->
- Type = call(Node, mnesia, table_info, [Table, storage_type]),
- mnesia:add_table_copy(Table, node(), Type)
- end, mnesia:system_info(tables)--[schema])
- end),
- application:start(ejabberd);
- _ ->
- {error, {no_ping, Node}}
- end.
+ Mod = get_mod(),
+ Mod:join(Node).
-spec leave(node()) -> ok | {error, any()}.
-
leave(Node) ->
- case {node(), net_adm:ping(Node)} of
- {Node, _} ->
- Cluster = get_nodes()--[Node],
- leave(Cluster, Node);
- {_, pong} ->
- rpc:call(Node, ?MODULE, leave, [Node], 10000);
- {_, pang} ->
- case mnesia:del_table_copy(schema, Node) of
- {atomic, ok} -> ok;
- {aborted, Reason} -> {error, Reason}
- end
- end.
-leave([], Node) ->
- {error, {no_cluster, Node}};
-leave([Master|_], Node) ->
- application:stop(ejabberd),
- application:stop(mnesia),
- call(Master, mnesia, del_table_copy, [schema, Node]),
- spawn(fun() ->
- mnesia:delete_schema([node()]),
- erlang:halt(0)
- end),
- ok.
+ Mod = get_mod(),
+ Mod:leave(Node).
-spec node_id() -> binary().
node_id() ->
- integer_to_binary(erlang:phash2(node())).
+ Mod = get_mod(),
+ Mod:node_id().
-spec get_node_by_id(binary()) -> node().
-get_node_by_id(Hash) ->
- try binary_to_integer(Hash) of
- I -> match_node_id(I)
- catch _:_ ->
- node()
+get_node_by_id(ID) ->
+ Mod = get_mod(),
+ Mod:get_node_by_id(ID).
+
+-spec send(dst(), term()) -> boolean().
+send(Dst, Msg) ->
+ IsLocal = case Dst of
+ {_, Node} -> Node == node();
+ Pid when is_pid(Pid) -> node(Pid) == node();
+ Name when is_atom(Name) -> true;
+ _ -> false
+ end,
+ if IsLocal ->
+ erlang:send(Dst, Msg),
+ true;
+ true ->
+ Mod = get_mod(),
+ Mod:send(Dst, Msg)
end.
+-spec wait_for_sync(timeout()) -> ok | {error, any()}.
+wait_for_sync(Timeout) ->
+ Mod = get_mod(),
+ Mod:wait_for_sync(Timeout).
+
+-spec subscribe() -> ok.
+subscribe() ->
+ subscribe(self()).
+
+-spec subscribe(dst()) -> ok.
+subscribe(Proc) ->
+ Mod = get_mod(),
+ Mod:subscribe(Proc).
+
+%%%===================================================================
+%%% gen_server API
+%%%===================================================================
+init([]) ->
+ Ticktime = ejabberd_config:get_option(net_ticktime, 60),
+ Nodes = ejabberd_config:get_option(cluster_nodes, []),
+ net_kernel:set_net_ticktime(Ticktime),
+ lists:foreach(fun(Node) ->
+ net_kernel:connect_node(Node)
+ end, Nodes),
+ Mod = get_mod(),
+ case Mod:init() of
+ ok ->
+ Mod:subscribe(?MODULE),
+ {ok, #state{}};
+ {error, Reason} ->
+ {stop, Reason}
+ end.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info({node_up, Node}, State) ->
+ ?INFO_MSG("Node ~s has joined", [Node]),
+ {noreply, State};
+handle_info({node_down, Node}, State) ->
+ ?INFO_MSG("Node ~s has left", [Node]),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
%%%===================================================================
%%% Internal functions
%%%===================================================================
--spec match_node_id(integer()) -> node().
-match_node_id(I) ->
- match_node_id(I, get_nodes()).
-
--spec match_node_id(integer(), [node()]) -> node().
-match_node_id(I, [Node|Nodes]) ->
- case erlang:phash2(Node) of
- I -> Node;
- _ -> match_node_id(I, Nodes)
- end;
-match_node_id(_I, []) ->
- node().
+get_mod() ->
+ Backend = ejabberd_config:get_option(cluster_backend, mnesia),
+ list_to_atom("ejabberd_cluster_" ++ atom_to_list(Backend)).
+
+rpc_timeout() ->
+ timer:seconds(ejabberd_config:get_option(rpc_timeout, 5)).
+
+opt_type(net_ticktime) ->
+ fun (P) when is_integer(P), P > 0 -> P end;
+opt_type(cluster_nodes) ->
+ fun (Ns) -> true = lists:all(fun is_atom/1, Ns), Ns end;
+opt_type(rpc_timeout) ->
+ fun (T) when is_integer(T), T > 0 -> T end;
+opt_type(cluster_backend) ->
+ fun (T) -> ejabberd_config:v_db(?MODULE, T) end;
+opt_type(_) ->
+ [rpc_timeout, cluster_backend, cluster_nodes, net_ticktime].