aboutsummaryrefslogtreecommitdiff
path: root/src/mod_pubsub/mod_pubsub.erl
diff options
context:
space:
mode:
authorChristophe Romain <christophe.romain@process-one.net>2009-04-22 22:19:41 +0000
committerChristophe Romain <christophe.romain@process-one.net>2009-04-22 22:19:41 +0000
commiteab7a509f959e7d8f9df7373fc716318e0369e39 (patch)
treef28ee30eafb1d3d8a23889002f3c6c12c12f8196 /src/mod_pubsub/mod_pubsub.erl
parent* src/ejabberd.cfg.example: Fix English typos. Fix line length: (diff)
improve send last published items spawning
SVN Revision: 2036
Diffstat (limited to 'src/mod_pubsub/mod_pubsub.erl')
-rw-r--r--src/mod_pubsub/mod_pubsub.erl199
1 files changed, 130 insertions, 69 deletions
diff --git a/src/mod_pubsub/mod_pubsub.erl b/src/mod_pubsub/mod_pubsub.erl
index 240a40b60..1ce3f2cba 100644
--- a/src/mod_pubsub/mod_pubsub.erl
+++ b/src/mod_pubsub/mod_pubsub.erl
@@ -108,6 +108,10 @@
code_change/3
]).
+%% calls for parallel sending of last items
+-export([send_loop/1
+ ]).
+
-define(PROCNAME, ejabberd_mod_pubsub).
-define(PLUGIN_PREFIX, "node_").
-define(TREE_PREFIX, "nodetree_").
@@ -116,8 +120,10 @@
host,
access,
pep_mapping = [],
+ pep_sendlast_offline,
nodetree = ?STDTREE,
- plugins = [?STDNODE]}).
+ plugins = [?STDNODE],
+ send_loop}).
%%====================================================================
%% API
@@ -157,6 +163,7 @@ init([ServerHost, Opts]) ->
?DEBUG("pubsub init ~p ~p",[ServerHost,Opts]),
Host = gen_mod:get_opt_host(ServerHost, Opts, "pubsub.@HOST@"),
Access = gen_mod:get_opt(access_createnode, Opts, all),
+ PepOffline = gen_mod:get_opt(pep_sendlast_offline, Opts, false),
IQDisc = gen_mod:get_opt(iqdisc, Opts, one_queue),
mod_disco:register_feature(ServerHost, ?NS_PUBSUB),
ejabberd_hooks:add(disco_sm_identity, ServerHost, ?MODULE, disco_sm_identity, 75),
@@ -188,12 +195,15 @@ init([ServerHost, Opts]) ->
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {plugins, Plugins}),
ets:insert(gen_mod:get_module_proc(ServerHost, pubsub_state), {pep_mapping, PepMapping}),
init_nodes(Host, ServerHost),
- {ok, #state{host = Host,
+ State = #state{host = Host,
server_host = ServerHost,
access = Access,
pep_mapping = PepMapping,
+ pep_sendlast_offline = PepOffline,
nodetree = NodeTree,
- plugins = Plugins}}.
+ plugins = Plugins},
+ SendLoop = spawn(?MODULE, send_loop, [State]), %% TODO supervise that process
+ {ok, State#state{send_loop = SendLoop}}.
%% @spec (Host, ServerHost, Opts) -> Plugins
%% Host = mod_pubsub:host() Opts = [{Key,Value}]
@@ -308,6 +318,113 @@ update_database(Host) ->
ok
end.
+send_loop(State) ->
+ receive
+ {presence, JID, Pid} ->
+ Host = State#state.host,
+ ServerHost = State#state.server_host,
+ LJID = jlib:jid_tolower(JID),
+ BJID = jlib:jid_remove_resource(LJID),
+ %% for each node From is subscribed to
+ %% and if the node is so configured, send the last published item to From
+ lists:foreach(fun(Type) ->
+ {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
+ lists:foreach(
+ fun({Node, subscribed, SubJID}) ->
+ if (SubJID == LJID) or (SubJID == BJID) ->
+ case tree_action(Host, get_node, [Host, Node, JID]) of
+ #pubsub_node{options = Options} ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ send_items(Host, Node, SubJID, last);
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end;
+ true ->
+ % resource not concerned about that subscription
+ ok
+ end;
+ (_) ->
+ ok
+ end, Subscriptions)
+ end, State#state.plugins),
+ %% and force send the last PEP events published by its offline and local contacts
+ %% only if pubsub is explicitely configured for that.
+ %% this is a hack in a sense that PEP should only be based on presence
+ %% and is not able to "store" events of remote users (via s2s)
+ %% this makes that hack only work for local domain by now
+ if State#state.pep_sendlast_offline ->
+ case catch ejabberd_c2s:get_subscribed(Pid) of
+ Contacts when is_list(Contacts) ->
+ {User, Server, Resource} = jlib:jid_tolower(JID),
+ lists:foreach(
+ fun({U, S, R}) -> %% local contacts
+ case ejabberd_sm:get_user_resources(U, S) of
+ [] -> %% offline
+ case S of
+ ServerHost -> %% local contact, so we may have pep items
+ PeerJID = jlib:make_jid(U, S, R),
+ handle_cast({presence, User, Server, [Resource], PeerJID}, State);
+ _ -> %% remote contact, no items available
+ ok
+ end;
+ _ -> %% online
+ % this is already handled by presence probe
+ ok
+ end;
+ (_) -> %% remote contacts
+ % we can not do anything in any cases
+ ok
+ end, Contacts);
+ _ ->
+ ok
+ end;
+ true ->
+ ok
+ end,
+ send_loop(State);
+ {presence, User, Server, Resources, JID} ->
+ Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
+ Host = State#state.host,
+ ServerHost = State#state.server_host,
+ lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
+ case get_option(Options, send_last_published_item) of
+ on_sub_and_presence ->
+ lists:foreach(fun(Resource) ->
+ LJID = {User, Server, Resource},
+ case is_caps_notify(ServerHost, Node, LJID) of
+ true ->
+ Subscribed = case get_option(Options, access_model) of
+ open -> true;
+ presence -> true;
+ whitelist -> false; % subscribers are added manually
+ authorize -> false; % likewise
+ roster ->
+ Grps = get_option(Options, roster_groups_allowed, []),
+ {OU, OS, _} = Owner,
+ element(2, get_roster_info(OU, OS, LJID, Grps))
+ end,
+ if Subscribed ->
+ send_items(Owner, Node, LJID, last);
+ true ->
+ ok
+ end;
+ false ->
+ ok
+ end
+ end, Resources);
+ _ ->
+ ok
+ end
+ end, tree_action(Host, get_nodes, [Owner, JID])),
+ send_loop(State);
+ stop ->
+ ok
+ end.
+
%% -------
%% disco hooks handling functions
%%
@@ -415,9 +532,9 @@ disco_sm_items(Acc, From, To, Node, _Lang) ->
%% presence hooks handling functions
%%
-presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, _Pid) ->
+presence_probe(#jid{luser = User, lserver = Server, lresource = Resource} = JID, JID, Pid) ->
Proc = gen_mod:get_module_proc(Server, ?PROCNAME),
- gen_server:cast(Proc, {presence, JID}),
+ gen_server:cast(Proc, {presence, JID, Pid}),
gen_server:cast(Proc, {presence, User, Server, [Resource], JID});
presence_probe(#jid{luser = User, lserver = Server, lresource = Resource}, #jid{lserver = Host} = JID, _Pid) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
@@ -478,72 +595,14 @@ handle_call(stop, _From, State) ->
%% Description: Handling cast messages
%%--------------------------------------------------------------------
%% @private
-handle_cast({presence, JID}, State) ->
+handle_cast({presence, JID, Pid}, State) ->
%% A new resource is available. send last published items
- Host = State#state.host,
- LJID = jlib:jid_tolower(JID),
- %% for each node From is subscribed to
- %% and if the node is so configured, send the last published item to From
- spawn(fun() ->
- lists:foreach(fun(Type) ->
- {result, Subscriptions} = node_action(Type, get_entity_subscriptions, [Host, JID]),
- lists:foreach(
- fun({Node, subscribed, _SubJID}) ->
- case tree_action(Host, get_node, [Host, Node, JID]) of
- #pubsub_node{options = Options} ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- send_items(Host, Node, LJID, last);
- _ ->
- ok
- end;
- _ ->
- ok
- end;
- (_) ->
- ok
- end, Subscriptions)
- end, State#state.plugins)
- end),
+ State#state.send_loop ! {presence, JID, Pid},
{noreply, State};
handle_cast({presence, User, Server, Resources, JID}, State) ->
%% A new resource is available. send last published PEP items
- Owner = jlib:jid_remove_resource(jlib:jid_tolower(JID)),
- Host = State#state.host,
- ServerHost = State#state.server_host,
- spawn(fun() ->
- lists:foreach(fun(#pubsub_node{nodeid = {_, Node}, options = Options}) ->
- case get_option(Options, send_last_published_item) of
- on_sub_and_presence ->
- lists:foreach(fun(Resource) ->
- LJID = {User, Server, Resource},
- case is_caps_notify(ServerHost, Node, LJID) of
- true ->
- Subscribed = case get_option(Options, access_model) of
- open -> true;
- presence -> true;
- whitelist -> false; % subscribers are added manually
- authorize -> false; % likewise
- roster ->
- Grps = get_option(Options, roster_groups_allowed, []),
- {OU, OS, _} = Owner,
- element(2, get_roster_info(OU, OS, LJID, Grps))
- end,
- if Subscribed ->
- send_items(Owner, Node, LJID, last);
- true ->
- ok
- end;
- false ->
- ok
- end
- end, Resources);
- _ ->
- ok
- end
- end, tree_action(Host, get_nodes, [Owner, JID]))
- end),
+ State#state.send_loop ! {presence, User, Server, Resources, JID},
{noreply, State};
handle_cast({remove_user, LUser, LServer}, State) ->
@@ -600,7 +659,8 @@ handle_info(_Info, State) ->
terminate(_Reason, #state{host = Host,
server_host = ServerHost,
nodetree = TreePlugin,
- plugins = Plugins}) ->
+ plugins = Plugins,
+ send_loop = SendLoop}) ->
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host),
case lists:member(?PEPNODE, Plugins) of
@@ -622,6 +682,7 @@ terminate(_Reason, #state{host = Host,
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB),
gen_iq_handler:remove_iq_handler(ejabberd_sm, ServerHost, ?NS_PUBSUB_OWNER),
mod_disco:unregister_feature(ServerHost, ?NS_PUBSUB),
+ SendLoop ! stop,
ok.
%%--------------------------------------------------------------------
@@ -820,7 +881,7 @@ iq_disco_items(Host, Item, From) ->
%% Note: Multiple Node Discovery not supported (mask on pubsub#type)
%% TODO this code is also back-compatible with pubsub v1.8 (for client issue)
%% TODO make it pubsub v1.12 compliant (breaks client compatibility ?)
- %% TODO That is, remove name attribute
+ %% TODO That is, remove name attribute (or node?, please check)
Action =
fun(#pubsub_node{type = Type}) ->
NodeItems = case node_call(Type, get_items, [Host, Node, From]) of
@@ -1563,7 +1624,7 @@ publish_item(Host, ServerHost, Node, Publisher, ItemId, Payload) ->
end
end,
ejabberd_hooks:run(pubsub_publish_item, ServerHost, [ServerHost, Node, Publisher, service_jid(Host), ItemId, Payload]),
- Reply = [],
+ Reply = [], %% TODO EJAB-909
case transaction(Host, Node, Action, sync_dirty) of
{error, ?ERR_ITEM_NOT_FOUND} ->
%% handles auto-create feature