aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_redis.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r--src/ejabberd_redis.erl47
1 files changed, 25 insertions, 22 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl
index 6284238a2..55e9dfcff 100644
--- a/src/ejabberd_redis.erl
+++ b/src/ejabberd_redis.erl
@@ -59,8 +59,8 @@
-type error_reason() :: binary() | timeout | disconnected | overloaded.
-type redis_error() :: {error, error_reason()}.
--type redis_reply() :: binary() | [binary()].
--type redis_command() :: [binary()].
+-type redis_reply() :: undefined | binary() | [binary()].
+-type redis_command() :: [iodata() | integer()].
-type redis_pipeline() :: [redis_command()].
-type redis_info() :: server | clients | memory | persistence |
stats | replication | cpu | commandstats |
@@ -89,19 +89,18 @@ get_connection(I) ->
q(Command) ->
call(get_rnd_id(), {q, Command}, ?MAX_RETRIES).
--spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error().
+-spec qp(redis_pipeline()) -> [{ok, redis_reply()} | redis_error()] | redis_error().
qp(Pipeline) ->
call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES).
--spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
+-spec multi(fun(() -> any())) -> {ok, redis_reply()} | redis_error().
multi(F) ->
case erlang:get(?TR_STACK) of
undefined ->
erlang:put(?TR_STACK, []),
try F() of
_ ->
- Stack = erlang:get(?TR_STACK),
- erlang:erase(?TR_STACK),
+ Stack = erlang:erase(?TR_STACK),
Command = [["MULTI"]|lists:reverse([["EXEC"]|Stack])],
case qp(Command) of
{error, _} = Err -> Err;
@@ -298,7 +297,7 @@ hkeys(Key) ->
-spec subscribe([binary()]) -> ok | redis_error().
subscribe(Channels) ->
- try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT)
+ try gen_server_call(get_proc(1), {subscribe, self(), Channels})
catch exit:{Why, {?GEN_SERVER, call, _}} ->
Reason = case Why of
timeout -> timeout;
@@ -329,7 +328,7 @@ script_load(Data) ->
erlang:error(transaction_unsupported)
end.
--spec evalsha(binary(), [iodata()], [iodata()]) -> {ok, binary()} | redis_error().
+-spec evalsha(binary(), [iodata()], [iodata() | integer()]) -> {ok, binary()} | redis_error().
evalsha(SHA, Keys, Args) ->
case erlang:get(?TR_STACK) of
undefined ->
@@ -458,13 +457,11 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
-spec connect(state()) -> {ok, pid()} | {error, any()}.
connect(#state{num = Num}) ->
- Server = ejabberd_config:get_option(redis_server, "localhost"),
- Port = ejabberd_config:get_option(redis_port, 6379),
- DB = ejabberd_config:get_option(redis_db, 0),
- Pass = ejabberd_config:get_option(redis_password, ""),
- ConnTimeout = timer:seconds(
- ejabberd_config:get_option(
- redis_connect_timeout, 1)),
+ Server = ejabberd_option:redis_server(),
+ Port = ejabberd_option:redis_port(),
+ DB = ejabberd_option:redis_db(),
+ Pass = ejabberd_option:redis_password(),
+ ConnTimeout = ejabberd_option:redis_connect_timeout(),
try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of
{ok, Client} ->
?DEBUG("Connection #~p established to Redis at ~s:~p",
@@ -498,7 +495,7 @@ do_connect(_, Server, Port, Pass, DB, ConnTimeout) ->
-spec call(pos_integer(), {q, redis_command()}, integer()) ->
{ok, redis_reply()} | redis_error();
(pos_integer(), {qp, redis_pipeline()}, integer()) ->
- {ok, [redis_reply()]} | redis_error().
+ [{ok, redis_reply()} | redis_error()] | redis_error().
call(I, {F, Cmd}, Retries) ->
?DEBUG("redis query: ~p", [Cmd]),
Conn = get_connection(I),
@@ -513,7 +510,7 @@ call(I, {F, Cmd}, Retries) ->
end,
case Res of
{error, disconnected} when Retries > 0 ->
- try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of
+ try gen_server_call(get_proc(I), connect) of
ok -> call(I, {F, Cmd}, Retries-1);
{error, _} = Err -> Err
catch exit:{Why, {?GEN_SERVER, call, _}} ->
@@ -531,6 +528,14 @@ call(I, {F, Cmd}, Retries) ->
Res
end.
+gen_server_call(Proc, Msg) ->
+ case ejabberd_redis_sup:start() of
+ ok ->
+ ?GEN_SERVER:call(Proc, Msg, ?CALL_TIMEOUT);
+ {error, _} ->
+ {error, disconnected}
+ end.
+
-spec log_error(redis_command() | redis_pipeline(), atom() | binary()) -> ok.
log_error(Cmd, Reason) ->
?ERROR_MSG("Redis request has failed:~n"
@@ -542,8 +547,8 @@ log_error(Cmd, Reason) ->
get_rnd_id() ->
p1_rand:round_robin(ejabberd_redis_sup:get_pool_size() - 1) + 2.
--spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
- {ok, [redis_reply()]} | {error, binary()}.
+-spec get_result([{ok, redis_reply()} | redis_error()]) ->
+ {ok, redis_reply()} | redis_error().
get_result([{error, _} = Err|_]) ->
Err;
get_result([{ok, _} = OK]) ->
@@ -584,9 +589,7 @@ fsm_limit_opts() ->
ejabberd_config:fsm_limit_opts([]).
get_queue_type() ->
- ejabberd_config:get_option(
- redis_queue_type,
- ejabberd_config:default_queue_type(global)).
+ ejabberd_option:redis_queue_type().
-spec flush_queue(p1_queue:queue()) -> p1_queue:queue().
flush_queue(Q) ->