summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-12-26 19:02:54 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-12-26 19:02:54 +0300
commit7cdc51becd4c6defdecdcb41f8fdd93703f6be22 (patch)
tree597be32e8ba6034122e29f2ca65a107e8ee0c895
parentCleanup rebar.config (diff)
parentGet rid of ejabberd receiver (diff)
Merge branch 'no-more-ejabberd-receivers'
Conflicts: rebar.config
-rw-r--r--rebar.config3
-rw-r--r--src/ejabberd_bosh.erl15
-rw-r--r--src/ejabberd_c2s.erl42
-rw-r--r--src/ejabberd_http_ws.erl20
-rw-r--r--src/ejabberd_listener.erl2
-rw-r--r--src/ejabberd_receiver.erl357
-rw-r--r--src/ejabberd_s2s_in.erl24
-rw-r--r--src/ejabberd_s2s_out.erl14
-rw-r--r--src/ejabberd_service.erl23
-rw-r--r--src/ejabberd_socket.erl293
-rw-r--r--src/mod_s2s_dialback.erl6
-rw-r--r--src/mod_stream_mgmt.erl4
-rw-r--r--src/xmpp_socket.erl386
-rw-r--r--src/xmpp_stream_in.erl65
-rw-r--r--src/xmpp_stream_out.erl88
-rw-r--r--src/xmpp_stream_pkix.erl16
16 files changed, 561 insertions, 797 deletions
diff --git a/rebar.config b/rebar.config
index dc8fe4f9..0a8a6efd 100644
--- a/rebar.config
+++ b/rebar.config
@@ -81,7 +81,8 @@
iconv]}}.
{erl_first_files, ["src/ejabberd_sql_pt.erl", "src/ejabberd_config.erl",
- "src/gen_mod.erl", "src/mod_muc_room.erl", "src/mod_push.erl"]}.
+ "src/gen_mod.erl", "src/mod_muc_room.erl",
+ "src/mod_push.erl", "src/xmpp_socket.erl"]}.
{erl_opts, [nowarn_deprecated_function,
{i, "include"},
diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl
index 1df6681f..710e24ae 100644
--- a/src/ejabberd_bosh.erl
+++ b/src/ejabberd_bosh.erl
@@ -33,7 +33,7 @@
-export([start/2, start/3, start_link/3]).
-export([send_xml/2, setopts/2, controlling_process/2,
- migrate/3, custom_receiver/1, become_controller/2,
+ migrate/3, become_controller/2,
reset_stream/1, change_shaper/2, monitor/1, close/1,
sockname/1, peername/1, process_request/3, send/2,
change_controller/2]).
@@ -175,9 +175,6 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
controlling_process(_Socket, _Pid) -> ok.
-custom_receiver({http_bind, FsmRef, _IP}) ->
- {receiver, ?MODULE, FsmRef}.
-
become_controller(FsmRef, C2SPid) ->
p1_fsm:send_all_state_event(FsmRef,
{become_controller, C2SPid}).
@@ -185,11 +182,11 @@ become_controller(FsmRef, C2SPid) ->
change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
become_controller(FsmRef, C2SPid).
-reset_stream({http_bind, _FsmRef, _IP}) -> ok.
+reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
+ Socket.
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
- p1_fsm:send_all_state_event(FsmRef,
- {change_shaper, Shaper}).
+ p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
monitor({http_bind, FsmRef, _IP}) ->
erlang:monitor(process, FsmRef).
@@ -306,8 +303,8 @@ init([#body{attrs = Attrs}, IP, SID]) ->
buf_new(XMPPDomain)),
Opts2}
end,
- ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
- Opts),
+ xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
+ [{receiver, self()}|Opts]),
Inactivity = gen_mod:get_module_opt(XMPPDomain,
mod_bosh, max_inactivity,
?DEFAULT_INACTIVITY),
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 6794ef16..35353bb8 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -22,11 +22,11 @@
-module(ejabberd_c2s).
-behaviour(xmpp_stream_in).
-behaviour(ejabberd_config).
--behaviour(ejabberd_socket).
+-behaviour(xmpp_socket).
-protocol({rfc, 6121}).
-%% ejabberd_socket callbacks
+%% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0]).
%% ejabberd_config callbacks
-export([opt_type/1, listen_opt_type/1, transform_listen_option/2]).
@@ -62,7 +62,7 @@
-export_type([state/0]).
%%%===================================================================
-%%% ejabberd_socket API
+%%% xmpp_socket API
%%%===================================================================
start(SockData, Opts) ->
case proplists:get_value(supervisor, Opts, true) of
@@ -203,16 +203,16 @@ copy_state(#{owner := Owner} = NewState,
open_session(#{user := U, server := S, resource := R,
sid := SID, ip := IP, auth_module := AuthModule} = State) ->
JID = jid:make(U, S, R),
- change_shaper(State),
- Conn = get_conn_type(State),
- State1 = State#{conn => Conn, resource => R, jid => JID},
+ State1 = change_shaper(State),
+ Conn = get_conn_type(State1),
+ State2 = State1#{conn => Conn, resource => R, jid => JID},
Prio = case maps:get(pres_last, State, undefined) of
undefined -> undefined;
Pres -> get_priority_from_presence(Pres)
end,
Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
ejabberd_sm:open_session(SID, U, S, R, Prio, Info),
- xmpp_stream_in:establish(State1).
+ xmpp_stream_in:establish(State2).
%%%===================================================================
%%% Hooks
@@ -264,12 +264,12 @@ reject_unauthenticated_packet(State, _Pkt) ->
process_closed(State, Reason) ->
stop(State#{stop_reason => Reason}).
-process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket,
+process_terminated(#{sid := SID, socket := Socket,
jid := JID, user := U, server := S, resource := R} = State,
Reason) ->
Status = format_reason(State, Reason),
?INFO_MSG("(~s) Closing c2s session for ~s: ~s",
- [SockMod:pp(Socket), jid:encode(JID), Status]),
+ [xmpp_socket:pp(Socket), jid:encode(JID), Status]),
State1 = case maps:is_key(pres_last, State) of
true ->
Pres = #presence{type = unavailable,
@@ -285,10 +285,10 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket,
end,
bounce_message_queue(),
State1;
-process_terminated(#{sockmod := SockMod, socket := Socket,
+process_terminated(#{socket := Socket,
stop_reason := {tls, _}} = State, Reason) ->
?WARNING_MSG("(~s) Failed to secure c2s connection: ~s",
- [SockMod:pp(Socket), format_reason(State, Reason)]),
+ [xmpp_socket:pp(Socket), format_reason(State, Reason)]),
State;
process_terminated(State, _Reason) ->
State.
@@ -385,7 +385,7 @@ check_password_digest_fun(#{lserver := LServer}) ->
bind(<<"">>, State) ->
bind(new_uniq_id(), State);
bind(R, #{user := U, server := S, access := Access, lang := Lang,
- lserver := LServer, sockmod := SockMod, socket := Socket,
+ lserver := LServer, socket := Socket,
ip := IP} = State) ->
case resource_conflict_action(U, S, R) of
closenew ->
@@ -401,12 +401,12 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang,
State2 = ejabberd_hooks:run_fold(
c2s_session_opened, LServer, State1, []),
?INFO_MSG("(~s) Opened c2s session for ~s",
- [SockMod:pp(Socket), jid:encode(JID)]),
+ [xmpp_socket:pp(Socket), jid:encode(JID)]),
{ok, State2};
deny ->
ejabberd_hooks:run(forbidden_session_hook, LServer, [JID]),
?INFO_MSG("(~s) Forbidden c2s session for ~s",
- [SockMod:pp(Socket), jid:encode(JID)]),
+ [xmpp_socket:pp(Socket), jid:encode(JID)]),
Txt = <<"Access denied by service policy">>,
{error, xmpp:err_not_allowed(Txt, Lang), State}
end
@@ -417,9 +417,9 @@ handle_stream_start(StreamStart, #{lserver := LServer} = State) ->
false ->
send(State#{lserver => ?MYNAME}, xmpp:serr_host_unknown());
true ->
- change_shaper(State),
+ State1 = change_shaper(State),
ejabberd_hooks:run_fold(
- c2s_stream_started, LServer, State, [StreamStart])
+ c2s_stream_started, LServer, State1, [StreamStart])
end.
handle_stream_end(Reason, #{lserver := LServer} = State) ->
@@ -427,20 +427,20 @@ handle_stream_end(Reason, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]).
handle_auth_success(User, Mech, AuthModule,
- #{socket := Socket, sockmod := SockMod,
+ #{socket := Socket,
ip := IP, lserver := LServer} = State) ->
?INFO_MSG("(~s) Accepted c2s ~s authentication for ~s@~s by ~s backend from ~s",
- [SockMod:pp(Socket), Mech, User, LServer,
+ [xmpp_socket:pp(Socket), Mech, User, LServer,
ejabberd_auth:backend_type(AuthModule),
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = State#{auth_module => AuthModule},
ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]).
handle_auth_failure(User, Mech, Reason,
- #{socket := Socket, sockmod := SockMod,
+ #{socket := Socket,
ip := IP, lserver := LServer} = State) ->
?INFO_MSG("(~s) Failed c2s ~s authentication ~sfrom ~s: ~s",
- [SockMod:pp(Socket), Mech,
+ [xmpp_socket:pp(Socket), Mech,
if User /= <<"">> -> ["for ", User, "@", LServer, " "];
true -> ""
end,
@@ -912,7 +912,7 @@ fix_from_to(Pkt, #{jid := JID}) when ?is_stanza(Pkt) ->
fix_from_to(Pkt, _State) ->
Pkt.
--spec change_shaper(state()) -> ok.
+-spec change_shaper(state()) -> state().
change_shaper(#{shaper := ShaperName, ip := IP, lserver := LServer,
user := U, server := S, resource := R} = State) ->
JID = jid:make(U, S, R),
diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl
index f9f7b07e..4a5d0b1b 100644
--- a/src/ejabberd_http_ws.erl
+++ b/src/ejabberd_http_ws.erl
@@ -34,7 +34,8 @@
handle_sync_event/4, code_change/4, handle_info/3,
terminate/3, send_xml/2, setopts/2, sockname/1,
peername/1, controlling_process/2, become_controller/2,
- close/1, socket_handoff/6, opt_type/1]).
+ monitor/1, reset_stream/1, close/1, change_shaper/2,
+ socket_handoff/6, opt_type/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
@@ -105,12 +106,21 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
controlling_process(_Socket, _Pid) -> ok.
become_controller(FsmRef, C2SPid) ->
- p1_fsm:send_all_state_event(FsmRef,
- {become_controller, C2SPid}).
+ p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}).
close({http_ws, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef, close).
+monitor({http_ws, FsmRef, _IP}) ->
+ erlang:monitor(process, FsmRef).
+
+reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
+ Socket.
+
+change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
+ %% TODO???
+ ok.
+
socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) ->
ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod,
Buf, Opts, ?MODULE, fun get_human_html_xmlel/0).
@@ -136,8 +146,8 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
Socket = {http_ws, self(), IP},
?DEBUG("Client connected through websocket ~p",
[Socket]),
- ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
- Opts),
+ xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
+ [{receiver, self()}|Opts]),
Timer = erlang:start_timer(WSTimeout, self(), []),
{ok, loop,
#state{socket = Socket, timeout = WSTimeout,
diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl
index 248e3faf..dd4842f5 100644
--- a/src/ejabberd_listener.erl
+++ b/src/ejabberd_listener.erl
@@ -294,7 +294,7 @@ accept(ListenSocket, Module, Opts, Interval) ->
{ok, Socket} ->
case {inet:sockname(Socket), inet:peername(Socket)} of
{{ok, {Addr, Port}}, {ok, {PAddr, PPort}}} ->
- Receiver = case ejabberd_socket:start(Module,
+ Receiver = case xmpp_socket:start(Module,
gen_tcp, Socket, Opts) of
{ok, RecvPid} -> RecvPid;
_ -> none
diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl
deleted file mode 100644
index 52077ac3..00000000
--- a/src/ejabberd_receiver.erl
+++ /dev/null
@@ -1,357 +0,0 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_receiver.erl
-%%% Author : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Socket receiver for C2S and S2S connections
-%%% Created : 10 Nov 2003 by Alexey Shchepin <alexey@process-one.net>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(ejabberd_receiver).
-
--author('alexey@process-one.net').
-
--ifndef(GEN_SERVER).
--define(GEN_SERVER, gen_server).
--endif.
--behaviour(?GEN_SERVER).
--behaviour(ejabberd_config).
-
-%% API
--export([start_link/4,
- start/3,
- start/4,
- change_shaper/2,
- reset_stream/1,
- starttls/2,
- compress/2,
- become_controller/2,
- close/1,
- opt_type/1]).
-
-%% gen_server callbacks
--export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
-
--include("ejabberd.hrl").
--include("logger.hrl").
-
--record(state,
- {socket :: inet:socket() | fast_tls:tls_socket() | ezlib:zlib_socket(),
- sock_mod = gen_tcp :: gen_tcp | fast_tls | ezlib,
- shaper_state = none :: shaper:shaper(),
- c2s_pid :: pid() | undefined,
- max_stanza_size = infinity :: non_neg_integer() | infinity,
- xml_stream_state :: fxml_stream:xml_stream_state() | undefined,
- timeout = infinity:: timeout()}).
-
--spec start_link(inet:socket(), atom(), shaper:shaper(),
- non_neg_integer() | infinity) -> ignore |
- {error, any()} |
- {ok, pid()}.
-
-start_link(Socket, SockMod, Shaper, MaxStanzaSize) ->
- ?GEN_SERVER:start_link(?MODULE,
- [Socket, SockMod, Shaper, MaxStanzaSize], []).
-
--spec start(inet:socket(), atom(), shaper:shaper()) -> undefined | pid().
-
-start(Socket, SockMod, Shaper) ->
- start(Socket, SockMod, Shaper, infinity).
-
--spec start(inet:socket(), atom(), shaper:shaper(),
- non_neg_integer() | infinity) -> undefined | pid().
-
-start(Socket, SockMod, Shaper, MaxStanzaSize) ->
- {ok, Pid} = ?GEN_SERVER:start(ejabberd_receiver,
- [Socket, SockMod, Shaper, MaxStanzaSize], []),
- Pid.
-
--spec change_shaper(pid(), shaper:shaper()) -> ok.
-
-change_shaper(Pid, Shaper) ->
- ?GEN_SERVER:cast(Pid, {change_shaper, Shaper}).
-
--spec reset_stream(pid()) -> ok | {error, any()}.
-
-reset_stream(Pid) -> do_call(Pid, reset_stream).
-
--spec starttls(pid(), fast_tls:tls_socket()) -> ok | {error, any()}.
-
-starttls(Pid, TLSSocket) ->
- do_call(Pid, {starttls, TLSSocket}).
-
--spec compress(pid(), iodata() | undefined) -> {error, any()} |
- {ok, ezlib:zlib_socket()}.
-
-compress(Pid, Data) ->
- do_call(Pid, {compress, Data}).
-
--spec become_controller(pid(), pid()) -> ok | {error, any()}.
-
-become_controller(Pid, C2SPid) ->
- do_call(Pid, {become_controller, C2SPid}).
-
--spec close(pid()) -> ok.
-
-close(Pid) ->
- ?GEN_SERVER:cast(Pid, close).
-
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([Socket, SockMod, Shaper, MaxStanzaSize]) ->
- ShaperState = shaper:new(Shaper),
- Timeout = case SockMod of
- ssl -> 20;
- _ -> infinity
- end,
- {ok,
- #state{socket = Socket, sock_mod = SockMod,
- shaper_state = ShaperState,
- max_stanza_size = MaxStanzaSize, timeout = Timeout}}.
-
-handle_call({starttls, TLSSocket}, _From, State) ->
- State1 = reset_parser(State),
- NewState = State1#state{socket = TLSSocket,
- sock_mod = fast_tls},
- case fast_tls:recv_data(TLSSocket, <<"">>) of
- {ok, TLSData} ->
- {reply, ok,
- process_data(TLSData, NewState), hibernate_timeout()};
- {error, _} = Err ->
- {stop, normal, Err, NewState}
- end;
-handle_call({compress, Data}, _From,
- #state{socket = Socket, sock_mod = SockMod} =
- State) ->
- ejabberd:start_app(ezlib),
- {ok, ZlibSocket} = ezlib:enable_zlib(SockMod,
- Socket),
- if Data /= undefined -> do_send(State, Data);
- true -> ok
- end,
- State1 = reset_parser(State),
- NewState = State1#state{socket = ZlibSocket,
- sock_mod = ezlib},
- case ezlib:recv_data(ZlibSocket, <<"">>) of
- {ok, ZlibData} ->
- {reply, {ok, ZlibSocket},
- process_data(ZlibData, NewState), hibernate_timeout()};
- {error, _} = Err ->
- {stop, normal, Err, NewState}
- end;
-handle_call(reset_stream, _From, State) ->
- NewState = reset_parser(State),
- Reply = ok,
- {reply, Reply, NewState, hibernate_timeout()};
-handle_call({become_controller, C2SPid}, _From, State) ->
- XMLStreamState = fxml_stream:new(C2SPid, State#state.max_stanza_size),
- NewState = State#state{c2s_pid = C2SPid,
- xml_stream_state = XMLStreamState},
- activate_socket(NewState),
- Reply = ok,
- {reply, Reply, NewState, hibernate_timeout()};
-handle_call(_Request, _From, State) ->
- Reply = ok, {reply, Reply, State, hibernate_timeout()}.
-
-handle_cast({change_shaper, Shaper}, State) ->
- NewShaperState = shaper:new(Shaper),
- {noreply, State#state{shaper_state = NewShaperState},
- hibernate_timeout()};
-handle_cast(close, State) -> {stop, normal, State};
-handle_cast(_Msg, State) ->
- {noreply, State, hibernate_timeout()}.
-
-handle_info({Tag, _TCPSocket, Data},
- #state{socket = Socket, sock_mod = SockMod} = State)
- when (Tag == tcp) or (Tag == ssl) or
- (Tag == ejabberd_xml) ->
- case SockMod of
- fast_tls ->
- case fast_tls:recv_data(Socket, Data) of
- {ok, TLSData} ->
- {noreply, process_data(TLSData, State),
- hibernate_timeout()};
- {error, Reason} ->
- if is_binary(Reason) ->
- ?DEBUG("TLS error = ~s", [Reason]);
- true ->
- ok
- end,
- {stop, normal, State}
- end;
- ezlib ->
- case ezlib:recv_data(Socket, Data) of
- {ok, ZlibData} ->
- {noreply, process_data(ZlibData, State),
- hibernate_timeout()};
- {error, _Reason} -> {stop, normal, State}
- end;
- _ ->
- {noreply, process_data(Data, State), hibernate_timeout()}
- end;
-handle_info({Tag, _TCPSocket}, State)
- when (Tag == tcp_closed) or (Tag == ssl_closed) ->
- {stop, normal, State};
-handle_info({Tag, _TCPSocket, Reason}, State)
- when (Tag == tcp_error) or (Tag == ssl_error) ->
- case Reason of
- timeout -> {noreply, State, hibernate_timeout()};
- _ -> {stop, normal, State}
- end;
-handle_info({timeout, _Ref, activate}, State) ->
- activate_socket(State),
- {noreply, State, hibernate_timeout()};
-handle_info(timeout, State) ->
- proc_lib:hibernate(?GEN_SERVER, enter_loop,
- [?MODULE, [], State]),
- {noreply, State, hibernate_timeout()};
-handle_info(_Info, State) ->
- {noreply, State, hibernate_timeout()}.
-
-terminate(_Reason,
- #state{xml_stream_state = XMLStreamState,
- c2s_pid = C2SPid} =
- State) ->
- close_stream(XMLStreamState),
- if C2SPid /= undefined ->
- p1_fsm:send_event(C2SPid, closed);
- true -> ok
- end,
- catch (State#state.sock_mod):close(State#state.socket),
- ok.
-
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%%--------------------------------------------------------------------
-%%% Internal functions
-%%--------------------------------------------------------------------
-
-activate_socket(#state{socket = Socket,
- sock_mod = SockMod}) ->
- Res = case SockMod of
- gen_tcp ->
- inet:setopts(Socket, [{active, once}]);
- _ ->
- SockMod:setopts(Socket, [{active, once}])
- end,
- case Res of
- {error, _Reason} -> self() ! {tcp_closed, Socket};
- ok -> ok
- end.
-
-%% Data processing for connectors directly generating xmlelement in
-%% Erlang data structure.
-%% WARNING: Shaper does not work with Erlang data structure.
-process_data([], State) ->
- activate_socket(State), State;
-process_data([Element | Els],
- #state{c2s_pid = C2SPid} = State)
- when element(1, Element) == xmlel;
- element(1, Element) == xmlstreamstart;
- element(1, Element) == xmlstreamelement;
- element(1, Element) == xmlstreamend ->
- if C2SPid == undefined -> State;
- true ->
- catch p1_fsm:send_event(C2SPid,
- element_wrapper(Element)),
- process_data(Els, State)
- end;
-%% Data processing for connectors receivind data as string.
-process_data(Data,
- #state{xml_stream_state = XMLStreamState,
- shaper_state = ShaperState, c2s_pid = C2SPid} =
- State) ->
- ?DEBUG("Received XML on stream = ~p", [(Data)]),
- XMLStreamState1 = case XMLStreamState of
- undefined ->
- XMLStreamState;
- _ ->
- fxml_stream:parse(XMLStreamState, Data)
- end,
- {NewShaperState, Pause} = shaper:update(ShaperState, byte_size(Data)),
- if
- C2SPid == undefined ->
- ok;
- Pause > 0 ->
- erlang:start_timer(Pause, self(), activate);
- true ->
- activate_socket(State)
- end,
- State#state{xml_stream_state = XMLStreamState1,
- shaper_state = NewShaperState}.
-
-%% Element coming from XML parser are wrapped inside xmlstreamelement
-%% When we receive directly xmlelement tuple (from a socket module
-%% speaking directly Erlang XML), we wrap it inside the same
-%% xmlstreamelement coming from the XML parser.
-element_wrapper(XMLElement)
- when element(1, XMLElement) == xmlel ->
- {xmlstreamelement, XMLElement};
-element_wrapper(Element) -> Element.
-
-close_stream(undefined) -> ok;
-close_stream(XMLStreamState) ->
- fxml_stream:close(XMLStreamState).
-
-reset_parser(#state{xml_stream_state = undefined} = State) ->
- State;
-reset_parser(#state{c2s_pid = C2SPid,
- max_stanza_size = MaxStanzaSize,
- xml_stream_state = XMLStreamState}
- = State) ->
- NewStreamState = try fxml_stream:reset(XMLStreamState)
- catch error:_ ->
- close_stream(XMLStreamState),
- case C2SPid of
- undefined ->
- undefined;
- _ ->
- fxml_stream:new(C2SPid, MaxStanzaSize)
- end
- end,
- State#state{xml_stream_state = NewStreamState}.
-
-do_send(State, Data) ->
- (State#state.sock_mod):send(State#state.socket, Data).
-
-do_call(Pid, Msg) ->
- try ?GEN_SERVER:call(Pid, Msg) of
- Res -> Res
- catch _:{timeout, _} ->
- {error, timeout};
- _:_ ->
- {error, einval}
- end.
-
-hibernate_timeout() ->
- ejabberd_config:get_option(receiver_hibernate, timer:seconds(90)).
-
--spec opt_type(receiver_hibernate) -> fun((pos_integer() | hibernate) ->
- pos_integer() | hibernate);
- (atom()) -> [atom()].
-opt_type(receiver_hibernate) ->
- fun(I) when is_integer(I), I>0 -> I;
- (hibernate) -> hibernate
- end;
-opt_type(_) ->
- [receiver_hibernate].
diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl
index ae81f739..ecd632cb 100644
--- a/src/ejabberd_s2s_in.erl
+++ b/src/ejabberd_s2s_in.erl
@@ -21,9 +21,9 @@
%%%-------------------------------------------------------------------
-module(ejabberd_s2s_in).
-behaviour(xmpp_stream_in).
--behaviour(ejabberd_socket).
+-behaviour(xmpp_socket).
-%% ejabberd_socket callbacks
+%% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0]).
%% ejabberd_listener callbacks
-export([listen_opt_type/1]).
@@ -180,31 +180,29 @@ handle_stream_established(State) ->
set_idle_timeout(State#{established => true}).
handle_auth_success(RServer, Mech, _AuthModule,
- #{sockmod := SockMod,
- socket := Socket, ip := IP,
+ #{socket := Socket, ip := IP,
auth_domains := AuthDomains,
server_host := ServerHost,
lserver := LServer} = State) ->
?INFO_MSG("(~s) Accepted inbound s2s ~s authentication ~s -> ~s (~s)",
- [SockMod:pp(Socket), Mech, RServer, LServer,
+ [xmpp_socket:pp(Socket), Mech, RServer, LServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = case ejabberd_s2s:allow_host(ServerHost, RServer) of
true ->
AuthDomains1 = sets:add_element(RServer, AuthDomains),
- change_shaper(State, RServer),
- State#{auth_domains => AuthDomains1};
+ State0 = change_shaper(State, RServer),
+ State0#{auth_domains => AuthDomains1};
false ->
State
end,
ejabberd_hooks:run_fold(s2s_in_auth_result, ServerHost, State1, [true, RServer]).
handle_auth_failure(RServer, Mech, Reason,
- #{sockmod := SockMod,
- socket := Socket, ip := IP,
+ #{socket := Socket, ip := IP,
server_host := ServerHost,
lserver := LServer} = State) ->
?INFO_MSG("(~s) Failed inbound s2s ~s authentication ~s -> ~s (~s): ~s",
- [SockMod:pp(Socket), Mech, RServer, LServer,
+ [xmpp_socket:pp(Socket), Mech, RServer, LServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)), Reason]),
ejabberd_hooks:run_fold(s2s_in_auth_result,
ServerHost, State, [false, RServer]).
@@ -286,11 +284,11 @@ handle_info(Info, #{server_host := LServer} = State) ->
ejabberd_hooks:run_fold(s2s_in_handle_info, LServer, State, [Info]).
terminate(Reason, #{auth_domains := AuthDomains,
- sockmod := SockMod, socket := Socket} = State) ->
+ socket := Socket} = State) ->
case maps:get(stop_reason, State, undefined) of
{tls, _} = Err ->
?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s",
- [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]);
+ [xmpp_socket:pp(Socket), xmpp_stream_in:format_error(Err)]);
_ ->
ok
end,
@@ -340,7 +338,7 @@ set_idle_timeout(#{server_host := LServer,
set_idle_timeout(State) ->
State.
--spec change_shaper(state(), binary()) -> ok.
+-spec change_shaper(state(), binary()) -> state().
change_shaper(#{shaper := ShaperName, server_host := ServerHost} = State,
RServer) ->
Shaper = acl:match_rule(ServerHost, ShaperName, jid:make(RServer)),
diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl
index fea5d816..0ece804a 100644
--- a/src/ejabberd_s2s_out.erl
+++ b/src/ejabberd_s2s_out.erl
@@ -61,12 +61,12 @@ start(From, To, Opts) ->
Res -> Res
end;
_ ->
- xmpp_stream_out:start(?MODULE, [ejabberd_socket, From, To, Opts],
+ xmpp_stream_out:start(?MODULE, [xmpp_socket, From, To, Opts],
ejabberd_config:fsm_limit_opts([]))
end.
start_link(From, To, Opts) ->
- xmpp_stream_out:start_link(?MODULE, [ejabberd_socket, From, To, Opts],
+ xmpp_stream_out:start_link(?MODULE, [xmpp_socket, From, To, Opts],
ejabberd_config:fsm_limit_opts([])).
-spec connect(pid()) -> ok.
@@ -210,24 +210,22 @@ dns_retries(#{server := LServer}) ->
dns_timeout(#{server := LServer}) ->
ejabberd_config:get_option({s2s_dns_timeout, LServer}, timer:seconds(10)).
-handle_auth_success(Mech, #{sockmod := SockMod,
- socket := Socket, ip := IP,
+handle_auth_success(Mech, #{socket := Socket, ip := IP,
remote_server := RServer,
server_host := ServerHost,
server := LServer} = State) ->
?INFO_MSG("(~s) Accepted outbound s2s ~s authentication ~s -> ~s (~s)",
- [SockMod:pp(Socket), Mech, LServer, RServer,
+ [xmpp_socket:pp(Socket), Mech, LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [true]).
handle_auth_failure(Mech, Reason,
- #{sockmod := SockMod,
- socket := Socket, ip := IP,
+ #{socket := Socket, ip := IP,
remote_server := RServer,
server_host := ServerHost,
server := LServer} = State) ->
?INFO_MSG("(~s) Failed outbound s2s ~s authentication ~s -> ~s (~s): ~s",
- [SockMod:pp(Socket), Mech, LServer, RServer,
+ [xmpp_socket:pp(Socket), Mech, LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)),
xmpp_stream_out:format_error(Reason)]),
ejabberd_hooks:run_fold(s2s_out_auth_result, ServerHost, State, [{false, Reason}]).
diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl
index 7b5f945d..3fe176a3 100644
--- a/src/ejabberd_service.erl
+++ b/src/ejabberd_service.erl
@@ -21,11 +21,11 @@
%%%-------------------------------------------------------------------
-module(ejabberd_service).
-behaviour(xmpp_stream_in).
--behaviour(ejabberd_socket).
+-behaviour(xmpp_socket).
-protocol({xep, 114, '1.6'}).
-%% ejabberd_socket callbacks
+%% xmpp_socket callbacks
-export([start/2, start_link/2, socket_type/0, close/1, close/2]).
%% ejabberd_listener callbacks
-export([listen_opt_type/1, transform_listen_option/2]).
@@ -100,8 +100,8 @@ init([State, Opts]) ->
false -> [compression_none | TLSOpts1];
true -> TLSOpts1
end,
- xmpp_stream_in:change_shaper(State, Shaper),
- State1 = State#{access => Access,
+ State1 = xmpp_stream_in:change_shaper(State, Shaper),
+ State2 = State1#{access => Access,
xmlns => ?NS_COMPONENT,
lang => ?MYLANG,
server => ?MYNAME,
@@ -109,7 +109,7 @@ init([State, Opts]) ->
stream_version => undefined,
tls_options => TLSOpts,
check_from => CheckFrom},
- ejabberd_hooks:run_fold(component_init, {ok, State1}, [Opts]).
+ ejabberd_hooks:run_fold(component_init, {ok, State2}, [Opts]).
handle_stream_start(_StreamStart,
#{remote_server := RemoteServer,
@@ -135,8 +135,7 @@ handle_stream_start(_StreamStart,
end.
get_password_fun(#{remote_server := RemoteServer,
- socket := Socket, sockmod := SockMod,
- ip := IP,
+ socket := Socket, ip := IP,
host_opts := HostOpts}) ->
fun(_) ->
case dict:find(RemoteServer, HostOpts) of
@@ -145,7 +144,7 @@ get_password_fun(#{remote_server := RemoteServer,
error ->
?INFO_MSG("(~s) Domain ~s is unconfigured for "
"external component from ~s",
- [SockMod:pp(Socket), RemoteServer,
+ [xmpp_socket:pp(Socket), RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
{false, undefined}
end
@@ -153,11 +152,10 @@ get_password_fun(#{remote_server := RemoteServer,
handle_auth_success(_, Mech, _,
#{remote_server := RemoteServer, host_opts := HostOpts,
- socket := Socket, sockmod := SockMod,
- ip := IP} = State) ->
+ socket := Socket, ip := IP} = State) ->
?INFO_MSG("(~s) Accepted external component ~s authentication "
"for ~s from ~s",
- [SockMod:pp(Socket), Mech, RemoteServer,
+ [xmpp_socket:pp(Socket), Mech, RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
lists:foreach(
fun (H) ->
@@ -168,11 +166,10 @@ handle_auth_success(_, Mech, _,
handle_auth_failure(_, Mech, Reason,
#{remote_server := RemoteServer,
- sockmod := SockMod,
socket := Socket, ip := IP} = State) ->
?INFO_MSG("(~s) Failed external component ~s authentication "
"for ~s from ~s: ~s",
- [SockMod:pp(Socket), Mech, RemoteServer,
+ [xmpp_socket:pp(Socket), Mech, RemoteServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP)),
Reason]),
State.
diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl
deleted file mode 100644
index 9953a76a..00000000
--- a/src/ejabberd_socket.erl
+++ /dev/null
@@ -1,293 +0,0 @@
-%%%----------------------------------------------------------------------
-%%% File : ejabberd_socket.erl
-%%% Author : Alexey Shchepin <alexey@process-one.net>
-%%% Purpose : Socket with zlib and TLS support library
-%%% Created : 23 Aug 2006 by Alexey Shchepin <alexey@process-one.net>
-%%%
-%%%
-%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
-%%%
-%%% This program is free software; you can redistribute it and/or
-%%% modify it under the terms of the GNU General Public License as
-%%% published by the Free Software Foundation; either version 2 of the
-%%% License, or (at your option) any later version.
-%%%
-%%% This program is distributed in the hope that it will be useful,
-%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
-%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-%%% General Public License for more details.
-%%%
-%%% You should have received a copy of the GNU General Public License along
-%%% with this program; if not, write to the Free Software Foundation, Inc.,
-%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-%%%
-%%%----------------------------------------------------------------------
-
--module(ejabberd_socket).
-
--author('alexey@process-one.net').
-
-%% API
--export([start/4,
- connect/3,
- connect/4,
- connect/5,
- starttls/2,
- compress/1,
- compress/2,
- reset_stream/1,
- send_element/2,
- send_header/2,
- send_trailer/1,
- send/2,
- send_xml/2,
- change_shaper/2,
- monitor/1,
- get_sockmod/1,
- get_transport/1,
- get_peer_certificate/2,
- get_verify_result/1,
- close/1,
- pp/1,
- sockname/1, peername/1]).
-
--include("ejabberd.hrl").
--include("xmpp.hrl").
--include("logger.hrl").
-
--type sockmod() :: ejabberd_bosh |
- ejabberd_http_ws |
- gen_tcp | fast_tls | ezlib.
--type receiver() :: pid () | atom().
--type socket() :: pid() | inet:socket() |
- fast_tls:tls_socket() |
- ezlib:zlib_socket() |
- ejabberd_bosh:bosh_socket() |
- ejabberd_http_ws:ws_socket().
-
--record(socket_state, {sockmod = gen_tcp :: sockmod(),
- socket = self() :: socket(),
- receiver = self() :: receiver()}).
-
--type socket_state() :: #socket_state{}.
-
--export_type([socket/0, socket_state/0, sockmod/0]).
-
--callback start({module(), socket_state()},
- [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
--callback start_link({module(), socket_state()},
- [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
--callback socket_type() -> xml_stream | independent | raw.
-
--define(is_http_socket(S),
- (S#socket_state.sockmod == ejabberd_bosh orelse
- S#socket_state.sockmod == ejabberd_http_ws)).
-
-%%====================================================================
-%% API
-%%====================================================================
--spec start(atom(), sockmod(), socket(), [proplists:property()])
- -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore.
-start(Module, SockMod, Socket, Opts) ->
- case Module:socket_type() of
- independent -> {ok, independent};
- xml_stream ->
- MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity),
- {ReceiverMod, Receiver, RecRef} =
- try SockMod:custom_receiver(Socket) of
- {receiver, RecMod, RecPid} ->
- {RecMod, RecPid, RecMod}
- catch _:_ ->
- RecPid = ejabberd_receiver:start(
- Socket, SockMod, none, MaxStanzaSize),
- {ejabberd_receiver, RecPid, RecPid}
- end,
- SocketData = #socket_state{sockmod = SockMod,
- socket = Socket, receiver = RecRef},
- case Module:start({?MODULE, SocketData}, Opts) of
- {ok, Pid} ->
- case SockMod:controlling_process(Socket, Receiver) of
- ok ->
- ReceiverMod:become_controller(Receiver, Pid),
- {ok, Receiver};
- Err ->
- SockMod:close(Socket),
- Err
- end;
- Err ->
- SockMod:close(Socket),
- case ReceiverMod of
- ejabberd_receiver -> ReceiverMod:close(Receiver);
- _ -> ok
- end,
- Err
- end;
- raw ->
- case Module:start({SockMod, Socket}, Opts) of
- {ok, Pid} ->
- case SockMod:controlling_process(Socket, Pid) of
- ok ->
- {ok, Pid};
- {error, _} = Err ->
- SockMod:close(Socket),
- Err
- end;
- Err ->
- SockMod:close(Socket),
- Err
- end
- end.
-
-connect(Addr, Port, Opts) ->
- connect(Addr, Port, Opts, infinity, self()).
-
-connect(Addr, Port, Opts, Timeout) ->
- connect(Addr, Port, Opts, Timeout, self()).
-
-connect(Addr, Port, Opts, Timeout, Owner) ->
- case gen_tcp:connect(Addr, Port, Opts, Timeout) of
- {ok, Socket} ->
- Receiver = ejabberd_receiver:start(Socket, gen_tcp,
- none),
- SocketData = #socket_state{sockmod = gen_tcp,
- socket = Socket, receiver = Receiver},
- case gen_tcp:controlling_process(Socket, Receiver) of
- ok ->
- ejabberd_receiver:become_controller(Receiver, Owner),
- {ok, SocketData};
- {error, _Reason} = Error -> gen_tcp:close(Socket), Error
- end;
- {error, _Reason} = Error -> Error
- end.
-
-starttls(#socket_state{socket = Socket,
- receiver = Receiver} = SocketData, TLSOpts) ->
- case fast_tls:tcp_to_tls(Socket, TLSOpts) of
- {ok, TLSSocket} ->
- case ejabberd_receiver:starttls(Receiver, TLSSocket) of
- ok ->
- {ok, SocketData#socket_state{socket = TLSSocket,
- sockmod = fast_tls}};
- {error, _} = Err ->
- Err
- end;
- {error, _} = Err ->
- Err
- end.
-
-compress(SocketData) -> compress(SocketData, undefined).
-
-compress(SocketData, Data) ->
- case ejabberd_receiver:compress(SocketData#socket_state.receiver, Data) of
- {ok, ZlibSocket} ->
- {ok, SocketData#socket_state{socket = ZlibSocket, sockmod = ezlib}};
- Err ->
- ?ERROR_MSG("compress failed: ~p", [Err]),
- Err
- end.
-
-reset_stream(SocketData)
- when is_pid(SocketData#socket_state.receiver) ->
- ejabberd_receiver:reset_stream(SocketData#socket_state.receiver);
-reset_stream(SocketData)
- when is_atom(SocketData#socket_state.receiver) ->
- (SocketData#socket_state.receiver):reset_stream(SocketData#socket_state.socket).
-
--spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
-send_element(SocketData, El) when ?is_http_socket(SocketData) ->
- send_xml(SocketData, {xmlstreamelement, El});
-send_element(SocketData, El) ->
- send(SocketData, fxml:element_to_binary(El)).
-
--spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
-send_header(SocketData, El) when ?is_http_socket(SocketData) ->
- send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs});
-send_header(SocketData, El) ->
- send(SocketData, fxml:element_to_header(El)).
-
--spec send_trailer(socket_state()) -> ok | {error, inet:posix()}.
-send_trailer(SocketData) when ?is_http_socket(SocketData) ->
- send_xml(SocketData, {xmlstreamend, <<"stream:stream">>});
-send_trailer(SocketData) ->
- send(SocketData, <<"</stream:stream>">>).
-
--spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}.
-send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
- ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]),
- try SockMod:send(Socket, Data) of
- {error, einval} -> {error, closed};
- Result -> Result
- catch _:badarg ->
- %% Some modules throw badarg exceptions on closed sockets
- %% TODO: their code should be improved
- {error, closed}
- end.
-
--spec send_xml(socket_state(),
- {xmlstreamelement, fxml:xmlel()} |
- {xmlstreamstart, binary(), [{binary(), binary()}]} |
- {xmlstreamend, binary()} |
- {xmlstreamraw, iodata()}) -> term().
-send_xml(SocketData, El) ->
- (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El).
-
-change_shaper(SocketData, Shaper)
- when is_pid(SocketData#socket_state.receiver) ->
- ejabberd_receiver:change_shaper(SocketData#socket_state.receiver,
- Shaper);
-change_shaper(SocketData, Shaper)
- when is_atom(SocketData#socket_state.receiver) ->
- (SocketData#socket_state.receiver):change_shaper(SocketData#socket_state.socket,
- Shaper).
-
-monitor(SocketData)
- when is_pid(SocketData#socket_state.receiver) ->
- erlang:monitor(process,
- SocketData#socket_state.receiver);
-monitor(SocketData)
- when is_atom(SocketData#socket_state.receiver) ->
- (SocketData#socket_state.receiver):monitor(SocketData#socket_state.socket).
-
-get_sockmod(SocketData) ->
- SocketData#socket_state.sockmod.
-
-get_transport(#socket_state{sockmod = SockMod,
- socket = Socket}) ->
- case SockMod of
- gen_tcp -> tcp;
- fast_tls -> tls;
- ezlib ->
- case ezlib:get_sockmod(Socket) of
- gen_tcp -> tcp_zlib;
- fast_tls -> tls_zlib
- end;
- ejabberd_bosh -> http_bind;
- ejabberd_http_ws -> websocket
- end.
-
-get_peer_certificate(SocketData, Type) ->
- fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type).
-
-get_verify_result(SocketData) ->
- fast_tls:get_verify_result(SocketData#socket_state.socket).
-
-close(SocketData) ->
- ejabberd_receiver:close(SocketData#socket_state.receiver).
-
-sockname(#socket_state{sockmod = SockMod,
- socket = Socket}) ->
- case SockMod of
- gen_tcp -> inet:sockname(Socket);
- _ -> SockMod:sockname(Socket)
- end.
-
-peername(#socket_state{sockmod = SockMod,
- socket = Socket}) ->
- case SockMod of
- gen_tcp -> inet:peername(Socket);
- _ -> SockMod:peername(Socket)
- end.
-
-pp(#socket_state{receiver = Receiver} = State) ->
- Transport = get_transport(State),
- io_lib:format("~s|~w", [Transport, Receiver]).
diff --git a/src/mod_s2s_dialback.erl b/src/mod_s2s_dialback.erl
index 1bf04af3..917bb71e 100644
--- a/src/mod_s2s_dialback.erl
+++ b/src/mod_s2s_dialback.erl
@@ -139,14 +139,13 @@ s2s_out_auth_result(#{db_verify := _} = State, _) ->
%% in section 2.1.2, step 2
{stop, send_verify_request(State)};
s2s_out_auth_result(#{db_enabled := true,
- sockmod := SockMod,
socket := Socket, ip := IP,
server := LServer,
remote_server := RServer} = State, {false, _}) ->
%% SASL authentication has failed, retrying with dialback
%% Sending dialback request, section 2.1.1, step 1
?INFO_MSG("(~s) Retrying with s2s dialback authentication: ~s -> ~s (~s)",
- [SockMod:pp(Socket), LServer, RServer,
+ [xmpp_socket:pp(Socket), LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
State1 = maps:remove(stop_reason, State#{on_route => queue}),
{stop, send_db_request(State1)};
@@ -159,7 +158,6 @@ s2s_out_downgraded(#{db_verify := _} = State, _) ->
%% section 2.1.2, step 2
{stop, send_verify_request(State)};
s2s_out_downgraded(#{db_enabled := true,
- sockmod := SockMod,
socket := Socket, ip := IP,
server := LServer,
remote_server := RServer} = State, _) ->
@@ -167,7 +165,7 @@ s2s_out_downgraded(#{db_enabled := true,
%% section 2.1.1, step 1
?INFO_MSG("(~s) Trying s2s dialback authentication with "
"non-RFC compliant server: ~s -> ~s (~s)",
- [SockMod:pp(Socket), LServer, RServer,
+ [xmpp_socket:pp(Socket), LServer, RServer,
ejabberd_config:may_hide_data(misc:ip_to_list(IP))]),
{stop, send_db_request(State)};
s2s_out_downgraded(State, _) ->
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 658bd504..c533a2ba 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -389,7 +389,7 @@ handle_a(State, #sm_a{h = H}) ->
resend_rack(State1).
-spec handle_resume(state(), sm_resume()) -> {ok, state()} | {error, state()}.
-handle_resume(#{user := User, lserver := LServer, sockmod := SockMod,
+handle_resume(#{user := User, lserver := LServer,
lang := Lang, socket := Socket} = State,
#sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
R = case inherit_session_state(State, PrevID) of
@@ -416,7 +416,7 @@ handle_resume(#{user := User, lserver := LServer, sockmod := SockMod,
State4 = send(State3, #sm_r{xmlns = AttrXmlns}),
State5 = ejabberd_hooks:run_fold(c2s_session_resumed, LServer, State4, []),
?INFO_MSG("(~s) Resumed session for ~s",
- [SockMod:pp(Socket), jid:encode(JID)]),
+ [xmpp_socket:pp(Socket), jid:encode(JID)]),
{ok, State5};
{error, El, Msg} ->
?INFO_MSG("Cannot resume session for ~s@~s: ~s",
diff --git a/src/xmpp_socket.erl b/src/xmpp_socket.erl
new file mode 100644
index 00000000..1ee02131
--- /dev/null
+++ b/src/xmpp_socket.erl
@@ -0,0 +1,386 @@
+%%%----------------------------------------------------------------------
+%%% File : xmpp_socket.erl
+%%% Author : Alexey Shchepin <alexey@process-one.net>
+%%% Purpose : Socket with zlib and TLS support library
+%%% Created : 23 Aug 2006 by Alexey Shchepin <alexey@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2017 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%----------------------------------------------------------------------
+
+-module(xmpp_socket).
+
+-author('alexey@process-one.net').
+
+%% API
+-export([start/4,
+ connect/3,
+ connect/4,
+ connect/5,
+ starttls/2,
+ compress/1,
+ compress/2,
+ reset_stream/1,
+ send_element/2,
+ send_header/2,
+ send_trailer/1,
+ send/2,
+ send_xml/2,
+ recv/2,
+ activate/1,
+ change_shaper/2,
+ monitor/1,
+ get_sockmod/1,
+ get_transport/1,
+ get_peer_certificate/2,
+ get_verify_result/1,
+ close/1,
+ pp/1,
+ sockname/1, peername/1]).
+
+-include("ejabberd.hrl").
+-include("xmpp.hrl").
+-include("logger.hrl").
+
+-type sockmod() :: ejabberd_bosh |
+ ejabberd_http_ws |
+ gen_tcp | fast_tls | ezlib.
+-type receiver() :: atom().
+-type socket() :: pid() | inet:socket() |
+ fast_tls:tls_socket() |
+ ezlib:zlib_socket() |
+ ejabberd_bosh:bosh_socket() |
+ ejabberd_http_ws:ws_socket().
+
+-record(socket_state, {sockmod = gen_tcp :: sockmod(),
+ socket :: socket(),
+ max_stanza_size = infinity :: timeout(),
+ xml_stream :: undefined | fxml_stream:xml_stream_state(),
+ shaper = none :: none | shaper:shaper(),
+ receiver :: receiver()}).
+
+-type socket_state() :: #socket_state{}.
+
+-export_type([socket/0, socket_state/0, sockmod/0]).
+
+-callback start({module(), socket_state()},
+ [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
+-callback start_link({module(), socket_state()},
+ [proplists:property()]) -> {ok, pid()} | {error, term()} | ignore.
+-callback socket_type() -> xml_stream | independent | raw.
+
+-define(is_http_socket(S),
+ (S#socket_state.sockmod == ejabberd_bosh orelse
+ S#socket_state.sockmod == ejabberd_http_ws)).
+
+%%====================================================================
+%% API
+%%====================================================================
+-spec start(atom(), sockmod(), socket(), [proplists:property()])
+ -> {ok, pid() | independent} | {error, inet:posix() | any()} | ignore.
+start(Module, SockMod, Socket, Opts) ->
+ try
+ case Module:socket_type() of
+ independent ->
+ {ok, independent};
+ xml_stream ->
+ MaxStanzaSize = proplists:get_value(max_stanza_size, Opts, infinity),
+ Receiver = proplists:get_value(receiver, Opts),
+ SocketData = #socket_state{sockmod = SockMod,
+ socket = Socket,
+ receiver = Receiver,
+ max_stanza_size = MaxStanzaSize},
+ {ok, Pid} = Module:start({?MODULE, SocketData}, Opts),
+ Receiver1 = if is_pid(Receiver) -> Receiver;
+ true -> Pid
+ end,
+ ok = controlling_process(SocketData, Receiver1),
+ ok = become_controller(SocketData, Pid),
+ {ok, Receiver1};
+ raw ->
+ {ok, Pid} = Module:start({SockMod, Socket}, Opts),
+ ok = SockMod:controlling_process(Socket, Pid),
+ {ok, Pid}
+ end
+ catch _:{badmatch, {error, _} = Err} ->
+ SockMod:close(Socket),
+ Err
+ end.
+
+connect(Addr, Port, Opts) ->
+ connect(Addr, Port, Opts, infinity, self()).
+
+connect(Addr, Port, Opts, Timeout) ->
+ connect(Addr, Port, Opts, Timeout, self()).
+
+connect(Addr, Port, Opts, Timeout, Owner) ->
+ case gen_tcp:connect(Addr, Port, Opts, Timeout) of
+ {ok, Socket} ->
+ SocketData = #socket_state{sockmod = gen_tcp, socket = Socket},
+ case controlling_process(SocketData, Owner) of
+ ok ->
+ activate_after(Socket, Owner, 0),
+ {ok, SocketData};
+ {error, _Reason} = Error ->
+ gen_tcp:close(Socket),
+ Error
+ end;
+ {error, _Reason} = Error ->
+ Error
+ end.
+
+starttls(#socket_state{socket = Socket,
+ receiver = undefined} = SocketData, TLSOpts) ->
+ case fast_tls:tcp_to_tls(Socket, TLSOpts) of
+ {ok, TLSSocket} ->
+ SocketData1 = SocketData#socket_state{socket = TLSSocket,
+ sockmod = fast_tls},
+ SocketData2 = reset_stream(SocketData1),
+ case fast_tls:recv_data(TLSSocket, <<>>) of
+ {ok, TLSData} ->
+ parse(SocketData2, TLSData);
+ {error, _} = Err ->
+ Err
+ end;
+ {error, _} = Err ->
+ Err
+ end.
+
+compress(SocketData) -> compress(SocketData, undefined).
+
+compress(#socket_state{receiver = undefined,
+ sockmod = SockMod,
+ socket = Socket} = SocketData, Data) ->
+ ejabberd:start_app(ezlib),
+ {ok, ZlibSocket} = ezlib:enable_zlib(SockMod, Socket),
+ case Data of
+ undefined -> ok;
+ _ -> send(SocketData, Data)
+ end,
+ SocketData1 = SocketData#socket_state{socket = ZlibSocket,
+ sockmod = ezlib},
+ SocketData2 = reset_stream(SocketData1),
+ case ezlib:recv_data(ZlibSocket, <<"">>) of
+ {ok, ZlibData} ->
+ parse(SocketData2, ZlibData);
+ {error, _} = Err ->
+ Err
+ end.
+
+reset_stream(#socket_state{xml_stream = XMLStream,
+ max_stanza_size = MaxStanzaSize} = SocketData)
+ when XMLStream /= undefined ->
+ XMLStream1 = try fxml_stream:reset(XMLStream)
+ catch error:_ ->
+ close_stream(XMLStream),
+ fxml_stream:new(self(), MaxStanzaSize)
+ end,
+ SocketData#socket_state{xml_stream = XMLStream1};
+reset_stream(#socket_state{sockmod = SockMod, socket = Socket} = SocketData) ->
+ Socket1 = SockMod:reset_stream(Socket),
+ SocketData#socket_state{socket = Socket1}.
+
+-spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
+send_element(SocketData, El) when ?is_http_socket(SocketData) ->
+ send_xml(SocketData, {xmlstreamelement, El});
+send_element(SocketData, El) ->
+ send(SocketData, fxml:element_to_binary(El)).
+
+-spec send_header(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}.
+send_header(SocketData, El) when ?is_http_socket(SocketData) ->
+ send_xml(SocketData, {xmlstreamstart, El#xmlel.name, El#xmlel.attrs});
+send_header(SocketData, El) ->
+ send(SocketData, fxml:element_to_header(El)).
+
+-spec send_trailer(socket_state()) -> ok | {error, inet:posix()}.
+send_trailer(SocketData) when ?is_http_socket(SocketData) ->
+ send_xml(SocketData, {xmlstreamend, <<"stream:stream">>});
+send_trailer(SocketData) ->
+ send(SocketData, <<"</stream:stream>">>).
+
+-spec send(socket_state(), iodata()) -> ok | {error, closed | inet:posix()}.
+send(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
+ ?DEBUG("(~s) Send XML on stream = ~p", [pp(SocketData), Data]),
+ try SockMod:send(Socket, Data) of
+ {error, einval} -> {error, closed};
+ Result -> Result
+ catch _:badarg ->
+ %% Some modules throw badarg exceptions on closed sockets
+ %% TODO: their code should be improved
+ {error, closed}
+ end.
+
+-spec send_xml(socket_state(),
+ {xmlstreamelement, fxml:xmlel()} |
+ {xmlstreamstart, binary(), [{binary(), binary()}]} |
+ {xmlstreamend, binary()} |
+ {xmlstreamraw, iodata()}) -> term().
+send_xml(SocketData, El) ->
+ (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, El).
+
+recv(#socket_state{xml_stream = undefined} = SocketData, Data) ->
+ XMLStream = fxml_stream:new(self(), SocketData#socket_state.max_stanza_size),
+ recv(SocketData#socket_state{xml_stream = XMLStream}, Data);
+recv(#socket_state{sockmod = SockMod, socket = Socket} = SocketData, Data) ->
+ case SockMod of
+ fast_tls ->
+ case fast_tls:recv_data(Socket, Data) of
+ {ok, TLSData} ->
+ parse(SocketData, TLSData);
+ {error, _} = Err ->
+ Err
+ end;
+ ezlib ->
+ case ezlib:recv_data(Socket, Data) of
+ {ok, ZlibData} ->
+ parse(SocketData, ZlibData);
+ {error, _} = Err ->
+ Err
+ end;
+ _ ->
+ parse(SocketData, Data)
+ end.
+
+change_shaper(#socket_state{receiver = undefined} = SocketData, Shaper) ->
+ ShaperState = shaper:new(Shaper),
+ SocketData#socket_state{shaper = ShaperState};
+change_shaper(#socket_state{sockmod = SockMod,
+ socket = Socket} = SocketData, Shaper) ->
+ SockMod:change_shaper(Socket, Shaper),
+ SocketData.
+
+monitor(#socket_state{receiver = undefined}) ->
+ make_ref();
+monitor(#socket_state{sockmod = SockMod, socket = Socket}) ->
+ SockMod:monitor(Socket).
+
+controlling_process(#socket_state{sockmod = SockMod,
+ socket = Socket}, Pid) ->
+ SockMod:controlling_process(Socket, Pid).
+
+become_controller(#socket_state{receiver = Receiver,
+ sockmod = SockMod,
+ socket = Socket}, Pid) ->
+ if is_pid(Receiver) ->
+ SockMod:become_controller(Receiver, Pid);
+ true ->
+ activate_after(Socket, Pid, 0)
+ end.
+
+get_sockmod(SocketData) ->
+ SocketData#socket_state.sockmod.
+
+get_transport(#socket_state{sockmod = SockMod,
+ socket = Socket}) ->
+ case SockMod of
+ gen_tcp -> tcp;
+ fast_tls -> tls;
+ ezlib ->
+ case ezlib:get_sockmod(Socket) of
+ gen_tcp -> tcp_zlib;
+ fast_tls -> tls_zlib
+ end;
+ ejabberd_bosh -> http_bind;
+ ejabberd_http_ws -> websocket
+ end.
+
+get_peer_certificate(SocketData, Type) ->
+ fast_tls:get_peer_certificate(SocketData#socket_state.socket, Type).
+
+get_verify_result(SocketData) ->
+ fast_tls:get_verify_result(SocketData#socket_state.socket).
+
+close(#socket_state{sockmod = SockMod, socket = Socket}) ->
+ SockMod:close(Socket).
+
+sockname(#socket_state{sockmod = SockMod,
+ socket = Socket}) ->
+ case SockMod of
+ gen_tcp -> inet:sockname(Socket);
+ _ -> SockMod:sockname(Socket)
+ end.
+
+peername(#socket_state{sockmod = SockMod,
+ socket = Socket}) ->
+ case SockMod of
+ gen_tcp -> inet:peername(Socket);
+ _ -> SockMod:peername(Socket)
+ end.
+
+activate(#socket_state{sockmod = SockMod, socket = Socket}) ->
+ case SockMod of
+ gen_tcp -> inet:setopts(Socket, [{active, once}]);
+ _ -> SockMod:setopts(Socket, [{active, once}])
+ end.
+
+activate_after(Socket, Pid, Pause) ->
+ if Pause > 0 ->
+ erlang:send_after(Pause, Pid, {tcp, Socket, <<>>});
+ true ->
+ Pid ! {tcp, Socket, <<>>}
+ end,
+ ok.
+
+pp(#socket_state{receiver = Receiver} = State) ->
+ Transport = get_transport(State),
+ Receiver1 = case Receiver of
+ undefined -> self();
+ _ -> Receiver
+ end,
+ io_lib:format("~s|~w", [Transport, Receiver1]).
+
+parse(SocketData, Data) when Data == <<>>; Data == [] ->
+ case activate(SocketData) of
+ ok ->
+ {ok, SocketData};
+ {error, _} = Err ->
+ Err
+ end;
+parse(SocketData, [El | Els]) when is_record(El, xmlel) ->
+ self() ! {'$gen_event', {xmlstreamelement, El}},
+ parse(SocketData, Els);
+parse(SocketData, [El | Els]) when
+ element(1, El) == xmlstreamstart;
+ element(1, El) == xmlstreamelement;
+ element(1, El) == xmlstreamend;
+ element(1, El) == xmlstreamerror ->
+ self() ! {'$gen_event', El},
+ parse(SocketData, Els);
+parse(#socket_state{xml_stream = XMLStream,
+ socket = Socket,
+ shaper = ShaperState} = SocketData, Data)
+ when is_binary(Data) ->
+ XMLStream1 = fxml_stream:parse(XMLStream, Data),
+ {ShaperState1, Pause} = shaper:update(ShaperState, byte_size(Data)),
+ Ret = if Pause > 0 ->
+ activate_after(Socket, self(), Pause);
+ true ->
+ activate(SocketData)
+ end,
+ case Ret of
+ ok ->
+ {ok, SocketData#socket_state{xml_stream = XMLStream1,
+ shaper = ShaperState1}};
+ {error, _} = Err ->
+ Err
+ end.
+
+close_stream(undefined) ->
+ ok;
+close_stream(XMLStream) ->
+ fxml_stream:close(XMLStream).
diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl
index c28bad8e..f41bed6c 100644
--- a/src/xmpp_stream_in.erl
+++ b/src/xmpp_stream_in.erl
@@ -177,16 +177,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
set_timeout(_, _) ->
erlang:error(badarg).
-get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner})
+get_transport(#{socket := Socket, owner := Owner})
when Owner == self() ->
- SockMod:get_transport(Socket);
+ xmpp_socket:get_transport(Socket);
get_transport(_) ->
erlang:error(badarg).
--spec change_shaper(state(), shaper:shaper()) -> ok.
-change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper)
+-spec change_shaper(state(), shaper:shaper()) -> state().
+change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() ->
- SockMod:change_shaper(Socket, Shaper);
+ Socket1 = xmpp_socket:change_shaper(Socket, Shaper),
+ State#{socket => Socket1};
change_shaper(_, _) ->
erlang:error(badarg).
@@ -209,16 +210,15 @@ format_error(Err) ->
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
-init([Module, {SockMod, Socket}, Opts]) ->
+init([Module, {_SockMod, Socket}, Opts]) ->
Encrypted = proplists:get_bool(tls, Opts),
- SocketMonitor = SockMod:monitor(Socket),
- case SockMod:peername(Socket) of
+ SocketMonitor = xmpp_socket:monitor(Socket),
+ case xmpp_socket:peername(Socket) of
{ok, IP} ->
Time = p1_time_compat:monotonic_time(milli_seconds),
State = #{owner => self(),
mod => Module,
socket => Socket,
- sockmod => SockMod,
socket_monitor => SocketMonitor,
stream_timeout => {timer:seconds(30), Time},
stream_direction => in,
@@ -247,7 +247,7 @@ init([Module, {SockMod, Socket}, Opts]) ->
TLSOpts = try Module:tls_options(State1)
catch _:undef -> []
end,
- case SockMod:starttls(Socket, TLSOpts) of
+ case xmpp_socket:starttls(Socket, TLSOpts) of
{ok, TLSSocket} ->
State2 = State1#{socket => TLSSocket},
{_, State3, Timeout} = noreply(State2),
@@ -333,8 +333,7 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) ->
send_pkt(State1, Err)
end);
handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) ->
- %% TODO: find and fix this in fast_xml
- error_logger:error_msg("unexpected event from receiver: ~p; "
+ error_logger:error_msg("unexpected event from XML driver: ~p; "
"xmlstreamstart was expected", [El]),
State1 = send_header(State),
noreply(
@@ -379,6 +378,21 @@ handle_info(timeout, #{mod := Mod} = State) ->
handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State));
+handle_info({tcp, _, Data}, #{socket := Socket} = State) ->
+ noreply(
+ case xmpp_socket:recv(Socket, Data) of
+ {ok, NewSocket} ->
+ State#{socket => NewSocket};
+ {error, Reason} when is_atom(Reason) ->
+ process_stream_end({socket, Reason}, State);
+ {error, Reason} ->
+ %% TODO: make fast_tls return atoms
+ process_stream_end({tls, Reason}, State)
+ end);
+handle_info({tcp_closed, _}, State) ->
+ handle_info({'$gen_event', closed}, State);
+handle_info({tcp_error, _, Reason}, State) ->
+ noreply(process_stream_end({socket, Reason}, State));
handle_info(Info, #{mod := Mod} = State) ->
noreply(try Mod:handle_info(Info, State)
catch _:undef -> State
@@ -698,14 +712,14 @@ process_compress(#compress{},
when Compressed or not Authenticated ->
send_pkt(State, #compress_failure{reason = 'setup-failed'});
process_compress(#compress{methods = HisMethods},
- #{socket := Socket, sockmod := SockMod, mod := Mod} = State) ->
+ #{socket := Socket, mod := Mod} = State) ->
MyMethods = try Mod:compress_methods(State)
catch _:undef -> []
end,
CommonMethods = lists_intersection(MyMethods, HisMethods),
case lists:member(<<"zlib">>, CommonMethods) of
true ->
- case SockMod:compress(Socket) of
+ case xmpp_socket:compress(Socket) of
{ok, ZlibSocket} ->
State1 = send_pkt(State, #compressed{}),
case is_disconnected(State1) of
@@ -730,13 +744,13 @@ process_compress(#compress{methods = HisMethods},
process_starttls(#{stream_encrypted := true} = State) ->
process_starttls_failure(already_encrypted, State);
process_starttls(#{socket := Socket,
- sockmod := SockMod, mod := Mod} = State) ->
+ mod := Mod} = State) ->
case is_starttls_available(State) of
true ->
TLSOpts = try Mod:tls_options(State)
catch _:undef -> []
end,
- case SockMod:starttls(Socket, TLSOpts) of
+ case xmpp_socket:starttls(Socket, TLSOpts) of
{ok, TLSSocket} ->
State1 = send_pkt(State, #starttls_proceed{}),
case is_disconnected(State1) of
@@ -814,12 +828,13 @@ process_sasl_result({error, Reason, User}, State) ->
-spec process_sasl_success([cyrsasl:sasl_property()], binary(), state()) -> state().
process_sasl_success(Props, ServerOut,
- #{socket := Socket, sockmod := SockMod,
+ #{socket := Socket,
mod := Mod, sasl_mech := Mech} = State) ->
User = identity(Props),
AuthModule = proplists:get_value(auth_module, Props),
- SockMod:reset_stream(Socket),
- State1 = send_pkt(State, #sasl_success{text = ServerOut}),
+ Socket1 = xmpp_socket:reset_stream(Socket),
+ State0 = State#{socket => Socket1},
+ State1 = send_pkt(State0, #sasl_success{text = ServerOut}),
case is_disconnected(State1) of
true -> State1;
false ->
@@ -1090,17 +1105,17 @@ send_trailer(State) ->
close_socket(State).
-spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}.
-socket_send(#{socket := Sock, sockmod := SockMod,
+socket_send(#{socket := Sock,
stream_state := StateName,
xmlns := NS,
stream_header_sent := true}, Pkt) ->
case Pkt of
trailer ->
- SockMod:send_trailer(Sock);
+ xmpp_socket:send_trailer(Sock);
#stream_start{} when StateName /= disconnected ->
- SockMod:send_header(Sock, xmpp:encode(Pkt));
+ xmpp_socket:send_header(Sock, xmpp:encode(Pkt));
_ when StateName /= disconnected ->
- SockMod:send_element(Sock, xmpp:encode(Pkt, NS));
+ xmpp_socket:send_element(Sock, xmpp:encode(Pkt, NS));
_ ->
{error, closed}
end;
@@ -1108,8 +1123,8 @@ socket_send(_, _) ->
{error, closed}.
-spec close_socket(state()) -> state().
-close_socket(#{sockmod := SockMod, socket := Socket} = State) ->
- SockMod:close(Socket),
+close_socket(#{socket := Socket} = State) ->
+ xmpp_socket:close(Socket),
State#{stream_timeout => infinity,
stream_state => disconnected}.
diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl
index 8f4fa5c8..b5851b0b 100644
--- a/src/xmpp_stream_out.erl
+++ b/src/xmpp_stream_out.erl
@@ -191,16 +191,17 @@ set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() ->
set_timeout(_, _) ->
erlang:error(badarg).
-get_transport(#{sockmod := SockMod, socket := Socket, owner := Owner})
+get_transport(#{socket := Socket, owner := Owner})
when Owner == self() ->
- SockMod:get_transport(Socket);
+ xmpp_socket:get_transport(Socket);
get_transport(_) ->
erlang:error(badarg).
--spec change_shaper(state(), shaper:shaper()) -> ok.
-change_shaper(#{sockmod := SockMod, socket := Socket, owner := Owner}, Shaper)
+-spec change_shaper(state(), shaper:shaper()) -> state().
+change_shaper(#{socket := Socket, owner := Owner} = State, Shaper)
when Owner == self() ->
- SockMod:change_shaper(Socket, Shaper);
+ Socket1 = xmpp_socket:change_shaper(Socket, Shaper),
+ State#{socket => Socket1};
change_shaper(_, _) ->
erlang:error(badarg).
@@ -233,11 +234,10 @@ format_error(Err) ->
%%% gen_server callbacks
%%%===================================================================
-spec init(list()) -> {ok, state(), timeout()} | {stop, term()} | ignore.
-init([Mod, SockMod, From, To, Opts]) ->
+init([Mod, _SockMod, From, To, Opts]) ->
Time = p1_time_compat:monotonic_time(milli_seconds),
State = #{owner => self(),
mod => Mod,
- sockmod => SockMod,
server => From,
user => <<"">>,
resource => <<"">>,
@@ -272,7 +272,6 @@ handle_call(Call, From, #{mod := Mod} = State) ->
-spec handle_cast(term(), state()) -> noreply().
handle_cast(connect, #{remote_server := RemoteServer,
- sockmod := SockMod,
stream_state := connecting} = State) ->
noreply(
case idna_to_ascii(RemoteServer) of
@@ -283,7 +282,7 @@ handle_cast(connect, #{remote_server := RemoteServer,
{ok, AddrPorts} ->
case connect(AddrPorts, State) of
{ok, Socket, {Addr, Port, Encrypted}} ->
- SocketMonitor = SockMod:monitor(Socket),
+ SocketMonitor = xmpp_socket:monitor(Socket),
State1 = State#{ip => {Addr, Port},
socket => Socket,
stream_encrypted => Encrypted,
@@ -388,6 +387,21 @@ handle_info(timeout, #{mod := Mod} = State) ->
handle_info({'DOWN', MRef, _Type, _Object, _Info},
#{socket_monitor := MRef} = State) ->
noreply(process_stream_end({socket, closed}, State));
+handle_info({tcp, _, Data}, #{socket := Socket} = State) ->
+ noreply(
+ case xmpp_socket:recv(Socket, Data) of
+ {ok, NewSocket} ->
+ State#{socket => NewSocket};
+ {error, Reason} when is_atom(Reason) ->
+ process_stream_end({socket, Reason}, State);
+ {error, Reason} ->
+ %% TODO: make fast_tls return atoms
+ process_stream_end({tls, Reason}, State)
+ end);
+handle_info({tcp_closed, _}, State) ->
+ handle_info({'$gen_event', closed}, State);
+handle_info({tcp_error, _, Reason}, State) ->
+ noreply(process_stream_end({socket, Reason}, State));
handle_info(Info, #{mod := Mod} = State) ->
noreply(try Mod:handle_info(Info, State)
catch _:undef -> State
@@ -638,13 +652,13 @@ process_cert_verification(State) ->
-spec process_sasl_success(state()) -> state().
process_sasl_success(#{mod := Mod,
- sockmod := SockMod,
socket := Socket} = State) ->
- SockMod:reset_stream(Socket),
- State1 = State#{stream_id => new_id(),
- stream_restarted => true,
- stream_state => wait_for_stream,
- stream_authenticated => true},
+ Socket1 = xmpp_socket:reset_stream(Socket),
+ State0 = State#{socket => Socket1},
+ State1 = State0#{stream_id => new_id(),
+ stream_restarted => true,
+ stream_state => wait_for_stream,
+ stream_authenticated => true},
State2 = send_header(State1),
case is_disconnected(State2) of
true -> State2;
@@ -745,15 +759,15 @@ send_error(State, Pkt, Err) ->
end.
-spec socket_send(state(), xmpp_element() | xmlel() | trailer) -> ok | {error, inet:posix()}.
-socket_send(#{sockmod := SockMod, socket := Socket, xmlns := NS,
+socket_send(#{socket := Socket, xmlns := NS,
stream_state := StateName}, Pkt) ->
case Pkt of
trailer ->
- SockMod:send_trailer(Socket);
+ xmpp_socket:send_trailer(Socket);
#stream_start{} when StateName /= disconnected ->
- SockMod:send_header(Socket, xmpp:encode(Pkt));
+ xmpp_socket:send_header(Socket, xmpp:encode(Pkt));
_ when StateName /= disconnected ->
- SockMod:send_element(Socket, xmpp:encode(Pkt, NS));
+ xmpp_socket:send_element(Socket, xmpp:encode(Pkt, NS));
_ ->
{error, closed}
end;
@@ -768,8 +782,8 @@ send_trailer(State) ->
-spec close_socket(state()) -> state().
close_socket(State) ->
case State of
- #{sockmod := SockMod, socket := Socket} ->
- SockMod:close(Socket);
+ #{socket := Socket} ->
+ xmpp_socket:close(Socket);
_ ->
ok
end,
@@ -777,8 +791,8 @@ close_socket(State) ->
stream_state => disconnected}.
-spec starttls(term(), state()) -> {ok, term()} | {error, tls_error_reason()}.
-starttls(Socket, #{sockmod := SockMod, mod := Mod,
- xmlns := NS, remote_server := RemoteServer} = State) ->
+starttls(Socket, #{mod := Mod, xmlns := NS,
+ remote_server := RemoteServer} = State) ->
TLSOpts = try Mod:tls_options(State)
catch _:undef -> []
end,
@@ -787,7 +801,7 @@ starttls(Socket, #{sockmod := SockMod, mod := Mod,
?NS_SERVER -> <<"xmpp-server">>;
?NS_CLIENT -> <<"xmpp-client">>
end,
- SockMod:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]).
+ xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]).
-spec select_lang(binary(), binary()) -> binary().
select_lang(Lang, <<"">>) -> Lang;
@@ -1020,9 +1034,9 @@ host_entry_to_addr_ports(#hostent{h_addr_list = AddrList}, Port, TLS) ->
-spec connect([ip_port()], state()) -> {ok, term(), ip_port()} |
{error, {socket, socket_error_reason()}} |
{error, {tls, tls_error_reason()}}.
-connect(AddrPorts, #{sockmod := SockMod} = State) ->
+connect(AddrPorts, State) ->
Timeout = get_connect_timeout(State),
- case connect(AddrPorts, SockMod, Timeout, {error, nxdomain}) of
+ case connect(AddrPorts, Timeout, {error, nxdomain}) of
{ok, Socket, {Addr, Port, TLS = true}} ->
case starttls(Socket, State) of
{ok, TLSSocket} -> {ok, TLSSocket, {Addr, Port, TLS}};
@@ -1034,24 +1048,24 @@ connect(AddrPorts, #{sockmod := SockMod} = State) ->
{error, {socket, Why}}
end.
--spec connect([ip_port()], module(), timeout(), network_error()) ->
+-spec connect([ip_port()], timeout(), network_error()) ->
{ok, term(), ip_port()} | network_error().
-connect([{Addr, Port, TLS}|AddrPorts], SockMod, Timeout, _) ->
+connect([{Addr, Port, TLS}|AddrPorts], Timeout, _) ->
Type = get_addr_type(Addr),
- try SockMod:connect(Addr, Port,
- [binary, {packet, 0},
- {send_timeout, ?TCP_SEND_TIMEOUT},
- {send_timeout_close, true},
- {active, false}, Type],
- Timeout) of
+ try xmpp_socket:connect(Addr, Port,
+ [binary, {packet, 0},
+ {send_timeout, ?TCP_SEND_TIMEOUT},
+ {send_timeout_close, true},
+ {active, false}, Type],
+ Timeout) of
{ok, Socket} ->
{ok, Socket, {Addr, Port, TLS}};
Err ->
- connect(AddrPorts, SockMod, Timeout, Err)
+ connect(AddrPorts, Timeout, Err)
catch _:badarg ->
- connect(AddrPorts, SockMod, Timeout, {error, einval})
+ connect(AddrPorts, Timeout, {error, einval})
end;
-connect([], _SockMod, _Timeout, Err) ->
+connect([], _Timeout, Err) ->
Err.
-spec get_addr_type(inet:ip_address()) -> inet:address_family().
diff --git a/src/xmpp_stream_pkix.erl b/src/xmpp_stream_pkix.erl
index 8361999f..9ca9fdff 100644
--- a/src/xmpp_stream_pkix.erl
+++ b/src/xmpp_stream_pkix.erl
@@ -40,10 +40,10 @@ authenticate(State) ->
-spec authenticate(xmpp_stream_in:state() | xmpp_stream_out:state(), binary())
-> {ok, binary()} | {error, atom(), binary()}.
-authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod,
+authenticate(#{xmlns := ?NS_SERVER,
socket := Socket} = State, Authzid) ->
Peer = maps:get(remote_server, State, Authzid),
- case verify_cert(SockMod, Socket) of
+ case verify_cert(Socket) of
{ok, Cert} ->
case ejabberd_idna:domain_utf8_to_ascii(Peer) of
false ->
@@ -61,7 +61,7 @@ authenticate(#{xmlns := ?NS_SERVER, sockmod := SockMod,
{error, Reason} ->
{error, Reason, Peer}
end;
-authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod,
+authenticate(#{xmlns := ?NS_CLIENT,
socket := Socket, lserver := LServer}, Authzid) ->
JID = try jid:decode(Authzid)
catch _:{bad_jid, <<>>} -> jid:make(LServer);
@@ -69,7 +69,7 @@ authenticate(#{xmlns := ?NS_CLIENT, sockmod := SockMod,
end,
case JID of
#jid{user = User} ->
- case verify_cert(SockMod, Socket) of
+ case verify_cert(Socket) of
{ok, Cert} ->
JIDs = get_xmpp_addrs(Cert),
get_username(JID, JIDs, LServer);
@@ -104,11 +104,11 @@ get_cert_domains(Cert) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
--spec verify_cert(module(), ejabberd_socket:socket()) -> {ok, cert()} | {error, atom()}.
-verify_cert(SockMod, Socket) ->
- case SockMod:get_peer_certificate(Socket, otp) of
+-spec verify_cert(xmpp_socket:socket()) -> {ok, cert()} | {error, atom()}.
+verify_cert(Socket) ->
+ case xmpp_socket:get_peer_certificate(Socket, otp) of
{ok, Cert} ->
- case SockMod:get_verify_result(Socket) of
+ case xmpp_socket:get_verify_result(Socket) of
0 ->
{ok, Cert};
VerifyRes ->