diff options
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r-- | src/ejabberd_redis.erl | 67 |
1 files changed, 36 insertions, 31 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index 6284238a2..a9ceb1833 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -54,13 +54,15 @@ -record(state, {connection :: pid() | undefined, num :: pos_integer(), - subscriptions = #{} :: map(), - pending_q :: p1_queue:queue()}). + subscriptions = #{} :: subscriptions(), + pending_q :: queue()}). +-type queue() :: p1_queue:queue({{pid(), term()}, integer()}). +-type subscriptions() :: #{binary() => [pid()]}. -type error_reason() :: binary() | timeout | disconnected | overloaded. -type redis_error() :: {error, error_reason()}. --type redis_reply() :: binary() | [binary()]. --type redis_command() :: [binary()]. +-type redis_reply() :: undefined | binary() | [binary()]. +-type redis_command() :: [iodata() | integer()]. -type redis_pipeline() :: [redis_command()]. -type redis_info() :: server | clients | memory | persistence | stats | replication | cpu | commandstats | @@ -89,19 +91,18 @@ get_connection(I) -> q(Command) -> call(get_rnd_id(), {q, Command}, ?MAX_RETRIES). --spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error(). +-spec qp(redis_pipeline()) -> [{ok, redis_reply()} | redis_error()] | redis_error(). qp(Pipeline) -> call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES). --spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error(). +-spec multi(fun(() -> any())) -> {ok, redis_reply()} | redis_error(). multi(F) -> case erlang:get(?TR_STACK) of undefined -> erlang:put(?TR_STACK, []), try F() of _ -> - Stack = erlang:get(?TR_STACK), - erlang:erase(?TR_STACK), + Stack = erlang:erase(?TR_STACK), Command = [["MULTI"]|lists:reverse([["EXEC"]|Stack])], case qp(Command) of {error, _} = Err -> Err; @@ -298,7 +299,7 @@ hkeys(Key) -> -spec subscribe([binary()]) -> ok | redis_error(). subscribe(Channels) -> - try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT) + try gen_server_call(get_proc(1), {subscribe, self(), Channels}) catch exit:{Why, {?GEN_SERVER, call, _}} -> Reason = case Why of timeout -> timeout; @@ -329,7 +330,7 @@ script_load(Data) -> erlang:error(transaction_unsupported) end. --spec evalsha(binary(), [iodata()], [iodata()]) -> {ok, binary()} | redis_error(). +-spec evalsha(binary(), [iodata()], [iodata() | integer()]) -> {ok, binary()} | redis_error(). evalsha(SHA, Keys, Args) -> case erlang:get(?TR_STACK) of undefined -> @@ -391,7 +392,7 @@ handle_call({subscribe, Caller, Channels}, _From, eredis_subscribe(Pid, Channels), {reply, ok, State#state{subscriptions = Subs1}}; handle_call(Request, _From, State) -> - ?WARNING_MSG("unexepected call: ~p", [Request]), + ?WARNING_MSG("Unexepected call: ~p", [Request]), {noreply, State}. handle_cast(_Msg, State) -> @@ -424,7 +425,7 @@ handle_info({subscribed, Channel, Pid}, State) -> case maps:is_key(Channel, State#state.subscriptions) of true -> eredis_sub:ack_message(Pid); false -> - ?WARNING_MSG("got subscription ack for unknown channel ~s", + ?WARNING_MSG("Got subscription ack for unknown channel ~s", [Channel]) end; _ -> @@ -444,7 +445,7 @@ handle_info({message, Channel, Data, Pid}, State) -> end, {noreply, State}; handle_info(Info, State) -> - ?WARNING_MSG("unexpected info = ~p", [Info]), + ?WARNING_MSG("Unexpected info = ~p", [Info]), {noreply, State}. terminate(_Reason, _State) -> @@ -458,13 +459,11 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== -spec connect(state()) -> {ok, pid()} | {error, any()}. connect(#state{num = Num}) -> - Server = ejabberd_config:get_option(redis_server, "localhost"), - Port = ejabberd_config:get_option(redis_port, 6379), - DB = ejabberd_config:get_option(redis_db, 0), - Pass = ejabberd_config:get_option(redis_password, ""), - ConnTimeout = timer:seconds( - ejabberd_config:get_option( - redis_connect_timeout, 1)), + Server = ejabberd_option:redis_server(), + Port = ejabberd_option:redis_port(), + DB = ejabberd_option:redis_db(), + Pass = ejabberd_option:redis_password(), + ConnTimeout = ejabberd_option:redis_connect_timeout(), try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of {ok, Client} -> ?DEBUG("Connection #~p established to Redis at ~s:~p", @@ -498,9 +497,9 @@ do_connect(_, Server, Port, Pass, DB, ConnTimeout) -> -spec call(pos_integer(), {q, redis_command()}, integer()) -> {ok, redis_reply()} | redis_error(); (pos_integer(), {qp, redis_pipeline()}, integer()) -> - {ok, [redis_reply()]} | redis_error(). + [{ok, redis_reply()} | redis_error()] | redis_error(). call(I, {F, Cmd}, Retries) -> - ?DEBUG("redis query: ~p", [Cmd]), + ?DEBUG("Redis query: ~p", [Cmd]), Conn = get_connection(I), Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of {error, Reason} when is_atom(Reason) -> @@ -513,7 +512,7 @@ call(I, {F, Cmd}, Retries) -> end, case Res of {error, disconnected} when Retries > 0 -> - try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of + try gen_server_call(get_proc(I), connect) of ok -> call(I, {F, Cmd}, Retries-1); {error, _} = Err -> Err catch exit:{Why, {?GEN_SERVER, call, _}} -> @@ -531,6 +530,14 @@ call(I, {F, Cmd}, Retries) -> Res end. +gen_server_call(Proc, Msg) -> + case ejabberd_redis_sup:start() of + ok -> + ?GEN_SERVER:call(Proc, Msg, ?CALL_TIMEOUT); + {error, _} -> + {error, disconnected} + end. + -spec log_error(redis_command() | redis_pipeline(), atom() | binary()) -> ok. log_error(Cmd, Reason) -> ?ERROR_MSG("Redis request has failed:~n" @@ -542,8 +549,8 @@ log_error(Cmd, Reason) -> get_rnd_id() -> p1_rand:round_robin(ejabberd_redis_sup:get_pool_size() - 1) + 2. --spec get_result([{error, atom() | binary()} | {ok, iodata()}]) -> - {ok, [redis_reply()]} | {error, binary()}. +-spec get_result([{ok, redis_reply()} | redis_error()]) -> + {ok, redis_reply()} | redis_error(). get_result([{error, _} = Err|_]) -> Err; get_result([{ok, _} = OK]) -> @@ -584,11 +591,9 @@ fsm_limit_opts() -> ejabberd_config:fsm_limit_opts([]). get_queue_type() -> - ejabberd_config:get_option( - redis_queue_type, - ejabberd_config:default_queue_type(global)). + ejabberd_option:redis_queue_type(). --spec flush_queue(p1_queue:queue()) -> p1_queue:queue(). +-spec flush_queue(queue()) -> queue(). flush_queue(Q) -> CurrTime = erlang:monotonic_time(millisecond), p1_queue:dropwhile( @@ -601,7 +606,7 @@ flush_queue(Q) -> true end, Q). --spec clean_queue(p1_queue:queue(), integer()) -> p1_queue:queue(). +-spec clean_queue(queue(), integer()) -> queue(). clean_queue(Q, CurrTime) -> Q1 = p1_queue:dropwhile( fun({_From, Time}) -> @@ -627,5 +632,5 @@ re_subscribe(Pid, Subs) -> end. eredis_subscribe(Pid, Channels) -> - ?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]), + ?DEBUG("Redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]), eredis_sub:subscribe(Pid, Channels). |