diff options
Diffstat (limited to 'src/mod_multicast.erl')
-rw-r--r-- | src/mod_multicast.erl | 267 |
1 files changed, 166 insertions, 101 deletions
diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl index 8a1960088..83520c0be 100644 --- a/src/mod_multicast.erl +++ b/src/mod_multicast.erl @@ -3,12 +3,32 @@ %%% Author : Badlop <badlop@process-one.net> %%% Purpose : Extended Stanza Addressing (XEP-0033) support %%% Created : 29 May 2007 by Badlop <badlop@process-one.net> +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% %%%---------------------------------------------------------------------- -module(mod_multicast). -author('badlop@process-one.net'). +-protocol({xep, 33, '1.1'}). + -behaviour(gen_server). -behaviour(gen_mod). @@ -20,7 +40,7 @@ -export([init/1, handle_info/2, handle_call/3, handle_cast/2, terminate/2, code_change/3]). --export([purge_loop/1]). +-export([purge_loop/1, mod_opt_type/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -131,7 +151,7 @@ init([LServerS, Opts]) -> try_start_loop(), create_pool(), ejabberd_router_multicast:register_route(LServerS), - ejabberd_router:register_route(LServiceS), + ejabberd_router:register_route(LServiceS, LServerS), {ok, #state{lservice = LServiceS, lserver = LServerS, access = Access, service_limits = SLimits}}. @@ -151,24 +171,11 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info({route, From, To, #xmlel{name = <<"iq">>, attrs = Attrs} = Packet}, State) -> - IQ = jlib:iq_query_info(Packet), - case catch process_iq(From, IQ, State) of - Result when is_record(Result, iq) -> - ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); - {'EXIT', Reason} -> - ?ERROR_MSG("Error when processing IQ stanza: ~p", - [Reason]), - Err = jlib:make_error_reply(Packet, - ?ERR_INTERNAL_SERVER_ERROR), - ejabberd_router:route(To, From, Err); - reply -> - LServiceS = jts(To), - case xml:get_attr_s(<<"type">>, Attrs) of - <<"result">> -> - process_iqreply_result(From, LServiceS, Packet, State); - <<"error">> -> - process_iqreply_error(From, LServiceS, Packet) - end + case catch handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) of + {'EXIT', Reason} -> + ?ERROR_MSG("Error when processing IQ stanza: ~p", + [Reason]); + _ -> ok end, {noreply, State}; %% XEP33 allows only 'message' and 'presence' stanza type @@ -186,11 +193,16 @@ handle_info({route, From, To, handle_info({route_trusted, From, Destinations, Packet}, #state{lservice = LServiceS, lserver = LServerS} = State) -> - route_trusted(LServiceS, LServerS, From, Destinations, - Packet), + case catch route_trusted(LServiceS, LServerS, From, Destinations, + Packet) of + {'EXIT', Reason} -> + ?ERROR_MSG("Error in route_trusted: ~p", [Reason]); + _ -> ok + end, {noreply, State}; handle_info({get_host, Pid}, State) -> - Pid ! {my_host, State#state.lservice}, {noreply, State}; + Pid ! {my_host, State#state.lservice}, + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> @@ -208,6 +220,28 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%% IQ Request Processing %%%------------------------ +handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) -> + IQ = jlib:iq_query_info(Packet), + case catch process_iq(From, IQ, State) of + Result when is_record(Result, iq) -> + ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); + {'EXIT', Reason} -> + ?ERROR_MSG("Error when processing IQ stanza: ~p", + [Reason]), + Err = jlib:make_error_reply(Packet, + ?ERR_INTERNAL_SERVER_ERROR), + ejabberd_router:route(To, From, Err); + reply -> + LServiceS = jts(To), + case fxml:get_attr_s(<<"type">>, Attrs) of + <<"result">> -> + process_iqreply_result(From, LServiceS, Packet, State); + <<"error">> -> + process_iqreply_error(From, LServiceS, Packet) + end; + ok -> ok + end. + process_iq(From, #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} = IQ, @@ -270,7 +304,7 @@ iq_vcard(Lang) -> [{xmlcdata, <<(translate:translate(Lang, <<"ejabberd Multicast service">>))/binary, - "\nCopyright (c) 2002-2015 ProcessOne">>}]}]. + "\nCopyright (c) 2002-2016 ProcessOne">>}]}]. %%%------------------------- %%% Route @@ -361,7 +395,7 @@ act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, perform(From, Packet, AAttrs, _, {route_single, Group}) -> [route_packet(From, ToUser, Packet, AAttrs, - Group#group.addresses) + Group#group.others, Group#group.addresses) || ToUser <- Group#group.dests]; perform(From, Packet, AAttrs, _, {{route_multicast, JID, RLimits}, Group}) -> @@ -375,6 +409,9 @@ perform(From, Packet, AAttrs, LServiceS, group = Group, renewal = true, sender = From, packet = Packet, aattrs = AAttrs, addresses = Group#group.addresses}); +perform(_From, _Packet, _AAttrs, LServiceS, + {{ask, LServiceS, _}, _Group}) -> + ok; perform(From, Packet, AAttrs, LServiceS, {{ask, Server, not_renewal}, Group}) -> send_query_info(Server, LServiceS), @@ -399,17 +436,17 @@ check_access(LServerS, Access, From) -> %%%------------------------- strip_addresses_element(Packet) -> - case xml:get_subtag(Packet, <<"addresses">>) of + case fxml:get_subtag(Packet, <<"addresses">>) of #xmlel{name = <<"addresses">>, attrs = AAttrs, children = Addresses} -> - case xml:get_attr_s(<<"xmlns">>, AAttrs) of + case fxml:get_attr_s(<<"xmlns">>, AAttrs) of ?NS_ADDRESS -> #xmlel{name = Name, attrs = Attrs, children = Els} = Packet, Els_stripped = lists:keydelete(<<"addresses">>, 2, Els), Packet_stripped = #xmlel{name = Name, attrs = Attrs, children = Els_stripped}, - {ok, Packet_stripped, AAttrs, Addresses}; + {ok, Packet_stripped, AAttrs, fxml:remove_cdata(Addresses)}; _ -> throw(ewxmlns) end; _ -> throw(eadsele) @@ -423,10 +460,10 @@ split_addresses_todeliver(Addresses) -> lists:partition(fun (XML) -> case XML of #xmlel{name = <<"address">>, attrs = Attrs} -> - case xml:get_attr_s(<<"delivered">>, Attrs) of + case fxml:get_attr_s(<<"delivered">>, Attrs) of <<"true">> -> false; _ -> - Type = xml:get_attr_s(<<"type">>, + Type = fxml:get_attr_s(<<"type">>, Attrs), case Type of <<"to">> -> true; @@ -462,10 +499,10 @@ check_limit_dests(SLimits, FromJID, Packet, convert_dest_record(XMLs) -> lists:map(fun (XML) -> - case xml:get_tag_attr_s(<<"jid">>, XML) of + case fxml:get_tag_attr_s(<<"jid">>, XML) of <<"">> -> #dest{jid_string = none, full_xml = XML}; JIDS -> - Type = xml:get_tag_attr_s(<<"type">>, XML), + Type = fxml:get_tag_attr_s(<<"type">>, XML), JIDJ = stj(JIDS), #dest{jid_string = JIDS, jid_jid = JIDJ, type = Type, full_xml = XML} @@ -488,7 +525,7 @@ split_dests_jid(Dests) -> Dests). report_not_jid(From, Packet, Dests) -> - Dests2 = [xml:element_to_binary(Dest#dest.full_xml) + Dests2 = [fxml:element_to_binary(Dest#dest.full_xml) || Dest <- Dests], [route_error(From, From, Packet, jid_malformed, <<"This service can not process the address: ", @@ -597,13 +634,13 @@ decide_action_group(Group) -> %%% Route packet %%%------------------------- -route_packet(From, ToDest, Packet, AAttrs, Addresses) -> +route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) -> Dests = case ToDest#dest.type of <<"bcc">> -> []; _ -> [ToDest] end, route_packet2(From, ToDest#dest.jid_string, Dests, - Packet, AAttrs, Addresses). + Packet, AAttrs, {Others, Addresses}). route_packet_multicast(From, ToS, Packet, AAttrs, Dests, Addresses, Limits) -> @@ -629,6 +666,8 @@ route_packet2(From, ToS, Dests, Packet, AAttrs, ToJID = stj(ToS), ejabberd_router:route(From, ToJID, Packet2). +append_dests(_Dests, {Others, Addresses}) -> + Addresses++Others; append_dests([], Addresses) -> Addresses; append_dests([Dest | Dests], Addresses) -> append_dests(Dests, [Dest#dest.full_xml | Addresses]). @@ -644,7 +683,8 @@ check_relay(RS, LS, Gs) -> end. check_relay_required(RServer, LServerS, Groups) -> - case str:str(RServer, LServerS) > 0 of + case lists:suffix(str:tokens(LServerS, <<".">>), + str:tokens(RServer, <<".">>)) of true -> false; false -> check_relay_required(LServerS, Groups) end. @@ -693,12 +733,11 @@ process_iqreply_error(From, LServiceS, _Packet) -> %%% Check protocol support: Receive response: Disco %%%------------------------- -process_iqreply_result(From, LServiceS, Packet, - State) -> +process_iqreply_result(From, LServiceS, Packet, State) -> #xmlel{name = <<"query">>, attrs = Attrs2, children = Els2} = - xml:get_subtag(Packet, <<"query">>), - case xml:get_attr_s(<<"xmlns">>, Attrs2) of + fxml:get_subtag(Packet, <<"query">>), + case fxml:get_attr_s(<<"xmlns">>, Attrs2) of ?NS_DISCO_INFO -> process_discoinfo_result(From, LServiceS, Els2, State); ?NS_DISCO_ITEMS -> @@ -721,37 +760,35 @@ process_discoinfo_result(From, LServiceS, Els, process_discoinfo_result2(From, FromS, LServiceS, Els, Waiter) -> - Multicast_support = lists:any(fun (XML) -> - case XML of - #xmlel{name = <<"feature">>, - attrs = Attrs} -> - (?NS_ADDRESS) == - xml:get_attr_s(<<"var">>, - Attrs); - _ -> false - end - end, - Els), + Multicast_support = + lists:any( + fun(XML) -> + case XML of + #xmlel{name = <<"feature">>, attrs = Attrs} -> + (?NS_ADDRESS) == fxml:get_attr_s(<<"var">>, Attrs); + _ -> false + end + end, + Els), Group = Waiter#waiter.group, RServer = Group#group.server, case Multicast_support of - true -> - SenderT = sender_type(From), - RLimits = get_limits_xml(Els, SenderT), - add_response(RServer, - {multicast_supported, FromS, RLimits}), - FromM = Waiter#waiter.sender, - DestsM = Group#group.dests, - PacketM = Waiter#waiter.packet, - AAttrsM = Waiter#waiter.aattrs, - AddressesM = Waiter#waiter.addresses, - RServiceM = FromS, - route_packet_multicast(FromM, RServiceM, PacketM, - AAttrsM, DestsM, AddressesM, RLimits), - delo_waiter(Waiter); - false -> - case FromS of - RServer -> + true -> + SenderT = sender_type(From), + RLimits = get_limits_xml(Els, SenderT), + add_response(RServer, {multicast_supported, FromS, RLimits}), + FromM = Waiter#waiter.sender, + DestsM = Group#group.dests, + PacketM = Waiter#waiter.packet, + AAttrsM = Waiter#waiter.aattrs, + AddressesM = Waiter#waiter.addresses, + RServiceM = FromS, + route_packet_multicast(FromM, RServiceM, PacketM, + AAttrsM, DestsM, AddressesM, RLimits), + delo_waiter(Waiter); + false -> + case FromS of + RServer -> send_query_items(FromS, LServiceS), delo_waiter(Waiter), add_waiter(Waiter#waiter{awaiting = @@ -772,10 +809,10 @@ get_limits_els(Els) -> #xmlel{name = <<"x">>, attrs = Attrs, children = SubEls} -> case ((?NS_XDATA) == - xml:get_attr_s(<<"xmlns">>, Attrs)) + fxml:get_attr_s(<<"xmlns">>, Attrs)) and (<<"result">> == - xml:get_attr_s(<<"type">>, Attrs)) + fxml:get_attr_s(<<"type">>, Attrs)) of true -> get_limits_fields(SubEls) ++ R; false -> R @@ -791,11 +828,11 @@ get_limits_fields(Fields) -> #xmlel{name = <<"field">>, attrs = Attrs} -> (<<"FORM_TYPE">> == - xml:get_attr_s(<<"var">>, + fxml:get_attr_s(<<"var">>, Attrs)) and (<<"hidden">> == - xml:get_attr_s(<<"type">>, + fxml:get_attr_s(<<"type">>, Attrs)); _ -> false end @@ -813,8 +850,8 @@ get_limits_values(Values) -> children = SubEls} -> [#xmlel{name = <<"value">>, children = SubElsV}] = SubEls, - Number = xml:get_cdata(SubElsV), - Name = xml:get_attr_s(<<"var">>, Attrs), + Number = fxml:get_cdata(SubElsV), + Name = fxml:get_attr_s(<<"var">>, Attrs), [{jlib:binary_to_atom(Name), jlib:binary_to_integer(Number)} | R]; @@ -828,29 +865,44 @@ get_limits_values(Values) -> %%%------------------------- process_discoitems_result(From, LServiceS, Els) -> - List = lists:foldl(fun (XML, Res) -> - case XML of - #xmlel{name = <<"item">>, attrs = Attrs} -> - Res ++ [xml:get_attr_s(<<"jid">>, Attrs)]; - _ -> Res - end - end, - [], Els), - [send_query_info(Item, LServiceS) || Item <- List], FromS = jts(From), - {found_waiter, Waiter} = search_waiter(FromS, LServiceS, - items), - delo_waiter(Waiter), - add_waiter(Waiter#waiter{awaiting = - {List, LServiceS, info}, - renewal = false}). + case search_waiter(FromS, LServiceS, items) of + {found_waiter, Waiter} -> + List = lists:foldl( + fun(XML, Res) -> + case XML of + #xmlel{name = <<"item">>, attrs = Attrs} -> + SJID = fxml:get_attr_s(<<"jid">>, Attrs), + case jid:from_string(SJID) of + #jid{luser = <<"">>, + lresource = <<"">>} -> + [SJID | Res]; + _ -> Res + end; + _ -> Res + end + end, + [], Els), + case List of + [] -> + received_awaiter(FromS, Waiter, LServiceS); + _ -> + [send_query_info(Item, LServiceS) || Item <- List], + delo_waiter(Waiter), + add_waiter(Waiter#waiter{awaiting = + {List, LServiceS, info}, + renewal = false}) + end; + _ -> + ok + end. %%%------------------------- %%% Check protocol support: Receive response: Received awaiter %%%------------------------- received_awaiter(JID, Waiter, LServiceS) -> - {JIDs, LServiceS, info} = Waiter#waiter.awaiting, + {JIDs, LServiceS, _} = Waiter#waiter.awaiting, delo_waiter(Waiter), Group = Waiter#waiter.group, RServer = Group#group.server, @@ -862,8 +914,9 @@ received_awaiter(JID, Waiter, LServiceS) -> From = Waiter#waiter.sender, Packet = Waiter#waiter.packet, AAttrs = Waiter#waiter.aattrs, + Others = Group#group.others, Addresses = Waiter#waiter.addresses, - [route_packet(From, ToUser, Packet, AAttrs, Addresses) + [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses) || ToUser <- Group#group.dests]; true -> send_query_info(RServer, LServiceS), @@ -887,8 +940,7 @@ create_cache() -> {attributes, record_info(fields, multicastc)}]). add_response(RServer, Response) -> - Secs = - calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()), mnesia:dirty_write(#multicastc{rserver = RServer, response = Response, ts = Secs}). @@ -899,8 +951,7 @@ search_server_on_cache(RServer, _LServerS, Maxmins) -> case look_server(RServer) of not_cached -> not_cached; {cached, Response, Ts} -> - Now = - calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), case is_obsolete(Response, Ts, Now, Maxmins) of false -> {cached, Response}; true -> {obsolete, Response} @@ -928,7 +979,7 @@ purge() -> Maxmins_positive = (?MAXTIME_CACHE_POSITIVE), Maxmins_negative = (?MAXTIME_CACHE_NEGATIVE), Now = - calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())), + calendar:datetime_to_gregorian_seconds(calendar:local_time()), purge(Now, {Maxmins_positive, Maxmins_negative}). purge(Now, Maxmins) -> @@ -982,8 +1033,12 @@ purge_loop(NM) -> %%%------------------------- create_pool() -> - catch ets:new(multicastp, - [duplicate_bag, public, named_table, {keypos, 2}]). + catch + begin + ets:new(multicastp, + [duplicate_bag, public, named_table, {keypos, 2}]), + ets:give_away(multicastp, whereis(ejabberd), ok) + end. add_waiter(Waiter) -> true = ets:insert(multicastp, Waiter). @@ -991,6 +1046,9 @@ add_waiter(Waiter) -> delo_waiter(Waiter) -> true = ets:delete_object(multicastp, Waiter). +-spec search_waiter(binary(), binary(), info | items) -> + {found_waiter, #waiter{}} | waiter_not_found. + search_waiter(JID, LServiceS, Type) -> Rs = ets:foldl(fun (W, Res) -> {JIDs, LServiceS1, Type1} = W#waiter.awaiting, @@ -1141,7 +1199,7 @@ to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))). route_error(From, To, Packet, ErrType, ErrText) -> #xmlel{attrs = Attrs} = Packet, - Lang = xml:get_attr_s(<<"xml:lang">>, Attrs), + Lang = fxml:get_attr_s(<<"xml:lang">>, Attrs), Reply = make_reply(ErrType, Lang, ErrText), Err = jlib:make_error_reply(Packet, Reply), ejabberd_router:route(From, To, Err). @@ -1157,6 +1215,13 @@ make_reply(internal_server_error, Lang, ErrText) -> make_reply(forbidden, Lang, ErrText) -> ?ERRT_FORBIDDEN(Lang, ErrText). -stj(String) -> jlib:string_to_jid(String). +stj(String) -> jid:from_string(String). + +jts(String) -> jid:to_string(String). -jts(String) -> jlib:jid_to_string(String). +mod_opt_type(access) -> + fun (A) when is_atom(A) -> A end; +mod_opt_type(host) -> fun iolist_to_binary/1; +mod_opt_type(limits) -> + fun (A) when is_list(A) -> A end; +mod_opt_type(_) -> [access, host, limits]. |