diff options
author | Alexey Shchepin <alexey@process-one.net> | 2016-02-09 19:23:15 +0300 |
---|---|---|
committer | Alexey Shchepin <alexey@process-one.net> | 2016-03-01 22:48:30 +0300 |
commit | 6374ef48669283933931946f9fbe9a6fccd811ed (patch) | |
tree | ff479e6bd02be3e5abbd8c535d63cdc7296c5faf /src/ejabberd_odbc.erl | |
parent | Update ejabberd version for hex.pm release (diff) |
New parse transform for SQL queries, use prepare/execute calls with Postgres
Diffstat (limited to 'src/ejabberd_odbc.erl')
-rw-r--r-- | src/ejabberd_odbc.erl | 130 |
1 files changed, 128 insertions, 2 deletions
diff --git a/src/ejabberd_odbc.erl b/src/ejabberd_odbc.erl index a15c66b5..ef3c61d0 100644 --- a/src/ejabberd_odbc.erl +++ b/src/ejabberd_odbc.erl @@ -63,6 +63,7 @@ -include("ejabberd.hrl"). -include("logger.hrl"). +-include("ejabberd_sql_pt.hrl"). -record(state, {db_ref = self() :: pid(), @@ -92,6 +93,8 @@ -define(KEEPALIVE_QUERY, [<<"SELECT 1;">>]). +-define(PREPARE_KEY, ejabberd_odbc_prepare). + %%-define(DBGFSM, true). -ifdef(DBGFSM). @@ -116,11 +119,12 @@ start_link(Host, StartInterval) -> [Host, StartInterval], fsm_limit_opts() ++ (?FSMOPTS)). --type sql_query() :: [sql_query() | binary()]. +-type sql_query() :: [sql_query() | binary()] | #sql_query{}. -type sql_query_result() :: {updated, non_neg_integer()} | {error, binary()} | {selected, [binary()], - [[binary()]]}. + [[binary()]]} | + {selected, [any]}. -spec sql_query(binary(), sql_query()) -> sql_query_result(). @@ -469,6 +473,52 @@ execute_bloc(F) -> Res -> {atomic, Res} end. +sql_query_internal(#sql_query{} = Query) -> + State = get(?STATE_KEY), + Res = + try + case State#state.db_type of + odbc -> + generic_sql_query(Query); + pgsql -> + Key = {?PREPARE_KEY, Query#sql_query.hash}, + case get(Key) of + undefined -> + case pgsql_prepare(Query, State) of + {ok, _, _, _} -> + put(Key, prepared); + {error, Error} -> + ?ERROR_MSG("PREPARE failed for SQL query " + "at ~p: ~p", + [Query#sql_query.loc, Error]), + put(Key, ignore) + end; + _ -> + ok + end, + case get(Key) of + prepared -> + pgsql_execute_sql_query(Query, State); + _ -> + generic_sql_query(Query) + end; + mysql -> + generic_sql_query(Query); + sqlite -> + generic_sql_query(Query) + end + catch + Class:Reason -> + ST = erlang:get_stacktrace(), + ?ERROR_MSG("Internal error while processing SQL query: ~p", + [{Class, Reason, ST}]), + {error, <<"internal error">>} + end, + case Res of + {error, <<"No SQL-driver information available.">>} -> + {updated, 0}; + _Else -> Res + end; sql_query_internal(Query) -> State = get(?STATE_KEY), ?DEBUG("SQL: \"~s\"", [Query]), @@ -495,6 +545,66 @@ sql_query_internal(Query) -> _Else -> Res end. +generic_sql_query(SQLQuery) -> + sql_query_format_res( + sql_query_internal(generic_sql_query_format(SQLQuery)), + SQLQuery). + +generic_sql_query_format(SQLQuery) -> + Args = (SQLQuery#sql_query.args)(generic_escape()), + (SQLQuery#sql_query.format_query)(Args). + +generic_escape() -> + #sql_escape{string = fun(X) -> <<"'", (escape(X))/binary, "'">> end, + integer = fun(X) -> integer_to_binary(X) end, + boolean = fun(true) -> <<"1">>; + (false) -> <<"0">> + end + }. + +pgsql_prepare(SQLQuery, State) -> + Escape = #sql_escape{_ = fun(X) -> X end}, + N = length((SQLQuery#sql_query.args)(Escape)), + Args = [<<$$, (integer_to_binary(I))/binary>> || I <- lists:seq(1, N)], + Query = (SQLQuery#sql_query.format_query)(Args), + pgsql:prepare(State#state.db_ref, SQLQuery#sql_query.hash, Query). + +pgsql_execute_escape() -> + #sql_escape{string = fun(X) -> X end, + integer = fun(X) -> integer_to_binary(X) end, + boolean = fun(true) -> <<"1">>; + (false) -> <<"0">> + end + }. + +pgsql_execute_sql_query(SQLQuery, State) -> + Args = (SQLQuery#sql_query.args)(pgsql_execute_escape()), + ExecuteRes = + pgsql:execute(State#state.db_ref, SQLQuery#sql_query.hash, Args), + Res = pgsql_execute_to_odbc(ExecuteRes), + sql_query_format_res(Res, SQLQuery). + + +sql_query_format_res({selected, _, Rows}, SQLQuery) -> + Res = + lists:flatmap( + fun(Row) -> + try + [(SQLQuery#sql_query.format_res)(Row)] + catch + Class:Reason -> + ST = erlang:get_stacktrace(), + ?ERROR_MSG("Error while processing " + "SQL query result: ~p~n" + "row: ~p", + [{Class, Reason, ST}, Row]), + [] + end + end, Rows), + {selected, Res}; +sql_query_format_res(Res, _SQLQuery) -> + Res. + %% Generate the OTP callback return tuple depending on the driver result. abort_on_driver_error({error, <<"query timed out">>} = Reply, @@ -606,6 +716,18 @@ pgsql_item_to_odbc(<<"UPDATE ", N/binary>>) -> pgsql_item_to_odbc({error, Error}) -> {error, Error}; pgsql_item_to_odbc(_) -> {updated, undefined}. +pgsql_execute_to_odbc({ok, {<<"SELECT", _/binary>>, Rows}}) -> + {selected, [], [[Field || {_, Field} <- Row] || Row <- Rows]}; +pgsql_execute_to_odbc({ok, {'INSERT', N}}) -> + {updated, N}; +pgsql_execute_to_odbc({ok, {'DELETE', N}}) -> + {updated, N}; +pgsql_execute_to_odbc({ok, {'UPDATE', N}}) -> + {updated, N}; +pgsql_execute_to_odbc({error, Error}) -> {error, Error}; +pgsql_execute_to_odbc(_) -> {updated, undefined}. + + %% == Native MySQL code %% part of init/1 @@ -800,6 +922,10 @@ fsm_limit_opts() -> _ -> [] end. +check_error({error, Why} = Err, #sql_query{} = Query) -> + ?ERROR_MSG("SQL query '~s' at ~p failed: ~p", + [Query#sql_query.hash, Query#sql_query.loc, Why]), + Err; check_error({error, Why} = Err, Query) -> ?ERROR_MSG("SQL query '~s' failed: ~p", [Query, Why]), Err; |