diff options
Diffstat (limited to 'src/ejabberd_s2s_out.erl')
-rw-r--r-- | src/ejabberd_s2s_out.erl | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index da2c93c5..8e4394d6 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -32,7 +32,8 @@ -record(state, {socket, receiver, streamid, myname, server, xmlpid, queue, - new = false, verify = false}). + new = false, verify = false, + timer}). %-define(DBGFSM, true). @@ -91,12 +92,13 @@ init([From, Server, Type]) -> {verify, Pid, Key, SID} -> {false, {Pid, Key, SID}} end, + Timer = erlang:start_timer(?S2STIMEOUT, self(), []), {ok, open_socket, #state{queue = queue:new(), myname = From, server = Server, new = New, - verify = Verify}, - ?S2STIMEOUT}. + verify = Verify, + timer = Timer}}. %%---------------------------------------------------------------------- %% Func: StateName/2 @@ -123,8 +125,7 @@ open_socket(init, StateData) -> {next_state, wait_for_stream, StateData#state{socket = Socket, xmlpid = XMLStreamPid, - streamid = new_id()}, - ?S2STIMEOUT}; + streamid = new_id()}}; {error, Reason} -> ?DEBUG("s2s_out: inet6 connect return ~p~n", [Reason]), Error = ?ERR_REMOTE_SERVER_NOT_FOUND, @@ -174,8 +175,7 @@ wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> {"id", SID}], [{xmlcdata, Key2}]}) end, - {next_state, wait_for_validation, - StateData#state{new = New}, ?S2STIMEOUT}; + {next_state, wait_for_validation, StateData#state{new = New}}; _ -> send_text(StateData#state.socket, ?INVALID_NAMESPACE_ERR), {stop, normal, StateData} @@ -201,7 +201,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) -> case Type of "valid" -> send_queue(StateData#state.socket, StateData#state.queue), - {next_state, stream_established, StateData, ?S2STIMEOUT}; + {next_state, stream_established, StateData}; _ -> % TODO: bounce packets {stop, normal, StateData} @@ -210,7 +210,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) -> ?INFO_MSG("recv verify: ~p", [{From, To, Id, Type}]), case StateData#state.verify of false -> - {next_state, wait_for_validation, StateData, ?S2STIMEOUT}; + {next_state, wait_for_validation, StateData}; {Pid, _Key, _SID} -> case Type of "valid" -> @@ -227,7 +227,7 @@ wait_for_validation({xmlstreamelement, El}, StateData) -> {stop, normal, StateData} end; _ -> - {next_state, wait_for_validation, StateData, ?S2STIMEOUT} + {next_state, wait_for_validation, StateData} end; wait_for_validation({xmlstreamend, Name}, StateData) -> @@ -270,7 +270,7 @@ stream_established({xmlstreamelement, El}, StateData) -> _ -> ok end, - {next_state, stream_established, StateData, ?S2STIMEOUT}; + {next_state, stream_established, StateData}; stream_established({xmlstreamend, Name}, StateData) -> {stop, normal, StateData}; @@ -334,24 +334,40 @@ code_change(OldVsn, StateName, StateData, Extra) -> %%---------------------------------------------------------------------- handle_info({send_text, Text}, StateName, StateData) -> send_text(StateData#state.socket, Text), - {next_state, StateName, StateData}; + cancel_timer(StateData#state.timer), + Timer = erlang:start_timer(?S2STIMEOUT, self(), []), + {next_state, StateName, StateData#state{timer = Timer}}; + handle_info({send_element, El}, StateName, StateData) -> + cancel_timer(StateData#state.timer), + Timer = erlang:start_timer(?S2STIMEOUT, self(), []), case StateName of stream_established -> send_element(StateData#state.socket, El), - {next_state, StateName, StateData}; + {next_state, StateName, StateData#state{timer = Timer}}; _ -> Q = queue:in(El, StateData#state.queue), - {next_state, StateName, StateData#state{queue = Q}} + {next_state, StateName, StateData#state{queue = Q, + timer = Timer}} end; + handle_info({tcp, Socket, Data}, StateName, StateData) -> xml_stream:send_text(StateData#state.xmlpid, Data), {next_state, StateName, StateData}; + handle_info({tcp_closed, Socket}, StateName, StateData) -> gen_fsm:send_event(self(), closed), {next_state, StateName, StateData}; + handle_info({tcp_error, Socket, Reason}, StateName, StateData) -> gen_fsm:send_event(self(), closed), + {next_state, StateName, StateData}; + +handle_info({timeout, Timer, _}, StateName, + #state{timer = Timer} = StateData) -> + {stop, normal, StateData}; + +handle_info(_, StateName, StateData) -> {next_state, StateName, StateData}. %%---------------------------------------------------------------------- @@ -361,6 +377,8 @@ handle_info({tcp_error, Socket, Reason}, StateName, StateData) -> %%---------------------------------------------------------------------- terminate(Reason, StateName, StateData) -> ?INFO_MSG("terminated: ~p", [Reason]), + Error = ?ERR_REMOTE_SERVER_NOT_FOUND, + bounce_queue(StateData#state.queue, Error), case StateData#state.new of false -> ok; @@ -396,13 +414,34 @@ send_queue(Socket, Q) -> ok end. +bounce_queue(Q, Error) -> + case queue:out(Q) of + {{value, El}, Q1} -> + Err = jlib:make_error_reply(El, Error), + From = jlib:string_to_jid(xml:get_tag_attr_s("from", El)), + To = jlib:string_to_jid(xml:get_tag_attr_s("to", El)), + ejabberd_router:route(To, From, Err), + bounce_queue(Q1, Error); + {empty, Q1} -> + ok + end. + new_id() -> randoms:get_string(). +cancel_timer(Timer) -> + erlang:cancel_timer(Timer), + receive + {timeout, Timer, _} -> + ok + after 0 -> + ok + end. + bounce_messages(Error) -> receive {send_element, El} -> - {xmlelement, Name, Attrs, SubTags} = El, + {xmlelement, _Name, Attrs, _SubTags} = El, case xml:get_attr_s("type", Attrs) of "error" -> ok; @@ -410,7 +449,7 @@ bounce_messages(Error) -> Err = jlib:make_error_reply(El, Error), From = jlib:string_to_jid(xml:get_attr_s("from", Attrs)), To = jlib:string_to_jid(xml:get_attr_s("to", Attrs)), - ejabberd_router ! {route, To, From, Err} + ejabberd_router:route(To, From, Err) end, bounce_messages(Error) after 0 -> |