aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_redis.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r--src/ejabberd_redis.erl67
1 files changed, 36 insertions, 31 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl
index 6284238a2..a9ceb1833 100644
--- a/src/ejabberd_redis.erl
+++ b/src/ejabberd_redis.erl
@@ -54,13 +54,15 @@
-record(state, {connection :: pid() | undefined,
num :: pos_integer(),
- subscriptions = #{} :: map(),
- pending_q :: p1_queue:queue()}).
+ subscriptions = #{} :: subscriptions(),
+ pending_q :: queue()}).
+-type queue() :: p1_queue:queue({{pid(), term()}, integer()}).
+-type subscriptions() :: #{binary() => [pid()]}.
-type error_reason() :: binary() | timeout | disconnected | overloaded.
-type redis_error() :: {error, error_reason()}.
--type redis_reply() :: binary() | [binary()].
--type redis_command() :: [binary()].
+-type redis_reply() :: undefined | binary() | [binary()].
+-type redis_command() :: [iodata() | integer()].
-type redis_pipeline() :: [redis_command()].
-type redis_info() :: server | clients | memory | persistence |
stats | replication | cpu | commandstats |
@@ -89,19 +91,18 @@ get_connection(I) ->
q(Command) ->
call(get_rnd_id(), {q, Command}, ?MAX_RETRIES).
--spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error().
+-spec qp(redis_pipeline()) -> [{ok, redis_reply()} | redis_error()] | redis_error().
qp(Pipeline) ->
call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES).
--spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
+-spec multi(fun(() -> any())) -> {ok, redis_reply()} | redis_error().
multi(F) ->
case erlang:get(?TR_STACK) of
undefined ->
erlang:put(?TR_STACK, []),
try F() of
_ ->
- Stack = erlang:get(?TR_STACK),
- erlang:erase(?TR_STACK),
+ Stack = erlang:erase(?TR_STACK),
Command = [["MULTI"]|lists:reverse([["EXEC"]|Stack])],
case qp(Command) of
{error, _} = Err -> Err;
@@ -298,7 +299,7 @@ hkeys(Key) ->
-spec subscribe([binary()]) -> ok | redis_error().
subscribe(Channels) ->
- try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT)
+ try gen_server_call(get_proc(1), {subscribe, self(), Channels})
catch exit:{Why, {?GEN_SERVER, call, _}} ->
Reason = case Why of
timeout -> timeout;
@@ -329,7 +330,7 @@ script_load(Data) ->
erlang:error(transaction_unsupported)
end.
--spec evalsha(binary(), [iodata()], [iodata()]) -> {ok, binary()} | redis_error().
+-spec evalsha(binary(), [iodata()], [iodata() | integer()]) -> {ok, binary()} | redis_error().
evalsha(SHA, Keys, Args) ->
case erlang:get(?TR_STACK) of
undefined ->
@@ -391,7 +392,7 @@ handle_call({subscribe, Caller, Channels}, _From,
eredis_subscribe(Pid, Channels),
{reply, ok, State#state{subscriptions = Subs1}};
handle_call(Request, _From, State) ->
- ?WARNING_MSG("unexepected call: ~p", [Request]),
+ ?WARNING_MSG("Unexepected call: ~p", [Request]),
{noreply, State}.
handle_cast(_Msg, State) ->
@@ -424,7 +425,7 @@ handle_info({subscribed, Channel, Pid}, State) ->
case maps:is_key(Channel, State#state.subscriptions) of
true -> eredis_sub:ack_message(Pid);
false ->
- ?WARNING_MSG("got subscription ack for unknown channel ~s",
+ ?WARNING_MSG("Got subscription ack for unknown channel ~s",
[Channel])
end;
_ ->
@@ -444,7 +445,7 @@ handle_info({message, Channel, Data, Pid}, State) ->
end,
{noreply, State};
handle_info(Info, State) ->
- ?WARNING_MSG("unexpected info = ~p", [Info]),
+ ?WARNING_MSG("Unexpected info = ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
@@ -458,13 +459,11 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
-spec connect(state()) -> {ok, pid()} | {error, any()}.
connect(#state{num = Num}) ->
- Server = ejabberd_config:get_option(redis_server, "localhost"),
- Port = ejabberd_config:get_option(redis_port, 6379),
- DB = ejabberd_config:get_option(redis_db, 0),
- Pass = ejabberd_config:get_option(redis_password, ""),
- ConnTimeout = timer:seconds(
- ejabberd_config:get_option(
- redis_connect_timeout, 1)),
+ Server = ejabberd_option:redis_server(),
+ Port = ejabberd_option:redis_port(),
+ DB = ejabberd_option:redis_db(),
+ Pass = ejabberd_option:redis_password(),
+ ConnTimeout = ejabberd_option:redis_connect_timeout(),
try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of
{ok, Client} ->
?DEBUG("Connection #~p established to Redis at ~s:~p",
@@ -498,9 +497,9 @@ do_connect(_, Server, Port, Pass, DB, ConnTimeout) ->
-spec call(pos_integer(), {q, redis_command()}, integer()) ->
{ok, redis_reply()} | redis_error();
(pos_integer(), {qp, redis_pipeline()}, integer()) ->
- {ok, [redis_reply()]} | redis_error().
+ [{ok, redis_reply()} | redis_error()] | redis_error().
call(I, {F, Cmd}, Retries) ->
- ?DEBUG("redis query: ~p", [Cmd]),
+ ?DEBUG("Redis query: ~p", [Cmd]),
Conn = get_connection(I),
Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
{error, Reason} when is_atom(Reason) ->
@@ -513,7 +512,7 @@ call(I, {F, Cmd}, Retries) ->
end,
case Res of
{error, disconnected} when Retries > 0 ->
- try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of
+ try gen_server_call(get_proc(I), connect) of
ok -> call(I, {F, Cmd}, Retries-1);
{error, _} = Err -> Err
catch exit:{Why, {?GEN_SERVER, call, _}} ->
@@ -531,6 +530,14 @@ call(I, {F, Cmd}, Retries) ->
Res
end.
+gen_server_call(Proc, Msg) ->
+ case ejabberd_redis_sup:start() of
+ ok ->
+ ?GEN_SERVER:call(Proc, Msg, ?CALL_TIMEOUT);
+ {error, _} ->
+ {error, disconnected}
+ end.
+
-spec log_error(redis_command() | redis_pipeline(), atom() | binary()) -> ok.
log_error(Cmd, Reason) ->
?ERROR_MSG("Redis request has failed:~n"
@@ -542,8 +549,8 @@ log_error(Cmd, Reason) ->
get_rnd_id() ->
p1_rand:round_robin(ejabberd_redis_sup:get_pool_size() - 1) + 2.
--spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
- {ok, [redis_reply()]} | {error, binary()}.
+-spec get_result([{ok, redis_reply()} | redis_error()]) ->
+ {ok, redis_reply()} | redis_error().
get_result([{error, _} = Err|_]) ->
Err;
get_result([{ok, _} = OK]) ->
@@ -584,11 +591,9 @@ fsm_limit_opts() ->
ejabberd_config:fsm_limit_opts([]).
get_queue_type() ->
- ejabberd_config:get_option(
- redis_queue_type,
- ejabberd_config:default_queue_type(global)).
+ ejabberd_option:redis_queue_type().
--spec flush_queue(p1_queue:queue()) -> p1_queue:queue().
+-spec flush_queue(queue()) -> queue().
flush_queue(Q) ->
CurrTime = erlang:monotonic_time(millisecond),
p1_queue:dropwhile(
@@ -601,7 +606,7 @@ flush_queue(Q) ->
true
end, Q).
--spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue().
+-spec clean_queue(queue(), integer()) -> queue().
clean_queue(Q, CurrTime) ->
Q1 = p1_queue:dropwhile(
fun({_From, Time}) ->
@@ -627,5 +632,5 @@ re_subscribe(Pid, Subs) ->
end.
eredis_subscribe(Pid, Channels) ->
- ?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]),
+ ?DEBUG("Redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]),
eredis_sub:subscribe(Pid, Channels).