diff options
Diffstat (limited to 'src/ejabberd_riak_sup.erl')
-rw-r--r-- | src/ejabberd_riak_sup.erl | 138 |
1 files changed, 76 insertions, 62 deletions
diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl index d19b9fbe9..a066a3c8c 100644 --- a/src/ejabberd_riak_sup.erl +++ b/src/ejabberd_riak_sup.erl @@ -31,34 +31,52 @@ -export([start/0, start_link/0, init/1, - add_pid/1, - remove_pid/1, get_pids/0, - get_random_pid/0 + transform_options/1, + get_random_pid/0, + get_random_pid/1 ]). -include("ejabberd.hrl"). +-include("logger.hrl"). -define(DEFAULT_POOL_SIZE, 10). -define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds +-define(DEFAULT_RIAK_HOST, "127.0.0.1"). +-define(DEFAULT_RIAK_PORT, 8087). % time to wait for the supervisor to start its child before returning % a timeout error to the request -define(CONNECT_TIMEOUT, 500). % milliseconds - --record(riak_pool, {undefined, pid}). - start() -> - StartRiak = ejabberd_config:get_local_option( - riak_server, fun(_) -> true end, false), - if - StartRiak -> + case lists:any( + fun(Host) -> + is_riak_configured(Host) + end, ?MYHOSTS) of + true -> + ejabberd:start_app(riakc), do_start(); - true -> - ok + false -> + ok end. +is_riak_configured(Host) -> + ServerConfigured = ejabberd_config:get_option( + {riak_server, Host}, + fun(_) -> true end, false), + PortConfigured = ejabberd_config:get_option( + {riak_port, Host}, + fun(_) -> true end, false), + Modules = ejabberd_config:get_option( + {modules, Host}, + fun(L) when is_list(L) -> L end, []), + ModuleWithRiakDBConfigured = lists:any( + fun({_Module, Opts}) -> + gen_mod:db_type(Opts) == riak + end, Modules), + ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured. + do_start() -> SupervisorName = ?MODULE, ChildSpec = @@ -79,65 +97,61 @@ do_start() -> end. start_link() -> - mnesia:create_table(riak_pool, - [{ram_copies, [node()]}, - {type, bag}, - {local_content, true}, - {attributes, record_info(fields, riak_pool)}]), - mnesia:add_table_copy(riak_pool, node(), ram_copies), - F = fun() -> - mnesia:delete({riak_pool, undefined}) - end, - mnesia:ets(F), supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - PoolSize = - ejabberd_config:get_local_option( - riak_pool_size, - fun(N) when is_integer(N), N >= 1 -> N end, - ?DEFAULT_POOL_SIZE), - StartInterval = - ejabberd_config:get_local_option( - riak_start_interval, - fun(N) when is_integer(N), N >= 1 -> N end, - ?DEFAULT_RIAK_START_INTERVAL), - {Server, Port} = - ejabberd_config:get_local_option( - riak_server, - fun({S, P}) when is_integer(P), P > 0, P < 65536 -> - {binary_to_list(iolist_to_binary(S)), P} - end, {"127.0.0.1", 8081}), + PoolSize = get_pool_size(), + StartInterval = get_start_interval(), + Server = get_riak_server(), + Port = get_riak_port(), {ok, {{one_for_one, PoolSize*10, 1}, lists:map( fun(I) -> - {I, + {ejabberd_riak:get_proc(I), {ejabberd_riak, start_link, - [Server, Port, StartInterval*1000]}, - transient, - 2000, - worker, - [?MODULE]} + [I, Server, Port, StartInterval*1000]}, + transient, 2000, worker, [?MODULE]} end, lists:seq(1, PoolSize))}}. +get_start_interval() -> + ejabberd_config:get_option( + riak_start_interval, + fun(N) when is_integer(N), N >= 1 -> N end, + ?DEFAULT_RIAK_START_INTERVAL). + +get_pool_size() -> + ejabberd_config:get_option( + riak_pool_size, + fun(N) when is_integer(N), N >= 1 -> N end, + ?DEFAULT_POOL_SIZE). + +get_riak_server() -> + ejabberd_config:get_option( + riak_server, + fun(S) -> + binary_to_list(iolist_to_binary(S)) + end, ?DEFAULT_RIAK_HOST). + +get_riak_port() -> + ejabberd_config:get_option( + riak_port, + fun(P) when is_integer(P), P > 0, P < 65536 -> P end, + ?DEFAULT_RIAK_PORT). + get_pids() -> - Rs = mnesia:dirty_read(riak_pool, undefined), - [R#riak_pool.pid || R <- Rs]. + [ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())]. get_random_pid() -> - Pids = get_pids(), - lists:nth(erlang:phash(now(), length(Pids)), Pids). - -add_pid(Pid) -> - F = fun() -> - mnesia:write( - #riak_pool{pid = Pid}) - end, - mnesia:ets(F). - -remove_pid(Pid) -> - F = fun() -> - mnesia:delete_object( - #riak_pool{pid = Pid}) - end, - mnesia:ets(F). + get_random_pid(now()). + +get_random_pid(Term) -> + I = erlang:phash2(Term, get_pool_size()) + 1, + ejabberd_riak:get_proc(I). + +transform_options(Opts) -> + lists:foldl(fun transform_options/2, [], Opts). + +transform_options({riak_server, {S, P}}, Opts) -> + [{riak_server, S}, {riak_port, P}|Opts]; +transform_options(Opt, Opts) -> + [Opt|Opts]. |