aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_receiver.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_receiver.erl')
-rw-r--r--src/ejabberd_receiver.erl341
1 files changed, 0 insertions, 341 deletions
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl
deleted file mode 100644
index 0a33e30ec..000000000
--- a/src/ejabberd_receiver.erl
+++ /dev/null
@@ -1,341 +0,0 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_receiver.erl
-%%% Author : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Socket receiver for C2S and S2S connections
-%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(ejabberd_receiver).
-
--author('alexey@process-one.net').
-
--behaviour(gen_server).
-
-%% API
--export([start_link/4,
- start/3,
- start/4,
- change_shaper/2,
- reset_stream/1,
- starttls/2,
- compress/2,
- become_controller/2,
- close/1]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--include("ejabberd.hrl").
--include("logger.hrl").
-
--record(state,
- {socket :: inet:socket() | fast_tls:tls_socket() | ezlib:zlib_socket(),
- sock_mod = gen_tcp :: gen_tcp | fast_tls | ezlib,
- shaper_state = none :: shaper:shaper(),
- c2s_pid :: pid(),
- max_stanza_size = infinity :: non_neg_integer() | infinity,
- xml_stream_state :: fxml_stream:xml_stream_state(),
- timeout = infinity:: timeout()}).
-
--define(HIBERNATE_TIMEOUT, ejabberd_config:get_option(receiver_hibernate, fun(X) when is_integer(X); X == hibernate-> X end, 90000)).
-
-
--spec start_link(inet:socket(), atom(), shaper:shaper(),
- non_neg_integer() | infinity) -> ignore |
- {error, any()} |
- {ok, pid()}.
-
-start_link(Socket, SockMod, Shaper, MaxStanzaSize) ->
- gen_server:start_link(?MODULE,
- [Socket, SockMod, Shaper, MaxStanzaSize], []).
-
--spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid().
-
-start(Socket, SockMod, Shaper) ->
- start(Socket, SockMod, Shaper, infinity).
-
--spec start(inet:socket(), atom(), shaper:shaper(),
- non_neg_integer() | infinity) -> undefined | pid().
-
-start(Socket, SockMod, Shaper, MaxStanzaSize) ->
- {ok, Pid} = gen_server:start(ejabberd_receiver,
- [Socket, SockMod, Shaper, MaxStanzaSize], []),
- Pid.
-
--spec change_shaper(pid(), shaper:shaper()) -> ok.
-
-change_shaper(Pid, Shaper) ->
- gen_server:cast(Pid, {change_shaper, Shaper}).
-
--spec reset_stream(pid()) -> ok | {error, any()}.
-
-reset_stream(Pid) -> do_call(Pid, reset_stream).
-
--spec starttls(pid(), fast_tls:tls_socket()) -> ok.
-
-starttls(Pid, TLSSocket) ->
- do_call(Pid, {starttls, TLSSocket}).
-
--spec compress(pid(), iodata() | undefined) -> {error, any()} |
- {ok, ezlib:zlib_socket()}.
-
-compress(Pid, Data) ->
- do_call(Pid, {compress, Data}).
-
--spec become_controller(pid(), pid()) -> ok | {error, any()}.
-
-become_controller(Pid, C2SPid) ->
- do_call(Pid, {become_controller, C2SPid}).
-
--spec close(pid()) -> ok.
-
-close(Pid) ->
- gen_server:cast(Pid, close).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
- ShaperState = shaper:new(Shaper),
- Timeout = case SockMod of
- ssl -> 20;
- _ -> infinity
- end,
- {ok,
- #state{socket = Socket, sock_mod = SockMod,
- shaper_state = ShaperState,
- max_stanza_size = MaxStanzaSize, timeout = Timeout}}.
-
-handle_call({starttls, TLSSocket}, _From, State) ->
- State1 = reset_parser(State),
- NewState = State1#state{socket = TLSSocket,
- sock_mod = fast_tls},
- case fast_tls:recv_data(TLSSocket, <<"">>) of
- {ok, TLSData} ->
- {reply, ok,
- process_data(TLSData, NewState), ?HIBERNATE_TIMEOUT};
- {error, _Reason} ->
- {stop, normal, ok, NewState}
- end;
-handle_call({compress, Data}, _From,
- #state{socket = Socket, sock_mod = SockMod} =
- State) ->
- ejabberd:start_app(ezlib),
- {ok, ZlibSocket} = ezlib:enable_zlib(SockMod,
- Socket),
- if Data /= undefined -> do_send(State, Data);
- true -> ok
- end,
- State1 = reset_parser(State),
- NewState = State1#state{socket = ZlibSocket,
- sock_mod = ezlib},
- case ezlib:recv_data(ZlibSocket, <<"">>) of
- {ok, ZlibData} ->
- {reply, {ok, ZlibSocket},
- process_data(ZlibData, NewState), ?HIBERNATE_TIMEOUT};
- {error, _Reason} ->
- {stop, normal, ok, NewState}
- end;
-handle_call(reset_stream, _From, State) ->
- NewState = reset_parser(State),
- Reply = ok,
- {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};
-handle_call({become_controller, C2SPid}, _From, State) ->
- XMLStreamState = fxml_stream:new(C2SPid, State#state.max_stanza_size),
- NewState = State#state{c2s_pid = C2SPid,
- xml_stream_state = XMLStreamState},
- activate_socket(NewState),
- Reply = ok,
- {reply, Reply, NewState, ?HIBERNATE_TIMEOUT};
-handle_call(_Request, _From, State) ->
- Reply = ok, {reply, Reply, State, ?HIBERNATE_TIMEOUT}.
-
-handle_cast({change_shaper, Shaper}, State) ->
- NewShaperState = shaper:new(Shaper),
- {noreply, State#state{shaper_state = NewShaperState},
- ?HIBERNATE_TIMEOUT};
-handle_cast(close, State) -> {stop, normal, State};
-handle_cast(_Msg, State) ->
- {noreply, State, ?HIBERNATE_TIMEOUT}.
-
-handle_info({Tag, _TCPSocket, Data},
- #state{socket = Socket, sock_mod = SockMod} = State)
- when (Tag == tcp) or (Tag == ssl) or
- (Tag == ejabberd_xml) ->
- case SockMod of
- fast_tls ->
- case fast_tls:recv_data(Socket, Data) of
- {ok, TLSData} ->
- {noreply, process_data(TLSData, State),
- ?HIBERNATE_TIMEOUT};
- {error, Reason} ->
- if is_binary(Reason) ->
- ?DEBUG("TLS error = ~s", [Reason]);
- true ->
- ok
- end,
- {stop, normal, State}
- end;
- ezlib ->
- case ezlib:recv_data(Socket, Data) of
- {ok, ZlibData} ->
- {noreply, process_data(ZlibData, State),
- ?HIBERNATE_TIMEOUT};
- {error, _Reason} -> {stop, normal, State}
- end;
- _ ->
- {noreply, process_data(Data, State), ?HIBERNATE_TIMEOUT}
- end;
-handle_info({Tag, _TCPSocket}, State)
- when (Tag == tcp_closed) or (Tag == ssl_closed) ->
- {stop, normal, State};
-handle_info({Tag, _TCPSocket, Reason}, State)
- when (Tag == tcp_error) or (Tag == ssl_error) ->
- case Reason of
- timeout -> {noreply, State, ?HIBERNATE_TIMEOUT};
- _ -> {stop, normal, State}
- end;
-handle_info({timeout, _Ref, activate}, State) ->
- activate_socket(State),
- {noreply, State, ?HIBERNATE_TIMEOUT};
-handle_info(timeout, State) ->
- proc_lib:hibernate(gen_server, enter_loop,
- [?MODULE, [], State]),
- {noreply, State, ?HIBERNATE_TIMEOUT};
-handle_info(_Info, State) ->
- {noreply, State, ?HIBERNATE_TIMEOUT}.
-
-terminate(_Reason,
- #state{xml_stream_state = XMLStreamState,
- c2s_pid = C2SPid} =
- State) ->
- close_stream(XMLStreamState),
- if C2SPid /= undefined ->
- gen_fsm:send_event(C2SPid, closed);
- true -> ok
- end,
- catch (State#state.sock_mod):close(State#state.socket),
- ok.
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-activate_socket(#state{socket = Socket,
- sock_mod = SockMod}) ->
- PeerName = case SockMod of
- gen_tcp ->
- inet:setopts(Socket, [{active, once}]),
- inet:peername(Socket);
- _ ->
- SockMod:setopts(Socket, [{active, once}]),
- SockMod:peername(Socket)
- end,
- case PeerName of
- {error, _Reason} -> self() ! {tcp_closed, Socket};
- {ok, _} -> ok
- end.
-
-%% Data processing for connectors directly generating xmlelement in
-%% Erlang data structure.
-%% WARNING: Shaper does not work with Erlang data structure.
-process_data([], State) ->
- activate_socket(State), State;
-process_data([Element | Els],
- #state{c2s_pid = C2SPid} = State)
- when element(1, Element) == xmlel;
- element(1, Element) == xmlstreamstart;
- element(1, Element) == xmlstreamelement;
- element(1, Element) == xmlstreamend ->
- if C2SPid == undefined -> State;
- true ->
- catch gen_fsm:send_event(C2SPid,
- element_wrapper(Element)),
- process_data(Els, State)
- end;
-%% Data processing for connectors receivind data as string.
-process_data(Data,
- #state{xml_stream_state = XMLStreamState,
- shaper_state = ShaperState, c2s_pid = C2SPid} =
- State) ->
- ?DEBUG("Received XML on stream = ~p", [(Data)]),
- XMLStreamState1 = case XMLStreamState of
- undefined ->
- XMLStreamState;
- _ ->
- fxml_stream:parse(XMLStreamState, Data)
- end,
- {NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)),
- if
- C2SPid == undefined ->
- ok;
- Pause > 0 ->
- erlang:start_timer(Pause, self(), activate);
- true ->
- activate_socket(State)
- end,
- State#state{xml_stream_state = XMLStreamState1,
- shaper_state = NewShaperState}.
-
-%% Element coming from XML parser are wrapped inside xmlstreamelement
-%% When we receive directly xmlelement tuple (from a socket module
-%% speaking directly Erlang XML), we wrap it inside the same
-%% xmlstreamelement coming from the XML parser.
-element_wrapper(XMLElement)
- when element(1, XMLElement) == xmlel ->
- {xmlstreamelement, XMLElement};
-element_wrapper(Element) -> Element.
-
-close_stream(undefined) -> ok;
-close_stream(XMLStreamState) ->
- fxml_stream:close(XMLStreamState).
-
-reset_parser(#state{xml_stream_state = undefined} = State) ->
- State;
-reset_parser(#state{c2s_pid = C2SPid,
- max_stanza_size = MaxStanzaSize,
- xml_stream_state = XMLStreamState}
- = State) ->
- NewStreamState = try fxml_stream:reset(XMLStreamState)
- catch error:_ ->
- close_stream(XMLStreamState),
- case C2SPid of
- undefined ->
- undefined;
- _ ->
- fxml_stream:new(C2SPid, MaxStanzaSize)
- end
- end,
- State#state{xml_stream_state = NewStreamState}.
-
-do_send(State, Data) ->
- (State#state.sock_mod):send(State#state.socket, Data).
-
-do_call(Pid, Msg) ->
- case catch gen_server:call(Pid, Msg) of
- {'EXIT', Why} -> {error, Why};
- Res -> Res
- end.