diff options
-rw-r--r-- | src/ejabberd_sql.erl | 157 |
1 files changed, 79 insertions, 78 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index b225a107a..d085d3ee4 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -70,45 +70,27 @@ -include("ejabberd_stacktrace.hrl"). -record(state, - {db_ref = self() :: pid(), - db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, - db_version = undefined :: undefined | non_neg_integer(), - host = <<"">> :: binary(), - pending_requests :: p1_queue:queue()}). + {db_ref :: undefined | pid(), + db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, + db_version :: undefined | non_neg_integer(), + host :: binary(), + pending_requests :: p1_queue:queue(), + overload_reported :: undefined | integer()}). -define(STATE_KEY, ejabberd_sql_state). - -define(NESTING_KEY, ejabberd_sql_nesting_level). - -define(TOP_LEVEL_TXN, 0). - -define(MAX_TRANSACTION_RESTARTS, 10). - -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]). - -define(PREPARE_KEY, ejabberd_sql_prepare). - %%-define(DBGFSM, true). - -ifdef(DBGFSM). - -define(FSMOPTS, [{debug, [trace]}]). - -else. - -define(FSMOPTS, []). - -endif. -%%%---------------------------------------------------------------------- -%%% API -%%%---------------------------------------------------------------------- --spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}. -start_link(Host, I) -> - Proc = binary_to_atom(get_worker_name(Host, I), utf8), - p1_fsm:start_link({local, Proc}, ?MODULE, [Host], - fsm_limit_opts() ++ ?FSMOPTS). - +-type state() :: #state{}. -type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} | fun(() -> any()) | fun((atom(), _) -> any()). -type sql_query() :: sql_query_simple() | @@ -119,8 +101,16 @@ start_link(Host, I) -> {selected, [any()]} | ok. --spec sql_query(binary(), sql_query()) -> sql_query_result(). +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- +-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}. +start_link(Host, I) -> + Proc = binary_to_atom(get_worker_name(Host, I), utf8), + p1_fsm:start_link({local, Proc}, ?MODULE, [Host], + fsm_limit_opts() ++ ?FSMOPTS). +-spec sql_query(binary(), sql_query()) -> sql_query_result(). sql_query(Host, Query) -> sql_call(Host, {sql_query, Query}). @@ -129,7 +119,6 @@ sql_query(Host, Query) -> -spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) -> {atomic, any()} | {aborted, any()}. - sql_transaction(Host, Queries) when is_list(Queries) -> F = fun () -> @@ -149,26 +138,27 @@ sql_transaction(Host, F) when is_function(F) -> sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}). sql_call(Host, Msg) -> + Timeout = query_timeout(Host), case get(?STATE_KEY) of undefined -> Proc = get_worker(Host), - sync_send_event(Proc, {sql_cmd, Msg, - erlang:monotonic_time(millisecond)}, - query_timeout(Host)); + sync_send_event(Proc, {sql_cmd, Msg, current_time() + Timeout}, + Timeout); _State -> nested_op(Msg) end. keep_alive(Host, Proc) -> - case sync_send_event(Proc, - {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, - erlang:monotonic_time(millisecond)}, - query_timeout(Host)) of + Timeout = query_timeout(Host), + case sync_send_event( + Proc, + {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout}, + Timeout) of {selected,_,[[<<"1">>]]} -> ok; _Err -> ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]), - sync_send_event(Proc, force_timeout, query_timeout(Host)) + sync_send_event(Proc, force_timeout, Timeout) end. sync_send_event(Proc, Msg, Timeout) -> @@ -178,15 +168,14 @@ sync_send_event(Proc, Msg, Timeout) -> end. -spec sql_query_t(sql_query()) -> sql_query_result(). - %% This function is intended to be used from inside an sql_transaction: sql_query_t(Query) -> QRes = sql_query_internal(Query), case QRes of - {error, Reason} -> throw({aborted, Reason}); + {error, Reason} -> restart(Reason); Rs when is_list(Rs) -> case lists:keysearch(error, 1, Rs) of - {value, {error, Reason}} -> throw({aborted, Reason}); + {value, {error, Reason}} -> restart(Reason); _ -> QRes end; _ -> QRes @@ -372,11 +361,9 @@ connecting(Event, State) -> [Event]), {next_state, connecting, State}. -connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, - _Timestamp}, +connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp}, From, State) -> - p1_fsm:reply(From, - {error, <<"SQL connection failed">>}), + reply(From, {error, <<"SQL connection failed">>}, Timestamp), {next_state, connecting, State}; connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> @@ -386,10 +373,11 @@ connecting({sql_cmd, Command, Timestamp} = Req, From, try p1_queue:in({sql_cmd, Command, From, Timestamp}, State#state.pending_requests) catch error:full -> + Err = <<"SQL request queue is overfilled">>, + ?ERROR_MSG("~s, bouncing all pending requests", [Err]), Q = p1_queue:dropwhile( - fun({sql_cmd, _, To, _Timestamp}) -> - p1_fsm:reply( - To, {error, <<"SQL connection failed">>}), + fun({sql_cmd, _, To, TS}) -> + reply(To, {error, Err}, TS), true end, State#state.pending_requests), p1_queue:in({sql_cmd, Command, From, Timestamp}, Q) @@ -399,16 +387,15 @@ connecting({sql_cmd, Command, Timestamp} = Req, From, connecting(Request, {Who, _Ref}, State) -> ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'", [Request, Who]), - {reply, {error, badarg}, connecting, State}. + {next_state, connecting, State}. session_established({sql_cmd, Command, Timestamp}, From, State) -> run_sql_cmd(Command, From, State, Timestamp); session_established(Request, {Who, _Ref}, State) -> - ?WARNING_MSG("Unexpected call ~p from ~p in 'session_establ" - "ished'", + ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'", [Request, Who]), - {reply, {error, badarg}, session_established, State}. + {next_state, session_established, State}. session_established({sql_cmd, Command, From, Timestamp}, State) -> @@ -465,17 +452,14 @@ handle_reconnect(Reason, #state{host = Host} = State) -> {next_state, connecting, State}. run_sql_cmd(Command, From, State, Timestamp) -> - QueryTimeout = query_timeout(State#state.host), - case erlang:monotonic_time(millisecond) - Timestamp of - Age when Age < QueryTimeout -> - put(?NESTING_KEY, ?TOP_LEVEL_TXN), - put(?STATE_KEY, State), - abort_on_driver_error(outer_op(Command), From); - Age -> - ?ERROR_MSG("Database was not available or too slow, " - "discarding ~p milliseconds old request~n~p~n", - [Age, Command]), - {next_state, session_established, State} + case current_time() >= Timestamp of + true -> + State1 = report_overload(State), + {next_state, session_established, State1}; + false -> + put(?NESTING_KEY, ?TOP_LEVEL_TXN), + put(?STATE_KEY, State), + abort_on_driver_error(outer_op(Command), From, Timestamp) end. %% Only called by handle_call, only handles top level operations. @@ -620,6 +604,8 @@ sql_query_internal(#sql_query{} = Query) -> {error, <<"killed">>}; exit:{normal, _} -> {error, <<"terminated unexpectedly">>}; + exit:{shutdown, _} -> + {error, <<"shutdown">>}; ?EX_RULE(Class, Reason, Stack) -> StackTrace = ?EX_STACK(Stack), ?ERROR_MSG("Internal error while processing SQL query:~n** ~s", @@ -779,30 +765,42 @@ sql_query_to_iolist(SQLQuery) -> generic_sql_query_format(SQLQuery). %% Generate the OTP callback return tuple depending on the driver result. -abort_on_driver_error({error, - <<"query timed out">>} = Reply, - From) -> - p1_fsm:reply(From, Reply), +abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, timeout, get(?STATE_KEY)}; -abort_on_driver_error({error, - <<"Failed sending data on socket", _/binary>>} = Reply, - From) -> - p1_fsm:reply(From, Reply), +abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply, + From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, closed, get(?STATE_KEY)}; -abort_on_driver_error({error, - <<"SQL connection failed">>} = Reply, - From) -> - p1_fsm:reply(From, Reply), +abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, timeout, get(?STATE_KEY)}; -abort_on_driver_error({error, - <<"Communication link failure">>} = Reply, - From) -> - p1_fsm:reply(From, Reply), +abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, closed, get(?STATE_KEY)}; -abort_on_driver_error(Reply, From) -> - p1_fsm:reply(From, Reply), +abort_on_driver_error(Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {next_state, session_established, get(?STATE_KEY)}. +-spec report_overload(state()) -> state(). +report_overload(#state{overload_reported = PrevTime} = State) -> + CurrTime = current_time(), + case PrevTime == undefined orelse (CurrTime - PrevTime) > timer:seconds(30) of + true -> + ?ERROR_MSG("SQL connection pool is overloaded, " + "discarding stale requests", []), + State#state{overload_reported = current_time()}; + false -> + State + end. + +-spec reply({pid(), term()}, term(), integer()) -> term(). +reply(From, Reply, Timestamp) -> + case current_time() >= Timestamp of + true -> ok; + false -> p1_fsm:reply(From, Reply) + end. + %% == pure ODBC code %% part of init/1 @@ -1143,6 +1141,9 @@ fsm_limit_opts() -> query_timeout(LServer) -> ejabberd_option:sql_query_timeout(LServer). +current_time() -> + erlang:monotonic_time(millisecond). + %% ***IMPORTANT*** This error format requires extended_errors turned on. extended_error({"08S01", _, Reason}) -> % TCP Provider: The specified network name is no longer available |