diff options
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r-- | src/ejabberd_sql.erl | 881 |
1 files changed, 520 insertions, 361 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 4f50cfa01..c1d9383a0 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -5,7 +5,7 @@ %%% Created : 8 Dec 2004 by Alexey Shchepin <alexey@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -25,23 +25,22 @@ -module(ejabberd_sql). --behaviour(ejabberd_config). - -author('alexey@process-one.net'). --define(GEN_FSM, p1_fsm). - --behaviour(?GEN_FSM). +-behaviour(p1_fsm). %% External exports --export([start/1, start_link/2, +-export([start_link/2, sql_query/2, sql_query_t/1, sql_transaction/2, sql_bloc/2, - sql_query_to_iolist/1, + abort/1, + restart/1, + use_new_schema/0, + sql_query_to_iolist/1, escape/1, - standard_escape/1, + standard_escape/1, escape_like/1, escape_like_arg/1, escape_like_arg_circumflex/1, @@ -54,7 +53,9 @@ freetds_config/0, odbcinst_config/0, init_mssql/1, - keep_alive/1]). + keep_alive/2, + to_list/2, + to_array/2]). %% gen_fsm callbacks -export([init/1, handle_event/3, handle_sync_event/4, @@ -62,87 +63,62 @@ code_change/4]). -export([connecting/2, connecting/3, - session_established/2, session_established/3, - opt_type/1]). + session_established/2, session_established/3]). --include("ejabberd.hrl"). -include("logger.hrl"). -include("ejabberd_sql_pt.hrl"). +-include("ejabberd_stacktrace.hrl"). -record(state, - {db_ref = self() :: pid(), - db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, - db_version = undefined :: undefined | non_neg_integer(), - start_interval = 0 :: non_neg_integer(), - host = <<"">> :: binary(), - max_pending_requests_len :: non_neg_integer(), - pending_requests = {0, queue:new()} :: {non_neg_integer(), ?TQUEUE}}). + {db_ref :: undefined | pid(), + db_type = odbc :: pgsql | mysql | sqlite | odbc | mssql, + db_version :: undefined | non_neg_integer(), + host :: binary(), + pending_requests :: p1_queue:queue(), + overload_reported :: undefined | integer()}). -define(STATE_KEY, ejabberd_sql_state). - -define(NESTING_KEY, ejabberd_sql_nesting_level). - -define(TOP_LEVEL_TXN, 0). - --define(PGSQL_PORT, 5432). - --define(MYSQL_PORT, 3306). - --define(MSSQL_PORT, 1433). - -define(MAX_TRANSACTION_RESTARTS, 10). - --define(TRANSACTION_TIMEOUT, 60000). - --define(KEEPALIVE_TIMEOUT, 60000). - -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]). - -define(PREPARE_KEY, ejabberd_sql_prepare). - %%-define(DBGFSM, true). - -ifdef(DBGFSM). - -define(FSMOPTS, [{debug, [trace]}]). - -else. - -define(FSMOPTS, []). - -endif. +-type state() :: #state{}. +-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} | + fun(() -> any()) | fun((atom(), _) -> any()). +-type sql_query() :: sql_query_simple() | + [{atom() | {atom(), any()}, sql_query_simple()}]. +-type sql_query_result() :: {updated, non_neg_integer()} | + {error, binary() | atom()} | + {selected, [binary()], [[binary()]]} | + {selected, [any()]} | + ok. + %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- -start(Host) -> - (?GEN_FSM):start(ejabberd_sql, [Host], - fsm_limit_opts() ++ (?FSMOPTS)). - -start_link(Host, StartInterval) -> - (?GEN_FSM):start_link(ejabberd_sql, - [Host, StartInterval], - fsm_limit_opts() ++ (?FSMOPTS)). - --type sql_query() :: [sql_query() | binary()] | #sql_query{} | - fun(() -> any()) | fun((atom(), _) -> any()). --type sql_query_result() :: {updated, non_neg_integer()} | - {error, binary()} | - {selected, [binary()], - [[binary()]]} | - {selected, [any()]}. +-spec start_link(binary(), pos_integer()) -> {ok, pid()} | {error, term()}. +start_link(Host, I) -> + Proc = binary_to_atom(get_worker_name(Host, I), utf8), + p1_fsm:start_link({local, Proc}, ?MODULE, [Host], + fsm_limit_opts() ++ ?FSMOPTS). -spec sql_query(binary(), sql_query()) -> sql_query_result(). - sql_query(Host, Query) -> - check_error(sql_call(Host, {sql_query, Query}), Query). + sql_call(Host, {sql_query, Query}). %% SQL transaction based on a list of queries %% This function automatically -spec sql_transaction(binary(), [sql_query()] | fun(() -> any())) -> {atomic, any()} | {aborted, any()}. - sql_transaction(Host, Queries) when is_list(Queries) -> F = fun () -> @@ -152,48 +128,87 @@ sql_transaction(Host, Queries) sql_transaction(Host, F); %% SQL transaction, based on a erlang anonymous function (F = fun) sql_transaction(Host, F) when is_function(F) -> - sql_call(Host, {sql_transaction, F}). + case sql_call(Host, {sql_transaction, F}) of + {atomic, _} = Ret -> Ret; + {aborted, _} = Ret -> Ret; + Err -> {aborted, Err} + end. %% SQL bloc, based on a erlang anonymous function (F = fun) sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}). sql_call(Host, Msg) -> + Timeout = query_timeout(Host), case get(?STATE_KEY) of - undefined -> - case ejabberd_sql_sup:get_random_pid(Host) of - none -> {error, <<"Unknown Host">>}; - Pid -> - (?GEN_FSM):sync_send_event(Pid,{sql_cmd, Msg, - p1_time_compat:monotonic_time(milli_seconds)}, - ?TRANSACTION_TIMEOUT) - end; - _State -> nested_op(Msg) + undefined -> + sync_send_event(Host, + {sql_cmd, Msg, current_time() + Timeout}, + Timeout); + _State -> + nested_op(Msg) end. -keep_alive(PID) -> - (?GEN_FSM):sync_send_event(PID, - {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, - p1_time_compat:monotonic_time(milli_seconds)}, - ?KEEPALIVE_TIMEOUT). +keep_alive(Host, Proc) -> + Timeout = query_timeout(Host), + case sync_send_event( + Proc, + {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, current_time() + Timeout}, + Timeout) of + {selected,_,[[<<"1">>]]} -> + ok; + _Err -> + ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]), + sync_send_event(Proc, force_timeout, Timeout) + end. --spec sql_query_t(sql_query()) -> sql_query_result(). +sync_send_event(Host, Msg, Timeout) when is_binary(Host) -> + case ejabberd_sql_sup:start(Host) of + ok -> + Proc = get_worker(Host), + sync_send_event(Proc, Msg, Timeout); + {error, _} = Err -> + Err + end; +sync_send_event(Proc, Msg, Timeout) -> + try p1_fsm:sync_send_event(Proc, Msg, Timeout) + catch _:{Reason, {p1_fsm, _, _}} -> + {error, Reason} + end. +-spec sql_query_t(sql_query()) -> sql_query_result(). %% This function is intended to be used from inside an sql_transaction: sql_query_t(Query) -> QRes = sql_query_internal(Query), case QRes of - {error, Reason} -> throw({aborted, Reason}); + {error, Reason} -> restart(Reason); Rs when is_list(Rs) -> case lists:keysearch(error, 1, Rs) of - {value, {error, Reason}} -> throw({aborted, Reason}); + {value, {error, Reason}} -> restart(Reason); _ -> QRes end; _ -> QRes end. -%% Escape character that will confuse an SQL engine +abort(Reason) -> + exit(Reason). + +restart(Reason) -> + throw({aborted, Reason}). + +-spec escape_char(char()) -> binary(). +escape_char($\000) -> <<"\\0">>; +escape_char($\n) -> <<"\\n">>; +escape_char($\t) -> <<"\\t">>; +escape_char($\b) -> <<"\\b">>; +escape_char($\r) -> <<"\\r">>; +escape_char($') -> <<"''">>; +escape_char($") -> <<"\\\"">>; +escape_char($\\) -> <<"\\\\">>; +escape_char(C) -> <<C>>. + +-spec escape(binary()) -> binary(). escape(S) -> - << <<(sql_queries:escape(Char))/binary>> || <<Char>> <= S >>. + << <<(escape_char(Char))/binary>> || <<Char>> <= S >>. %% Escape character that will confuse an SQL engine %% Percent and underscore only need to be escaped for pattern matching like @@ -203,7 +218,7 @@ escape_like(S) when is_binary(S) -> escape_like($%) -> <<"\\%">>; escape_like($_) -> <<"\\_">>; escape_like($\\) -> <<"\\\\\\\\">>; -escape_like(C) when is_integer(C), C >= 0, C =< 255 -> sql_queries:escape(C). +escape_like(C) when is_integer(C), C >= 0, C =< 255 -> escape_char(C). escape_like_arg(S) when is_binary(S) -> << <<(escape_like_arg(C))/binary>> || <<C>> <= S >>; @@ -228,6 +243,14 @@ to_bool(true) -> true; to_bool(1) -> true; to_bool(_) -> false. +to_list(EscapeFun, Val) -> + Escaped = lists:join(<<",">>, lists:map(EscapeFun, Val)), + [<<"(">>, Escaped, <<")">>]. + +to_array(EscapeFun, Val) -> + Escaped = lists:join(<<",">>, lists:map(EscapeFun, Val)), + [<<"{">>, Escaped, <<"}">>]. + encode_term(Term) -> escape(list_to_binary( erl_prettypr:format(erl_syntax:abstract(Term), @@ -235,9 +258,23 @@ encode_term(Term) -> decode_term(Bin) -> Str = binary_to_list(<<Bin/binary, ".">>), - {ok, Tokens, _} = erl_scan:string(Str), - {ok, Term} = erl_parse:parse_term(Tokens), - Term. + try + {ok, Tokens, _} = erl_scan:string(Str), + {ok, Term} = erl_parse:parse_term(Tokens), + Term + catch _:{badmatch, {error, {Line, Mod, Reason}, _}} -> + ?ERROR_MSG("Corrupted Erlang term in SQL database:~n" + "** Scanner error: at line ~B: ~ts~n" + "** Term: ~ts", + [Line, Mod:format_error(Reason), Bin]), + erlang:error(badarg); + _:{badmatch, {error, {Line, Mod, Reason}}} -> + ?ERROR_MSG("Corrupted Erlang term in SQL database:~n" + "** Parser error: at line ~B: ~ts~n" + "** Term: ~ts", + [Line, Mod:format_error(Reason), Bin]), + erlang:error(badarg) + end. -spec sqlite_db(binary()) -> atom(). sqlite_db(Host) -> @@ -245,121 +282,136 @@ sqlite_db(Host) -> -spec sqlite_file(binary()) -> string(). sqlite_file(Host) -> - case ejabberd_config:get_option({sql_database, Host}, - fun iolist_to_binary/1) of + case ejabberd_option:sql_database(Host) of undefined -> - {ok, Cwd} = file:get_cwd(), - filename:join([Cwd, "sqlite", atom_to_list(node()), - binary_to_list(Host), "ejabberd.db"]); + Path = ["sqlite", atom_to_list(node()), + binary_to_list(Host), "ejabberd.db"], + case file:get_cwd() of + {ok, Cwd} -> + filename:join([Cwd|Path]); + {error, Reason} -> + ?ERROR_MSG("Failed to get current directory: ~ts", + [file:format_error(Reason)]), + filename:join(Path) + end; File -> binary_to_list(File) end. +use_new_schema() -> + ejabberd_option:new_sql_schema(). + +-spec get_worker(binary()) -> atom(). +get_worker(Host) -> + PoolSize = ejabberd_option:sql_pool_size(Host), + I = p1_rand:round_robin(PoolSize) + 1, + binary_to_existing_atom(get_worker_name(Host, I), utf8). + +-spec get_worker_name(binary(), pos_integer()) -> binary(). +get_worker_name(Host, I) -> + <<"ejabberd_sql_", Host/binary, $_, (integer_to_binary(I))/binary>>. + %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- -init([Host, StartInterval]) -> - case ejabberd_config:get_option( - {sql_keepalive_interval, Host}, - fun(I) when is_integer(I), I>0 -> I end) of +init([Host]) -> + process_flag(trap_exit, true), + case ejabberd_option:sql_keepalive_interval(Host) of undefined -> ok; KeepaliveInterval -> - timer:apply_interval(KeepaliveInterval * 1000, ?MODULE, - keep_alive, [self()]) + timer:apply_interval(KeepaliveInterval, ?MODULE, + keep_alive, [Host, self()]) end, [DBType | _] = db_opts(Host), - (?GEN_FSM):send_event(self(), connect), - ejabberd_sql_sup:add_pid(Host, self()), + p1_fsm:send_event(self(), connect), + QueueType = ejabberd_option:sql_queue_type(Host), {ok, connecting, #state{db_type = DBType, host = Host, - max_pending_requests_len = max_fsm_queue(), - pending_requests = {0, queue:new()}, - start_interval = StartInterval}}. + pending_requests = p1_queue:new(QueueType, max_fsm_queue())}}. connecting(connect, #state{host = Host} = State) -> ConnectRes = case db_opts(Host) of - [mysql | Args] -> apply(fun mysql_connect/5, Args); - [pgsql | Args] -> apply(fun pgsql_connect/5, Args); + [mysql | Args] -> apply(fun mysql_connect/8, Args); + [pgsql | Args] -> apply(fun pgsql_connect/8, Args); [sqlite | Args] -> apply(fun sqlite_connect/1, Args); - [mssql | Args] -> apply(fun odbc_connect/1, Args); - [odbc | Args] -> apply(fun odbc_connect/1, Args) + [mssql | Args] -> apply(fun odbc_connect/2, Args); + [odbc | Args] -> apply(fun odbc_connect/2, Args) end, - {_, PendingRequests} = State#state.pending_requests, case ConnectRes of {ok, Ref} -> - erlang:monitor(process, Ref), - lists:foreach(fun (Req) -> - (?GEN_FSM):send_event(self(), Req) - end, - queue:to_list(PendingRequests)), - State1 = State#state{db_ref = Ref, - pending_requests = {0, queue:new()}}, - State2 = get_db_version(State1), - {next_state, session_established, State2}; - {error, Reason} -> - ?INFO_MSG("~p connection failed:~n** Reason: ~p~n** " - "Retry after: ~p seconds", - [State#state.db_type, Reason, - State#state.start_interval div 1000]), - (?GEN_FSM):send_event_after(State#state.start_interval, - connect), - {next_state, connecting, State} + try link(Ref) of + _ -> + lists:foreach( + fun({{?PREPARE_KEY, _} = Key, _}) -> + erase(Key); + (_) -> + ok + end, get()), + PendingRequests = + p1_queue:dropwhile( + fun(Req) -> + p1_fsm:send_event(self(), Req), + true + end, State#state.pending_requests), + State1 = State#state{db_ref = Ref, + pending_requests = PendingRequests}, + State2 = get_db_version(State1), + {next_state, session_established, State2} + catch _:Reason -> + handle_reconnect(Reason, State) + end; + {error, Reason} -> + handle_reconnect(Reason, State) end; connecting(Event, State) -> - ?WARNING_MSG("unexpected event in 'connecting': ~p", + ?WARNING_MSG("Unexpected event in 'connecting': ~p", [Event]), {next_state, connecting, State}. -connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, - _Timestamp}, +connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, Timestamp}, From, State) -> - (?GEN_FSM):reply(From, - {error, <<"SQL connection failed">>}), + reply(From, {error, <<"SQL connection failed">>}, Timestamp), {next_state, connecting, State}; connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> - ?DEBUG("queuing pending request while connecting:~n\t~p", + ?DEBUG("Queuing pending request while connecting:~n\t~p", [Req]), - {Len, PendingRequests} = State#state.pending_requests, - NewPendingRequests = if Len < - State#state.max_pending_requests_len -> - {Len + 1, - queue:in({sql_cmd, Command, From, Timestamp}, - PendingRequests)}; - true -> - lists:foreach(fun ({sql_cmd, _, To, - _Timestamp}) -> - (?GEN_FSM):reply(To, - {error, - <<"SQL connection failed">>}) - end, - queue:to_list(PendingRequests)), - {1, - queue:from_list([{sql_cmd, Command, From, - Timestamp}])} - end, + PendingRequests = + try p1_queue:in({sql_cmd, Command, From, Timestamp}, + State#state.pending_requests) + catch error:full -> + Err = <<"SQL request queue is overfilled">>, + ?ERROR_MSG("~ts, bouncing all pending requests", [Err]), + Q = p1_queue:dropwhile( + fun({sql_cmd, _, To, TS}) -> + reply(To, {error, Err}, TS), + true + end, State#state.pending_requests), + p1_queue:in({sql_cmd, Command, From, Timestamp}, Q) + end, {next_state, connecting, - State#state{pending_requests = NewPendingRequests}}; + State#state{pending_requests = PendingRequests}}; connecting(Request, {Who, _Ref}, State) -> - ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'", + ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'", [Request, Who]), - {reply, {error, badarg}, connecting, State}. + {next_state, connecting, State}. session_established({sql_cmd, Command, Timestamp}, From, State) -> run_sql_cmd(Command, From, State, Timestamp); session_established(Request, {Who, _Ref}, State) -> - ?WARNING_MSG("unexpected call ~p from ~p in 'session_establ" - "ished'", + ?WARNING_MSG("Unexpected call ~p from ~p in 'session_established'", [Request, Who]), - {reply, {error, badarg}, session_established, State}. + {next_state, session_established, State}. session_established({sql_cmd, Command, From, Timestamp}, State) -> run_sql_cmd(Command, From, State, Timestamp); +session_established(force_timeout, State) -> + {stop, timeout, State}; session_established(Event, State) -> - ?WARNING_MSG("unexpected event in 'session_established': ~p", + ?WARNING_MSG("Unexpected event in 'session_established': ~p", [Event]), {next_state, session_established, State}. @@ -372,19 +424,14 @@ handle_sync_event(_Event, _From, StateName, State) -> code_change(_OldVsn, StateName, State, _Extra) -> {ok, StateName, State}. -%% We receive the down signal when we loose the MySQL connection (we are -%% monitoring the connection) -handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, - _StateName, State) -> - (?GEN_FSM):send_event(self(), connect), - {next_state, connecting, State}; +handle_info({'EXIT', _Pid, Reason}, _StateName, State) -> + handle_reconnect(Reason, State); handle_info(Info, StateName, State) -> - ?WARNING_MSG("unexpected info in ~p: ~p", + ?WARNING_MSG("Unexpected info in ~p: ~p", [StateName, Info]), {next_state, StateName, State}. terminate(_Reason, _StateName, State) -> - ejabberd_sql_sup:remove_pid(State#state.host, self()), case State#state.db_type of mysql -> catch p1_mysql_conn:stop(State#state.db_ref); sqlite -> catch sqlite3:close(sqlite_db(State#state.host)); @@ -402,18 +449,25 @@ print_state(State) -> State. %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- +handle_reconnect(Reason, #state{host = Host} = State) -> + StartInterval = ejabberd_option:sql_start_interval(Host), + ?WARNING_MSG("~p connection failed:~n" + "** Reason: ~p~n" + "** Retry after: ~B seconds", + [State#state.db_type, Reason, + StartInterval div 1000]), + p1_fsm:send_event_after(StartInterval, connect), + {next_state, connecting, State}. run_sql_cmd(Command, From, State, Timestamp) -> - case p1_time_compat:monotonic_time(milli_seconds) - Timestamp of - Age when Age < (?TRANSACTION_TIMEOUT) -> - put(?NESTING_KEY, ?TOP_LEVEL_TXN), - put(?STATE_KEY, State), - abort_on_driver_error(outer_op(Command), From); - Age -> - ?ERROR_MSG("Database was not available or too slow, " - "discarding ~p milliseconds old request~n~p~n", - [Age, Command]), - {next_state, session_established, State} + case current_time() >= Timestamp of + true -> + State1 = report_overload(State), + {next_state, session_established, State1}; + false -> + put(?NESTING_KEY, ?TOP_LEVEL_TXN), + put(?STATE_KEY, State), + abort_on_driver_error(outer_op(Command), From, Timestamp) end. %% Only called by handle_call, only handles top level operations. @@ -442,8 +496,8 @@ inner_transaction(F) -> case get(?NESTING_KEY) of ?TOP_LEVEL_TXN -> {backtrace, T} = process_info(self(), backtrace), - ?ERROR_MSG("inner transaction called at outer txn " - "level. Trace: ~s", + ?ERROR_MSG("Inner transaction called at outer txn " + "level. Trace: ~ts", [T]), erlang:exit(implementation_faulty); _N -> ok @@ -464,31 +518,35 @@ outer_transaction(F, NRestarts, _Reason) -> ?TOP_LEVEL_TXN -> ok; _N -> {backtrace, T} = process_info(self(), backtrace), - ?ERROR_MSG("outer transaction called at inner txn " - "level. Trace: ~s", + ?ERROR_MSG("Outer transaction called at inner txn " + "level. Trace: ~ts", [T]), erlang:exit(implementation_faulty) end, - sql_query_internal([<<"begin;">>]), + sql_begin(), put(?NESTING_KEY, PreviousNestingLevel + 1), - Result = (catch F()), - put(?NESTING_KEY, PreviousNestingLevel), - case Result of - {aborted, Reason} when NRestarts > 0 -> - sql_query_internal([<<"rollback;">>]), - outer_transaction(F, NRestarts - 1, Reason); - {aborted, Reason} when NRestarts =:= 0 -> - ?ERROR_MSG("SQL transaction restarts exceeded~n** " - "Restarts: ~p~n** Last abort reason: " - "~p~n** Stacktrace: ~p~n** When State " - "== ~p", - [?MAX_TRANSACTION_RESTARTS, Reason, - erlang:get_stacktrace(), get(?STATE_KEY)]), - sql_query_internal([<<"rollback;">>]), - {aborted, Reason}; - {'EXIT', Reason} -> - sql_query_internal([<<"rollback;">>]), {aborted, Reason}; - Res -> sql_query_internal([<<"commit;">>]), {atomic, Res} + try F() of + Res -> + sql_commit(), + {atomic, Res} + catch + ?EX_RULE(throw, {aborted, Reason}, _) when NRestarts > 0 -> + sql_rollback(), + put(?NESTING_KEY, ?TOP_LEVEL_TXN), + outer_transaction(F, NRestarts - 1, Reason); + ?EX_RULE(throw, {aborted, Reason}, Stack) when NRestarts =:= 0 -> + StackTrace = ?EX_STACK(Stack), + ?ERROR_MSG("SQL transaction restarts exceeded~n** " + "Restarts: ~p~n** Last abort reason: " + "~p~n** Stacktrace: ~p~n** When State " + "== ~p", + [?MAX_TRANSACTION_RESTARTS, Reason, + StackTrace, get(?STATE_KEY)]), + sql_rollback(), + {aborted, Reason}; + ?EX_RULE(exit, Reason, _) -> + sql_rollback(), + {aborted, Reason} end. execute_bloc(F) -> @@ -522,24 +580,26 @@ sql_query_internal(#sql_query{} = Query) -> mssql -> mssql_sql_query(Query); pgsql -> - PreparedStatements = ejabberd_config:get_option( - {sql_prepared_statements, ?MYNAME}, - fun(A) when is_boolean(A) -> A end, - true), Key = {?PREPARE_KEY, Query#sql_query.hash}, - if not PreparedStatements -> put(Key, ignore); - true -> ok - end, case get(Key) of undefined -> - case pgsql_prepare(Query, State) of - {ok, _, _, _} -> - put(Key, prepared); - {error, Error} -> - ?ERROR_MSG("PREPARE failed for SQL query " + Host = State#state.host, + PreparedStatements = + ejabberd_option:sql_prepared_statements(Host), + case PreparedStatements of + false -> + put(Key, ignore); + true -> + case pgsql_prepare(Query, State) of + {ok, _, _, _} -> + put(Key, prepared); + {error, Error} -> + ?ERROR_MSG( + "PREPARE failed for SQL query " "at ~p: ~p", [Query#sql_query.loc, Error]), - put(Key, ignore) + put(Key, ignore) + end end; _ -> ok @@ -548,58 +608,59 @@ sql_query_internal(#sql_query{} = Query) -> prepared -> pgsql_execute_sql_query(Query, State); _ -> - generic_sql_query(Query) + pgsql_sql_query(Query) end; mysql -> generic_sql_query(Query); sqlite -> sqlite_sql_query(Query) end - catch - Class:Reason -> - ST = erlang:get_stacktrace(), - ?ERROR_MSG("Internal error while processing SQL query: ~p", - [{Class, Reason, ST}]), + catch exit:{timeout, _} -> + {error, <<"timed out">>}; + exit:{killed, _} -> + {error, <<"killed">>}; + exit:{normal, _} -> + {error, <<"terminated unexpectedly">>}; + exit:{shutdown, _} -> + {error, <<"shutdown">>}; + ?EX_RULE(Class, Reason, Stack) -> + StackTrace = ?EX_STACK(Stack), + ?ERROR_MSG("Internal error while processing SQL query:~n** ~ts", + [misc:format_exception(2, Class, Reason, StackTrace)]), {error, <<"internal error">>} end, - case Res of - {error, <<"No SQL-driver information available.">>} -> - {updated, 0}; - _Else -> Res - end; + check_error(Res, Query); sql_query_internal(F) when is_function(F) -> case catch execute_fun(F) of + {aborted, Reason} -> {error, Reason}; {'EXIT', Reason} -> {error, Reason}; Res -> Res end; sql_query_internal(Query) -> State = get(?STATE_KEY), - ?DEBUG("SQL: \"~s\"", [Query]), + ?DEBUG("SQL: \"~ts\"", [Query]), + QueryTimeout = query_timeout(State#state.host), Res = case State#state.db_type of odbc -> to_odbc(odbc:sql_query(State#state.db_ref, [Query], - (?TRANSACTION_TIMEOUT) - 1000)); + QueryTimeout - 1000)); mssql -> to_odbc(odbc:sql_query(State#state.db_ref, [Query], - (?TRANSACTION_TIMEOUT) - 1000)); + QueryTimeout - 1000)); pgsql -> - pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query)); + pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query, + QueryTimeout - 1000)); mysql -> R = mysql_to_odbc(p1_mysql_conn:squery(State#state.db_ref, [Query], self(), - [{timeout, (?TRANSACTION_TIMEOUT) - 1000}, + [{timeout, QueryTimeout - 1000}, {result_type, binary}])), - %% ?INFO_MSG("MySQL, Received result~n~p~n", [R]), R; sqlite -> Host = State#state.host, sqlite_to_odbc(Host, sqlite3:sql_exec(sqlite_db(Host), Query)) end, - case Res of - {error, <<"No SQL-driver information available.">>} -> - {updated, 0}; - _Else -> Res - end. + check_error(Res, Query). select_sql_query(Queries, State) -> select_sql_query( @@ -636,10 +697,29 @@ generic_sql_query_format(SQLQuery) -> generic_escape() -> #sql_escape{string = fun(X) -> <<"'", (escape(X))/binary, "'">> end, - integer = fun(X) -> integer_to_binary(X) end, - boolean = fun(true) -> <<"1">>; + integer = fun(X) -> misc:i2l(X) end, + boolean = fun(true) -> <<"1">>; (false) -> <<"0">> - end + end, + in_array_string = fun(X) -> <<"'", (escape(X))/binary, "'">> end + }. + +pgsql_sql_query(SQLQuery) -> + sql_query_format_res( + sql_query_internal(pgsql_sql_query_format(SQLQuery)), + SQLQuery). + +pgsql_sql_query_format(SQLQuery) -> + Args = (SQLQuery#sql_query.args)(pgsql_escape()), + (SQLQuery#sql_query.format_query)(Args). + +pgsql_escape() -> + #sql_escape{string = fun(X) -> <<"E'", (escape(X))/binary, "'">> end, + integer = fun(X) -> misc:i2l(X) end, + boolean = fun(true) -> <<"1">>; + (false) -> <<"0">> + end, + in_array_string = fun(X) -> <<"E'", (escape(X))/binary, "'">> end }. sqlite_sql_query(SQLQuery) -> @@ -653,10 +733,11 @@ sqlite_sql_query_format(SQLQuery) -> sqlite_escape() -> #sql_escape{string = fun(X) -> <<"'", (standard_escape(X))/binary, "'">> end, - integer = fun(X) -> integer_to_binary(X) end, - boolean = fun(true) -> <<"1">>; + integer = fun(X) -> misc:i2l(X) end, + boolean = fun(true) -> <<"1">>; (false) -> <<"0">> - end + end, + in_array_string = fun(X) -> <<"'", (standard_escape(X))/binary, "'">> end }. standard_escape(S) -> @@ -677,10 +758,11 @@ pgsql_prepare(SQLQuery, State) -> pgsql_execute_escape() -> #sql_escape{string = fun(X) -> X end, - integer = fun(X) -> [integer_to_binary(X)] end, - boolean = fun(true) -> "1"; + integer = fun(X) -> [misc:i2l(X)] end, + boolean = fun(true) -> "1"; (false) -> "0" - end + end, + in_array_string = fun(X) -> <<"\"", (escape(X))/binary, "\"">> end }. pgsql_execute_sql_query(SQLQuery, State) -> @@ -689,7 +771,7 @@ pgsql_execute_sql_query(SQLQuery, State) -> pgsql:execute(State#state.db_ref, SQLQuery#sql_query.hash, Args), % {T, ExecuteRes} = % timer:tc(pgsql, execute, [State#state.db_ref, SQLQuery#sql_query.hash, Args]), -% io:format("T ~s ~p~n", [SQLQuery#sql_query.hash, T]), +% io:format("T ~ts ~p~n", [SQLQuery#sql_query.hash, T]), Res = pgsql_execute_to_odbc(ExecuteRes), sql_query_format_res(Res, SQLQuery). @@ -701,12 +783,12 @@ sql_query_format_res({selected, _, Rows}, SQLQuery) -> try [(SQLQuery#sql_query.format_res)(Row)] catch - Class:Reason -> - ST = erlang:get_stacktrace(), - ?ERROR_MSG("Error while processing " - "SQL query result: ~p~n" - "row: ~p", - [{Class, Reason, ST}, Row]), + ?EX_RULE(Class, Reason, Stack) -> + StackTrace = ?EX_STACK(Stack), + ?ERROR_MSG("Error while processing SQL query result:~n" + "** Row: ~p~n** ~ts", + [Row, + misc:format_exception(2, Class, Reason, StackTrace)]), [] end end, Rows), @@ -717,31 +799,70 @@ sql_query_format_res(Res, _SQLQuery) -> sql_query_to_iolist(SQLQuery) -> generic_sql_query_format(SQLQuery). +sql_begin() -> + sql_query_internal( + [{mssql, [<<"begin transaction;">>]}, + {any, [<<"begin;">>]}]). + +sql_commit() -> + sql_query_internal( + [{mssql, [<<"commit transaction;">>]}, + {any, [<<"commit;">>]}]). + +sql_rollback() -> + sql_query_internal( + [{mssql, [<<"rollback transaction;">>]}, + {any, [<<"rollback;">>]}]). + + %% Generate the OTP callback return tuple depending on the driver result. -abort_on_driver_error({error, <<"query timed out">>} = - Reply, - From) -> - (?GEN_FSM):reply(From, Reply), +abort_on_driver_error({error, <<"query timed out">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), + {stop, timeout, get(?STATE_KEY)}; +abort_on_driver_error({error, <<"Failed sending data on socket", _/binary>>} = Reply, + From, Timestamp) -> + reply(From, Reply, Timestamp), + {stop, closed, get(?STATE_KEY)}; +abort_on_driver_error({error, <<"SQL connection failed">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, timeout, get(?STATE_KEY)}; -abort_on_driver_error({error, - <<"Failed sending data on socket", _/binary>>} = - Reply, - From) -> - (?GEN_FSM):reply(From, Reply), +abort_on_driver_error({error, <<"Communication link failure">>} = Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {stop, closed, get(?STATE_KEY)}; -abort_on_driver_error(Reply, From) -> - (?GEN_FSM):reply(From, Reply), +abort_on_driver_error(Reply, From, Timestamp) -> + reply(From, Reply, Timestamp), {next_state, session_established, get(?STATE_KEY)}. +-spec report_overload(state()) -> state(). +report_overload(#state{overload_reported = PrevTime} = State) -> + CurrTime = current_time(), + case PrevTime == undefined orelse (CurrTime - PrevTime) > timer:seconds(30) of + true -> + ?ERROR_MSG("SQL connection pool is overloaded, " + "discarding stale requests", []), + State#state{overload_reported = current_time()}; + false -> + State + end. + +-spec reply({pid(), term()}, term(), integer()) -> term(). +reply(From, Reply, Timestamp) -> + case current_time() >= Timestamp of + true -> ok; + false -> p1_fsm:reply(From, Reply) + end. + %% == pure ODBC code %% part of init/1 %% Open an ODBC database connection -odbc_connect(SQLServer) -> +odbc_connect(SQLServer, Timeout) -> ejabberd:start_app(odbc), odbc:connect(binary_to_list(SQLServer), [{scrollable_cursors, off}, + {extended_errors, on}, {tuple_row, off}, + {timeout, Timeout}, {binary_strings, on}]). %% == Native SQLite code @@ -775,7 +896,7 @@ sqlite_to_odbc(Host, {rowid, _}) -> sqlite_to_odbc(_Host, [{columns, Columns}, {rows, TRows}]) -> Rows = [lists:map( fun(I) when is_integer(I) -> - jlib:integer_to_binary(I); + integer_to_binary(I); (B) -> B end, tuple_to_list(Row)) || Row <- TRows], @@ -789,13 +910,16 @@ sqlite_to_odbc(_Host, _) -> %% part of init/1 %% Open a database connection to PostgreSQL -pgsql_connect(Server, Port, DB, Username, Password) -> +pgsql_connect(Server, Port, DB, Username, Password, ConnectTimeout, + Transport, SSLOpts) -> case pgsql:connect([{host, Server}, {database, DB}, {user, Username}, {password, Password}, {port, Port}, - {as_binary, true}]) of + {transport, Transport}, + {connect_timeout, ConnectTimeout}, + {as_binary, true}|SSLOpts]) of {ok, Ref} -> pgsql:squery(Ref, [<<"alter database \"">>, DB, <<"\" set ">>, <<"standard_conforming_strings='off';">>]), @@ -820,11 +944,11 @@ pgsql_item_to_odbc({<<"FETCH", _/binary>>, Rows, {selected, [element(1, Row) || Row <- Rows], Recs}; pgsql_item_to_odbc(<<"INSERT ", OIDN/binary>>) -> [_OID, N] = str:tokens(OIDN, <<" ">>), - {updated, jlib:binary_to_integer(N)}; + {updated, binary_to_integer(N)}; pgsql_item_to_odbc(<<"DELETE ", N/binary>>) -> - {updated, jlib:binary_to_integer(N)}; + {updated, binary_to_integer(N)}; pgsql_item_to_odbc(<<"UPDATE ", N/binary>>) -> - {updated, jlib:binary_to_integer(N)}; + {updated, binary_to_integer(N)}; pgsql_item_to_odbc({error, Error}) -> {error, Error}; pgsql_item_to_odbc(_) -> {updated, undefined}. @@ -844,11 +968,12 @@ pgsql_execute_to_odbc(_) -> {updated, undefined}. %% part of init/1 %% Open a database connection to MySQL -mysql_connect(Server, Port, DB, Username, Password) -> +mysql_connect(Server, Port, DB, Username, Password, ConnectTimeout, _, _) -> case p1_mysql_conn:start(binary_to_list(Server), Port, binary_to_list(Username), binary_to_list(Password), - binary_to_list(DB), fun log/3) + binary_to_list(DB), + ConnectTimeout, fun log/3) of {ok, Ref} -> p1_mysql_conn:fetch( @@ -882,7 +1007,7 @@ mysql_item_to_odbc(Columns, Recs) -> to_odbc({selected, Columns, Recs}) -> Rows = [lists:map( fun(I) when is_integer(I) -> - jlib:integer_to_binary(I); + integer_to_binary(I); (B) -> B end, Row) || Row <- Recs], @@ -900,11 +1025,11 @@ get_db_version(#state{db_type = pgsql} = State) -> Version when is_integer(Version) -> State#state{db_version = Version}; Error -> - ?WARNING_MSG("error getting pgsql version: ~p", [Error]), + ?WARNING_MSG("Error getting pgsql version: ~p", [Error]), State end; Res -> - ?WARNING_MSG("error getting pgsql version: ~p", [Res]), + ?WARNING_MSG("Error getting pgsql version: ~p", [Res]), State end; get_db_version(State) -> @@ -913,67 +1038,88 @@ get_db_version(State) -> log(Level, Format, Args) -> case Level of debug -> ?DEBUG(Format, Args); + info -> ?INFO_MSG(Format, Args); normal -> ?INFO_MSG(Format, Args); error -> ?ERROR_MSG(Format, Args) end. db_opts(Host) -> - Type = ejabberd_config:get_option({sql_type, Host}, - fun(mysql) -> mysql; - (pgsql) -> pgsql; - (sqlite) -> sqlite; - (mssql) -> mssql; - (odbc) -> odbc - end, odbc), - Server = ejabberd_config:get_option({sql_server, Host}, - fun iolist_to_binary/1, - <<"localhost">>), + Type = ejabberd_option:sql_type(Host), + Server = ejabberd_option:sql_server(Host), + Timeout = ejabberd_option:sql_connect_timeout(Host), + Transport = case ejabberd_option:sql_ssl(Host) of + false -> tcp; + true -> ssl + end, + warn_if_ssl_unsupported(Transport, Type), case Type of odbc -> - [odbc, Server]; + [odbc, Server, Timeout]; sqlite -> [sqlite, Host]; _ -> - Port = ejabberd_config:get_option( - {sql_port, Host}, - fun(P) when is_integer(P), P > 0, P < 65536 -> P end, - case Type of - mssql -> ?MSSQL_PORT; - mysql -> ?MYSQL_PORT; - pgsql -> ?PGSQL_PORT - end), - DB = ejabberd_config:get_option({sql_database, Host}, - fun iolist_to_binary/1, - <<"ejabberd">>), - User = ejabberd_config:get_option({sql_username, Host}, - fun iolist_to_binary/1, - <<"ejabberd">>), - Pass = ejabberd_config:get_option({sql_password, Host}, - fun iolist_to_binary/1, - <<"">>), + Port = ejabberd_option:sql_port(Host), + DB = case ejabberd_option:sql_database(Host) of + undefined -> <<"ejabberd">>; + D -> D + end, + User = ejabberd_option:sql_username(Host), + Pass = ejabberd_option:sql_password(Host), + SSLOpts = get_ssl_opts(Transport, Host), case Type of mssql -> [mssql, <<"DSN=", Host/binary, ";UID=", User/binary, - ";PWD=", Pass/binary>>]; + ";PWD=", Pass/binary>>, Timeout]; _ -> - [Type, Server, Port, DB, User, Pass] + [Type, Server, Port, DB, User, Pass, Timeout, Transport, SSLOpts] end end. +warn_if_ssl_unsupported(tcp, _) -> + ok; +warn_if_ssl_unsupported(ssl, pgsql) -> + ok; +warn_if_ssl_unsupported(ssl, Type) -> + ?WARNING_MSG("SSL connection is not supported for ~ts", [Type]). + +get_ssl_opts(ssl, Host) -> + Opts1 = case ejabberd_option:sql_ssl_certfile(Host) of + undefined -> []; + CertFile -> [{certfile, CertFile}] + end, + Opts2 = case ejabberd_option:sql_ssl_cafile(Host) of + undefined -> Opts1; + CAFile -> [{cacertfile, CAFile}|Opts1] + end, + case ejabberd_option:sql_ssl_verify(Host) of + true -> + case lists:keymember(cacertfile, 1, Opts2) of + true -> + [{verify, verify_peer}|Opts2]; + false -> + ?WARNING_MSG("SSL verification is enabled for " + "SQL connection, but option " + "'sql_ssl_cafile' is not set; " + "verification will be disabled", []), + Opts2 + end; + false -> + Opts2 + end; +get_ssl_opts(tcp, _) -> + []. + init_mssql(Host) -> - Server = ejabberd_config:get_option({sql_server, Host}, - fun iolist_to_binary/1, - <<"localhost">>), - Port = ejabberd_config:get_option( - {sql_port, Host}, - fun(P) when is_integer(P), P > 0, P < 65536 -> P end, - ?MSSQL_PORT), - DB = ejabberd_config:get_option({sql_database, Host}, - fun iolist_to_binary/1, - <<"ejabberd">>), - FreeTDS = io_lib:fwrite("[~s]~n" - "\thost = ~s~n" + Server = ejabberd_option:sql_server(Host), + Port = ejabberd_option:sql_port(Host), + DB = case ejabberd_option:sql_database(Host) of + undefined -> <<"ejabberd">>; + D -> D + end, + FreeTDS = io_lib:fwrite("[~ts]~n" + "\thost = ~ts~n" "\tport = ~p~n" + "\tclient charset = UTF-8~n" "\ttds version = 7.1~n", [Host, Server, Port]), ODBCINST = io_lib:fwrite("[freetds]~n" @@ -982,39 +1128,48 @@ init_mssql(Host) -> "Setup = libtdsS.so~n" "UsageCount = 1~n" "FileUsage = 1~n", []), - ODBCINI = io_lib:fwrite("[~s]~n" + ODBCINI = io_lib:fwrite("[~ts]~n" "Description = MS SQL~n" "Driver = freetds~n" - "Servername = ~s~n" - "Database = ~s~n" + "Servername = ~ts~n" + "Database = ~ts~n" "Port = ~p~n", [Host, Host, DB, Port]), - ?DEBUG("~s:~n~s", [freetds_config(), FreeTDS]), - ?DEBUG("~s:~n~s", [odbcinst_config(), ODBCINST]), - ?DEBUG("~s:~n~s", [odbc_config(), ODBCINI]), + ?DEBUG("~ts:~n~ts", [freetds_config(), FreeTDS]), + ?DEBUG("~ts:~n~ts", [odbcinst_config(), ODBCINST]), + ?DEBUG("~ts:~n~ts", [odbc_config(), ODBCINI]), case filelib:ensure_dir(freetds_config()) of ok -> try - ok = file:write_file(freetds_config(), FreeTDS, [append]), - ok = file:write_file(odbcinst_config(), ODBCINST), - ok = file:write_file(odbc_config(), ODBCINI, [append]), + ok = write_file_if_new(freetds_config(), FreeTDS), + ok = write_file_if_new(odbcinst_config(), ODBCINST), + ok = write_file_if_new(odbc_config(), ODBCINI), os:putenv("ODBCSYSINI", tmp_dir()), os:putenv("FREETDS", freetds_config()), os:putenv("FREETDSCONF", freetds_config()), ok catch error:{badmatch, {error, Reason} = Err} -> - ?ERROR_MSG("failed to create temporary files in ~s: ~s", + ?ERROR_MSG("Failed to create temporary files in ~ts: ~ts", [tmp_dir(), file:format_error(Reason)]), Err end; {error, Reason} = Err -> - ?ERROR_MSG("failed to create temporary directory ~s: ~s", + ?ERROR_MSG("Failed to create temporary directory ~ts: ~ts", [tmp_dir(), file:format_error(Reason)]), Err end. +write_file_if_new(File, Payload) -> + case filelib:is_file(File) of + true -> ok; + false -> file:write_file(File, Payload) + end. + tmp_dir() -> - filename:join(["/tmp", "ejabberd"]). + case os:type() of + {win32, _} -> filename:join([os:getenv("HOME"), "conf"]); + _ -> filename:join(["/tmp", "ejabberd"]) + end. odbc_config() -> filename:join(tmp_dir(), "odbc.ini"). @@ -1026,51 +1181,55 @@ odbcinst_config() -> filename:join(tmp_dir(), "odbcinst.ini"). max_fsm_queue() -> - ejabberd_config:get_option( - max_fsm_queue, - fun(N) when is_integer(N), N > 0 -> N end). + proplists:get_value(max_queue, fsm_limit_opts(), unlimited). fsm_limit_opts() -> - case max_fsm_queue() of - N when is_integer(N) -> [{max_queue, N}]; - _ -> [] - end. - -check_error({error, Why} = Err, #sql_query{} = Query) -> - ?ERROR_MSG("SQL query '~s' at ~p failed: ~p", - [Query#sql_query.hash, Query#sql_query.loc, Why]), + ejabberd_config:fsm_limit_opts([]). + +query_timeout(LServer) -> + ejabberd_option:sql_query_timeout(LServer). + +current_time() -> + erlang:monotonic_time(millisecond). + +%% ***IMPORTANT*** This error format requires extended_errors turned on. +extended_error({"08S01", _, Reason}) -> + % TCP Provider: The specified network name is no longer available + ?DEBUG("ODBC Link Failure: ~ts", [Reason]), + <<"Communication link failure">>; +extended_error({"08001", _, Reason}) -> + % Login timeout expired + ?DEBUG("ODBC Connect Timeout: ~ts", [Reason]), + <<"SQL connection failed">>; +extended_error({"IMC01", _, Reason}) -> + % The connection is broken and recovery is not possible + ?DEBUG("ODBC Link Failure: ~ts", [Reason]), + <<"Communication link failure">>; +extended_error({"IMC06", _, Reason}) -> + % The connection is broken and recovery is not possible + ?DEBUG("ODBC Link Failure: ~ts", [Reason]), + <<"Communication link failure">>; +extended_error({Code, _, Reason}) -> + ?DEBUG("ODBC Error ~ts: ~ts", [Code, Reason]), + iolist_to_binary(Reason); +extended_error(Error) -> + Error. + +check_error({error, Why} = Err, _Query) when Why == killed -> Err; -check_error({error, Why} = Err, Query) -> +check_error({error, Why}, #sql_query{} = Query) -> + Err = extended_error(Why), + ?ERROR_MSG("SQL query '~ts' at ~p failed: ~p", + [Query#sql_query.hash, Query#sql_query.loc, Err]), + {error, Err}; +check_error({error, Why}, Query) -> + Err = extended_error(Why), case catch iolist_to_binary(Query) of SQuery when is_binary(SQuery) -> - ?ERROR_MSG("SQL query '~s' failed: ~p", [SQuery, Why]); + ?ERROR_MSG("SQL query '~ts' failed: ~p", [SQuery, Err]); _ -> - ?ERROR_MSG("SQL query ~p failed: ~p", [Query, Why]) + ?ERROR_MSG("SQL query ~p failed: ~p", [Query, Err]) end, - Err; + {error, Err}; check_error(Result, _Query) -> Result. - -opt_type(max_fsm_queue) -> - fun (N) when is_integer(N), N > 0 -> N end; -opt_type(sql_database) -> fun iolist_to_binary/1; -opt_type(sql_keepalive_interval) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(sql_password) -> fun iolist_to_binary/1; -opt_type(sql_port) -> - fun (P) when is_integer(P), P > 0, P < 65536 -> P end; -opt_type(sql_server) -> fun iolist_to_binary/1; -opt_type(sql_type) -> - fun (mysql) -> mysql; - (pgsql) -> pgsql; - (sqlite) -> sqlite; - (mssql) -> mssql; - (odbc) -> odbc - end; -opt_type(sql_username) -> fun iolist_to_binary/1; -opt_type(sql_prepared_statements) -> - fun(A) when is_boolean(A) -> A end; -opt_type(_) -> - [max_fsm_queue, sql_database, sql_keepalive_interval, - sql_password, sql_port, sql_server, sql_type, - sql_username, sql_prepared_statements]. |