diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2016-12-11 18:24:51 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2016-12-11 18:24:51 +0300 |
commit | 7f653cfe762ecf33ae9522b8df25f2902d5546df (patch) | |
tree | 53698cee80c834570768b119edfb4d7a0bf71d7e /src/ejabberd_service.erl | |
parent | Initial version of new XMPP stream behaviour (for review) (diff) |
Rewrite ejabberd_service to use new XMPP stream API
Diffstat (limited to 'src/ejabberd_service.erl')
-rw-r--r-- | src/ejabberd_service.erl | 398 |
1 files changed, 123 insertions, 275 deletions
diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index 35cfe15af..c48cd536c 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -1,8 +1,5 @@ -%%%---------------------------------------------------------------------- -%%% File : ejabberd_service.erl -%%% Author : Alexey Shchepin <alexey@process-one.net> -%%% Purpose : External component management (XEP-0114) -%%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@process-one.net> +%%%------------------------------------------------------------------- +%%% Created : 11 Dec 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net> %%% %%% %%% ejabberd, Copyright (C) 2002-2016 ProcessOne @@ -21,77 +18,60 @@ %%% with this program; if not, write to the Free Software Foundation, Inc., %%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. %%% -%%%---------------------------------------------------------------------- - +%%%------------------------------------------------------------------- -module(ejabberd_service). - +-behaviour(xmpp_stream_in). -behaviour(ejabberd_config). --author('alexey@process-one.net'). - -protocol({xep, 114, '1.6'}). --define(GEN_FSM, p1_fsm). - --behaviour(?GEN_FSM). - -%% External exports --export([start/2, start_link/2, send_text/2, - send_element/2, socket_type/0, transform_listen_option/2]). - --export([init/1, wait_for_stream/2, - wait_for_handshake/2, stream_established/2, - handle_event/3, handle_sync_event/4, code_change/4, - handle_info/3, terminate/3, print_state/1, opt_type/1]). +%% ejabberd_socket callbacks +-export([start/2, socket_type/0]). +%% ejabberd_config callbacks +-export([opt_type/1, transform_listen_option/2]). +%% xmpp_stream_in callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([handshake/2, handle_stream_start/1, handle_authenticated_packet/2]). +%% API +-export([send/2]). -include("ejabberd.hrl"). --include("logger.hrl"). -include("xmpp.hrl"). +-include("logger.hrl"). --record(state, - {socket :: ejabberd_socket:socket_state(), - sockmod = ejabberd_socket :: ejabberd_socket | ejabberd_frontend_socket, - streamid = <<"">> :: binary(), - host_opts = dict:new() :: ?TDICT, - host = <<"">> :: binary(), - access :: atom(), - check_from = true :: boolean()}). - --type state_name() :: wait_for_stream | wait_for_handshake | stream_established. --type state() :: #state{}. --type fsm_next() :: {next_state, state_name(), state()}. --type fsm_stop() :: {stop, normal, state()}. --type fsm_transition() :: fsm_stop() | fsm_next(). - -%-define(DBGFSM, true). +%%-define(DBGFSM, true). -ifdef(DBGFSM). -define(FSMOPTS, [{debug, [trace]}]). -else. -define(FSMOPTS, []). -endif. -%%%---------------------------------------------------------------------- +-type state() :: map(). +-type next_state() :: {noreply, state()} | {stop, term(), state()}. +-export_type([state/0, next_state/0]). + +%%%=================================================================== %%% API -%%%---------------------------------------------------------------------- +%%%=================================================================== start(SockData, Opts) -> - supervisor:start_child(ejabberd_service_sup, - [SockData, Opts]). + xmpp_stream_in:start(?MODULE, [SockData, Opts], + fsm_limit_opts(Opts) ++ ?FSMOPTS). -start_link(SockData, Opts) -> - (?GEN_FSM):start_link(ejabberd_service, - [SockData, Opts], fsm_limit_opts(Opts) ++ (?FSMOPTS)). +socket_type() -> + xml_stream. -socket_type() -> xml_stream. +-spec send(state(), xmpp_element()) -> next_state(). +send(State, Pkt) -> + xmpp_stream_in:send(State, Pkt). -%%%---------------------------------------------------------------------- -%%% Callback functions from gen_fsm -%%%---------------------------------------------------------------------- -init([{SockMod, Socket}, Opts]) -> +%%%=================================================================== +%%% xmpp_stream_in callbacks +%%%=================================================================== +init([#{socket := Socket} = State, Opts]) -> ?INFO_MSG("(~w) External service connected", [Socket]), - Access = case lists:keysearch(access, 1, Opts) of - {value, {_, A}} -> A; - _ -> all - end, + Access = gen_mod:get_opt(access, Opts, fun acl:access_rules_validator/1, all), + Shaper = gen_mod:get_opt(shaper_rule, Opts, fun acl:shaper_rules_validator/1, none), HostOpts = case lists:keyfind(hosts, 1, Opts) of {hosts, HOpts} -> lists:foldl( @@ -107,252 +87,120 @@ init([{SockMod, Socket}, Opts]) -> p1_sha:sha(randoms:bytes(20))), dict:from_list([{global, Pass}]) end, - Shaper = case lists:keysearch(shaper_rule, 1, Opts) of - {value, {_, S}} -> S; - _ -> none - end, - CheckFrom = case lists:keysearch(service_check_from, 1, - Opts) - of - {value, {_, CF}} -> CF; - _ -> true - end, - SockMod:change_shaper(Socket, Shaper), - {ok, wait_for_stream, - #state{socket = Socket, sockmod = SockMod, - streamid = new_id(), host_opts = HostOpts, - access = Access, check_from = CheckFrom}}. - -wait_for_stream({xmlstreamstart, Name, Attrs}, StateData) -> - try xmpp:decode(#xmlel{name = Name, attrs = Attrs}) of - #stream_start{xmlns = NS_COMPONENT, stream_xmlns = NS_STREAM} - when NS_COMPONENT /= ?NS_COMPONENT; NS_STREAM /= ?NS_STREAM -> - send_header(StateData, ?MYNAME), - send_element(StateData, xmpp:serr_invalid_namespace()), - {stop, normal, StateData}; - #stream_start{to = To} when is_record(To, jid) -> - Host = To#jid.lserver, - send_header(StateData, Host), - HostOpts = case dict:is_key(Host, StateData#state.host_opts) of - true -> - StateData#state.host_opts; - false -> - case dict:find(global, StateData#state.host_opts) of - {ok, GlobalPass} -> - dict:from_list([{Host, GlobalPass}]); - error -> - StateData#state.host_opts - end - end, - {next_state, wait_for_handshake, - StateData#state{host = Host, host_opts = HostOpts}}; - #stream_start{} -> - send_header(StateData, ?MYNAME), - send_element(StateData, xmpp:serr_improper_addressing()), - {stop, normal, StateData}; - _ -> - send_header(StateData, ?MYNAME), - send_element(StateData, xmpp:serr_invalid_xml()), - {stop, normal, StateData} - catch _:{xmpp_codec, Why} -> - Txt = xmpp:format_error(Why), - send_header(StateData, ?MYNAME), - send_element(StateData, xmpp:serr_invalid_xml(Txt, ?MYLANG)), - {stop, normal, StateData} - end; -wait_for_stream({xmlstreamerror, _}, StateData) -> - send_header(StateData, ?MYNAME), - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_stream(closed, StateData) -> - {stop, normal, StateData}. - -wait_for_handshake({xmlstreamelement, El}, StateData) -> - decode_element(El, wait_for_handshake, StateData); -wait_for_handshake(#handshake{data = Digest}, StateData) -> - case dict:find(StateData#state.host, StateData#state.host_opts) of + CheckFrom = gen_mod:get_opt(check_from, Opts, + fun(Flag) when is_boolean(Flag) -> Flag end), + xmpp_stream_in:change_shaper(State, Shaper), + State1 = State#{access => Access, + xmlns => ?NS_COMPONENT, + lang => ?MYLANG, + server => ?MYNAME, + host_opts => HostOpts, + check_from => CheckFrom}, + ejabberd_hooks:run_fold(component_init, {ok, State1}, []). + +handle_stream_start(#{remote_server := RemoteServer, + host_opts := HostOpts} = State) -> + NewHostOpts = case dict:is_key(RemoteServer, HostOpts) of + true -> + HostOpts; + false -> + case dict:find(global, HostOpts) of + {ok, GlobalPass} -> + dict:from_list([{RemoteServer, GlobalPass}]); + error -> + HostOpts + end + end, + {noreply, State#{host_opts => NewHostOpts}}. + +handshake(Digest, #{remote_server := RemoteServer, + stream_id := StreamID, + host_opts := HostOpts} = State) -> + case dict:find(RemoteServer, HostOpts) of {ok, Password} -> - case p1_sha:sha(<<(StateData#state.streamid)/binary, - Password/binary>>) of + case p1_sha:sha(<<StreamID/binary, Password/binary>>) of Digest -> - send_element(StateData, #handshake{}), lists:foreach( fun (H) -> ejabberd_router:register_route(H, ?MYNAME), - ?INFO_MSG("Route registered for service ~p~n", - [H]), + ?INFO_MSG("Route registered for service ~p~n", [H]), ejabberd_hooks:run(component_connected, [H]) - end, dict:fetch_keys(StateData#state.host_opts)), - {next_state, stream_established, StateData}; - _ -> - send_element(StateData, xmpp:serr_not_authorized()), - {stop, normal, StateData} - end; - _ -> - send_element(StateData, xmpp:serr_not_authorized()), - {stop, normal, StateData} - end; -wait_for_handshake({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -wait_for_handshake({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -wait_for_handshake(closed, StateData) -> - {stop, normal, StateData}; -wait_for_handshake(_Pkt, StateData) -> - {next_state, wait_for_handshake, StateData}. - -stream_established({xmlstreamelement, El}, StateData) -> - decode_element(El, stream_established, StateData); -stream_established(El, StateData) when ?is_stanza(El) -> - From = xmpp:get_from(El), - To = xmpp:get_to(El), - Lang = xmpp:get_lang(El), - if From == undefined orelse To == undefined -> - Txt = <<"Missing 'from' or 'to' attribute">>, - send_error(StateData, El, xmpp:err_jid_malformed(Txt, Lang)); - true -> - case check_from(From, StateData) of - true -> - ejabberd_router:route(From, To, El); - false -> - Txt = <<"Improper domain part of 'from' attribute">>, - send_error(StateData, El, xmpp:err_not_allowed(Txt, Lang)) - end - end, - {next_state, stream_established, StateData}; -stream_established({xmlstreamend, _Name}, StateData) -> - {stop, normal, StateData}; -stream_established({xmlstreamerror, _}, StateData) -> - send_element(StateData, xmpp:serr_not_well_formed()), - {stop, normal, StateData}; -stream_established(closed, StateData) -> - {stop, normal, StateData}; -stream_established(_Event, StateData) -> - {next_state, stream_established, StateData}. + end, dict:fetch_keys(HostOpts)), + {ok, State}; + _ -> + ?ERROR_MSG("Failed authentication for service ~s", [RemoteServer]), + {error, xmpp:serr_not_authorized(), State} + end; + _ -> + ?ERROR_MSG("Failed authentication for service ~s", [RemoteServer]), + {error, xmpp:serr_not_authorized(), State} + end. -handle_event(_Event, StateName, StateData) -> - {next_state, StateName, StateData}. +handle_authenticated_packet(Pkt, #{lang := Lang} = State) -> + From = xmpp:get_from(Pkt), + case check_from(From, State) of + true -> + To = xmpp:get_to(Pkt), + ejabberd_router:route(From, To, Pkt), + {noreply, State}; + false -> + Txt = <<"Improper domain part of 'from' attribute">>, + Err = xmpp:serr_invalid_from(Txt, Lang), + xmpp_stream_in:send(State, Err) + end. -handle_sync_event(_Event, _From, StateName, - StateData) -> - Reply = ok, {reply, Reply, StateName, StateData}. +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. -code_change(_OldVsn, StateName, StateData, _Extra) -> - {ok, StateName, StateData}. +handle_cast(_Msg, State) -> + {noreply, State}. -handle_info({send_text, Text}, StateName, StateData) -> - send_text(StateData, Text), - {next_state, StateName, StateData}; -handle_info({send_element, El}, StateName, StateData) -> - send_element(StateData, El), - {next_state, StateName, StateData}; -handle_info({route, From, To, Packet}, StateName, - StateData) -> - case acl:match_rule(global, StateData#state.access, From) of - allow -> +handle_info({route, From, To, Packet}, #{access := Access} = State) -> + case acl:match_rule(global, Access, From) of + allow -> Pkt = xmpp:set_from_to(Packet, From, To), - send_element(StateData, Pkt); + xmpp_stream_in:send(State, Pkt); deny -> Lang = xmpp:get_lang(Packet), Err = xmpp:err_not_allowed(<<"Denied by ACL">>, Lang), - ejabberd_router:route_error(To, From, Packet, Err) - end, - {next_state, StateName, StateData}; -handle_info(Info, StateName, StateData) -> + ejabberd_router:route_error(To, From, Packet, Err), + {noreply, State} + end; +handle_info(Info, State) -> ?ERROR_MSG("Unexpected info: ~p", [Info]), - {next_state, StateName, StateData}. - -terminate(Reason, StateName, StateData) -> - ?INFO_MSG("terminated: ~p", [Reason]), - case StateName of - stream_established -> - lists:foreach(fun (H) -> - ejabberd_router:unregister_route(H), - ejabberd_hooks:run(component_disconnected, - [H, Reason]) - end, - dict:fetch_keys(StateData#state.host_opts)); - _ -> ok - end, - catch send_trailer(StateData), - (StateData#state.sockmod):close(StateData#state.socket), - ok. - -%%---------------------------------------------------------------------- -%% Func: print_state/1 -%% Purpose: Prepare the state to be printed on error log -%% Returns: State to print -%%---------------------------------------------------------------------- -print_state(State) -> State. - -%%%---------------------------------------------------------------------- -%%% Internal functions -%%%---------------------------------------------------------------------- - --spec send_text(state(), iodata()) -> ok. -send_text(StateData, Text) -> - (StateData#state.sockmod):send(StateData#state.socket, - Text). - --spec send_element(state(), xmpp_element()) -> ok. -send_element(StateData, El) -> - El1 = xmpp:encode(El, ?NS_COMPONENT), - send_text(StateData, fxml:element_to_binary(El1)). - --spec send_error(state(), xmlel() | stanza(), stanza_error()) -> ok. -send_error(StateData, Stanza, Error) -> - Type = xmpp:get_type(Stanza), - if Type == error; Type == result; - Type == <<"error">>; Type == <<"result">> -> - ok; - true -> - send_element(StateData, xmpp:make_error(Stanza, Error)) + {noreply, State}. + +terminate(Reason, #{stream_state := StreamState, host_opts := HostOpts}) -> + ?INFO_MSG("External service disconnected: ~p", [Reason]), + case StreamState of + session_established -> + lists:foreach( + fun(H) -> + ejabberd_router:unregister_route(H), + ejabberd_hooks:run(component_disconnected, [H, Reason]) + end, dict:fetch_keys(HostOpts)); + _ -> + ok end. --spec send_header(state(), binary()) -> ok. -send_header(StateData, Host) -> - Header = xmpp:encode( - #stream_start{xmlns = ?NS_COMPONENT, - stream_xmlns = ?NS_STREAM, - from = jid:make(Host), - id = StateData#state.streamid}), - send_text(StateData, fxml:element_to_header(Header)). - --spec send_trailer(state()) -> ok. -send_trailer(StateData) -> - send_text(StateData, <<"</stream:stream>">>). - --spec decode_element(xmlel(), state_name(), state()) -> fsm_transition(). -decode_element(#xmlel{} = El, StateName, StateData) -> - try xmpp:decode(El, ?NS_COMPONENT, [ignore_els]) of - Pkt -> ?MODULE:StateName(Pkt, StateData) - catch error:{xmpp_codec, Why} -> - case xmpp:is_stanza(El) of - true -> - Lang = xmpp:get_lang(El), - Txt = xmpp:format_error(Why), - send_error(StateData, El, xmpp:err_bad_request(Txt, Lang)); - false -> - ok - end, - {next_state, StateName, StateData} - end. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. +%%%=================================================================== +%%% Internal functions +%%%=================================================================== -spec check_from(jid(), state()) -> boolean(). -check_from(_From, #state{check_from = false}) -> +check_from(_From, #{check_from := false}) -> %% If the admin does not want to check the from field %% when accept packets from any address. %% In this case, the component can send packet of %% behalf of the server users. true; -check_from(From, StateData) -> +check_from(From, #{host_opts := HostOpts}) -> %% The default is the standard behaviour in XEP-0114 Server = From#jid.lserver, - dict:is_key(Server, StateData#state.host_opts). - --spec new_id() -> binary(). -new_id() -> randoms:get_string(). + dict:is_key(Server, HostOpts). transform_listen_option({hosts, Hosts, O}, Opts) -> case lists:keyfind(hosts, 1, Opts) of |