diff options
Diffstat (limited to 'src/ejabberd_socket.erl')
-rw-r--r-- | src/ejabberd_socket.erl | 353 |
1 files changed, 190 insertions, 163 deletions
diff --git a/src/ejabberd_socket.erl b/src/ejabberd_socket.erl index 1384205a5..37a38993a 100644 --- a/src/ejabberd_socket.erl +++ b/src/ejabberd_socket.erl @@ -25,92 +25,93 @@ %%%---------------------------------------------------------------------- -module(ejabberd_socket). + -author('alexey@process-one.net'). %% API --export([start/4, - connect/3, - connect/4, - starttls/2, - starttls/3, - compress/1, - compress/2, - reset_stream/1, - send/2, - send_xml/2, - change_shaper/2, - monitor/1, - get_sockmod/1, - get_peer_certificate/1, - get_verify_result/1, - close/1, - change_controller/2, - change_socket/2, - sockname/1, peername/1]). +-export([init/0, start/4, connect/3, connect/4, starttls/2, + starttls/3, compress/1, compress/2, reset_stream/1, + send/2, send_xml/2, change_shaper/2, monitor/1, + get_sockmod/1, get_peer_certificate/1, get_conn_type/1, + get_verify_result/1, close/1, change_controller/2, + change_socket/2, sockname/1, peername/1, is_remote_receiver/1]). -include("ejabberd.hrl"). +-include("jlib.hrl"). --record(socket_state, {sockmod, socket, receiver}). +-type sockmod() :: ejabberd_http_poll | ejabberd_bosh | + ejabberd_http_bind | ejabberd_http_bindjson | + ejabberd_http_ws | ejabberd_http_wsjson | + gen_tcp | tls | ejabberd_zlib. +-type receiver() :: pid () | atom(). +-type socket() :: pid() | inet:socket() | + tls:tls_socket() | + ejabberd_zlib:zlib_socket() | + ejabberd_bosh:bosh_socket() | + ejabberd_http_ws:ws_socket() | + ejabberd_http_poll:poll_socket(). + +-record(socket_state, {sockmod = gen_tcp :: sockmod(), + socket = self() :: socket(), + receiver = self() :: receiver()}). + +-type socket_state() :: #socket_state{}. + +-export_type([socket_state/0, sockmod/0]). + +-spec start(atom(), sockmod(), socket(), [{atom(), any()}]) -> any(). + +init() -> #socket_state{}. -%%==================================================================== -%% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: -%% Description: -%%-------------------------------------------------------------------- start(Module, SockMod, Socket, Opts) -> case Module:socket_type() of - xml_stream -> - MaxStanzaSize = - case lists:keysearch(max_stanza_size, 1, Opts) of - {value, {_, Size}} -> Size; - _ -> infinity - end, - {ReceiverMod, Receiver, RecRef} = - case catch SockMod:custom_receiver(Socket) of - {receiver, RecMod, RecPid} -> - {RecMod, RecPid, RecMod}; - _ -> - RecPid = ejabberd_receiver:start( - Socket, SockMod, none, MaxStanzaSize), - {ejabberd_receiver, RecPid, RecPid} + xml_stream -> + MaxStanzaSize = case lists:keysearch(max_stanza_size, 1, + Opts) + of + {value, {_, Size}} -> Size; + _ -> infinity + end, + {ReceiverMod, Receiver, RecRef} = case catch + SockMod:custom_receiver(Socket) + of + {receiver, RecMod, RecPid} -> + {RecMod, RecPid, RecMod}; + _ -> + RecPid = + ejabberd_receiver:start(Socket, + SockMod, + none, + MaxStanzaSize), + {ejabberd_receiver, RecPid, + RecPid} + end, + SocketData = #socket_state{sockmod = SockMod, + socket = Socket, receiver = RecRef}, + case Module:start({?MODULE, SocketData}, Opts) of + {ok, Pid} -> + case SockMod:controlling_process(Socket, Receiver) of + ok -> ok; + {error, _Reason} -> SockMod:close(Socket) end, - SocketData = #socket_state{sockmod = SockMod, - socket = Socket, - receiver = RecRef}, - case Module:start({?MODULE, SocketData}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Receiver) of - ok -> - ok; - {error, _Reason} -> - SockMod:close(Socket) - end, - ReceiverMod:become_controller(Receiver, Pid); - {error, _Reason} -> - SockMod:close(Socket), - case ReceiverMod of - ejabberd_receiver -> - ReceiverMod:close(Receiver); - _ -> - ok - end - end; - independent -> - ok; - raw -> - case Module:start({SockMod, Socket}, Opts) of - {ok, Pid} -> - case SockMod:controlling_process(Socket, Pid) of - ok -> - ok; - {error, _Reason} -> - SockMod:close(Socket) - end; - {error, _Reason} -> - SockMod:close(Socket) - end + ReceiverMod:become_controller(Receiver, Pid); + {error, _Reason} -> + SockMod:close(Socket), + case ReceiverMod of + ejabberd_receiver -> ReceiverMod:close(Receiver); + _ -> ok + end + end; + independent -> ok; + raw -> + case Module:start({SockMod, Socket}, Opts) of + {ok, Pid} -> + case SockMod:controlling_process(Socket, Pid) of + ok -> ok; + {error, _Reason} -> SockMod:close(Socket) + end; + {error, _Reason} -> SockMod:close(Socket) + end end. connect(Addr, Port, Opts) -> @@ -118,51 +119,55 @@ connect(Addr, Port, Opts) -> connect(Addr, Port, Opts, Timeout) -> case gen_tcp:connect(Addr, Port, Opts, Timeout) of - {ok, Socket} -> - Receiver = ejabberd_receiver:start(Socket, gen_tcp, none), - SocketData = #socket_state{sockmod = gen_tcp, - socket = Socket, - receiver = Receiver}, - Pid = self(), - case gen_tcp:controlling_process(Socket, Receiver) of - ok -> - ejabberd_receiver:become_controller(Receiver, Pid), - {ok, SocketData}; - {error, _Reason} = Error -> - gen_tcp:close(Socket), - Error - end; - {error, _Reason} = Error -> - Error + {ok, Socket} -> + Receiver = ejabberd_receiver:start(Socket, gen_tcp, + none), + SocketData = #socket_state{sockmod = gen_tcp, + socket = Socket, receiver = Receiver}, + Pid = self(), + case gen_tcp:controlling_process(Socket, Receiver) of + ok -> + ejabberd_receiver:become_controller(Receiver, Pid), + {ok, SocketData}; + {error, _Reason} = Error -> gen_tcp:close(Socket), Error + end; + {error, _Reason} = Error -> Error end. starttls(SocketData, TLSOpts) -> starttls(SocketData, TLSOpts, undefined). starttls(SocketData, TLSOpts, Data) -> - {ok, TLSSocket} = ejabberd_receiver:starttls( - SocketData#socket_state.receiver, TLSOpts, Data), - SocketData#socket_state{socket = TLSSocket, sockmod = tls}. + {ok, TLSSocket} = + ejabberd_receiver:starttls(SocketData#socket_state.receiver, + TLSOpts, Data), + SocketData#socket_state{socket = TLSSocket, + sockmod = tls}. -compress(SocketData) -> - compress(SocketData, undefined). +compress(SocketData) -> compress(SocketData, undefined). compress(SocketData, Data) -> - {ok, ZlibSocket} = ejabberd_receiver:compress( - SocketData#socket_state.receiver, Data), - SocketData#socket_state{socket = ZlibSocket, sockmod = ejabberd_zlib}. + {ok, ZlibSocket} = + ejabberd_receiver:compress(SocketData#socket_state.receiver, + Data), + SocketData#socket_state{socket = ZlibSocket, + sockmod = ejabberd_zlib}. -reset_stream(SocketData) when is_pid(SocketData#socket_state.receiver) -> +reset_stream(SocketData) + when is_pid(SocketData#socket_state.receiver) -> ejabberd_receiver:reset_stream(SocketData#socket_state.receiver); -reset_stream(SocketData) when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):reset_stream( - SocketData#socket_state.socket). +reset_stream(SocketData) + when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):reset_stream(SocketData#socket_state.socket). -change_controller(#socket_state{receiver = Recv}, Pid) when is_pid(Recv) -> +change_controller(#socket_state{receiver = Recv}, Pid) + when is_pid(Recv) -> ejabberd_receiver:setopts(Recv, [{active, false}]), sync_events(Pid), ejabberd_receiver:change_controller(Recv, Pid); -change_controller(#socket_state{socket = Socket, receiver = Mod}, Pid) -> +change_controller(#socket_state{socket = Socket, + receiver = Mod}, + Pid) -> Mod:setopts(Socket, [{active, false}]), sync_events(Pid), Mod:change_controller(Socket, Pid). @@ -170,45 +175,54 @@ change_controller(#socket_state{socket = Socket, receiver = Mod}, Pid) -> change_socket(SocketData, Socket) -> SocketData#socket_state{socket = Socket}. -%% sockmod=gen_tcp|tls|ejabberd_zlib +-spec send(socket_state(), iodata()) -> ok. + send(SocketData, Data) -> - Res = if node(SocketData#socket_state.receiver) == node() -> - catch (SocketData#socket_state.sockmod):send( - SocketData#socket_state.socket, Data); + Res = if node(SocketData#socket_state.receiver) == + node() -> + catch + (SocketData#socket_state.sockmod):send(SocketData#socket_state.socket, + Data); true -> - catch ejabberd_receiver:send( - SocketData#socket_state.receiver, Data) - end, + catch + ejabberd_receiver:send(SocketData#socket_state.receiver, + Data) + end, case Res of - ok -> ok; - {error, timeout} -> - ?INFO_MSG("Timeout on ~p:send",[SocketData#socket_state.sockmod]), - exit(normal); - Error -> - ?DEBUG("Error in ~p:send: ~p",[SocketData#socket_state.sockmod, Error]), - exit(normal) + ok -> ok; + {error, timeout} -> + ?INFO_MSG("Timeout on ~p:send", + [SocketData#socket_state.sockmod]), + exit(normal); + Error -> + ?DEBUG("Error in ~p:send: ~p", + [SocketData#socket_state.sockmod, Error]), + exit(normal) end. -%% Can only be called when in c2s StateData#state.xml_socket is true -%% This function is used for HTTP bind -%% sockmod=ejabberd_http_poll|ejabberd_http_bind or any custom module +-spec send_xml(socket_state(), xmlel()) -> any(). + send_xml(SocketData, Data) -> - catch (SocketData#socket_state.sockmod):send_xml( - SocketData#socket_state.socket, Data). + catch + (SocketData#socket_state.sockmod):send_xml(SocketData#socket_state.socket, + Data). change_shaper(SocketData, Shaper) - when is_pid(SocketData#socket_state.receiver) -> - ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, Shaper); + when is_pid(SocketData#socket_state.receiver) -> + ejabberd_receiver:change_shaper(SocketData#socket_state.receiver, + Shaper); change_shaper(SocketData, Shaper) - when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):change_shaper( - SocketData#socket_state.socket, Shaper). + when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):change_shaper(SocketData#socket_state.socket, + Shaper). -monitor(SocketData) when is_pid(SocketData#socket_state.receiver) -> - erlang:monitor(process, SocketData#socket_state.receiver); -monitor(SocketData) when is_atom(SocketData#socket_state.receiver) -> - (SocketData#socket_state.receiver):monitor( - SocketData#socket_state.socket). +monitor(SocketData) + when is_pid(SocketData#socket_state.receiver) -> + erlang:monitor(process, + SocketData#socket_state.receiver); +monitor(SocketData) + when is_atom(SocketData#socket_state.receiver) -> + (SocketData#socket_state.receiver):monitor(SocketData#socket_state.socket). get_sockmod(SocketData) -> SocketData#socket_state.sockmod. @@ -222,40 +236,53 @@ get_verify_result(SocketData) -> close(SocketData) -> ejabberd_receiver:close(SocketData#socket_state.receiver). -sockname(#socket_state{sockmod = SockMod, socket = Socket}) -> +sockname(#socket_state{sockmod = SockMod, + socket = Socket}) -> case SockMod of - gen_tcp -> - inet:sockname(Socket); - _ -> - SockMod:sockname(Socket) + gen_tcp -> inet:sockname(Socket); + _ -> SockMod:sockname(Socket) end. -peername(#socket_state{sockmod = SockMod, socket = Socket}) -> +peername(#socket_state{sockmod = SockMod, + socket = Socket}) -> case SockMod of - gen_tcp -> - inet:peername(Socket); - _ -> - SockMod:peername(Socket) + gen_tcp -> inet:peername(Socket); + _ -> SockMod:peername(Socket) end. -%%==================================================================== -%% Internal functions -%%==================================================================== -%% dirty hack to relay queued messages from -%% old owner to new owner. The idea is based -%% on code of gen_tcp:controlling_process/2. +get_conn_type(#socket_state{sockmod = SockMod, socket = Socket}) -> + case SockMod of + gen_tcp -> c2s; + tls -> c2s_tls; + ejabberd_zlib -> + case ejabberd_zlib:get_sockmod(Socket) of + gen_tcp -> c2s_compressed; + tls -> c2s_compressed_tls + end; + ejabberd_http_poll -> http_poll; + ejabberd_http_ws -> http_ws; + ejabberd_http_bind -> http_bind; + ejabberd_bosh -> http_bind; + ejabberd_http_bindjson -> http_bindjson; + ejabberd_http_wsjson -> http_wsjson + end. + +-spec is_remote_receiver(socket_state()) -> boolean(). + +is_remote_receiver(#socket_state{receiver = Pid}) when is_pid(Pid) -> + node(Pid) /= node(); +is_remote_receiver(_) -> + false. + sync_events(C2SPid) -> receive - {'$gen_event', El} = Event when element(1, El) == xmlelement; - element(1, El) == xmlstreamstart; - element(1, El) == xmlstreamelement; - element(1, El) == xmlstreamend; - element(1, El) == xmlstreamerror -> - C2SPid ! Event, - sync_events(C2SPid); - closed -> - C2SPid ! closed, - sync_events(C2SPid) - after 0 -> - ok + {'$gen_event', El} = Event + when element(1, El) == xmlel; + element(1, El) == xmlstreamstart; + element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamend; + element(1, El) == xmlstreamerror -> + C2SPid ! Event, sync_events(C2SPid); + closed -> C2SPid ! closed, sync_events(C2SPid) + after 0 -> ok end. |