summaryrefslogtreecommitdiff
path: root/src/ejabberd_riak.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2014-07-09 16:40:18 +0400
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2014-07-10 13:52:29 +0400
commitc15dc01cffecccc3fd626acc15460dbf2f95a1bc (patch)
tree9745688a84daa87391b2e37d10aa03a724f1b710 /src/ejabberd_riak.erl
parentFixate Riak client library (diff)
Improve Riak pool management
Diffstat (limited to '')
-rw-r--r--src/ejabberd_riak.erl109
1 files changed, 63 insertions, 46 deletions
diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl
index 04ff1ea1..0b576ca6 100644
--- a/src/ejabberd_riak.erl
+++ b/src/ejabberd_riak.erl
@@ -27,7 +27,7 @@
-behaviour(gen_server).
%% API
--export([start_link/3, make_bucket/1, put/1, put/2,
+-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2,
get/1, get/2, get_by_index/3, delete/1, delete/2,
count_by_index/3, get_by_index_range/4,
get_keys/1, get_keys_by_index/3,
@@ -61,8 +61,14 @@
%%% API
%%%===================================================================
%% @private
-start_link(Server, Port, _StartInterval) ->
- gen_server:start_link(?MODULE, [Server, Port], []).
+start_link(Num, Server, Port, _StartInterval) ->
+ gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []).
+
+%% @private
+get_proc(I) ->
+ jlib:binary_to_atom(
+ iolist_to_binary(
+ [atom_to_list(?MODULE), $_, integer_to_list(I)])).
-spec make_bucket(atom()) -> binary().
%% @doc Makes a bucket from a table name
@@ -101,21 +107,21 @@ put_raw(Table, Key, Value, Indexes) ->
true ->
Obj
end,
- riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1).
+ catch riakc_pb_socket:put(get_random_pid(), Obj1).
get_object_raw(Table, Key) ->
Bucket = make_bucket(Table),
- riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+ catch riakc_pb_socket:get(get_random_pid(), Bucket, Key).
-spec get(atom()) -> {ok, [any()]} | {error, any()}.
%% @doc Returns all objects from table `Table'
get(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{map, {modfun, riak_kv_mapreduce, map_object_value},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
{ok, [{_, Objs}]} ->
{ok, lists:flatmap(
fun(Obj) ->
@@ -131,7 +137,7 @@ get(Table) ->
end, Objs)};
{error, notfound} ->
{ok, []};
- Error ->
+ {error, _} = Error ->
Error
end.
@@ -228,13 +234,13 @@ get_raw(Table, Key) ->
%% @doc Returns a list of index values
get_keys(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
- Error ->
+ {error, _} = Error ->
log_error(Error, get_keys, [{table, Table}]),
Error
end.
@@ -245,13 +251,13 @@ get_keys(Table) ->
get_keys_by_index(Table, Index, Key) ->
{NewIndex, NewKey} = encode_index_key(Index, Key),
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, NewIndex, NewKey},
- [{map, {modfun, ?MODULE, map_key}, none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, NewIndex, NewKey},
+ [{map, {modfun, ?MODULE, map_key}, none, true}]) of
{ok, [{_, Keys}]} ->
{ok, Keys};
- Error ->
+ {error, _} = Error ->
log_error(Error, get_keys_by_index, [{table, Table},
{index, Index},
{key, Key}]),
@@ -260,31 +266,31 @@ get_keys_by_index(Table, Index, Key) ->
%% @hidden
get_tables() ->
- riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()).
+ catch riakc_pb_socket:list_buckets(get_random_pid()).
get_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
+ get_random_pid(),
{index, Bucket, Index, Key},
[{map, {modfun, riak_kv_mapreduce, map_object_value},
none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
- Error ->
+ {error, _} = Error ->
Error
end.
get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, FromKey, ToKey},
- [{map, {modfun, riak_kv_mapreduce, map_object_value},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, Index, FromKey, ToKey},
+ [{map, {modfun, riak_kv_mapreduce, map_object_value},
+ none, true}]) of
{ok, [{_, Objs}]} ->
{ok, Objs};
- Error ->
+ {error, _} = Error ->
Error
end.
@@ -292,14 +298,14 @@ get_by_index_range_raw(Table, Index, FromKey, ToKey) ->
%% @doc Returns the number of objects in the `Table'
count(Table) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- Bucket,
- [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ Bucket,
+ [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+ none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
- Error ->
+ {error, _} = Error ->
log_error(Error, count, [{table, Table}]),
Error
end.
@@ -324,14 +330,14 @@ count_by_index(Tab, Index, Key) ->
count_by_index_raw(Table, Index, Key) ->
Bucket = make_bucket(Table),
- case riakc_pb_socket:mapred(
- ejabberd_riak_sup:get_random_pid(),
- {index, Bucket, Index, Key},
- [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
- none, true}]) of
+ case catch riakc_pb_socket:mapred(
+ get_random_pid(),
+ {index, Bucket, Index, Key},
+ [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs},
+ none, true}]) of
{ok, [{_, [Cnt]}]} ->
{ok, Cnt};
- Error ->
+ {error, _} = Error ->
Error
end.
@@ -368,7 +374,7 @@ delete(Table, Key) when is_atom(Table) ->
delete_raw(Table, Key) ->
Bucket = make_bucket(Table),
- riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key).
+ catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key).
-spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}.
%% @doc Deletes objects by index
@@ -407,13 +413,14 @@ init([Server, Port]) ->
[auto_reconnect]) of
{ok, Pid} ->
erlang:monitor(process, Pid),
- ejabberd_riak_sup:add_pid(Pid),
{ok, #state{pid = Pid}};
Err ->
{stop, Err}
end.
%% @private
+handle_call(get_pid, _From, #state{pid = Pid} = State) ->
+ {reply, {ok, Pid}, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
@@ -430,8 +437,7 @@ handle_info(_Info, State) ->
{noreply, State}.
%% @private
-terminate(_Reason, State) ->
- ejabberd_riak_sup:remove_pid(State#state.pid),
+terminate(_Reason, _State) ->
ok.
%% @private
@@ -486,3 +492,14 @@ log_error(_, _, _) ->
make_invalid_object(Val) ->
list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])).
+
+get_random_pid() ->
+ PoolPid = ejabberd_riak_sup:get_random_pid(),
+ case catch gen_server:call(PoolPid, get_pid) of
+ {ok, Pid} ->
+ Pid;
+ {'EXIT', {timeout, _}} ->
+ throw({error, timeout});
+ {'EXIT', Err} ->
+ throw({error, Err})
+ end.