summaryrefslogtreecommitdiff
path: root/src/ejabberd_sql.erl
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2019-07-24 14:28:43 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2019-07-24 14:28:43 +0300
commitae135e57d971df0e43a5f1a0189dfcd9f11ae130 (patch)
tree252ad02f713f42367a07d2e9c46bc0b0a8cf28b4 /src/ejabberd_sql.erl
parentRevert "mod_privacy: Don't try to look up 'undefined' list" (diff)
Improve SQL pool logic
Avoid using ETS table for SQL workers: rely on processes names instead
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r--src/ejabberd_sql.erl73
1 files changed, 37 insertions, 36 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl
index 5a94dcf8..e0f1e9e1 100644
--- a/src/ejabberd_sql.erl
+++ b/src/ejabberd_sql.erl
@@ -30,7 +30,7 @@
-behaviour(p1_fsm).
%% External exports
--export([start/1, start_link/2,
+-export([start_link/2,
sql_query/2,
sql_query_t/1,
sql_transaction/2,
@@ -73,7 +73,6 @@
{db_ref = self() :: pid(),
db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
db_version = undefined :: undefined | non_neg_integer(),
- start_interval = 0 :: non_neg_integer(),
host = <<"">> :: binary(),
pending_requests :: p1_queue:queue()}).
@@ -104,14 +103,11 @@
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
-start(Host) ->
- p1_fsm:start(ejabberd_sql, [Host],
- fsm_limit_opts() ++ (?FSMOPTS)).
-
-start_link(Host, StartInterval) ->
- p1_fsm:start_link(ejabberd_sql,
- [Host, StartInterval],
- fsm_limit_opts() ++ (?FSMOPTS)).
+-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
+start_link(Host, I) ->
+ Proc = binary_to_atom(get_worker_name(Host, I), utf8),
+ p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
+ fsm_limit_opts() ++ ?FSMOPTS).
-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} |
fun(() -> any()) | fun((atom(), _) -> any()).
@@ -154,19 +150,17 @@ sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}).
sql_call(Host, Msg) ->
case get(?STATE_KEY) of
- undefined ->
- case ejabberd_sql_sup:get_random_pid(Host) of
- none -> {error, <<"Unknown Host">>};
- Pid ->
- sync_send_event(Pid,{sql_cmd, Msg,
- erlang:monotonic_time(millisecond)},
- query_timeout(Host))
- end;
- _State -> nested_op(Msg)
+ undefined ->
+ Proc = get_worker(Host),
+ sync_send_event(Proc, {sql_cmd, Msg,
+ erlang:monotonic_time(millisecond)},
+ query_timeout(Host));
+ _State ->
+ nested_op(Msg)
end.
-keep_alive(Host, PID) ->
- case sync_send_event(PID,
+keep_alive(Host, Proc) ->
+ case sync_send_event(Proc,
{sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
erlang:monotonic_time(millisecond)},
query_timeout(Host)) of
@@ -174,11 +168,11 @@ keep_alive(Host, PID) ->
ok;
_Err ->
?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]),
- sync_send_event(PID, force_timeout, query_timeout(Host))
+ sync_send_event(Proc, force_timeout, query_timeout(Host))
end.
-sync_send_event(Pid, Msg, Timeout) ->
- try p1_fsm:sync_send_event(Pid, Msg, Timeout)
+sync_send_event(Proc, Msg, Timeout) ->
+ try p1_fsm:sync_send_event(Proc, Msg, Timeout)
catch _:{Reason, {p1_fsm, _, _}} ->
{error, Reason}
end.
@@ -310,10 +304,20 @@ sqlite_file(Host) ->
use_new_schema() ->
ejabberd_option:new_sql_schema().
+-spec get_worker(binary()) -> atom().
+get_worker(Host) ->
+ PoolSize = ejabberd_option:sql_pool_size(Host),
+ I = p1_rand:round_robin(PoolSize) + 1,
+ binary_to_existing_atom(get_worker_name(Host, I), utf8).
+
+-spec get_worker_name(binary(), pos_integer()) -> binary().
+get_worker_name(Host, I) ->
+ <<"ejabberd_sql_", Host/binary, $_, (integer_to_binary(I))/binary>>.
+
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
-init([Host, StartInterval]) ->
+init([Host]) ->
process_flag(trap_exit, true),
case ejabberd_option:sql_keepalive_interval(Host) of
undefined ->
@@ -324,12 +328,10 @@ init([Host, StartInterval]) ->
end,
[DBType | _] = db_opts(Host),
p1_fsm:send_event(self(), connect),
- ejabberd_sql_sup:add_pid(Host, self()),
QueueType = ejabberd_option:sql_queue_type(Host),
{ok, connecting,
#state{db_type = DBType, host = Host,
- pending_requests = p1_queue:new(QueueType, max_fsm_queue()),
- start_interval = StartInterval}}.
+ pending_requests = p1_queue:new(QueueType, max_fsm_queue())}}.
connecting(connect, #state{host = Host} = State) ->
ConnectRes = case db_opts(Host) of
@@ -359,13 +361,13 @@ connecting(connect, #state{host = Host} = State) ->
State2 = get_db_version(State1),
{next_state, session_established, State2};
{error, Reason} ->
- ?WARNING_MSG("~p connection failed:~n** Reason: ~p~n** "
- "Retry after: ~p seconds",
- [State#state.db_type, Reason,
- State#state.start_interval div 1000]),
- p1_fsm:send_event_after(State#state.start_interval,
- connect),
- {next_state, connecting, State}
+ StartInterval = ejabberd_option:sql_start_interval(Host),
+ ?WARNING_MSG("~p connection failed:~n** Reason: ~p~n** "
+ "Retry after: ~B seconds",
+ [State#state.db_type, Reason,
+ StartInterval div 1000]),
+ p1_fsm:send_event_after(StartInterval, connect),
+ {next_state, connecting, State}
end;
connecting(Event, State) ->
?WARNING_MSG("Unexpected event in 'connecting': ~p",
@@ -441,7 +443,6 @@ handle_info(Info, StateName, State) ->
{next_state, StateName, State}.
terminate(_Reason, _StateName, State) ->
- ejabberd_sql_sup:remove_pid(State#state.host, self()),
case State#state.db_type of
mysql -> catch p1_mysql_conn:stop(State#state.db_ref);
sqlite -> catch sqlite3:close(sqlite_db(State#state.host));