aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBadlop <badlop@process-one.net>2015-04-08 14:01:16 +0200
committerBadlop <badlop@process-one.net>2015-04-08 14:01:16 +0200
commit0cf8d1fa6e931367b63b133396478593146a8d07 (patch)
treeeeea1804b71de0f5d1001f3f2811c14a146700ab
parentAdd SQLite3 library checks at the configure script (diff)
Copy multicast code from ejabberd-contrib to provide XEP-0033 (#521)
Diffstat (limited to '')
-rw-r--r--ejabberd.yml.example1
-rw-r--r--src/ejabberd_c2s.erl23
-rw-r--r--src/ejabberd_router_multicast.erl239
-rw-r--r--src/ejabberd_sup.erl8
-rw-r--r--src/mod_muc_room.erl35
-rw-r--r--src/mod_multicast.erl1162
6 files changed, 1435 insertions, 33 deletions
diff --git a/ejabberd.yml.example b/ejabberd.yml.example
index 7b26cf792..0d4cbb1a2 100644
--- a/ejabberd.yml.example
+++ b/ejabberd.yml.example
@@ -584,6 +584,7 @@ modules:
access_persistent: muc_create
access_admin: muc_admin
## mod_muc_log: {}
+ ## mod_multicast: {}
mod_offline:
access_max_user_messages: max_user_offline_messages
mod_ping: {}
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 2ac28dbb6..7e83fb8c1 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -2216,14 +2216,16 @@ try_roster_subscribe(Type, User, Server, From, To, Packet, StateData) ->
presence_broadcast(StateData, From, JIDSet, Packet) ->
JIDs = ?SETS:to_list(JIDSet),
JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out),
- send_multiple(StateData, From, JIDs2, Packet).
+ Server = StateData#state.server,
+ send_multiple(From, Server, JIDs2, Packet).
%% Send presence when updating presence
presence_broadcast_to_trusted(StateData, From, Trusted, JIDSet, Packet) ->
JIDs = ?SETS:to_list(JIDSet),
JIDs_trusted = [JID || JID <- JIDs, ?SETS:is_element(JID, Trusted)],
JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs_trusted, out),
- send_multiple(StateData, From, JIDs2, Packet).
+ Server = StateData#state.server,
+ send_multiple(From, Server, JIDs2, Packet).
%% Send presence when connecting
presence_broadcast_first(From, StateData, Packet) ->
@@ -2235,7 +2237,7 @@ presence_broadcast_first(From, StateData, Packet) ->
PacketProbe = #xmlel{name = <<"presence">>, attrs = [{<<"type">>,<<"probe">>}], children = []},
JIDs2Probe = format_and_check_privacy(From, StateData, PacketProbe, JIDsProbe, out),
Server = StateData#state.server,
- send_multiple(StateData, From, JIDs2Probe, PacketProbe),
+ send_multiple(From, Server, JIDs2Probe, PacketProbe),
{As, JIDs} =
?SETS:fold(
fun(JID, {A, JID_list}) ->
@@ -2244,8 +2246,7 @@ presence_broadcast_first(From, StateData, Packet) ->
{StateData#state.pres_a, []},
StateData#state.pres_f),
JIDs2 = format_and_check_privacy(From, StateData, Packet, JIDs, out),
- Server = StateData#state.server,
- send_multiple(StateData, From, JIDs2, Packet),
+ send_multiple(From, Server, JIDs2, Packet),
StateData#state{pres_a = As}.
format_and_check_privacy(From, StateData, Packet, JIDs, Dir) ->
@@ -2266,16 +2267,8 @@ format_and_check_privacy(From, StateData, Packet, JIDs, Dir) ->
end,
FJIDs).
-send_multiple(StateData, From, JIDs, Packet) ->
- lists:foreach(
- fun(JID) ->
- case privacy_check_packet(StateData, From, JID, Packet, out) of
- deny ->
- ok;
- allow ->
- ejabberd_router:route(From, JID, Packet)
- end
- end, JIDs).
+send_multiple(From, Server, JIDs, Packet) ->
+ ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet).
remove_element(E, Set) ->
case (?SETS):is_element(E, Set) of
diff --git a/src/ejabberd_router_multicast.erl b/src/ejabberd_router_multicast.erl
new file mode 100644
index 000000000..e438fefdb
--- /dev/null
+++ b/src/ejabberd_router_multicast.erl
@@ -0,0 +1,239 @@
+%%%----------------------------------------------------------------------
+%%% File : ejabberd_router_multicast.erl
+%%% Author : Badlop <badlop@process-one.net>
+%%% Purpose : Multicast router
+%%% Created : 11 Aug 2007 by Badlop <badlop@process-one.net>
+%%%----------------------------------------------------------------------
+
+-module(ejabberd_router_multicast).
+-author('alexey@process-one.net').
+-author('badlop@process-one.net').
+
+-behaviour(gen_server).
+
+%% API
+-export([route_multicast/4,
+ register_route/1,
+ unregister_route/1
+ ]).
+
+-export([start_link/0]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+-include("jlib.hrl").
+
+-record(route_multicast, {domain, pid}).
+-record(state, {}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+
+route_multicast(From, Domain, Destinations, Packet) ->
+ case catch do_route(From, Domain, Destinations, Packet) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p~nwhen processing: ~p",
+ [Reason, {From, Domain, Destinations, Packet}]);
+ _ ->
+ ok
+ end.
+
+register_route(Domain) ->
+ case jlib:nameprep(Domain) of
+ error ->
+ erlang:error({invalid_domain, Domain});
+ LDomain ->
+ Pid = self(),
+ F = fun() ->
+ mnesia:write(#route_multicast{domain = LDomain,
+ pid = Pid})
+ end,
+ mnesia:transaction(F)
+ end.
+
+unregister_route(Domain) ->
+ case jlib:nameprep(Domain) of
+ error ->
+ erlang:error({invalid_domain, Domain});
+ LDomain ->
+ Pid = self(),
+ F = fun() ->
+ case mnesia:select(route_multicast,
+ [{#route_multicast{pid = Pid, domain = LDomain, _ = '_'},
+ [],
+ ['$_']}]) of
+ [R] -> mnesia:delete_object(R);
+ _ -> ok
+ end
+ end,
+ mnesia:transaction(F)
+ end.
+
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%--------------------------------------------------------------------
+init([]) ->
+ mnesia:create_table(route_multicast,
+ [{ram_copies, [node()]},
+ {type, bag},
+ {attributes,
+ record_info(fields, route_multicast)}]),
+ mnesia:add_table_copy(route_multicast, node(), ram_copies),
+ mnesia:subscribe({table, route_multicast, simple}),
+ lists:foreach(
+ fun(Pid) ->
+ erlang:monitor(process, Pid)
+ end,
+ mnesia:dirty_select(route_multicast, [{{route_multicast, '_', '$1'}, [], ['$1']}])),
+ {ok, #state{}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({route_multicast, From, Domain, Destinations, Packet}, State) ->
+ case catch do_route(From, Domain, Destinations, Packet) of
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("~p~nwhen processing: ~p",
+ [Reason, {From, Domain, Destinations, Packet}]);
+ _ ->
+ ok
+ end,
+ {noreply, State};
+handle_info({mnesia_table_event, {write, #route_multicast{pid = Pid}, _ActivityId}},
+ State) ->
+ erlang:monitor(process, Pid),
+ {noreply, State};
+handle_info({'DOWN', _Ref, _Type, Pid, _Info}, State) ->
+ F = fun() ->
+ Es = mnesia:select(
+ route_multicast,
+ [{#route_multicast{pid = Pid, _ = '_'},
+ [],
+ ['$_']}]),
+ lists:foreach(
+ fun(E) ->
+ mnesia:delete_object(E)
+ end, Es)
+ end,
+ mnesia:transaction(F),
+ {noreply, State};
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+%% From = #jid
+%% Destinations = [#jid]
+do_route(From, Domain, Destinations, Packet) ->
+
+ ?DEBUG("route_multicast~n\tfrom ~s~n\tdomain ~s~n\tdestinations ~p~n\tpacket ~p~n",
+ [jlib:jid_to_string(From),
+ Domain,
+ [jlib:jid_to_string(To) || To <- Destinations],
+ Packet]),
+
+ {Groups, Rest} = lists:foldr(
+ fun(Dest, {Groups1, Rest1}) ->
+ case ejabberd_sm:get_session_pid(Dest#jid.luser, Dest#jid.lserver, Dest#jid.lresource) of
+ none ->
+ {Groups1, [Dest|Rest1]};
+ Pid ->
+ Node = node(Pid),
+ if Node /= node() ->
+ {dict:append(Node, Dest, Groups1), Rest1};
+ true ->
+ {Groups1, [Dest|Rest1]}
+ end
+ end
+ end, {dict:new(), []}, Destinations),
+
+ dict:map(
+ fun(Node, [Single]) ->
+ ejabberd_cluster:send({ejabberd_sm, Node},
+ {route, From, Single, Packet});
+ (Node, Dests) ->
+ ejabberd_cluster:send({ejabberd_sm, Node},
+ {route_multiple, From, Dests, Packet})
+ end, Groups),
+
+ %% Try to find an appropriate multicast service
+ case mnesia:dirty_read(route_multicast, Domain) of
+
+ %% If no multicast service is available in this server, send manually
+ [] -> do_route_normal(From, Rest, Packet);
+
+ %% If available, send the packet using multicast service
+ [R] ->
+ case R#route_multicast.pid of
+ Pid when is_pid(Pid) ->
+ Pid ! {route_trusted, From, Rest, Packet};
+ _ -> do_route_normal(From, Rest, Packet)
+ end
+ end.
+
+do_route_normal(From, Destinations, Packet) ->
+ [ejabberd_router:route(From, To, Packet) || To <- Destinations].
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index ecedfa502..423f84ab9 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -62,6 +62,13 @@ init([]) ->
brutal_kill,
worker,
[ejabberd_router]},
+ Router_multicast =
+ {ejabberd_router_multicast,
+ {ejabberd_router_multicast, start_link, []},
+ permanent,
+ brutal_kill,
+ worker,
+ [ejabberd_router_multicast]},
S2S =
{ejabberd_s2s,
{ejabberd_s2s, start_link, []},
@@ -166,6 +173,7 @@ init([]) ->
NodeGroups,
SystemMonitor,
Router,
+ Router_multicast,
S2S,
Local,
Captcha,
diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl
index 64bfd6c44..f381e8458 100644
--- a/src/mod_muc_room.erl
+++ b/src/mod_muc_room.erl
@@ -681,14 +681,11 @@ handle_event({service_message, Msg}, _StateName,
children =
[#xmlel{name = <<"body">>, attrs = [],
children = [{xmlcdata, Msg}]}]},
- lists:foreach(
- fun({_LJID, Info}) ->
- ejabberd_router:route(
- StateData#state.jid,
- Info#user.jid,
- MessagePkt)
- end,
- ?DICT:to_list(StateData#state.users)),
+ send_multiple(
+ StateData#state.jid,
+ StateData#state.server_host,
+ StateData#state.users,
+ MessagePkt),
NSD = add_message_to_history(<<"">>,
StateData#state.jid, MessagePkt, StateData),
{next_state, normal_state, NSD};
@@ -945,16 +942,11 @@ process_groupchat_message(From,
end,
case IsAllowed of
true ->
- lists:foreach(
- fun({_LJID, Info}) ->
- ejabberd_router:route(
- jlib:jid_replace_resource(
- StateData#state.jid,
- FromNick),
- Info#user.jid,
- Packet)
- end,
- ?DICT:to_list(StateData#state.users)),
+ send_multiple(
+ jlib:jid_replace_resource(StateData#state.jid, FromNick),
+ StateData#state.server_host,
+ StateData#state.users,
+ Packet),
NewStateData2 = case has_body_or_subject(Packet) of
true ->
add_message_to_history(FromNick, From,
@@ -4500,3 +4492,10 @@ has_body_or_subject(Packet) ->
(#xmlel{name = <<"subject">>}) -> false;
(_) -> true
end, Packet#xmlel.children).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Multicast
+
+send_multiple(From, Server, Users, Packet) ->
+ JIDs = [ User#user.jid || {_, User} <- ?DICT:to_list(Users)],
+ ejabberd_router_multicast:route_multicast(From, Server, JIDs, Packet).
diff --git a/src/mod_multicast.erl b/src/mod_multicast.erl
new file mode 100644
index 000000000..8a1960088
--- /dev/null
+++ b/src/mod_multicast.erl
@@ -0,0 +1,1162 @@
+%%%----------------------------------------------------------------------
+%%% File : mod_multicast.erl
+%%% Author : Badlop <badlop@process-one.net>
+%%% Purpose : Extended Stanza Addressing (XEP-0033) support
+%%% Created : 29 May 2007 by Badlop <badlop@process-one.net>
+%%%----------------------------------------------------------------------
+
+-module(mod_multicast).
+
+-author('badlop@process-one.net').
+
+-behaviour(gen_server).
+
+-behaviour(gen_mod).
+
+%% API
+-export([start_link/2, start/2, stop/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_info/2, handle_call/3,
+ handle_cast/2, terminate/2, code_change/3]).
+
+-export([purge_loop/1]).
+
+-include("ejabberd.hrl").
+-include("logger.hrl").
+
+-include("jlib.hrl").
+
+-record(state,
+ {lserver, lservice, access, service_limits}).
+
+-record(multicastc, {rserver, response, ts}).
+
+%% ts: timestamp (in seconds) when the cache item was last updated
+
+-record(dest, {jid_string, jid_jid, type, full_xml}).
+
+%% jid_string = string()
+%% jid_jid = jid()
+%% full_xml = xml()
+
+-record(group,
+ {server, dests, multicast, others, addresses}).
+
+%% server = string()
+%% dests = [string()]
+%% multicast = {cached, local_server} | {cached, string()} | {cached, not_supported} | {obsolete, not_supported} | {obsolete, string()} | not_cached
+%% after being updated, possible values are: local | multicast_not_supported | {multicast_supported, string(), limits()}
+%% others = [xml()]
+%% packet = xml()
+
+-record(waiter,
+ {awaiting, group, renewal = false, sender, packet,
+ aattrs, addresses}).
+
+%% awaiting = {[Remote_service], Local_service, Type_awaiting}
+%% Remote_service = Local_service = string()
+%% Type_awaiting = info | items
+%% group = #group
+%% renewal = true | false
+%% sender = From
+%% packet = xml()
+%% aattrs = [xml()]
+
+-record(limits, {message, presence}).
+
+%% message = presence = integer() | infinite
+
+-record(service_limits, {local, remote}).
+
+%% All the elements are of type value()
+
+-define(VERSION_MULTICAST, <<"$Revision: 440 $ ">>).
+
+-define(PROCNAME, ejabberd_mod_multicast).
+
+-define(PURGE_PROCNAME,
+ ejabberd_mod_multicast_purgeloop).
+
+-define(MAXTIME_CACHE_POSITIVE, 86400).
+
+-define(MAXTIME_CACHE_NEGATIVE, 86400).
+
+-define(CACHE_PURGE_TIMER, 86400000).
+
+-define(DISCO_QUERY_TIMEOUT, 10000).
+
+-define(DEFAULT_LIMIT_LOCAL_MESSAGE, 100).
+
+-define(DEFAULT_LIMIT_LOCAL_PRESENCE, 100).
+
+-define(DEFAULT_LIMIT_REMOTE_MESSAGE, 20).
+
+-define(DEFAULT_LIMIT_REMOTE_PRESENCE, 20).
+
+start_link(LServerS, Opts) ->
+ Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
+ gen_server:start_link({local, Proc}, ?MODULE,
+ [LServerS, Opts], []).
+
+start(LServerS, Opts) ->
+ Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
+ ChildSpec = {Proc,
+ {?MODULE, start_link, [LServerS, Opts]}, temporary,
+ 1000, worker, [?MODULE]},
+ supervisor:start_child(ejabberd_sup, ChildSpec).
+
+stop(LServerS) ->
+ Proc = gen_mod:get_module_proc(LServerS, ?PROCNAME),
+ gen_server:call(Proc, stop),
+ supervisor:terminate_child(ejabberd_sup, Proc),
+ supervisor:delete_child(ejabberd_sup, Proc).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+init([LServerS, Opts]) ->
+ LServiceS = gen_mod:get_opt_host(LServerS, Opts,
+ <<"multicast.@HOST@">>),
+ Access = gen_mod:get_opt(access, Opts,
+ fun (A) when is_atom(A) -> A end, all),
+ SLimits =
+ build_service_limit_record(gen_mod:get_opt(limits, Opts,
+ fun (A) when is_list(A) ->
+ A
+ end,
+ [])),
+ create_cache(),
+ try_start_loop(),
+ create_pool(),
+ ejabberd_router_multicast:register_route(LServerS),
+ ejabberd_router:register_route(LServiceS),
+ {ok,
+ #state{lservice = LServiceS, lserver = LServerS,
+ access = Access, service_limits = SLimits}}.
+
+handle_call(stop, _From, State) ->
+ try_stop_loop(), {stop, normal, ok, State}.
+
+handle_cast(_Msg, State) -> {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+
+handle_info({route, From, To,
+ #xmlel{name = <<"iq">>, attrs = Attrs} = Packet},
+ State) ->
+ IQ = jlib:iq_query_info(Packet),
+ case catch process_iq(From, IQ, State) of
+ Result when is_record(Result, iq) ->
+ ejabberd_router:route(To, From, jlib:iq_to_xml(Result));
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("Error when processing IQ stanza: ~p",
+ [Reason]),
+ Err = jlib:make_error_reply(Packet,
+ ?ERR_INTERNAL_SERVER_ERROR),
+ ejabberd_router:route(To, From, Err);
+ reply ->
+ LServiceS = jts(To),
+ case xml:get_attr_s(<<"type">>, Attrs) of
+ <<"result">> ->
+ process_iqreply_result(From, LServiceS, Packet, State);
+ <<"error">> ->
+ process_iqreply_error(From, LServiceS, Packet)
+ end
+ end,
+ {noreply, State};
+%% XEP33 allows only 'message' and 'presence' stanza type
+handle_info({route, From, To,
+ #xmlel{name = Stanza_type} = Packet},
+ #state{lservice = LServiceS, lserver = LServerS,
+ access = Access, service_limits = SLimits} =
+ State)
+ when (Stanza_type == <<"message">>) or
+ (Stanza_type == <<"presence">>) ->
+ route_untrusted(LServiceS, LServerS, Access, SLimits,
+ From, To, Packet),
+ {noreply, State};
+%% Handle multicast packets sent by trusted local services
+handle_info({route_trusted, From, Destinations, Packet},
+ #state{lservice = LServiceS, lserver = LServerS} =
+ State) ->
+ route_trusted(LServiceS, LServerS, From, Destinations,
+ Packet),
+ {noreply, State};
+handle_info({get_host, Pid}, State) ->
+ Pid ! {my_host, State#state.lservice}, {noreply, State};
+handle_info(_Info, State) -> {noreply, State}.
+
+terminate(_Reason, State) ->
+ ejabberd_router_multicast:unregister_route(State#state.lserver),
+ ejabberd_router:unregister_route(State#state.lservice),
+ ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%====================================================================
+%%% Internal functions
+%%====================================================================
+
+%%%------------------------
+%%% IQ Request Processing
+%%%------------------------
+
+process_iq(From,
+ #iq{type = get, xmlns = ?NS_DISCO_INFO, lang = Lang} =
+ IQ,
+ State) ->
+ IQ#iq{type = result,
+ sub_el =
+ [#xmlel{name = <<"query">>,
+ attrs = [{<<"xmlns">>, ?NS_DISCO_INFO}],
+ children = iq_disco_info(From, Lang, State)}]};
+%% disco#items request
+process_iq(_,
+ #iq{type = get, xmlns = ?NS_DISCO_ITEMS} = IQ, _) ->
+ IQ#iq{type = result,
+ sub_el =
+ [#xmlel{name = <<"query">>,
+ attrs = [{<<"xmlns">>, ?NS_DISCO_ITEMS}],
+ children = []}]};
+%% vCard request
+process_iq(_,
+ #iq{type = get, xmlns = ?NS_VCARD, lang = Lang} = IQ,
+ _) ->
+ IQ#iq{type = result,
+ sub_el =
+ [#xmlel{name = <<"vCard">>,
+ attrs = [{<<"xmlns">>, ?NS_VCARD}],
+ children = iq_vcard(Lang)}]};
+%% Unknown "set" or "get" request
+process_iq(_, #iq{type = Type, sub_el = SubEl} = IQ, _)
+ when Type == get; Type == set ->
+ IQ#iq{type = error,
+ sub_el = [SubEl, ?ERR_SERVICE_UNAVAILABLE]};
+%% IQ "result" or "error".
+process_iq(_, reply, _) -> reply;
+%% IQ "result" or "error".
+process_iq(_, _, _) -> ok.
+
+-define(FEATURE(Feat),
+ #xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, Feat}], children = []}).
+
+iq_disco_info(From, Lang, State) ->
+ [#xmlel{name = <<"identity">>,
+ attrs =
+ [{<<"category">>, <<"service">>},
+ {<<"type">>, <<"multicast">>},
+ {<<"name">>,
+ translate:translate(Lang, <<"Multicast">>)}],
+ children = []},
+ ?FEATURE((?NS_DISCO_INFO)), ?FEATURE((?NS_DISCO_ITEMS)),
+ ?FEATURE((?NS_VCARD)), ?FEATURE((?NS_ADDRESS))]
+ ++ iq_disco_info_extras(From, State).
+
+iq_vcard(Lang) ->
+ [#xmlel{name = <<"FN">>, attrs = [],
+ children = [{xmlcdata, <<"ejabberd/mod_multicast">>}]},
+ #xmlel{name = <<"URL">>, attrs = [],
+ children = [{xmlcdata, ?EJABBERD_URI}]},
+ #xmlel{name = <<"DESC">>, attrs = [],
+ children =
+ [{xmlcdata,
+ <<(translate:translate(Lang,
+ <<"ejabberd Multicast service">>))/binary,
+ "\nCopyright (c) 2002-2015 ProcessOne">>}]}].
+
+%%%-------------------------
+%%% Route
+%%%-------------------------
+
+route_trusted(LServiceS, LServerS, FromJID,
+ Destinations, Packet) ->
+ Packet_stripped = Packet,
+ AAttrs = [{<<"xmlns">>, ?NS_ADDRESS}],
+ Delivereds = [],
+ Dests2 = lists:map(fun (D) ->
+ DS = jts(D),
+ XML = #xmlel{name = <<"address">>,
+ attrs =
+ [{<<"type">>, <<"bcc">>},
+ {<<"jid">>, DS}],
+ children = []},
+ #dest{jid_string = DS, jid_jid = D,
+ type = <<"bcc">>, full_xml = XML}
+ end,
+ Destinations),
+ Groups = group_dests(Dests2),
+ route_common(LServerS, LServiceS, FromJID, Groups,
+ Delivereds, Packet_stripped, AAttrs).
+
+route_untrusted(LServiceS, LServerS, Access, SLimits,
+ From, To, Packet) ->
+ try route_untrusted2(LServiceS, LServerS, Access,
+ SLimits, From, Packet)
+ catch
+ adenied ->
+ route_error(To, From, Packet, forbidden,
+ <<"Access denied by service policy">>);
+ eadsele ->
+ route_error(To, From, Packet, bad_request,
+ <<"No addresses element found">>);
+ eadeles ->
+ route_error(To, From, Packet, bad_request,
+ <<"No address elements found">>);
+ ewxmlns ->
+ route_error(To, From, Packet, bad_request,
+ <<"Wrong xmlns">>);
+ etoorec ->
+ route_error(To, From, Packet, not_acceptable,
+ <<"Too many receiver fields were specified">>);
+ edrelay ->
+ route_error(To, From, Packet, forbidden,
+ <<"Packet relay is denied by service policy">>);
+ EType:EReason ->
+ ?ERROR_MSG("Multicast unknown error: Type: ~p~nReason: ~p",
+ [EType, EReason]),
+ route_error(To, From, Packet, internal_server_error,
+ <<"Unknown problem">>)
+ end.
+
+route_untrusted2(LServiceS, LServerS, Access, SLimits,
+ FromJID, Packet) ->
+ ok = check_access(LServerS, Access, FromJID),
+ {ok, Packet_stripped, AAttrs, Addresses} =
+ strip_addresses_element(Packet),
+ {To_deliver, Delivereds} =
+ split_addresses_todeliver(Addresses),
+ Dests = convert_dest_record(To_deliver),
+ {Dests2, Not_jids} = split_dests_jid(Dests),
+ report_not_jid(FromJID, Packet, Not_jids),
+ ok = check_limit_dests(SLimits, FromJID, Packet,
+ Dests2),
+ Groups = group_dests(Dests2),
+ ok = check_relay(FromJID#jid.server, LServerS, Groups),
+ route_common(LServerS, LServiceS, FromJID, Groups,
+ Delivereds, Packet_stripped, AAttrs).
+
+route_common(LServerS, LServiceS, FromJID, Groups,
+ Delivereds, Packet_stripped, AAttrs) ->
+ Groups2 = look_cached_servers(LServerS, Groups),
+ Groups3 = build_others_xml(Groups2),
+ Groups4 = add_addresses(Delivereds, Groups3),
+ AGroups = decide_action_groups(Groups4),
+ act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
+ AGroups).
+
+act_groups(FromJID, Packet_stripped, AAttrs, LServiceS,
+ AGroups) ->
+ [perform(FromJID, Packet_stripped, AAttrs, LServiceS,
+ AGroup)
+ || AGroup <- AGroups].
+
+perform(From, Packet, AAttrs, _,
+ {route_single, Group}) ->
+ [route_packet(From, ToUser, Packet, AAttrs,
+ Group#group.addresses)
+ || ToUser <- Group#group.dests];
+perform(From, Packet, AAttrs, _,
+ {{route_multicast, JID, RLimits}, Group}) ->
+ route_packet_multicast(From, JID, Packet, AAttrs,
+ Group#group.dests, Group#group.addresses, RLimits);
+perform(From, Packet, AAttrs, LServiceS,
+ {{ask, Old_service, renewal}, Group}) ->
+ send_query_info(Old_service, LServiceS),
+ add_waiter(#waiter{awaiting =
+ {[Old_service], LServiceS, info},
+ group = Group, renewal = true, sender = From,
+ packet = Packet, aattrs = AAttrs,
+ addresses = Group#group.addresses});
+perform(From, Packet, AAttrs, LServiceS,
+ {{ask, Server, not_renewal}, Group}) ->
+ send_query_info(Server, LServiceS),
+ add_waiter(#waiter{awaiting =
+ {[Server], LServiceS, info},
+ group = Group, renewal = false, sender = From,
+ packet = Packet, aattrs = AAttrs,
+ addresses = Group#group.addresses}).
+
+%%%-------------------------
+%%% Check access permission
+%%%-------------------------
+
+check_access(LServerS, Access, From) ->
+ case acl:match_rule(LServerS, Access, From) of
+ allow -> ok;
+ _ -> throw(adenied)
+ end.
+
+%%%-------------------------
+%%% Strip 'addresses' XML element
+%%%-------------------------
+
+strip_addresses_element(Packet) ->
+ case xml:get_subtag(Packet, <<"addresses">>) of
+ #xmlel{name = <<"addresses">>, attrs = AAttrs,
+ children = Addresses} ->
+ case xml:get_attr_s(<<"xmlns">>, AAttrs) of
+ ?NS_ADDRESS ->
+ #xmlel{name = Name, attrs = Attrs, children = Els} =
+ Packet,
+ Els_stripped = lists:keydelete(<<"addresses">>, 2, Els),
+ Packet_stripped = #xmlel{name = Name, attrs = Attrs,
+ children = Els_stripped},
+ {ok, Packet_stripped, AAttrs, Addresses};
+ _ -> throw(ewxmlns)
+ end;
+ _ -> throw(eadsele)
+ end.
+
+%%%-------------------------
+%%% Split Addresses
+%%%-------------------------
+
+split_addresses_todeliver(Addresses) ->
+ lists:partition(fun (XML) ->
+ case XML of
+ #xmlel{name = <<"address">>, attrs = Attrs} ->
+ case xml:get_attr_s(<<"delivered">>, Attrs) of
+ <<"true">> -> false;
+ _ ->
+ Type = xml:get_attr_s(<<"type">>,
+ Attrs),
+ case Type of
+ <<"to">> -> true;
+ <<"cc">> -> true;
+ <<"bcc">> -> true;
+ _ -> false
+ end
+ end;
+ _ -> false
+ end
+ end,
+ Addresses).
+
+%%%-------------------------
+%%% Check does not exceed limit of destinations
+%%%-------------------------
+
+check_limit_dests(SLimits, FromJID, Packet,
+ Addresses) ->
+ SenderT = sender_type(FromJID),
+ Limits = get_slimit_group(SenderT, SLimits),
+ Type_of_stanza = type_of_stanza(Packet),
+ {_Type, Limit_number} = get_limit_number(Type_of_stanza,
+ Limits),
+ case length(Addresses) > Limit_number of
+ false -> ok;
+ true -> throw(etoorec)
+ end.
+
+%%%-------------------------
+%%% Convert Destination XML to record
+%%%-------------------------
+
+convert_dest_record(XMLs) ->
+ lists:map(fun (XML) ->
+ case xml:get_tag_attr_s(<<"jid">>, XML) of
+ <<"">> -> #dest{jid_string = none, full_xml = XML};
+ JIDS ->
+ Type = xml:get_tag_attr_s(<<"type">>, XML),
+ JIDJ = stj(JIDS),
+ #dest{jid_string = JIDS, jid_jid = JIDJ,
+ type = Type, full_xml = XML}
+ end
+ end,
+ XMLs).
+
+%%%-------------------------
+%%% Split destinations by existence of JID
+%%% and send error messages for other dests
+%%%-------------------------
+
+split_dests_jid(Dests) ->
+ lists:partition(fun (Dest) ->
+ case Dest#dest.jid_string of
+ none -> false;
+ _ -> true
+ end
+ end,
+ Dests).
+
+report_not_jid(From, Packet, Dests) ->
+ Dests2 = [xml:element_to_binary(Dest#dest.full_xml)
+ || Dest <- Dests],
+ [route_error(From, From, Packet, jid_malformed,
+ <<"This service can not process the address: ",
+ D/binary>>)
+ || D <- Dests2].
+
+%%%-------------------------
+%%% Group destinations by their servers
+%%%-------------------------
+
+group_dests(Dests) ->
+ D = lists:foldl(fun (Dest, Dict) ->
+ ServerS = (Dest#dest.jid_jid)#jid.server,
+ dict:append(ServerS, Dest, Dict)
+ end,
+ dict:new(), Dests),
+ Keys = dict:fetch_keys(D),
+ [#group{server = Key, dests = dict:fetch(Key, D)}
+ || Key <- Keys].
+
+%%%-------------------------
+%%% Look for cached responses
+%%%-------------------------
+
+look_cached_servers(LServerS, Groups) ->
+ [look_cached(LServerS, Group) || Group <- Groups].
+
+look_cached(LServerS, G) ->
+ Maxtime_positive = (?MAXTIME_CACHE_POSITIVE),
+ Maxtime_negative = (?MAXTIME_CACHE_NEGATIVE),
+ Cached_response = search_server_on_cache(G#group.server,
+ LServerS,
+ {Maxtime_positive,
+ Maxtime_negative}),
+ G#group{multicast = Cached_response}.
+
+%%%-------------------------
+%%% Build delivered XML element
+%%%-------------------------
+
+build_others_xml(Groups) ->
+ [Group#group{others =
+ build_other_xml(Group#group.dests)}
+ || Group <- Groups].
+
+build_other_xml(Dests) ->
+ lists:foldl(fun (Dest, R) ->
+ XML = Dest#dest.full_xml,
+ case Dest#dest.type of
+ <<"to">> -> [add_delivered(XML) | R];
+ <<"cc">> -> [add_delivered(XML) | R];
+ <<"bcc">> -> R;
+ _ -> [XML | R]
+ end
+ end,
+ [], Dests).
+
+add_delivered(#xmlel{name = Name, attrs = Attrs,
+ children = Els}) ->
+ Attrs2 = [{<<"delivered">>, <<"true">>} | Attrs],
+ #xmlel{name = Name, attrs = Attrs2, children = Els}.
+
+%%%-------------------------
+%%% Add preliminary packets
+%%%-------------------------
+
+add_addresses(Delivereds, Groups) ->
+ Ps = [Group#group.others || Group <- Groups],
+ add_addresses2(Delivereds, Groups, [], [], Ps).
+
+add_addresses2(_, [], Res, _, []) -> Res;
+add_addresses2(Delivereds, [Group | Groups], Res, Pa,
+ [Pi | Pz]) ->
+ Addresses = lists:append([Delivereds] ++ Pa ++ Pz),
+ Group2 = Group#group{addresses = Addresses},
+ add_addresses2(Delivereds, Groups, [Group2 | Res],
+ [Pi | Pa], Pz).
+
+%%%-------------------------
+%%% Decide action groups
+%%%-------------------------
+
+decide_action_groups(Groups) ->
+ [{decide_action_group(Group), Group}
+ || Group <- Groups].
+
+decide_action_group(Group) ->
+ Server = Group#group.server,
+ case Group#group.multicast of
+ {cached, local_server} ->
+ %% Send a copy of the packet to each local user on Dests
+ route_single;
+ {cached, not_supported} ->
+ %% Send a copy of the packet to each remote user on Dests
+ route_single;
+ {cached, {multicast_supported, JID, RLimits}} ->
+ {route_multicast, JID, RLimits};
+ {obsolete,
+ {multicast_supported, Old_service, _RLimits}} ->
+ {ask, Old_service, renewal};
+ {obsolete, not_supported} -> {ask, Server, not_renewal};
+ not_cached -> {ask, Server, not_renewal}
+ end.
+
+%%%-------------------------
+%%% Route packet
+%%%-------------------------
+
+route_packet(From, ToDest, Packet, AAttrs, Addresses) ->
+ Dests = case ToDest#dest.type of
+ <<"bcc">> -> [];
+ _ -> [ToDest]
+ end,
+ route_packet2(From, ToDest#dest.jid_string, Dests,
+ Packet, AAttrs, Addresses).
+
+route_packet_multicast(From, ToS, Packet, AAttrs, Dests,
+ Addresses, Limits) ->
+ Type_of_stanza = type_of_stanza(Packet),
+ {_Type, Limit_number} = get_limit_number(Type_of_stanza,
+ Limits),
+ Fragmented_dests = fragment_dests(Dests, Limit_number),
+ [route_packet2(From, ToS, DFragment, Packet, AAttrs,
+ Addresses)
+ || DFragment <- Fragmented_dests].
+
+route_packet2(From, ToS, Dests, Packet, AAttrs,
+ Addresses) ->
+ #xmlel{name = T, attrs = A, children = C} = Packet,
+ C2 = case append_dests(Dests, Addresses) of
+ [] -> C;
+ ACs ->
+ [#xmlel{name = <<"addresses">>, attrs = AAttrs,
+ children = ACs}
+ | C]
+ end,
+ Packet2 = #xmlel{name = T, attrs = A, children = C2},
+ ToJID = stj(ToS),
+ ejabberd_router:route(From, ToJID, Packet2).
+
+append_dests([], Addresses) -> Addresses;
+append_dests([Dest | Dests], Addresses) ->
+ append_dests(Dests, [Dest#dest.full_xml | Addresses]).
+
+%%%-------------------------
+%%% Check relay
+%%%-------------------------
+
+check_relay(RS, LS, Gs) ->
+ case check_relay_required(RS, LS, Gs) of
+ false -> ok;
+ true -> throw(edrelay)
+ end.
+
+check_relay_required(RServer, LServerS, Groups) ->
+ case str:str(RServer, LServerS) > 0 of
+ true -> false;
+ false -> check_relay_required(LServerS, Groups)
+ end.
+
+check_relay_required(LServerS, Groups) ->
+ lists:any(fun (Group) -> Group#group.server /= LServerS
+ end,
+ Groups).
+
+%%%-------------------------
+%%% Check protocol support: Send request
+%%%-------------------------
+
+send_query_info(RServerS, LServiceS) ->
+ case str:str(RServerS, <<"echo.">>) of
+ 1 -> false;
+ _ -> send_query(RServerS, LServiceS, ?NS_DISCO_INFO)
+ end.
+
+send_query_items(RServerS, LServiceS) ->
+ send_query(RServerS, LServiceS, ?NS_DISCO_ITEMS).
+
+send_query(RServerS, LServiceS, XMLNS) ->
+ Packet = #xmlel{name = <<"iq">>,
+ attrs = [{<<"to">>, RServerS}, {<<"type">>, <<"get">>}],
+ children =
+ [#xmlel{name = <<"query">>,
+ attrs = [{<<"xmlns">>, XMLNS}],
+ children = []}]},
+ ejabberd_router:route(stj(LServiceS), stj(RServerS),
+ Packet).
+
+%%%-------------------------
+%%% Check protocol support: Receive response: Error
+%%%-------------------------
+
+process_iqreply_error(From, LServiceS, _Packet) ->
+ FromS = jts(From),
+ case search_waiter(FromS, LServiceS, info) of
+ {found_waiter, Waiter} ->
+ received_awaiter(FromS, Waiter, LServiceS);
+ _ -> ok
+ end.
+
+%%%-------------------------
+%%% Check protocol support: Receive response: Disco
+%%%-------------------------
+
+process_iqreply_result(From, LServiceS, Packet,
+ State) ->
+ #xmlel{name = <<"query">>, attrs = Attrs2,
+ children = Els2} =
+ xml:get_subtag(Packet, <<"query">>),
+ case xml:get_attr_s(<<"xmlns">>, Attrs2) of
+ ?NS_DISCO_INFO ->
+ process_discoinfo_result(From, LServiceS, Els2, State);
+ ?NS_DISCO_ITEMS ->
+ process_discoitems_result(From, LServiceS, Els2)
+ end.
+
+%%%-------------------------
+%%% Check protocol support: Receive response: Disco Info
+%%%-------------------------
+
+process_discoinfo_result(From, LServiceS, Els,
+ _State) ->
+ FromS = jts(From),
+ case search_waiter(FromS, LServiceS, info) of
+ {found_waiter, Waiter} ->
+ process_discoinfo_result2(From, FromS, LServiceS, Els,
+ Waiter);
+ _ -> ok
+ end.
+
+process_discoinfo_result2(From, FromS, LServiceS, Els,
+ Waiter) ->
+ Multicast_support = lists:any(fun (XML) ->
+ case XML of
+ #xmlel{name = <<"feature">>,
+ attrs = Attrs} ->
+ (?NS_ADDRESS) ==
+ xml:get_attr_s(<<"var">>,
+ Attrs);
+ _ -> false
+ end
+ end,
+ Els),
+ Group = Waiter#waiter.group,
+ RServer = Group#group.server,
+ case Multicast_support of
+ true ->
+ SenderT = sender_type(From),
+ RLimits = get_limits_xml(Els, SenderT),
+ add_response(RServer,
+ {multicast_supported, FromS, RLimits}),
+ FromM = Waiter#waiter.sender,
+ DestsM = Group#group.dests,
+ PacketM = Waiter#waiter.packet,
+ AAttrsM = Waiter#waiter.aattrs,
+ AddressesM = Waiter#waiter.addresses,
+ RServiceM = FromS,
+ route_packet_multicast(FromM, RServiceM, PacketM,
+ AAttrsM, DestsM, AddressesM, RLimits),
+ delo_waiter(Waiter);
+ false ->
+ case FromS of
+ RServer ->
+ send_query_items(FromS, LServiceS),
+ delo_waiter(Waiter),
+ add_waiter(Waiter#waiter{awaiting =
+ {[FromS], LServiceS, items},
+ renewal = false});
+ %% We asked a component, and it does not support XEP33
+ _ -> received_awaiter(FromS, Waiter, LServiceS)
+ end
+ end.
+
+get_limits_xml(Els, SenderT) ->
+ LimitOpts = get_limits_els(Els),
+ build_remote_limit_record(LimitOpts, SenderT).
+
+get_limits_els(Els) ->
+ lists:foldl(fun (XML, R) ->
+ case XML of
+ #xmlel{name = <<"x">>, attrs = Attrs,
+ children = SubEls} ->
+ case ((?NS_XDATA) ==
+ xml:get_attr_s(<<"xmlns">>, Attrs))
+ and
+ (<<"result">> ==
+ xml:get_attr_s(<<"type">>, Attrs))
+ of
+ true -> get_limits_fields(SubEls) ++ R;
+ false -> R
+ end;
+ _ -> R
+ end
+ end,
+ [], Els).
+
+get_limits_fields(Fields) ->
+ {Head, Tail} = lists:partition(fun (Field) ->
+ case Field of
+ #xmlel{name = <<"field">>,
+ attrs = Attrs} ->
+ (<<"FORM_TYPE">> ==
+ xml:get_attr_s(<<"var">>,
+ Attrs))
+ and
+ (<<"hidden">> ==
+ xml:get_attr_s(<<"type">>,
+ Attrs));
+ _ -> false
+ end
+ end,
+ Fields),
+ case Head of
+ [] -> [];
+ _ -> get_limits_values(Tail)
+ end.
+
+get_limits_values(Values) ->
+ lists:foldl(fun (Value, R) ->
+ case Value of
+ #xmlel{name = <<"field">>, attrs = Attrs,
+ children = SubEls} ->
+ [#xmlel{name = <<"value">>, children = SubElsV}] =
+ SubEls,
+ Number = xml:get_cdata(SubElsV),
+ Name = xml:get_attr_s(<<"var">>, Attrs),
+ [{jlib:binary_to_atom(Name),
+ jlib:binary_to_integer(Number)}
+ | R];
+ _ -> R
+ end
+ end,
+ [], Values).
+
+%%%-------------------------
+%%% Check protocol support: Receive response: Disco Items
+%%%-------------------------
+
+process_discoitems_result(From, LServiceS, Els) ->
+ List = lists:foldl(fun (XML, Res) ->
+ case XML of
+ #xmlel{name = <<"item">>, attrs = Attrs} ->
+ Res ++ [xml:get_attr_s(<<"jid">>, Attrs)];
+ _ -> Res
+ end
+ end,
+ [], Els),
+ [send_query_info(Item, LServiceS) || Item <- List],
+ FromS = jts(From),
+ {found_waiter, Waiter} = search_waiter(FromS, LServiceS,
+ items),
+ delo_waiter(Waiter),
+ add_waiter(Waiter#waiter{awaiting =
+ {List, LServiceS, info},
+ renewal = false}).
+
+%%%-------------------------
+%%% Check protocol support: Receive response: Received awaiter
+%%%-------------------------
+
+received_awaiter(JID, Waiter, LServiceS) ->
+ {JIDs, LServiceS, info} = Waiter#waiter.awaiting,
+ delo_waiter(Waiter),
+ Group = Waiter#waiter.group,
+ RServer = Group#group.server,
+ case lists:delete(JID, JIDs) of
+ [] ->
+ case Waiter#waiter.renewal of
+ false ->
+ add_response(RServer, not_supported),
+ From = Waiter#waiter.sender,
+ Packet = Waiter#waiter.packet,
+ AAttrs = Waiter#waiter.aattrs,
+ Addresses = Waiter#waiter.addresses,
+ [route_packet(From, ToUser, Packet, AAttrs, Addresses)
+ || ToUser <- Group#group.dests];
+ true ->
+ send_query_info(RServer, LServiceS),
+ add_waiter(Waiter#waiter{awaiting =
+ {[RServer], LServiceS, info},
+ renewal = false})
+ end;
+ JIDs2 ->
+ add_waiter(Waiter#waiter{awaiting =
+ {JIDs2, LServiceS, info},
+ renewal = false})
+ end.
+
+%%%-------------------------
+%%% Cache
+%%%-------------------------
+
+create_cache() ->
+ mnesia:create_table(multicastc,
+ [{ram_copies, [node()]},
+ {attributes, record_info(fields, multicastc)}]).
+
+add_response(RServer, Response) ->
+ Secs =
+ calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ mnesia:dirty_write(#multicastc{rserver = RServer,
+ response = Response, ts = Secs}).
+
+search_server_on_cache(RServer, LServerS, _Maxmins)
+ when RServer == LServerS ->
+ {cached, local_server};
+search_server_on_cache(RServer, _LServerS, Maxmins) ->
+ case look_server(RServer) of
+ not_cached -> not_cached;
+ {cached, Response, Ts} ->
+ Now =
+ calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ case is_obsolete(Response, Ts, Now, Maxmins) of
+ false -> {cached, Response};
+ true -> {obsolete, Response}
+ end
+ end.
+
+look_server(RServer) ->
+ case mnesia:dirty_read(multicastc, RServer) of
+ [] -> not_cached;
+ [M] -> {cached, M#multicastc.response, M#multicastc.ts}
+ end.
+
+is_obsolete(Response, Ts, Now, {Max_pos, Max_neg}) ->
+ Max = case Response of
+ multicast_not_supported -> Max_neg;
+ _ -> Max_pos
+ end,
+ Now - Ts > Max.
+
+%%%-------------------------
+%%% Purge cache
+%%%-------------------------
+
+purge() ->
+ Maxmins_positive = (?MAXTIME_CACHE_POSITIVE),
+ Maxmins_negative = (?MAXTIME_CACHE_NEGATIVE),
+ Now =
+ calendar:datetime_to_gregorian_seconds(calendar:now_to_datetime(now())),
+ purge(Now, {Maxmins_positive, Maxmins_negative}).
+
+purge(Now, Maxmins) ->
+ F = fun () ->
+ mnesia:foldl(fun (R, _) ->
+ #multicastc{response = Response, ts = Ts} =
+ R,
+ case is_obsolete(Response, Ts, Now,
+ Maxmins)
+ of
+ true -> mnesia:delete_object(R);
+ false -> ok
+ end
+ end,
+ none, multicastc)
+ end,
+ mnesia:transaction(F).
+
+%%%-------------------------
+%%% Purge cache loop
+%%%-------------------------
+
+try_start_loop() ->
+ case lists:member(?PURGE_PROCNAME, registered()) of
+ true -> ok;
+ false -> start_loop()
+ end,
+ (?PURGE_PROCNAME) ! new_module.
+
+start_loop() ->
+ register(?PURGE_PROCNAME,
+ spawn(?MODULE, purge_loop, [0])),
+ (?PURGE_PROCNAME) ! purge_now.
+
+try_stop_loop() -> (?PURGE_PROCNAME) ! try_stop.
+
+purge_loop(NM) ->
+ receive
+ purge_now ->
+ purge(),
+ timer:send_after(?CACHE_PURGE_TIMER, ?PURGE_PROCNAME,
+ purge_now),
+ purge_loop(NM);
+ new_module -> purge_loop(NM + 1);
+ try_stop when NM > 1 -> purge_loop(NM - 1);
+ try_stop -> purge_loop_finished
+ end.
+
+%%%-------------------------
+%%% Pool
+%%%-------------------------
+
+create_pool() ->
+ catch ets:new(multicastp,
+ [duplicate_bag, public, named_table, {keypos, 2}]).
+
+add_waiter(Waiter) ->
+ true = ets:insert(multicastp, Waiter).
+
+delo_waiter(Waiter) ->
+ true = ets:delete_object(multicastp, Waiter).
+
+search_waiter(JID, LServiceS, Type) ->
+ Rs = ets:foldl(fun (W, Res) ->
+ {JIDs, LServiceS1, Type1} = W#waiter.awaiting,
+ case lists:member(JID, JIDs) and
+ (LServiceS == LServiceS1)
+ and (Type1 == Type)
+ of
+ true -> Res ++ [W];
+ false -> Res
+ end
+ end,
+ [], multicastp),
+ case Rs of
+ [R | _] -> {found_waiter, R};
+ [] -> waiter_not_found
+ end.
+
+%%%-------------------------
+%%% Limits: utils
+%%%-------------------------
+
+%% Type definitions for data structures related with XEP33 limits
+%% limit() = {Name, Value}
+%% Name = atom()
+%% Value = {Type, Number}
+%% Type = default | custom
+%% Number = integer() | infinite
+
+list_of_limits(local) ->
+ [{message, ?DEFAULT_LIMIT_LOCAL_MESSAGE},
+ {presence, ?DEFAULT_LIMIT_LOCAL_PRESENCE}];
+list_of_limits(remote) ->
+ [{message, ?DEFAULT_LIMIT_REMOTE_MESSAGE},
+ {presence, ?DEFAULT_LIMIT_REMOTE_PRESENCE}].
+
+build_service_limit_record(LimitOpts) ->
+ LimitOptsL = get_from_limitopts(LimitOpts, local),
+ LimitOptsR = get_from_limitopts(LimitOpts, remote),
+ {service_limits, build_limit_record(LimitOptsL, local),
+ build_limit_record(LimitOptsR, remote)}.
+
+get_from_limitopts(LimitOpts, SenderT) ->
+ [{StanzaT, Number}
+ || {SenderT2, StanzaT, Number} <- LimitOpts,
+ SenderT =:= SenderT2].
+
+build_remote_limit_record(LimitOpts, SenderT) ->
+ build_limit_record(LimitOpts, SenderT).
+
+build_limit_record(LimitOpts, SenderT) ->
+ Limits = [get_limit_value(Name, Default, LimitOpts)
+ || {Name, Default} <- list_of_limits(SenderT)],
+ list_to_tuple([limits | Limits]).
+
+get_limit_value(Name, Default, LimitOpts) ->
+ case lists:keysearch(Name, 1, LimitOpts) of
+ {value, {Name, Number}} -> {custom, Number};
+ false -> {default, Default}
+ end.
+
+type_of_stanza(#xmlel{name = <<"message">>}) -> message;
+type_of_stanza(#xmlel{name = <<"presence">>}) ->
+ presence.
+
+get_limit_number(message, Limits) ->
+ Limits#limits.message;
+get_limit_number(presence, Limits) ->
+ Limits#limits.presence.
+
+get_slimit_group(local, SLimits) ->
+ SLimits#service_limits.local;
+get_slimit_group(remote, SLimits) ->
+ SLimits#service_limits.remote.
+
+fragment_dests(Dests, Limit_number) ->
+ {R, _} = lists:foldl(fun (Dest, {Res, Count}) ->
+ case Count of
+ Limit_number ->
+ Head2 = [Dest], {[Head2 | Res], 0};
+ _ ->
+ [Head | Tail] = Res,
+ Head2 = [Dest | Head],
+ {[Head2 | Tail], Count + 1}
+ end
+ end,
+ {[[]], 0}, Dests),
+ R.
+
+%%%-------------------------
+%%% Limits: XEP-0128 Service Discovery Extensions
+%%%-------------------------
+
+%% Some parts of code are borrowed from mod_muc_room.erl
+
+-define(RFIELDT(Type, Var, Val),
+ #xmlel{name = <<"field">>,
+ attrs = [{<<"var">>, Var}, {<<"type">>, Type}],
+ children =
+ [#xmlel{name = <<"value">>, attrs = [],
+ children = [{xmlcdata, Val}]}]}).
+
+-define(RFIELDV(Var, Val),
+ #xmlel{name = <<"field">>, attrs = [{<<"var">>, Var}],
+ children =
+ [#xmlel{name = <<"value">>, attrs = [],
+ children = [{xmlcdata, Val}]}]}).
+
+iq_disco_info_extras(From, State) ->
+ SenderT = sender_type(From),
+ Service_limits = State#state.service_limits,
+ case iq_disco_info_extras2(SenderT, Service_limits) of
+ [] -> [];
+ List_limits_xmpp ->
+ [#xmlel{name = <<"x">>,
+ attrs =
+ [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"result">>}],
+ children =
+ [?RFIELDT(<<"hidden">>, <<"FORM_TYPE">>, (?NS_ADDRESS))]
+ ++ List_limits_xmpp}]
+ end.
+
+sender_type(From) ->
+ Local_hosts = (?MYHOSTS),
+ case lists:member(From#jid.lserver, Local_hosts) of
+ true -> local;
+ false -> remote
+ end.
+
+iq_disco_info_extras2(SenderT, SLimits) ->
+ Limits = get_slimit_group(SenderT, SLimits),
+ Stanza_types = [message, presence],
+ lists:foldl(fun (Type_of_stanza, R) ->
+ case get_limit_number(Type_of_stanza, Limits) of
+ {custom, Number} ->
+ [?RFIELDV((to_binary(Type_of_stanza)),
+ (to_binary(Number)))
+ | R];
+ {default, _} -> R
+ end
+ end,
+ [], Stanza_types).
+
+to_binary(A) -> list_to_binary(hd(io_lib:format("~p", [A]))).
+
+%%%-------------------------
+%%% Error report
+%%%-------------------------
+
+route_error(From, To, Packet, ErrType, ErrText) ->
+ #xmlel{attrs = Attrs} = Packet,
+ Lang = xml:get_attr_s(<<"xml:lang">>, Attrs),
+ Reply = make_reply(ErrType, Lang, ErrText),
+ Err = jlib:make_error_reply(Packet, Reply),
+ ejabberd_router:route(From, To, Err).
+
+make_reply(bad_request, Lang, ErrText) ->
+ ?ERRT_BAD_REQUEST(Lang, ErrText);
+make_reply(jid_malformed, Lang, ErrText) ->
+ ?ERRT_JID_MALFORMED(Lang, ErrText);
+make_reply(not_acceptable, Lang, ErrText) ->
+ ?ERRT_NOT_ACCEPTABLE(Lang, ErrText);
+make_reply(internal_server_error, Lang, ErrText) ->
+ ?ERRT_INTERNAL_SERVER_ERROR(Lang, ErrText);
+make_reply(forbidden, Lang, ErrText) ->
+ ?ERRT_FORBIDDEN(Lang, ErrText).
+
+stj(String) -> jlib:string_to_jid(String).
+
+jts(String) -> jlib:jid_to_string(String).