summaryrefslogtreecommitdiff
path: root/src/ejabberd_s2s_out.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_s2s_out.erl')
-rw-r--r--src/ejabberd_s2s_out.erl71
1 files changed, 55 insertions, 16 deletions
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index da2c93c5..8e4394d6 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -32,7 +32,8 @@
-record(state, {socket, receiver, streamid,
myname, server, xmlpid, queue,
- new = false, verify = false}).
+ new = false, verify = false,
+ timer}).
%-define(DBGFSM, true).
@@ -91,12 +92,13 @@ init([From, Server, Type]) ->
{verify, Pid, Key, SID} ->
{false, {Pid, Key, SID}}
end,
+ Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
{ok, open_socket, #state{queue = queue:new(),
myname = From,
server = Server,
new = New,
- verify = Verify},
- ?S2STIMEOUT}.
+ verify = Verify,
+ timer = Timer}}.
%%----------------------------------------------------------------------
%% Func: StateName/2
@@ -123,8 +125,7 @@ open_socket(init, StateData) ->
{next_state, wait_for_stream,
StateData#state{socket = Socket,
xmlpid = XMLStreamPid,
- streamid = new_id()},
- ?S2STIMEOUT};
+ streamid = new_id()}};
{error, Reason} ->
?DEBUG("s2s_out: inet6 connect return ~p~n", [Reason]),
Error = ?ERR_REMOTE_SERVER_NOT_FOUND,
@@ -174,8 +175,7 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) ->
{"id", SID}],
[{xmlcdata, Key2}]})
end,
- {next_state, wait_for_validation,
- StateData#state{new = New}, ?S2STIMEOUT};
+ {next_state, wait_for_validation, StateData#state{new = New}};
_ ->
send_text(StateData#state.socket, ?INVALID_NAMESPACE_ERR),
{stop, normal, StateData}
@@ -201,7 +201,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) ->
case Type of
"valid" ->
send_queue(StateData#state.socket, StateData#state.queue),
- {next_state, stream_established, StateData, ?S2STIMEOUT};
+ {next_state, stream_established, StateData};
_ ->
% TODO: bounce packets
{stop, normal, StateData}
@@ -210,7 +210,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) ->
?INFO_MSG("recv verify: ~p", [{From, To, Id, Type}]),
case StateData#state.verify of
false ->
- {next_state, wait_for_validation, StateData, ?S2STIMEOUT};
+ {next_state, wait_for_validation, StateData};
{Pid, _Key, _SID} ->
case Type of
"valid" ->
@@ -227,7 +227,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) ->
{stop, normal, StateData}
end;
_ ->
- {next_state, wait_for_validation, StateData, ?S2STIMEOUT}
+ {next_state, wait_for_validation, StateData}
end;
wait_for_validation({xmlstreamend, Name}, StateData) ->
@@ -270,7 +270,7 @@ stream_established({xmlstreamelement, El}, StateData) ->
_ ->
ok
end,
- {next_state, stream_established, StateData, ?S2STIMEOUT};
+ {next_state, stream_established, StateData};
stream_established({xmlstreamend, Name}, StateData) ->
{stop, normal, StateData};
@@ -334,24 +334,40 @@ code_change(OldVsn, StateName, StateData, Extra) ->
%%----------------------------------------------------------------------
handle_info({send_text, Text}, StateName, StateData) ->
send_text(StateData#state.socket, Text),
- {next_state, StateName, StateData};
+ cancel_timer(StateData#state.timer),
+ Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
+ {next_state, StateName, StateData#state{timer = Timer}};
+
handle_info({send_element, El}, StateName, StateData) ->
+ cancel_timer(StateData#state.timer),
+ Timer = erlang:start_timer(?S2STIMEOUT, self(), []),
case StateName of
stream_established ->
send_element(StateData#state.socket, El),
- {next_state, StateName, StateData};
+ {next_state, StateName, StateData#state{timer = Timer}};
_ ->
Q = queue:in(El, StateData#state.queue),
- {next_state, StateName, StateData#state{queue = Q}}
+ {next_state, StateName, StateData#state{queue = Q,
+ timer = Timer}}
end;
+
handle_info({tcp, Socket, Data}, StateName, StateData) ->
xml_stream:send_text(StateData#state.xmlpid, Data),
{next_state, StateName, StateData};
+
handle_info({tcp_closed, Socket}, StateName, StateData) ->
gen_fsm:send_event(self(), closed),
{next_state, StateName, StateData};
+
handle_info({tcp_error, Socket, Reason}, StateName, StateData) ->
gen_fsm:send_event(self(), closed),
+ {next_state, StateName, StateData};
+
+handle_info({timeout, Timer, _}, StateName,
+ #state{timer = Timer} = StateData) ->
+ {stop, normal, StateData};
+
+handle_info(_, StateName, StateData) ->
{next_state, StateName, StateData}.
%%----------------------------------------------------------------------
@@ -361,6 +377,8 @@ handle_info({tcp_error, Socket, Reason}, StateName, StateData) ->
%%----------------------------------------------------------------------
terminate(Reason, StateName, StateData) ->
?INFO_MSG("terminated: ~p", [Reason]),
+ Error = ?ERR_REMOTE_SERVER_NOT_FOUND,
+ bounce_queue(StateData#state.queue, Error),
case StateData#state.new of
false ->
ok;
@@ -396,13 +414,34 @@ send_queue(Socket, Q) ->
ok
end.
+bounce_queue(Q, Error) ->
+ case queue:out(Q) of
+ {{value, El}, Q1} ->
+ Err = jlib:make_error_reply(El, Error),
+ From = jlib:string_to_jid(xml:get_tag_attr_s("from", El)),
+ To = jlib:string_to_jid(xml:get_tag_attr_s("to", El)),
+ ejabberd_router:route(To, From, Err),
+ bounce_queue(Q1, Error);
+ {empty, Q1} ->
+ ok
+ end.
+
new_id() ->
randoms:get_string().
+cancel_timer(Timer) ->
+ erlang:cancel_timer(Timer),
+ receive
+ {timeout, Timer, _} ->
+ ok
+ after 0 ->
+ ok
+ end.
+
bounce_messages(Error) ->
receive
{send_element, El} ->
- {xmlelement, Name, Attrs, SubTags} = El,
+ {xmlelement, _Name, Attrs, _SubTags} = El,
case xml:get_attr_s("type", Attrs) of
"error" ->
ok;
@@ -410,7 +449,7 @@ bounce_messages(Error) ->
Err = jlib:make_error_reply(El, Error),
From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)),
To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)),
- ejabberd_router ! {route, To, From, Err}
+ ejabberd_router:route(To, From, Err)
end,
bounce_messages(Error)
after 0 ->