aboutsummaryrefslogtreecommitdiff
path: root/src/mod_multicast.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_multicast.erl')
-rw-r--r--src/mod_multicast.erl267
1 files changed, 166 insertions, 101 deletions
diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl
index 8a1960088..83520c0be 100644
--- a/src/mod_multicast.erl
+++ b/src/mod_multicast.erl
@@ -3,12 +3,32 @@
%%% Author : Badlop <badlop@process-one.net>
%%% Purpose : Extended Stanza Addressing (XEP-0033) support
%%% Created : 29 May 2007 by Badlop <badlop@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
%%%----------------------------------------------------------------------
-module(mod_multicast).
-author('badlop@process-one.net').
+-protocol({xep, 33, '1.1'}).
+
-behaviour(gen_server).
-behaviour(gen_mod).
@@ -20,7 +40,7 @@
-export([init/1, handle_info/2, handle_call/3,
handle_cast/2, terminate/2, code_change/3]).
--export([purge_loop/1]).
+-export([purge_loop/1, mod_opt_type/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
@@ -131,7 +151,7 @@ init([LServerS, Opts]) ->
try_start_loop(),
create_pool(),
ejabberd_router_multicast:register_route(LServerS),
- ejabberd_router:register_route(LServiceS),
+ ejabberd_router:register_route(LServiceS, LServerS),
{ok,
#state{lservice = LServiceS, lserver = LServerS,
access = Access, service_limits = SLimits}}.
@@ -151,24 +171,11 @@ handle_cast(_Msg, State) -> {noreply, State}.
handle_info({route, From, To,
#xmlel{name = <<"iq">>, attrs = Attrs} = Packet},
State) ->
- IQ = jlib:iq_query_info(Packet),
- case catch process_iq(From, IQ, State) of
- Result when is_record(Result, iq) ->
- ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
- {'EXIT', Reason} ->
- ?ERROR_MSG("Error when processing IQ stanza: ~p",
- [Reason]),
- Err = jlib:make_error_reply(Packet,
- ?ERR_INTERNAL_SERVER_ERROR),
- ejabberd_router:route(To, From, Err);
- reply ->
- LServiceS = jts(To),
- case xml:get_attr_s(<<"type">>, Attrs) of
- <<"result">> ->
- process_iqreply_result(From, LServiceS, Packet, State);
- <<"error">> ->
- process_iqreply_error(From, LServiceS, Packet)
- end
+ case catch handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Error when processing IQ stanza: ~p",
+ [Reason]);
+ _ -> ok
end,
{noreply, State};
%% XEP33 allows only 'message' and 'presence' stanza type
@@ -186,11 +193,16 @@ handle_info({route, From, To,
handle_info({route_trusted, From, Destinations, Packet},
#state{lservice = LServiceS, lserver = LServerS} =
State) ->
- route_trusted(LServiceS, LServerS, From, Destinations,
- Packet),
+ case catch route_trusted(LServiceS, LServerS, From, Destinations,
+ Packet) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Error in route_trusted: ~p", [Reason]);
+ _ -> ok
+ end,
{noreply, State};
handle_info({get_host, Pid}, State) ->
- Pid ! {my_host, State#state.lservice}, {noreply, State};
+ Pid ! {my_host, State#state.lservice},
+ {noreply, State};
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, State) ->
@@ -208,6 +220,28 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}.
%%% IQ Request Processing
%%%------------------------
+handle_iq(From, To, #xmlel{attrs = Attrs} = Packet, State) ->
+ IQ = jlib:iq_query_info(Packet),
+ case catch process_iq(From, IQ, State) of
+ Result when is_record(Result, iq) ->
+ ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Error when processing IQ stanza: ~p",
+ [Reason]),
+ Err = jlib:make_error_reply(Packet,
+ ?ERR_INTERNAL_SERVER_ERROR),
+ ejabberd_router:route(To, From, Err);
+ reply ->
+ LServiceS = jts(To),
+ case fxml:get_attr_s(<<"type">>, Attrs) of
+ <<"result">> ->
+ process_iqreply_result(From, LServiceS, Packet, State);
+ <<"error">> ->
+ process_iqreply_error(From, LServiceS, Packet)
+ end;
+ ok -> ok
+ end.
+
process_iq(From,
#iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} =
IQ,
@@ -270,7 +304,7 @@ iq_vcard(Lang) ->
[{xmlcdata,
<<(translate:translate(Lang,
<<"ejabberd Multicast service">>))/binary,
- "\nCopyright (c) 2002-2015 ProcessOne">>}]}].
+ "\nCopyright (c) 2002-2016 ProcessOne">>}]}].
%%%-------------------------
%%% Route
@@ -361,7 +395,7 @@ act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
perform(From, Packet, AAttrs, _,
{route_single, Group}) ->
[route_packet(From, ToUser, Packet, AAttrs,
- Group#group.addresses)
+ Group#group.others, Group#group.addresses)
|| ToUser <- Group#group.dests];
perform(From, Packet, AAttrs, _,
{{route_multicast, JID, RLimits}, Group}) ->
@@ -375,6 +409,9 @@ perform(From, Packet, AAttrs, LServiceS,
group = Group, renewal = true, sender = From,
packet = Packet, aattrs = AAttrs,
addresses = Group#group.addresses});
+perform(_From, _Packet, _AAttrs, LServiceS,
+ {{ask, LServiceS, _}, _Group}) ->
+ ok;
perform(From, Packet, AAttrs, LServiceS,
{{ask, Server, not_renewal}, Group}) ->
send_query_info(Server, LServiceS),
@@ -399,17 +436,17 @@ check_access(LServerS, Access, From) ->
%%%-------------------------
strip_addresses_element(Packet) ->
- case xml:get_subtag(Packet, <<"addresses">>) of
+ case fxml:get_subtag(Packet, <<"addresses">>) of
#xmlel{name = <<"addresses">>, attrs = AAttrs,
children = Addresses} ->
- case xml:get_attr_s(<<"xmlns">>, AAttrs) of
+ case fxml:get_attr_s(<<"xmlns">>, AAttrs) of
?NS_ADDRESS ->
#xmlel{name = Name, attrs = Attrs, children = Els} =
Packet,
Els_stripped = lists:keydelete(<<"addresses">>, 2, Els),
Packet_stripped = #xmlel{name = Name, attrs = Attrs,
children = Els_stripped},
- {ok, Packet_stripped, AAttrs, Addresses};
+ {ok, Packet_stripped, AAttrs, fxml:remove_cdata(Addresses)};
_ -> throw(ewxmlns)
end;
_ -> throw(eadsele)
@@ -423,10 +460,10 @@ split_addresses_todeliver(Addresses) ->
lists:partition(fun (XML) ->
case XML of
#xmlel{name = <<"address">>, attrs = Attrs} ->
- case xml:get_attr_s(<<"delivered">>, Attrs) of
+ case fxml:get_attr_s(<<"delivered">>, Attrs) of
<<"true">> -> false;
_ ->
- Type = xml:get_attr_s(<<"type">>,
+ Type = fxml:get_attr_s(<<"type">>,
Attrs),
case Type of
<<"to">> -> true;
@@ -462,10 +499,10 @@ check_limit_dests(SLimits, FromJID, Packet,
convert_dest_record(XMLs) ->
lists:map(fun (XML) ->
- case xml:get_tag_attr_s(<<"jid">>, XML) of
+ case fxml:get_tag_attr_s(<<"jid">>, XML) of
<<"">> -> #dest{jid_string = none, full_xml = XML};
JIDS ->
- Type = xml:get_tag_attr_s(<<"type">>, XML),
+ Type = fxml:get_tag_attr_s(<<"type">>, XML),
JIDJ = stj(JIDS),
#dest{jid_string = JIDS, jid_jid = JIDJ,
type = Type, full_xml = XML}
@@ -488,7 +525,7 @@ split_dests_jid(Dests) ->
Dests).
report_not_jid(From, Packet, Dests) ->
- Dests2 = [xml:element_to_binary(Dest#dest.full_xml)
+ Dests2 = [fxml:element_to_binary(Dest#dest.full_xml)
|| Dest <- Dests],
[route_error(From, From, Packet, jid_malformed,
<<"This service can not process the address: ",
@@ -597,13 +634,13 @@ decide_action_group(Group) ->
%%% Route packet
%%%-------------------------
-route_packet(From, ToDest, Packet, AAttrs, Addresses) ->
+route_packet(From, ToDest, Packet, AAttrs, Others, Addresses) ->
Dests = case ToDest#dest.type of
<<"bcc">> -> [];
_ -> [ToDest]
end,
route_packet2(From, ToDest#dest.jid_string, Dests,
- Packet, AAttrs, Addresses).
+ Packet, AAttrs, {Others, Addresses}).
route_packet_multicast(From, ToS, Packet, AAttrs, Dests,
Addresses, Limits) ->
@@ -629,6 +666,8 @@ route_packet2(From, ToS, Dests, Packet, AAttrs,
ToJID = stj(ToS),
ejabberd_router:route(From, ToJID, Packet2).
+append_dests(_Dests, {Others, Addresses}) ->
+ Addresses++Others;
append_dests([], Addresses) -> Addresses;
append_dests([Dest | Dests], Addresses) ->
append_dests(Dests, [Dest#dest.full_xml | Addresses]).
@@ -644,7 +683,8 @@ check_relay(RS, LS, Gs) ->
end.
check_relay_required(RServer, LServerS, Groups) ->
- case str:str(RServer, LServerS) > 0 of
+ case lists:suffix(str:tokens(LServerS, <<".">>),
+ str:tokens(RServer, <<".">>)) of
true -> false;
false -> check_relay_required(LServerS, Groups)
end.
@@ -693,12 +733,11 @@ process_iqreply_error(From, LServiceS, _Packet) ->
%%% Check protocol support: Receive response: Disco
%%%-------------------------
-process_iqreply_result(From, LServiceS, Packet,
- State) ->
+process_iqreply_result(From, LServiceS, Packet, State) ->
#xmlel{name = <<"query">>, attrs = Attrs2,
children = Els2} =
- xml:get_subtag(Packet, <<"query">>),
- case xml:get_attr_s(<<"xmlns">>, Attrs2) of
+ fxml:get_subtag(Packet, <<"query">>),
+ case fxml:get_attr_s(<<"xmlns">>, Attrs2) of
?NS_DISCO_INFO ->
process_discoinfo_result(From, LServiceS, Els2, State);
?NS_DISCO_ITEMS ->
@@ -721,37 +760,35 @@ process_discoinfo_result(From, LServiceS, Els,
process_discoinfo_result2(From, FromS, LServiceS, Els,
Waiter) ->
- Multicast_support = lists:any(fun (XML) ->
- case XML of
- #xmlel{name = <<"feature">>,
- attrs = Attrs} ->
- (?NS_ADDRESS) ==
- xml:get_attr_s(<<"var">>,
- Attrs);
- _ -> false
- end
- end,
- Els),
+ Multicast_support =
+ lists:any(
+ fun(XML) ->
+ case XML of
+ #xmlel{name = <<"feature">>, attrs = Attrs} ->
+ (?NS_ADDRESS) == fxml:get_attr_s(<<"var">>, Attrs);
+ _ -> false
+ end
+ end,
+ Els),
Group = Waiter#waiter.group,
RServer = Group#group.server,
case Multicast_support of
- true ->
- SenderT = sender_type(From),
- RLimits = get_limits_xml(Els, SenderT),
- add_response(RServer,
- {multicast_supported, FromS, RLimits}),
- FromM = Waiter#waiter.sender,
- DestsM = Group#group.dests,
- PacketM = Waiter#waiter.packet,
- AAttrsM = Waiter#waiter.aattrs,
- AddressesM = Waiter#waiter.addresses,
- RServiceM = FromS,
- route_packet_multicast(FromM, RServiceM, PacketM,
- AAttrsM, DestsM, AddressesM, RLimits),
- delo_waiter(Waiter);
- false ->
- case FromS of
- RServer ->
+ true ->
+ SenderT = sender_type(From),
+ RLimits = get_limits_xml(Els, SenderT),
+ add_response(RServer, {multicast_supported, FromS, RLimits}),
+ FromM = Waiter#waiter.sender,
+ DestsM = Group#group.dests,
+ PacketM = Waiter#waiter.packet,
+ AAttrsM = Waiter#waiter.aattrs,
+ AddressesM = Waiter#waiter.addresses,
+ RServiceM = FromS,
+ route_packet_multicast(FromM, RServiceM, PacketM,
+ AAttrsM, DestsM, AddressesM, RLimits),
+ delo_waiter(Waiter);
+ false ->
+ case FromS of
+ RServer ->
send_query_items(FromS, LServiceS),
delo_waiter(Waiter),
add_waiter(Waiter#waiter{awaiting =
@@ -772,10 +809,10 @@ get_limits_els(Els) ->
#xmlel{name = <<"x">>, attrs = Attrs,
children = SubEls} ->
case ((?NS_XDATA) ==
- xml:get_attr_s(<<"xmlns">>, Attrs))
+ fxml:get_attr_s(<<"xmlns">>, Attrs))
and
(<<"result">> ==
- xml:get_attr_s(<<"type">>, Attrs))
+ fxml:get_attr_s(<<"type">>, Attrs))
of
true -> get_limits_fields(SubEls) ++ R;
false -> R
@@ -791,11 +828,11 @@ get_limits_fields(Fields) ->
#xmlel{name = <<"field">>,
attrs = Attrs} ->
(<<"FORM_TYPE">> ==
- xml:get_attr_s(<<"var">>,
+ fxml:get_attr_s(<<"var">>,
Attrs))
and
(<<"hidden">> ==
- xml:get_attr_s(<<"type">>,
+ fxml:get_attr_s(<<"type">>,
Attrs));
_ -> false
end
@@ -813,8 +850,8 @@ get_limits_values(Values) ->
children = SubEls} ->
[#xmlel{name = <<"value">>, children = SubElsV}] =
SubEls,
- Number = xml:get_cdata(SubElsV),
- Name = xml:get_attr_s(<<"var">>, Attrs),
+ Number = fxml:get_cdata(SubElsV),
+ Name = fxml:get_attr_s(<<"var">>, Attrs),
[{jlib:binary_to_atom(Name),
jlib:binary_to_integer(Number)}
| R];
@@ -828,29 +865,44 @@ get_limits_values(Values) ->
%%%-------------------------
process_discoitems_result(From, LServiceS, Els) ->
- List = lists:foldl(fun (XML, Res) ->
- case XML of
- #xmlel{name = <<"item">>, attrs = Attrs} ->
- Res ++ [xml:get_attr_s(<<"jid">>, Attrs)];
- _ -> Res
- end
- end,
- [], Els),
- [send_query_info(Item, LServiceS) || Item <- List],
FromS = jts(From),
- {found_waiter, Waiter} = search_waiter(FromS, LServiceS,
- items),
- delo_waiter(Waiter),
- add_waiter(Waiter#waiter{awaiting =
- {List, LServiceS, info},
- renewal = false}).
+ case search_waiter(FromS, LServiceS, items) of
+ {found_waiter, Waiter} ->
+ List = lists:foldl(
+ fun(XML, Res) ->
+ case XML of
+ #xmlel{name = <<"item">>, attrs = Attrs} ->
+ SJID = fxml:get_attr_s(<<"jid">>, Attrs),
+ case jid:from_string(SJID) of
+ #jid{luser = <<"">>,
+ lresource = <<"">>} ->
+ [SJID | Res];
+ _ -> Res
+ end;
+ _ -> Res
+ end
+ end,
+ [], Els),
+ case List of
+ [] ->
+ received_awaiter(FromS, Waiter, LServiceS);
+ _ ->
+ [send_query_info(Item, LServiceS) || Item <- List],
+ delo_waiter(Waiter),
+ add_waiter(Waiter#waiter{awaiting =
+ {List, LServiceS, info},
+ renewal = false})
+ end;
+ _ ->
+ ok
+ end.
%%%-------------------------
%%% Check protocol support: Receive response: Received awaiter
%%%-------------------------
received_awaiter(JID, Waiter, LServiceS) ->
- {JIDs, LServiceS, info} = Waiter#waiter.awaiting,
+ {JIDs, LServiceS, _} = Waiter#waiter.awaiting,
delo_waiter(Waiter),
Group = Waiter#waiter.group,
RServer = Group#group.server,
@@ -862,8 +914,9 @@ received_awaiter(JID, Waiter, LServiceS) ->
From = Waiter#waiter.sender,
Packet = Waiter#waiter.packet,
AAttrs = Waiter#waiter.aattrs,
+ Others = Group#group.others,
Addresses = Waiter#waiter.addresses,
- [route_packet(From, ToUser, Packet, AAttrs, Addresses)
+ [route_packet(From, ToUser, Packet, AAttrs, Others, Addresses)
|| ToUser <- Group#group.dests];
true ->
send_query_info(RServer, LServiceS),
@@ -887,8 +940,7 @@ create_cache() ->
{attributes, record_info(fields, multicastc)}]).
add_response(RServer, Response) ->
- Secs =
- calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ Secs = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
mnesia:dirty_write(#multicastc{rserver = RServer,
response = Response, ts = Secs}).
@@ -899,8 +951,7 @@ search_server_on_cache(RServer, _LServerS, Maxmins) ->
case look_server(RServer) of
not_cached -> not_cached;
{cached, Response, Ts} ->
- Now =
- calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ Now = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
case is_obsolete(Response, Ts, Now, Maxmins) of
false -> {cached, Response};
true -> {obsolete, Response}
@@ -928,7 +979,7 @@ purge() ->
Maxmins_positive = (?MAXTIME_CACHE_POSITIVE),
Maxmins_negative = (?MAXTIME_CACHE_NEGATIVE),
Now =
- calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ calendar:datetime_to_gregorian_seconds(calendar:local_time()),
purge(Now, {Maxmins_positive, Maxmins_negative}).
purge(Now, Maxmins) ->
@@ -982,8 +1033,12 @@ purge_loop(NM) ->
%%%-------------------------
create_pool() ->
- catch ets:new(multicastp,
- [duplicate_bag, public, named_table, {keypos, 2}]).
+ catch
+ begin
+ ets:new(multicastp,
+ [duplicate_bag, public, named_table, {keypos, 2}]),
+ ets:give_away(multicastp, whereis(ejabberd), ok)
+ end.
add_waiter(Waiter) ->
true = ets:insert(multicastp, Waiter).
@@ -991,6 +1046,9 @@ add_waiter(Waiter) ->
delo_waiter(Waiter) ->
true = ets:delete_object(multicastp, Waiter).
+-spec search_waiter(binary(), binary(), info | items) ->
+ {found_waiter, #waiter{}} | waiter_not_found.
+
search_waiter(JID, LServiceS, Type) ->
Rs = ets:foldl(fun (W, Res) ->
{JIDs, LServiceS1, Type1} = W#waiter.awaiting,
@@ -1141,7 +1199,7 @@ to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))).
route_error(From, To, Packet, ErrType, ErrText) ->
#xmlel{attrs = Attrs} = Packet,
- Lang = xml:get_attr_s(<<"xml:lang">>, Attrs),
+ Lang = fxml:get_attr_s(<<"xml:lang">>, Attrs),
Reply = make_reply(ErrType, Lang, ErrText),
Err = jlib:make_error_reply(Packet, Reply),
ejabberd_router:route(From, To, Err).
@@ -1157,6 +1215,13 @@ make_reply(internal_server_error, Lang, ErrText) ->
make_reply(forbidden, Lang, ErrText) ->
?ERRT_FORBIDDEN(Lang, ErrText).
-stj(String) -> jlib:string_to_jid(String).
+stj(String) -> jid:from_string(String).
+
+jts(String) -> jid:to_string(String).
-jts(String) -> jlib:jid_to_string(String).
+mod_opt_type(access) ->
+ fun (A) when is_atom(A) -> A end;
+mod_opt_type(host) -> fun iolist_to_binary/1;
+mod_opt_type(limits) ->
+ fun (A) when is_list(A) -> A end;
+mod_opt_type(_) -> [access, host, limits].