summaryrefslogtreecommitdiff
path: root/src/odbc/ejabberd_odbc.erl
diff options
context:
space:
mode:
authorMickaël Rémond <mickael.remond@process-one.net>2009-05-21 16:19:33 +0000
committerMickaël Rémond <mickael.remond@process-one.net>2009-05-21 16:19:33 +0000
commitb8f094b080349dbf7e827aec7f1bfbd983be6675 (patch)
tree9b9df623d738e997fcab84dc2255f390f910669a /src/odbc/ejabberd_odbc.erl
parentPubSub: improve get_entity_* API (diff)
* trunk/src/odbc/ejabberd_odbc.erl: Support for nested transaction (EJABS-859) (EJAB-940) (CR-EJAB-10)
SVN Revision: 2092
Diffstat (limited to 'src/odbc/ejabberd_odbc.erl')
-rw-r--r--src/odbc/ejabberd_odbc.erl174
1 files changed, 106 insertions, 68 deletions
diff --git a/src/odbc/ejabberd_odbc.erl b/src/odbc/ejabberd_odbc.erl
index 09a0ec8c..046690cd 100644
--- a/src/odbc/ejabberd_odbc.erl
+++ b/src/odbc/ejabberd_odbc.erl
@@ -70,8 +70,8 @@ start_link(Host, StartInterval) ->
gen_server:start_link(ejabberd_odbc, [Host, StartInterval], []).
sql_query(Host, Query) ->
- gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
- {sql_query, Query}, ?TRANSACTION_TIMEOUT).
+ Msg = {sql_query, Query},
+ sql_call(Host, Msg,).
%% SQL transaction based on a list of queries
%% This function automatically
@@ -85,13 +85,24 @@ sql_transaction(Host, Queries) when is_list(Queries) ->
sql_transaction(Host, F);
%% SQL transaction, based on a erlang anonymous function (F = fun)
sql_transaction(Host, F) ->
- gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
- {sql_transaction, F}, ?TRANSACTION_TIMEOUT).
+ Msg = {sql_transaction, F},
+ sql_call(Host, Msg).
%% SQL bloc, based on a erlang anonymous function (F = fun)
sql_bloc(Host, F) ->
- gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
- {sql_bloc, F}, ?TRANSACTION_TIMEOUT).
+ Msg = {sql_bloc, F},
+ sql_call(Host, Msg).
+
+sql_call(Host, Msg) ->
+ case get(?STATE_KEY) of
+ undefined ->
+ gen_server:call(ejabberd_odbc_sup:get_random_pid(Host),
+ Msg, ?TRANSACTION_TIMEOUT);
+ State ->
+ %% Query, Transaction or Bloc nested inside transaction
+ nested_op(Msg, State)
+ end.
+
%% This function is intended to be used from inside an sql_transaction:
sql_query_t(Query) ->
@@ -173,42 +184,8 @@ init([Host, StartInterval]) ->
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
-handle_call({sql_query, Query}, _From, State) ->
- case sql_query_internal(State, Query) of
- % error returned by MySQL driver
- {error, "query timed out"} = Reply ->
- {stop, timeout, Reply, State};
- % error returned by MySQL driver
- {error, "Failed sending data on socket"++_} = Reply ->
- {stop, closed, Reply, State};
- Reply ->
- {reply, Reply, State}
- end;
-handle_call({sql_transaction, F}, _From, State) ->
- case execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS, "") of
- % error returned by MySQL driver
- {error, "query timed out"} ->
- {stop, timeout, State};
- % error returned by MySQL driver
- {error, "Failed sending data on socket"++_} = Reply ->
- {stop, closed, Reply, State};
- Reply ->
- {reply, Reply, State}
- end;
-handle_call({sql_bloc, F}, _From, State) ->
- case execute_bloc(State, F) of
- % error returned by MySQL driver
- {error, "query timed out"} ->
- {stop, timeout, State};
- % error returned by MySQL driver
- {error, "Failed sending data on socket"++_} = Reply ->
- {stop, closed, Reply, State};
- Reply ->
- {reply, Reply, State}
- end;
-handle_call(_Request, _From, State) ->
- Reply = ok,
- {reply, Reply, State}.
+handle_call(Command, _From, State) ->
+ dispatch_sql_command(Command, State).
%%----------------------------------------------------------------------
%% Func: handle_cast/2
@@ -256,14 +233,36 @@ terminate(_Reason, State) ->
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
+dispatch_sql_command({sql_query, Query}, State) ->
+ abort_on_driver_error(sql_query_internal(State, Query), State);
+dispatch_sql_command({sql_transaction, F}, State) ->
+ abort_on_driver_error(
+ execute_transaction(State, F, ?MAX_TRANSACTION_RESTARTS, ""), State);
+dispatch_sql_command({sql_bloc, F}, State) ->
+ abort_on_driver_error(execute_bloc(State, F), State);
+dispatch_sql_command(Request, State) ->
+ ?WARNING_MSG("Unexpected call ~p.", [Request]),
+ {reply, ok, State}.
+
sql_query_internal(State, Query) ->
- case State#state.db_type of
- odbc ->
- odbc:sql_query(State#state.db_ref, Query);
- pgsql ->
- pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
- mysql ->
- mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self()))
+ Nested = case get(?STATE_KEY) of
+ undefined -> put(?STATE_KEY, State), false;
+ _State -> true
+ end,
+ Result = case State#state.db_type of
+ odbc ->
+ odbc:sql_query(State#state.db_ref, Query);
+ pgsql ->
+ pgsql_to_odbc(pgsql:squery(State#state.db_ref, Query));
+ mysql ->
+ ?DEBUG("MySQL, Send query~n~p~n", [Query]),
+ R = mysql_to_odbc(mysql_conn:fetch(State#state.db_ref, Query, self())),
+ ?INFO_MSG("MySQL, Received result~n~p~n", [R]),
+ R
+ end,
+ case Nested of
+ true -> Result;
+ false -> erase(?STATE_KEY), Result
end.
execute_transaction(State, _F, 0, Reason) ->
@@ -277,30 +276,69 @@ execute_transaction(State, _F, 0, Reason) ->
sql_query_internal(State, "rollback;"),
{aborted, restarts_exceeded};
execute_transaction(State, F, NRestarts, _Reason) ->
- put(?STATE_KEY, State),
- sql_query_internal(State, "begin;"),
- case catch F() of
- {aborted, Reason} ->
- execute_transaction(State, F, NRestarts - 1, Reason);
- {'EXIT', Reason} ->
- sql_query_internal(State, "rollback;"),
- {aborted, Reason};
- Res ->
- sql_query_internal(State, "commit;"),
- {atomic, Res}
+ Nested = case get(?STATE_KEY) of
+ undefined ->
+ put(?STATE_KEY, State),
+ sql_query_internal(State, "begin;"),
+ false;
+ _State ->
+ true
+ end,
+ Result = case catch F() of
+ {aborted, Reason} ->
+ execute_transaction(State, F, NRestarts - 1, Reason);
+ {'EXIT', Reason} ->
+ sql_query_internal(State, "rollback;"),
+ {aborted, Reason};
+ Res ->
+ {atomic, Res}
+ end,
+ case Nested of
+ true -> Result;
+ false -> sql_query_internal(State, "commit;"), erase(?STATE_KEY), Result
end.
execute_bloc(State, F) ->
- put(?STATE_KEY, State),
- case catch F() of
- {aborted, Reason} ->
- {aborted, Reason};
- {'EXIT', Reason} ->
- {aborted, Reason};
- Res ->
- {atomic, Res}
+ Nested = case get(?STATE_KEY) of
+ undefined -> put(?STATE_KEY, State), false;
+ _State -> true
+ end,
+ Result = case catch F() of
+ {aborted, Reason} ->
+ {aborted, Reason};
+ {'EXIT', Reason} ->
+ {aborted, Reason};
+ Res ->
+ {atomic, Res}
+ end,
+ case Nested of
+ true -> Result;
+ false -> erase(?STATE_KEY), Result
end.
+nested_op(Op, State) ->
+ case dispatch_sql_command(Op, State) of
+ {reply, Res, NewState} ->
+ put(?STATE_KEY, NewState),
+ Res;
+ {stop, _Reason, Reply, NewState} ->
+ put(?STATE_KEY, NewState),
+ throw({aborted, Reply});
+ {noreply, NewState} ->
+ put(?STATE_KEY, NewState),
+ exit({bad_op_in_nested_txn, Op})
+ end.
+
+abort_on_driver_error({error, "query timed out"} = Reply, State) ->
+ %% mysql driver error
+ {stop, timeout, Reply, State};
+abort_on_driver_error({error, "Failed sending data on socket"++_} = Reply, State) ->
+ %% mysql driver error
+ {stop, closed, Reply, State};
+abort_on_driver_error(Reply, State) ->
+ {reply, Reply, State}.
+
+
%% == pure ODBC code
%% part of init/1