From 9fc203ee6d93d2d630f5e427678df5fa2dcd7e8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micka=C3=ABl=20R=C3=A9mond?= Date: Fri, 14 Sep 2007 14:15:44 +0000 Subject: open up to 3 s2s outgoing connection per domain pair SVN Revision: 928 --- src/ejabberd_s2s.erl | 121 ++++++++++++++++++++++++++++++--------------------- 1 file changed, 71 insertions(+), 50 deletions(-) (limited to 'src/ejabberd_s2s.erl') diff --git a/src/ejabberd_s2s.erl b/src/ejabberd_s2s.erl index c9e40efbc..e20870272 100644 --- a/src/ejabberd_s2s.erl +++ b/src/ejabberd_s2s.erl @@ -3,12 +3,12 @@ %%% Author : Alexey Shchepin %%% Purpose : S2S connections manager %%% Created : 7 Dec 2002 by Alexey Shchepin -%%% Id : $Id$ +%%% Id : $Id: ejabberd_s2s.erl 820 2007-07-19 21:17:13Z mremond $ %%%---------------------------------------------------------------------- -module(ejabberd_s2s). -author('alexey@sevcom.net'). --vsn('$Revision$ '). +-vsn('$Revision: 820 $ '). -behaviour(gen_server). @@ -16,9 +16,8 @@ -export([start_link/0, route/3, have_connection/1, - get_key/1, + has_key/2, try_register/1, - remove_connection/1, remove_connection/3, dirty_get_connections/0, allow_host/2, @@ -55,14 +54,9 @@ route(From, To, Packet) -> ok end. -remove_connection(FromTo) -> - F = fun() -> - mnesia:delete({s2s, FromTo}) - end, - mnesia:transaction(F). - remove_connection(FromTo, Pid, Key) -> - case catch mnesia:dirty_read(s2s, FromTo) of + ?ERROR_MSG("XXXXXXXXXXX ~p~n", [{FromTo, Pid, Key}]), + case catch mnesia:dirty_match_object(s2s, {s2s, FromTo, Pid, '_'}) of [#s2s{pid = Pid, key = Key}] -> F = fun() -> mnesia:delete_object(#s2s{fromto = FromTo, @@ -82,23 +76,27 @@ have_connection(FromTo) -> false end. -get_key(FromTo) -> - case catch mnesia:dirty_read(s2s, FromTo) of - [E] -> - E#s2s.key; +has_key(FromTo, Key) -> + case mnesia:dirty_select(s2s, + [{#s2s{fromto = FromTo, key = Key, _ = '_'}, + [], + ['$_']}]) of + [] -> + false; _ -> - error + true end. try_register(FromTo) -> Key = randoms:get_string(), + Max_S2S_Connexions_Number = 3, F = fun() -> case mnesia:read({s2s, FromTo}) of - [] -> - mnesia:write(#s2s{fromto = FromTo, - pid = self(), - key = Key}), - {key, Key}; + L when length(L) < Max_S2S_Connexions_Number -> + mnesia:write(#s2s{fromto = FromTo, + pid = self(), + key = Key}), + {key, Key}; _ -> false end @@ -126,9 +124,10 @@ dirty_get_connections() -> %%-------------------------------------------------------------------- init([]) -> update_tables(), - mnesia:create_table(s2s, [{ram_copies, [node()]}, + mnesia:create_table(s2s, [{ram_copies, [node()]}, {type, bag}, {attributes, record_info(fields, s2s)}]), mnesia:add_table_copy(s2s, node(), ram_copies), + mnesia:add_table_index(s2s, key), mnesia:subscribe(system), ejabberd_ctl:register_commands( [{"incoming-s2s-number", "print number of incoming s2s connections on the node"}, @@ -224,7 +223,7 @@ do_route(From, To, Packet) -> Attrs), send_element(Pid, {xmlelement, Name, NewAttrs, Els}), ok; - {aborted, Reason} -> + {aborted, _Reason} -> case xml:get_tag_attr_s("type", Packet) of "error" -> ok; "result" -> ok; @@ -240,6 +239,8 @@ find_connection(From, To) -> #jid{lserver = MyServer} = From, #jid{lserver = Server} = To, FromTo = {MyServer, Server}, + Max_S2S_Connexions_Number = 3, + ?ERROR_MSG("XXX Finding connection for ~p~n", [FromTo]), case catch mnesia:dirty_read(s2s, FromTo) of {'EXIT', Reason} -> {aborted, Reason}; @@ -250,36 +251,49 @@ find_connection(From, To) -> case {is_service(From, To), allow_host(MyServer, Server)} of {false, true} -> - ?DEBUG("starting new s2s connection~n", []), - Key = randoms:get_string(), - {ok, Pid} = ejabberd_s2s_out:start( - MyServer, Server, {new, Key}), - F = fun() -> - case mnesia:read({s2s, FromTo}) of - [El] -> - El#s2s.pid; - [] -> - mnesia:write(#s2s{fromto = FromTo, - pid = Pid, - key = Key}), - Pid - end - end, - TRes = mnesia:transaction(F), - case TRes of - {atomic, Pid} -> - ejabberd_s2s_out:start_connection(Pid); - _ -> - ejabberd_s2s_out:stop_connection(Pid) - end, - TRes; + new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number); _ -> {aborted, error} end; - [El] -> - {atomic, El#s2s.pid} + L when is_list(L) , length(L) < Max_S2S_Connexions_Number -> + %% We establish another connection for this pair. + new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number); + L when is_list(L) -> + %% We choose a connexion from the pool of opened ones. + {atomic, choose_connection(From, L)} end. +choose_connection(From, Connections) -> + El = lists:nth(erlang:phash(From, length(Connections)), Connections), + %El = lists:nth(random:uniform(length(Connections)), Connections), + ?ERROR_MSG("XXX using ejabberd_s2s_out ~p~n", [El#s2s.pid]), + El#s2s.pid. + +new_connection(MyServer, Server, From, FromTo, Max_S2S_Connexions_Number) -> + Key = randoms:get_string(), + {ok, Pid} = ejabberd_s2s_out:start( + MyServer, Server, {new, Key}), + F = fun() -> + case mnesia:read({s2s, FromTo}) of + L when length(L) < Max_S2S_Connexions_Number -> + mnesia:write(#s2s{fromto = FromTo, + pid = Pid, + key = Key}), + ?ERROR_MSG("XXX new s2s connection started ~p~n", [Pid]), + Pid; + L -> + choose_connection(From, L) + end + end, + TRes = mnesia:transaction(F), + case TRes of + {atomic, Pid} -> + ejabberd_s2s_out:start_connection(Pid); + _ -> + ejabberd_s2s_out:stop_connection(Pid) + end, + TRes. + %%-------------------------------------------------------------------- %% Function: is_service(From, To) -> true | false @@ -321,6 +335,15 @@ ctl_process(Val, _Args) -> Val. update_tables() -> + case catch mnesia:table_info(s2s, type) of + bag -> + ok; + {'EXIT', _} -> + ok; + _ -> + % XXX TODO convert it ? + mnesia:delete_table(s2s) + end, case catch mnesia:table_info(s2s, attributes) of [fromto, node, key] -> mnesia:transform_table(s2s, ignore, [fromto, pid, key]), @@ -349,5 +372,3 @@ allow_host(MyServer, S2SHost) -> _ -> true %% The default s2s policy is allow end end. - - -- cgit v1.2.3