aboutsummaryrefslogtreecommitdiff
path: root/src/mod_multicast.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_multicast.erl')
-rw-r--r--src/mod_multicast.erl1144
1 files changed, 538 insertions, 606 deletions
diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl
index df385c28c..e76e9a63d 100644
--- a/src/mod_multicast.erl
+++ b/src/mod_multicast.erl
@@ -5,7 +5,7 @@
%%% Created : 29 May 2007 by Badlop <badlop@process-one.net>
%%%
%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
+%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@@ -34,67 +34,53 @@
-behaviour(gen_mod).
%% API
--export([start_link/2, start/2, stop/1]).
+-export([start/2, stop/1, reload/3,
+ user_send_packet/1]).
%% gen_server callbacks
-export([init/1, handle_info/2, handle_call/3,
handle_cast/2, terminate/2, code_change/3]).
--export([purge_loop/1, mod_opt_type/1, depends/2]).
+-export([purge_loop/1, mod_opt_type/1, mod_options/1, depends/2]).
--include("ejabberd.hrl").
-include("logger.hrl").
+-include("translate.hrl").
+-include("xmpp.hrl").
--include("jlib.hrl").
+-record(multicastc, {rserver :: binary(),
+ response,
+ ts :: integer()}).
--record(state,
- {lserver, lservice, access, service_limits}).
+-record(dest, {jid_string :: binary() | none,
+ jid_jid :: jid() | undefined,
+ type :: bcc | cc | noreply | ofrom | replyroom | replyto | to,
+ address :: address()}).
--record(multicastc, {rserver, response, ts}).
+-type limit_value() :: {default | custom, integer()}.
+-record(limits, {message :: limit_value(),
+ presence :: limit_value()}).
-%% ts: timestamp (in seconds) when the cache item was last updated
+-record(service_limits, {local :: #limits{},
+ remote :: #limits{}}).
--record(dest, {jid_string, jid_jid, type, full_xml}).
+-type routing() :: route_single | {route_multicast, binary(), #service_limits{}}.
-%% jid_string = string()
-%% jid_jid = jid()
-%% full_xml = xml()
+-record(group, {server :: binary(),
+ dests :: [#dest{}],
+ multicast :: routing() | undefined,
+ others :: [address()],
+ addresses :: [address()]}).
--record(group,
- {server, dests, multicast, others, addresses}).
-
-%% server = string()
-%% dests = [string()]
-%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached
-%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()}
-%% others = [xml()]
-%% packet = xml()
-
--record(waiter,
- {awaiting, group, renewal = false, sender, packet,
- aattrs, addresses}).
-
-%% awaiting = {[Remote_service], Local_service, Type_awaiting}
-%% Remote_service = Local_service = string()
-%% Type_awaiting = info | items
-%% group = #group
-%% renewal = true | false
-%% sender = From
-%% packet = xml()
-%% aattrs = [xml()]
-
--record(limits, {message, presence}).
-
-%% message = presence = integer() | infinite
-
--record(service_limits, {local, remote}).
+-record(state, {lserver :: binary(),
+ lservice :: binary(),
+ access :: atom(),
+ service_limits :: #service_limits{}}).
+-type state() :: #state{}.
%% All the elements are of type value()
-define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>).
--define(PROCNAME, ejabberd_mod_multicast).
-
-define(PURGE_PROCNAME,
ejabberd_mod_multicast_purgeloop).
@@ -102,6 +88,8 @@
-define(MAXTIME_CACHE_NEGATIVE, 86400).
+-define(MAXTIME_CACHE_NEGOTIATING, 600).
+
-define(CACHE_PURGE_TIMER, 86400000).
-define(DISCO_QUERY_TIMEOUT, 10000).
@@ -114,44 +102,67 @@
-define(DEFAULT_LIMIT_REMOTE_PRESENCE, 20).
-start_link(LServerS, Opts) ->
- Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
- gen_server:start_link({local, Proc}, ?MODULE,
- [LServerS, Opts], []).
-
start(LServerS, Opts) ->
- Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
- ChildSpec = {Proc,
- {?MODULE, start_link, [LServerS, Opts]}, temporary,
- 1000, worker, [?MODULE]},
- supervisor:start_child(ejabberd_sup, ChildSpec).
+ gen_mod:start_child(?MODULE, LServerS, Opts).
stop(LServerS) ->
- Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
- gen_server:call(Proc, stop),
- supervisor:terminate_child(ejabberd_sup, Proc),
- supervisor:delete_child(ejabberd_sup, Proc).
+ gen_mod:stop_child(?MODULE, LServerS).
+
+reload(LServerS, NewOpts, OldOpts) ->
+ Proc = gen_mod:get_module_proc(LServerS, ?MODULE),
+ gen_server:cast(Proc, {reload, NewOpts, OldOpts}).
+
+-define(SETS, gb_sets).
+
+user_send_packet({#presence{} = Packet, C2SState} = Acc) ->
+ case xmpp:get_subtag(Packet, #addresses{}) of
+ #addresses{list = Addresses} ->
+ {ToDeliver, _Delivereds} = split_addresses_todeliver(Addresses),
+ NewState =
+ lists:foldl(
+ fun(Address, St) ->
+ case Address#address.jid of
+ #jid{} = JID ->
+ LJID = jid:tolower(JID),
+ #{pres_a := PresA} = St,
+ A =
+ case Packet#presence.type of
+ available ->
+ ?SETS:add_element(LJID, PresA);
+ unavailable ->
+ ?SETS:del_element(LJID, PresA);
+ _ ->
+ PresA
+ end,
+ St#{pres_a => A};
+ undefined ->
+ St
+ end
+ end, C2SState, ToDeliver),
+ {Packet, NewState};
+ false ->
+ Acc
+ end;
+user_send_packet(Acc) ->
+ Acc.
%%====================================================================
%% gen_server callbacks
%%====================================================================
-init([LServerS, Opts]) ->
- LServiceS = gen_mod:get_opt_host(LServerS, Opts,
- <<"multicast.@HOST@">>),
- Access = gen_mod:get_opt(access, Opts,
- fun acl:access_rules_validator/1, all),
- SLimits =
- build_service_limit_record(gen_mod:get_opt(limits, Opts,
- fun (A) when is_list(A) ->
- A
- end,
- [])),
+-spec init(list()) -> {ok, state()}.
+init([LServerS|_]) ->
+ process_flag(trap_exit, true),
+ Opts = gen_mod:get_module_opts(LServerS, ?MODULE),
+ [LServiceS|_] = gen_mod:get_opt_hosts(Opts),
+ Access = mod_multicast_opt:access(Opts),
+ SLimits = build_service_limit_record(mod_multicast_opt:limits(Opts)),
create_cache(),
try_start_loop(),
- create_pool(),
ejabberd_router_multicast:register_route(LServerS),
ejabberd_router:register_route(LServiceS, LServerS),
+ ejabberd_hooks:add(user_send_packet, LServerS, ?MODULE,
+ user_send_packet, 50),
{ok,
#state{lservice = LServiceS, lserver = LServerS,
access = Access, service_limits = SLimits}}.
@@ -159,7 +170,22 @@ init([LServerS, Opts]) ->
handle_call(stop, _From, State) ->
try_stop_loop(), {stop, normal, ok, State}.
-handle_cast(_Msg, State) -> {noreply, State}.
+handle_cast({reload, NewOpts, NewOpts},
+ #state{lserver = LServerS, lservice = OldLServiceS} = State) ->
+ Access = mod_multicast_opt:access(NewOpts),
+ SLimits = build_service_limit_record(mod_multicast_opt:limits(NewOpts)),
+ [NewLServiceS|_] = gen_mod:get_opt_hosts(NewOpts),
+ if NewLServiceS /= OldLServiceS ->
+ ejabberd_router:register_route(NewLServiceS, LServerS),
+ ejabberd_router:unregister_route(OldLServiceS);
+ true ->
+ ok
+ end,
+ {noreply, State#state{lservice = NewLServiceS,
+ access = Access, service_limits = SLimits}};
+handle_cast(Msg, State) ->
+ ?WARNING_MSG("Unexpected cast: ~p", [Msg]),
+ {noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
@@ -168,10 +194,8 @@ handle_cast(_Msg, State) -> {noreply, State}.
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
-handle_info({route, From, To,
- #xmlel{name = <<"iq">>, attrs = Attrs} = Packet},
- State) ->
- case catch handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) of
+handle_info({route, #iq{} = Packet}, State) ->
+ case catch handle_iq(Packet, State) of
{'EXIT', Reason} ->
?ERROR_MSG("Error when processing IQ stanza: ~p",
[Reason]);
@@ -179,20 +203,17 @@ handle_info({route, From, To,
end,
{noreply, State};
%% XEP33 allows only 'message' and 'presence' stanza type
-handle_info({route, From, To,
- #xmlel{name = Stanza_type} = Packet},
+handle_info({route, Packet},
#state{lservice = LServiceS, lserver = LServerS,
access = Access, service_limits = SLimits} =
- State)
- when (Stanza_type == <<"message">>) or
- (Stanza_type == <<"presence">>) ->
- route_untrusted(LServiceS, LServerS, Access, SLimits,
- From, To, Packet),
+ State) when ?is_stanza(Packet) ->
+ route_untrusted(LServiceS, LServerS, Access, SLimits, Packet),
{noreply, State};
%% Handle multicast packets sent by trusted local services
-handle_info({route_trusted, From, Destinations, Packet},
+handle_info({route_trusted, Destinations, Packet},
#state{lservice = LServiceS, lserver = LServerS} =
State) ->
+ From = xmpp:get_from(Packet),
case catch route_trusted(LServiceS, LServerS, From, Destinations,
Packet) of
{'EXIT', Reason} ->
@@ -206,6 +227,8 @@ handle_info({get_host, Pid}, State) ->
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, State) ->
+ ejabberd_hooks:delete(user_send_packet, State#state.lserver, ?MODULE,
+ user_send_packet, 50),
ejabberd_router_multicast:unregister_route(State#state.lserver),
ejabberd_router:unregister_route(State#state.lservice),
ok.
@@ -220,206 +243,162 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%% IQ Request Processing
%%%------------------------
-handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) ->
- IQ = jlib:iq_query_info(Packet),
- case catch process_iq(From, IQ, State) of
- Result when is_record(Result, iq) ->
- ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
- {'EXIT', Reason} ->
- ?ERROR_MSG("Error when processing IQ stanza: ~p",
- [Reason]),
- Err = jlib:make_error_reply(Packet,
- ?ERR_INTERNAL_SERVER_ERROR),
- ejabberd_router:route(To, From, Err);
- reply ->
- LServiceS = jts(To),
- case fxml:get_attr_s(<<"type">>, Attrs) of
- <<"result">> ->
- process_iqreply_result(From, LServiceS, Packet, State);
- <<"error">> ->
- process_iqreply_error(From, LServiceS, Packet)
- end;
- ok -> ok
+handle_iq(Packet, State) ->
+ try
+ IQ = xmpp:decode_els(Packet),
+ case process_iq(IQ, State) of
+ {result, SubEl} ->
+ ejabberd_router:route(xmpp:make_iq_result(Packet, SubEl));
+ {error, Error} ->
+ ejabberd_router:route_error(Packet, Error);
+ reply ->
+ To = xmpp:get_to(IQ),
+ LServiceS = jid:encode(To),
+ case Packet#iq.type of
+ result ->
+ process_iqreply_result(LServiceS, IQ);
+ error ->
+ process_iqreply_error(LServiceS, IQ)
+ end
+ end
+ catch _:{xmpp_codec, Why} ->
+ Lang = xmpp:get_lang(Packet),
+ Err = xmpp:err_bad_request(xmpp:io_format_error(Why), Lang),
+ ejabberd_router:route_error(Packet, Err)
end.
-process_iq(From,
- #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} =
- IQ,
- State) ->
- IQ#iq{type = result,
- sub_el =
- [#xmlel{name = <<"query">>,
- attrs = [{<<"xmlns">>, ?NS_DISCO_INFO}],
- children = iq_disco_info(From, Lang, State)}]};
-%% disco#items request
-process_iq(_,
- #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) ->
- IQ#iq{type = result,
- sub_el =
- [#xmlel{name = <<"query">>,
- attrs = [{<<"xmlns">>, ?NS_DISCO_ITEMS}],
- children = []}]};
-%% vCard request
-process_iq(_,
- #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ,
- _) ->
- IQ#iq{type = result,
- sub_el =
- [#xmlel{name = <<"vCard">>,
- attrs = [{<<"xmlns">>, ?NS_VCARD}],
- children = iq_vcard(Lang)}]};
-%% Unknown "set" or "get" request
-process_iq(_, #iq{type = Type, sub_el = SubEl} = IQ, _)
- when Type == get; Type == set ->
- IQ#iq{type = error,
- sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]};
-%% IQ "result" or "error".
-process_iq(_, reply, _) -> reply;
-%% IQ "result" or "error".
-process_iq(_, _, _) -> ok.
-
--define(FEATURE(Feat),
- #xmlel{name = <<"feature">>,
- attrs = [{<<"var">>, Feat}], children = []}).
+-spec process_iq(iq(), state()) -> {result, xmpp_element()} |
+ {error, stanza_error()} | reply.
+process_iq(#iq{type = get, lang = Lang, from = From,
+ sub_els = [#disco_info{}]}, State) ->
+ {result, iq_disco_info(From, Lang, State)};
+process_iq(#iq{type = get, sub_els = [#disco_items{}]}, _) ->
+ {result, #disco_items{}};
+process_iq(#iq{type = get, lang = Lang, sub_els = [#vcard_temp{}]}, State) ->
+ {result, iq_vcard(Lang, State)};
+process_iq(#iq{type = T}, _) when T == set; T == get ->
+ {error, xmpp:err_service_unavailable()};
+process_iq(_, _) ->
+ reply.
+
+-define(FEATURE(Feat), Feat).
iq_disco_info(From, Lang, State) ->
- [#xmlel{name = <<"identity">>,
- attrs =
- [{<<"category">>, <<"service">>},
- {<<"type">>, <<"multicast">>},
- {<<"name">>,
- translate:translate(Lang, <<"Multicast">>)}],
- children = []},
- ?FEATURE((?NS_DISCO_INFO)), ?FEATURE((?NS_DISCO_ITEMS)),
- ?FEATURE((?NS_VCARD)), ?FEATURE((?NS_ADDRESS))]
- ++ iq_disco_info_extras(From, State).
-
-iq_vcard(Lang) ->
- [#xmlel{name = <<"FN">>, attrs = [],
- children = [{xmlcdata, <<"ejabberd/mod_multicast">>}]},
- #xmlel{name = <<"URL">>, attrs = [],
- children = [{xmlcdata, ?EJABBERD_URI}]},
- #xmlel{name = <<"DESC">>, attrs = [],
- children =
- [{xmlcdata,
- <<(translate:translate(Lang,
- <<"ejabberd Multicast service">>))/binary,
- "\nCopyright (c) 2002-2016 ProcessOne">>}]}].
+ Name = mod_multicast_opt:name(State#state.lserver),
+ #disco_info{
+ identities = [#identity{category = <<"service">>,
+ type = <<"multicast">>,
+ name = translate:translate(Lang, Name)}],
+ features = [?NS_DISCO_INFO, ?NS_DISCO_ITEMS, ?NS_VCARD, ?NS_ADDRESS],
+ xdata = iq_disco_info_extras(From, State)}.
+
+-spec iq_vcard(binary(), state()) -> #vcard_temp{}.
+iq_vcard(Lang, State) ->
+ case mod_multicast_opt:vcard(State#state.lserver) of
+ undefined ->
+ #vcard_temp{fn = <<"ejabberd/mod_multicast">>,
+ url = ejabberd_config:get_uri(),
+ desc = misc:get_descr(Lang, ?T("ejabberd Multicast service"))};
+ VCard ->
+ VCard
+ end.
%%%-------------------------
%%% Route
%%%-------------------------
+-spec route_trusted(binary(), binary(), jid(), [jid()], stanza()) -> 'ok'.
route_trusted(LServiceS, LServerS, FromJID,
Destinations, Packet) ->
Packet_stripped = Packet,
- AAttrs = [{<<"xmlns">>, ?NS_ADDRESS}],
Delivereds = [],
- Dests2 = lists:map(fun (D) ->
- DS = jts(D),
- XML = #xmlel{name = <<"address">>,
- attrs =
- [{<<"type">>, <<"bcc">>},
- {<<"jid">>, DS}],
- children = []},
- #dest{jid_string = DS, jid_jid = D,
- type = <<"bcc">>, full_xml = XML}
- end,
- Destinations),
+ Dests2 = lists:map(
+ fun(D) ->
+ #dest{jid_string = jid:encode(D),
+ jid_jid = D, type = bcc,
+ address = #address{type = bcc, jid = D}}
+ end, Destinations),
Groups = group_dests(Dests2),
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, AAttrs).
+ Delivereds, Packet_stripped).
-route_untrusted(LServiceS, LServerS, Access, SLimits,
- From, To, Packet) ->
+-spec route_untrusted(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
+route_untrusted(LServiceS, LServerS, Access, SLimits, Packet) ->
try route_untrusted2(LServiceS, LServerS, Access,
- SLimits, From, Packet)
+ SLimits, Packet)
catch
adenied ->
- route_error(To, From, Packet, forbidden,
- <<"Access denied by service policy">>);
+ route_error(Packet, forbidden,
+ ?T("Access denied by service policy"));
eadsele ->
- route_error(To, From, Packet, bad_request,
- <<"No addresses element found">>);
+ route_error(Packet, bad_request,
+ ?T("No addresses element found"));
eadeles ->
- route_error(To, From, Packet, bad_request,
- <<"No address elements found">>);
+ route_error(Packet, bad_request,
+ ?T("No address elements found"));
ewxmlns ->
- route_error(To, From, Packet, bad_request,
- <<"Wrong xmlns">>);
+ route_error(Packet, bad_request,
+ ?T("Wrong xmlns"));
etoorec ->
- route_error(To, From, Packet, not_acceptable,
- <<"Too many receiver fields were specified">>);
+ route_error(Packet, not_acceptable,
+ ?T("Too many receiver fields were specified"));
edrelay ->
- route_error(To, From, Packet, forbidden,
- <<"Packet relay is denied by service policy">>);
+ route_error(Packet, forbidden,
+ ?T("Packet relay is denied by service policy"));
EType:EReason ->
?ERROR_MSG("Multicast unknown error: Type: ~p~nReason: ~p",
[EType, EReason]),
- route_error(To, From, Packet, internal_server_error,
- <<"Unknown problem">>)
+ route_error(Packet, internal_server_error,
+ ?T("Internal server error"))
end.
-route_untrusted2(LServiceS, LServerS, Access, SLimits,
- FromJID, Packet) ->
+-spec route_untrusted2(binary(), binary(), atom(), #service_limits{}, stanza()) -> 'ok'.
+route_untrusted2(LServiceS, LServerS, Access, SLimits, Packet) ->
+ FromJID = xmpp:get_from(Packet),
ok = check_access(LServerS, Access, FromJID),
- {ok, Packet_stripped, AAttrs, Addresses} =
- strip_addresses_element(Packet),
- {To_deliver, Delivereds} =
- split_addresses_todeliver(Addresses),
+ {ok, Packet_stripped, Addresses} = strip_addresses_element(Packet),
+ {To_deliver, Delivereds} = split_addresses_todeliver(Addresses),
Dests = convert_dest_record(To_deliver),
{Dests2, Not_jids} = split_dests_jid(Dests),
report_not_jid(FromJID, Packet, Not_jids),
- ok = check_limit_dests(SLimits, FromJID, Packet,
- Dests2),
+ ok = check_limit_dests(SLimits, FromJID, Packet, Dests2),
Groups = group_dests(Dests2),
ok = check_relay(FromJID#jid.server, LServerS, Groups),
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, AAttrs).
+ Delivereds, Packet_stripped).
+-spec route_common(binary(), binary(), jid(), [#group{}],
+ [address()], stanza()) -> 'ok'.
route_common(LServerS, LServiceS, FromJID, Groups,
- Delivereds, Packet_stripped, AAttrs) ->
- Groups2 = look_cached_servers(LServerS, Groups),
+ Delivereds, Packet_stripped) ->
+ Groups2 = look_cached_servers(LServerS, LServiceS, Groups),
Groups3 = build_others_xml(Groups2),
Groups4 = add_addresses(Delivereds, Groups3),
AGroups = decide_action_groups(Groups4),
- act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
+ act_groups(FromJID, Packet_stripped, LServiceS,
AGroups).
-act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
- AGroups) ->
- [perform(FromJID, Packet_stripped, AAttrs, LServiceS,
- AGroup)
- || AGroup <- AGroups].
-
-perform(From, Packet, AAttrs, _,
+-spec act_groups(jid(), stanza(), binary(), [{routing(), #group{}}]) -> 'ok'.
+act_groups(FromJID, Packet_stripped, LServiceS, AGroups) ->
+ lists:foreach(
+ fun(AGroup) ->
+ perform(FromJID, Packet_stripped, LServiceS,
+ AGroup)
+ end, AGroups).
+
+-spec perform(jid(), stanza(), binary(),
+ {routing(), #group{}}) -> 'ok'.
+perform(From, Packet, _,
{route_single, Group}) ->
- [route_packet(From, ToUser, Packet, AAttrs,
- Group#group.others, Group#group.addresses)
- || ToUser <- Group#group.dests];
-perform(From, Packet, AAttrs, _,
+ lists:foreach(
+ fun(ToUser) ->
+ route_packet(From, ToUser, Packet,
+ Group#group.others, Group#group.addresses)
+ end, Group#group.dests);
+perform(From, Packet, _,
{{route_multicast, JID, RLimits}, Group}) ->
- route_packet_multicast(From, JID, Packet, AAttrs,
- Group#group.dests, Group#group.addresses, RLimits);
-perform(From, Packet, AAttrs, LServiceS,
- {{ask, Old_service, renewal}, Group}) ->
- send_query_info(Old_service, LServiceS),
- add_waiter(#waiter{awaiting =
- {[Old_service], LServiceS, info},
- group = Group, renewal = true, sender = From,
- packet = Packet, aattrs = AAttrs,
- addresses = Group#group.addresses});
-perform(_From, _Packet, _AAttrs, LServiceS,
- {{ask, LServiceS, _}, _Group}) ->
- ok;
-perform(From, Packet, AAttrs, LServiceS,
- {{ask, Server, not_renewal}, Group}) ->
- send_query_info(Server, LServiceS),
- add_waiter(#waiter{awaiting =
- {[Server], LServiceS, info},
- group = Group, renewal = false, sender = From,
- packet = Packet, aattrs = AAttrs,
- addresses = Group#group.addresses}).
+ route_packet_multicast(From, JID, Packet,
+ Group#group.dests, Group#group.addresses, RLimits).
%%%-------------------------
%%% Check access permission
@@ -435,52 +414,39 @@ check_access(LServerS, Access, From) ->
%%% Strip 'addresses' XML element
%%%-------------------------
+-spec strip_addresses_element(stanza()) -> {ok, stanza(), [address()]}.
strip_addresses_element(Packet) ->
- case fxml:get_subtag(Packet, <<"addresses">>) of
- #xmlel{name = <<"addresses">>, attrs = AAttrs,
- children = Addresses} ->
- case fxml:get_attr_s(<<"xmlns">>, AAttrs) of
- ?NS_ADDRESS ->
- #xmlel{name = Name, attrs = Attrs, children = Els} =
- Packet,
- Els_stripped = lists:keydelete(<<"addresses">>, 2, Els),
- Packet_stripped = #xmlel{name = Name, attrs = Attrs,
- children = Els_stripped},
- {ok, Packet_stripped, AAttrs, fxml:remove_cdata(Addresses)};
- _ -> throw(ewxmlns)
- end;
- _ -> throw(eadsele)
+ case xmpp:get_subtag(Packet, #addresses{}) of
+ #addresses{list = Addrs} ->
+ PacketStripped = xmpp:remove_subtag(Packet, #addresses{}),
+ {ok, PacketStripped, Addrs};
+ false ->
+ throw(eadsele)
end.
%%%-------------------------
%%% Split Addresses
%%%-------------------------
+-spec split_addresses_todeliver([address()]) -> {[address()], [address()]}.
split_addresses_todeliver(Addresses) ->
- lists:partition(fun (XML) ->
- case XML of
- #xmlel{name = <<"address">>, attrs = Attrs} ->
- case fxml:get_attr_s(<<"delivered">>, Attrs) of
- <<"true">> -> false;
- _ ->
- Type = fxml:get_attr_s(<<"type">>,
- Attrs),
- case Type of
- <<"to">> -> true;
- <<"cc">> -> true;
- <<"bcc">> -> true;
- _ -> false
- end
- end;
- _ -> false
- end
- end,
- Addresses).
+ lists:partition(
+ fun(#address{delivered = true}) ->
+ false;
+ (#address{type = Type}) ->
+ case Type of
+ to -> true;
+ cc -> true;
+ bcc -> true;
+ _ -> false
+ end
+ end, Addresses).
%%%-------------------------
%%% Check does not exceed limit of destinations
%%%-------------------------
+-spec check_limit_dests(#service_limits{}, jid(), stanza(), [address()]) -> ok.
check_limit_dests(SLimits, FromJID, Packet,
Addresses) ->
SenderT = sender_type(FromJID),
@@ -497,24 +463,23 @@ check_limit_dests(SLimits, FromJID, Packet,
%%% Convert Destination XML to record
%%%-------------------------
-convert_dest_record(XMLs) ->
- lists:map(fun (XML) ->
- case fxml:get_tag_attr_s(<<"jid">>, XML) of
- <<"">> -> #dest{jid_string = none, full_xml = XML};
- JIDS ->
- Type = fxml:get_tag_attr_s(<<"type">>, XML),
- JIDJ = stj(JIDS),
- #dest{jid_string = JIDS, jid_jid = JIDJ,
- type = Type, full_xml = XML}
- end
- end,
- XMLs).
+-spec convert_dest_record([address()]) -> [#dest{}].
+convert_dest_record(Addrs) ->
+ lists:map(
+ fun(#address{jid = undefined, type = Type} = Addr) ->
+ #dest{jid_string = none,
+ type = Type, address = Addr};
+ (#address{jid = JID, type = Type} = Addr) ->
+ #dest{jid_string = jid:encode(JID), jid_jid = JID,
+ type = Type, address = Addr}
+ end, Addrs).
%%%-------------------------
%%% Split destinations by existence of JID
%%% and send error messages for other dests
%%%-------------------------
+-spec split_dests_jid([#dest{}]) -> {[#dest{}], [#dest{}]}.
split_dests_jid(Dests) ->
lists:partition(fun (Dest) ->
case Dest#dest.jid_string of
@@ -524,18 +489,20 @@ split_dests_jid(Dests) ->
end,
Dests).
+-spec report_not_jid(jid(), stanza(), [#dest{}]) -> any().
report_not_jid(From, Packet, Dests) ->
- Dests2 = [fxml:element_to_binary(Dest#dest.full_xml)
+ Dests2 = [fxml:element_to_binary(xmpp:encode(Dest#dest.address))
|| Dest <- Dests],
- [route_error(From, From, Packet, jid_malformed,
- <<"This service can not process the address: ",
- D/binary>>)
+ [route_error(
+ xmpp:set_from_to(Packet, From, From), jid_malformed,
+ str:format(?T("This service can not process the address: ~ts"), [D]))
|| D <- Dests2].
%%%-------------------------
%%% Group destinations by their servers
%%%-------------------------
+-spec group_dests([#dest{}]) -> [#group{}].
group_dests(Dests) ->
D = lists:foldl(fun (Dest, Dict) ->
ServerS = (Dest#dest.jid_jid)#jid.server,
@@ -543,21 +510,22 @@ group_dests(Dests) ->
end,
dict:new(), Dests),
Keys = dict:fetch_keys(D),
- [#group{server = Key, dests = dict:fetch(Key, D)}
+ [#group{server = Key, dests = dict:fetch(Key, D),
+ addresses = [], others = []}
|| Key <- Keys].
%%%-------------------------
%%% Look for cached responses
%%%-------------------------
-look_cached_servers(LServerS, Groups) ->
- [look_cached(LServerS, Group) || Group <- Groups].
+look_cached_servers(LServerS, LServiceS, Groups) ->
+ [look_cached(LServerS, LServiceS, Group) || Group <- Groups].
-look_cached(LServerS, G) ->
+look_cached(LServerS, LServiceS, G) ->
Maxtime_positive = (?MAXTIME_CACHE_POSITIVE),
Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE),
Cached_response = search_server_on_cache(G#group.server,
- LServerS,
+ LServerS, LServiceS,
{Maxtime_positive,
Maxtime_negative}),
G#group{multicast = Cached_response}.
@@ -573,20 +541,19 @@ build_others_xml(Groups) ->
build_other_xml(Dests) ->
lists:foldl(fun (Dest, R) ->
- XML = Dest#dest.full_xml,
+ XML = Dest#dest.address,
case Dest#dest.type of
- <<"to">> -> [add_delivered(XML) | R];
- <<"cc">> -> [add_delivered(XML) | R];
- <<"bcc">> -> R;
+ to -> [add_delivered(XML) | R];
+ cc -> [add_delivered(XML) | R];
+ bcc -> R;
_ -> [XML | R]
end
end,
[], Dests).
-add_delivered(#xmlel{name = Name, attrs = Attrs,
- children = Els}) ->
- Attrs2 = [{<<"delivered">>, <<"true">>} | Attrs],
- #xmlel{name = Name, attrs = Attrs2, children = Els}.
+-spec add_delivered(address()) -> address().
+add_delivered(Addr) ->
+ Addr#address{delivered = true}.
%%%-------------------------
%%% Add preliminary packets
@@ -608,80 +575,67 @@ add_addresses2(Delivereds, [Group | Groups], Res, Pa,
%%% Decide action groups
%%%-------------------------
+-spec decide_action_groups([#group{}]) -> [{routing(), #group{}}].
decide_action_groups(Groups) ->
- [{decide_action_group(Group), Group}
+ [{Group#group.multicast, Group}
|| Group <- Groups].
-decide_action_group(Group) ->
- Server = Group#group.server,
- case Group#group.multicast of
- {cached, local_server} ->
- %% Send a copy of the packet to each local user on Dests
- route_single;
- {cached, not_supported} ->
- %% Send a copy of the packet to each remote user on Dests
- route_single;
- {cached, {multicast_supported, JID, RLimits}} ->
- {route_multicast, JID, RLimits};
- {obsolete,
- {multicast_supported, Old_service, _RLimits}} ->
- {ask, Old_service, renewal};
- {obsolete, not_supported} -> {ask, Server, not_renewal};
- not_cached -> {ask, Server, not_renewal}
- end.
-
%%%-------------------------
%%% Route packet
%%%-------------------------
-route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) ->
+-spec route_packet(jid(), #dest{}, stanza(), [addresses()], [addresses()]) -> 'ok'.
+route_packet(From, ToDest, Packet, Others, Addresses) ->
Dests = case ToDest#dest.type of
- <<"bcc">> -> [];
+ bcc -> [];
_ -> [ToDest]
end,
route_packet2(From, ToDest#dest.jid_string, Dests,
- Packet, AAttrs, {Others, Addresses}).
+ Packet, {Others, Addresses}).
-route_packet_multicast(From, ToS, Packet, AAttrs, Dests,
+-spec route_packet_multicast(jid(), binary(), stanza(), [#dest{}], [address()], #limits{}) -> 'ok'.
+route_packet_multicast(From, ToS, Packet, Dests,
Addresses, Limits) ->
Type_of_stanza = type_of_stanza(Packet),
{_Type, Limit_number} = get_limit_number(Type_of_stanza,
Limits),
Fragmented_dests = fragment_dests(Dests, Limit_number),
- [route_packet2(From, ToS, DFragment, Packet, AAttrs,
- Addresses)
- || DFragment <- Fragmented_dests].
-
-route_packet2(From, ToS, Dests, Packet, AAttrs,
- Addresses) ->
- #xmlel{name = T, attrs = A, children = C} = Packet,
- C2 = case append_dests(Dests, Addresses) of
- [] -> C;
- ACs ->
- [#xmlel{name = <<"addresses">>, attrs = AAttrs,
- children = ACs}
- | C]
- end,
- Packet2 = #xmlel{name = T, attrs = A, children = C2},
+ lists:foreach(fun(DFragment) ->
+ route_packet2(From, ToS, DFragment, Packet,
+ Addresses)
+ end, Fragmented_dests).
+
+-spec route_packet2(jid(), binary(), [#dest{}], stanza(), {[address()], [address()]} | [address()]) -> 'ok'.
+route_packet2(From, ToS, Dests, Packet, Addresses) ->
+ Els = case append_dests(Dests, Addresses) of
+ [] ->
+ xmpp:get_els(Packet);
+ ACs ->
+ [#addresses{list = ACs}|xmpp:get_els(Packet)]
+ end,
+ Packet2 = xmpp:set_els(Packet, Els),
ToJID = stj(ToS),
- ejabberd_router:route(From, ToJID, Packet2).
+ ejabberd_router:route(xmpp:set_from_to(Packet2, From, ToJID)).
+-spec append_dests([#dest{}], {[address()], [address()]} | [address()]) -> [address()].
append_dests(_Dests, {Others, Addresses}) ->
- Addresses++Others;
+ Addresses ++ Others;
append_dests([], Addresses) -> Addresses;
append_dests([Dest | Dests], Addresses) ->
- append_dests(Dests, [Dest#dest.full_xml | Addresses]).
+ append_dests(Dests, [Dest#dest.address | Addresses]).
%%%-------------------------
%%% Check relay
%%%-------------------------
+-spec check_relay(binary(), binary(), [#group{}]) -> ok.
check_relay(RS, LS, Gs) ->
case check_relay_required(RS, LS, Gs) of
false -> ok;
true -> throw(edrelay)
end.
+-spec check_relay_required(binary(), binary(), [#group{}]) -> boolean().
check_relay_required(RServer, LServerS, Groups) ->
case lists:suffix(str:tokens(LServerS, <<".">>),
str:tokens(RServer, <<".">>)) of
@@ -689,6 +643,7 @@ check_relay_required(RServer, LServerS, Groups) ->
false -> check_relay_required(LServerS, Groups)
end.
+-spec check_relay_required(binary(), [#group{}]) -> boolean().
check_relay_required(LServerS, Groups) ->
lists:any(fun (Group) -> Group#group.server /= LServerS
end,
@@ -698,200 +653,193 @@ check_relay_required(LServerS, Groups) ->
%%% Check protocol support: Send request
%%%-------------------------
-send_query_info(RServerS, LServiceS) ->
+-spec send_query_info(binary(), binary(), binary()) -> ok.
+send_query_info(RServerS, LServiceS, ID) ->
case str:str(RServerS, <<"echo.">>) of
- 1 -> false;
- _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO)
+ 1 -> ok;
+ _ -> send_query(RServerS, LServiceS, ID, #disco_info{})
end.
-send_query_items(RServerS, LServiceS) ->
- send_query(RServerS, LServiceS, ?NS_DISCO_ITEMS).
+-spec send_query_items(binary(), binary(), binary()) -> ok.
+send_query_items(RServerS, LServiceS, ID) ->
+ send_query(RServerS, LServiceS, ID, #disco_items{}).
-send_query(RServerS, LServiceS, XMLNS) ->
- Packet = #xmlel{name = <<"iq">>,
- attrs = [{<<"to">>, RServerS}, {<<"type">>, <<"get">>}],
- children =
- [#xmlel{name = <<"query">>,
- attrs = [{<<"xmlns">>, XMLNS}],
- children = []}]},
- ejabberd_router:route(stj(LServiceS), stj(RServerS),
- Packet).
+-spec send_query(binary(), binary(), binary(), disco_info()|disco_items()) -> ok.
+send_query(RServerS, LServiceS, ID, SubEl) ->
+ Packet = #iq{from = stj(LServiceS),
+ to = stj(RServerS),
+ id = ID,
+ type = get, sub_els = [SubEl]},
+ ejabberd_router:route(Packet).
%%%-------------------------
%%% Check protocol support: Receive response: Error
%%%-------------------------
-process_iqreply_error(From, LServiceS, _Packet) ->
- FromS = jts(From),
- case search_waiter(FromS, LServiceS, info) of
- {found_waiter, Waiter} ->
- received_awaiter(FromS, Waiter, LServiceS);
- _ -> ok
+process_iqreply_error(LServiceS, Packet) ->
+ FromS = jts(xmpp:get_from(Packet)),
+ ID = Packet#iq.id,
+ case str:tokens(ID, <<"/">>) of
+ [RServer, _] ->
+ case look_server(RServer) of
+ {cached, {_Response, {wait_for_info, ID}}, _TS}
+ when RServer == FromS ->
+ add_response(RServer, not_supported, cached);
+ {cached, {_Response, {wait_for_items, ID}}, _TS}
+ when RServer == FromS ->
+ add_response(RServer, not_supported, cached);
+ {cached, {Response, {wait_for_items_info, ID, Items}},
+ _TS} ->
+ case lists:member(FromS, Items) of
+ true ->
+ received_awaiter(
+ FromS, RServer, Response, ID, Items,
+ LServiceS);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
end.
%%%-------------------------
%%% Check protocol support: Receive response: Disco
%%%-------------------------
-process_iqreply_result(From, LServiceS, Packet, State) ->
- #xmlel{name = <<"query">>, attrs = Attrs2,
- children = Els2} =
- fxml:get_subtag(Packet, <<"query">>),
- case fxml:get_attr_s(<<"xmlns">>, Attrs2) of
- ?NS_DISCO_INFO ->
- process_discoinfo_result(From, LServiceS, Els2, State);
- ?NS_DISCO_ITEMS ->
- process_discoitems_result(From, LServiceS, Els2)
+-spec process_iqreply_result(binary(), iq()) -> any().
+process_iqreply_result(LServiceS, #iq{from = From, id = ID, sub_els = [SubEl]}) ->
+ case SubEl of
+ #disco_info{} ->
+ process_discoinfo_result(From, LServiceS, ID, SubEl);
+ #disco_items{} ->
+ process_discoitems_result(From, LServiceS, ID, SubEl);
+ _ ->
+ ok
end.
%%%-------------------------
%%% Check protocol support: Receive response: Disco Info
%%%-------------------------
-process_discoinfo_result(From, LServiceS, Els,
- _State) ->
+process_discoinfo_result(From, LServiceS, ID, DiscoInfo) ->
FromS = jts(From),
- case search_waiter(FromS, LServiceS, info) of
- {found_waiter, Waiter} ->
- process_discoinfo_result2(From, FromS, LServiceS, Els,
- Waiter);
- _ -> ok
+ case str:tokens(ID, <<"/">>) of
+ [RServer, _] ->
+ case look_server(RServer) of
+ {cached, {Response, {wait_for_info, ID} = ST}, _TS}
+ when RServer == FromS ->
+ process_discoinfo_result2(
+ From, FromS, LServiceS, DiscoInfo,
+ RServer, Response, ST);
+ {cached, {Response, {wait_for_items_info, ID, Items} = ST},
+ _TS} ->
+ case lists:member(FromS, Items) of
+ true ->
+ process_discoinfo_result2(
+ From, FromS, LServiceS, DiscoInfo,
+ RServer, Response, ST);
+ false ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
end.
-process_discoinfo_result2(From, FromS, LServiceS, Els,
- Waiter) ->
- Multicast_support =
- lists:any(
- fun(XML) ->
- case XML of
- #xmlel{name = <<"feature">>, attrs = Attrs} ->
- (?NS_ADDRESS) == fxml:get_attr_s(<<"var">>, Attrs);
- _ -> false
- end
- end,
- Els),
- Group = Waiter#waiter.group,
- RServer = Group#group.server,
+process_discoinfo_result2(From, FromS, LServiceS,
+ #disco_info{features = Feats} = DiscoInfo,
+ RServer, Response, ST) ->
+ Multicast_support = lists:member(?NS_ADDRESS, Feats),
case Multicast_support of
true ->
SenderT = sender_type(From),
- RLimits = get_limits_xml(Els, SenderT),
- add_response(RServer, {multicast_supported, FromS, RLimits}),
- FromM = Waiter#waiter.sender,
- DestsM = Group#group.dests,
- PacketM = Waiter#waiter.packet,
- AAttrsM = Waiter#waiter.aattrs,
- AddressesM = Waiter#waiter.addresses,
- RServiceM = FromS,
- route_packet_multicast(FromM, RServiceM, PacketM,
- AAttrsM, DestsM, AddressesM, RLimits),
- delo_waiter(Waiter);
+ RLimits = get_limits_xml(DiscoInfo, SenderT),
+ add_response(RServer, {multicast_supported, FromS, RLimits}, cached);
false ->
- case FromS of
- RServer ->
- send_query_items(FromS, LServiceS),
- delo_waiter(Waiter),
- add_waiter(Waiter#waiter{awaiting =
- {[FromS], LServiceS, items},
- renewal = false});
- %% We asked a component, and it does not support XEP33
- _ -> received_awaiter(FromS, Waiter, LServiceS)
- end
+ case ST of
+ {wait_for_info, _ID} ->
+ Random = p1_rand:get_string(),
+ ID = <<RServer/binary, $/, Random/binary>>,
+ send_query_items(FromS, LServiceS, ID),
+ add_response(RServer, Response, {wait_for_items, ID});
+ %% We asked a component, and it does not support XEP33
+ {wait_for_items_info, ID, Items} ->
+ received_awaiter(FromS, RServer, Response, ID, Items, LServiceS)
+ end
end.
-get_limits_xml(Els, SenderT) ->
- LimitOpts = get_limits_els(Els),
+get_limits_xml(DiscoInfo, SenderT) ->
+ LimitOpts = get_limits_els(DiscoInfo),
build_remote_limit_record(LimitOpts, SenderT).
-get_limits_els(Els) ->
- lists:foldl(fun (XML, R) ->
- case XML of
- #xmlel{name = <<"x">>, attrs = Attrs,
- children = SubEls} ->
- case ((?NS_XDATA) ==
- fxml:get_attr_s(<<"xmlns">>, Attrs))
- and
- (<<"result">> ==
- fxml:get_attr_s(<<"type">>, Attrs))
- of
- true -> get_limits_fields(SubEls) ++ R;
- false -> R
- end;
- _ -> R
- end
- end,
- [], Els).
-
-get_limits_fields(Fields) ->
- {Head, Tail} = lists:partition(fun (Field) ->
- case Field of
- #xmlel{name = <<"field">>,
- attrs = Attrs} ->
- (<<"FORM_TYPE">> ==
- fxml:get_attr_s(<<"var">>,
- Attrs))
- and
- (<<"hidden">> ==
- fxml:get_attr_s(<<"type">>,
- Attrs));
- _ -> false
- end
- end,
- Fields),
+-spec get_limits_els(disco_info()) -> [{atom(), integer()}].
+get_limits_els(DiscoInfo) ->
+ lists:flatmap(
+ fun(#xdata{type = result} = X) ->
+ get_limits_fields(X);
+ (_) ->
+ []
+ end, DiscoInfo#disco_info.xdata).
+
+-spec get_limits_fields(xdata()) -> [{atom(), integer()}].
+get_limits_fields(X) ->
+ {Head, Tail} = lists:partition(
+ fun(#xdata_field{var = Var, type = Type}) ->
+ Var == <<"FORM_TYPE">> andalso Type == hidden
+ end, X#xdata.fields),
case Head of
[] -> [];
_ -> get_limits_values(Tail)
end.
-get_limits_values(Values) ->
- lists:foldl(fun (Value, R) ->
- case Value of
- #xmlel{name = <<"field">>, attrs = Attrs,
- children = SubEls} ->
- [#xmlel{name = <<"value">>, children = SubElsV}] =
- SubEls,
- Number = fxml:get_cdata(SubElsV),
- Name = fxml:get_attr_s(<<"var">>, Attrs),
- [{jlib:binary_to_atom(Name),
- jlib:binary_to_integer(Number)}
- | R];
- _ -> R
- end
- end,
- [], Values).
+-spec get_limits_values([xdata_field()]) -> [{atom(), integer()}].
+get_limits_values(Fields) ->
+ lists:flatmap(
+ fun(#xdata_field{var = Name, values = [Number]}) ->
+ try
+ [{binary_to_atom(Name, utf8), binary_to_integer(Number)}]
+ catch _:badarg ->
+ []
+ end;
+ (_) ->
+ []
+ end, Fields).
%%%-------------------------
%%% Check protocol support: Receive response: Disco Items
%%%-------------------------
-process_discoitems_result(From, LServiceS, Els) ->
+process_discoitems_result(From, LServiceS, ID, #disco_items{items = Items}) ->
FromS = jts(From),
- case search_waiter(FromS, LServiceS, items) of
- {found_waiter, Waiter} ->
- List = lists:foldl(
- fun(XML, Res) ->
- case XML of
- #xmlel{name = <<"item">>, attrs = Attrs} ->
- SJID = fxml:get_attr_s(<<"jid">>, Attrs),
- case jid:from_string(SJID) of
- #jid{luser = <<"">>,
- lresource = <<"">>} ->
- [SJID | Res];
- _ -> Res
- end;
- _ -> Res
- end
- end,
- [], Els),
- case List of
- [] ->
- received_awaiter(FromS, Waiter, LServiceS);
+ case str:tokens(ID, <<"/">>) of
+ [FromS = RServer, _] ->
+ case look_server(RServer) of
+ {cached, {Response, {wait_for_items, ID}}, _TS} ->
+ List = lists:flatmap(
+ fun(#disco_item{jid = #jid{luser = <<"">>,
+ lserver = LServer,
+ lresource = <<"">>}}) ->
+ [LServer];
+ (_) ->
+ []
+ end, Items),
+ case List of
+ [] ->
+ add_response(RServer, not_supported, cached);
+ _ ->
+ Random = p1_rand:get_string(),
+ ID2 = <<RServer/binary, $/, Random/binary>>,
+ [send_query_info(Item, LServiceS, ID2) || Item <- List],
+ add_response(RServer, Response,
+ {wait_for_items_info, ID2, List})
+ end;
_ ->
- [send_query_info(Item, LServiceS) || Item <- List],
- delo_waiter(Waiter),
- add_waiter(Waiter#waiter{awaiting =
- {List, LServiceS, info},
- renewal = false})
+ ok
end;
_ ->
ok
@@ -901,33 +849,12 @@ process_discoitems_result(From, LServiceS, Els) ->
%%% Check protocol support: Receive response: Received awaiter
%%%-------------------------
-received_awaiter(JID, Waiter, LServiceS) ->
- {JIDs, LServiceS, _} = Waiter#waiter.awaiting,
- delo_waiter(Waiter),
- Group = Waiter#waiter.group,
- RServer = Group#group.server,
+received_awaiter(JID, RServer, Response, ID, JIDs, _LServiceS) ->
case lists:delete(JID, JIDs) of
- [] ->
- case Waiter#waiter.renewal of
- false ->
- add_response(RServer, not_supported),
- From = Waiter#waiter.sender,
- Packet = Waiter#waiter.packet,
- AAttrs = Waiter#waiter.aattrs,
- Others = Group#group.others,
- Addresses = Waiter#waiter.addresses,
- [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses)
- || ToUser <- Group#group.dests];
- true ->
- send_query_info(RServer, LServiceS),
- add_waiter(Waiter#waiter{awaiting =
- {[RServer], LServiceS, info},
- renewal = false})
- end;
- JIDs2 ->
- add_waiter(Waiter#waiter{awaiting =
- {JIDs2, LServiceS, info},
- renewal = false})
+ [] ->
+ add_response(RServer, not_supported, cached);
+ JIDs2 ->
+ add_response(RServer, Response, {wait_for_items_info, ID, JIDs2})
end.
%%%-------------------------
@@ -935,29 +862,59 @@ received_awaiter(JID, Waiter, LServiceS) ->
%%%-------------------------
create_cache() ->
- mnesia:create_table(multicastc,
+ ejabberd_mnesia:create(?MODULE, multicastc,
[{ram_copies, [node()]},
{attributes, record_info(fields, multicastc)}]).
-add_response(RServer, Response) ->
+add_response(RServer, Response, State) ->
Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
mnesia:dirty_write(#multicastc{rserver = RServer,
- response = Response, ts = Secs}).
+ response = {Response, State}, ts = Secs}).
-search_server_on_cache(RServer, LServerS, _Maxmins)
+search_server_on_cache(RServer, LServerS, _LServiceS, _Maxmins)
when RServer == LServerS ->
- {cached, local_server};
-search_server_on_cache(RServer, _LServerS, Maxmins) ->
+ route_single;
+search_server_on_cache(RServer, _LServerS, LServiceS, _Maxmins)
+ when RServer == LServiceS ->
+ route_single;
+search_server_on_cache(RServer, _LServerS, LServiceS, Maxmins) ->
case look_server(RServer) of
- not_cached -> not_cached;
- {cached, Response, Ts} ->
- Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
- case is_obsolete(Response, Ts, Now, Maxmins) of
- false -> {cached, Response};
- true -> {obsolete, Response}
- end
+ not_cached ->
+ query_info(RServer, LServiceS, not_supported),
+ route_single;
+ {cached, {Response, State}, TS} ->
+ Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
+ Response2 =
+ case State of
+ cached ->
+ case is_obsolete(Response, TS, Now, Maxmins) of
+ false -> ok;
+ true ->
+ query_info(RServer, LServiceS, Response)
+ end,
+ Response;
+ _ ->
+ if
+ Now - TS > ?MAXTIME_CACHE_NEGOTIATING ->
+ query_info(RServer, LServiceS, not_supported),
+ not_supported;
+ true ->
+ Response
+ end
+ end,
+ case Response2 of
+ not_supported -> route_single;
+ {multicast_supported, Service, Limits} ->
+ {route_multicast, Service, Limits}
+ end
end.
+query_info(RServer, LServiceS, Response) ->
+ Random = p1_rand:get_string(),
+ ID = <<RServer/binary, $/, Random/binary>>,
+ send_query_info(RServer, LServiceS, ID),
+ add_response(RServer, Response, {wait_for_info, ID}).
+
look_server(RServer) ->
case mnesia:dirty_read(multicastc, RServer) of
[] -> not_cached;
@@ -1029,44 +986,6 @@ purge_loop(NM) ->
end.
%%%-------------------------
-%%% Pool
-%%%-------------------------
-
-create_pool() ->
- catch
- begin
- ets:new(multicastp,
- [duplicate_bag, public, named_table, {keypos, 2}]),
- ets:give_away(multicastp, whereis(ejabberd), ok)
- end.
-
-add_waiter(Waiter) ->
- true = ets:insert(multicastp, Waiter).
-
-delo_waiter(Waiter) ->
- true = ets:delete_object(multicastp, Waiter).
-
--spec search_waiter(binary(), binary(), info | items) ->
- {found_waiter, #waiter{}} | waiter_not_found.
-
-search_waiter(JID, LServiceS, Type) ->
- Rs = ets:foldl(fun (W, Res) ->
- {JIDs, LServiceS1, Type1} = W#waiter.awaiting,
- case lists:member(JID, JIDs) and
- (LServiceS == LServiceS1)
- and (Type1 == Type)
- of
- true -> Res ++ [W];
- false -> Res
- end
- end,
- [], multicastp),
- case Rs of
- [R | _] -> {found_waiter, R};
- [] -> waiter_not_found
- end.
-
-%%%-------------------------
%%% Limits: utils
%%%-------------------------
@@ -1091,33 +1010,36 @@ build_service_limit_record(LimitOpts) ->
build_limit_record(LimitOptsR, remote)}.
get_from_limitopts(LimitOpts, SenderT) ->
- [{StanzaT, Number}
- || {SenderT2, StanzaT, Number} <- LimitOpts,
- SenderT =:= SenderT2].
+ case lists:keyfind(SenderT, 1, LimitOpts) of
+ false -> [];
+ {SenderT, Result} -> Result
+ end.
build_remote_limit_record(LimitOpts, SenderT) ->
build_limit_record(LimitOpts, SenderT).
+-spec build_limit_record(any(), local | remote) -> #limits{}.
build_limit_record(LimitOpts, SenderT) ->
Limits = [get_limit_value(Name, Default, LimitOpts)
|| {Name, Default} <- list_of_limits(SenderT)],
list_to_tuple([limits | Limits]).
+-spec get_limit_value(atom(), integer(), any()) -> limit_value().
get_limit_value(Name, Default, LimitOpts) ->
case lists:keysearch(Name, 1, LimitOpts) of
{value, {Name, Number}} -> {custom, Number};
false -> {default, Default}
end.
-type_of_stanza(#xmlel{name = <<"message">>}) -> message;
-type_of_stanza(#xmlel{name = <<"presence">>}) ->
- presence.
+type_of_stanza(Stanza) -> element(1, Stanza).
+-spec get_limit_number(message | presence, #limits{}) -> limit_value().
get_limit_number(message, Limits) ->
Limits#limits.message;
get_limit_number(presence, Limits) ->
Limits#limits.presence.
+-spec get_slimit_group(local | remote, #service_limits{}) -> #limits{}.
get_slimit_group(local, SLimits) ->
SLimits#service_limits.local;
get_slimit_group(remote, SLimits) ->
@@ -1144,17 +1066,10 @@ fragment_dests(Dests, Limit_number) ->
%% Some parts of code are borrowed from mod_muc_room.erl
-define(RFIELDT(Type, Var, Val),
- #xmlel{name = <<"field">>,
- attrs = [{<<"var">>, Var}, {<<"type">>, Type}],
- children =
- [#xmlel{name = <<"value">>, attrs = [],
- children = [{xmlcdata, Val}]}]}).
+ #xdata_field{type = Type, var = Var, values = [Val]}).
-define(RFIELDV(Var, Val),
- #xmlel{name = <<"field">>, attrs = [{<<"var">>, Var}],
- children =
- [#xmlel{name = <<"value">>, attrs = [],
- children = [{xmlcdata, Val}]}]}).
+ #xdata_field{var = Var, values = [Val]}).
iq_disco_info_extras(From, State) ->
SenderT = sender_type(From),
@@ -1162,16 +1077,13 @@ iq_disco_info_extras(From, State) ->
case iq_disco_info_extras2(SenderT, Service_limits) of
[] -> [];
List_limits_xmpp ->
- [#xmlel{name = <<"x">>,
- attrs =
- [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"result">>}],
- children =
- [?RFIELDT(<<"hidden">>, <<"FORM_TYPE">>, (?NS_ADDRESS))]
- ++ List_limits_xmpp}]
+ [#xdata{type = result,
+ fields = [?RFIELDT(hidden, <<"FORM_TYPE">>, ?NS_ADDRESS)
+ | List_limits_xmpp]}]
end.
sender_type(From) ->
- Local_hosts = (?MYHOSTS),
+ Local_hosts = ejabberd_option:hosts(),
case lists:member(From#jid.lserver, Local_hosts) of
true -> local;
false -> remote
@@ -1197,34 +1109,54 @@ to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))).
%%% Error report
%%%-------------------------
-route_error(From, To, Packet, ErrType, ErrText) ->
- #xmlel{attrs = Attrs} = Packet,
- Lang = fxml:get_attr_s(<<"xml:lang">>, Attrs),
- Reply = make_reply(ErrType, Lang, ErrText),
- Err = jlib:make_error_reply(Packet, Reply),
- ejabberd_router:route(From, To, Err).
+route_error(Packet, ErrType, ErrText) ->
+ Lang = xmpp:get_lang(Packet),
+ Err = make_reply(ErrType, Lang, ErrText),
+ ejabberd_router:route_error(Packet, Err).
make_reply(bad_request, Lang, ErrText) ->
- ?ERRT_BAD_REQUEST(Lang, ErrText);
+ xmpp:err_bad_request(ErrText, Lang);
make_reply(jid_malformed, Lang, ErrText) ->
- ?ERRT_JID_MALFORMED(Lang, ErrText);
+ xmpp:err_jid_malformed(ErrText, Lang);
make_reply(not_acceptable, Lang, ErrText) ->
- ?ERRT_NOT_ACCEPTABLE(Lang, ErrText);
+ xmpp:err_not_acceptable(ErrText, Lang);
make_reply(internal_server_error, Lang, ErrText) ->
- ?ERRT_INTERNAL_SERVER_ERROR(Lang, ErrText);
+ xmpp:err_internal_server_error(ErrText, Lang);
make_reply(forbidden, Lang, ErrText) ->
- ?ERRT_FORBIDDEN(Lang, ErrText).
+ xmpp:err_forbidden(ErrText, Lang).
-stj(String) -> jid:from_string(String).
+stj(String) -> jid:decode(String).
-jts(String) -> jid:to_string(String).
+jts(String) -> jid:encode(String).
depends(_Host, _Opts) ->
[].
mod_opt_type(access) ->
- fun acl:access_rules_validator/1;
-mod_opt_type(host) -> fun iolist_to_binary/1;
+ econf:acl();
+mod_opt_type(name) ->
+ econf:binary();
mod_opt_type(limits) ->
- fun (A) when is_list(A) -> A end;
-mod_opt_type(_) -> [access, host, limits].
+ econf:options(
+ #{local =>
+ econf:options(
+ #{message => econf:non_neg_int(infinite),
+ presence => econf:non_neg_int(infinite)}),
+ remote =>
+ econf:options(
+ #{message => econf:non_neg_int(infinite),
+ presence => econf:non_neg_int(infinite)})});
+mod_opt_type(host) ->
+ econf:host();
+mod_opt_type(hosts) ->
+ econf:hosts();
+mod_opt_type(vcard) ->
+ econf:vcard_temp().
+
+mod_options(Host) ->
+ [{access, all},
+ {host, <<"multicast.", Host/binary>>},
+ {hosts, []},
+ {limits, [{local, []}, {remote, []}]},
+ {vcard, undefined},
+ {name, ?T("Multicast")}].