summaryrefslogtreecommitdiff
path: root/src/ejabberd_redis.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-06 17:56:37 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-06 17:56:37 +0300
commit6876a37e617189af8208baa10ab2be9987c02b4e (patch)
treedc7a58af7d3d4d81d5783d8169b608f872d38de0 /src/ejabberd_redis.erl
parentSet default prefix to /usr/local (diff)
Add Redis pool support
Fixes #1624
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r--src/ejabberd_redis.erl268
1 files changed, 153 insertions, 115 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl
index ec5d7359..8eeb1960 100644
--- a/src/ejabberd_redis.erl
+++ b/src/ejabberd_redis.erl
@@ -23,14 +23,15 @@
%%%----------------------------------------------------------------------
-module(ejabberd_redis).
-
--behaviour(gen_server).
--behaviour(ejabberd_config).
+-ifndef(GEN_SERVER).
+-define(GEN_SERVER, gen_server).
+-endif.
+-behaviour(?GEN_SERVER).
-compile({no_auto_import, [get/1, put/2]}).
%% API
--export([start_link/0, q/1, qp/1, config_reloaded/0, opt_type/1]).
+-export([start_link/1, get_proc/1, q/1, qp/1]).
%% Commands
-export([multi/1, get/1, set/2, del/1,
sadd/2, srem/2, smembers/1, sismember/2, scard/1,
@@ -43,31 +44,40 @@
-define(SERVER, ?MODULE).
-define(PROCNAME, 'ejabberd_redis_client').
-define(TR_STACK, redis_transaction_stack).
+-define(DEFAULT_MAX_QUEUE, 5000).
+-define(MAX_RETRIES, 1).
+-define(CALL_TIMEOUT, 60*1000). %% 60 seconds
-include("logger.hrl").
-include("ejabberd.hrl").
--record(state, {connection :: {pid(), reference()} | undefined}).
+-record(state, {connection :: pid() | undefined,
+ num :: pos_integer(),
+ pending_q :: p1_queue:queue()}).
-type redis_error() :: {error, binary() | atom()}.
%%%===================================================================
%%% API
%%%===================================================================
-start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+start_link(I) ->
+ ?GEN_SERVER:start_link({local, get_proc(I)}, ?MODULE, [I], []).
+
+get_proc(I) ->
+ aux:binary_to_atom(
+ iolist_to_binary(
+ [atom_to_list(?MODULE), $_, integer_to_list(I)])).
+
+get_connection(I) ->
+ aux:binary_to_atom(
+ iolist_to_binary(
+ [atom_to_list(?MODULE), "_connection_", integer_to_list(I)])).
q(Command) ->
- try eredis:q(?PROCNAME, Command)
- catch _:{noproc, _} -> {error, disconnected};
- _:{timeout, _} -> {error, timeout}
- end.
+ call(get_worker(), {q, Command}, ?MAX_RETRIES).
qp(Pipeline) ->
- try eredis:qp(?PROCNAME, Pipeline)
- catch _:{noproc, _} -> {error, disconnected};
- _:{timeout, _} -> {error, timeout}
- end.
+ call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
-spec multi(fun(() -> any())) -> {ok, list()} | redis_error().
multi(F) ->
@@ -91,14 +101,6 @@ multi(F) ->
{error, nested_transaction}
end.
-config_reloaded() ->
- case is_redis_configured() of
- true ->
- ?MODULE ! connect;
- false ->
- ?MODULE ! disconnect
- end.
-
-spec get(iodata()) -> {ok, undefined | binary()} | redis_error().
get(Key) ->
case erlang:get(?TR_STACK) of
@@ -274,56 +276,63 @@ hkeys(Key) ->
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
-init([]) ->
- ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 20),
+init([I]) ->
process_flag(trap_exit, true),
- {_, State} = handle_info(connect, #state{}),
- {ok, State}.
-
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
+ QueueType = get_queue_type(),
+ Limit = max_fsm_queue(),
+ self() ! connect,
+ {ok, #state{num = I, pending_q = p1_queue:new(QueueType, Limit)}}.
+
+handle_call(connect, From, #state{connection = undefined,
+ pending_q = Q} = State) ->
+ CurrTime = p1_time_compat:monotonic_time(milli_seconds),
+ Q2 = try p1_queue:in({From, CurrTime}, Q)
+ catch error:full ->
+ Q1 = clean_queue(Q, CurrTime),
+ p1_queue:in({From, CurrTime}, Q1)
+ end,
+ {noreply, State#state{pending_q = Q2}};
+handle_call(connect, From, #state{connection = Pid} = State) ->
+ case is_process_alive(Pid) of
+ true ->
+ {reply, ok, State};
+ false ->
+ self() ! connect,
+ handle_call(connect, From, State#state{connection = undefined})
+ end;
+handle_call(Request, _From, State) ->
+ ?WARNING_MSG("unexepected call: ~p", [Request]),
+ {noreply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(connect, #state{connection = undefined} = State) ->
- NewState = case is_redis_configured() of
- true ->
- case connect() of
- {ok, Connection} ->
- State#state{connection = Connection};
- {error, _} ->
- State
- end;
- false ->
+ NewState = case connect(State) of
+ {ok, Connection} ->
+ Q1 = flush_queue(State#state.pending_q),
+ State#state{connection = Connection, pending_q = Q1};
+ {error, _} ->
State
end,
{noreply, NewState};
handle_info(connect, State) ->
%% Already connected
{noreply, State};
-handle_info(disconnect, #state{connection = {Pid, MRef}} = State) ->
- ?INFO_MSG("Disconnecting from Redis server", []),
- erlang:demonitor(MRef, [flush]),
- eredis:stop(Pid),
- {noreply, State#state{connection = undefined}};
-handle_info(disconnect, State) ->
- %% Not connected
- {noreply, State};
-handle_info({'DOWN', MRef, _Type, Pid, Reason},
- #state{connection = {Pid, MRef}} = State) ->
- ?INFO_MSG("Redis connection has failed: ~p", [Reason]),
- connect(),
- {noreply, State#state{connection = undefined}};
-handle_info({'EXIT', _, _}, State) ->
- {noreply, State};
+handle_info({'EXIT', Pid, _}, State) ->
+ case State#state.connection of
+ Pid ->
+ self() ! connect,
+ {noreply, State#state{connection = undefined}};
+ _ ->
+ {noreply, State}
+ end;
handle_info(Info, State) ->
- ?INFO_MSG("unexpected info = ~p", [Info]),
+ ?WARNING_MSG("unexpected info = ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
- ejabberd_hooks:delete(config_reloaded, ?MODULE, config_reloaded, 20).
+ ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -331,37 +340,7 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
-is_redis_configured() ->
- lists:any(fun is_redis_configured/1, ?MYHOSTS).
-
-is_redis_configured(Host) ->
- ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
- PortConfigured = ejabberd_config:has_option({redis_port, Host}),
- DBConfigured = ejabberd_config:has_option({redis_db, Host}),
- PassConfigured = ejabberd_config:has_option({redis_password, Host}),
- ReconnTimeoutConfigured = ejabberd_config:has_option(
- {redis_reconnect_timeout, Host}),
- ConnTimeoutConfigured = ejabberd_config:has_option(
- {redis_connect_timeout, Host}),
- Modules = ejabberd_config:get_option(
- {modules, Host},
- fun(L) when is_list(L) -> L end, []),
- SMConfigured = ejabberd_config:get_option(
- {sm_db_type, Host},
- fun(V) -> V end) == redis,
- ModuleWithRedisDBConfigured =
- lists:any(
- fun({Module, Opts}) ->
- gen_mod:db_type(Host, Opts, Module) == redis
- end, Modules),
- ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
- ReconnTimeoutConfigured or ConnTimeoutConfigured or
- SMConfigured or ModuleWithRedisDBConfigured.
-
-iolist_to_list(IOList) ->
- binary_to_list(iolist_to_binary(IOList)).
-
-connect() ->
+connect(#state{num = Num}) ->
Server = ejabberd_config:get_option(redis_server,
fun iolist_to_list/1,
"localhost"),
@@ -377,36 +356,60 @@ connect() ->
Pass = ejabberd_config:get_option(redis_password,
fun iolist_to_list/1,
""),
- ReconnTimeout = timer:seconds(
- ejabberd_config:get_option(
- redis_reconnect_timeout,
- fun(I) when is_integer(I), I>0 -> I end,
- 1)),
ConnTimeout = timer:seconds(
ejabberd_config:get_option(
redis_connect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
try case eredis:start_link(Server, Port, DB, Pass,
- ReconnTimeout, ConnTimeout) of
+ no_reconnect, ConnTimeout) of
{ok, Client} ->
- ?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
- unlink(Client),
- MRef = erlang:monitor(process, Client),
- register(?PROCNAME, Client),
- {ok, {Client, MRef}};
+ ?DEBUG("Connection #~p established to Redis at ~s:~p",
+ [Num, Server, Port]),
+ register(get_connection(Num), Client),
+ {ok, Client};
{error, Why} ->
erlang:error(Why)
end
catch _:Reason ->
- Timeout = 10,
- ?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
+ Timeout = randoms:uniform(
+ min(10, ejabberd_redis_sup:get_pool_size())),
+ ?ERROR_MSG("Redis connection #~p at ~s:~p has failed: ~p; "
"reconnecting in ~p seconds",
- [Server, Port, Reason, Timeout]),
+ [Num, Server, Port, Reason, Timeout]),
erlang:send_after(timer:seconds(Timeout), self(), connect),
{error, Reason}
end.
+-spec call({atom(), atom()}, {q | qp, list()}, integer()) ->
+ {error, disconnected | timeout | binary()} | {ok, iodata()}.
+call({Conn, Parent}, {F, Cmd}, Retries) ->
+ Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
+ {error, Reason} when is_atom(Reason) ->
+ try exit(whereis(Conn), kill) catch _:_ -> ok end,
+ {error, disconnected};
+ Other ->
+ Other
+ catch exit:{timeout, _} -> {error, timeout};
+ exit:{_, {gen_server, call, _}} -> {error, disconnected}
+ end,
+ case Res of
+ {error, disconnected} when Retries > 0 ->
+ try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of
+ ok -> call({Conn, Parent}, {F, Cmd}, Retries-1);
+ {error, _} = Err -> Err
+ catch exit:{timeout, _} -> {error, timeout};
+ exit:{_, {?GEN_SERVER, call, _}} -> {error, disconnected}
+ end;
+ _ ->
+ Res
+ end.
+
+get_worker() ->
+ Time = p1_time_compat:system_time(),
+ I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
+ {get_connection(I), get_proc(I)}.
+
get_result([{error, _} = Err|_]) ->
Err;
get_result([{ok, _} = OK]) ->
@@ -436,16 +439,51 @@ reply(Val) ->
_ -> queued
end.
-opt_type(redis_connect_timeout) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(redis_db) ->
- fun (I) when is_integer(I), I >= 0 -> I end;
-opt_type(redis_password) -> fun iolist_to_list/1;
-opt_type(redis_port) ->
- fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
-opt_type(redis_reconnect_timeout) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(redis_server) -> fun iolist_to_list/1;
-opt_type(_) ->
- [redis_connect_timeout, redis_db, redis_password,
- redis_port, redis_reconnect_timeout, redis_server].
+iolist_to_list(IOList) ->
+ binary_to_list(iolist_to_binary(IOList)).
+
+max_fsm_queue() ->
+ proplists:get_value(max_queue, fsm_limit_opts(), ?DEFAULT_MAX_QUEUE).
+
+fsm_limit_opts() ->
+ ejabberd_config:fsm_limit_opts([]).
+
+get_queue_type() ->
+ case ejabberd_config:get_option(
+ redis_queue_type,
+ ejabberd_redis_sup:opt_type(redis_queue_type)) of
+ undefined ->
+ ejabberd_config:default_queue_type(global);
+ Type ->
+ Type
+ end.
+
+flush_queue(Q) ->
+ CurrTime = p1_time_compat:monotonic_time(milli_seconds),
+ p1_queue:dropwhile(
+ fun({From, Time}) ->
+ if (CurrTime - Time) >= ?CALL_TIMEOUT ->
+ ok;
+ true ->
+ ?GEN_SERVER:reply(From, ok)
+ end,
+ true
+ end, Q).
+
+clean_queue(Q, CurrTime) ->
+ Q1 = p1_queue:dropwhile(
+ fun({_From, Time}) ->
+ (CurrTime - Time) >= ?CALL_TIMEOUT
+ end, Q),
+ Len = p1_queue:len(Q1),
+ Limit = p1_queue:get_limit(Q1),
+ if Len >= Limit ->
+ ?ERROR_MSG("Redis request queue is overloaded", []),
+ p1_queue:dropwhile(
+ fun({From, _Time}) ->
+ ?GEN_SERVER:reply(From, {error, disconnected}),
+ true
+ end, Q1);
+ true ->
+ Q1
+ end.