aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-05-09 08:36:30 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-05-09 08:36:30 +0300
commit068db1a2d98c338b408f84b5a294ae7400256e13 (patch)
tree55da1db14d3dfa6ec6b1ff56e774165da3f14f76
parentmod_client_state: Delete only the configured hooks (diff)
Handle Redis connection in a separate module
-rw-r--r--src/ejabberd_app.erl1
-rw-r--r--src/ejabberd_config.erl6
-rw-r--r--src/ejabberd_redis.erl179
-rw-r--r--src/ejabberd_sm_redis.erl58
4 files changed, 196 insertions, 48 deletions
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl
index b25bf231b..44d0db626 100644
--- a/src/ejabberd_app.erl
+++ b/src/ejabberd_app.erl
@@ -63,6 +63,7 @@ start(normal, _Args) ->
Sup = ejabberd_sup:start_link(),
ejabberd_rdbms:start(),
ejabberd_riak_sup:start(),
+ ejabberd_redis:start(),
ejabberd_sm:start(),
cyrsasl:start(),
% Profiling
diff --git a/src/ejabberd_config.erl b/src/ejabberd_config.erl
index 06de61b5f..16eebc0e3 100644
--- a/src/ejabberd_config.erl
+++ b/src/ejabberd_config.erl
@@ -30,7 +30,7 @@
add_global_option/2, add_local_option/2,
get_global_option/2, get_local_option/2,
get_global_option/3, get_local_option/3,
- get_option/2, get_option/3, add_option/2,
+ get_option/2, get_option/3, add_option/2, has_option/1,
get_vh_by_auth_method/1, is_file_readable/1,
get_version/0, get_myhosts/0, get_mylang/0,
prepare_opt_val/4, convert_table_to_binary/5,
@@ -838,6 +838,10 @@ get_option(Opt, F, Default) ->
end
end.
+-spec has_option(atom() | {atom(), global | binary()}) -> any().
+has_option(Opt) ->
+ get_option(Opt, fun(_) -> true end, false).
+
init_module_db_table(Modules) ->
catch ets:new(module_db, [named_table, public, bag]),
%% Dirty hack for mod_pubsub
diff --git a/src/ejabberd_redis.erl b/src/ejabberd_redis.erl
new file mode 100644
index 000000000..c6e3b4dd0
--- /dev/null
+++ b/src/ejabberd_redis.erl
@@ -0,0 +1,179 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 8 May 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(ejabberd_redis).
+
+-behaviour(gen_server).
+-behaviour(ejabberd_config).
+
+%% API
+-export([start/0, start_link/0, q/1, qp/1, opt_type/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(SERVER, ?MODULE).
+-define(PROCNAME, 'ejabberd_redis_client').
+
+-include("logger.hrl").
+-include("ejabberd.hrl").
+
+-record(state, {}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+start() ->
+ case lists:any(
+ fun(Host) ->
+ is_redis_configured(Host)
+ end, ?MYHOSTS) of
+ true ->
+ Spec = {?MODULE, {?MODULE, start_link, []},
+ permanent, 2000, worker, [?MODULE]},
+ supervisor:start_child(ejabberd_sup, Spec);
+ false ->
+ ok
+ end.
+
+q(Command) ->
+ try eredis:q(?PROCNAME, Command)
+ catch _:Reason -> {error, Reason}
+ end.
+
+qp(Pipeline) ->
+ try eredis:qp(?PROCNAME, Pipeline)
+ catch _:Reason -> {error, Reason}
+ end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([]) ->
+ process_flag(trap_exit, true),
+ connect(),
+ {ok, #state{}}.
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(connect, State) ->
+ connect(),
+ {noreply, State};
+handle_info({'DOWN', _MRef, _Type, _Pid, Reason}, State) ->
+ ?INFO_MSG("Redis connection has failed: ~p", [Reason]),
+ connect(),
+ {noreply, State};
+handle_info({'EXIT', _, _}, State) ->
+ {noreply, State};
+handle_info(Info, State) ->
+ ?INFO_MSG("unexpected info = ~p", [Info]),
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+is_redis_configured(Host) ->
+ ServerConfigured = ejabberd_config:has_option({redis_server, Host}),
+ PortConfigured = ejabberd_config:has_option({redis_port, Host}),
+ DBConfigured = ejabberd_config:has_option({redis_db, Host}),
+ PassConfigured = ejabberd_config:has_option({redis_password, Host}),
+ ReconnTimeoutConfigured = ejabberd_config:has_option(
+ {redis_reconnect_timeout, Host}),
+ ConnTimeoutConfigured = ejabberd_config:has_option(
+ {redis_connect_timeout, Host}),
+ Modules = ejabberd_config:get_option(
+ {modules, Host},
+ fun(L) when is_list(L) -> L end, []),
+ SMConfigured = ejabberd_config:get_option(
+ {sm_db_type, Host},
+ fun(V) -> V end) == redis,
+ ModuleWithRedisDBConfigured =
+ lists:any(
+ fun({Module, Opts}) ->
+ gen_mod:db_type(Host, Opts, Module) == redis
+ end, Modules),
+ ServerConfigured or PortConfigured or DBConfigured or PassConfigured or
+ ReconnTimeoutConfigured or ConnTimeoutConfigured or
+ SMConfigured or ModuleWithRedisDBConfigured.
+
+iolist_to_list(IOList) ->
+ binary_to_list(iolist_to_binary(IOList)).
+
+connect() ->
+ Server = ejabberd_config:get_option(redis_server,
+ fun iolist_to_list/1,
+ "localhost"),
+ Port = ejabberd_config:get_option(redis_port,
+ fun(P) when is_integer(P),
+ P>0, P<65536 ->
+ P
+ end, 6379),
+ DB = ejabberd_config:get_option(redis_db,
+ fun(I) when is_integer(I), I >= 0 ->
+ I
+ end, 0),
+ Pass = ejabberd_config:get_option(redis_password,
+ fun iolist_to_list/1,
+ ""),
+ ReconnTimeout = timer:seconds(
+ ejabberd_config:get_option(
+ redis_reconnect_timeout,
+ fun(I) when is_integer(I), I>0 -> I end,
+ 1)),
+ ConnTimeout = timer:seconds(
+ ejabberd_config:get_option(
+ redis_connect_timeout,
+ fun(I) when is_integer(I), I>0 -> I end,
+ 1)),
+ try case eredis:start_link(Server, Port, DB, Pass,
+ ReconnTimeout, ConnTimeout) of
+ {ok, Client} ->
+ ?INFO_MSG("Connected to Redis at ~s:~p", [Server, Port]),
+ unlink(Client),
+ erlang:monitor(process, Client),
+ register(?PROCNAME, Client),
+ {ok, Client};
+ {error, Why} ->
+ erlang:error(Why)
+ end
+ catch _:Reason ->
+ Timeout = 10,
+ ?ERROR_MSG("Redis connection at ~s:~p has failed: ~p; "
+ "reconnecting in ~p seconds",
+ [Server, Port, Reason, Timeout]),
+ erlang:send_after(timer:seconds(Timeout), self(), connect)
+ end.
+
+opt_type(redis_connect_timeout) ->
+ fun (I) when is_integer(I), I > 0 -> I end;
+opt_type(redis_db) ->
+ fun (I) when is_integer(I), I >= 0 -> I end;
+opt_type(redis_password) -> fun iolist_to_list/1;
+opt_type(redis_port) ->
+ fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
+opt_type(redis_reconnect_timeout) ->
+ fun (I) when is_integer(I), I > 0 -> I end;
+opt_type(redis_server) -> fun iolist_to_list/1;
+opt_type(_) ->
+ [redis_connect_timeout, redis_db, redis_password,
+ redis_port, redis_reconnect_timeout, redis_server].
diff --git a/src/ejabberd_sm_redis.erl b/src/ejabberd_sm_redis.erl
index bf9e0eff5..d25f777e3 100644
--- a/src/ejabberd_sm_redis.erl
+++ b/src/ejabberd_sm_redis.erl
@@ -21,48 +21,12 @@
-include("logger.hrl").
-include("jlib.hrl").
--define(PROCNAME, 'ejabberd_redis_client').
-
%%%===================================================================
%%% API
%%%===================================================================
-spec init() -> ok | {error, any()}.
init() ->
- Server = ejabberd_config:get_option(redis_server,
- fun iolist_to_list/1,
- "localhost"),
- Port = ejabberd_config:get_option(redis_port,
- fun(P) when is_integer(P),
- P>0, P<65536 ->
- P
- end, 6379),
- DB = ejabberd_config:get_option(redis_db,
- fun(I) when is_integer(I), I >= 0 ->
- I
- end, 0),
- Pass = ejabberd_config:get_option(redis_password,
- fun iolist_to_list/1,
- ""),
- ReconnTimeout = timer:seconds(
- ejabberd_config:get_option(
- redis_reconnect_timeout,
- fun(I) when is_integer(I), I>0 -> I end,
- 1)),
- ConnTimeout = timer:seconds(
- ejabberd_config:get_option(
- redis_connect_timeout,
- fun(I) when is_integer(I), I>0 -> I end,
- 1)),
- case eredis:start_link(Server, Port, DB, Pass,
- ReconnTimeout, ConnTimeout) of
- {ok, Client} ->
- register(?PROCNAME, Client),
- clean_table(),
- ok;
- {error, _} = Err ->
- ?ERROR_MSG("failed to start redis client: ~p", [Err]),
- Err
- end.
+ clean_table().
-spec set_session(#session{}) -> ok.
set_session(Session) ->
@@ -71,8 +35,8 @@ set_session(Session) ->
SIDKey = sid_to_key(Session#session.sid),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, Session#session.sid),
- case eredis:qp(?PROCNAME, [["HSET", USKey, SIDKey, T],
- ["HSET", ServKey, USSIDKey, T]]) of
+ case ejabberd_redis:qp([["HSET", USKey, SIDKey, T],
+ ["HSET", ServKey, USSIDKey, T]]) of
[{ok, _}, {ok, _}] ->
ok;
Err ->
@@ -83,7 +47,7 @@ set_session(Session) ->
{ok, #session{}} | {error, notfound}.
delete_session(LUser, LServer, _LResource, SID) ->
USKey = us_to_key({LUser, LServer}),
- case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+ case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} ->
Ss = decode_session_list(Vals),
case lists:keyfind(SID, #session.sid, Ss) of
@@ -93,8 +57,8 @@ delete_session(LUser, LServer, _LResource, SID) ->
SIDKey = sid_to_key(SID),
ServKey = server_to_key(element(2, Session#session.us)),
USSIDKey = us_sid_to_key(Session#session.us, SID),
- eredis:qp(?PROCNAME, [["HDEL", USKey, SIDKey],
- ["HDEL", ServKey, USSIDKey]]),
+ ejabberd_redis:qp([["HDEL", USKey, SIDKey],
+ ["HDEL", ServKey, USSIDKey]]),
{ok, Session}
end;
Err ->
@@ -112,7 +76,7 @@ get_sessions() ->
-spec get_sessions(binary()) -> [#session{}].
get_sessions(LServer) ->
ServKey = server_to_key(LServer),
- case eredis:q(?PROCNAME, ["HGETALL", ServKey]) of
+ case ejabberd_redis:q(["HGETALL", ServKey]) of
{ok, Vals} ->
decode_session_list(Vals);
Err ->
@@ -123,7 +87,7 @@ get_sessions(LServer) ->
-spec get_sessions(binary(), binary()) -> [#session{}].
get_sessions(LUser, LServer) ->
USKey = us_to_key({LUser, LServer}),
- case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+ case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
decode_session_list(Vals);
Err ->
@@ -135,7 +99,7 @@ get_sessions(LUser, LServer) ->
[#session{}].
get_sessions(LUser, LServer, LResource) ->
USKey = us_to_key({LUser, LServer}),
- case eredis:q(?PROCNAME, ["HGETALL", USKey]) of
+ case ejabberd_redis:q(["HGETALL", USKey]) of
{ok, Vals} when is_list(Vals) ->
[S || S <- decode_session_list(Vals),
element(3, S#session.usr) == LResource];
@@ -172,7 +136,7 @@ clean_table() ->
lists:foreach(
fun(LServer) ->
ServKey = server_to_key(LServer),
- case eredis:q(?PROCNAME, ["HKEYS", ServKey]) of
+ case ejabberd_redis:q(["HKEYS", ServKey]) of
{ok, []} ->
ok;
{ok, Vals} ->
@@ -189,7 +153,7 @@ clean_table() ->
SIDKey = sid_to_key(SID),
["HDEL", USKey, SIDKey]
end, Vals1),
- Res = eredis:qp(?PROCNAME, [Q1|Q2]),
+ Res = ejabberd_redis:qp([Q1|Q2]),
case lists:filter(
fun({ok, _}) -> false;
(_) -> true