aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_sql.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r--src/ejabberd_sql.erl881
1 files changed, 520 insertions, 361 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl
index 4f50cfa01..c1d9383a0 100644
--- a/src/ejabberd_sql.erl
+++ b/src/ejabberd_sql.erl
@@ -5,7 +5,7 @@
%%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
+%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@@ -25,23 +25,22 @@
-module(ejabberd_sql).
--behaviour(ejabberd_config).
-
-author('alexey@process-one.net').
--define(GEN_FSM, p1_fsm).
-
--behaviour(?GEN_FSM).
+-behaviour(p1_fsm).
%% External exports
--export([start/1, start_link/2,
+-export([start_link/2,
sql_query/2,
sql_query_t/1,
sql_transaction/2,
sql_bloc/2,
- sql_query_to_iolist/1,
+ abort/1,
+ restart/1,
+ use_new_schema/0,
+ sql_query_to_iolist/1,
escape/1,
- standard_escape/1,
+ standard_escape/1,
escape_like/1,
escape_like_arg/1,
escape_like_arg_circumflex/1,
@@ -54,7 +53,9 @@
freetds_config/0,
odbcinst_config/0,
init_mssql/1,
- keep_alive/1]).
+ keep_alive/2,
+ to_list/2,
+ to_array/2]).
%% gen_fsm callbacks
-export([init/1, handle_event/3, handle_sync_event/4,
@@ -62,87 +63,62 @@
code_change/4]).
-export([connecting/2, connecting/3,
- session_established/2, session_established/3,
- opt_type/1]).
+ session_established/2, session_established/3]).
--include("ejabberd.hrl").
-include("logger.hrl").
-include("ejabberd_sql_pt.hrl").
+-include("ejabberd_stacktrace.hrl").
-record(state,
- {db_ref = self() :: pid(),
- db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
- db_version = undefined :: undefined | non_neg_integer(),
- start_interval = 0 :: non_neg_integer(),
- host = <<"">> :: binary(),
- max_pending_requests_len :: non_neg_integer(),
- pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}).
+ {db_ref :: undefined | pid(),
+ db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql,
+ db_version :: undefined | non_neg_integer(),
+ host :: binary(),
+ pending_requests :: p1_queue:queue(),
+ overload_reported :: undefined | integer()}).
-define(STATE_KEY, ejabberd_sql_state).
-
-define(NESTING_KEY, ejabberd_sql_nesting_level).
-
-define(TOP_LEVEL_TXN, 0).
-
--define(PGSQL_PORT, 5432).
-
--define(MYSQL_PORT, 3306).
-
--define(MSSQL_PORT, 1433).
-
-define(MAX_TRANSACTION_RESTARTS, 10).
-
--define(TRANSACTION_TIMEOUT, 60000).
-
--define(KEEPALIVE_TIMEOUT, 60000).
-
-define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]).
-
-define(PREPARE_KEY, ejabberd_sql_prepare).
-
%%-define(DBGFSM, true).
-
-ifdef(DBGFSM).
-
-define(FSMOPTS, [{debug, [trace]}]).
-
-else.
-
-define(FSMOPTS, []).
-
-endif.
+-type state() :: #state{}.
+-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} |
+ fun(() -> any()) | fun((atom(), _) -> any()).
+-type sql_query() :: sql_query_simple() |
+ [{atom() | {atom(), any()}, sql_query_simple()}].
+-type sql_query_result() :: {updated, non_neg_integer()} |
+ {error, binary() | atom()} |
+ {selected, [binary()], [[binary()]]} |
+ {selected, [any()]} |
+ ok.
+
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
-start(Host) ->
- (?GEN_FSM):start(ejabberd_sql, [Host],
- fsm_limit_opts() ++ (?FSMOPTS)).
-
-start_link(Host, StartInterval) ->
- (?GEN_FSM):start_link(ejabberd_sql,
- [Host, StartInterval],
- fsm_limit_opts() ++ (?FSMOPTS)).
-
--type sql_query() :: [sql_query() | binary()] | #sql_query{} |
- fun(() -> any()) | fun((atom(), _) -> any()).
--type sql_query_result() :: {updated, non_neg_integer()} |
- {error, binary()} |
- {selected, [binary()],
- [[binary()]]} |
- {selected, [any()]}.
+-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}.
+start_link(Host, I) ->
+ Proc = binary_to_atom(get_worker_name(Host, I), utf8),
+ p1_fsm:start_link({local, Proc}, ?MODULE, [Host],
+ fsm_limit_opts() ++ ?FSMOPTS).
-spec sql_query(binary(), sql_query()) -> sql_query_result().
-
sql_query(Host, Query) ->
- check_error(sql_call(Host, {sql_query, Query}), Query).
+ sql_call(Host, {sql_query, Query}).
%% SQL transaction based on a list of queries
%% This function automatically
-spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) ->
{atomic, any()} |
{aborted, any()}.
-
sql_transaction(Host, Queries)
when is_list(Queries) ->
F = fun () ->
@@ -152,48 +128,87 @@ sql_transaction(Host, Queries)
sql_transaction(Host, F);
%% SQL transaction, based on a erlang anonymous function (F = fun)
sql_transaction(Host, F) when is_function(F) ->
- sql_call(Host, {sql_transaction, F}).
+ case sql_call(Host, {sql_transaction, F}) of
+ {atomic, _} = Ret -> Ret;
+ {aborted, _} = Ret -> Ret;
+ Err -> {aborted, Err}
+ end.
%% SQL bloc, based on a erlang anonymous function (F = fun)
sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}).
sql_call(Host, Msg) ->
+ Timeout = query_timeout(Host),
case get(?STATE_KEY) of
- undefined ->
- case ejabberd_sql_sup:get_random_pid(Host) of
- none -> {error, <<"Unknown Host">>};
- Pid ->
- (?GEN_FSM):sync_send_event(Pid,{sql_cmd, Msg,
- p1_time_compat:monotonic_time(milli_seconds)},
- ?TRANSACTION_TIMEOUT)
- end;
- _State -> nested_op(Msg)
+ undefined ->
+ sync_send_event(Host,
+ {sql_cmd, Msg, current_time() + Timeout},
+ Timeout);
+ _State ->
+ nested_op(Msg)
end.
-keep_alive(PID) ->
- (?GEN_FSM):sync_send_event(PID,
- {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
- p1_time_compat:monotonic_time(milli_seconds)},
- ?KEEPALIVE_TIMEOUT).
+keep_alive(Host, Proc) ->
+ Timeout = query_timeout(Host),
+ case sync_send_event(
+ Proc,
+ {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout},
+ Timeout) of
+ {selected,_,[[<<"1">>]]} ->
+ ok;
+ _Err ->
+ ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]),
+ sync_send_event(Proc, force_timeout, Timeout)
+ end.
--spec sql_query_t(sql_query()) -> sql_query_result().
+sync_send_event(Host, Msg, Timeout) when is_binary(Host) ->
+ case ejabberd_sql_sup:start(Host) of
+ ok ->
+ Proc = get_worker(Host),
+ sync_send_event(Proc, Msg, Timeout);
+ {error, _} = Err ->
+ Err
+ end;
+sync_send_event(Proc, Msg, Timeout) ->
+ try p1_fsm:sync_send_event(Proc, Msg, Timeout)
+ catch _:{Reason, {p1_fsm, _, _}} ->
+ {error, Reason}
+ end.
+-spec sql_query_t(sql_query()) -> sql_query_result().
%% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) ->
QRes = sql_query_internal(Query),
case QRes of
- {error, Reason} -> throw({aborted, Reason});
+ {error, Reason} -> restart(Reason);
Rs when is_list(Rs) ->
case lists:keysearch(error, 1, Rs) of
- {value, {error, Reason}} -> throw({aborted, Reason});
+ {value, {error, Reason}} -> restart(Reason);
_ -> QRes
end;
_ -> QRes
end.
-%% Escape character that will confuse an SQL engine
+abort(Reason) ->
+ exit(Reason).
+
+restart(Reason) ->
+ throw({aborted, Reason}).
+
+-spec escape_char(char()) -> binary().
+escape_char($\000) -> <<"\\0">>;
+escape_char($\n) -> <<"\\n">>;
+escape_char($\t) -> <<"\\t">>;
+escape_char($\b) -> <<"\\b">>;
+escape_char($\r) -> <<"\\r">>;
+escape_char($') -> <<"''">>;
+escape_char($") -> <<"\\\"">>;
+escape_char($\\) -> <<"\\\\">>;
+escape_char(C) -> <<C>>.
+
+-spec escape(binary()) -> binary().
escape(S) ->
- << <<(sql_queries:escape(Char))/binary>> || <<Char>> <= S >>.
+ << <<(escape_char(Char))/binary>> || <<Char>> <= S >>.
%% Escape character that will confuse an SQL engine
%% Percent and underscore only need to be escaped for pattern matching like
@@ -203,7 +218,7 @@ escape_like(S) when is_binary(S) ->
escape_like($%) -> <<"\\%">>;
escape_like($_) -> <<"\\_">>;
escape_like($\\) -> <<"\\\\\\\\">>;
-escape_like(C) when is_integer(C), C >= 0, C =< 255 -> sql_queries:escape(C).
+escape_like(C) when is_integer(C), C >= 0, C =< 255 -> escape_char(C).
escape_like_arg(S) when is_binary(S) ->
<< <<(escape_like_arg(C))/binary>> || <<C>> <= S >>;
@@ -228,6 +243,14 @@ to_bool(true) -> true;
to_bool(1) -> true;
to_bool(_) -> false.
+to_list(EscapeFun, Val) ->
+ Escaped = lists:join(<<",">>, lists:map(EscapeFun, Val)),
+ [<<"(">>, Escaped, <<")">>].
+
+to_array(EscapeFun, Val) ->
+ Escaped = lists:join(<<",">>, lists:map(EscapeFun, Val)),
+ [<<"{">>, Escaped, <<"}">>].
+
encode_term(Term) ->
escape(list_to_binary(
erl_prettypr:format(erl_syntax:abstract(Term),
@@ -235,9 +258,23 @@ encode_term(Term) ->
decode_term(Bin) ->
Str = binary_to_list(<<Bin/binary, ".">>),
- {ok, Tokens, _} = erl_scan:string(Str),
- {ok, Term} = erl_parse:parse_term(Tokens),
- Term.
+ try
+ {ok, Tokens, _} = erl_scan:string(Str),
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ Term
+ catch _:{badmatch, {error, {Line, Mod, Reason}, _}} ->
+ ?ERROR_MSG("Corrupted Erlang term in SQL database:~n"
+ "** Scanner error: at line ~B: ~ts~n"
+ "** Term: ~ts",
+ [Line, Mod:format_error(Reason), Bin]),
+ erlang:error(badarg);
+ _:{badmatch, {error, {Line, Mod, Reason}}} ->
+ ?ERROR_MSG("Corrupted Erlang term in SQL database:~n"
+ "** Parser error: at line ~B: ~ts~n"
+ "** Term: ~ts",
+ [Line, Mod:format_error(Reason), Bin]),
+ erlang:error(badarg)
+ end.
-spec sqlite_db(binary()) -> atom().
sqlite_db(Host) ->
@@ -245,121 +282,136 @@ sqlite_db(Host) ->
-spec sqlite_file(binary()) -> string().
sqlite_file(Host) ->
- case ejabberd_config:get_option({sql_database, Host},
- fun iolist_to_binary/1) of
+ case ejabberd_option:sql_database(Host) of
undefined ->
- {ok, Cwd} = file:get_cwd(),
- filename:join([Cwd, "sqlite", atom_to_list(node()),
- binary_to_list(Host), "ejabberd.db"]);
+ Path = ["sqlite", atom_to_list(node()),
+ binary_to_list(Host), "ejabberd.db"],
+ case file:get_cwd() of
+ {ok, Cwd} ->
+ filename:join([Cwd|Path]);
+ {error, Reason} ->
+ ?ERROR_MSG("Failed to get current directory: ~ts",
+ [file:format_error(Reason)]),
+ filename:join(Path)
+ end;
File ->
binary_to_list(File)
end.
+use_new_schema() ->
+ ejabberd_option:new_sql_schema().
+
+-spec get_worker(binary()) -> atom().
+get_worker(Host) ->
+ PoolSize = ejabberd_option:sql_pool_size(Host),
+ I = p1_rand:round_robin(PoolSize) + 1,
+ binary_to_existing_atom(get_worker_name(Host, I), utf8).
+
+-spec get_worker_name(binary(), pos_integer()) -> binary().
+get_worker_name(Host, I) ->
+ <<"ejabberd_sql_", Host/binary, $_, (integer_to_binary(I))/binary>>.
+
%%%----------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
-init([Host, StartInterval]) ->
- case ejabberd_config:get_option(
- {sql_keepalive_interval, Host},
- fun(I) when is_integer(I), I>0 -> I end) of
+init([Host]) ->
+ process_flag(trap_exit, true),
+ case ejabberd_option:sql_keepalive_interval(Host) of
undefined ->
ok;
KeepaliveInterval ->
- timer:apply_interval(KeepaliveInterval * 1000, ?MODULE,
- keep_alive, [self()])
+ timer:apply_interval(KeepaliveInterval, ?MODULE,
+ keep_alive, [Host, self()])
end,
[DBType | _] = db_opts(Host),
- (?GEN_FSM):send_event(self(), connect),
- ejabberd_sql_sup:add_pid(Host, self()),
+ p1_fsm:send_event(self(), connect),
+ QueueType = ejabberd_option:sql_queue_type(Host),
{ok, connecting,
#state{db_type = DBType, host = Host,
- max_pending_requests_len = max_fsm_queue(),
- pending_requests = {0, queue:new()},
- start_interval = StartInterval}}.
+ pending_requests = p1_queue:new(QueueType, max_fsm_queue())}}.
connecting(connect, #state{host = Host} = State) ->
ConnectRes = case db_opts(Host) of
- [mysql | Args] -> apply(fun mysql_connect/5, Args);
- [pgsql | Args] -> apply(fun pgsql_connect/5, Args);
+ [mysql | Args] -> apply(fun mysql_connect/8, Args);
+ [pgsql | Args] -> apply(fun pgsql_connect/8, Args);
[sqlite | Args] -> apply(fun sqlite_connect/1, Args);
- [mssql | Args] -> apply(fun odbc_connect/1, Args);
- [odbc | Args] -> apply(fun odbc_connect/1, Args)
+ [mssql | Args] -> apply(fun odbc_connect/2, Args);
+ [odbc | Args] -> apply(fun odbc_connect/2, Args)
end,
- {_, PendingRequests} = State#state.pending_requests,
case ConnectRes of
{ok, Ref} ->
- erlang:monitor(process, Ref),
- lists:foreach(fun (Req) ->
- (?GEN_FSM):send_event(self(), Req)
- end,
- queue:to_list(PendingRequests)),
- State1 = State#state{db_ref = Ref,
- pending_requests = {0, queue:new()}},
- State2 = get_db_version(State1),
- {next_state, session_established, State2};
- {error, Reason} ->
- ?INFO_MSG("~p connection failed:~n** Reason: ~p~n** "
- "Retry after: ~p seconds",
- [State#state.db_type, Reason,
- State#state.start_interval div 1000]),
- (?GEN_FSM):send_event_after(State#state.start_interval,
- connect),
- {next_state, connecting, State}
+ try link(Ref) of
+ _ ->
+ lists:foreach(
+ fun({{?PREPARE_KEY, _} = Key, _}) ->
+ erase(Key);
+ (_) ->
+ ok
+ end, get()),
+ PendingRequests =
+ p1_queue:dropwhile(
+ fun(Req) ->
+ p1_fsm:send_event(self(), Req),
+ true
+ end, State#state.pending_requests),
+ State1 = State#state{db_ref = Ref,
+ pending_requests = PendingRequests},
+ State2 = get_db_version(State1),
+ {next_state, session_established, State2}
+ catch _:Reason ->
+ handle_reconnect(Reason, State)
+ end;
+ {error, Reason} ->
+ handle_reconnect(Reason, State)
end;
connecting(Event, State) ->
- ?WARNING_MSG("unexpected event in 'connecting': ~p",
+ ?WARNING_MSG("Unexpected event in 'connecting': ~p",
[Event]),
{next_state, connecting, State}.
-connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
- _Timestamp},
+connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp},
From, State) ->
- (?GEN_FSM):reply(From,
- {error, <<"SQL connection failed">>}),
+ reply(From, {error, <<"SQL connection failed">>}, Timestamp),
{next_state, connecting, State};
connecting({sql_cmd, Command, Timestamp} = Req, From,
State) ->
- ?DEBUG("queuing pending request while connecting:~n\t~p",
+ ?DEBUG("Queuing pending request while connecting:~n\t~p",
[Req]),
- {Len, PendingRequests} = State#state.pending_requests,
- NewPendingRequests = if Len <
- State#state.max_pending_requests_len ->
- {Len + 1,
- queue:in({sql_cmd, Command, From, Timestamp},
- PendingRequests)};
- true ->
- lists:foreach(fun ({sql_cmd, _, To,
- _Timestamp}) ->
- (?GEN_FSM):reply(To,
- {error,
- <<"SQL connection failed">>})
- end,
- queue:to_list(PendingRequests)),
- {1,
- queue:from_list([{sql_cmd, Command, From,
- Timestamp}])}
- end,
+ PendingRequests =
+ try p1_queue:in({sql_cmd, Command, From, Timestamp},
+ State#state.pending_requests)
+ catch error:full ->
+ Err = <<"SQL request queue is overfilled">>,
+ ?ERROR_MSG("~ts, bouncing all pending requests", [Err]),
+ Q = p1_queue:dropwhile(
+ fun({sql_cmd, _, To, TS}) ->
+ reply(To, {error, Err}, TS),
+ true
+ end, State#state.pending_requests),
+ p1_queue:in({sql_cmd, Command, From, Timestamp}, Q)
+ end,
{next_state, connecting,
- State#state{pending_requests = NewPendingRequests}};
+ State#state{pending_requests = PendingRequests}};
connecting(Request, {Who, _Ref}, State) ->
- ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'",
+ ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'",
[Request, Who]),
- {reply, {error, badarg}, connecting, State}.
+ {next_state, connecting, State}.
session_established({sql_cmd, Command, Timestamp}, From,
State) ->
run_sql_cmd(Command, From, State, Timestamp);
session_established(Request, {Who, _Ref}, State) ->
- ?WARNING_MSG("unexpected call ~p from ~p in 'session_establ"
- "ished'",
+ ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'",
[Request, Who]),
- {reply, {error, badarg}, session_established, State}.
+ {next_state, session_established, State}.
session_established({sql_cmd, Command, From, Timestamp},
State) ->
run_sql_cmd(Command, From, State, Timestamp);
+session_established(force_timeout, State) ->
+ {stop, timeout, State};
session_established(Event, State) ->
- ?WARNING_MSG("unexpected event in 'session_established': ~p",
+ ?WARNING_MSG("Unexpected event in 'session_established': ~p",
[Event]),
{next_state, session_established, State}.
@@ -372,19 +424,14 @@ handle_sync_event(_Event, _From, StateName, State) ->
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.
-%% We receive the down signal when we loose the MySQL connection (we are
-%% monitoring the connection)
-handle_info({'DOWN', _MonitorRef, process, _Pid, _Info},
- _StateName, State) ->
- (?GEN_FSM):send_event(self(), connect),
- {next_state, connecting, State};
+handle_info({'EXIT', _Pid, Reason}, _StateName, State) ->
+ handle_reconnect(Reason, State);
handle_info(Info, StateName, State) ->
- ?WARNING_MSG("unexpected info in ~p: ~p",
+ ?WARNING_MSG("Unexpected info in ~p: ~p",
[StateName, Info]),
{next_state, StateName, State}.
terminate(_Reason, _StateName, State) ->
- ejabberd_sql_sup:remove_pid(State#state.host, self()),
case State#state.db_type of
mysql -> catch p1_mysql_conn:stop(State#state.db_ref);
sqlite -> catch sqlite3:close(sqlite_db(State#state.host));
@@ -402,18 +449,25 @@ print_state(State) -> State.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
+handle_reconnect(Reason, #state{host = Host} = State) ->
+ StartInterval = ejabberd_option:sql_start_interval(Host),
+ ?WARNING_MSG("~p connection failed:~n"
+ "** Reason: ~p~n"
+ "** Retry after: ~B seconds",
+ [State#state.db_type, Reason,
+ StartInterval div 1000]),
+ p1_fsm:send_event_after(StartInterval, connect),
+ {next_state, connecting, State}.
run_sql_cmd(Command, From, State, Timestamp) ->
- case p1_time_compat:monotonic_time(milli_seconds) - Timestamp of
- Age when Age < (?TRANSACTION_TIMEOUT) ->
- put(?NESTING_KEY, ?TOP_LEVEL_TXN),
- put(?STATE_KEY, State),
- abort_on_driver_error(outer_op(Command), From);
- Age ->
- ?ERROR_MSG("Database was not available or too slow, "
- "discarding ~p milliseconds old request~n~p~n",
- [Age, Command]),
- {next_state, session_established, State}
+ case current_time() >= Timestamp of
+ true ->
+ State1 = report_overload(State),
+ {next_state, session_established, State1};
+ false ->
+ put(?NESTING_KEY, ?TOP_LEVEL_TXN),
+ put(?STATE_KEY, State),
+ abort_on_driver_error(outer_op(Command), From, Timestamp)
end.
%% Only called by handle_call, only handles top level operations.
@@ -442,8 +496,8 @@ inner_transaction(F) ->
case get(?NESTING_KEY) of
?TOP_LEVEL_TXN ->
{backtrace, T} = process_info(self(), backtrace),
- ?ERROR_MSG("inner transaction called at outer txn "
- "level. Trace: ~s",
+ ?ERROR_MSG("Inner transaction called at outer txn "
+ "level. Trace: ~ts",
[T]),
erlang:exit(implementation_faulty);
_N -> ok
@@ -464,31 +518,35 @@ outer_transaction(F, NRestarts, _Reason) ->
?TOP_LEVEL_TXN -> ok;
_N ->
{backtrace, T} = process_info(self(), backtrace),
- ?ERROR_MSG("outer transaction called at inner txn "
- "level. Trace: ~s",
+ ?ERROR_MSG("Outer transaction called at inner txn "
+ "level. Trace: ~ts",
[T]),
erlang:exit(implementation_faulty)
end,
- sql_query_internal([<<"begin;">>]),
+ sql_begin(),
put(?NESTING_KEY, PreviousNestingLevel + 1),
- Result = (catch F()),
- put(?NESTING_KEY, PreviousNestingLevel),
- case Result of
- {aborted, Reason} when NRestarts > 0 ->
- sql_query_internal([<<"rollback;">>]),
- outer_transaction(F, NRestarts - 1, Reason);
- {aborted, Reason} when NRestarts =:= 0 ->
- ?ERROR_MSG("SQL transaction restarts exceeded~n** "
- "Restarts: ~p~n** Last abort reason: "
- "~p~n** Stacktrace: ~p~n** When State "
- "== ~p",
- [?MAX_TRANSACTION_RESTARTS, Reason,
- erlang:get_stacktrace(), get(?STATE_KEY)]),
- sql_query_internal([<<"rollback;">>]),
- {aborted, Reason};
- {'EXIT', Reason} ->
- sql_query_internal([<<"rollback;">>]), {aborted, Reason};
- Res -> sql_query_internal([<<"commit;">>]), {atomic, Res}
+ try F() of
+ Res ->
+ sql_commit(),
+ {atomic, Res}
+ catch
+ ?EX_RULE(throw, {aborted, Reason}, _) when NRestarts > 0 ->
+ sql_rollback(),
+ put(?NESTING_KEY, ?TOP_LEVEL_TXN),
+ outer_transaction(F, NRestarts - 1, Reason);
+ ?EX_RULE(throw, {aborted, Reason}, Stack) when NRestarts =:= 0 ->
+ StackTrace = ?EX_STACK(Stack),
+ ?ERROR_MSG("SQL transaction restarts exceeded~n** "
+ "Restarts: ~p~n** Last abort reason: "
+ "~p~n** Stacktrace: ~p~n** When State "
+ "== ~p",
+ [?MAX_TRANSACTION_RESTARTS, Reason,
+ StackTrace, get(?STATE_KEY)]),
+ sql_rollback(),
+ {aborted, Reason};
+ ?EX_RULE(exit, Reason, _) ->
+ sql_rollback(),
+ {aborted, Reason}
end.
execute_bloc(F) ->
@@ -522,24 +580,26 @@ sql_query_internal(#sql_query{} = Query) ->
mssql ->
mssql_sql_query(Query);
pgsql ->
- PreparedStatements = ejabberd_config:get_option(
- {sql_prepared_statements, ?MYNAME},
- fun(A) when is_boolean(A) -> A end,
- true),
Key = {?PREPARE_KEY, Query#sql_query.hash},
- if not PreparedStatements -> put(Key, ignore);
- true -> ok
- end,
case get(Key) of
undefined ->
- case pgsql_prepare(Query, State) of
- {ok, _, _, _} ->
- put(Key, prepared);
- {error, Error} ->
- ?ERROR_MSG("PREPARE failed for SQL query "
+ Host = State#state.host,
+ PreparedStatements =
+ ejabberd_option:sql_prepared_statements(Host),
+ case PreparedStatements of
+ false ->
+ put(Key, ignore);
+ true ->
+ case pgsql_prepare(Query, State) of
+ {ok, _, _, _} ->
+ put(Key, prepared);
+ {error, Error} ->
+ ?ERROR_MSG(
+ "PREPARE failed for SQL query "
"at ~p: ~p",
[Query#sql_query.loc, Error]),
- put(Key, ignore)
+ put(Key, ignore)
+ end
end;
_ ->
ok
@@ -548,58 +608,59 @@ sql_query_internal(#sql_query{} = Query) ->
prepared ->
pgsql_execute_sql_query(Query, State);
_ ->
- generic_sql_query(Query)
+ pgsql_sql_query(Query)
end;
mysql ->
generic_sql_query(Query);
sqlite ->
sqlite_sql_query(Query)
end
- catch
- Class:Reason ->
- ST = erlang:get_stacktrace(),
- ?ERROR_MSG("Internal error while processing SQL query: ~p",
- [{Class, Reason, ST}]),
+ catch exit:{timeout, _} ->
+ {error, <<"timed out">>};
+ exit:{killed, _} ->
+ {error, <<"killed">>};
+ exit:{normal, _} ->
+ {error, <<"terminated unexpectedly">>};
+ exit:{shutdown, _} ->
+ {error, <<"shutdown">>};
+ ?EX_RULE(Class, Reason, Stack) ->
+ StackTrace = ?EX_STACK(Stack),
+ ?ERROR_MSG("Internal error while processing SQL query:~n** ~ts",
+ [misc:format_exception(2, Class, Reason, StackTrace)]),
{error, <<"internal error">>}
end,
- case Res of
- {error, <<"No SQL-driver information available.">>} ->
- {updated, 0};
- _Else -> Res
- end;
+ check_error(Res, Query);
sql_query_internal(F) when is_function(F) ->
case catch execute_fun(F) of
+ {aborted, Reason} -> {error, Reason};
{'EXIT', Reason} -> {error, Reason};
Res -> Res
end;
sql_query_internal(Query) ->
State = get(?STATE_KEY),
- ?DEBUG("SQL: \"~s\"", [Query]),
+ ?DEBUG("SQL: \"~ts\"", [Query]),
+ QueryTimeout = query_timeout(State#state.host),
Res = case State#state.db_type of
odbc ->
to_odbc(odbc:sql_query(State#state.db_ref, [Query],
- (?TRANSACTION_TIMEOUT) - 1000));
+ QueryTimeout - 1000));
mssql ->
to_odbc(odbc:sql_query(State#state.db_ref, [Query],
- (?TRANSACTION_TIMEOUT) - 1000));
+ QueryTimeout - 1000));
pgsql ->
- pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
+ pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query,
+ QueryTimeout - 1000));
mysql ->
R = mysql_to_odbc(p1_mysql_conn:squery(State#state.db_ref,
[Query], self(),
- [{timeout, (?TRANSACTION_TIMEOUT) - 1000},
+ [{timeout, QueryTimeout - 1000},
{result_type, binary}])),
- %% ?INFO_MSG("MySQL, Received result~n~p~n", [R]),
R;
sqlite ->
Host = State#state.host,
sqlite_to_odbc(Host, sqlite3:sql_exec(sqlite_db(Host), Query))
end,
- case Res of
- {error, <<"No SQL-driver information available.">>} ->
- {updated, 0};
- _Else -> Res
- end.
+ check_error(Res, Query).
select_sql_query(Queries, State) ->
select_sql_query(
@@ -636,10 +697,29 @@ generic_sql_query_format(SQLQuery) ->
generic_escape() ->
#sql_escape{string = fun(X) -> <<"'", (escape(X))/binary, "'">> end,
- integer = fun(X) -> integer_to_binary(X) end,
- boolean = fun(true) -> <<"1">>;
+ integer = fun(X) -> misc:i2l(X) end,
+ boolean = fun(true) -> <<"1">>;
(false) -> <<"0">>
- end
+ end,
+ in_array_string = fun(X) -> <<"'", (escape(X))/binary, "'">> end
+ }.
+
+pgsql_sql_query(SQLQuery) ->
+ sql_query_format_res(
+ sql_query_internal(pgsql_sql_query_format(SQLQuery)),
+ SQLQuery).
+
+pgsql_sql_query_format(SQLQuery) ->
+ Args = (SQLQuery#sql_query.args)(pgsql_escape()),
+ (SQLQuery#sql_query.format_query)(Args).
+
+pgsql_escape() ->
+ #sql_escape{string = fun(X) -> <<"E'", (escape(X))/binary, "'">> end,
+ integer = fun(X) -> misc:i2l(X) end,
+ boolean = fun(true) -> <<"1">>;
+ (false) -> <<"0">>
+ end,
+ in_array_string = fun(X) -> <<"E'", (escape(X))/binary, "'">> end
}.
sqlite_sql_query(SQLQuery) ->
@@ -653,10 +733,11 @@ sqlite_sql_query_format(SQLQuery) ->
sqlite_escape() ->
#sql_escape{string = fun(X) -> <<"'", (standard_escape(X))/binary, "'">> end,
- integer = fun(X) -> integer_to_binary(X) end,
- boolean = fun(true) -> <<"1">>;
+ integer = fun(X) -> misc:i2l(X) end,
+ boolean = fun(true) -> <<"1">>;
(false) -> <<"0">>
- end
+ end,
+ in_array_string = fun(X) -> <<"'", (standard_escape(X))/binary, "'">> end
}.
standard_escape(S) ->
@@ -677,10 +758,11 @@ pgsql_prepare(SQLQuery, State) ->
pgsql_execute_escape() ->
#sql_escape{string = fun(X) -> X end,
- integer = fun(X) -> [integer_to_binary(X)] end,
- boolean = fun(true) -> "1";
+ integer = fun(X) -> [misc:i2l(X)] end,
+ boolean = fun(true) -> "1";
(false) -> "0"
- end
+ end,
+ in_array_string = fun(X) -> <<"\"", (escape(X))/binary, "\"">> end
}.
pgsql_execute_sql_query(SQLQuery, State) ->
@@ -689,7 +771,7 @@ pgsql_execute_sql_query(SQLQuery, State) ->
pgsql:execute(State#state.db_ref, SQLQuery#sql_query.hash, Args),
% {T, ExecuteRes} =
% timer:tc(pgsql, execute, [State#state.db_ref, SQLQuery#sql_query.hash, Args]),
-% io:format("T ~s ~p~n", [SQLQuery#sql_query.hash, T]),
+% io:format("T ~ts ~p~n", [SQLQuery#sql_query.hash, T]),
Res = pgsql_execute_to_odbc(ExecuteRes),
sql_query_format_res(Res, SQLQuery).
@@ -701,12 +783,12 @@ sql_query_format_res({selected, _, Rows}, SQLQuery) ->
try
[(SQLQuery#sql_query.format_res)(Row)]
catch
- Class:Reason ->
- ST = erlang:get_stacktrace(),
- ?ERROR_MSG("Error while processing "
- "SQL query result: ~p~n"
- "row: ~p",
- [{Class, Reason, ST}, Row]),
+ ?EX_RULE(Class, Reason, Stack) ->
+ StackTrace = ?EX_STACK(Stack),
+ ?ERROR_MSG("Error while processing SQL query result:~n"
+ "** Row: ~p~n** ~ts",
+ [Row,
+ misc:format_exception(2, Class, Reason, StackTrace)]),
[]
end
end, Rows),
@@ -717,31 +799,70 @@ sql_query_format_res(Res, _SQLQuery) ->
sql_query_to_iolist(SQLQuery) ->
generic_sql_query_format(SQLQuery).
+sql_begin() ->
+ sql_query_internal(
+ [{mssql, [<<"begin transaction;">>]},
+ {any, [<<"begin;">>]}]).
+
+sql_commit() ->
+ sql_query_internal(
+ [{mssql, [<<"commit transaction;">>]},
+ {any, [<<"commit;">>]}]).
+
+sql_rollback() ->
+ sql_query_internal(
+ [{mssql, [<<"rollback transaction;">>]},
+ {any, [<<"rollback;">>]}]).
+
+
%% Generate the OTP callback return tuple depending on the driver result.
-abort_on_driver_error({error, <<"query timed out">>} =
- Reply,
- From) ->
- (?GEN_FSM):reply(From, Reply),
+abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
+ {stop, timeout, get(?STATE_KEY)};
+abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply,
+ From, Timestamp) ->
+ reply(From, Reply, Timestamp),
+ {stop, closed, get(?STATE_KEY)};
+abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, timeout, get(?STATE_KEY)};
-abort_on_driver_error({error,
- <<"Failed sending data on socket", _/binary>>} =
- Reply,
- From) ->
- (?GEN_FSM):reply(From, Reply),
+abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{stop, closed, get(?STATE_KEY)};
-abort_on_driver_error(Reply, From) ->
- (?GEN_FSM):reply(From, Reply),
+abort_on_driver_error(Reply, From, Timestamp) ->
+ reply(From, Reply, Timestamp),
{next_state, session_established, get(?STATE_KEY)}.
+-spec report_overload(state()) -> state().
+report_overload(#state{overload_reported = PrevTime} = State) ->
+ CurrTime = current_time(),
+ case PrevTime == undefined orelse (CurrTime - PrevTime) > timer:seconds(30) of
+ true ->
+ ?ERROR_MSG("SQL connection pool is overloaded, "
+ "discarding stale requests", []),
+ State#state{overload_reported = current_time()};
+ false ->
+ State
+ end.
+
+-spec reply({pid(), term()}, term(), integer()) -> term().
+reply(From, Reply, Timestamp) ->
+ case current_time() >= Timestamp of
+ true -> ok;
+ false -> p1_fsm:reply(From, Reply)
+ end.
+
%% == pure ODBC code
%% part of init/1
%% Open an ODBC database connection
-odbc_connect(SQLServer) ->
+odbc_connect(SQLServer, Timeout) ->
ejabberd:start_app(odbc),
odbc:connect(binary_to_list(SQLServer),
[{scrollable_cursors, off},
+ {extended_errors, on},
{tuple_row, off},
+ {timeout, Timeout},
{binary_strings, on}]).
%% == Native SQLite code
@@ -775,7 +896,7 @@ sqlite_to_odbc(Host, {rowid, _}) ->
sqlite_to_odbc(_Host, [{columns, Columns}, {rows, TRows}]) ->
Rows = [lists:map(
fun(I) when is_integer(I) ->
- jlib:integer_to_binary(I);
+ integer_to_binary(I);
(B) ->
B
end, tuple_to_list(Row)) || Row <- TRows],
@@ -789,13 +910,16 @@ sqlite_to_odbc(_Host, _) ->
%% part of init/1
%% Open a database connection to PostgreSQL
-pgsql_connect(Server, Port, DB, Username, Password) ->
+pgsql_connect(Server, Port, DB, Username, Password, ConnectTimeout,
+ Transport, SSLOpts) ->
case pgsql:connect([{host, Server},
{database, DB},
{user, Username},
{password, Password},
{port, Port},
- {as_binary, true}]) of
+ {transport, Transport},
+ {connect_timeout, ConnectTimeout},
+ {as_binary, true}|SSLOpts]) of
{ok, Ref} ->
pgsql:squery(Ref, [<<"alter database \"">>, DB, <<"\" set ">>,
<<"standard_conforming_strings='off';">>]),
@@ -820,11 +944,11 @@ pgsql_item_to_odbc({<<"FETCH", _/binary>>, Rows,
{selected, [element(1, Row) || Row <- Rows], Recs};
pgsql_item_to_odbc(<<"INSERT ", OIDN/binary>>) ->
[_OID, N] = str:tokens(OIDN, <<" ">>),
- {updated, jlib:binary_to_integer(N)};
+ {updated, binary_to_integer(N)};
pgsql_item_to_odbc(<<"DELETE ", N/binary>>) ->
- {updated, jlib:binary_to_integer(N)};
+ {updated, binary_to_integer(N)};
pgsql_item_to_odbc(<<"UPDATE ", N/binary>>) ->
- {updated, jlib:binary_to_integer(N)};
+ {updated, binary_to_integer(N)};
pgsql_item_to_odbc({error, Error}) -> {error, Error};
pgsql_item_to_odbc(_) -> {updated, undefined}.
@@ -844,11 +968,12 @@ pgsql_execute_to_odbc(_) -> {updated, undefined}.
%% part of init/1
%% Open a database connection to MySQL
-mysql_connect(Server, Port, DB, Username, Password) ->
+mysql_connect(Server, Port, DB, Username, Password, ConnectTimeout, _, _) ->
case p1_mysql_conn:start(binary_to_list(Server), Port,
binary_to_list(Username),
binary_to_list(Password),
- binary_to_list(DB), fun log/3)
+ binary_to_list(DB),
+ ConnectTimeout, fun log/3)
of
{ok, Ref} ->
p1_mysql_conn:fetch(
@@ -882,7 +1007,7 @@ mysql_item_to_odbc(Columns, Recs) ->
to_odbc({selected, Columns, Recs}) ->
Rows = [lists:map(
fun(I) when is_integer(I) ->
- jlib:integer_to_binary(I);
+ integer_to_binary(I);
(B) ->
B
end, Row) || Row <- Recs],
@@ -900,11 +1025,11 @@ get_db_version(#state{db_type = pgsql} = State) ->
Version when is_integer(Version) ->
State#state{db_version = Version};
Error ->
- ?WARNING_MSG("error getting pgsql version: ~p", [Error]),
+ ?WARNING_MSG("Error getting pgsql version: ~p", [Error]),
State
end;
Res ->
- ?WARNING_MSG("error getting pgsql version: ~p", [Res]),
+ ?WARNING_MSG("Error getting pgsql version: ~p", [Res]),
State
end;
get_db_version(State) ->
@@ -913,67 +1038,88 @@ get_db_version(State) ->
log(Level, Format, Args) ->
case Level of
debug -> ?DEBUG(Format, Args);
+ info -> ?INFO_MSG(Format, Args);
normal -> ?INFO_MSG(Format, Args);
error -> ?ERROR_MSG(Format, Args)
end.
db_opts(Host) ->
- Type = ejabberd_config:get_option({sql_type, Host},
- fun(mysql) -> mysql;
- (pgsql) -> pgsql;
- (sqlite) -> sqlite;
- (mssql) -> mssql;
- (odbc) -> odbc
- end, odbc),
- Server = ejabberd_config:get_option({sql_server, Host},
- fun iolist_to_binary/1,
- <<"localhost">>),
+ Type = ejabberd_option:sql_type(Host),
+ Server = ejabberd_option:sql_server(Host),
+ Timeout = ejabberd_option:sql_connect_timeout(Host),
+ Transport = case ejabberd_option:sql_ssl(Host) of
+ false -> tcp;
+ true -> ssl
+ end,
+ warn_if_ssl_unsupported(Transport, Type),
case Type of
odbc ->
- [odbc, Server];
+ [odbc, Server, Timeout];
sqlite ->
[sqlite, Host];
_ ->
- Port = ejabberd_config:get_option(
- {sql_port, Host},
- fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
- case Type of
- mssql -> ?MSSQL_PORT;
- mysql -> ?MYSQL_PORT;
- pgsql -> ?PGSQL_PORT
- end),
- DB = ejabberd_config:get_option({sql_database, Host},
- fun iolist_to_binary/1,
- <<"ejabberd">>),
- User = ejabberd_config:get_option({sql_username, Host},
- fun iolist_to_binary/1,
- <<"ejabberd">>),
- Pass = ejabberd_config:get_option({sql_password, Host},
- fun iolist_to_binary/1,
- <<"">>),
+ Port = ejabberd_option:sql_port(Host),
+ DB = case ejabberd_option:sql_database(Host) of
+ undefined -> <<"ejabberd">>;
+ D -> D
+ end,
+ User = ejabberd_option:sql_username(Host),
+ Pass = ejabberd_option:sql_password(Host),
+ SSLOpts = get_ssl_opts(Transport, Host),
case Type of
mssql ->
[mssql, <<"DSN=", Host/binary, ";UID=", User/binary,
- ";PWD=", Pass/binary>>];
+ ";PWD=", Pass/binary>>, Timeout];
_ ->
- [Type, Server, Port, DB, User, Pass]
+ [Type, Server, Port, DB, User, Pass, Timeout, Transport, SSLOpts]
end
end.
+warn_if_ssl_unsupported(tcp, _) ->
+ ok;
+warn_if_ssl_unsupported(ssl, pgsql) ->
+ ok;
+warn_if_ssl_unsupported(ssl, Type) ->
+ ?WARNING_MSG("SSL connection is not supported for ~ts", [Type]).
+
+get_ssl_opts(ssl, Host) ->
+ Opts1 = case ejabberd_option:sql_ssl_certfile(Host) of
+ undefined -> [];
+ CertFile -> [{certfile, CertFile}]
+ end,
+ Opts2 = case ejabberd_option:sql_ssl_cafile(Host) of
+ undefined -> Opts1;
+ CAFile -> [{cacertfile, CAFile}|Opts1]
+ end,
+ case ejabberd_option:sql_ssl_verify(Host) of
+ true ->
+ case lists:keymember(cacertfile, 1, Opts2) of
+ true ->
+ [{verify, verify_peer}|Opts2];
+ false ->
+ ?WARNING_MSG("SSL verification is enabled for "
+ "SQL connection, but option "
+ "'sql_ssl_cafile' is not set; "
+ "verification will be disabled", []),
+ Opts2
+ end;
+ false ->
+ Opts2
+ end;
+get_ssl_opts(tcp, _) ->
+ [].
+
init_mssql(Host) ->
- Server = ejabberd_config:get_option({sql_server, Host},
- fun iolist_to_binary/1,
- <<"localhost">>),
- Port = ejabberd_config:get_option(
- {sql_port, Host},
- fun(P) when is_integer(P), P > 0, P < 65536 -> P end,
- ?MSSQL_PORT),
- DB = ejabberd_config:get_option({sql_database, Host},
- fun iolist_to_binary/1,
- <<"ejabberd">>),
- FreeTDS = io_lib:fwrite("[~s]~n"
- "\thost = ~s~n"
+ Server = ejabberd_option:sql_server(Host),
+ Port = ejabberd_option:sql_port(Host),
+ DB = case ejabberd_option:sql_database(Host) of
+ undefined -> <<"ejabberd">>;
+ D -> D
+ end,
+ FreeTDS = io_lib:fwrite("[~ts]~n"
+ "\thost = ~ts~n"
"\tport = ~p~n"
+ "\tclient charset = UTF-8~n"
"\ttds version = 7.1~n",
[Host, Server, Port]),
ODBCINST = io_lib:fwrite("[freetds]~n"
@@ -982,39 +1128,48 @@ init_mssql(Host) ->
"Setup = libtdsS.so~n"
"UsageCount = 1~n"
"FileUsage = 1~n", []),
- ODBCINI = io_lib:fwrite("[~s]~n"
+ ODBCINI = io_lib:fwrite("[~ts]~n"
"Description = MS SQL~n"
"Driver = freetds~n"
- "Servername = ~s~n"
- "Database = ~s~n"
+ "Servername = ~ts~n"
+ "Database = ~ts~n"
"Port = ~p~n",
[Host, Host, DB, Port]),
- ?DEBUG("~s:~n~s", [freetds_config(), FreeTDS]),
- ?DEBUG("~s:~n~s", [odbcinst_config(), ODBCINST]),
- ?DEBUG("~s:~n~s", [odbc_config(), ODBCINI]),
+ ?DEBUG("~ts:~n~ts", [freetds_config(), FreeTDS]),
+ ?DEBUG("~ts:~n~ts", [odbcinst_config(), ODBCINST]),
+ ?DEBUG("~ts:~n~ts", [odbc_config(), ODBCINI]),
case filelib:ensure_dir(freetds_config()) of
ok ->
try
- ok = file:write_file(freetds_config(), FreeTDS, [append]),
- ok = file:write_file(odbcinst_config(), ODBCINST),
- ok = file:write_file(odbc_config(), ODBCINI, [append]),
+ ok = write_file_if_new(freetds_config(), FreeTDS),
+ ok = write_file_if_new(odbcinst_config(), ODBCINST),
+ ok = write_file_if_new(odbc_config(), ODBCINI),
os:putenv("ODBCSYSINI", tmp_dir()),
os:putenv("FREETDS", freetds_config()),
os:putenv("FREETDSCONF", freetds_config()),
ok
catch error:{badmatch, {error, Reason} = Err} ->
- ?ERROR_MSG("failed to create temporary files in ~s: ~s",
+ ?ERROR_MSG("Failed to create temporary files in ~ts: ~ts",
[tmp_dir(), file:format_error(Reason)]),
Err
end;
{error, Reason} = Err ->
- ?ERROR_MSG("failed to create temporary directory ~s: ~s",
+ ?ERROR_MSG("Failed to create temporary directory ~ts: ~ts",
[tmp_dir(), file:format_error(Reason)]),
Err
end.
+write_file_if_new(File, Payload) ->
+ case filelib:is_file(File) of
+ true -> ok;
+ false -> file:write_file(File, Payload)
+ end.
+
tmp_dir() ->
- filename:join(["/tmp", "ejabberd"]).
+ case os:type() of
+ {win32, _} -> filename:join([os:getenv("HOME"), "conf"]);
+ _ -> filename:join(["/tmp", "ejabberd"])
+ end.
odbc_config() ->
filename:join(tmp_dir(), "odbc.ini").
@@ -1026,51 +1181,55 @@ odbcinst_config() ->
filename:join(tmp_dir(), "odbcinst.ini").
max_fsm_queue() ->
- ejabberd_config:get_option(
- max_fsm_queue,
- fun(N) when is_integer(N), N > 0 -> N end).
+ proplists:get_value(max_queue, fsm_limit_opts(), unlimited).
fsm_limit_opts() ->
- case max_fsm_queue() of
- N when is_integer(N) -> [{max_queue, N}];
- _ -> []
- end.
-
-check_error({error, Why} = Err, #sql_query{} = Query) ->
- ?ERROR_MSG("SQL query '~s' at ~p failed: ~p",
- [Query#sql_query.hash, Query#sql_query.loc, Why]),
+ ejabberd_config:fsm_limit_opts([]).
+
+query_timeout(LServer) ->
+ ejabberd_option:sql_query_timeout(LServer).
+
+current_time() ->
+ erlang:monotonic_time(millisecond).
+
+%% ***IMPORTANT*** This error format requires extended_errors turned on.
+extended_error({"08S01", _, Reason}) ->
+ % TCP Provider: The specified network name is no longer available
+ ?DEBUG("ODBC Link Failure: ~ts", [Reason]),
+ <<"Communication link failure">>;
+extended_error({"08001", _, Reason}) ->
+ % Login timeout expired
+ ?DEBUG("ODBC Connect Timeout: ~ts", [Reason]),
+ <<"SQL connection failed">>;
+extended_error({"IMC01", _, Reason}) ->
+ % The connection is broken and recovery is not possible
+ ?DEBUG("ODBC Link Failure: ~ts", [Reason]),
+ <<"Communication link failure">>;
+extended_error({"IMC06", _, Reason}) ->
+ % The connection is broken and recovery is not possible
+ ?DEBUG("ODBC Link Failure: ~ts", [Reason]),
+ <<"Communication link failure">>;
+extended_error({Code, _, Reason}) ->
+ ?DEBUG("ODBC Error ~ts: ~ts", [Code, Reason]),
+ iolist_to_binary(Reason);
+extended_error(Error) ->
+ Error.
+
+check_error({error, Why} = Err, _Query) when Why == killed ->
Err;
-check_error({error, Why} = Err, Query) ->
+check_error({error, Why}, #sql_query{} = Query) ->
+ Err = extended_error(Why),
+ ?ERROR_MSG("SQL query '~ts' at ~p failed: ~p",
+ [Query#sql_query.hash, Query#sql_query.loc, Err]),
+ {error, Err};
+check_error({error, Why}, Query) ->
+ Err = extended_error(Why),
case catch iolist_to_binary(Query) of
SQuery when is_binary(SQuery) ->
- ?ERROR_MSG("SQL query '~s' failed: ~p", [SQuery, Why]);
+ ?ERROR_MSG("SQL query '~ts' failed: ~p", [SQuery, Err]);
_ ->
- ?ERROR_MSG("SQL query ~p failed: ~p", [Query, Why])
+ ?ERROR_MSG("SQL query ~p failed: ~p", [Query, Err])
end,
- Err;
+ {error, Err};
check_error(Result, _Query) ->
Result.
-
-opt_type(max_fsm_queue) ->
- fun (N) when is_integer(N), N > 0 -> N end;
-opt_type(sql_database) -> fun iolist_to_binary/1;
-opt_type(sql_keepalive_interval) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(sql_password) -> fun iolist_to_binary/1;
-opt_type(sql_port) ->
- fun (P) when is_integer(P), P > 0, P < 65536 -> P end;
-opt_type(sql_server) -> fun iolist_to_binary/1;
-opt_type(sql_type) ->
- fun (mysql) -> mysql;
- (pgsql) -> pgsql;
- (sqlite) -> sqlite;
- (mssql) -> mssql;
- (odbc) -> odbc
- end;
-opt_type(sql_username) -> fun iolist_to_binary/1;
-opt_type(sql_prepared_statements) ->
- fun(A) when is_boolean(A) -> A end;
-opt_type(_) ->
- [max_fsm_queue, sql_database, sql_keepalive_interval,
- sql_password, sql_port, sql_server, sql_type,
- sql_username, sql_prepared_statements].