aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHolger Weiss <holger@zedat.fu-berlin.de>2021-08-22 12:44:50 +0200
committerHolger Weiss <holger@zedat.fu-berlin.de>2021-08-22 12:44:50 +0200
commit8d5025076f53abc0474c86a0f879ff1736714844 (patch)
tree06b8c8d9c4c4275d42e208628ad47aafdaf5e468
parentPubSub: Optimize publishing on large nodes (SQL) (diff)
PubSub: Add delete_old_pubsub_items command
Add a command for keeping only the specified number of items on each node and removing all older items. This might be especially useful if nodes may be configured to have no 'max_items' limit. Thanks to Ammonit Measurement GmbH for sponsoring this work.
-rw-r--r--src/gen_pubsub_node.erl5
-rw-r--r--src/gen_pubsub_nodetree.erl3
-rw-r--r--src/mod_pubsub.erl53
-rw-r--r--src/node_flat.erl13
-rw-r--r--src/node_flat_sql.erl22
-rw-r--r--src/node_pep.erl6
-rw-r--r--src/node_pep_sql.erl6
-rw-r--r--src/nodetree_tree.erl11
-rw-r--r--src/nodetree_tree_sql.erl31
-rw-r--r--src/nodetree_virtual.erl6
10 files changed, 146 insertions, 10 deletions
diff --git a/src/gen_pubsub_node.erl b/src/gen_pubsub_node.erl
index 5bdebdfc6..625e490fc 100644
--- a/src/gen_pubsub_node.erl
+++ b/src/gen_pubsub_node.erl
@@ -123,6 +123,11 @@
{error, stanza_error()}.
-callback remove_extra_items(NodeIdx :: nodeIdx(),
+ Max_Items :: unlimited | non_neg_integer()) ->
+ {result, {[itemId()], [itemId()]}
+ }.
+
+-callback remove_extra_items(NodeIdx :: nodeIdx(),
Max_Items :: unlimited | non_neg_integer(),
ItemIds :: [itemId()]) ->
{result, {[itemId()], [itemId()]}
diff --git a/src/gen_pubsub_nodetree.erl b/src/gen_pubsub_nodetree.erl
index 5a24db2c4..b6b73b8cb 100644
--- a/src/gen_pubsub_nodetree.erl
+++ b/src/gen_pubsub_nodetree.erl
@@ -67,6 +67,9 @@
-callback get_nodes(Host :: host())->
[pubsubNode()].
+-callback get_all_nodes(Host :: host()) ->
+ [pubsubNode()].
+
-callback get_parentnodes(Host :: host(),
NodeId :: nodeId(),
From :: jid:jid()) ->
diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl
index c6f485077..8792b2ab9 100644
--- a/src/mod_pubsub.erl
+++ b/src/mod_pubsub.erl
@@ -45,6 +45,7 @@
-include("mod_roster.hrl").
-include("translate.hrl").
-include("ejabberd_stacktrace.hrl").
+-include("ejabberd_commands.hrl").
-define(STDTREE, <<"tree">>).
-define(STDNODE, <<"flat">>).
@@ -93,6 +94,9 @@
handle_call/3, handle_cast/2, handle_info/2, mod_doc/0,
terminate/2, code_change/3, depends/2, mod_opt_type/1, mod_options/1]).
+%% ejabberd commands
+-export([get_commands_spec/0, delete_old_items/1]).
+
-export([route/1]).
%%====================================================================
@@ -337,6 +341,7 @@ init([ServerHost|_]) ->
false ->
ok
end,
+ ejabberd_commands:register_commands(?MODULE, get_commands_spec()),
NodeTree = config(ServerHost, nodetree),
Plugins = config(ServerHost, plugins),
PepMapping = config(ServerHost, pep_mapping),
@@ -806,7 +811,13 @@ terminate(_Reason,
gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_COMMANDS),
terminate_plugins(Host, ServerHost, Plugins, TreePlugin),
ejabberd_router:unregister_route(Host)
- end, Hosts).
+ end, Hosts),
+ case gen_mod:is_loaded_elsewhere(ServerHost, ?MODULE) of
+ false ->
+ ejabberd_commands:unregister_commands(get_commands_spec());
+ true ->
+ ok
+ end.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
@@ -4142,6 +4153,46 @@ purge_offline(Host, LJID, Node) ->
{error, xmpp:err_internal_server_error(Txt, Lang)}
end.
+-spec delete_old_items(non_neg_integer()) -> ok | error.
+delete_old_items(N) ->
+ Results = lists:flatmap(
+ fun(Host) ->
+ case tree_action(Host, get_all_nodes, [Host]) of
+ Nodes when is_list(Nodes) ->
+ lists:map(
+ fun(#pubsub_node{id = Nidx, type = Type}) ->
+ case node_action(Host, Type,
+ remove_extra_items,
+ [Nidx , N]) of
+ {result, _} ->
+ ok;
+ {error, _} ->
+ error
+ end
+ end, Nodes);
+ _ ->
+ error
+ end
+ end, ejabberd_option:hosts()),
+ case lists:member(error, Results) of
+ true ->
+ error;
+ false ->
+ ok
+ end.
+
+-spec get_commands_spec() -> [ejabberd_commands()].
+get_commands_spec() ->
+ [#ejabberd_commands{name = delete_old_pubsub_items, tags = [purge],
+ desc = "Keep only NUMBER of PubSub items per node",
+ module = ?MODULE, function = delete_old_items,
+ args_desc = ["Number of items to keep per node"],
+ args = [{number, integer}],
+ result = {res, rescode},
+ result_desc = "0 if command failed, 1 when succeeded",
+ args_example = [1000],
+ result_example = ok}].
+
-spec mod_opt_type(atom()) -> econf:validator().
mod_opt_type(access_createnode) ->
econf:acl();
diff --git a/src/node_flat.erl b/src/node_flat.erl
index fe08be258..97f149f9c 100644
--- a/src/node_flat.erl
+++ b/src/node_flat.erl
@@ -39,7 +39,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
- publish_item/7, delete_item/4, remove_extra_items/3,
+ publish_item/7, delete_item/4,
+ remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@@ -403,6 +404,16 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
end
end.
+remove_extra_items(Nidx, MaxItems) ->
+ {result, States} = get_states(Nidx),
+ Records = States ++ mnesia:read({pubsub_orphan, Nidx}),
+ ItemIds = lists:flatmap(fun(#pubsub_state{items = Is}) ->
+ Is;
+ (#pubsub_orphan{items = Is}) ->
+ Is
+ end, Records),
+ remove_extra_items(Nidx, MaxItems, ItemIds).
+
%% @doc <p>This function is used to remove extra items, most notably when the
%% maximum number of items has been reached.</p>
%% <p>This function is used internally by the core PubSub module, as no
diff --git a/src/node_flat_sql.erl b/src/node_flat_sql.erl
index 5033eda51..724958eb1 100644
--- a/src/node_flat_sql.erl
+++ b/src/node_flat_sql.erl
@@ -40,9 +40,10 @@
-include("translate.hrl").
-export([init/3, terminate/2, options/0, features/0,
- create_node_permission/6, create_node/2, delete_node/1,
- purge_node/2, subscribe_node/8, unsubscribe_node/4,
- publish_item/7, delete_item/4, remove_extra_items/3,
+ create_node_permission/6, create_node/2, delete_node/1, purge_node/2,
+ subscribe_node/8, unsubscribe_node/4,
+ publish_item/7, delete_item/4,
+ remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@@ -273,6 +274,9 @@ publish_item(Nidx, Publisher, PublishModel, MaxItems, ItemId, Payload,
end
end.
+remove_extra_items(Nidx, MaxItems) ->
+ remove_extra_items(Nidx, MaxItems, itemids(Nidx)).
+
remove_extra_items(_Nidx, unlimited, ItemIds) ->
{result, {ItemIds, []}};
remove_extra_items(Nidx, MaxItems, ItemIds) ->
@@ -863,6 +867,18 @@ first_in_list(Pred, [H | T]) ->
_ -> first_in_list(Pred, T)
end.
+itemids(Nidx) ->
+ case catch
+ ejabberd_sql:sql_query_t(
+ ?SQL("select @(itemid)s from pubsub_item where "
+ "nodeid=%(Nidx)d order by modification desc"))
+ of
+ {selected, RItems} ->
+ [ItemId || {ItemId} <- RItems];
+ _ ->
+ []
+ end.
+
itemids(Nidx, {_U, _S, _R} = JID) ->
SJID = encode_jid(JID),
SJIDLike = <<(encode_jid_like(JID))/binary, "/%">>,
diff --git a/src/node_pep.erl b/src/node_pep.erl
index 58c3050a0..44388ca31 100644
--- a/src/node_pep.erl
+++ b/src/node_pep.erl
@@ -35,7 +35,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
- publish_item/7, delete_item/4, remove_extra_items/3,
+ publish_item/7, delete_item/4,
+ remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@@ -135,6 +136,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
+remove_extra_items(Nidx, MaxItems) ->
+ node_flat:remove_extra_items(Nidx, MaxItems).
+
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat:remove_extra_items(Nidx, MaxItems, ItemIds).
diff --git a/src/node_pep_sql.erl b/src/node_pep_sql.erl
index 7b21aa901..c0cf2b166 100644
--- a/src/node_pep_sql.erl
+++ b/src/node_pep_sql.erl
@@ -37,7 +37,8 @@
-export([init/3, terminate/2, options/0, features/0,
create_node_permission/6, create_node/2, delete_node/1,
purge_node/2, subscribe_node/8, unsubscribe_node/4,
- publish_item/7, delete_item/4, remove_extra_items/3,
+ publish_item/7, delete_item/4,
+ remove_extra_items/2, remove_extra_items/3,
get_entity_affiliations/2, get_node_affiliations/1,
get_affiliation/2, set_affiliation/3,
get_entity_subscriptions/2, get_node_subscriptions/1,
@@ -92,6 +93,9 @@ publish_item(Nidx, Publisher, Model, MaxItems, ItemId, Payload, PubOpts) ->
node_flat_sql:publish_item(Nidx, Publisher, Model, MaxItems, ItemId,
Payload, PubOpts).
+remove_extra_items(Nidx, MaxItems) ->
+ node_flat_sql:remove_extra_items(Nidx, MaxItems).
+
remove_extra_items(Nidx, MaxItems, ItemIds) ->
node_flat_sql:remove_extra_items(Nidx, MaxItems, ItemIds).
diff --git a/src/nodetree_tree.erl b/src/nodetree_tree.erl
index fe15f3323..853c1fb93 100644
--- a/src/nodetree_tree.erl
+++ b/src/nodetree_tree.erl
@@ -46,7 +46,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
- get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
+ get_nodes/1, get_all_nodes/1,
+ get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@@ -98,6 +99,14 @@ get_nodes(Host, Limit) ->
{Nodes, _} -> Nodes
end.
+get_all_nodes({_U, _S, _R} = Owner) ->
+ Host = jid:tolower(jid:remove_resource(Owner)),
+ mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'});
+get_all_nodes(Host) ->
+ mnesia:match_object(#pubsub_node{nodeid = {Host, '_'}, _ = '_'})
+ ++ mnesia:match_object(#pubsub_node{nodeid = {{'_', Host, '_'}, '_'},
+ _ = '_'}).
+
get_parentnodes(Host, Node, _From) ->
case catch mnesia:read({pubsub_node, {Host, Node}}) of
[Record] when is_record(Record, pubsub_node) ->
diff --git a/src/nodetree_tree_sql.erl b/src/nodetree_tree_sql.erl
index d68355202..402c50901 100644
--- a/src/nodetree_tree_sql.erl
+++ b/src/nodetree_tree_sql.erl
@@ -45,7 +45,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
- get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
+ get_nodes/1, get_all_nodes/1,
+ get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@@ -165,6 +166,34 @@ get_nodes(Host, Limit) ->
[]
end.
+get_all_nodes({_U, _S, _R} = JID) ->
+ SubKey = jid:tolower(JID),
+ GenKey = jid:remove_resource(SubKey),
+ EncKey = node_flat_sql:encode_jid(GenKey),
+ Pattern = <<(node_flat_sql:encode_jid_like(GenKey))/binary, "/%">>,
+ case ejabberd_sql:sql_query_t(
+ ?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
+ "from pubsub_node where host=%(EncKey)s "
+ "or host like %(Pattern)s %ESCAPE")) of
+ {selected, RItems} ->
+ [raw_to_node(GenKey, Item) || Item <- RItems];
+ _ ->
+ []
+ end;
+get_all_nodes(Host) ->
+ Pattern1 = <<"%@", Host/binary>>,
+ Pattern2 = <<"%@", Host/binary, "/%">>,
+ case ejabberd_sql:sql_query_t(
+ ?SQL("select @(node)s, @(parent)s, @(plugin)s, @(nodeid)d "
+ "from pubsub_node where host=%(Host)s "
+ "or host like %(Pattern1)s "
+ "or host like %(Pattern2)s %ESCAPE")) of
+ {selected, RItems} ->
+ [raw_to_node(Host, Item) || Item <- RItems];
+ _ ->
+ []
+ end.
+
get_parentnodes(Host, Node, _From) ->
case get_node(Host, Node) of
Record when is_record(Record, pubsub_node) ->
diff --git a/src/nodetree_virtual.erl b/src/nodetree_virtual.erl
index 9cf7a80ca..c0274a795 100644
--- a/src/nodetree_virtual.erl
+++ b/src/nodetree_virtual.erl
@@ -38,7 +38,8 @@
-export([init/3, terminate/2, options/0, set_node/1,
get_node/3, get_node/2, get_node/1, get_nodes/2,
- get_nodes/1, get_parentnodes/3, get_parentnodes_tree/3,
+ get_nodes/1, get_all_nodes/1,
+ get_parentnodes/3, get_parentnodes_tree/3,
get_subnodes/3, get_subnodes_tree/3, create_node/6,
delete_node/2]).
@@ -71,6 +72,9 @@ get_nodes(Host) ->
get_nodes(_Host, _Limit) ->
[].
+get_all_nodes(_Host) ->
+ [].
+
get_parentnodes(_Host, _Node, _From) ->
[].