aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-07-02 20:31:42 +1000
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2010-07-02 20:31:42 +1000
commit4c2e7e38a1170bacad0ecbe0244d048d11fd355c (patch)
tree569e9bf83d25b8f6c8ec48151a463892f25f8213 /src
parentRecompile the Guide and Configure (diff)
Use ets insead of asking supervisor in ejabberd_odbc_sup:get_pids/1 (Thanks to Alexey Shchepin)
Diffstat (limited to 'src')
-rw-r--r--src/odbc/ejabberd_odbc.erl2
-rw-r--r--src/odbc/ejabberd_odbc_sup.erl41
-rw-r--r--src/p1_fsm.erl70
3 files changed, 97 insertions, 16 deletions
diff --git a/src/odbc/ejabberd_odbc.erl b/src/odbc/ejabberd_odbc.erl
index d7cdd0371..1b07fd6d9 100644
--- a/src/odbc/ejabberd_odbc.erl
+++ b/src/odbc/ejabberd_odbc.erl
@@ -178,6 +178,7 @@ init([Host, StartInterval]) ->
end,
[DBType | _] = db_opts(Host),
?GEN_FSM:send_event(self(), connect),
+ ejabberd_odbc_sup:add_pid(Host, self()),
{ok, connecting, #state{db_type = DBType,
host = Host,
max_pending_requests_len = max_fsm_queue(),
@@ -274,6 +275,7 @@ handle_info(Info, StateName, State) ->
{next_state, StateName, State}.
terminate(_Reason, _StateName, State) ->
+ ejabberd_odbc_sup:remove_pid(State#state.host, self()),
case State#state.db_type of
mysql ->
%% old versions of mysql driver don't have the stop function
diff --git a/src/odbc/ejabberd_odbc_sup.erl b/src/odbc/ejabberd_odbc_sup.erl
index d828449ec..45ede1835 100644
--- a/src/odbc/ejabberd_odbc_sup.erl
+++ b/src/odbc/ejabberd_odbc_sup.erl
@@ -30,6 +30,8 @@
%% API
-export([start_link/1,
init/1,
+ add_pid/2,
+ remove_pid/2,
get_pids/1,
get_random_pid/1
]).
@@ -44,7 +46,19 @@
-define(CONNECT_TIMEOUT, 500). % milliseconds
+-record(sql_pool, {host, pid}).
+
start_link(Host) ->
+ mnesia:create_table(sql_pool,
+ [{ram_copies, [node()]},
+ {type, bag},
+ {local_content, true},
+ {attributes, record_info(fields, sql_pool)}]),
+ mnesia:add_table_copy(local_config, node(), ram_copies),
+ F = fun() ->
+ mnesia:delete({sql_pool, Host})
+ end,
+ mnesia:ets(F),
supervisor:start_link({local, gen_mod:get_module_proc(Host, ?MODULE)},
?MODULE, [Host]).
@@ -86,16 +100,25 @@ init([Host]) ->
end, lists:seq(1, PoolSize))}}.
get_pids(Host) ->
- Proc = gen_mod:get_module_proc(Host, ?MODULE),
-
- % throw an exception if supervisor is not ready (i.e. if it cannot
- % start its children, if the database is down for example)
- sys:get_status(Proc, ?CONNECT_TIMEOUT),
-
- [Child ||
- {_Id, Child, _Type, _Modules} <- supervisor:which_children(Proc),
- Child /= undefined].
+ Rs = mnesia:dirty_read(sql_pool, Host),
+ [R#sql_pool.pid || R <- Rs].
get_random_pid(Host) ->
Pids = get_pids(Host),
lists:nth(erlang:phash(now(), length(Pids)), Pids).
+
+add_pid(Host, Pid) ->
+ F = fun() ->
+ mnesia:write(
+ #sql_pool{host = Host,
+ pid = Pid})
+ end,
+ mnesia:ets(F).
+
+remove_pid(Host, Pid) ->
+ F = fun() ->
+ mnesia:delete_object(
+ #sql_pool{host = Host,
+ pid = Pid})
+ end,
+ mnesia:ets(F).
diff --git a/src/p1_fsm.erl b/src/p1_fsm.erl
index 03ff7f8ce..9ca924112 100644
--- a/src/p1_fsm.erl
+++ b/src/p1_fsm.erl
@@ -517,6 +517,25 @@ print_event(Dev, return, {Name, StateName}) ->
io:format(Dev, "*DBG* ~p switched to state ~w~n",
[Name, StateName]).
+relay_messages(MRef, TRef, Clone, Queue) ->
+ lists:foreach(
+ fun(Msg) -> Clone ! Msg end,
+ queue:to_list(Queue)),
+ relay_messages(MRef, TRef, Clone).
+
+relay_messages(MRef, TRef, Clone) ->
+ receive
+ {'DOWN', MRef, process, Clone, Reason} ->
+ Reason;
+ {'EXIT', _Parent, _Reason} ->
+ {migrated, Clone};
+ {timeout, TRef, timeout} ->
+ {migrated, Clone};
+ Msg ->
+ Clone ! Msg,
+ relay_messages(MRef, TRef, Clone)
+ end.
+
handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
Limits, Queue, QueueLen) -> %No debug here
From = from(Msg),
@@ -535,6 +554,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData, Mod, _Time,
reply(From, Reply),
loop(Parent, Name, NStateName, NStateData, Mod, Time1, [],
Limits, Queue, QueueLen);
+ {migrate, NStateData, {Node, M, F, A}, Time1} ->
+ Reason = case catch rpc:call(Node, M, F, A, 5000) of
+ {badrpc, _} = Err ->
+ {migration_error, Err};
+ {'EXIT', _} = Err ->
+ {migration_error, Err};
+ {error, _} = Err ->
+ {migration_error, Err};
+ {ok, Clone} ->
+ process_flag(trap_exit, true),
+ MRef = erlang:monitor(process, Clone),
+ TRef = erlang:start_timer(Time1, self(), timeout),
+ relay_messages(MRef, TRef, Clone, Queue);
+ Reply ->
+ {migration_error, {bad_reply, Reply}}
+ end,
+ terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, []);
{stop, Reason, Reply, NStateData} when From =/= undefined ->
@@ -571,6 +607,23 @@ handle_msg(Msg, Parent, Name, StateName, StateData,
Debug1 = reply(Name, From, Reply, Debug, NStateName),
loop(Parent, Name, NStateName, NStateData,
Mod, Time1, Debug1, Limits, Queue, QueueLen);
+ {migrate, NStateData, {Node, M, F, A}, Time1} ->
+ Reason = case catch rpc:call(Node, M, F, A, Time1) of
+ {badrpc, R} ->
+ {migration_error, R};
+ {'EXIT', R} ->
+ {migration_error, R};
+ {error, R} ->
+ {migration_error, R};
+ {ok, Clone} ->
+ process_flag(trap_exit, true),
+ MRef = erlang:monitor(process, Clone),
+ TRef = erlang:start_timer(Time1, self(), timeout),
+ relay_messages(MRef, TRef, Clone, Queue);
+ Reply ->
+ {migration_error, {bad_reply, Reply}}
+ end,
+ terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, NStateData} ->
terminate(Reason, Name, Msg, Mod, StateName, NStateData, Debug);
{stop, Reason, Reply, NStateData} when From =/= undefined ->
@@ -633,12 +686,10 @@ terminate(Reason, Name, Msg, Mod, StateName, StateData, Debug) ->
%% Priority shutdown should be considered as
%% shutdown by SASL
exit(shutdown);
- {process_limit, Limit} ->
- %% Priority shutdown should be considered as
- %% shutdown by SASL
- error_logger:error_msg("FSM limit reached (~p): ~p~n",
- [self(), Limit]),
- exit(shutdown);
+ {process_limit, _Limit} ->
+ exit(Reason);
+ {migrated, _Clone} ->
+ exit(normal);
_ ->
error_info(Mod, Reason, Name, Msg, StateName, StateData, Debug),
exit(Reason)
@@ -705,7 +756,12 @@ get_msg(Msg) -> Msg.
format_status(Opt, StatusData) ->
[PDict, SysState, Parent, Debug, [Name, StateName, StateData, Mod, _Time]] =
StatusData,
- Header = lists:concat(["Status for state machine ", Name]),
+ NameTag = if is_pid(Name) ->
+ pid_to_list(Name);
+ is_atom(Name) ->
+ Name
+ end,
+ Header = lists:concat(["Status for state machine ", NameTag]),
Log = sys:get_debug(log, Debug, []),
Specfic =
case erlang:function_exported(Mod, format_status, 2) of