diff options
author | Christophe Romain <christophe.romain@process-one.net> | 2012-09-11 15:45:59 +0200 |
---|---|---|
committer | Christophe Romain <christophe.romain@process-one.net> | 2012-09-11 15:45:59 +0200 |
commit | 011535f0de1a14d6f5f411035bff9eeafec1c612 (patch) | |
tree | e60951904fbdc14dc126450c4d7515f51188d4b7 /src/ejabberd_cluster.erl | |
parent | Merge branch '2.1.x' into 2.2.x (diff) |
binary refactoring
Diffstat (limited to 'src/ejabberd_cluster.erl')
-rw-r--r-- | src/ejabberd_cluster.erl | 248 |
1 files changed, 118 insertions, 130 deletions
diff --git a/src/ejabberd_cluster.erl b/src/ejabberd_cluster.erl index ec72b063e..284c01892 100644 --- a/src/ejabberd_cluster.erl +++ b/src/ejabberd_cluster.erl @@ -10,124 +10,123 @@ -behaviour(gen_server). %% API --export([start_link/0, get_node/1, get_node_new/1, announce/1, shutdown/0, - node_id/0, get_node_by_id/1, get_nodes/0, rehash_timeout/0, start/0, - shutdown_migrate/1, migrate_timeout/0]). +-export([start_link/0, get_node/1, get_node_new/1, + announce/1, shutdown/0, node_id/0, get_node_by_id/1, + get_nodes/0, rehash_timeout/0, start/0, + shutdown_migrate/1, migrate_timeout/0]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). -include("ejabberd.hrl"). -define(HASHTBL, nodes_hash). + -define(HASHTBL_NEW, nodes_hash_new). + -define(POINTS, 64). + -define(REHASH_TIMEOUT, timer:seconds(30)). + -define(MIGRATE_TIMEOUT, timer:minutes(2)). + -define(LOCK, {migrate, node()}). -record(state, {}). -%%==================================================================== -%% API -%%==================================================================== start() -> - ChildSpec = {?MODULE, - {?MODULE, start_link, []}, - permanent, - brutal_kill, - worker, - [?MODULE]}, + ChildSpec = {?MODULE, {?MODULE, start_link, []}, + permanent, brutal_kill, worker, [?MODULE]}, supervisor:start_child(ejabberd_sup, ChildSpec). -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link() -> gen_server:start_link(?MODULE, [], []). + +-spec get_node(any()) -> atom(). get_node(Key) -> Hash = erlang:phash2(Key), get_node_by_hash(?HASHTBL, Hash). +-spec get_node_new(any()) -> atom(). + get_node_new(Key) -> Hash = erlang:phash2(Key), get_node_by_hash(?HASHTBL_NEW, Hash). -get_nodes() -> - %% TODO - mnesia:system_info(running_db_nodes). +-spec get_nodes() -> [atom()]. + +get_nodes() -> mnesia:system_info(running_db_nodes). + +-spec announce(pid()) -> any(). announce(Pid) -> gen_server:call(Pid, announce, infinity). node_id() -> - integer_to_list(erlang:phash2(node())). + jlib:integer_to_binary(erlang:phash2(node())). rehash_timeout() -> - case ejabberd_config:get_local_option(rehash_timeout) of - N when is_integer(N), N > 0 -> - timer:seconds(N); - _ -> - ?REHASH_TIMEOUT + case ejabberd_config:get_local_option( + rehash_timeout, + fun(I) when is_integer(I), I > 0 -> I end) of + undefined -> ?REHASH_TIMEOUT; + Secs -> timer:seconds(Secs) end. migrate_timeout() -> - case ejabberd_config:get_local_option(migrate_timeout) of - N when is_integer(N), N > 0 -> - timer:seconds(N); - _ -> - ?MIGRATE_TIMEOUT + case ejabberd_config:get_local_option( + migrate_timeout, + fun(N) when is_integer(N), N > 0 -> N end) of + undefined -> ?MIGRATE_TIMEOUT; + Secs -> timer:seconds(Secs) end. -get_node_by_id(NodeID) when is_list(NodeID) -> - case catch list_to_existing_atom(NodeID) of - {'EXIT', _} -> - node(); - Res -> - get_node_by_id(Res) +-spec get_node_by_id(binary() | atom()) -> atom(). + +get_node_by_id(NodeID) when is_binary(NodeID) -> + case catch list_to_existing_atom(binary_to_list(NodeID)) of + {'EXIT', _} -> node(); + Res -> get_node_by_id(Res) end; get_node_by_id(NodeID) -> case global:whereis_name(NodeID) of - Pid when is_pid(Pid) -> - node(Pid); - _ -> - node() + Pid when is_pid(Pid) -> node(Pid); + _ -> node() end. shutdown() -> - lists:foreach( - fun(Node) when Node /= node() -> - {ejabberd_cluster, Node} ! {node_down, node()}; - (_) -> - ok - end, get_nodes()). + lists:foreach(fun (Node) when Node /= node() -> + {ejabberd_cluster, Node} ! {node_down, node()}; + (_) -> ok + end, + get_nodes()). shutdown_migrate(WaitTime) -> delete_node(?HASHTBL_NEW, node()), ejabberd_hooks:run(node_down, [node()]), shutdown(), delete_node(?HASHTBL, node()), - ejabberd_hooks:run(node_hash_update, [node(), down, WaitTime]), - ?INFO_MSG("Waiting ~p seconds for the migration to be completed.", - [WaitTime div 1000]), + ejabberd_hooks:run(node_hash_update, + [node(), down, WaitTime]), + ?INFO_MSG("Waiting ~p seconds for the migration " + "to be completed.", + [WaitTime div 1000]), timer:sleep(WaitTime), ok. -%%==================================================================== -%% gen_server callbacks -%%==================================================================== init([]) -> {A, B, C} = now(), random:seed(A, B, C), net_kernel:monitor_nodes(true, [{node_type, visible}]), ets:new(?HASHTBL, [named_table, public, ordered_set]), - ets:new(?HASHTBL_NEW, [named_table, public, ordered_set]), + ets:new(?HASHTBL_NEW, + [named_table, public, ordered_set]), register_node(), AllNodes = get_nodes(), OtherNodes = case AllNodes of - [_MyNode] -> - AllNodes; - _ -> - AllNodes -- [node()] + [_MyNode] -> AllNodes; + _ -> AllNodes -- [node()] end, append_nodes(?HASHTBL, OtherNodes), append_nodes(?HASHTBL_NEW, AllNodes), @@ -136,59 +135,60 @@ init([]) -> handle_call(announce, _From, State) -> Migrate_timeout = migrate_timeout(), case global:set_lock(?LOCK, get_nodes(), 0) of - false -> - ?WARNING_MSG("Another node is recently attached to " - "the cluster and is being rebalanced. " - "Waiting for the rebalancing to be completed " - "before starting this node. " - "This will take at least ~p seconds. " - "Please, be patient.", [Migrate_timeout div 1000]), - global:set_lock(?LOCK, get_nodes(), infinity); - true -> - ok + false -> + ?WARNING_MSG("Another node is recently attached to " + "the cluster and is being rebalanced. " + "Waiting for the rebalancing to be completed " + "before starting this node. This will " + "take at least ~p seconds. Please, be " + "patient.", + [Migrate_timeout div 1000]), + global:set_lock(?LOCK, get_nodes(), infinity); + true -> ok end, case get_nodes() of - [_MyNode] -> - register(?MODULE, self()), - global:del_lock(?LOCK); - Nodes -> - OtherNodes = Nodes -- [node()], - ?INFO_MSG("waiting for migration from nodes: ~w", - [OtherNodes]), - {_Res, BadNodes} = gen_server:multi_call( - OtherNodes, ?MODULE, - {node_ready, node()}, ?REHASH_TIMEOUT), - append_node(?HASHTBL, node()), - register(?MODULE, self()), - case OtherNodes -- BadNodes of - [] -> - global:del_lock(?LOCK); - WorkingNodes -> - gen_server:abcast(WorkingNodes, ?MODULE, {node_ready, node()}), - erlang:send_after(Migrate_timeout, self(), del_lock) - end + [_MyNode] -> + register(?MODULE, self()), global:del_lock(?LOCK); + Nodes -> + OtherNodes = Nodes -- [node()], + ?INFO_MSG("waiting for migration from nodes: ~w", + [OtherNodes]), + {_Res, BadNodes} = gen_server:multi_call(OtherNodes, + ?MODULE, + {node_ready, node()}, + ?REHASH_TIMEOUT), + append_node(?HASHTBL, node()), + register(?MODULE, self()), + case OtherNodes -- BadNodes of + [] -> global:del_lock(?LOCK); + WorkingNodes -> + gen_server:abcast(WorkingNodes, ?MODULE, + {node_ready, node()}), + erlang:send_after(Migrate_timeout, self(), del_lock) + end end, {reply, ok, State}; handle_call({node_ready, Node}, _From, State) -> - ?INFO_MSG("node ~p is ready, preparing migration", [Node]), + ?INFO_MSG("node ~p is ready, preparing migration", + [Node]), append_node(?HASHTBL_NEW, Node), ejabberd_hooks:run(node_up, [Node]), {reply, ok, State}; handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. + Reply = ok, {reply, Reply, State}. handle_cast({node_ready, Node}, State) -> - ?INFO_MSG("adding node ~p to hash and starting migration", [Node]), + ?INFO_MSG("adding node ~p to hash and starting " + "migration", + [Node]), append_node(?HASHTBL, Node), - ejabberd_hooks:run(node_hash_update, [Node, up, migrate_timeout()]), + ejabberd_hooks:run(node_hash_update, + [Node, up, migrate_timeout()]), {noreply, State}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> {noreply, State}. handle_info(del_lock, State) -> - global:del_lock(?LOCK), - {noreply, State}; + global:del_lock(?LOCK), {noreply, State}; handle_info({node_down, Node}, State) -> delete_node(?HASHTBL, Node), delete_node(?HASHTBL_NEW, Node), @@ -198,55 +198,43 @@ handle_info({nodedown, Node, _}, State) -> delete_node(?HASHTBL, Node), delete_node(?HASHTBL_NEW, Node), {noreply, State}; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> - ok. +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- append_nodes(Tab, Nodes) -> - lists:foreach( - fun(Node) -> - append_node(Tab, Node) - end, Nodes). + lists:foreach(fun (Node) -> append_node(Tab, Node) end, + Nodes). append_node(Tab, Node) -> - lists:foreach( - fun(I) -> - Hash = erlang:phash2({I, Node}), - ets:insert(Tab, {Hash, Node}) - end, lists:seq(1, ?POINTS)). + lists:foreach(fun (I) -> + Hash = erlang:phash2({I, Node}), + ets:insert(Tab, {Hash, Node}) + end, + lists:seq(1, ?POINTS)). delete_node(Tab, Node) -> - lists:foreach( - fun(I) -> - Hash = erlang:phash2({I, Node}), - ets:delete(Tab, Hash) - end, lists:seq(1, ?POINTS)). + lists:foreach(fun (I) -> + Hash = erlang:phash2({I, Node}), ets:delete(Tab, Hash) + end, + lists:seq(1, ?POINTS)). get_node_by_hash(Tab, Hash) -> NodeHash = case ets:next(Tab, Hash) of - '$end_of_table' -> - ets:first(Tab); - NH -> - NH + '$end_of_table' -> ets:first(Tab); + NH -> NH end, if NodeHash == '$end_of_table' -> - erlang:error(no_running_nodes); + erlang:error(no_running_nodes); true -> - case ets:lookup(Tab, NodeHash) of - [] -> - get_node_by_hash(Tab, Hash); - [{_, Node}] -> - Node - end + case ets:lookup(Tab, NodeHash) of + [] -> get_node_by_hash(Tab, Hash); + [{_, Node}] -> Node + end end. register_node() -> - global:register_name(list_to_atom(node_id()), self()). + global:register_name(jlib:binary_to_atom(node_id()), + self()). |