aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_cluster.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_cluster.erl')
-rw-r--r--src/ejabberd_cluster.erl248
1 files changed, 118 insertions, 130 deletions
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl
index ec72b063e..284c01892 100644
--- a/src/ejabberd_cluster.erl
+++ b/src/ejabberd_cluster.erl
@@ -10,124 +10,123 @@
-behaviour(gen_server).
%% API
--export([start_link/0, get_node/1, get_node_new/1, announce/1, shutdown/0,
- node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0, start/0,
- shutdown_migrate/1, migrate_timeout/0]).
+-export([start_link/0, get_node/1, get_node_new/1,
+ announce/1, shutdown/0, node_id/0, get_node_by_id/1,
+ get_nodes/0, rehash_timeout/0, start/0,
+ shutdown_migrate/1, migrate_timeout/0]).
%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+-export([init/1, handle_call/3, handle_cast/2,
+ handle_info/2, terminate/2, code_change/3]).
-include("ejabberd.hrl").
-define(HASHTBL, nodes_hash).
+
-define(HASHTBL_NEW, nodes_hash_new).
+
-define(POINTS, 64).
+
-define(REHASH_TIMEOUT, timer:seconds(30)).
+
-define(MIGRATE_TIMEOUT, timer:minutes(2)).
+
-define(LOCK, {migrate, node()}).
-record(state, {}).
-%%====================================================================
-%% API
-%%====================================================================
start() ->
- ChildSpec = {?MODULE,
- {?MODULE, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [?MODULE]},
+ ChildSpec = {?MODULE, {?MODULE, start_link, []},
+ permanent, brutal_kill, worker, [?MODULE]},
supervisor:start_child(ejabberd_sup, ChildSpec).
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link() -> gen_server:start_link(?MODULE, [], []).
+
+-spec get_node(any()) -> atom().
get_node(Key) ->
Hash = erlang:phash2(Key),
get_node_by_hash(?HASHTBL, Hash).
+-spec get_node_new(any()) -> atom().
+
get_node_new(Key) ->
Hash = erlang:phash2(Key),
get_node_by_hash(?HASHTBL_NEW, Hash).
-get_nodes() ->
- %% TODO
- mnesia:system_info(running_db_nodes).
+-spec get_nodes() -> [atom()].
+
+get_nodes() -> mnesia:system_info(running_db_nodes).
+
+-spec announce(pid()) -> any().
announce(Pid) ->
gen_server:call(Pid, announce, infinity).
node_id() ->
- integer_to_list(erlang:phash2(node())).
+ jlib:integer_to_binary(erlang:phash2(node())).
rehash_timeout() ->
- case ejabberd_config:get_local_option(rehash_timeout) of
- N when is_integer(N), N > 0 ->
- timer:seconds(N);
- _ ->
- ?REHASH_TIMEOUT
+ case ejabberd_config:get_local_option(
+ rehash_timeout,
+ fun(I) when is_integer(I), I > 0 -> I end) of
+ undefined -> ?REHASH_TIMEOUT;
+ Secs -> timer:seconds(Secs)
end.
migrate_timeout() ->
- case ejabberd_config:get_local_option(migrate_timeout) of
- N when is_integer(N), N > 0 ->
- timer:seconds(N);
- _ ->
- ?MIGRATE_TIMEOUT
+ case ejabberd_config:get_local_option(
+ migrate_timeout,
+ fun(N) when is_integer(N), N > 0 -> N end) of
+ undefined -> ?MIGRATE_TIMEOUT;
+ Secs -> timer:seconds(Secs)
end.
-get_node_by_id(NodeID) when is_list(NodeID) ->
- case catch list_to_existing_atom(NodeID) of
- {'EXIT', _} ->
- node();
- Res ->
- get_node_by_id(Res)
+-spec get_node_by_id(binary() | atom()) -> atom().
+
+get_node_by_id(NodeID) when is_binary(NodeID) ->
+ case catch list_to_existing_atom(binary_to_list(NodeID)) of
+ {'EXIT', _} -> node();
+ Res -> get_node_by_id(Res)
end;
get_node_by_id(NodeID) ->
case global:whereis_name(NodeID) of
- Pid when is_pid(Pid) ->
- node(Pid);
- _ ->
- node()
+ Pid when is_pid(Pid) -> node(Pid);
+ _ -> node()
end.
shutdown() ->
- lists:foreach(
- fun(Node) when Node /= node() ->
- {ejabberd_cluster, Node} ! {node_down, node()};
- (_) ->
- ok
- end, get_nodes()).
+ lists:foreach(fun (Node) when Node /= node() ->
+ {ejabberd_cluster, Node} ! {node_down, node()};
+ (_) -> ok
+ end,
+ get_nodes()).
shutdown_migrate(WaitTime) ->
delete_node(?HASHTBL_NEW, node()),
ejabberd_hooks:run(node_down, [node()]),
shutdown(),
delete_node(?HASHTBL, node()),
- ejabberd_hooks:run(node_hash_update, [node(), down, WaitTime]),
- ?INFO_MSG("Waiting ~p seconds for the migration to be completed.",
- [WaitTime div 1000]),
+ ejabberd_hooks:run(node_hash_update,
+ [node(), down, WaitTime]),
+ ?INFO_MSG("Waiting ~p seconds for the migration "
+ "to be completed.",
+ [WaitTime div 1000]),
timer:sleep(WaitTime),
ok.
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
init([]) ->
{A, B, C} = now(),
random:seed(A, B, C),
net_kernel:monitor_nodes(true, [{node_type, visible}]),
ets:new(?HASHTBL, [named_table, public, ordered_set]),
- ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]),
+ ets:new(?HASHTBL_NEW,
+ [named_table, public, ordered_set]),
register_node(),
AllNodes = get_nodes(),
OtherNodes = case AllNodes of
- [_MyNode] ->
- AllNodes;
- _ ->
- AllNodes -- [node()]
+ [_MyNode] -> AllNodes;
+ _ -> AllNodes -- [node()]
end,
append_nodes(?HASHTBL, OtherNodes),
append_nodes(?HASHTBL_NEW, AllNodes),
@@ -136,59 +135,60 @@ init([]) ->
handle_call(announce, _From, State) ->
Migrate_timeout = migrate_timeout(),
case global:set_lock(?LOCK, get_nodes(), 0) of
- false ->
- ?WARNING_MSG("Another node is recently attached to "
- "the cluster and is being rebalanced. "
- "Waiting for the rebalancing to be completed "
- "before starting this node. "
- "This will take at least ~p seconds. "
- "Please, be patient.", [Migrate_timeout div 1000]),
- global:set_lock(?LOCK, get_nodes(), infinity);
- true ->
- ok
+ false ->
+ ?WARNING_MSG("Another node is recently attached to "
+ "the cluster and is being rebalanced. "
+ "Waiting for the rebalancing to be completed "
+ "before starting this node. This will "
+ "take at least ~p seconds. Please, be "
+ "patient.",
+ [Migrate_timeout div 1000]),
+ global:set_lock(?LOCK, get_nodes(), infinity);
+ true -> ok
end,
case get_nodes() of
- [_MyNode] ->
- register(?MODULE, self()),
- global:del_lock(?LOCK);
- Nodes ->
- OtherNodes = Nodes -- [node()],
- ?INFO_MSG("waiting for migration from nodes: ~w",
- [OtherNodes]),
- {_Res, BadNodes} = gen_server:multi_call(
- OtherNodes, ?MODULE,
- {node_ready, node()}, ?REHASH_TIMEOUT),
- append_node(?HASHTBL, node()),
- register(?MODULE, self()),
- case OtherNodes -- BadNodes of
- [] ->
- global:del_lock(?LOCK);
- WorkingNodes ->
- gen_server:abcast(WorkingNodes, ?MODULE, {node_ready, node()}),
- erlang:send_after(Migrate_timeout, self(), del_lock)
- end
+ [_MyNode] ->
+ register(?MODULE, self()), global:del_lock(?LOCK);
+ Nodes ->
+ OtherNodes = Nodes -- [node()],
+ ?INFO_MSG("waiting for migration from nodes: ~w",
+ [OtherNodes]),
+ {_Res, BadNodes} = gen_server:multi_call(OtherNodes,
+ ?MODULE,
+ {node_ready, node()},
+ ?REHASH_TIMEOUT),
+ append_node(?HASHTBL, node()),
+ register(?MODULE, self()),
+ case OtherNodes -- BadNodes of
+ [] -> global:del_lock(?LOCK);
+ WorkingNodes ->
+ gen_server:abcast(WorkingNodes, ?MODULE,
+ {node_ready, node()}),
+ erlang:send_after(Migrate_timeout, self(), del_lock)
+ end
end,
{reply, ok, State};
handle_call({node_ready, Node}, _From, State) ->
- ?INFO_MSG("node ~p is ready, preparing migration", [Node]),
+ ?INFO_MSG("node ~p is ready, preparing migration",
+ [Node]),
append_node(?HASHTBL_NEW, Node),
ejabberd_hooks:run(node_up, [Node]),
{reply, ok, State};
handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
+ Reply = ok, {reply, Reply, State}.
handle_cast({node_ready, Node}, State) ->
- ?INFO_MSG("adding node ~p to hash and starting migration", [Node]),
+ ?INFO_MSG("adding node ~p to hash and starting "
+ "migration",
+ [Node]),
append_node(?HASHTBL, Node),
- ejabberd_hooks:run(node_hash_update, [Node, up, migrate_timeout()]),
+ ejabberd_hooks:run(node_hash_update,
+ [Node, up, migrate_timeout()]),
{noreply, State};
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(_Msg, State) -> {noreply, State}.
handle_info(del_lock, State) ->
- global:del_lock(?LOCK),
- {noreply, State};
+ global:del_lock(?LOCK), {noreply, State};
handle_info({node_down, Node}, State) ->
delete_node(?HASHTBL, Node),
delete_node(?HASHTBL_NEW, Node),
@@ -198,55 +198,43 @@ handle_info({nodedown, Node, _}, State) ->
delete_node(?HASHTBL, Node),
delete_node(?HASHTBL_NEW, Node),
{noreply, State};
-handle_info(_Info, State) ->
- {noreply, State}.
+handle_info(_Info, State) -> {noreply, State}.
-terminate(_Reason, _State) ->
- ok.
+terminate(_Reason, _State) -> ok.
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
-%%--------------------------------------------------------------------
-%% Internal functions
-%%--------------------------------------------------------------------
append_nodes(Tab, Nodes) ->
- lists:foreach(
- fun(Node) ->
- append_node(Tab, Node)
- end, Nodes).
+ lists:foreach(fun (Node) -> append_node(Tab, Node) end,
+ Nodes).
append_node(Tab, Node) ->
- lists:foreach(
- fun(I) ->
- Hash = erlang:phash2({I, Node}),
- ets:insert(Tab, {Hash, Node})
- end, lists:seq(1, ?POINTS)).
+ lists:foreach(fun (I) ->
+ Hash = erlang:phash2({I, Node}),
+ ets:insert(Tab, {Hash, Node})
+ end,
+ lists:seq(1, ?POINTS)).
delete_node(Tab, Node) ->
- lists:foreach(
- fun(I) ->
- Hash = erlang:phash2({I, Node}),
- ets:delete(Tab, Hash)
- end, lists:seq(1, ?POINTS)).
+ lists:foreach(fun (I) ->
+ Hash = erlang:phash2({I, Node}), ets:delete(Tab, Hash)
+ end,
+ lists:seq(1, ?POINTS)).
get_node_by_hash(Tab, Hash) ->
NodeHash = case ets:next(Tab, Hash) of
- '$end_of_table' ->
- ets:first(Tab);
- NH ->
- NH
+ '$end_of_table' -> ets:first(Tab);
+ NH -> NH
end,
if NodeHash == '$end_of_table' ->
- erlang:error(no_running_nodes);
+ erlang:error(no_running_nodes);
true ->
- case ets:lookup(Tab, NodeHash) of
- [] ->
- get_node_by_hash(Tab, Hash);
- [{_, Node}] ->
- Node
- end
+ case ets:lookup(Tab, NodeHash) of
+ [] -> get_node_by_hash(Tab, Hash);
+ [{_, Node}] -> Node
+ end
end.
register_node() ->
- global:register_name(list_to_atom(node_id()), self()).
+ global:register_name(jlib:binary_to_atom(node_id()),
+ self()).