diff options
1 files changed, 79 insertions, 78 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl
index b225a107a..d085d3ee4 100644
--- a/src/ejabberd_sql.erl
+++ b/src/ejabberd_sql.erl
@@ -70,45 +70,27 @@
- {db_ref = self() :: pid(),
- db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
- db_version = undefined :: undefined | non_neg_integer(),
- host = <<"">> :: binary(),
- pending_requests :: p1_queue:queue()}).
+ {db_ref :: undefined | pid(),
+ db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
+ db_version :: undefined | non_neg_integer(),
+ host :: binary(),
+ pending_requests :: p1_queue:queue(),
+ overload_reported :: undefined | integer()}).
-define(STATE_KEY, ejabberd_sql_state).
-define(NESTING_KEY, ejabberd_sql_nesting_level).
-define(TOP_LEVEL_TXN, 0).
-define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]).
-define(PREPARE_KEY, ejabberd_sql_prepare).
%%-define(DBGFSM, true).
-define(FSMOPTS, [{debug, [trace]}]).
-define(FSMOPTS, []).
-%%% API
--spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
-start_link(Host, I) ->
- Proc = binary_to_atom(get_worker_name(Host, I), utf8),
- p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
- fsm_limit_opts() ++ ?FSMOPTS).
+-type state() :: #state{}.
-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} |
fun(() -> any()) | fun((atom(), _) -> any()).
-type sql_query() :: sql_query_simple() |
@@ -119,8 +101,16 @@ start_link(Host, I) ->
{selected, [any()]} |
--spec sql_query(binary(), sql_query()) -> sql_query_result().
+%%% API
+-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
+start_link(Host, I) ->
+ Proc = binary_to_atom(get_worker_name(Host, I), utf8),
+ p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
+ fsm_limit_opts() ++ ?FSMOPTS).
+-spec sql_query(binary(), sql_query()) -> sql_query_result().
sql_query(Host, Query) ->
sql_call(Host, {sql_query, Query}).
@@ -129,7 +119,6 @@ sql_query(Host, Query) ->
-spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) ->
{atomic, any()} |
{aborted, any()}.
sql_transaction(Host, Queries)
when is_list(Queries) ->
F = fun () ->
@@ -149,26 +138,27 @@ sql_transaction(Host, F) when is_function(F) ->
sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}).
sql_call(Host, Msg) ->
+ Timeout = query_timeout(Host),
case get(?STATE_KEY) of
undefined ->
Proc = get_worker(Host),
- sync_send_event(Proc, {sql_cmd, Msg,
- erlang:monotonic_time(millisecond)},
- query_timeout(Host));
+ sync_send_event(Proc, {sql_cmd, Msg, current_time() + Timeout},
+ Timeout);
_State ->
keep_alive(Host, Proc) ->
- case sync_send_event(Proc,
- {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
- erlang:monotonic_time(millisecond)},
- query_timeout(Host)) of
+ Timeout = query_timeout(Host),
+ case sync_send_event(
+ Proc,
+ {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout},
+ Timeout) of
{selected,_,[[<<"1">>]]} ->
_Err ->
?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]),
- sync_send_event(Proc, force_timeout, query_timeout(Host))
+ sync_send_event(Proc, force_timeout, Timeout)
sync_send_event(Proc, Msg, Timeout) ->
@@ -178,15 +168,14 @@ sync_send_event(Proc, Msg, Timeout) ->
-spec sql_query_t(sql_query()) -> sql_query_result().
%% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) ->
QRes = sql_query_internal(Query),
case QRes of
- {error, Reason} -> throw({aborted, Reason});
+ {error, Reason} -> restart(Reason);
Rs when is_list(Rs) ->
case lists:keysearch(error, 1, Rs) of
- {value, {error, Reason}} -> throw({aborted, Reason});
+ {value, {error, Reason}} -> restart(Reason);
_ -> QRes
_ -> QRes
@@ -372,11 +361,9 @@ connecting(Event, State) ->
{next_state, connecting, State}.
-connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
- _Timestamp},
+connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp},
From, State) ->
- p1_fsm:reply(From,
- {error, <<"SQL connection failed">>}),
+ reply(From, {error, <<"SQL connection failed">>}, Timestamp),
{next_state, connecting, State};
connecting({sql_cmd, Command, Timestamp} = Req, From,
State) ->
@@ -386,10 +373,11 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
try p1_queue:in({sql_cmd, Command, From, Timestamp},
catch error:full ->
+ Err = <<"SQL request queue is overfilled">>,
+ ?ERROR_MSG("~s, bouncing all pending requests", [Err]),
Q = p1_queue:dropwhile(
- fun({sql_cmd, _, To, _Timestamp}) ->
- p1_fsm:reply(
- To, {error, <<"SQL connection failed">>}),
+ fun({sql_cmd, _, To, TS}) ->
+ reply(To, {error, Err}, TS),
end, State#state.pending_requests),
p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
@@ -399,16 +387,15 @@ connecting({sql_cmd, Command, Timestamp} = Req, From,
connecting(Request, {Who, _Ref}, State) ->
?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'",
[Request, Who]),
- {reply, {error, badarg}, connecting, State}.
+ {next_state, connecting, State}.
session_established({sql_cmd, Command, Timestamp}, From,
State) ->
run_sql_cmd(Command, From, State, Timestamp);
session_established(Request, {Who, _Ref}, State) ->
- ?WARNING_MSG("Unexpected call ~p from ~p in 'session_establ"
- "ished'",
+ ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'",
[Request, Who]),
- {reply, {error, badarg}, session_established, State}.
+ {next_state, session_established, State}.
session_established({sql_cmd, Command, From, Timestamp},
State) ->
@@ -465,17 +452,14 @@ handle_reconnect(Reason, #state{host = Host} = State) ->
{next_state, connecting, State}.
run_sql_cmd(Command, From, State, Timestamp) ->
- QueryTimeout = query_timeout(State#state.host),
- case erlang:monotonic_time(millisecond) - Timestamp of
- Age when Age < QueryTimeout ->
- put(?STATE_KEY, State),
- abort_on_driver_error(outer_op(Command), From);
- Age ->
- ?ERROR_MSG("Database was not available or too slow, "
- "discarding ~p milliseconds old request~n~p~n",
- [Age, Command]),
- {next_state, session_established, State}
+ case current_time() >= Timestamp of
+ true ->
+ State1 = report_overload(State),
+ {next_state, session_established, State1};
+ false ->
+ put(?STATE_KEY, State),
+ abort_on_driver_error(outer_op(Command), From, Timestamp)
%% Only called by handle_call, only handles top level operations.
@@ -620,6 +604,8 @@ sql_query_internal(#sql_query{} = Query) ->
{error, <<"killed">>};
exit:{normal, _} ->
{error, <<"terminated unexpectedly">>};
+ exit:{shutdown, _} ->
+ {error, <<"shutdown">>};
?EX_RULE(Class, Reason, Stack) ->
StackTrace = ?EX_STACK(Stack),
?ERROR_MSG("Internal error while processing SQL query:~n** ~s",
@@ -779,30 +765,42 @@ sql_query_to_iolist(SQLQuery) ->
%% Generate the OTP callback return tuple depending on the driver result.
- <<"query timed out">>} = Reply,
- From) ->
- p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, timeout, get(?STATE_KEY)};
- <<"Failed sending data on socket", _/binary>>} = Reply,
- From) ->
- p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply,
+ From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, closed, get(?STATE_KEY)};
- <<"SQL connection failed">>} = Reply,
- From) ->
- p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, timeout, get(?STATE_KEY)};
- <<"Communication link failure">>} = Reply,
- From) ->
- p1_fsm:reply(From, Reply),
+abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, closed, get(?STATE_KEY)};
-abort_on_driver_error(Reply, From) ->
- p1_fsm:reply(From, Reply),
+abort_on_driver_error(Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{next_state, session_established, get(?STATE_KEY)}.
+-spec report_overload(state()) -> state().
+report_overload(#state{overload_reported = PrevTime} = State) ->
+ CurrTime = current_time(),
+ case PrevTime == undefined orelse (CurrTime - PrevTime) > timer:seconds(30) of
+ true ->
+ ?ERROR_MSG("SQL connection pool is overloaded, "
+ "discarding stale requests", []),
+ State#state{overload_reported = current_time()};
+ false ->
+ State
+ end.
+-spec reply({pid(), term()}, term(), integer()) -> term().
+reply(From, Reply, Timestamp) ->
+ case current_time() >= Timestamp of
+ true -> ok;
+ false -> p1_fsm:reply(From, Reply)
+ end.
%% == pure ODBC code
%% part of init/1
@@ -1143,6 +1141,9 @@ fsm_limit_opts() ->
query_timeout(LServer) ->
+current_time() ->
+ erlang:monotonic_time(millisecond).
%% ***IMPORTANT*** This error format requires extended_errors turned on.
extended_error({"08S01", _, Reason}) ->
% TCP Provider: The specified network name is no longer available