diff options
Diffstat (limited to 'src/ejabberd_receiver.erl')
-rw-r--r-- | src/ejabberd_receiver.erl | 270 |
1 files changed, 195 insertions, 75 deletions
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 204771c1a..d09bef666 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -10,20 +10,68 @@ -author('alexey@sevcom.net'). -vsn('$Revision$ '). +-behaviour(gen_server). + +%% API -export([start/3, - receiver/4, change_shaper/2, reset_stream/1, - starttls/2]). + starttls/2, + become_controller/1, + close/1]). --include("ejabberd.hrl"). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-include("ejabberd.hrl"). +-record(state, {socket, + sock_mod, + shaper_state, + c2s_pid, + xml_stream_state, + timeout}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- start(Socket, SockMod, Shaper) -> - proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]). + {ok, Pid} = gen_server:start( + ?MODULE, [Socket, SockMod, Shaper, self()], []), + Pid. + +change_shaper(Pid, Shaper) -> + gen_server:cast(Pid, {change_shaper, Shaper}). +reset_stream(Pid) -> + gen_server:call(Pid, reset_stream). -receiver(Socket, SockMod, Shaper, C2SPid) -> +starttls(Pid, TLSSocket) -> + gen_server:call(Pid, {starttls, TLSSocket}). + +become_controller(Pid) -> + gen_server:call(Pid, become_controller). + +close(Pid) -> + gen_server:cast(Pid, close). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Socket, SockMod, Shaper, C2SPid]) -> XMLStreamState = xml_stream:new(C2SPid), ShaperState = shaper:new(Shaper), Timeout = case SockMod of @@ -32,77 +80,149 @@ receiver(Socket, SockMod, Shaper, C2SPid) -> _ -> infinity end, - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout). - -receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) -> - Res = (catch SockMod:recv(Socket, 0, Timeout)), - receive - {starttls, TLSSocket} -> - xml_stream:close(XMLStreamState), - XMLStreamState1 = xml_stream:new(C2SPid), - TLSRes = case Res of - {ok, Data} -> - tls:recv_data(TLSSocket, Data); - _ -> - tls:recv_data(TLSSocket, "") - end, - receiver1(TLSSocket, tls, - ShaperState, C2SPid, XMLStreamState1, Timeout, - TLSRes); - {change_timeout, NewTimeout} -> % Dirty hack - receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamState, NewTimeout, - Res) - after 0 -> - receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamState, Timeout, - Res) - end. - - -receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) -> - case Res of - {ok, Text} -> - ShaperSt1 = receive - {change_shaper, Shaper} -> - shaper:new(Shaper) - after 0 -> - ShaperState - end, - NewShaperState = shaper:update(ShaperSt1, size(Text)), - XMLStreamState1 = receive - reset_stream -> - xml_stream:close(XMLStreamState), - xml_stream:new(C2SPid) - after 0 -> - XMLStreamState - end, - XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text), - receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2, - Timeout); - {error, timeout} -> - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, - Timeout); - {error, Reason} -> - xml_stream:close(XMLStreamState), - gen_fsm:send_event(C2SPid, closed), - ok; - {'EXIT', Reason} -> - ?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n", - [Socket, SockMod, Reason]), - xml_stream:close(XMLStreamState), - gen_fsm:send_event(C2SPid, closed), - ok + {ok, #state{socket = Socket, + sock_mod = SockMod, + shaper_state = ShaperState, + c2s_pid = C2SPid, + xml_stream_state = XMLStreamState, + timeout = Timeout}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({starttls, TLSSocket}, _From, + #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + NewXMLStreamState = xml_stream:new(C2SPid), + NewState = State#state{socket = TLSSocket, + sock_mod = tls, + xml_stream_state = NewXMLStreamState}, + case tls:recv_data(TLSSocket, "") of + {ok, TLSData} -> + {reply, ok, process_data(TLSData, NewState)}; + {error, _Reason} -> + {stop, normal, ok, NewState} + end; +handle_call(reset_stream, _From, + #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + NewXMLStreamState = xml_stream:new(C2SPid), + Reply = ok, + {reply, Reply, State#state{xml_stream_state = NewXMLStreamState}}; +handle_call(become_controller, _From, State) -> + activate_socket(State), + Reply = ok, + {reply, Reply, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast({change_shaper, Shaper}, State) -> + NewShaperState = shaper:new(Shaper), + {noreply, State#state{shaper_state = NewShaperState}}; +handle_cast(close, State) -> + {stop, normal, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({Tag, _TCPSocket, Data}, + #state{socket = Socket, + sock_mod = SockMod} = State) + when (Tag == tcp) or (Tag == ssl) -> + case SockMod of + tls -> + case tls:recv_data(Socket, Data) of + {ok, TLSData} -> + {noreply, process_data(TLSData, State)}; + {error, _Reason} -> + {stop, normal, State} + end; + _ -> + {noreply, process_data(Data, State)} + end; +handle_info({Tag, _TCPSocket}, State) + when (Tag == tcp_closed) or (Tag == ssl_closed) -> + {stop, normal, State}; +handle_info({Tag, _TCPSocket, Reason}, State) + when (Tag == tcp_error) or (Tag == ssl_error) -> + case Reason of + timeout -> + {noreply, State}; + _ -> + {stop, normal, State} + end; +handle_info({timeout, _Ref, activate}, State) -> + activate_socket(State), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + gen_fsm:send_event(C2SPid, closed), + catch (State#state.sock_mod):close(State#state.socket), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +activate_socket(#state{socket = Socket, + sock_mod = SockMod}) -> + case SockMod of + gen_tcp -> + inet:setopts(Socket, [{active, once}]); + _ -> + SockMod:setopts(Socket, [{active, once}]) end. -change_shaper(Pid, Shaper) -> - Pid ! {change_shaper, Shaper}. - -reset_stream(Pid) -> - Pid ! reset_stream. - -starttls(Pid, TLSSocket) -> - Pid ! {starttls, TLSSocket}. - +process_data(Data, + #state{xml_stream_state = XMLStreamState, + shaper_state = ShaperState} = State) -> + XMLStreamState1 = xml_stream:parse(XMLStreamState, Data), + {NewShaperState, Pause} = shaper:update(ShaperState, size(Data)), + if + Pause > 0 -> + erlang:start_timer(Pause, self(), activate); + true -> + activate_socket(State) + end, + State#state{xml_stream_state = XMLStreamState1, + shaper_state = NewShaperState}. |