diff options
Diffstat (limited to 'src/ejabberd_receiver.erl')
-rw-r--r-- | src/ejabberd_receiver.erl | 241 |
1 files changed, 127 insertions, 114 deletions
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 7e93feeb9..c9ed6b350 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -25,6 +25,7 @@ %%%---------------------------------------------------------------------- -module(ejabberd_receiver). + -author('alexey@process-one.net'). -behaviour(gen_server). @@ -41,18 +42,19 @@ close/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). -include("ejabberd.hrl"). --record(state, {socket, - sock_mod, - shaper_state, - c2s_pid, - max_stanza_size, - xml_stream_state, - timeout}). +-record(state, + {socket :: inet:socket() | tls:tls_socket() | ejabberd_zlib:zlib_socket(), + sock_mod = gen_tcp :: gen_tcp | tls | ejabberd_zlib, + shaper_state = none :: shaper:shaper(), + c2s_pid :: pid(), + max_stanza_size = infinity :: non_neg_integer() | infinity, + xml_stream_state :: xml_stream:xml_stream_state(), + timeout = infinity:: timeout()}). -define(HIBERNATE_TIMEOUT, 90000). @@ -63,9 +65,16 @@ %% Function: start_link() -> {ok,Pid} | ignore | {error,Error} %% Description: Starts the server %%-------------------------------------------------------------------- +-spec start_link(inet:socket(), atom(), shaper:shaper(), + non_neg_integer() | infinity) -> ignore | + {error, any()} | + {ok, pid()}. + start_link(Socket, SockMod, Shaper, MaxStanzaSize) -> - gen_server:start_link( - ?MODULE, [Socket, SockMod, Shaper, MaxStanzaSize], []). + gen_server:start_link(?MODULE, + [Socket, SockMod, Shaper, MaxStanzaSize], []). + +-spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid(). %%-------------------------------------------------------------------- %% Function: start() -> {ok,Pid} | ignore | {error,Error} @@ -74,30 +83,46 @@ start_link(Socket, SockMod, Shaper, MaxStanzaSize) -> start(Socket, SockMod, Shaper) -> start(Socket, SockMod, Shaper, infinity). +-spec start(inet:socket(), atom(), shaper:shaper(), + non_neg_integer() | infinity) -> undefined | pid(). + start(Socket, SockMod, Shaper, MaxStanzaSize) -> - {ok, Pid} = supervisor:start_child( - ejabberd_receiver_sup, - [Socket, SockMod, Shaper, MaxStanzaSize]), + {ok, Pid} = + supervisor:start_child(ejabberd_receiver_sup, + [Socket, SockMod, Shaper, MaxStanzaSize]), Pid. +-spec change_shaper(pid(), shaper:shaper()) -> ok. + change_shaper(Pid, Shaper) -> gen_server:cast(Pid, {change_shaper, Shaper}). -reset_stream(Pid) -> - do_call(Pid, reset_stream). +-spec reset_stream(pid()) -> ok | {error, any()}. + +reset_stream(Pid) -> do_call(Pid, reset_stream). + +-spec starttls(pid(), iodata()) -> {ok, tls:tls_socket()} | {error, any()}. starttls(Pid, TLSSocket) -> do_call(Pid, {starttls, TLSSocket}). +-spec compress(pid(), iodata() | undefined) -> {error, any()} | + {ok, ejabberd_zlib:zlib_socket()}. + compress(Pid, ZlibSocket) -> do_call(Pid, {compress, ZlibSocket}). +-spec become_controller(pid(), pid()) -> ok | {error, any()}. + become_controller(Pid, C2SPid) -> do_call(Pid, {become_controller, C2SPid}). +-spec close(pid()) -> ok. + close(Pid) -> gen_server:cast(Pid, close). + %%==================================================================== %% gen_server callbacks %%==================================================================== @@ -112,16 +137,13 @@ close(Pid) -> init([Socket, SockMod, Shaper, MaxStanzaSize]) -> ShaperState = shaper:new(Shaper), Timeout = case SockMod of - ssl -> - 20; - _ -> - infinity + ssl -> 20; + _ -> infinity end, - {ok, #state{socket = Socket, - sock_mod = SockMod, - shaper_state = ShaperState, - max_stanza_size = MaxStanzaSize, - timeout = Timeout}}. + {ok, + #state{socket = Socket, sock_mod = SockMod, + shaper_state = ShaperState, + max_stanza_size = MaxStanzaSize, timeout = Timeout}}. %%-------------------------------------------------------------------- %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | @@ -137,11 +159,12 @@ handle_call({starttls, TLSSocket}, _From, c2s_pid = C2SPid, max_stanza_size = MaxStanzaSize} = State) -> close_stream(XMLStreamState), - NewXMLStreamState = xml_stream:new(C2SPid, MaxStanzaSize), + NewXMLStreamState = xml_stream:new(C2SPid, + MaxStanzaSize), NewState = State#state{socket = TLSSocket, sock_mod = tls, xml_stream_state = NewXMLStreamState}, - case tls:recv_data(TLSSocket, "") of + case tls:recv_data(TLSSocket, <<"">>) of {ok, TLSData} -> {reply, ok, process_data(TLSData, NewState), ?HIBERNATE_TIMEOUT}; {error, _Reason} -> @@ -152,11 +175,12 @@ handle_call({compress, ZlibSocket}, _From, c2s_pid = C2SPid, max_stanza_size = MaxStanzaSize} = State) -> close_stream(XMLStreamState), - NewXMLStreamState = xml_stream:new(C2SPid, MaxStanzaSize), + NewXMLStreamState = xml_stream:new(C2SPid, + MaxStanzaSize), NewState = State#state{socket = ZlibSocket, sock_mod = ejabberd_zlib, xml_stream_state = NewXMLStreamState}, - case ejabberd_zlib:recv_data(ZlibSocket, "") of + case ejabberd_zlib:recv_data(ZlibSocket, <<"">>) of {ok, ZlibData} -> {reply, ok, process_data(ZlibData, NewState), ?HIBERNATE_TIMEOUT}; {error, _Reason} -> @@ -164,12 +188,14 @@ handle_call({compress, ZlibSocket}, _From, end; handle_call(reset_stream, _From, #state{xml_stream_state = XMLStreamState, - c2s_pid = C2SPid, - max_stanza_size = MaxStanzaSize} = State) -> + c2s_pid = C2SPid, max_stanza_size = MaxStanzaSize} = + State) -> close_stream(XMLStreamState), - NewXMLStreamState = xml_stream:new(C2SPid, MaxStanzaSize), + NewXMLStreamState = xml_stream:new(C2SPid, + MaxStanzaSize), Reply = ok, - {reply, Reply, State#state{xml_stream_state = NewXMLStreamState}, + {reply, Reply, + State#state{xml_stream_state = NewXMLStreamState}, ?HIBERNATE_TIMEOUT}; handle_call({become_controller, C2SPid}, _From, State) -> XMLStreamState = xml_stream:new(C2SPid, State#state.max_stanza_size), @@ -179,8 +205,7 @@ handle_call({become_controller, C2SPid}, _From, State) -> Reply = ok, {reply, Reply, NewState, ?HIBERNATE_TIMEOUT}; handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State, ?HIBERNATE_TIMEOUT}. + Reply = ok, {reply, Reply, State, ?HIBERNATE_TIMEOUT}. %%-------------------------------------------------------------------- %% Function: handle_cast(Msg, State) -> {noreply, State} | @@ -190,9 +215,9 @@ handle_call(_Request, _From, State) -> %%-------------------------------------------------------------------- handle_cast({change_shaper, Shaper}, State) -> NewShaperState = shaper:new(Shaper), - {noreply, State#state{shaper_state = NewShaperState}, ?HIBERNATE_TIMEOUT}; -handle_cast(close, State) -> - {stop, normal, State}; + {noreply, State#state{shaper_state = NewShaperState}, + ?HIBERNATE_TIMEOUT}; +handle_cast(close, State) -> {stop, normal, State}; handle_cast(_Msg, State) -> {noreply, State, ?HIBERNATE_TIMEOUT}. @@ -203,45 +228,42 @@ handle_cast(_Msg, State) -> %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- handle_info({Tag, _TCPSocket, Data}, - #state{socket = Socket, - sock_mod = SockMod} = State) - when (Tag == tcp) or (Tag == ssl) or (Tag == ejabberd_xml) -> + #state{socket = Socket, sock_mod = SockMod} = State) + when (Tag == tcp) or (Tag == ssl) or + (Tag == ejabberd_xml) -> case SockMod of - tls -> - case tls:recv_data(Socket, Data) of - {ok, TLSData} -> - {noreply, process_data(TLSData, State), - ?HIBERNATE_TIMEOUT}; - {error, _Reason} -> - {stop, normal, State} - end; - ejabberd_zlib -> - case ejabberd_zlib:recv_data(Socket, Data) of - {ok, ZlibData} -> - {noreply, process_data(ZlibData, State), - ?HIBERNATE_TIMEOUT}; - {error, _Reason} -> - {stop, normal, State} - end; - _ -> - {noreply, process_data(Data, State), ?HIBERNATE_TIMEOUT} + tls -> + case tls:recv_data(Socket, Data) of + {ok, TLSData} -> + {noreply, process_data(TLSData, State), + ?HIBERNATE_TIMEOUT}; + {error, _Reason} -> {stop, normal, State} + end; + ejabberd_zlib -> + case ejabberd_zlib:recv_data(Socket, Data) of + {ok, ZlibData} -> + {noreply, process_data(ZlibData, State), + ?HIBERNATE_TIMEOUT}; + {error, _Reason} -> {stop, normal, State} + end; + _ -> + {noreply, process_data(Data, State), ?HIBERNATE_TIMEOUT} end; handle_info({Tag, _TCPSocket}, State) - when (Tag == tcp_closed) or (Tag == ssl_closed) -> + when (Tag == tcp_closed) or (Tag == ssl_closed) -> {stop, normal, State}; handle_info({Tag, _TCPSocket, Reason}, State) - when (Tag == tcp_error) or (Tag == ssl_error) -> + when (Tag == tcp_error) or (Tag == ssl_error) -> case Reason of - timeout -> - {noreply, State, ?HIBERNATE_TIMEOUT}; - _ -> - {stop, normal, State} + timeout -> {noreply, State, ?HIBERNATE_TIMEOUT}; + _ -> {stop, normal, State} end; handle_info({timeout, _Ref, activate}, State) -> activate_socket(State), {noreply, State, ?HIBERNATE_TIMEOUT}; handle_info(timeout, State) -> - proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]), + proc_lib:hibernate(gen_server, enter_loop, + [?MODULE, [], State]), {noreply, State, ?HIBERNATE_TIMEOUT}; handle_info(_Info, State) -> {noreply, State, ?HIBERNATE_TIMEOUT}. @@ -253,14 +275,14 @@ handle_info(_Info, State) -> %% cleaning up. When it returns, the gen_server terminates with Reason. %% The return value is ignored. %%-------------------------------------------------------------------- -terminate(_Reason, #state{xml_stream_state = XMLStreamState, - c2s_pid = C2SPid} = State) -> +terminate(_Reason, + #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = + State) -> close_stream(XMLStreamState), - if - C2SPid /= undefined -> - gen_fsm:send_event(C2SPid, closed); - true -> - ok + if C2SPid /= undefined -> + gen_fsm:send_event(C2SPid, closed); + true -> ok end, catch (State#state.sock_mod):close(State#state.socket), ok. @@ -269,8 +291,7 @@ terminate(_Reason, #state{xml_stream_state = XMLStreamState, %% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} %% Description: Convert process state when code is changed %%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- %%% Internal functions @@ -278,48 +299,44 @@ code_change(_OldVsn, State, _Extra) -> activate_socket(#state{socket = Socket, sock_mod = SockMod}) -> - PeerName = - case SockMod of - gen_tcp -> - inet:setopts(Socket, [{active, once}]), - inet:peername(Socket); - _ -> - SockMod:setopts(Socket, [{active, once}]), - SockMod:peername(Socket) - end, + PeerName = case SockMod of + gen_tcp -> + inet:setopts(Socket, [{active, once}]), + inet:peername(Socket); + _ -> + SockMod:setopts(Socket, [{active, once}]), + SockMod:peername(Socket) + end, case PeerName of - {error, _Reason} -> - self() ! {tcp_closed, Socket}; - {ok, _} -> - ok + {error, _Reason} -> self() ! {tcp_closed, Socket}; + {ok, _} -> ok end. %% Data processing for connectors directly generating xmlelement in %% Erlang data structure. %% WARNING: Shaper does not work with Erlang data structure. process_data([], State) -> - activate_socket(State), - State; -process_data([Element|Els], #state{c2s_pid = C2SPid} = State) - when element(1, Element) == xmlelement; - element(1, Element) == xmlstreamstart; - element(1, Element) == xmlstreamelement; - element(1, Element) == xmlstreamend -> - if - C2SPid == undefined -> - State; - true -> - catch gen_fsm:send_event(C2SPid, element_wrapper(Element)), - process_data(Els, State) + activate_socket(State), State; +process_data([Element | Els], + #state{c2s_pid = C2SPid} = State) + when element(1, Element) == xmlel; + element(1, Element) == xmlstreamstart; + element(1, Element) == xmlstreamelement; + element(1, Element) == xmlstreamend -> + if C2SPid == undefined -> State; + true -> + catch gen_fsm:send_event(C2SPid, + element_wrapper(Element)), + process_data(Els, State) end; %% Data processing for connectors receivind data as string. process_data(Data, #state{xml_stream_state = XMLStreamState, - shaper_state = ShaperState, - c2s_pid = C2SPid} = State) -> - ?DEBUG("Received XML on stream = ~p", [binary_to_list(Data)]), + shaper_state = ShaperState, c2s_pid = C2SPid} = + State) -> + ?DEBUG("Received XML on stream = ~p", [(Data)]), XMLStreamState1 = xml_stream:parse(XMLStreamState, Data), - {NewShaperState, Pause} = shaper:update(ShaperState, size(Data)), + {NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)), if C2SPid == undefined -> ok; @@ -336,20 +353,16 @@ process_data(Data, %% speaking directly Erlang XML), we wrap it inside the same %% xmlstreamelement coming from the XML parser. element_wrapper(XMLElement) - when element(1, XMLElement) == xmlelement -> + when element(1, XMLElement) == xmlel -> {xmlstreamelement, XMLElement}; -element_wrapper(Element) -> - Element. +element_wrapper(Element) -> Element. -close_stream(undefined) -> - ok; +close_stream(undefined) -> ok; close_stream(XMLStreamState) -> xml_stream:close(XMLStreamState). do_call(Pid, Msg) -> case catch gen_server:call(Pid, Msg) of - {'EXIT', Why} -> - {error, Why}; - Res -> - Res + {'EXIT', Why} -> {error, Why}; + Res -> Res end. |