aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_riak_sup.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_riak_sup.erl')
-rw-r--r--src/ejabberd_riak_sup.erl138
1 files changed, 76 insertions, 62 deletions
diff --git a/src/ejabberd_riak_sup.erl b/src/ejabberd_riak_sup.erl
index d19b9fbe9..a066a3c8c 100644
--- a/src/ejabberd_riak_sup.erl
+++ b/src/ejabberd_riak_sup.erl
@@ -31,34 +31,52 @@
-export([start/0,
start_link/0,
init/1,
- add_pid/1,
- remove_pid/1,
get_pids/0,
- get_random_pid/0
+ transform_options/1,
+ get_random_pid/0,
+ get_random_pid/1
]).
-include("ejabberd.hrl").
+-include("logger.hrl").
-define(DEFAULT_POOL_SIZE, 10).
-define(DEFAULT_RIAK_START_INTERVAL, 30). % 30 seconds
+-define(DEFAULT_RIAK_HOST, "127.0.0.1").
+-define(DEFAULT_RIAK_PORT, 8087).
% time to wait for the supervisor to start its child before returning
% a timeout error to the request
-define(CONNECT_TIMEOUT, 500). % milliseconds
-
--record(riak_pool, {undefined, pid}).
-
start() ->
- StartRiak = ejabberd_config:get_local_option(
- riak_server, fun(_) -> true end, false),
- if
- StartRiak ->
+ case lists:any(
+ fun(Host) ->
+ is_riak_configured(Host)
+ end, ?MYHOSTS) of
+ true ->
+ ejabberd:start_app(riakc),
do_start();
- true ->
- ok
+ false ->
+ ok
end.
+is_riak_configured(Host) ->
+ ServerConfigured = ejabberd_config:get_option(
+ {riak_server, Host},
+ fun(_) -> true end, false),
+ PortConfigured = ejabberd_config:get_option(
+ {riak_port, Host},
+ fun(_) -> true end, false),
+ Modules = ejabberd_config:get_option(
+ {modules, Host},
+ fun(L) when is_list(L) -> L end, []),
+ ModuleWithRiakDBConfigured = lists:any(
+ fun({_Module, Opts}) ->
+ gen_mod:db_type(Opts) == riak
+ end, Modules),
+ ServerConfigured or PortConfigured or ModuleWithRiakDBConfigured.
+
do_start() ->
SupervisorName = ?MODULE,
ChildSpec =
@@ -79,65 +97,61 @@ do_start() ->
end.
start_link() ->
- mnesia:create_table(riak_pool,
- [{ram_copies, [node()]},
- {type, bag},
- {local_content, true},
- {attributes, record_info(fields, riak_pool)}]),
- mnesia:add_table_copy(riak_pool, node(), ram_copies),
- F = fun() ->
- mnesia:delete({riak_pool, undefined})
- end,
- mnesia:ets(F),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- PoolSize =
- ejabberd_config:get_local_option(
- riak_pool_size,
- fun(N) when is_integer(N), N >= 1 -> N end,
- ?DEFAULT_POOL_SIZE),
- StartInterval =
- ejabberd_config:get_local_option(
- riak_start_interval,
- fun(N) when is_integer(N), N >= 1 -> N end,
- ?DEFAULT_RIAK_START_INTERVAL),
- {Server, Port} =
- ejabberd_config:get_local_option(
- riak_server,
- fun({S, P}) when is_integer(P), P > 0, P < 65536 ->
- {binary_to_list(iolist_to_binary(S)), P}
- end, {"127.0.0.1", 8081}),
+ PoolSize = get_pool_size(),
+ StartInterval = get_start_interval(),
+ Server = get_riak_server(),
+ Port = get_riak_port(),
{ok, {{one_for_one, PoolSize*10, 1},
lists:map(
fun(I) ->
- {I,
+ {ejabberd_riak:get_proc(I),
{ejabberd_riak, start_link,
- [Server, Port, StartInterval*1000]},
- transient,
- 2000,
- worker,
- [?MODULE]}
+ [I, Server, Port, StartInterval*1000]},
+ transient, 2000, worker, [?MODULE]}
end, lists:seq(1, PoolSize))}}.
+get_start_interval() ->
+ ejabberd_config:get_option(
+ riak_start_interval,
+ fun(N) when is_integer(N), N >= 1 -> N end,
+ ?DEFAULT_RIAK_START_INTERVAL).
+
+get_pool_size() ->
+ ejabberd_config:get_option(
+ riak_pool_size,
+ fun(N) when is_integer(N), N >= 1 -> N end,
+ ?DEFAULT_POOL_SIZE).
+
+get_riak_server() ->
+ ejabberd_config:get_option(
+ riak_server,
+ fun(S) ->
+ binary_to_list(iolist_to_binary(S))
+ end, ?DEFAULT_RIAK_HOST).
+
+get_riak_port() ->
+ ejabberd_config:get_option(
+ riak_port,
+ fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
+ ?DEFAULT_RIAK_PORT).
+
get_pids() ->
- Rs = mnesia:dirty_read(riak_pool, undefined),
- [R#riak_pool.pid || R <- Rs].
+ [ejabberd_riak:get_proc(I) || I <- lists:seq(1, get_pool_size())].
get_random_pid() ->
- Pids = get_pids(),
- lists:nth(erlang:phash(now(), length(Pids)), Pids).
-
-add_pid(Pid) ->
- F = fun() ->
- mnesia:write(
- #riak_pool{pid = Pid})
- end,
- mnesia:ets(F).
-
-remove_pid(Pid) ->
- F = fun() ->
- mnesia:delete_object(
- #riak_pool{pid = Pid})
- end,
- mnesia:ets(F).
+ get_random_pid(now()).
+
+get_random_pid(Term) ->
+ I = erlang:phash2(Term, get_pool_size()) + 1,
+ ejabberd_riak:get_proc(I).
+
+transform_options(Opts) ->
+ lists:foldl(fun transform_options/2, [], Opts).
+
+transform_options({riak_server, {S, P}}, Opts) ->
+ [{riak_server, S}, {riak_port, P}|Opts];
+transform_options(Opt, Opts) ->
+ [Opt|Opts].