diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-04-14 13:57:52 +0300 |
commit | e40baf0bdaecf3206420fe8c16c33f2c166cb717 (patch) | |
tree | 75d9fe880e8257ea9fd20c095c252d7940cea89d /src/ejabberd_redis.erl | |
parent | Bump xmpp dependency, it's required by previous commit (diff) |
Use cache in front of Redis/SQL RAM backends
Diffstat (limited to 'src/ejabberd_redis.erl')
-rw-r--r-- | src/ejabberd_redis.erl | 117 |
1 files changed, 100 insertions, 17 deletions
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl index e7cc74d9..bd85f0ee 100644 --- a/src/ejabberd_redis.erl +++ b/src/ejabberd_redis.erl @@ -31,11 +31,12 @@ -compile({no_auto_import, [get/1, put/2]}). %% API --export([start_link/1, get_proc/1, q/1, qp/1, format_error/1]). +-export([start_link/1, get_proc/1, get_connection/1, q/1, qp/1, format_error/1]). %% Commands -export([multi/1, get/1, set/2, del/1, sadd/2, srem/2, smembers/1, sismember/2, scard/1, - hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1]). + hget/2, hset/3, hdel/2, hlen/1, hgetall/1, hkeys/1, + subscribe/1, publish/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -53,14 +54,18 @@ -record(state, {connection :: pid() | undefined, num :: pos_integer(), + subscriptions = #{} :: map(), pending_q :: p1_queue:queue()}). --type redis_error() :: {error, binary() | timeout | disconnected | overloaded}. +-type error_reason() :: binary() | timeout | disconnected | overloaded. +-type redis_error() :: {error, error_reason()}. -type redis_reply() :: binary() | [binary()]. -type redis_command() :: [binary()]. -type redis_pipeline() :: [redis_command()]. -type state() :: #state{}. +-export_type([error_reason/0]). + %%%=================================================================== %%% API %%%=================================================================== @@ -79,11 +84,11 @@ get_connection(I) -> -spec q(redis_command()) -> {ok, redis_reply()} | redis_error(). q(Command) -> - call(get_worker(), {q, Command}, ?MAX_RETRIES). + call(get_rnd_id(), {q, Command}, ?MAX_RETRIES). -spec qp(redis_pipeline()) -> {ok, [redis_reply()]} | redis_error(). qp(Pipeline) -> - call(get_worker(), {qp, Pipeline}, ?MAX_RETRIES). + call(get_rnd_id(), {qp, Pipeline}, ?MAX_RETRIES). -spec multi(fun(() -> any())) -> {ok, [redis_reply()]} | redis_error(). multi(F) -> @@ -288,6 +293,30 @@ hkeys(Key) -> erlang:error(transaction_unsupported) end. +-spec subscribe([binary()]) -> ok | redis_error(). +subscribe(Channels) -> + try ?GEN_SERVER:call(get_proc(1), {subscribe, self(), Channels}, ?CALL_TIMEOUT) + catch exit:{Why, {?GEN_SERVER, call, _}} -> + Reason = case Why of + timeout -> timeout; + _ -> disconnected + end, + {error, Reason} + end. + +-spec publish(iodata(), iodata()) -> {ok, non_neg_integer()} | redis_error() | queued. +publish(Channel, Data) -> + Cmd = [<<"PUBLISH">>, Channel, Data], + case erlang:get(?TR_STACK) of + undefined -> + case q(Cmd) of + {ok, N} -> {ok, binary_to_integer(N)}; + {error, _} = Err -> Err + end; + Stack -> + tr_enq(Cmd, Stack) + end. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -315,6 +344,15 @@ handle_call(connect, From, #state{connection = Pid} = State) -> self() ! connect, handle_call(connect, From, State#state{connection = undefined}) end; +handle_call({subscribe, Caller, Channels}, _From, + #state{connection = Pid, subscriptions = Subs} = State) -> + Subs1 = lists:foldl( + fun(Channel, Acc) -> + Callers = maps:get(Channel, Acc, []) -- [Caller], + maps:put(Channel, [Caller|Callers], Acc) + end, Subs, Channels), + eredis_subscribe(Pid, Channels), + {reply, ok, State#state{subscriptions = Subs1}}; handle_call(Request, _From, State) -> ?WARNING_MSG("unexepected call: ~p", [Request]), {noreply, State}. @@ -326,6 +364,7 @@ handle_info(connect, #state{connection = undefined} = State) -> NewState = case connect(State) of {ok, Connection} -> Q1 = flush_queue(State#state.pending_q), + re_subscribe(Connection, State#state.subscriptions), State#state{connection = Connection, pending_q = Q1}; {error, _} -> State @@ -342,6 +381,31 @@ handle_info({'EXIT', Pid, _}, State) -> _ -> {noreply, State} end; +handle_info({subscribed, Channel, Pid}, State) -> + case State#state.connection of + Pid -> + case maps:is_key(Channel, State#state.subscriptions) of + true -> eredis_sub:ack_message(Pid); + false -> + ?WARNING_MSG("got subscription ack for unknown channel ~s", + [Channel]) + end; + _ -> + ok + end, + {noreply, State}; +handle_info({message, Channel, Data, Pid}, State) -> + case State#state.connection of + Pid -> + lists:foreach( + fun(Subscriber) -> + erlang:send(Subscriber, {redis_message, Channel, Data}) + end, maps:get(Channel, State#state.subscriptions, [])), + eredis_sub:ack_message(Pid); + _ -> + ok + end, + {noreply, State}; handle_info(Info, State) -> ?WARNING_MSG("unexpected info = ~p", [Info]), {noreply, State}. @@ -377,8 +441,7 @@ connect(#state{num = Num}) -> redis_connect_timeout, fun(I) when is_integer(I), I>0 -> I end, 1)), - try case eredis:start_link(Server, Port, DB, Pass, - no_reconnect, ConnTimeout) of + try case do_connect(Num, Server, Port, Pass, DB, ConnTimeout) of {ok, Client} -> ?DEBUG("Connection #~p established to Redis at ~s:~p", [Num, Server, Port]), @@ -397,12 +460,24 @@ connect(#state{num = Num}) -> {error, Reason} end. --spec call({atom(), atom()}, {q, redis_command()}, integer()) -> +do_connect(1, Server, Port, Pass, _DB, _ConnTimeout) -> + %% First connection in the pool is always a subscriber + Res = eredis_sub:start_link(Server, Port, Pass, no_reconnect, infinity, drop), + case Res of + {ok, Pid} -> eredis_sub:controlling_process(Pid); + _ -> ok + end, + Res; +do_connect(_, Server, Port, Pass, DB, ConnTimeout) -> + eredis:start_link(Server, Port, DB, Pass, no_reconnect, ConnTimeout). + +-spec call(pos_integer(), {q, redis_command()}, integer()) -> {ok, redis_reply()} | redis_error(); - ({atom(), atom()}, {qp, redis_pipeline()}, integer()) -> + (pos_integer(), {qp, redis_pipeline()}, integer()) -> {ok, [redis_reply()]} | redis_error(). -call({Conn, Parent}, {F, Cmd}, Retries) -> +call(I, {F, Cmd}, Retries) -> ?DEBUG("redis query: ~p", [Cmd]), + Conn = get_connection(I), Res = try eredis:F(Conn, Cmd, ?CALL_TIMEOUT) of {error, Reason} when is_atom(Reason) -> try exit(whereis(Conn), kill) catch _:_ -> ok end, @@ -414,8 +489,8 @@ call({Conn, Parent}, {F, Cmd}, Retries) -> end, case Res of {error, disconnected} when Retries > 0 -> - try ?GEN_SERVER:call(Parent, connect, ?CALL_TIMEOUT) of - ok -> call({Conn, Parent}, {F, Cmd}, Retries-1); + try ?GEN_SERVER:call(get_proc(I), connect, ?CALL_TIMEOUT) of + ok -> call(I, {F, Cmd}, Retries-1); {error, _} = Err -> Err catch exit:{Why, {?GEN_SERVER, call, _}} -> Reason1 = case Why of @@ -439,11 +514,9 @@ log_error(Cmd, Reason) -> "** response = ~s", [Cmd, format_error(Reason)]). --spec get_worker() -> {atom(), atom()}. -get_worker() -> - Time = p1_time_compat:system_time(), - I = erlang:phash2(Time, ejabberd_redis_sup:get_pool_size()) + 1, - {get_connection(I), get_proc(I)}. +-spec get_rnd_id() -> pos_integer(). +get_rnd_id() -> + randoms:uniform(2, ejabberd_redis_sup:get_pool_size()). -spec get_result([{error, atom() | binary()} | {ok, iodata()}]) -> {ok, [redis_reply()]} | {error, binary()}. @@ -531,3 +604,13 @@ clean_queue(Q, CurrTime) -> true -> Q1 end. + +re_subscribe(Pid, Subs) -> + case maps:keys(Subs) of + [] -> ok; + Channels -> eredis_subscribe(Pid, Channels) + end. + +eredis_subscribe(Pid, Channels) -> + ?DEBUG("redis query: ~p", [[<<"SUBSCRIBE">>|Channels]]), + eredis_sub:subscribe(Pid, Channels). |