aboutsummaryrefslogtreecommitdiff
path: root/src/mod_pubsub_ng/pubsub_broadcast.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_pubsub_ng/pubsub_broadcast.erl')
-rw-r--r--src/mod_pubsub_ng/pubsub_broadcast.erl1240
1 files changed, 1240 insertions, 0 deletions
diff --git a/src/mod_pubsub_ng/pubsub_broadcast.erl b/src/mod_pubsub_ng/pubsub_broadcast.erl
new file mode 100644
index 000000000..33dce8a49
--- /dev/null
+++ b/src/mod_pubsub_ng/pubsub_broadcast.erl
@@ -0,0 +1,1240 @@
+%%% ====================================================================
+%%% ``The contents of this file are subject to the Erlang Public License,
+%%% Version 1.1, (the "License"); you may not use this file except in
+%%% compliance with the License. You should have received a copy of the
+%%% Erlang Public License along with this software. If not, it can be
+%%% retrieved via the world wide web at http://www.erlang.org/.
+%%%
+%%% Software distributed under the License is distributed on an "AS IS"
+%%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%%% the License for the specific language governing rights and limitations
+%%% under the License.
+%%%
+%%% The Initial Developer of the Original Code is ProcessOne.
+%%% Portions created by ProcessOne are Copyright 2006-2011, ProcessOne
+%%% All Rights Reserved.''
+%%% This software is copyright 2006-2011, ProcessOne.
+%%%
+%%% @copyright 2006-2011 ProcessOne
+%%% @author Karim Gemayel <karim.gemayel@process-one.net>
+%%% [http://www.process-one.net/]
+%%% @version {@vsn}, {@date} {@time}
+%%% @end
+%%% ====================================================================
+
+%%% @headerfile "pubsub_dev.hrl"
+
+-module(pubsub_broadcast).
+-author('karim.gemayel@process-one.net').
+
+-compile(export_all).
+
+-include("pubsub_dev.hrl").
+-include("pubsub_api.hrl").
+
+
+-import(pubsub_tools,
+[
+ get_option/2,
+ get_option/3,
+ get_value/2,
+ get_value/3,
+ set_value/3,
+ %%
+ check_access_model/3,
+ check_access_model/7,
+ check_publish_model/3,
+ is_contact_subscribed_to_node_owners/3,
+ is_contact_in_allowed_roster_groups/2,
+ has_subscriptions/1,
+ get_resources/2,
+ get_resources_show/2,
+ rosters_groups_allowed_cache/2
+]).
+
+
+-spec(notify_subscriptions/5 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Notification_Type :: pubsub_options:notification_type(),
+ Entities_Subscriptions :: [Entity_Subscriptions :: {
+ Entity :: xmpp_jid:usr_entity(),
+ Subscriptions :: _%sub:subscriptions()
+ }])
+ -> 'ok'
+).
+
+notify_subscriptions(_Host, Pubsub_Host, NodeId, Notification_Type,
+ Entities_Subscriptions) ->
+ Pubsub_Component_Jid = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ lists:foreach(fun
+ (_Entity = {{U,S,_R}, Subscriptions}) ->
+ lists:foreach(fun
+ ({Subscription_State, SubId, Resource, _Subscription_Options}) ->
+ Recipient_Jid = jlib:make_jid(U,S,Resource),
+ ejabberd_router:route(_From = Pubsub_Component_Jid,
+ _To = Recipient_Jid,
+ _Stanza_Message = exmpp_pubsub:xmlel_message(
+ Notification_Type,
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_subscription('pubsub#event',
+ NodeId,
+ _Jid = pubsub_tools:jid_to_string(Recipient_Jid),
+ SubId,
+ Subscription_State
+ )
+ ])]
+ )
+ )
+
+ end, Subscriptions)
+ end, Entities_Subscriptions).
+
+%%
+-spec(notify_subscription/6 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Pubsub_Component_Jid :: xmpp_jid:component_bare(),
+ Recipient :: xmpp_jid:entity_bare() | [xmpp_jid:entity_bare(),...],
+ Notification_Type :: pubsub_options:notification_type(),
+ Subscription :: {
+ Subscriber :: xmpp_jid:raw_jid_entity(),
+ Subscription_State :: exmpp_pubsub:subscription_state(),
+ SubId :: exmpp_pubsub:subId()
+ })
+ -> 'ok'
+).
+
+notify_subscription(_Host, NodeId, Pubsub_Component_Jid, Recipients,
+ Notification_Type, {Subscriber, Subscription_State, SubId})
+ when is_list(Recipients) ->
+ Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_subscription('pubsub#event',
+ NodeId, Subscriber, SubId, Subscription_State)
+ ])
+ ]),
+ lists:foreach(fun
+ (Recipient) ->
+ ejabberd_router:route(_From = Pubsub_Component_Jid,
+ _To = pubsub_tools:make_jid(Recipient), Stanza_Message)
+ end, Recipients);
+%%
+notify_subscription(_Host, NodeId, Pubsub_Component_Jid, Recipient,
+ Notification_Type, {Subscriber, Subscription_State, SubId}) ->
+ Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_subscription('pubsub#event',
+ NodeId, Subscriber, SubId, Subscription_State)
+ ])
+ ]),
+ ejabberd_router:route(_From = Pubsub_Component_Jid,
+ _To = Recipient, Stanza_Message).
+
+%%
+-spec(notify_delete/6 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Notification_Type :: pubsub_options:notification_type(),
+ RedirectURI :: undefined | binary(),
+ Recipients :: [Recipient::xmpp_jid:usr_bare(),...])
+ -> 'ok'
+).
+
+notify_delete(_Host, Pubsub_Host, NodeId, Notification_Type, RedirectURI,
+ Recipients) ->
+ Pubsub_Component_Jid = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_delete(NodeId, RedirectURI)])]
+ ),
+ lists:foreach(fun
+ (Recipient) ->
+ ejabberd_router:route(_To = Pubsub_Component_Jid,
+ _From = pubsub_tools:make_jid(Recipient),
+ Stanza_Message
+ )
+ end, Recipients).
+
+%%
+-spec(notify_purge/5 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Notification_Type :: pubsub_options:notification_type(),
+ Recipients :: [Recipient::xmpp_jid:usr_bare(),...])
+ -> 'ok'
+).
+
+notify_purge(_Host, Pubsub_Host, NodeId, Notification_Type, Recipients) ->
+ Pubsub_Component_Jid = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_purge(NodeId)])]),
+ lists:foreach(fun
+ (Recipient) ->
+ ejabberd_router:route(_To = Pubsub_Component_Jid,
+ _From = pubsub_tools:make_jid(Recipient),
+ Stanza_Message
+ )
+ end, Recipients).
+
+%%
+-spec(notify_publish/8 ::
+(
+ From :: xmpp_jid:component_bare(),
+ To :: xmpp_jid:entity(),
+ Notification_Type :: 'headline' | 'normal',
+ NodeId :: exmpp_pubsub:nodeId(),
+ ItemId :: exmpp_pubsub:itemId(),
+ Publisher :: undefined | xmpp_jid:raw_jid_entity(),
+ Payload :: exmpp_pubsub:payload(),
+ SubIds :: [SubId::exmpp_pubsub:subId(),...])
+ -> 'ok'
+).
+
+notify_publish(From, To, Notification_Type, NodeId, ItemId, Publisher, Payload,
+ SubIds) ->
+ ejabberd_router:route(From, To,
+ _Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ case SubIds of
+ [_SubId] ->
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_items('pubsub#event', NodeId,
+ [exmpp_pubsub:xmlel_item('pubsub#event', ItemId,
+ Publisher, Payload)
+ ])
+ ])
+ ];
+ _SubIds ->
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_items('pubsub#event', NodeId,
+ [exmpp_pubsub:xmlel_item('pubsub#event', ItemId,
+ Publisher, Payload)
+ ])
+ ]),
+ exmpp_pubsub:xmlel_headers(
+ [exmpp_pubsub:xmlel_header(SubId) || SubId <- SubIds])
+ ]
+ end)
+ ).
+
+
+-define(Xmlel(NS, Name, Attrs, Children),
+(
+ #xmlel{
+ name = Name,
+ attrs = [{<<"xmlns">>, NS} | Attrs],
+ children = Children
+ }
+)).
+
+-define(CData(Data),
+(
+ {xmlcdata, Data}
+)).
+
+-define(Stanza_Message(Type, Id, Children),
+(
+ ?Xmlel(?NS_JABBER_CLIENT, <<"message">>, [{<<"type">>, Type}, {<<"id">>, Id}],
+ Children)
+)).
+
+%%
+-spec(stanza_notify_publish/5 ::
+(
+ Notification_Type :: pubsub_options:notification_type(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ ItemId :: exmpp_pubsub:itemId(),
+ Publisher :: undefined | xmpp_jid:usr_entity() | xmpp_jid:raw_jid_entity(),
+ Payload :: exmpp_pubsub:payload())
+ -> Stanza_Notify_Publish::xmlel()
+).
+
+stanza_notify_publish(Notification_Type, NodeId, ItemId, Publisher, Payload)
+ when is_tuple(Publisher) ->
+ stanza_notify_publish(Notification_Type, NodeId, ItemId,
+ pubsub_tools:jid_to_string(Publisher), Payload);
+
+stanza_notify_publish(Notification_Type, NodeId, ItemId, Publisher, Payload) ->
+ ?Stanza_Message(list_to_binary(atom_to_list(Notification_Type)), exmpp_pubsub:id(),[
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"event">>, [], [
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"items">>, [{<<"node">>, NodeId}], [
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"item">>,
+ case Publisher of
+ undefined ->
+ [{<<"id">>, ItemId}];
+ _ ->
+ [{<<"id">>, ItemId}, {<<"publisher">>, Publisher}]
+ end,
+ case Payload of
+ [] -> [];
+ _ -> [Payload]
+ end)
+ ])
+ ])
+ ]).
+
+
+%%
+-spec(stanza_notify_retract/3 ::
+(
+ Notification_Type :: pubsub_options:notification_type(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ ItemId :: exmpp_pubsub:itemId())
+ -> Stanza_Notify_Retract::xmlel()
+).
+
+stanza_notify_retract(Notification_Type, NodeId, ItemId) ->
+ ?Stanza_Message(list_to_binary(atom_to_list(Notification_Type)), exmpp_pubsub:id(),[
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"event">>, [], [
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"items">>, [{<<"node">>, NodeId}], [
+ ?Xmlel(?NS_PUBSUB_EVENT, <<"retract">>, [{<<"id">>, ItemId}], [])
+ ])
+ ])
+ ]).
+
+
+%%
+-spec(notify_retract/6 ::
+(
+ From :: xmpp_jid:component_bare(),
+ To :: xmpp_jid:entity(),
+ Notification_Type :: 'headline' | 'normal',
+ NodeId :: exmpp_pubsub:nodeId(),
+ ItemId :: exmpp_pubsub:itemId(),
+ SubIds :: [SubId::exmpp_pubsub:subId(),...])
+ -> none
+).
+
+notify_retract(From, To, Notification_Type, NodeId, ItemId, SubIds) ->
+ ejabberd_router:route(From, To,
+ _Stanza_Message = exmpp_pubsub:xmlel_message(Notification_Type,
+ case SubIds of
+ [_SubId] ->
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_items('pubsub#event', NodeId,
+ [exmpp_pubsub:xmlel_retract(ItemId)])
+ ])
+ ];
+ _SubIds ->
+ [exmpp_pubsub:xmlel_event([
+ exmpp_pubsub:xmlel_items('pubsub#event', NodeId,
+ [exmpp_pubsub:xmlel_retract(ItemId)])
+ ]),
+ exmpp_pubsub:xmlel_headers(
+ [exmpp_pubsub:xmlel_header(SubId) || SubId <- SubIds])
+ ]
+ end)
+ ).
+
+
+broadcast_item(From, To, Stanza, SubIds) ->
+ ejabberd_router:route(From, To,
+ case SubIds of
+ [_] -> Stanza;
+ _ -> exmpp_xml:append_child(Stanza, shim_headers(SubIds))
+ end).
+
+
+shim_headers(SubIds) ->
+ ?Xmlel(?NS_SHIM, <<"headers">>, [],
+ [?Xmlel(?NS_SHIM, <<"header">>, [{<<"name">>, <<"SubID">>}], [?CData(SubId)])
+ || SubId <- SubIds]
+ ).
+
+%%
+xmlel_delay(From, DateTime) ->
+ {T_string, Tz_string} = jlib:timestamp_to_iso(
+ calendar:now_to_datetime(DateTime), 'utc'),
+ ?Xmlel(?NS_DELAY, <<"delay">>,
+ [{<<"from">>, pubsub_tools:jid_to_string(From)},
+ {<<"stamp">>, list_to_binary(T_string ++ Tz_string)}],
+ []).
+
+%%
+-spec(broadcast_publish/8 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Node_Options :: pubsub_options:options_node(),
+ Node_Owners :: [Node_Owner::xmpp_jid:usr_bare(),...],
+ Broadcasted_Items :: {
+ Published_Items :: [{
+ ItemId :: exmpp_pubsub:itemId(),
+ Item_Options :: [] | pubsub_options:options_item(),
+ Payload :: exmpp_pubsub:payload(),
+ Publisher :: xmpp_jid:usr_entity()
+ }],
+ %
+ Retracted_Items :: [{
+ ItemId :: exmpp_pubsub:itemId(),
+ Item_Options :: [] | pubsub_options:options_item()
+ }]
+ },
+ Subscription :: undefined
+ | {Subscriber :: xmpp_jid:raw_jid_entity(),
+ Subscription_State :: _,%exmpp_pubsub:subscription_state(),
+ SubId :: exmpp_pubsub:subId()},
+ Pubsub_States :: [Pubsub_State::mod_pubsub_dev:pubsub_state(),...])
+ -> 'ok'
+).
+
+broadcast_publish(Host, Pubsub_Host, NodeId, Node_Options, Node_Owners,
+ {Published_Items, Retracted_Items}, Subscription, Pubsub_States) ->
+ Event = #event{
+ host = Host,
+ component = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ node = #node{
+ id = NodeId,
+ owners = Node_Owners,
+ access_model = get_value(Node_Options, 'access_model'),
+ rosters_groups_allowed = get_value(Node_Options, 'roster_groups_allowed')
+ }
+ },
+ Broadcasted_Items = lists:map(fun
+ %%
+ ({ItemId, _Item_Options = []}) ->
+ #item{
+ presence_based_delivery = get_value(Node_Options,
+ 'presence_based_delivery'),
+ stanza = stanza_notify_retract(
+ get_value(Node_Options, 'notification_type', 'headline'),
+ NodeId, ItemId)
+ };
+ %%
+ ({ItemId, Item_Options}) ->
+ #item{
+ access_model = get_value(Item_Options, 'access_model', undefined),
+ presence_based_delivery = get_value({Node_Options, Item_Options},
+ 'presence_based_delivery'),
+ rosters_groups_allowed = get_value(Item_Options,
+ 'roster_groups_allowed', []),
+ stanza = stanza_notify_retract(
+ get_value({Node_Options, Item_Options}, 'notification_type',
+ 'headline'),
+ NodeId, ItemId)
+ };
+ %%
+ ({ItemId, _Item_Options = [], Payload, Publisher}) ->
+ #item{
+ presence_based_delivery = get_value(Node_Options,
+ 'presence_based_delivery'),
+ stanza = stanza_notify_publish(
+ get_value(Node_Options, 'notification_type', 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value(Node_Options, 'itemreply', 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload)
+ };
+ %%
+ ({ItemId, Item_Options, Payload, Publisher}) ->
+ #item{
+ access_model = get_value(Item_Options, 'access_model'),
+ presence_based_delivery = get_value({Node_Options, Item_Options},
+ 'presence_based_delivery'),
+ rosters_groups_allowed = get_value(Item_Options,
+ 'roster_groups_allowed', []),
+ stanza = stanza_notify_publish(
+ get_value({Node_Options, Item_Options}, 'notification_type',
+ 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value({Node_Options, Item_Options}, 'itemreply',
+ 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload)
+ }
+ %
+ end, lists:append(Retracted_Items, Published_Items)),
+ %
+ lists:foreach(fun
+ %%
+ (#pubsub_state_dev{id = {{U,S,R} = _Entity, _NodeIdx},
+ affiliation = Affiliation, subscriptions = Subscriptions})
+ when (Affiliation == 'member'
+ orelse Affiliation == 'owner'
+ orelse Affiliation == 'publisher')
+ andalso Subscriptions =/= [] ->
+ case Subscription =/= undefined andalso Affiliation == 'owner' of
+ true ->
+ notify_subscription(Host, NodeId,
+ _Pubsub_Component_Jid = Event#event.component,
+ _Recipient = jlib:make_jid(U,S,<<>>),
+ _Notification_Type = get_value(Node_Options,
+ 'notification_type'),
+ Subscription);
+ false ->
+ ok
+ end,
+ lists:foldl(fun broadcast_item/2,
+ Event#event{
+ entity = #entity{
+ id = {U,S,R},
+ local = S == Event#event.host,
+ affiliation = Affiliation,
+ subscriptions = Subscriptions
+ },
+ cache = #cache{},
+ subids = #subids{}
+ },
+ Broadcasted_Items);
+ %%
+ (_Pubsub_State) ->
+ ok
+ %
+ end, Pubsub_States).
+
+%%
+
+-spec(broadcast_publish_last/9 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Node_Options :: pubsub_options:options_node(),
+ Node_Owners :: [Node_Owner::xmpp_jid:usr_bare(),...],
+ Published_Items :: [Published_Item :: {
+ ItemId :: exmpp_pubsub:itemId(),
+ Item_Options :: [] | pubsub_options:options_item(),
+ Payload :: exmpp_pubsub:payload(),
+ Publisher :: xmpp_jid:usr_entity(),
+ DateTime :: erlang:timestamp()
+ },...],
+ Entity :: xmpp_jid:usr_entity(),
+ Affiliation :: 'member' | 'owner' | 'publisher',
+ Subscription :: exmpp_pubsub:subscription())
+ -> none()
+).
+
+broadcast_publish_last(Host, Pubsub_Host, NodeId, Node_Options, Node_Owners,
+ Published_Items, {U,S,R} = Entity, Affiliation, Subscription) ->
+ Event = #event{
+ host = Host,
+ component = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ node = #node{
+ id = NodeId,
+ owners = Node_Owners,
+ access_model = get_value(Node_Options, 'access_model'),
+ rosters_groups_allowed = get_value(Node_Options, 'roster_groups_allowed')
+ }
+ },
+ Broadcasted_Items = lists:map(fun
+ %%
+ ({ItemId, _Item_Options = [], Payload, Publisher, DateTime}) ->
+ #item{
+ presence_based_delivery = get_value(Node_Options,
+ 'presence_based_delivery'),
+ stanza = exmpp_xml:append_child(stanza_notify_publish(
+ get_value(Node_Options, 'notification_type', 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value(Node_Options, 'itemreply', 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload),
+ xmlel_delay(Publisher, DateTime))
+ };
+ %%
+ ({ItemId, Item_Options, Payload, Publisher, DateTime}) ->
+ #item{
+ access_model = get_value(Item_Options, 'access_model'),
+ presence_based_delivery = get_value({Node_Options, Item_Options},
+ 'presence_based_delivery'),
+ rosters_groups_allowed = get_value(Item_Options,
+ 'roster_groups_allowed', []),
+ stanza = exmpp_xml:append_child(stanza_notify_publish(
+ get_value({Node_Options, Item_Options}, 'notification_type',
+ 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value({Node_Options, Item_Options}, 'itemreply',
+ 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload),
+ xmlel_delay(Publisher, DateTime))
+ }
+ %
+ end, Published_Items),
+ case get_value(Node_Options, 'notify_sub', false) of
+ true ->
+ {Subscription_State, SubId, Resource, _} = Subscription,
+ Subscriber = pubsub_tools:jid_to_string({U,S,Resource}),
+ Notification_Type = get_value(Node_Options, 'notification_type',
+ 'headline'),
+ lists:foreach(fun
+ (Node_Owner) ->
+ notify_subscription(Host, NodeId,
+ _Pubsub_Component_Jid = Event#event.component,
+ _Recipient = pubsub_tools:make_jid(Node_Owner),
+ Notification_Type,
+ {Subscriber, Subscription_State, SubId})
+
+ end, Node_Owners);
+ false ->
+ ok
+ end,
+ %%
+ lists:foldl(fun broadcast_item/2,
+ Event#event{
+ entity = #entity{
+ id = {U,S,R},
+ local = S == Event#event.host,
+ affiliation = Affiliation,
+ subscriptions = [Subscription]
+ },
+ cache = #cache{},
+ subids = #subids{}
+ },
+ Broadcasted_Items).
+
+
+-spec(broadcast_publish_last2/9 ::
+(
+ Host :: xmpp_jid:raw_jid_component_bare(),
+ Pubsub_Host :: exmpp_pubsub:host(),
+ NodeId :: exmpp_pubsub:nodeId(),
+ Node_Options :: pubsub_options:options_node(),
+ Node_Owners :: [Node_Owner::xmpp_jid:usr_bare(),...],
+ Published_Items :: [Published_Item :: {
+ ItemId :: exmpp_pubsub:itemId(),
+ Item_Options :: [] | pubsub_options:options_item(),
+ Payload :: exmpp_pubsub:payload(),
+ Publisher :: xmpp_jid:usr_entity(),
+ DateTime :: erlang:timestamp()
+ },...],
+ Entity :: xmpp_jid:usr_entity(),
+ Affiliation :: 'member' | 'owner' | 'publisher',
+ Subscriptions :: exmpp_pubsub:subscription())
+ -> none()
+).
+
+broadcast_publish_last2(Host, Pubsub_Host, NodeId, Node_Options, Node_Owners,
+ Published_Items, {U,S,R} = Entity, Affiliation, Subscriptions) ->
+ Event = #event{
+ host = Host,
+ component = jlib:make_jid(<<>>, Pubsub_Host, <<>>),
+ node = #node{
+ id = NodeId,
+ owners = Node_Owners,
+ access_model = get_value(Node_Options, 'access_model'),
+ rosters_groups_allowed = get_value(Node_Options, 'roster_groups_allowed')
+ }
+ },
+ Broadcasted_Items = lists:map(fun
+ %%
+ ({ItemId, _Item_Options = [], Payload, Publisher, DateTime}) ->
+ #item{
+ presence_based_delivery = get_value(Node_Options,
+ 'presence_based_delivery'),
+ stanza = exmpp_xml:append_child(stanza_notify_publish(
+ get_value(Node_Options, 'notification_type', 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value(Node_Options, 'itemreply', 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload),
+ xmlel_delay(Publisher, DateTime))
+ };
+ %%
+ ({ItemId, Item_Options, Payload, Publisher, DateTime}) ->
+ #item{
+ access_model = get_value(Item_Options, 'access_model'),
+ presence_based_delivery = get_value({Node_Options, Item_Options},
+ 'presence_based_delivery'),
+ rosters_groups_allowed = get_value(Item_Options,
+ 'roster_groups_allowed', []),
+ stanza = exmpp_xml:append_child(stanza_notify_publish(
+ get_value({Node_Options, Item_Options}, 'notification_type',
+ 'headline'),
+ NodeId,
+ ItemId,
+ _Publisher = case
+ get_value({Node_Options, Item_Options}, 'itemreply',
+ 'owner')
+ of
+ 'publisher' -> pubsub_tools:jid_to_string(Publisher);
+ _Owner -> undefined
+ end,
+ Payload),
+ xmlel_delay(Publisher, DateTime))
+ }
+ %
+ end, Published_Items),
+ %%
+ lists:foldl(fun broadcast_item/2,
+ Event#event{
+ entity = #entity{
+ id = {U,S,R},
+ local = S == Event#event.host,
+ affiliation = Affiliation,
+ subscriptions = Subscriptions
+ },
+ cache = #cache{},
+ subids = #subids{}
+ },
+ Broadcasted_Items).
+
+%%
+-define(Resources_Subids(Item, Event),
+ (Item#item.presence_based_delivery == false
+ andalso
+ Event#event.subids#subids.no_presence =/= undefined
+ andalso
+ Event#event.subids#subids.no_presence =/= [])
+ orelse
+ (Item#item.presence_based_delivery == true
+ andalso
+ Event#event.subids#subids.presence =/= undefined
+ andalso
+ Event#event.subids#subids.presence =/= [])
+).
+
+-define(Item_Access_Model(Item, Event),
+ Item#item.access_model == undefined
+ orelse
+ Item#item.access_model == 'open'
+ orelse
+ Item#item.access_model == 'authorize'
+ orelse
+ Item#item.access_model == 'whitelist'
+ orelse
+ (Item#item.access_model == Event#event.node#node.access_model
+ andalso
+ Item#item.access_model =/= 'roster')
+ orelse
+ (Item#item.access_model == 'presence'
+ andalso
+ (Event#event.entity#entity.affiliation == 'owner'
+ orelse
+% Event#event.entity#entity.affiliation == 'publisher'
+% orelse
+ Event#event.node#node.access_model == 'presence'
+ orelse
+ Event#event.node#node.access_model == 'roster'
+ orelse
+ Event#event.cache#cache.presence_subscriptions == true)
+ )
+ orelse
+ (Item#item.access_model == 'roster'
+ andalso
+ (Event#event.entity#entity.affiliation == 'owner'
+ orelse
+ Event#event.entity#entity.affiliation == 'publisher'
+ orelse
+ Event#event.cache#cache.rosters_groups == true)
+ )
+).
+
+
+-define(Broadcast_Event(Item, Event),
+ (?Resources_Subids(Item, Event)) and (?Item_Access_Model(Item, Event))
+).
+
+-define(Presence_Based_Delivery(Item, Event),
+ Item#item.presence_based_delivery == true
+ andalso
+ Event#event.subids#subids.presence == undefined
+).
+
+-define(No_Presence_Based_Delivery(Item, Event),
+ Item#item.presence_based_delivery == false
+ andalso
+ Event#event.subids#subids.no_presence == undefined
+).
+
+-define(Presence_Based_Delivery_Cache(Item, Event),
+ Item#item.presence_based_delivery == true
+ andalso
+ Event#event.cache#cache.presence == undefined
+).
+
+-define(Item_Access_Model_Presence(Item, Event),
+ Item#item.access_model == 'presence'
+ andalso
+ Event#event.cache#cache.presence_subscriptions == undefined
+).
+
+-define(Item_Access_Model_Roster(Item, Event),
+ Item#item.access_model == 'roster'
+ andalso
+ Event#event.cache#cache.rosters_groups == undefined
+).
+
+
+-spec(broadcast_item/2 ::
+(
+ Item :: mod_pubsub_dev:item(),
+ Event :: mod_pubsub_dev:event())
+ -> Event::mod_pubsub_dev:event()
+% Event::#event{cache::#cache{rosters_groups :: undefined}}
+).
+
+
+
+%%
+broadcast_item(#item{stanza = Stanza} = Item,
+ #event{entity = #entity{id = {U,S,_R}}} = Event)
+ when ?Broadcast_Event(Item, Event) ->
+ lists:foreach(fun
+ ({Resource, SubIds}) ->
+ broadcast_item(_From = Event#event.component,
+ _To = jlib:make_jid(U,S,Resource),
+ Stanza,
+ SubIds)
+ end,
+ _Resources_SubIds = case Item#item.presence_based_delivery of
+ true -> Event#event.subids#subids.presence;
+ false -> Event#event.subids#subids.no_presence
+ end),
+ Event#event{cache = Event#event.cache#cache{rosters_groups = undefined}};
+%%
+broadcast_item(Item, Event)
+ when ?No_Presence_Based_Delivery(Item, Event) ->
+ {Presence_Cache, Resources_SubIds} = resources_subids(
+ _Entity = Event#event.entity#entity.id,
+ _Local_Entity = Event#event.entity#entity.local,
+ _Presence_Based_Delivery = false,
+ _Subscriptions = Event#event.entity#entity.subscriptions,
+ {Event#event.cache#cache.presence, []}),
+ case Resources_SubIds of
+ [] ->
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = Presence_Cache,
+ rosters_groups = undefined
+ },
+ subids = Event#event.subids#subids{
+ no_presence = []
+ }
+ };
+ _Resources_SubIds ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = Presence_Cache
+ },
+ subids = Event#event.subids#subids{
+ no_presence = Resources_SubIds
+ }
+ }
+ )
+ end;
+%%
+broadcast_item(Item, #event{entity = #entity{id = {U,S,_R}}} = Event)
+ when ?Presence_Based_Delivery_Cache(Item, Event) ->
+ case get_resources_show(U,S) of
+ [] ->
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = [],
+ rosters_groups = undefined
+ }
+ };
+ Resources_Show ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = Resources_Show
+ }
+ })
+ end;
+%%
+broadcast_item(Item, Event)
+ when ?Presence_Based_Delivery(Item, Event) ->
+ {Presence_Cache, Resources_SubIds} = resources_subids(
+ _Entity = Event#event.entity#entity.id,
+ _Local_Entity = Event#event.entity#entity.local,
+ _Presence_Based_Delivery = true,
+ _Subscriptions = Event#event.entity#entity.subscriptions,
+ {Event#event.cache#cache.presence, []}),
+ case Resources_SubIds of
+ [] ->
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = Presence_Cache,
+ rosters_groups = undefined
+ },
+ subids = Event#event.subids#subids{
+ presence = []}
+ };
+ _Resources_SubIds ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence = Presence_Cache
+ },
+ subids = Event#event.subids#subids{
+ presence = Resources_SubIds
+ }
+ })
+ end;
+%%
+broadcast_item(Item, Event)
+ when ?Item_Access_Model_Presence(Item, Event) ->
+ case
+ is_contact_subscribed_to_node_owners(
+ _Host = Event#event.host,
+ _Entity = Event#event.entity#entity.id,
+ _Node_Owners = Event#event.node#node.owners
+ )
+ of
+ false ->
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence_subscriptions = false,
+ rosters_groups = undefined
+ }
+ };
+ _Node_Owner ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ presence_subscriptions = true
+ }
+ }
+ )
+ end;
+%%
+broadcast_item(Item, Event)
+ when ?Item_Access_Model_Roster(Item, Event) ->
+ case Event#event.node#node.access_model of
+ 'roster' ->
+ case Item#item.rosters_groups_allowed of
+ %%
+ [] ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ rosters_groups = true
+ }
+ }
+ );
+ %%
+ Item_Rosters_Groups_Allowed ->
+ case
+ is_contact_in_allowed_roster_groups(
+ _Contact = Event#event.entity#entity.id,
+ _Rosters_Groups_Allowed = Item_Rosters_Groups_Allowed
+ )
+ of
+ false ->
+ Event;
+ {_Node_Owner, _Roster_Group_Allowed} ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{
+ rosters_groups = true
+ }
+ })
+ end
+ end;
+ _Node_Access_Model ->
+ case
+ is_contact_in_allowed_roster_groups(
+ _Contact = Event#event.entity#entity.id,
+ _Rosters_Groups_Allowed = case
+ Item#item.rosters_groups_allowed
+ of
+ [] ->
+ Event#event.node#node.rosters_groups_allowed;
+ Item_Rosters_Groups_Allowed ->
+ Item_Rosters_Groups_Allowed
+ end
+ )
+ of
+ false ->
+ Event;
+ {_Node_Owner, _Roster_Group_Allowed} ->
+ broadcast_item(Item,
+ Event#event{
+ cache = Event#event.cache#cache{rosters_groups = true}
+ })
+ end
+ end;
+%%
+broadcast_item(_Item, Event) ->
+ Event#event{cache = Event#event.cache#cache{rosters_groups = undefined}}.
+
+
+%%
+-spec(resources_subids/5 ::
+(
+ Entity :: xmpp_jid:usr_bare(),
+ Local_Entity :: boolean(),
+ Presence_Based_Delivery :: boolean(),
+ Subscriptions :: _,%exmpp_pubsub:subscriptions() | [],
+ Cache :: {
+ Presence_Cache :: undefined | [] | mod_pubsub_dev:presence_cache(),
+ Resources_SubIds :: undefined | [] | mod_pubsub_dev:resources_subids()
+ })
+ -> {Presence_Cache :: undefined | [] | mod_pubsub_dev:presence_cache(),
+ Resources_SubIds :: [] | mod_pubsub_dev:resources_subids()}
+).
+
+resources_subids(_Entity, _Local_Entity, _Presence_Based_Delivery,
+ [] = _Subscriptions, {Presence_Cache, Resources_SubIds}) ->
+ {Presence_Cache, Resources_SubIds};
+%%
+resources_subids(Entity, Local_Entity, Presence_Based_Delivery,
+ [{'pending', _SubId, _Resource, _Subscription_Options} | Subscriptions],
+ {Presence_Cache, Resources_SubIds}) ->
+ resources_subids(Entity, Local_Entity, Presence_Based_Delivery, Subscriptions,
+ {Presence_Cache, Resources_SubIds});
+%%
+resources_subids(Entity, Local_Entity, Presence_Based_Delivery,
+ [{Subscription_State, SubId, {caps, Resource}, Subscription_Options}
+ | Subscriptions], {Presence_Cache, Resources_SubIds}) ->
+ resources_subids(Entity, Local_Entity, Presence_Based_Delivery,
+ [{Subscription_State, SubId, Resource, Subscription_Options}
+ | Subscriptions],
+ {Presence_Cache, Resources_SubIds});
+%% Presence_Based_Delivery = false
+%% Local_Entity = boolean()
+%% Subscription_Options = []
+resources_subids(Entity, Local_Entity, false = Presence_Based_Delivery,
+ [{_Subscription_State, SubId, Resource, [] = _Subscription_Options}
+ | Subscriptions], {Presence_Cache, Resources_SubIds}) ->
+ resources_subids(Entity, Local_Entity, Presence_Based_Delivery, Subscriptions,
+ {Presence_Cache,
+ _Resources_SubIds = resources_subids(Resource, SubId,
+ Resources_SubIds)});
+%% Presence_Based_Delivery = false
+%% Local_Entity = true
+%% Resource = undefined | xmpp_jid:resource_jid()
+%% Subscription_Options =/= []
+resources_subids({U,S,R} = _Entity, true = Local_Entity, false = Presence_Based_Delivery,
+ [{_Subscription_State, SubId, Resource, Subscription_Options} | Subscriptions],
+ {Presence_Cache, Resources_SubIds}) ->
+ resources_subids({U,S,R}, Local_Entity, Presence_Based_Delivery,
+ Subscriptions,
+ case get_value(Subscription_Options, 'deliver', true) of
+ true ->
+ case get_value(Subscription_Options, 'show-values') of
+ 'none' -> %% 'show-values' NA
+ {Presence_Cache,
+ _Resources_SubIds = resources_subids(Resource, SubId,
+ Resources_SubIds)};
+ %%
+ Show_Values
+ when Show_Values == []
+ orelse Presence_Cache == [] ->
+ {Presence_Cache, Resources_SubIds};
+ %%
+ Show_Values
+ when Presence_Cache == undefined ->
+ case get_resources_show(U, S) of
+ %%%%
+ [] ->
+ {_Presence_Cache = [], Resources_SubIds};
+ %%%%
+ Resources_Show ->
+ {_Presence_Cache = Resources_Show,
+ _Resources_SubIds = lists:foldl(fun
+ (Resource_To_Deliver, Resources_SubIds_Bis) ->
+ resources_subids(Resource_To_Deliver,
+ SubId, Resources_SubIds_Bis)
+ end,
+ Resources_SubIds,
+ _Resources_To_Deliver = resources_to_deliver(
+ Resource, Show_Values, Resources_Show,
+ []))}
+ end;
+ %%
+ Show_Values ->
+ {Presence_Cache,
+ _Resources_SubIds = lists:foldl(fun
+ (Resource_To_Deliver, Resources_SubIds_Bis) ->
+ resources_subids(Resource_To_Deliver,
+ SubId, Resources_SubIds_Bis)
+ end,
+ Resources_SubIds,
+ _Resources_To_Deliver = resources_to_deliver(
+ Resource, Show_Values, Presence_Cache, []))}
+ end;
+ false ->
+ {Presence_Cache, Resources_SubIds}
+ end);
+%% Presence_Based_Delivery = false
+%% Local_Entity = false
+%% Subscription_Options =/= []
+resources_subids(Entity, false = Local_Entity, false = Presence_Based_Delivery,
+ [{_Subscription_State, SubId, Resource, Subscription_Options} | Subscriptions],
+ {Presence_Cache, Resources_SubIds}) ->
+ resources_subids(Entity, Local_Entity, Presence_Based_Delivery, Subscriptions,
+ case get_value(Subscription_Options, 'deliver', true) of
+ true ->
+ {Presence_Cache,
+ _Resources_SubIds = resources_subids(Resource, SubId,
+ Resources_SubIds)};
+ false ->
+ {Presence_Cache, Resources_SubIds}
+ end);
+%% Presence_Based_Delivery = true
+%% Local_Entity = true
+%% Subscription_Options : NA
+%% Presence_Cache = 'none'
+resources_subids({U,S,R} = _Entity, true = _Local_Entity,
+ true = _Presence_Based_Delivery, Subscriptions,
+ {undefined = _Presence_Cache, Resources_SubIds}) ->
+ case get_resources_show(U, S) of
+ [] ->
+ {[], Resources_SubIds};
+ _Resources_Show = Presence_Cache ->
+ resources_subids({U,S,R}, true, true, Subscriptions,
+ {Presence_Cache, Resources_SubIds})
+ end;
+%% Presence_Based_Delivery = true
+%% Local_Entity = true
+%% Subscription_Options : NA
+%% Presence_Cache = []
+resources_subids(_Entity, true = _Local_Entity, true = _Presence_Based_Delivery,
+ _Subscriptions, {[] = Presence_Cache, Resources_SubIds}) ->
+ {Presence_Cache, Resources_SubIds};
+%% Presence_Based_Delivery = true
+%% Local_Entity = true
+%% Resource = undefined | xmpp_jid:resource_jid()
+%% Subscription_Options =/= []
+%% Presence_Cache =/= []
+resources_subids(Entity, true = _Local_Entity, true = Presence_Based_Delivery,
+ [{_Subscription_State, SubId, Resource, Subscription_Options} | Subscriptions],
+ {Presence_Cache, Resources_SubIds}) ->
+ resources_subids(Entity, true, Presence_Based_Delivery, Subscriptions,
+ case
+ get_value(Subscription_Options, 'deliver', true) == true
+ andalso
+ (Resource == undefined
+ orelse lists:keymember(Resource, 1, Presence_Cache))
+ of
+ true ->
+ {Presence_Cache,
+ _Resources_SubIds = case
+ get_value(Subscription_Options, 'show-values')
+ of
+ 'none' when Resource == undefined -> %% 'show-values' NA
+ lists:foldl(fun
+ ({Resource_To_Deliver, _Show}, Resources_SubIds_Bis) ->
+ resources_subids(Resource_To_Deliver,
+ SubId, Resources_SubIds_Bis)
+ end, Resources_SubIds, Presence_Cache);
+ 'none' -> %% 'show-values' NA
+ resources_subids(Resource, SubId, Resources_SubIds);
+ [] ->
+ Resources_SubIds;
+ Show_Values ->
+ lists:foldl(fun
+ (Resource_To_Deliver, Resources_SubIds_Bis) ->
+ resources_subids(Resource_To_Deliver,
+ SubId, Resources_SubIds_Bis)
+ end,
+ Resources_SubIds,
+ _Resources_To_Deliver = resources_to_deliver(
+ Resource, Show_Values, Presence_Cache, []))
+ end};
+ false ->
+ {Presence_Cache, Resources_SubIds}
+ end);
+%% Presence_Based_Delivery = true
+%% Local_Entity = false
+%% Subscription_Options : NA
+resources_subids(_Entity, false = _Local_Entity, true = _Presence_Based_Delivery,
+ _Subscriptions, {Presence_Cache, Resources_SubIds}) ->
+ {Presence_Cache, Resources_SubIds}.
+
+%%
+-spec(resources_subids/3 ::
+(
+ Resource :: xmpp_jid:resource_jid(),
+ SubId :: exmpp_pubsub:subId(),
+ Resources_SubIds :: [] | mod_pubsub_dev:resources_subids())
+ -> %%
+ Resources_SubIds::mod_pubsub_dev:resources_subids()
+).
+
+resources_subids(Resource, SubId, Resources_SubIds) ->
+ _Resources_SubIds = case lists:keyfind(Resource, 1, Resources_SubIds) of
+ {_Resource, SubIds} ->
+ lists:keyreplace(Resource, 1, Resources_SubIds,
+ {Resource, [SubId | SubIds]});
+ false ->
+ [{Resource, [SubId]} | Resources_SubIds]
+ end.
+
+%%
+-spec(resources_to_deliver/4 ::
+(
+ Resource :: undefined | xmpp_jid:resource_jid(),
+ Show_Values :: ['away' | 'chat' | 'dnd' | 'online' | 'xa',...],
+ Presence_Cache :: [] | mod_pubsub_dev:presence_cache(),
+ Resources_To_Deliver :: [Resource::xmpp_jid:resource_jid()])
+ -> %%
+ Resources_To_Deliver::[Resource::xmpp_jid:resource_jid()]
+).
+
+resources_to_deliver(_Resource, _Show_Values, [] = _Presence_Cache,
+ Resources_To_Deliver) ->
+ Resources_To_Deliver;
+%%
+resources_to_deliver(Resource, Show_Values, [Resource_Show | Presence_Cache],
+ Resources_To_Deliver) ->
+ resources_to_deliver(Resource, Show_Values, Presence_Cache,
+ _Resources_To_Deliver = resource_to_deliver(Resource, Show_Values,
+ Resource_Show, Resources_To_Deliver)).
+
+%%
+-spec(resource_to_deliver/4 ::
+(
+ Resource :: undefined | xmpp_jid:resource_jid(),
+ Show_Values :: ['away' | 'chat' | 'dnd' | 'online' | 'xa',...],
+ Resource_Show :: mod_pubsub_dev:resource_show(),
+ Resources_To_Deliver :: [Resource::xmpp_jid:resource_jid()])
+ -> %%
+ Resources_To_Deliver::[Resource::xmpp_jid:resource_jid()]
+).
+
+resource_to_deliver(_Resource, [] = _Show_Values, _Resource_Show,
+ Resources_To_Deliver) ->
+ Resources_To_Deliver;
+%%
+resource_to_deliver(undefined = _Resource, [_Show_Value = Show | Show_Values],
+ {Resource, Show}, Resources_To_Deliver) ->
+ resource_to_deliver(_Resource = undefined, Show_Values, {Resource, Show},
+ [_Resource_To_Deliver = Resource | Resources_To_Deliver]);
+%%
+resource_to_deliver(Resource, [_Show_Value = Show | Show_Values],
+ {Resource, Show}, Resources_To_Deliver) ->
+ resource_to_deliver(Resource, Show_Values, {Resource, Show},
+ [_Resource_To_Deliver = Resource | Resources_To_Deliver]);
+%%
+resource_to_deliver(Resource, [_Show_Value | Show_Values], Resource_Show,
+ Resources_To_Deliver) ->
+ resource_to_deliver(Resource, Show_Values, Resource_Show,
+ Resources_To_Deliver).
+