diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2014-07-09 16:40:18 +0400 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2014-07-10 13:52:29 +0400 |
commit | c15dc01cffecccc3fd626acc15460dbf2f95a1bc (patch) | |
tree | 9745688a84daa87391b2e37d10aa03a724f1b710 /src/ejabberd_riak.erl | |
parent | Fixate Riak client library (diff) |
Improve Riak pool management
Diffstat (limited to '')
-rw-r--r-- | src/ejabberd_riak.erl | 109 |
1 files changed, 63 insertions, 46 deletions
diff --git a/src/ejabberd_riak.erl b/src/ejabberd_riak.erl index 04ff1ea1..0b576ca6 100644 --- a/src/ejabberd_riak.erl +++ b/src/ejabberd_riak.erl @@ -27,7 +27,7 @@ -behaviour(gen_server). %% API --export([start_link/3, make_bucket/1, put/1, put/2, +-export([start_link/4, get_proc/1, make_bucket/1, put/1, put/2, get/1, get/2, get_by_index/3, delete/1, delete/2, count_by_index/3, get_by_index_range/4, get_keys/1, get_keys_by_index/3, @@ -61,8 +61,14 @@ %%% API %%%=================================================================== %% @private -start_link(Server, Port, _StartInterval) -> - gen_server:start_link(?MODULE, [Server, Port], []). +start_link(Num, Server, Port, _StartInterval) -> + gen_server:start_link({local, get_proc(Num)}, ?MODULE, [Server, Port], []). + +%% @private +get_proc(I) -> + jlib:binary_to_atom( + iolist_to_binary( + [atom_to_list(?MODULE), $_, integer_to_list(I)])). -spec make_bucket(atom()) -> binary(). %% @doc Makes a bucket from a table name @@ -101,21 +107,21 @@ put_raw(Table, Key, Value, Indexes) -> true -> Obj end, - riakc_pb_socket:put(ejabberd_riak_sup:get_random_pid(), Obj1). + catch riakc_pb_socket:put(get_random_pid(), Obj1). get_object_raw(Table, Key) -> Bucket = make_bucket(Table), - riakc_pb_socket:get(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + catch riakc_pb_socket:get(get_random_pid(), Bucket, Key). -spec get(atom()) -> {ok, [any()]} | {error, any()}. %% @doc Returns all objects from table `Table' get(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{map, {modfun, riak_kv_mapreduce, map_object_value}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of {ok, [{_, Objs}]} -> {ok, lists:flatmap( fun(Obj) -> @@ -131,7 +137,7 @@ get(Table) -> end, Objs)}; {error, notfound} -> {ok, []}; - Error -> + {error, _} = Error -> Error end. @@ -228,13 +234,13 @@ get_raw(Table, Key) -> %% @doc Returns a list of index values get_keys(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{map, {modfun, ?MODULE, map_key}, none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of {ok, [{_, Keys}]} -> {ok, Keys}; - Error -> + {error, _} = Error -> log_error(Error, get_keys, [{table, Table}]), Error end. @@ -245,13 +251,13 @@ get_keys(Table) -> get_keys_by_index(Table, Index, Key) -> {NewIndex, NewKey} = encode_index_key(Index, Key), Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, NewIndex, NewKey}, - [{map, {modfun, ?MODULE, map_key}, none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, NewIndex, NewKey}, + [{map, {modfun, ?MODULE, map_key}, none, true}]) of {ok, [{_, Keys}]} -> {ok, Keys}; - Error -> + {error, _} = Error -> log_error(Error, get_keys_by_index, [{table, Table}, {index, Index}, {key, Key}]), @@ -260,31 +266,31 @@ get_keys_by_index(Table, Index, Key) -> %% @hidden get_tables() -> - riakc_pb_socket:list_buckets(ejabberd_riak_sup:get_random_pid()). + catch riakc_pb_socket:list_buckets(get_random_pid()). get_by_index_raw(Table, Index, Key) -> Bucket = make_bucket(Table), case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), + get_random_pid(), {index, Bucket, Index, Key}, [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, true}]) of {ok, [{_, Objs}]} -> {ok, Objs}; - Error -> + {error, _} = Error -> Error end. get_by_index_range_raw(Table, Index, FromKey, ToKey) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, FromKey, ToKey}, - [{map, {modfun, riak_kv_mapreduce, map_object_value}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, Index, FromKey, ToKey}, + [{map, {modfun, riak_kv_mapreduce, map_object_value}, + none, true}]) of {ok, [{_, Objs}]} -> {ok, Objs}; - Error -> + {error, _} = Error -> Error end. @@ -292,14 +298,14 @@ get_by_index_range_raw(Table, Index, FromKey, ToKey) -> %% @doc Returns the number of objects in the `Table' count(Table) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - Bucket, - [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + Bucket, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of {ok, [{_, [Cnt]}]} -> {ok, Cnt}; - Error -> + {error, _} = Error -> log_error(Error, count, [{table, Table}]), Error end. @@ -324,14 +330,14 @@ count_by_index(Tab, Index, Key) -> count_by_index_raw(Table, Index, Key) -> Bucket = make_bucket(Table), - case riakc_pb_socket:mapred( - ejabberd_riak_sup:get_random_pid(), - {index, Bucket, Index, Key}, - [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, - none, true}]) of + case catch riakc_pb_socket:mapred( + get_random_pid(), + {index, Bucket, Index, Key}, + [{reduce, {modfun, riak_kv_mapreduce, reduce_count_inputs}, + none, true}]) of {ok, [{_, [Cnt]}]} -> {ok, Cnt}; - Error -> + {error, _} = Error -> Error end. @@ -368,7 +374,7 @@ delete(Table, Key) when is_atom(Table) -> delete_raw(Table, Key) -> Bucket = make_bucket(Table), - riakc_pb_socket:delete(ejabberd_riak_sup:get_random_pid(), Bucket, Key). + catch riakc_pb_socket:delete(get_random_pid(), Bucket, Key). -spec delete_by_index(atom(), binary(), any()) -> ok | {error, any()}. %% @doc Deletes objects by index @@ -407,13 +413,14 @@ init([Server, Port]) -> [auto_reconnect]) of {ok, Pid} -> erlang:monitor(process, Pid), - ejabberd_riak_sup:add_pid(Pid), {ok, #state{pid = Pid}}; Err -> {stop, Err} end. %% @private +handle_call(get_pid, _From, #state{pid = Pid} = State) -> + {reply, {ok, Pid}, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. @@ -430,8 +437,7 @@ handle_info(_Info, State) -> {noreply, State}. %% @private -terminate(_Reason, State) -> - ejabberd_riak_sup:remove_pid(State#state.pid), +terminate(_Reason, _State) -> ok. %% @private @@ -486,3 +492,14 @@ log_error(_, _, _) -> make_invalid_object(Val) -> list_to_binary(io_lib:fwrite("Invalid object: ~p", [Val])). + +get_random_pid() -> + PoolPid = ejabberd_riak_sup:get_random_pid(), + case catch gen_server:call(PoolPid, get_pid) of + {ok, Pid} -> + Pid; + {'EXIT', {timeout, _}} -> + throw({error, timeout}); + {'EXIT', Err} -> + throw({error, Err}) + end. |