diff options
Diffstat (limited to 'src/ejabberd_sql.erl')
-rw-r--r-- | src/ejabberd_sql.erl | 186 |
1 files changed, 76 insertions, 110 deletions
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 3431e61b8..5a94dcf8e 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -25,8 +25,6 @@ -module(ejabberd_sql). --behaviour(ejabberd_config). - -author('alexey@process-one.net'). -behaviour(p1_fsm). @@ -65,8 +63,7 @@ code_change/4]). -export([connecting/2, connecting/3, - session_established/2, session_established/3, - opt_type/1]). + session_established/2, session_established/3]). -include("logger.hrl"). -include("ejabberd_sql_pt.hrl"). @@ -86,24 +83,12 @@ -define(TOP_LEVEL_TXN, 0). --define(PGSQL_PORT, 5432). - --define(MYSQL_PORT, 3306). - --define(MSSQL_PORT, 1433). - -define(MAX_TRANSACTION_RESTARTS, 10). -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]). -define(PREPARE_KEY, ejabberd_sql_prepare). --ifdef(NEW_SQL_SCHEMA). --define(USE_NEW_SCHEMA_DEFAULT, true). --else. --define(USE_NEW_SCHEMA_DEFAULT, false). --endif. - %%-define(DBGFSM, true). -ifdef(DBGFSM). @@ -128,13 +113,15 @@ start_link(Host, StartInterval) -> [Host, StartInterval], fsm_limit_opts() ++ (?FSMOPTS)). --type sql_query() :: [sql_query() | binary()] | #sql_query{} | - fun(() -> any()) | fun((atom(), _) -> any()). +-type sql_query_simple() :: [sql_query() | binary()] | #sql_query{} | + fun(() -> any()) | fun((atom(), _) -> any()). +-type sql_query() :: sql_query_simple() | + [{atom() | {atom(), any()}, sql_query_simple()}]. -type sql_query_result() :: {updated, non_neg_integer()} | - {error, binary()} | - {selected, [binary()], - [[binary()]]} | - {selected, [any()]}. + {error, binary() | atom()} | + {selected, [binary()], [[binary()]]} | + {selected, [any()]} | + ok. -spec sql_query(binary(), sql_query()) -> sql_query_result(). @@ -156,7 +143,11 @@ sql_transaction(Host, Queries) sql_transaction(Host, F); %% SQL transaction, based on a erlang anonymous function (F = fun) sql_transaction(Host, F) when is_function(F) -> - sql_call(Host, {sql_transaction, F}). + case sql_call(Host, {sql_transaction, F}) of + {atomic, _} = Ret -> Ret; + {aborted, _} = Ret -> Ret; + Err -> {aborted, Err} + end. %% SQL bloc, based on a erlang anonymous function (F = fun) sql_bloc(Host, F) -> sql_call(Host, {sql_bloc, F}). @@ -182,7 +173,7 @@ keep_alive(Host, PID) -> {selected,_,[[<<"1">>]]} -> ok; _Err -> - ?ERROR_MSG("keep alive query failed, closing connection: ~p", [_Err]), + ?ERROR_MSG("Keep alive query failed, closing connection: ~p", [_Err]), sync_send_event(PID, force_timeout, query_timeout(Host)) end. @@ -300,39 +291,41 @@ sqlite_db(Host) -> -spec sqlite_file(binary()) -> string(). sqlite_file(Host) -> - case ejabberd_config:get_option({sql_database, Host}) of + case ejabberd_option:sql_database(Host) of undefined -> - {ok, Cwd} = file:get_cwd(), - filename:join([Cwd, "sqlite", atom_to_list(node()), - binary_to_list(Host), "ejabberd.db"]); + Path = ["sqlite", atom_to_list(node()), + binary_to_list(Host), "ejabberd.db"], + case file:get_cwd() of + {ok, Cwd} -> + filename:join([Cwd|Path]); + {error, Reason} -> + ?ERROR_MSG("Failed to get current directory: ~s", + [file:format_error(Reason)]), + filename:join(Path) + end; File -> binary_to_list(File) end. use_new_schema() -> - ejabberd_config:get_option(new_sql_schema, ?USE_NEW_SCHEMA_DEFAULT). + ejabberd_option:new_sql_schema(). %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- init([Host, StartInterval]) -> process_flag(trap_exit, true), - case ejabberd_config:get_option({sql_keepalive_interval, Host}) of + case ejabberd_option:sql_keepalive_interval(Host) of undefined -> ok; KeepaliveInterval -> - timer:apply_interval(KeepaliveInterval * 1000, ?MODULE, + timer:apply_interval(KeepaliveInterval, ?MODULE, keep_alive, [Host, self()]) end, [DBType | _] = db_opts(Host), p1_fsm:send_event(self(), connect), ejabberd_sql_sup:add_pid(Host, self()), - QueueType = case ejabberd_config:get_option({sql_queue_type, Host}) of - undefined -> - ejabberd_config:default_queue_type(Host); - Type -> - Type - end, + QueueType = ejabberd_option:sql_queue_type(Host), {ok, connecting, #state{db_type = DBType, host = Host, pending_requests = p1_queue:new(QueueType, max_fsm_queue()), @@ -375,7 +368,7 @@ connecting(connect, #state{host = Host} = State) -> {next_state, connecting, State} end; connecting(Event, State) -> - ?WARNING_MSG("unexpected event in 'connecting': ~p", + ?WARNING_MSG("Unexpected event in 'connecting': ~p", [Event]), {next_state, connecting, State}. @@ -387,7 +380,7 @@ connecting({sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, {next_state, connecting, State}; connecting({sql_cmd, Command, Timestamp} = Req, From, State) -> - ?DEBUG("queuing pending request while connecting:~n\t~p", + ?DEBUG("Queuing pending request while connecting:~n\t~p", [Req]), PendingRequests = try p1_queue:in({sql_cmd, Command, From, Timestamp}, @@ -404,7 +397,7 @@ connecting({sql_cmd, Command, Timestamp} = Req, From, {next_state, connecting, State#state{pending_requests = PendingRequests}}; connecting(Request, {Who, _Ref}, State) -> - ?WARNING_MSG("unexpected call ~p from ~p in 'connecting'", + ?WARNING_MSG("Unexpected call ~p from ~p in 'connecting'", [Request, Who]), {reply, {error, badarg}, connecting, State}. @@ -412,7 +405,7 @@ session_established({sql_cmd, Command, Timestamp}, From, State) -> run_sql_cmd(Command, From, State, Timestamp); session_established(Request, {Who, _Ref}, State) -> - ?WARNING_MSG("unexpected call ~p from ~p in 'session_establ" + ?WARNING_MSG("Unexpected call ~p from ~p in 'session_establ" "ished'", [Request, Who]), {reply, {error, badarg}, session_established, State}. @@ -423,7 +416,7 @@ session_established({sql_cmd, Command, From, Timestamp}, session_established(force_timeout, State) -> {stop, timeout, State}; session_established(Event, State) -> - ?WARNING_MSG("unexpected event in 'session_established': ~p", + ?WARNING_MSG("Unexpected event in 'session_established': ~p", [Event]), {next_state, session_established, State}. @@ -443,7 +436,7 @@ handle_info({'DOWN', _MonitorRef, process, _Pid, _Info}, p1_fsm:send_event(self(), connect), {next_state, connecting, State}; handle_info(Info, StateName, State) -> - ?WARNING_MSG("unexpected info in ~p: ~p", + ?WARNING_MSG("Unexpected info in ~p: ~p", [StateName, Info]), {next_state, StateName, State}. @@ -507,7 +500,7 @@ inner_transaction(F) -> case get(?NESTING_KEY) of ?TOP_LEVEL_TXN -> {backtrace, T} = process_info(self(), backtrace), - ?ERROR_MSG("inner transaction called at outer txn " + ?ERROR_MSG("Inner transaction called at outer txn " "level. Trace: ~s", [T]), erlang:exit(implementation_faulty); @@ -529,7 +522,7 @@ outer_transaction(F, NRestarts, _Reason) -> ?TOP_LEVEL_TXN -> ok; _N -> {backtrace, T} = process_info(self(), backtrace), - ?ERROR_MSG("outer transaction called at inner txn " + ?ERROR_MSG("Outer transaction called at inner txn " "level. Trace: ~s", [T]), erlang:exit(implementation_faulty) @@ -546,12 +539,13 @@ outer_transaction(F, NRestarts, _Reason) -> put(?NESTING_KEY, ?TOP_LEVEL_TXN), outer_transaction(F, NRestarts - 1, Reason); ?EX_RULE(throw, {aborted, Reason}, Stack) when NRestarts =:= 0 -> + StackTrace = ?EX_STACK(Stack), ?ERROR_MSG("SQL transaction restarts exceeded~n** " "Restarts: ~p~n** Last abort reason: " "~p~n** Stacktrace: ~p~n** When State " "== ~p", [?MAX_TRANSACTION_RESTARTS, Reason, - ?EX_STACK(Stack), get(?STATE_KEY)]), + StackTrace, get(?STATE_KEY)]), sql_query_internal([<<"rollback;">>]), {aborted, Reason}; ?EX_RULE(exit, Reason, _) -> @@ -623,8 +617,9 @@ sql_query_internal(#sql_query{} = Query) -> exit:{normal, _} -> {error, <<"terminated unexpectedly">>}; ?EX_RULE(Class, Reason, Stack) -> - ?ERROR_MSG("Internal error while processing SQL query: ~p", - [{Class, Reason, ?EX_STACK(Stack)}]), + StackTrace = ?EX_STACK(Stack), + ?ERROR_MSG("Internal error while processing SQL query:~n** ~s", + [misc:format_exception(2, Class, Reason, StackTrace)]), {error, <<"internal error">>} end, check_error(Res, Query); @@ -764,10 +759,11 @@ sql_query_format_res({selected, _, Rows}, SQLQuery) -> [(SQLQuery#sql_query.format_res)(Row)] catch ?EX_RULE(Class, Reason, Stack) -> - ?ERROR_MSG("Error while processing " - "SQL query result: ~p~n" - "row: ~p", - [{Class, Reason, ?EX_STACK(Stack)}, Row]), + StackTrace = ?EX_STACK(Stack), + ?ERROR_MSG("Error while processing SQL query result:~n" + "** Row: ~p~n** ~s", + [Row, + misc:format_exception(2, Class, Reason, StackTrace)]), [] end end, Rows), @@ -976,11 +972,11 @@ get_db_version(#state{db_type = pgsql} = State) -> Version when is_integer(Version) -> State#state{db_version = Version}; Error -> - ?WARNING_MSG("error getting pgsql version: ~p", [Error]), + ?WARNING_MSG("Error getting pgsql version: ~p", [Error]), State end; Res -> - ?WARNING_MSG("error getting pgsql version: ~p", [Res]), + ?WARNING_MSG("Error getting pgsql version: ~p", [Res]), State end; get_db_version(State) -> @@ -995,11 +991,13 @@ log(Level, Format, Args) -> end. db_opts(Host) -> - Type = ejabberd_config:get_option({sql_type, Host}, odbc), - Server = ejabberd_config:get_option({sql_server, Host}, <<"localhost">>), - Timeout = timer:seconds( - ejabberd_config:get_option({sql_connect_timeout, Host}, 5)), - Transport = case ejabberd_config:get_option({sql_ssl, Host}, false) of + Type = case ejabberd_option:sql_type(Host) of + undefined -> odbc; + T -> T + end, + Server = ejabberd_option:sql_server(Host), + Timeout = ejabberd_option:sql_connect_timeout(Host), + Transport = case ejabberd_option:sql_ssl(Host) of false -> tcp; true -> ssl end, @@ -1010,19 +1008,13 @@ db_opts(Host) -> sqlite -> [sqlite, Host]; _ -> - Port = ejabberd_config:get_option( - {sql_port, Host}, - case Type of - mssql -> ?MSSQL_PORT; - mysql -> ?MYSQL_PORT; - pgsql -> ?PGSQL_PORT - end), - DB = ejabberd_config:get_option({sql_database, Host}, - <<"ejabberd">>), - User = ejabberd_config:get_option({sql_username, Host}, - <<"ejabberd">>), - Pass = ejabberd_config:get_option({sql_password, Host}, - <<"">>), + Port = ejabberd_option:sql_port(Host), + DB = case ejabberd_option:sql_database(Host) of + undefined -> <<"ejabberd">>; + D -> D + end, + User = ejabberd_option:sql_username(Host), + Pass = ejabberd_option:sql_password(Host), SSLOpts = get_ssl_opts(Transport, Host), case Type of mssql -> @@ -1041,15 +1033,15 @@ warn_if_ssl_unsupported(ssl, Type) -> ?WARNING_MSG("SSL connection is not supported for ~s", [Type]). get_ssl_opts(ssl, Host) -> - Opts1 = case ejabberd_config:get_option({sql_ssl_certfile, Host}) of + Opts1 = case ejabberd_option:sql_ssl_certfile(Host) of undefined -> []; CertFile -> [{certfile, CertFile}] end, - Opts2 = case ejabberd_config:get_option({sql_ssl_cafile, Host}) of + Opts2 = case ejabberd_option:sql_ssl_cafile(Host) of undefined -> Opts1; CAFile -> [{cacertfile, CAFile}|Opts1] end, - case ejabberd_config:get_option({sql_ssl_verify, Host}, false) of + case ejabberd_option:sql_ssl_verify(Host) of true -> case lists:keymember(cacertfile, 1, Opts2) of true -> @@ -1068,9 +1060,12 @@ get_ssl_opts(tcp, _) -> []. init_mssql(Host) -> - Server = ejabberd_config:get_option({sql_server, Host}, <<"localhost">>), - Port = ejabberd_config:get_option({sql_port, Host}, ?MSSQL_PORT), - DB = ejabberd_config:get_option({sql_database, Host}, <<"ejabberd">>), + Server = ejabberd_option:sql_server(Host), + Port = ejabberd_option:sql_port(Host), + DB = case ejabberd_option:sql_database(Host) of + undefined -> <<"ejabberd">>; + D -> D + end, FreeTDS = io_lib:fwrite("[~s]~n" "\thost = ~s~n" "\tport = ~p~n" @@ -1104,12 +1099,12 @@ init_mssql(Host) -> os:putenv("FREETDSCONF", freetds_config()), ok catch error:{badmatch, {error, Reason} = Err} -> - ?ERROR_MSG("failed to create temporary files in ~s: ~s", + ?ERROR_MSG("Failed to create temporary files in ~s: ~s", [tmp_dir(), file:format_error(Reason)]), Err end; {error, Reason} = Err -> - ?ERROR_MSG("failed to create temporary directory ~s: ~s", + ?ERROR_MSG("Failed to create temporary directory ~s: ~s", [tmp_dir(), file:format_error(Reason)]), Err end. @@ -1142,8 +1137,7 @@ fsm_limit_opts() -> ejabberd_config:fsm_limit_opts([]). query_timeout(LServer) -> - timer:seconds( - ejabberd_config:get_option({sql_query_timeout, LServer}, 60)). + ejabberd_option:sql_query_timeout(LServer). %% ***IMPORTANT*** This error format requires extended_errors turned on. extended_error({"08S01", _, Reason}) -> @@ -1186,31 +1180,3 @@ check_error({error, Why}, Query) -> {error, Err}; check_error(Result, _Query) -> Result. - --spec opt_type(atom()) -> fun((any()) -> any()) | [atom()]. -opt_type(sql_database) -> fun iolist_to_binary/1; -opt_type(sql_keepalive_interval) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(sql_password) -> fun iolist_to_binary/1; -opt_type(sql_port) -> - fun (P) when is_integer(P), P > 0, P < 65536 -> P end; -opt_type(sql_server) -> fun iolist_to_binary/1; -opt_type(sql_username) -> fun iolist_to_binary/1; -opt_type(sql_ssl) -> fun(B) when is_boolean(B) -> B end; -opt_type(sql_ssl_verify) -> fun(B) when is_boolean(B) -> B end; -opt_type(sql_ssl_certfile) -> fun ejabberd_pkix:try_certfile/1; -opt_type(sql_ssl_cafile) -> fun ejabberd_pkix:try_certfile/1; -opt_type(sql_query_timeout) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(sql_connect_timeout) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(sql_queue_type) -> - fun(ram) -> ram; (file) -> file end; -opt_type(new_sql_schema) -> fun(B) when is_boolean(B) -> B end; -opt_type(_) -> - [sql_database, sql_keepalive_interval, - sql_password, sql_port, sql_server, - sql_username, sql_ssl, sql_ssl_verify, sql_ssl_certfile, - sql_ssl_cafile, sql_queue_type, sql_query_timeout, - sql_connect_timeout, - new_sql_schema]. |