aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_receiver.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_receiver.erl')
-rw-r--r--src/ejabberd_receiver.erl270
1 files changed, 195 insertions, 75 deletions
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl
index 204771c1a..d09bef666 100644
--- a/src/ejabberd_receiver.erl
+++ b/src/ejabberd_receiver.erl
@@ -10,20 +10,68 @@
-author('alexey@sevcom.net').
-vsn('$Revision$ ').
+-behaviour(gen_server).
+
+%% API
-export([start/3,
- receiver/4,
change_shaper/2,
reset_stream/1,
- starttls/2]).
+ starttls/2,
+ become_controller/1,
+ close/1]).
--include("ejabberd.hrl").
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-include("ejabberd.hrl").
+-record(state, {socket,
+ sock_mod,
+ shaper_state,
+ c2s_pid,
+ xml_stream_state,
+ timeout}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
start(Socket, SockMod, Shaper) ->
- proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]).
+ {ok, Pid} = gen_server:start(
+ ?MODULE, [Socket, SockMod, Shaper, self()], []),
+ Pid.
+
+change_shaper(Pid, Shaper) ->
+ gen_server:cast(Pid, {change_shaper, Shaper}).
+reset_stream(Pid) ->
+ gen_server:call(Pid, reset_stream).
-receiver(Socket, SockMod, Shaper, C2SPid) ->
+starttls(Pid, TLSSocket) ->
+ gen_server:call(Pid, {starttls, TLSSocket}).
+
+become_controller(Pid) ->
+ gen_server:call(Pid, become_controller).
+
+close(Pid) ->
+ gen_server:cast(Pid, close).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([Socket, SockMod, Shaper, C2SPid]) ->
XMLStreamState = xml_stream:new(C2SPid),
ShaperState = shaper:new(Shaper),
Timeout = case SockMod of
@@ -32,77 +80,149 @@ receiver(Socket, SockMod, Shaper, C2SPid) ->
_ ->
infinity
end,
- receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout).
-
-receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) ->
- Res = (catch SockMod:recv(Socket, 0, Timeout)),
- receive
- {starttls, TLSSocket} ->
- xml_stream:close(XMLStreamState),
- XMLStreamState1 = xml_stream:new(C2SPid),
- TLSRes = case Res of
- {ok, Data} ->
- tls:recv_data(TLSSocket, Data);
- _ ->
- tls:recv_data(TLSSocket, "")
- end,
- receiver1(TLSSocket, tls,
- ShaperState, C2SPid, XMLStreamState1, Timeout,
- TLSRes);
- {change_timeout, NewTimeout} -> % Dirty hack
- receiver1(Socket, SockMod,
- ShaperState, C2SPid, XMLStreamState, NewTimeout,
- Res)
- after 0 ->
- receiver1(Socket, SockMod,
- ShaperState, C2SPid, XMLStreamState, Timeout,
- Res)
- end.
-
-
-receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) ->
- case Res of
- {ok, Text} ->
- ShaperSt1 = receive
- {change_shaper, Shaper} ->
- shaper:new(Shaper)
- after 0 ->
- ShaperState
- end,
- NewShaperState = shaper:update(ShaperSt1, size(Text)),
- XMLStreamState1 = receive
- reset_stream ->
- xml_stream:close(XMLStreamState),
- xml_stream:new(C2SPid)
- after 0 ->
- XMLStreamState
- end,
- XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text),
- receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2,
- Timeout);
- {error, timeout} ->
- receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState,
- Timeout);
- {error, Reason} ->
- xml_stream:close(XMLStreamState),
- gen_fsm:send_event(C2SPid, closed),
- ok;
- {'EXIT', Reason} ->
- ?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n",
- [Socket, SockMod, Reason]),
- xml_stream:close(XMLStreamState),
- gen_fsm:send_event(C2SPid, closed),
- ok
+ {ok, #state{socket = Socket,
+ sock_mod = SockMod,
+ shaper_state = ShaperState,
+ c2s_pid = C2SPid,
+ xml_stream_state = XMLStreamState,
+ timeout = Timeout}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({starttls, TLSSocket}, _From,
+ #state{xml_stream_state = XMLStreamState,
+ c2s_pid = C2SPid} = State) ->
+ xml_stream:close(XMLStreamState),
+ NewXMLStreamState = xml_stream:new(C2SPid),
+ NewState = State#state{socket = TLSSocket,
+ sock_mod = tls,
+ xml_stream_state = NewXMLStreamState},
+ case tls:recv_data(TLSSocket, "") of
+ {ok, TLSData} ->
+ {reply, ok, process_data(TLSData, NewState)};
+ {error, _Reason} ->
+ {stop, normal, ok, NewState}
+ end;
+handle_call(reset_stream, _From,
+ #state{xml_stream_state = XMLStreamState,
+ c2s_pid = C2SPid} = State) ->
+ xml_stream:close(XMLStreamState),
+ NewXMLStreamState = xml_stream:new(C2SPid),
+ Reply = ok,
+ {reply, Reply, State#state{xml_stream_state = NewXMLStreamState}};
+handle_call(become_controller, _From, State) ->
+ activate_socket(State),
+ Reply = ok,
+ {reply, Reply, State};
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast({change_shaper, Shaper}, State) ->
+ NewShaperState = shaper:new(Shaper),
+ {noreply, State#state{shaper_state = NewShaperState}};
+handle_cast(close, State) ->
+ {stop, normal, State};
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({Tag, _TCPSocket, Data},
+ #state{socket = Socket,
+ sock_mod = SockMod} = State)
+ when (Tag == tcp) or (Tag == ssl) ->
+ case SockMod of
+ tls ->
+ case tls:recv_data(Socket, Data) of
+ {ok, TLSData} ->
+ {noreply, process_data(TLSData, State)};
+ {error, _Reason} ->
+ {stop, normal, State}
+ end;
+ _ ->
+ {noreply, process_data(Data, State)}
+ end;
+handle_info({Tag, _TCPSocket}, State)
+ when (Tag == tcp_closed) or (Tag == ssl_closed) ->
+ {stop, normal, State};
+handle_info({Tag, _TCPSocket, Reason}, State)
+ when (Tag == tcp_error) or (Tag == ssl_error) ->
+ case Reason of
+ timeout ->
+ {noreply, State};
+ _ ->
+ {stop, normal, State}
+ end;
+handle_info({timeout, _Ref, activate}, State) ->
+ activate_socket(State),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, #state{xml_stream_state = XMLStreamState,
+ c2s_pid = C2SPid} = State) ->
+ xml_stream:close(XMLStreamState),
+ gen_fsm:send_event(C2SPid, closed),
+ catch (State#state.sock_mod):close(State#state.socket),
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
+activate_socket(#state{socket = Socket,
+ sock_mod = SockMod}) ->
+ case SockMod of
+ gen_tcp ->
+ inet:setopts(Socket, [{active, once}]);
+ _ ->
+ SockMod:setopts(Socket, [{active, once}])
end.
-change_shaper(Pid, Shaper) ->
- Pid ! {change_shaper, Shaper}.
-
-reset_stream(Pid) ->
- Pid ! reset_stream.
-
-starttls(Pid, TLSSocket) ->
- Pid ! {starttls, TLSSocket}.
-
+process_data(Data,
+ #state{xml_stream_state = XMLStreamState,
+ shaper_state = ShaperState} = State) ->
+ XMLStreamState1 = xml_stream:parse(XMLStreamState, Data),
+ {NewShaperState, Pause} = shaper:update(ShaperState, size(Data)),
+ if
+ Pause > 0 ->
+ erlang:start_timer(Pause, self(), activate);
+ true ->
+ activate_socket(State)
+ end,
+ State#state{xml_stream_state = XMLStreamState1,
+ shaper_state = NewShaperState}.