summaryrefslogtreecommitdiff
path: root/src/ejabberd_redis.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-04-14 13:57:52 +0300
commite40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch)
tree75d9fe880e8257ea9fd20c095c252d7940cea89d /src/ejabberd_redis.erl
parentBump xmpp dependency, it's required by previous commit (diff)
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r--src/ejabberd_redis.erl117
1 files changed, 100 insertions, 17 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl
index e7cc74d9..bd85f0ee 100644
--- a/src/ejabberd_redis.erl
+++ b/src/ejabberd_redis.erl
@@ -31,11 +31,12 @@
-compile({no_auto_import, [get/1, put/2]}).
%% API
--export([start_link/1, get_proc/1, q/1, qp/1, format_error/1]).
+-export([start_link/1, get_proc/1, get_connection/1, q/1, qp/1, format_error/1]).
%% Commands
-export([multi/1, get/1, set/2, del/1,
sadd/2, srem/2, smembers/1, sismember/2, scard/1,
- hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]).
+ hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1,
+ subscribe/1, publish/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -53,14 +54,18 @@
-record(state, {connection :: pid() | undefined,
num :: pos_integer(),
+ subscriptions = #{} :: map(),
pending_q :: p1_queue:queue()}).
--type redis_error() :: {error, binary() | timeout | disconnected | overloaded}.
+-type error_reason() :: binary() | timeout | disconnected | overloaded.
+-type redis_error() :: {error, error_reason()}.
-type redis_reply() :: binary() | [binary()].
-type redis_command() :: [binary()].
-type redis_pipeline() :: [redis_command()].
-type state() :: #state{}.
+-export_type([error_reason/0]).
+
%%%===================================================================
%%% API
%%%===================================================================
@@ -79,11 +84,11 @@ get_connection(I) ->
-spec q(redis_command()) -> {ok, redis_reply()} | redis_error().
q(Command) ->
- call(get_worker(), {q, Command}, ?MAX_RETRIES).
+ call(get_rnd_id(), {q, Command}, ?MAX_RETRIES).
-spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error().
qp(Pipeline) ->
- call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES).
+ call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES).
-spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error().
multi(F) ->
@@ -288,6 +293,30 @@ hkeys(Key) ->
erlang:error(transaction_unsupported)
end.
+-spec subscribe([binary()]) -> ok | redis_error().
+subscribe(Channels) ->
+ try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT)
+ catch exit:{Why, {?GEN_SERVER, call, _}} ->
+ Reason = case Why of
+ timeout -> timeout;
+ _ -> disconnected
+ end,
+ {error, Reason}
+ end.
+
+-spec publish(iodata(), iodata()) -> {ok, non_neg_integer()} | redis_error() | queued.
+publish(Channel, Data) ->
+ Cmd = [<<"PUBLISH">>, Channel, Data],
+ case erlang:get(?TR_STACK) of
+ undefined ->
+ case q(Cmd) of
+ {ok, N} -> {ok, binary_to_integer(N)};
+ {error, _} = Err -> Err
+ end;
+ Stack ->
+ tr_enq(Cmd, Stack)
+ end.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -315,6 +344,15 @@ handle_call(connect, From, #state{connection = Pid} = State) ->
self() ! connect,
handle_call(connect, From, State#state{connection = undefined})
end;
+handle_call({subscribe, Caller, Channels}, _From,
+ #state{connection = Pid, subscriptions = Subs} = State) ->
+ Subs1 = lists:foldl(
+ fun(Channel, Acc) ->
+ Callers = maps:get(Channel, Acc, []) -- [Caller],
+ maps:put(Channel, [Caller|Callers], Acc)
+ end, Subs, Channels),
+ eredis_subscribe(Pid, Channels),
+ {reply, ok, State#state{subscriptions = Subs1}};
handle_call(Request, _From, State) ->
?WARNING_MSG("unexepected call: ~p", [Request]),
{noreply, State}.
@@ -326,6 +364,7 @@ handle_info(connect, #state{connection = undefined} = State) ->
NewState = case connect(State) of
{ok, Connection} ->
Q1 = flush_queue(State#state.pending_q),
+ re_subscribe(Connection, State#state.subscriptions),
State#state{connection = Connection, pending_q = Q1};
{error, _} ->
State
@@ -342,6 +381,31 @@ handle_info({'EXIT', Pid, _}, State) ->
_ ->
{noreply, State}
end;
+handle_info({subscribed, Channel, Pid}, State) ->
+ case State#state.connection of
+ Pid ->
+ case maps:is_key(Channel, State#state.subscriptions) of
+ true -> eredis_sub:ack_message(Pid);
+ false ->
+ ?WARNING_MSG("got subscription ack for unknown channel ~s",
+ [Channel])
+ end;
+ _ ->
+ ok
+ end,
+ {noreply, State};
+handle_info({message, Channel, Data, Pid}, State) ->
+ case State#state.connection of
+ Pid ->
+ lists:foreach(
+ fun(Subscriber) ->
+ erlang:send(Subscriber, {redis_message, Channel, Data})
+ end, maps:get(Channel, State#state.subscriptions, [])),
+ eredis_sub:ack_message(Pid);
+ _ ->
+ ok
+ end,
+ {noreply, State};
handle_info(Info, State) ->
?WARNING_MSG("unexpected info = ~p", [Info]),
{noreply, State}.
@@ -377,8 +441,7 @@ connect(#state{num = Num}) ->
redis_connect_timeout,
fun(I) when is_integer(I), I>0 -> I end,
1)),
- try case eredis:start_link(Server, Port, DB, Pass,
- no_reconnect, ConnTimeout) of
+ try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of
{ok, Client} ->
?DEBUG("Connection #~p established to Redis at ~s:~p",
[Num, Server, Port]),
@@ -397,12 +460,24 @@ connect(#state{num = Num}) ->
{error, Reason}
end.
--spec call({atom(), atom()}, {q, redis_command()}, integer()) ->
+do_connect(1, Server, Port, Pass, _DB, _ConnTimeout) ->
+ %% First connection in the pool is always a subscriber
+ Res = eredis_sub:start_link(Server, Port, Pass, no_reconnect, infinity, drop),
+ case Res of
+ {ok, Pid} -> eredis_sub:controlling_process(Pid);
+ _ -> ok
+ end,
+ Res;
+do_connect(_, Server, Port, Pass, DB, ConnTimeout) ->
+ eredis:start_link(Server, Port, DB, Pass, no_reconnect, ConnTimeout).
+
+-spec call(pos_integer(), {q, redis_command()}, integer()) ->
{ok, redis_reply()} | redis_error();
- ({atom(), atom()}, {qp, redis_pipeline()}, integer()) ->
+ (pos_integer(), {qp, redis_pipeline()}, integer()) ->
{ok, [redis_reply()]} | redis_error().
-call({Conn, Parent}, {F, Cmd}, Retries) ->
+call(I, {F, Cmd}, Retries) ->
?DEBUG("redis query: ~p", [Cmd]),
+ Conn = get_connection(I),
Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of
{error, Reason} when is_atom(Reason) ->
try exit(whereis(Conn), kill) catch _:_ -> ok end,
@@ -414,8 +489,8 @@ call({Conn, Parent}, {F, Cmd}, Retries) ->
end,
case Res of
{error, disconnected} when Retries > 0 ->
- try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of
- ok -> call({Conn, Parent}, {F, Cmd}, Retries-1);
+ try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of
+ ok -> call(I, {F, Cmd}, Retries-1);
{error, _} = Err -> Err
catch exit:{Why, {?GEN_SERVER, call, _}} ->
Reason1 = case Why of
@@ -439,11 +514,9 @@ log_error(Cmd, Reason) ->
"** response = ~s",
[Cmd, format_error(Reason)]).
--spec get_worker() -> {atom(), atom()}.
-get_worker() ->
- Time = p1_time_compat:system_time(),
- I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1,
- {get_connection(I), get_proc(I)}.
+-spec get_rnd_id() -> pos_integer().
+get_rnd_id() ->
+ randoms:uniform(2, ejabberd_redis_sup:get_pool_size()).
-spec get_result([{error, atom() | binary()} | {ok, iodata()}]) ->
{ok, [redis_reply()]} | {error, binary()}.
@@ -531,3 +604,13 @@ clean_queue(Q, CurrTime) ->
true ->
Q1
end.
+
+re_subscribe(Pid, Subs) ->
+ case maps:keys(Subs) of
+ [] -> ok;
+ Channels -> eredis_subscribe(Pid, Channels)
+ end.
+
+eredis_subscribe(Pid, Channels) ->
+ ?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]),
+ eredis_sub:subscribe(Pid, Channels).