diff options
Diffstat (limited to 'src/mod_multicast.erl')
-rw-r--r-- | src/mod_multicast.erl | 1144 |
1 files changed, 538 insertions, 606 deletions
diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl index df385c28c..e76e9a63d 100644 --- a/src/mod_multicast.erl +++ b/src/mod_multicast.erl @@ -5,7 +5,7 @@ %%% Created : 29 May 2007 by Badlop <badlop@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -34,67 +34,53 @@ -behaviour(gen_mod). %% API --export([start_link/2, start/2, stop/1]). +-export([start/2, stop/1, reload/3, + user_send_packet/1]). %% gen_server callbacks -export([init/1, handle_info/2, handle_call/3, handle_cast/2, terminate/2, code_change/3]). --export([purge_loop/1, mod_opt_type/1, depends/2]). +-export([purge_loop/1, mod_opt_type/1, mod_options/1, depends/2]). --include("ejabberd.hrl"). -include("logger.hrl"). +-include("translate.hrl"). +-include("xmpp.hrl"). --include("jlib.hrl"). +-record(multicastc, {rserver :: binary(), + response, + ts :: integer()}). --record(state, - {lserver, lservice, access, service_limits}). +-record(dest, {jid_string :: binary() | none, + jid_jid :: jid() | undefined, + type :: bcc | cc | noreply | ofrom | replyroom | replyto | to, + address :: address()}). --record(multicastc, {rserver, response, ts}). +-type limit_value() :: {default | custom, integer()}. +-record(limits, {message :: limit_value(), + presence :: limit_value()}). -%% ts: timestamp (in seconds) when the cache item was last updated +-record(service_limits, {local :: #limits{}, + remote :: #limits{}}). --record(dest, {jid_string, jid_jid, type, full_xml}). +-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}. -%% jid_string = string() -%% jid_jid = jid() -%% full_xml = xml() +-record(group, {server :: binary(), + dests :: [#dest{}], + multicast :: routing() | undefined, + others :: [address()], + addresses :: [address()]}). --record(group, - {server, dests, multicast, others, addresses}). - -%% server = string() -%% dests = [string()] -%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached -%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()} -%% others = [xml()] -%% packet = xml() - --record(waiter, - {awaiting, group, renewal = false, sender, packet, - aattrs, addresses}). - -%% awaiting = {[Remote_service], Local_service, Type_awaiting} -%% Remote_service = Local_service = string() -%% Type_awaiting = info | items -%% group = #group -%% renewal = true | false -%% sender = From -%% packet = xml() -%% aattrs = [xml()] - --record(limits, {message, presence}). - -%% message = presence = integer() | infinite - --record(service_limits, {local, remote}). +-record(state, {lserver :: binary(), + lservice :: binary(), + access :: atom(), + service_limits :: #service_limits{}}). +-type state() :: #state{}. %% All the elements are of type value() -define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>). --define(PROCNAME, ejabberd_mod_multicast). - -define(PURGE_PROCNAME, ejabberd_mod_multicast_purgeloop). @@ -102,6 +88,8 @@ -define(MAXTIME_CACHE_NEGATIVE, 86400). +-define(MAXTIME_CACHE_NEGOTIATING, 600). + -define(CACHE_PURGE_TIMER, 86400000). -define(DISCO_QUERY_TIMEOUT, 10000). @@ -114,44 +102,67 @@ -define(DEFAULT_LIMIT_REMOTE_PRESENCE, 20). -start_link(LServerS, Opts) -> - Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, - [LServerS, Opts], []). - start(LServerS, Opts) -> - Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), - ChildSpec = {Proc, - {?MODULE, start_link, [LServerS, Opts]}, temporary, - 1000, worker, [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec). + gen_mod:start_child(?MODULE, LServerS, Opts). stop(LServerS) -> - Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME), - gen_server:call(Proc, stop), - supervisor:terminate_child(ejabberd_sup, Proc), - supervisor:delete_child(ejabberd_sup, Proc). + gen_mod:stop_child(?MODULE, LServerS). + +reload(LServerS, NewOpts, OldOpts) -> + Proc = gen_mod:get_module_proc(LServerS, ?MODULE), + gen_server:cast(Proc, {reload, NewOpts, OldOpts}). + +-define(SETS, gb_sets). + +user_send_packet({#presence{} = Packet, C2SState} = Acc) -> + case xmpp:get_subtag(Packet, #addresses{}) of + #addresses{list = Addresses} -> + {ToDeliver, _Delivereds} = split_addresses_todeliver(Addresses), + NewState = + lists:foldl( + fun(Address, St) -> + case Address#address.jid of + #jid{} = JID -> + LJID = jid:tolower(JID), + #{pres_a := PresA} = St, + A = + case Packet#presence.type of + available -> + ?SETS:add_element(LJID, PresA); + unavailable -> + ?SETS:del_element(LJID, PresA); + _ -> + PresA + end, + St#{pres_a => A}; + undefined -> + St + end + end, C2SState, ToDeliver), + {Packet, NewState}; + false -> + Acc + end; +user_send_packet(Acc) -> + Acc. %%==================================================================== %% gen_server callbacks %%==================================================================== -init([LServerS, Opts]) -> - LServiceS = gen_mod:get_opt_host(LServerS, Opts, - <<"multicast.@HOST@">>), - Access = gen_mod:get_opt(access, Opts, - fun acl:access_rules_validator/1, all), - SLimits = - build_service_limit_record(gen_mod:get_opt(limits, Opts, - fun (A) when is_list(A) -> - A - end, - [])), +-spec init(list()) -> {ok, state()}. +init([LServerS|_]) -> + process_flag(trap_exit, true), + Opts = gen_mod:get_module_opts(LServerS, ?MODULE), + [LServiceS|_] = gen_mod:get_opt_hosts(Opts), + Access = mod_multicast_opt:access(Opts), + SLimits = build_service_limit_record(mod_multicast_opt:limits(Opts)), create_cache(), try_start_loop(), - create_pool(), ejabberd_router_multicast:register_route(LServerS), ejabberd_router:register_route(LServiceS, LServerS), + ejabberd_hooks:add(user_send_packet, LServerS, ?MODULE, + user_send_packet, 50), {ok, #state{lservice = LServiceS, lserver = LServerS, access = Access, service_limits = SLimits}}. @@ -159,7 +170,22 @@ init([LServerS, Opts]) -> handle_call(stop, _From, State) -> try_stop_loop(), {stop, normal, ok, State}. -handle_cast(_Msg, State) -> {noreply, State}. +handle_cast({reload, NewOpts, NewOpts}, + #state{lserver = LServerS, lservice = OldLServiceS} = State) -> + Access = mod_multicast_opt:access(NewOpts), + SLimits = build_service_limit_record(mod_multicast_opt:limits(NewOpts)), + [NewLServiceS|_] = gen_mod:get_opt_hosts(NewOpts), + if NewLServiceS /= OldLServiceS -> + ejabberd_router:register_route(NewLServiceS, LServerS), + ejabberd_router:unregister_route(OldLServiceS); + true -> + ok + end, + {noreply, State#state{lservice = NewLServiceS, + access = Access, service_limits = SLimits}}; +handle_cast(Msg, State) -> + ?WARNING_MSG("Unexpected cast: ~p", [Msg]), + {noreply, State}. %%-------------------------------------------------------------------- %% Function: handle_info(Info, State) -> {noreply, State} | @@ -168,10 +194,8 @@ handle_cast(_Msg, State) -> {noreply, State}. %% Description: Handling all non call/cast messages %%-------------------------------------------------------------------- -handle_info({route, From, To, - #xmlel{name = <<"iq">>, attrs = Attrs} = Packet}, - State) -> - case catch handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) of +handle_info({route, #iq{} = Packet}, State) -> + case catch handle_iq(Packet, State) of {'EXIT', Reason} -> ?ERROR_MSG("Error when processing IQ stanza: ~p", [Reason]); @@ -179,20 +203,17 @@ handle_info({route, From, To, end, {noreply, State}; %% XEP33 allows only 'message' and 'presence' stanza type -handle_info({route, From, To, - #xmlel{name = Stanza_type} = Packet}, +handle_info({route, Packet}, #state{lservice = LServiceS, lserver = LServerS, access = Access, service_limits = SLimits} = - State) - when (Stanza_type == <<"message">>) or - (Stanza_type == <<"presence">>) -> - route_untrusted(LServiceS, LServerS, Access, SLimits, - From, To, Packet), + State) when ?is_stanza(Packet) -> + route_untrusted(LServiceS, LServerS, Access, SLimits, Packet), {noreply, State}; %% Handle multicast packets sent by trusted local services -handle_info({route_trusted, From, Destinations, Packet}, +handle_info({route_trusted, Destinations, Packet}, #state{lservice = LServiceS, lserver = LServerS} = State) -> + From = xmpp:get_from(Packet), case catch route_trusted(LServiceS, LServerS, From, Destinations, Packet) of {'EXIT', Reason} -> @@ -206,6 +227,8 @@ handle_info({get_host, Pid}, State) -> handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> + ejabberd_hooks:delete(user_send_packet, State#state.lserver, ?MODULE, + user_send_packet, 50), ejabberd_router_multicast:unregister_route(State#state.lserver), ejabberd_router:unregister_route(State#state.lservice), ok. @@ -220,206 +243,162 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%% IQ Request Processing %%%------------------------ -handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) -> - IQ = jlib:iq_query_info(Packet), - case catch process_iq(From, IQ, State) of - Result when is_record(Result, iq) -> - ejabberd_router:route(To, From, jlib:iq_to_xml(Result)); - {'EXIT', Reason} -> - ?ERROR_MSG("Error when processing IQ stanza: ~p", - [Reason]), - Err = jlib:make_error_reply(Packet, - ?ERR_INTERNAL_SERVER_ERROR), - ejabberd_router:route(To, From, Err); - reply -> - LServiceS = jts(To), - case fxml:get_attr_s(<<"type">>, Attrs) of - <<"result">> -> - process_iqreply_result(From, LServiceS, Packet, State); - <<"error">> -> - process_iqreply_error(From, LServiceS, Packet) - end; - ok -> ok +handle_iq(Packet, State) -> + try + IQ = xmpp:decode_els(Packet), + case process_iq(IQ, State) of + {result, SubEl} -> + ejabberd_router:route(xmpp:make_iq_result(Packet, SubEl)); + {error, Error} -> + ejabberd_router:route_error(Packet, Error); + reply -> + To = xmpp:get_to(IQ), + LServiceS = jid:encode(To), + case Packet#iq.type of + result -> + process_iqreply_result(LServiceS, IQ); + error -> + process_iqreply_error(LServiceS, IQ) + end + end + catch _:{xmpp_codec, Why} -> + Lang = xmpp:get_lang(Packet), + Err = xmpp:err_bad_request(xmpp:io_format_error(Why), Lang), + ejabberd_router:route_error(Packet, Err) end. -process_iq(From, - #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} = - IQ, - State) -> - IQ#iq{type = result, - sub_el = - [#xmlel{name = <<"query">>, - attrs = [{<<"xmlns">>, ?NS_DISCO_INFO}], - children = iq_disco_info(From, Lang, State)}]}; -%% disco#items request -process_iq(_, - #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) -> - IQ#iq{type = result, - sub_el = - [#xmlel{name = <<"query">>, - attrs = [{<<"xmlns">>, ?NS_DISCO_ITEMS}], - children = []}]}; -%% vCard request -process_iq(_, - #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ, - _) -> - IQ#iq{type = result, - sub_el = - [#xmlel{name = <<"vCard">>, - attrs = [{<<"xmlns">>, ?NS_VCARD}], - children = iq_vcard(Lang)}]}; -%% Unknown "set" or "get" request -process_iq(_, #iq{type = Type, sub_el = SubEl} = IQ, _) - when Type == get; Type == set -> - IQ#iq{type = error, - sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]}; -%% IQ "result" or "error". -process_iq(_, reply, _) -> reply; -%% IQ "result" or "error". -process_iq(_, _, _) -> ok. - --define(FEATURE(Feat), - #xmlel{name = <<"feature">>, - attrs = [{<<"var">>, Feat}], children = []}). +-spec process_iq(iq(), state()) -> {result, xmpp_element()} | + {error, stanza_error()} | reply. +process_iq(#iq{type = get, lang = Lang, from = From, + sub_els = [#disco_info{}]}, State) -> + {result, iq_disco_info(From, Lang, State)}; +process_iq(#iq{type = get, sub_els = [#disco_items{}]}, _) -> + {result, #disco_items{}}; +process_iq(#iq{type = get, lang = Lang, sub_els = [#vcard_temp{}]}, State) -> + {result, iq_vcard(Lang, State)}; +process_iq(#iq{type = T}, _) when T == set; T == get -> + {error, xmpp:err_service_unavailable()}; +process_iq(_, _) -> + reply. + +-define(FEATURE(Feat), Feat). iq_disco_info(From, Lang, State) -> - [#xmlel{name = <<"identity">>, - attrs = - [{<<"category">>, <<"service">>}, - {<<"type">>, <<"multicast">>}, - {<<"name">>, - translate:translate(Lang, <<"Multicast">>)}], - children = []}, - ?FEATURE((?NS_DISCO_INFO)), ?FEATURE((?NS_DISCO_ITEMS)), - ?FEATURE((?NS_VCARD)), ?FEATURE((?NS_ADDRESS))] - ++ iq_disco_info_extras(From, State). - -iq_vcard(Lang) -> - [#xmlel{name = <<"FN">>, attrs = [], - children = [{xmlcdata, <<"ejabberd/mod_multicast">>}]}, - #xmlel{name = <<"URL">>, attrs = [], - children = [{xmlcdata, ?EJABBERD_URI}]}, - #xmlel{name = <<"DESC">>, attrs = [], - children = - [{xmlcdata, - <<(translate:translate(Lang, - <<"ejabberd Multicast service">>))/binary, - "\nCopyright (c) 2002-2016 ProcessOne">>}]}]. + Name = mod_multicast_opt:name(State#state.lserver), + #disco_info{ + identities = [#identity{category = <<"service">>, + type = <<"multicast">>, + name = translate:translate(Lang, Name)}], + features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, ?NS_VCARD, ?NS_ADDRESS], + xdata = iq_disco_info_extras(From, State)}. + +-spec iq_vcard(binary(), state()) -> #vcard_temp{}. +iq_vcard(Lang, State) -> + case mod_multicast_opt:vcard(State#state.lserver) of + undefined -> + #vcard_temp{fn = <<"ejabberd/mod_multicast">>, + url = ejabberd_config:get_uri(), + desc = misc:get_descr(Lang, ?T("ejabberd Multicast service"))}; + VCard -> + VCard + end. %%%------------------------- %%% Route %%%------------------------- +-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'. route_trusted(LServiceS, LServerS, FromJID, Destinations, Packet) -> Packet_stripped = Packet, - AAttrs = [{<<"xmlns">>, ?NS_ADDRESS}], Delivereds = [], - Dests2 = lists:map(fun (D) -> - DS = jts(D), - XML = #xmlel{name = <<"address">>, - attrs = - [{<<"type">>, <<"bcc">>}, - {<<"jid">>, DS}], - children = []}, - #dest{jid_string = DS, jid_jid = D, - type = <<"bcc">>, full_xml = XML} - end, - Destinations), + Dests2 = lists:map( + fun(D) -> + #dest{jid_string = jid:encode(D), + jid_jid = D, type = bcc, + address = #address{type = bcc, jid = D}} + end, Destinations), Groups = group_dests(Dests2), route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, AAttrs). + Delivereds, Packet_stripped). -route_untrusted(LServiceS, LServerS, Access, SLimits, - From, To, Packet) -> +-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'. +route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) -> try route_untrusted2(LServiceS, LServerS, Access, - SLimits, From, Packet) + SLimits, Packet) catch adenied -> - route_error(To, From, Packet, forbidden, - <<"Access denied by service policy">>); + route_error(Packet, forbidden, + ?T("Access denied by service policy")); eadsele -> - route_error(To, From, Packet, bad_request, - <<"No addresses element found">>); + route_error(Packet, bad_request, + ?T("No addresses element found")); eadeles -> - route_error(To, From, Packet, bad_request, - <<"No address elements found">>); + route_error(Packet, bad_request, + ?T("No address elements found")); ewxmlns -> - route_error(To, From, Packet, bad_request, - <<"Wrong xmlns">>); + route_error(Packet, bad_request, + ?T("Wrong xmlns")); etoorec -> - route_error(To, From, Packet, not_acceptable, - <<"Too many receiver fields were specified">>); + route_error(Packet, not_acceptable, + ?T("Too many receiver fields were specified")); edrelay -> - route_error(To, From, Packet, forbidden, - <<"Packet relay is denied by service policy">>); + route_error(Packet, forbidden, + ?T("Packet relay is denied by service policy")); EType:EReason -> ?ERROR_MSG("Multicast unknown error: Type: ~p~nReason: ~p", [EType, EReason]), - route_error(To, From, Packet, internal_server_error, - <<"Unknown problem">>) + route_error(Packet, internal_server_error, + ?T("Internal server error")) end. -route_untrusted2(LServiceS, LServerS, Access, SLimits, - FromJID, Packet) -> +-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'. +route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) -> + FromJID = xmpp:get_from(Packet), ok = check_access(LServerS, Access, FromJID), - {ok, Packet_stripped, AAttrs, Addresses} = - strip_addresses_element(Packet), - {To_deliver, Delivereds} = - split_addresses_todeliver(Addresses), + {ok, Packet_stripped, Addresses} = strip_addresses_element(Packet), + {To_deliver, Delivereds} = split_addresses_todeliver(Addresses), Dests = convert_dest_record(To_deliver), {Dests2, Not_jids} = split_dests_jid(Dests), report_not_jid(FromJID, Packet, Not_jids), - ok = check_limit_dests(SLimits, FromJID, Packet, - Dests2), + ok = check_limit_dests(SLimits, FromJID, Packet, Dests2), Groups = group_dests(Dests2), ok = check_relay(FromJID#jid.server, LServerS, Groups), route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, AAttrs). + Delivereds, Packet_stripped). +-spec route_common(binary(), binary(), jid(), [#group{}], + [address()], stanza()) -> 'ok'. route_common(LServerS, LServiceS, FromJID, Groups, - Delivereds, Packet_stripped, AAttrs) -> - Groups2 = look_cached_servers(LServerS, Groups), + Delivereds, Packet_stripped) -> + Groups2 = look_cached_servers(LServerS, LServiceS, Groups), Groups3 = build_others_xml(Groups2), Groups4 = add_addresses(Delivereds, Groups3), AGroups = decide_action_groups(Groups4), - act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, + act_groups(FromJID, Packet_stripped, LServiceS, AGroups). -act_groups(FromJID, Packet_stripped, AAttrs, LServiceS, - AGroups) -> - [perform(FromJID, Packet_stripped, AAttrs, LServiceS, - AGroup) - || AGroup <- AGroups]. - -perform(From, Packet, AAttrs, _, +-spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'. +act_groups(FromJID, Packet_stripped, LServiceS, AGroups) -> + lists:foreach( + fun(AGroup) -> + perform(FromJID, Packet_stripped, LServiceS, + AGroup) + end, AGroups). + +-spec perform(jid(), stanza(), binary(), + {routing(), #group{}}) -> 'ok'. +perform(From, Packet, _, {route_single, Group}) -> - [route_packet(From, ToUser, Packet, AAttrs, - Group#group.others, Group#group.addresses) - || ToUser <- Group#group.dests]; -perform(From, Packet, AAttrs, _, + lists:foreach( + fun(ToUser) -> + route_packet(From, ToUser, Packet, + Group#group.others, Group#group.addresses) + end, Group#group.dests); +perform(From, Packet, _, {{route_multicast, JID, RLimits}, Group}) -> - route_packet_multicast(From, JID, Packet, AAttrs, - Group#group.dests, Group#group.addresses, RLimits); -perform(From, Packet, AAttrs, LServiceS, - {{ask, Old_service, renewal}, Group}) -> - send_query_info(Old_service, LServiceS), - add_waiter(#waiter{awaiting = - {[Old_service], LServiceS, info}, - group = Group, renewal = true, sender = From, - packet = Packet, aattrs = AAttrs, - addresses = Group#group.addresses}); -perform(_From, _Packet, _AAttrs, LServiceS, - {{ask, LServiceS, _}, _Group}) -> - ok; -perform(From, Packet, AAttrs, LServiceS, - {{ask, Server, not_renewal}, Group}) -> - send_query_info(Server, LServiceS), - add_waiter(#waiter{awaiting = - {[Server], LServiceS, info}, - group = Group, renewal = false, sender = From, - packet = Packet, aattrs = AAttrs, - addresses = Group#group.addresses}). + route_packet_multicast(From, JID, Packet, + Group#group.dests, Group#group.addresses, RLimits). %%%------------------------- %%% Check access permission @@ -435,52 +414,39 @@ check_access(LServerS, Access, From) -> %%% Strip 'addresses' XML element %%%------------------------- +-spec strip_addresses_element(stanza()) -> {ok, stanza(), [address()]}. strip_addresses_element(Packet) -> - case fxml:get_subtag(Packet, <<"addresses">>) of - #xmlel{name = <<"addresses">>, attrs = AAttrs, - children = Addresses} -> - case fxml:get_attr_s(<<"xmlns">>, AAttrs) of - ?NS_ADDRESS -> - #xmlel{name = Name, attrs = Attrs, children = Els} = - Packet, - Els_stripped = lists:keydelete(<<"addresses">>, 2, Els), - Packet_stripped = #xmlel{name = Name, attrs = Attrs, - children = Els_stripped}, - {ok, Packet_stripped, AAttrs, fxml:remove_cdata(Addresses)}; - _ -> throw(ewxmlns) - end; - _ -> throw(eadsele) + case xmpp:get_subtag(Packet, #addresses{}) of + #addresses{list = Addrs} -> + PacketStripped = xmpp:remove_subtag(Packet, #addresses{}), + {ok, PacketStripped, Addrs}; + false -> + throw(eadsele) end. %%%------------------------- %%% Split Addresses %%%------------------------- +-spec split_addresses_todeliver([address()]) -> {[address()], [address()]}. split_addresses_todeliver(Addresses) -> - lists:partition(fun (XML) -> - case XML of - #xmlel{name = <<"address">>, attrs = Attrs} -> - case fxml:get_attr_s(<<"delivered">>, Attrs) of - <<"true">> -> false; - _ -> - Type = fxml:get_attr_s(<<"type">>, - Attrs), - case Type of - <<"to">> -> true; - <<"cc">> -> true; - <<"bcc">> -> true; - _ -> false - end - end; - _ -> false - end - end, - Addresses). + lists:partition( + fun(#address{delivered = true}) -> + false; + (#address{type = Type}) -> + case Type of + to -> true; + cc -> true; + bcc -> true; + _ -> false + end + end, Addresses). %%%------------------------- %%% Check does not exceed limit of destinations %%%------------------------- +-spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok. check_limit_dests(SLimits, FromJID, Packet, Addresses) -> SenderT = sender_type(FromJID), @@ -497,24 +463,23 @@ check_limit_dests(SLimits, FromJID, Packet, %%% Convert Destination XML to record %%%------------------------- -convert_dest_record(XMLs) -> - lists:map(fun (XML) -> - case fxml:get_tag_attr_s(<<"jid">>, XML) of - <<"">> -> #dest{jid_string = none, full_xml = XML}; - JIDS -> - Type = fxml:get_tag_attr_s(<<"type">>, XML), - JIDJ = stj(JIDS), - #dest{jid_string = JIDS, jid_jid = JIDJ, - type = Type, full_xml = XML} - end - end, - XMLs). +-spec convert_dest_record([address()]) -> [#dest{}]. +convert_dest_record(Addrs) -> + lists:map( + fun(#address{jid = undefined, type = Type} = Addr) -> + #dest{jid_string = none, + type = Type, address = Addr}; + (#address{jid = JID, type = Type} = Addr) -> + #dest{jid_string = jid:encode(JID), jid_jid = JID, + type = Type, address = Addr} + end, Addrs). %%%------------------------- %%% Split destinations by existence of JID %%% and send error messages for other dests %%%------------------------- +-spec split_dests_jid([#dest{}]) -> {[#dest{}], [#dest{}]}. split_dests_jid(Dests) -> lists:partition(fun (Dest) -> case Dest#dest.jid_string of @@ -524,18 +489,20 @@ split_dests_jid(Dests) -> end, Dests). +-spec report_not_jid(jid(), stanza(), [#dest{}]) -> any(). report_not_jid(From, Packet, Dests) -> - Dests2 = [fxml:element_to_binary(Dest#dest.full_xml) + Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address)) || Dest <- Dests], - [route_error(From, From, Packet, jid_malformed, - <<"This service can not process the address: ", - D/binary>>) + [route_error( + xmpp:set_from_to(Packet, From, From), jid_malformed, + str:format(?T("This service can not process the address: ~ts"), [D])) || D <- Dests2]. %%%------------------------- %%% Group destinations by their servers %%%------------------------- +-spec group_dests([#dest{}]) -> [#group{}]. group_dests(Dests) -> D = lists:foldl(fun (Dest, Dict) -> ServerS = (Dest#dest.jid_jid)#jid.server, @@ -543,21 +510,22 @@ group_dests(Dests) -> end, dict:new(), Dests), Keys = dict:fetch_keys(D), - [#group{server = Key, dests = dict:fetch(Key, D)} + [#group{server = Key, dests = dict:fetch(Key, D), + addresses = [], others = []} || Key <- Keys]. %%%------------------------- %%% Look for cached responses %%%------------------------- -look_cached_servers(LServerS, Groups) -> - [look_cached(LServerS, Group) || Group <- Groups]. +look_cached_servers(LServerS, LServiceS, Groups) -> + [look_cached(LServerS, LServiceS, Group) || Group <- Groups]. -look_cached(LServerS, G) -> +look_cached(LServerS, LServiceS, G) -> Maxtime_positive = (?MAXTIME_CACHE_POSITIVE), Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE), Cached_response = search_server_on_cache(G#group.server, - LServerS, + LServerS, LServiceS, {Maxtime_positive, Maxtime_negative}), G#group{multicast = Cached_response}. @@ -573,20 +541,19 @@ build_others_xml(Groups) -> build_other_xml(Dests) -> lists:foldl(fun (Dest, R) -> - XML = Dest#dest.full_xml, + XML = Dest#dest.address, case Dest#dest.type of - <<"to">> -> [add_delivered(XML) | R]; - <<"cc">> -> [add_delivered(XML) | R]; - <<"bcc">> -> R; + to -> [add_delivered(XML) | R]; + cc -> [add_delivered(XML) | R]; + bcc -> R; _ -> [XML | R] end end, [], Dests). -add_delivered(#xmlel{name = Name, attrs = Attrs, - children = Els}) -> - Attrs2 = [{<<"delivered">>, <<"true">>} | Attrs], - #xmlel{name = Name, attrs = Attrs2, children = Els}. +-spec add_delivered(address()) -> address(). +add_delivered(Addr) -> + Addr#address{delivered = true}. %%%------------------------- %%% Add preliminary packets @@ -608,80 +575,67 @@ add_addresses2(Delivereds, [Group | Groups], Res, Pa, %%% Decide action groups %%%------------------------- +-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}]. decide_action_groups(Groups) -> - [{decide_action_group(Group), Group} + [{Group#group.multicast, Group} || Group <- Groups]. -decide_action_group(Group) -> - Server = Group#group.server, - case Group#group.multicast of - {cached, local_server} -> - %% Send a copy of the packet to each local user on Dests - route_single; - {cached, not_supported} -> - %% Send a copy of the packet to each remote user on Dests - route_single; - {cached, {multicast_supported, JID, RLimits}} -> - {route_multicast, JID, RLimits}; - {obsolete, - {multicast_supported, Old_service, _RLimits}} -> - {ask, Old_service, renewal}; - {obsolete, not_supported} -> {ask, Server, not_renewal}; - not_cached -> {ask, Server, not_renewal} - end. - %%%------------------------- %%% Route packet %%%------------------------- -route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) -> +-spec route_packet(jid(), #dest{}, stanza(), [addresses()], [addresses()]) -> 'ok'. +route_packet(From, ToDest, Packet, Others, Addresses) -> Dests = case ToDest#dest.type of - <<"bcc">> -> []; + bcc -> []; _ -> [ToDest] end, route_packet2(From, ToDest#dest.jid_string, Dests, - Packet, AAttrs, {Others, Addresses}). + Packet, {Others, Addresses}). -route_packet_multicast(From, ToS, Packet, AAttrs, Dests, +-spec route_packet_multicast(jid(), binary(), stanza(), [#dest{}], [address()], #limits{}) -> 'ok'. +route_packet_multicast(From, ToS, Packet, Dests, Addresses, Limits) -> Type_of_stanza = type_of_stanza(Packet), {_Type, Limit_number} = get_limit_number(Type_of_stanza, Limits), Fragmented_dests = fragment_dests(Dests, Limit_number), - [route_packet2(From, ToS, DFragment, Packet, AAttrs, - Addresses) - || DFragment <- Fragmented_dests]. - -route_packet2(From, ToS, Dests, Packet, AAttrs, - Addresses) -> - #xmlel{name = T, attrs = A, children = C} = Packet, - C2 = case append_dests(Dests, Addresses) of - [] -> C; - ACs -> - [#xmlel{name = <<"addresses">>, attrs = AAttrs, - children = ACs} - | C] - end, - Packet2 = #xmlel{name = T, attrs = A, children = C2}, + lists:foreach(fun(DFragment) -> + route_packet2(From, ToS, DFragment, Packet, + Addresses) + end, Fragmented_dests). + +-spec route_packet2(jid(), binary(), [#dest{}], stanza(), {[address()], [address()]} | [address()]) -> 'ok'. +route_packet2(From, ToS, Dests, Packet, Addresses) -> + Els = case append_dests(Dests, Addresses) of + [] -> + xmpp:get_els(Packet); + ACs -> + [#addresses{list = ACs}|xmpp:get_els(Packet)] + end, + Packet2 = xmpp:set_els(Packet, Els), ToJID = stj(ToS), - ejabberd_router:route(From, ToJID, Packet2). + ejabberd_router:route(xmpp:set_from_to(Packet2, From, ToJID)). +-spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()]. append_dests(_Dests, {Others, Addresses}) -> - Addresses++Others; + Addresses ++ Others; append_dests([], Addresses) -> Addresses; append_dests([Dest | Dests], Addresses) -> - append_dests(Dests, [Dest#dest.full_xml | Addresses]). + append_dests(Dests, [Dest#dest.address | Addresses]). %%%------------------------- %%% Check relay %%%------------------------- +-spec check_relay(binary(), binary(), [#group{}]) -> ok. check_relay(RS, LS, Gs) -> case check_relay_required(RS, LS, Gs) of false -> ok; true -> throw(edrelay) end. +-spec check_relay_required(binary(), binary(), [#group{}]) -> boolean(). check_relay_required(RServer, LServerS, Groups) -> case lists:suffix(str:tokens(LServerS, <<".">>), str:tokens(RServer, <<".">>)) of @@ -689,6 +643,7 @@ check_relay_required(RServer, LServerS, Groups) -> false -> check_relay_required(LServerS, Groups) end. +-spec check_relay_required(binary(), [#group{}]) -> boolean(). check_relay_required(LServerS, Groups) -> lists:any(fun (Group) -> Group#group.server /= LServerS end, @@ -698,200 +653,193 @@ check_relay_required(LServerS, Groups) -> %%% Check protocol support: Send request %%%------------------------- -send_query_info(RServerS, LServiceS) -> +-spec send_query_info(binary(), binary(), binary()) -> ok. +send_query_info(RServerS, LServiceS, ID) -> case str:str(RServerS, <<"echo.">>) of - 1 -> false; - _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO) + 1 -> ok; + _ -> send_query(RServerS, LServiceS, ID, #disco_info{}) end. -send_query_items(RServerS, LServiceS) -> - send_query(RServerS, LServiceS, ?NS_DISCO_ITEMS). +-spec send_query_items(binary(), binary(), binary()) -> ok. +send_query_items(RServerS, LServiceS, ID) -> + send_query(RServerS, LServiceS, ID, #disco_items{}). -send_query(RServerS, LServiceS, XMLNS) -> - Packet = #xmlel{name = <<"iq">>, - attrs = [{<<"to">>, RServerS}, {<<"type">>, <<"get">>}], - children = - [#xmlel{name = <<"query">>, - attrs = [{<<"xmlns">>, XMLNS}], - children = []}]}, - ejabberd_router:route(stj(LServiceS), stj(RServerS), - Packet). +-spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok. +send_query(RServerS, LServiceS, ID, SubEl) -> + Packet = #iq{from = stj(LServiceS), + to = stj(RServerS), + id = ID, + type = get, sub_els = [SubEl]}, + ejabberd_router:route(Packet). %%%------------------------- %%% Check protocol support: Receive response: Error %%%------------------------- -process_iqreply_error(From, LServiceS, _Packet) -> - FromS = jts(From), - case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - received_awaiter(FromS, Waiter, LServiceS); - _ -> ok +process_iqreply_error(LServiceS, Packet) -> + FromS = jts(xmpp:get_from(Packet)), + ID = Packet#iq.id, + case str:tokens(ID, <<"/">>) of + [RServer, _] -> + case look_server(RServer) of + {cached, {_Response, {wait_for_info, ID}}, _TS} + when RServer == FromS -> + add_response(RServer, not_supported, cached); + {cached, {_Response, {wait_for_items, ID}}, _TS} + when RServer == FromS -> + add_response(RServer, not_supported, cached); + {cached, {Response, {wait_for_items_info, ID, Items}}, + _TS} -> + case lists:member(FromS, Items) of + true -> + received_awaiter( + FromS, RServer, Response, ID, Items, + LServiceS); + false -> + ok + end; + _ -> + ok + end; + _ -> + ok end. %%%------------------------- %%% Check protocol support: Receive response: Disco %%%------------------------- -process_iqreply_result(From, LServiceS, Packet, State) -> - #xmlel{name = <<"query">>, attrs = Attrs2, - children = Els2} = - fxml:get_subtag(Packet, <<"query">>), - case fxml:get_attr_s(<<"xmlns">>, Attrs2) of - ?NS_DISCO_INFO -> - process_discoinfo_result(From, LServiceS, Els2, State); - ?NS_DISCO_ITEMS -> - process_discoitems_result(From, LServiceS, Els2) +-spec process_iqreply_result(binary(), iq()) -> any(). +process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) -> + case SubEl of + #disco_info{} -> + process_discoinfo_result(From, LServiceS, ID, SubEl); + #disco_items{} -> + process_discoitems_result(From, LServiceS, ID, SubEl); + _ -> + ok end. %%%------------------------- %%% Check protocol support: Receive response: Disco Info %%%------------------------- -process_discoinfo_result(From, LServiceS, Els, - _State) -> +process_discoinfo_result(From, LServiceS, ID, DiscoInfo) -> FromS = jts(From), - case search_waiter(FromS, LServiceS, info) of - {found_waiter, Waiter} -> - process_discoinfo_result2(From, FromS, LServiceS, Els, - Waiter); - _ -> ok + case str:tokens(ID, <<"/">>) of + [RServer, _] -> + case look_server(RServer) of + {cached, {Response, {wait_for_info, ID} = ST}, _TS} + when RServer == FromS -> + process_discoinfo_result2( + From, FromS, LServiceS, DiscoInfo, + RServer, Response, ST); + {cached, {Response, {wait_for_items_info, ID, Items} = ST}, + _TS} -> + case lists:member(FromS, Items) of + true -> + process_discoinfo_result2( + From, FromS, LServiceS, DiscoInfo, + RServer, Response, ST); + false -> + ok + end; + _ -> + ok + end; + _ -> + ok end. -process_discoinfo_result2(From, FromS, LServiceS, Els, - Waiter) -> - Multicast_support = - lists:any( - fun(XML) -> - case XML of - #xmlel{name = <<"feature">>, attrs = Attrs} -> - (?NS_ADDRESS) == fxml:get_attr_s(<<"var">>, Attrs); - _ -> false - end - end, - Els), - Group = Waiter#waiter.group, - RServer = Group#group.server, +process_discoinfo_result2(From, FromS, LServiceS, + #disco_info{features = Feats} = DiscoInfo, + RServer, Response, ST) -> + Multicast_support = lists:member(?NS_ADDRESS, Feats), case Multicast_support of true -> SenderT = sender_type(From), - RLimits = get_limits_xml(Els, SenderT), - add_response(RServer, {multicast_supported, FromS, RLimits}), - FromM = Waiter#waiter.sender, - DestsM = Group#group.dests, - PacketM = Waiter#waiter.packet, - AAttrsM = Waiter#waiter.aattrs, - AddressesM = Waiter#waiter.addresses, - RServiceM = FromS, - route_packet_multicast(FromM, RServiceM, PacketM, - AAttrsM, DestsM, AddressesM, RLimits), - delo_waiter(Waiter); + RLimits = get_limits_xml(DiscoInfo, SenderT), + add_response(RServer, {multicast_supported, FromS, RLimits}, cached); false -> - case FromS of - RServer -> - send_query_items(FromS, LServiceS), - delo_waiter(Waiter), - add_waiter(Waiter#waiter{awaiting = - {[FromS], LServiceS, items}, - renewal = false}); - %% We asked a component, and it does not support XEP33 - _ -> received_awaiter(FromS, Waiter, LServiceS) - end + case ST of + {wait_for_info, _ID} -> + Random = p1_rand:get_string(), + ID = <<RServer/binary, $/, Random/binary>>, + send_query_items(FromS, LServiceS, ID), + add_response(RServer, Response, {wait_for_items, ID}); + %% We asked a component, and it does not support XEP33 + {wait_for_items_info, ID, Items} -> + received_awaiter(FromS, RServer, Response, ID, Items, LServiceS) + end end. -get_limits_xml(Els, SenderT) -> - LimitOpts = get_limits_els(Els), +get_limits_xml(DiscoInfo, SenderT) -> + LimitOpts = get_limits_els(DiscoInfo), build_remote_limit_record(LimitOpts, SenderT). -get_limits_els(Els) -> - lists:foldl(fun (XML, R) -> - case XML of - #xmlel{name = <<"x">>, attrs = Attrs, - children = SubEls} -> - case ((?NS_XDATA) == - fxml:get_attr_s(<<"xmlns">>, Attrs)) - and - (<<"result">> == - fxml:get_attr_s(<<"type">>, Attrs)) - of - true -> get_limits_fields(SubEls) ++ R; - false -> R - end; - _ -> R - end - end, - [], Els). - -get_limits_fields(Fields) -> - {Head, Tail} = lists:partition(fun (Field) -> - case Field of - #xmlel{name = <<"field">>, - attrs = Attrs} -> - (<<"FORM_TYPE">> == - fxml:get_attr_s(<<"var">>, - Attrs)) - and - (<<"hidden">> == - fxml:get_attr_s(<<"type">>, - Attrs)); - _ -> false - end - end, - Fields), +-spec get_limits_els(disco_info()) -> [{atom(), integer()}]. +get_limits_els(DiscoInfo) -> + lists:flatmap( + fun(#xdata{type = result} = X) -> + get_limits_fields(X); + (_) -> + [] + end, DiscoInfo#disco_info.xdata). + +-spec get_limits_fields(xdata()) -> [{atom(), integer()}]. +get_limits_fields(X) -> + {Head, Tail} = lists:partition( + fun(#xdata_field{var = Var, type = Type}) -> + Var == <<"FORM_TYPE">> andalso Type == hidden + end, X#xdata.fields), case Head of [] -> []; _ -> get_limits_values(Tail) end. -get_limits_values(Values) -> - lists:foldl(fun (Value, R) -> - case Value of - #xmlel{name = <<"field">>, attrs = Attrs, - children = SubEls} -> - [#xmlel{name = <<"value">>, children = SubElsV}] = - SubEls, - Number = fxml:get_cdata(SubElsV), - Name = fxml:get_attr_s(<<"var">>, Attrs), - [{jlib:binary_to_atom(Name), - jlib:binary_to_integer(Number)} - | R]; - _ -> R - end - end, - [], Values). +-spec get_limits_values([xdata_field()]) -> [{atom(), integer()}]. +get_limits_values(Fields) -> + lists:flatmap( + fun(#xdata_field{var = Name, values = [Number]}) -> + try + [{binary_to_atom(Name, utf8), binary_to_integer(Number)}] + catch _:badarg -> + [] + end; + (_) -> + [] + end, Fields). %%%------------------------- %%% Check protocol support: Receive response: Disco Items %%%------------------------- -process_discoitems_result(From, LServiceS, Els) -> +process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) -> FromS = jts(From), - case search_waiter(FromS, LServiceS, items) of - {found_waiter, Waiter} -> - List = lists:foldl( - fun(XML, Res) -> - case XML of - #xmlel{name = <<"item">>, attrs = Attrs} -> - SJID = fxml:get_attr_s(<<"jid">>, Attrs), - case jid:from_string(SJID) of - #jid{luser = <<"">>, - lresource = <<"">>} -> - [SJID | Res]; - _ -> Res - end; - _ -> Res - end - end, - [], Els), - case List of - [] -> - received_awaiter(FromS, Waiter, LServiceS); + case str:tokens(ID, <<"/">>) of + [FromS = RServer, _] -> + case look_server(RServer) of + {cached, {Response, {wait_for_items, ID}}, _TS} -> + List = lists:flatmap( + fun(#disco_item{jid = #jid{luser = <<"">>, + lserver = LServer, + lresource = <<"">>}}) -> + [LServer]; + (_) -> + [] + end, Items), + case List of + [] -> + add_response(RServer, not_supported, cached); + _ -> + Random = p1_rand:get_string(), + ID2 = <<RServer/binary, $/, Random/binary>>, + [send_query_info(Item, LServiceS, ID2) || Item <- List], + add_response(RServer, Response, + {wait_for_items_info, ID2, List}) + end; _ -> - [send_query_info(Item, LServiceS) || Item <- List], - delo_waiter(Waiter), - add_waiter(Waiter#waiter{awaiting = - {List, LServiceS, info}, - renewal = false}) + ok end; _ -> ok @@ -901,33 +849,12 @@ process_discoitems_result(From, LServiceS, Els) -> %%% Check protocol support: Receive response: Received awaiter %%%------------------------- -received_awaiter(JID, Waiter, LServiceS) -> - {JIDs, LServiceS, _} = Waiter#waiter.awaiting, - delo_waiter(Waiter), - Group = Waiter#waiter.group, - RServer = Group#group.server, +received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) -> case lists:delete(JID, JIDs) of - [] -> - case Waiter#waiter.renewal of - false -> - add_response(RServer, not_supported), - From = Waiter#waiter.sender, - Packet = Waiter#waiter.packet, - AAttrs = Waiter#waiter.aattrs, - Others = Group#group.others, - Addresses = Waiter#waiter.addresses, - [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses) - || ToUser <- Group#group.dests]; - true -> - send_query_info(RServer, LServiceS), - add_waiter(Waiter#waiter{awaiting = - {[RServer], LServiceS, info}, - renewal = false}) - end; - JIDs2 -> - add_waiter(Waiter#waiter{awaiting = - {JIDs2, LServiceS, info}, - renewal = false}) + [] -> + add_response(RServer, not_supported, cached); + JIDs2 -> + add_response(RServer, Response, {wait_for_items_info, ID, JIDs2}) end. %%%------------------------- @@ -935,29 +862,59 @@ received_awaiter(JID, Waiter, LServiceS) -> %%%------------------------- create_cache() -> - mnesia:create_table(multicastc, + ejabberd_mnesia:create(?MODULE, multicastc, [{ram_copies, [node()]}, {attributes, record_info(fields, multicastc)}]). -add_response(RServer, Response) -> +add_response(RServer, Response, State) -> Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()), mnesia:dirty_write(#multicastc{rserver = RServer, - response = Response, ts = Secs}). + response = {Response, State}, ts = Secs}). -search_server_on_cache(RServer, LServerS, _Maxmins) +search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins) when RServer == LServerS -> - {cached, local_server}; -search_server_on_cache(RServer, _LServerS, Maxmins) -> + route_single; +search_server_on_cache(RServer, _LServerS, LServiceS, _Maxmins) + when RServer == LServiceS -> + route_single; +search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) -> case look_server(RServer) of - not_cached -> not_cached; - {cached, Response, Ts} -> - Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), - case is_obsolete(Response, Ts, Now, Maxmins) of - false -> {cached, Response}; - true -> {obsolete, Response} - end + not_cached -> + query_info(RServer, LServiceS, not_supported), + route_single; + {cached, {Response, State}, TS} -> + Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()), + Response2 = + case State of + cached -> + case is_obsolete(Response, TS, Now, Maxmins) of + false -> ok; + true -> + query_info(RServer, LServiceS, Response) + end, + Response; + _ -> + if + Now - TS > ?MAXTIME_CACHE_NEGOTIATING -> + query_info(RServer, LServiceS, not_supported), + not_supported; + true -> + Response + end + end, + case Response2 of + not_supported -> route_single; + {multicast_supported, Service, Limits} -> + {route_multicast, Service, Limits} + end end. +query_info(RServer, LServiceS, Response) -> + Random = p1_rand:get_string(), + ID = <<RServer/binary, $/, Random/binary>>, + send_query_info(RServer, LServiceS, ID), + add_response(RServer, Response, {wait_for_info, ID}). + look_server(RServer) -> case mnesia:dirty_read(multicastc, RServer) of [] -> not_cached; @@ -1029,44 +986,6 @@ purge_loop(NM) -> end. %%%------------------------- -%%% Pool -%%%------------------------- - -create_pool() -> - catch - begin - ets:new(multicastp, - [duplicate_bag, public, named_table, {keypos, 2}]), - ets:give_away(multicastp, whereis(ejabberd), ok) - end. - -add_waiter(Waiter) -> - true = ets:insert(multicastp, Waiter). - -delo_waiter(Waiter) -> - true = ets:delete_object(multicastp, Waiter). - --spec search_waiter(binary(), binary(), info | items) -> - {found_waiter, #waiter{}} | waiter_not_found. - -search_waiter(JID, LServiceS, Type) -> - Rs = ets:foldl(fun (W, Res) -> - {JIDs, LServiceS1, Type1} = W#waiter.awaiting, - case lists:member(JID, JIDs) and - (LServiceS == LServiceS1) - and (Type1 == Type) - of - true -> Res ++ [W]; - false -> Res - end - end, - [], multicastp), - case Rs of - [R | _] -> {found_waiter, R}; - [] -> waiter_not_found - end. - -%%%------------------------- %%% Limits: utils %%%------------------------- @@ -1091,33 +1010,36 @@ build_service_limit_record(LimitOpts) -> build_limit_record(LimitOptsR, remote)}. get_from_limitopts(LimitOpts, SenderT) -> - [{StanzaT, Number} - || {SenderT2, StanzaT, Number} <- LimitOpts, - SenderT =:= SenderT2]. + case lists:keyfind(SenderT, 1, LimitOpts) of + false -> []; + {SenderT, Result} -> Result + end. build_remote_limit_record(LimitOpts, SenderT) -> build_limit_record(LimitOpts, SenderT). +-spec build_limit_record(any(), local | remote) -> #limits{}. build_limit_record(LimitOpts, SenderT) -> Limits = [get_limit_value(Name, Default, LimitOpts) || {Name, Default} <- list_of_limits(SenderT)], list_to_tuple([limits | Limits]). +-spec get_limit_value(atom(), integer(), any()) -> limit_value(). get_limit_value(Name, Default, LimitOpts) -> case lists:keysearch(Name, 1, LimitOpts) of {value, {Name, Number}} -> {custom, Number}; false -> {default, Default} end. -type_of_stanza(#xmlel{name = <<"message">>}) -> message; -type_of_stanza(#xmlel{name = <<"presence">>}) -> - presence. +type_of_stanza(Stanza) -> element(1, Stanza). +-spec get_limit_number(message | presence, #limits{}) -> limit_value(). get_limit_number(message, Limits) -> Limits#limits.message; get_limit_number(presence, Limits) -> Limits#limits.presence. +-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}. get_slimit_group(local, SLimits) -> SLimits#service_limits.local; get_slimit_group(remote, SLimits) -> @@ -1144,17 +1066,10 @@ fragment_dests(Dests, Limit_number) -> %% Some parts of code are borrowed from mod_muc_room.erl -define(RFIELDT(Type, Var, Val), - #xmlel{name = <<"field">>, - attrs = [{<<"var">>, Var}, {<<"type">>, Type}], - children = - [#xmlel{name = <<"value">>, attrs = [], - children = [{xmlcdata, Val}]}]}). + #xdata_field{type = Type, var = Var, values = [Val]}). -define(RFIELDV(Var, Val), - #xmlel{name = <<"field">>, attrs = [{<<"var">>, Var}], - children = - [#xmlel{name = <<"value">>, attrs = [], - children = [{xmlcdata, Val}]}]}). + #xdata_field{var = Var, values = [Val]}). iq_disco_info_extras(From, State) -> SenderT = sender_type(From), @@ -1162,16 +1077,13 @@ iq_disco_info_extras(From, State) -> case iq_disco_info_extras2(SenderT, Service_limits) of [] -> []; List_limits_xmpp -> - [#xmlel{name = <<"x">>, - attrs = - [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"result">>}], - children = - [?RFIELDT(<<"hidden">>, <<"FORM_TYPE">>, (?NS_ADDRESS))] - ++ List_limits_xmpp}] + [#xdata{type = result, + fields = [?RFIELDT(hidden, <<"FORM_TYPE">>, ?NS_ADDRESS) + | List_limits_xmpp]}] end. sender_type(From) -> - Local_hosts = (?MYHOSTS), + Local_hosts = ejabberd_option:hosts(), case lists:member(From#jid.lserver, Local_hosts) of true -> local; false -> remote @@ -1197,34 +1109,54 @@ to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))). %%% Error report %%%------------------------- -route_error(From, To, Packet, ErrType, ErrText) -> - #xmlel{attrs = Attrs} = Packet, - Lang = fxml:get_attr_s(<<"xml:lang">>, Attrs), - Reply = make_reply(ErrType, Lang, ErrText), - Err = jlib:make_error_reply(Packet, Reply), - ejabberd_router:route(From, To, Err). +route_error(Packet, ErrType, ErrText) -> + Lang = xmpp:get_lang(Packet), + Err = make_reply(ErrType, Lang, ErrText), + ejabberd_router:route_error(Packet, Err). make_reply(bad_request, Lang, ErrText) -> - ?ERRT_BAD_REQUEST(Lang, ErrText); + xmpp:err_bad_request(ErrText, Lang); make_reply(jid_malformed, Lang, ErrText) -> - ?ERRT_JID_MALFORMED(Lang, ErrText); + xmpp:err_jid_malformed(ErrText, Lang); make_reply(not_acceptable, Lang, ErrText) -> - ?ERRT_NOT_ACCEPTABLE(Lang, ErrText); + xmpp:err_not_acceptable(ErrText, Lang); make_reply(internal_server_error, Lang, ErrText) -> - ?ERRT_INTERNAL_SERVER_ERROR(Lang, ErrText); + xmpp:err_internal_server_error(ErrText, Lang); make_reply(forbidden, Lang, ErrText) -> - ?ERRT_FORBIDDEN(Lang, ErrText). + xmpp:err_forbidden(ErrText, Lang). -stj(String) -> jid:from_string(String). +stj(String) -> jid:decode(String). -jts(String) -> jid:to_string(String). +jts(String) -> jid:encode(String). depends(_Host, _Opts) -> []. mod_opt_type(access) -> - fun acl:access_rules_validator/1; -mod_opt_type(host) -> fun iolist_to_binary/1; + econf:acl(); +mod_opt_type(name) -> + econf:binary(); mod_opt_type(limits) -> - fun (A) when is_list(A) -> A end; -mod_opt_type(_) -> [access, host, limits]. + econf:options( + #{local => + econf:options( + #{message => econf:non_neg_int(infinite), + presence => econf:non_neg_int(infinite)}), + remote => + econf:options( + #{message => econf:non_neg_int(infinite), + presence => econf:non_neg_int(infinite)})}); +mod_opt_type(host) -> + econf:host(); +mod_opt_type(hosts) -> + econf:hosts(); +mod_opt_type(vcard) -> + econf:vcard_temp(). + +mod_options(Host) -> + [{access, all}, + {host, <<"multicast.", Host/binary>>}, + {hosts, []}, + {limits, [{local, []}, {remote, []}]}, + {vcard, undefined}, + {name, ?T("Multicast")}]. |