Add rate limited delete_old_mam_messages command
diff --git a/src/ejabberd_batch.erl b/src/ejabberd_batch.erl
new file mode 100644
index 000000000..05750164c
--- /dev/null
+++ b/src/ejabberd_batch.erl
@@ -0,0 +1,205 @@
+%%% File : ejabberd_batch.erl
+%%% Author : Paweł Chmielowski <pawel@process-one.net>
+%%% Purpose : Batch tasks manager
+%%% Created : 8 mar 2022 by Paweł Chmielowski <pawel@process-one.net>
+%%% ejabberd, Copyright (C) 2002-2022 ProcessOne
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+%%% General Public License for more details.
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%% API
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+-export([register_task/5, task_status/1, abort_task/1]).
+-define(SERVER, ?MODULE).
+%% Server state: a map from task type (the key given to register_task/5)
+%% to the #task{} record tracking that task.
+-record(state, {tasks = #{}}).
+%% One tracked task.
+%%   state      - not_started | working | completed | aborted | failed
+%%   pid        - pid of the worker while the task is working; reset to
+%%                'none' on completion or abort; NOTE: once a task fails this
+%%                field is reused to hold the error term
+%%   steps      - step count supplied when the task was registered
+%%   done_steps - running total of the counts reported by the worker
+-record(task, {state = not_started, pid, steps, done_steps}).
+%%% API
+%% @doc Starts the batch task manager as a gen_server registered locally
+%% under the name given by the SERVER macro.
+-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
+start_link() ->
+    ServerName = {local, ?SERVER},
+    gen_server:start_link(ServerName, ?MODULE, [], []).
+%% @doc Synchronously asks the batch manager to start a task keyed by Type.
+%% JobFun is applied to JobState by a worker process; Rate is the throttle
+%% value handed to that worker. The server replies `ok' or
+%% `{error, in_progress}'.
+register_task(Type, Steps, Rate, JobState, JobFun) ->
+    Request = {register_task, Type, Steps, Rate, JobState, JobFun},
+    gen_server:call(?MODULE, Request).
+%% @doc Synchronously queries the batch manager for the status of the task
+%% keyed by Type.
+task_status(Type) ->
+    Request = {task_status, Type},
+    gen_server:call(?MODULE, Request).
+%% @doc Synchronously asks the batch manager to abort the task keyed by
+%% Type. The server replies `aborted' or `not_started'.
+abort_task(Type) ->
+    Request = {abort_task, Type},
+    gen_server:call(?MODULE, Request).
+%%% gen_server callbacks
+%% @private
+%% @doc gen_server init callback: the server starts with an empty task map.
+-spec init(Args :: term()) ->
+    {ok, State :: #state{}} |
+    {ok, State :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term()} |
+    ignore.
+init([]) ->
+    InitialState = #state{},
+    {ok, InitialState}.
+%% @private
+%% @doc Synchronous requests:
+%%   {register_task, ...} - spawns a worker unless a task of that type is
+%%                          already working
+%%   {task_status, Type}  - reports the recorded state and progress
+%%   {abort_task, Type}   - tells a working task's worker to stop
+%% Any other request is answered with `ok'.
+-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+                  State :: #state{}) ->
+    {reply, Reply :: term(), NewState :: #state{}} |
+    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
+    {stop, Reason :: term(), NewState :: #state{}}.
+handle_call({register_task, Type, Steps, Rate, JobState, JobFun}, _From,
+            #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, #task{}) of
+        #task{state = working} ->
+            {reply, {error, in_progress}, State};
+        #task{state = Previous} when Previous == completed;
+                                     Previous == not_started;
+                                     Previous == aborted;
+                                     Previous == failed ->
+            %% The start time is taken inside the worker, as its first action.
+            Worker = spawn(fun() ->
+                                   work_loop(Type, JobState, JobFun, Rate,
+                                             erlang:monotonic_time(second), 0)
+                           end),
+            Entry = #task{state = working, pid = Worker,
+                          steps = Steps, done_steps = 0},
+            {reply, ok, State#state{tasks = Tasks#{Type => Entry}}}
+    end;
+handle_call({task_status, Type}, _From, #state{tasks = Tasks} = State) ->
+    Reply =
+        case maps:find(Type, Tasks) of
+            error ->
+                not_started;
+            {ok, #task{state = not_started}} ->
+                not_started;
+            %% For failed tasks the pid field carries the error term.
+            {ok, #task{state = failed, done_steps = Done, pid = Error}} ->
+                {failed, Done, Error};
+            {ok, #task{state = aborted, done_steps = Done}} ->
+                {aborted, Done};
+            {ok, #task{state = working, done_steps = Done}} ->
+                {working, Done};
+            {ok, #task{state = completed, done_steps = Done}} ->
+                {completed, Done}
+        end,
+    {reply, Reply, State};
+handle_call({abort_task, Type}, _From, #state{tasks = Tasks} = State) ->
+    case maps:find(Type, Tasks) of
+        {ok, #task{state = working, pid = Worker} = Task} ->
+            Worker ! abort,
+            Aborted = Task#task{state = aborted, pid = none},
+            {reply, aborted, State#state{tasks = Tasks#{Type => Aborted}}};
+        _ ->
+            {reply, not_started, State}
+    end;
+handle_call(_Request, _From, State = #state{}) ->
+    {reply, ok, State}.
+%% @private
+%% @doc Asynchronous notifications sent by worker processes. Each one is
+%% applied only if the task is still recorded as working and owned by the
+%% sending pid; otherwise it is ignored.
+-spec handle_cast(Request :: term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}.
+handle_cast({task_finished, Type, Pid}, #state{} = State) ->
+    Finish = fun(Task) -> Task#task{state = completed, pid = none} end,
+    {noreply, update_working_task(Type, Pid, Finish, State)};
+handle_cast({task_progress, Type, Pid, Count}, #state{} = State) ->
+    Advance = fun(#task{done_steps = Done} = Task) ->
+                      Task#task{done_steps = Done + Count}
+              end,
+    {noreply, update_working_task(Type, Pid, Advance, State)};
+handle_cast({task_error, Type, Pid, Error}, #state{} = State) ->
+    %% The error term is stored in the pid field of the failed task.
+    Fail = fun(Task) -> Task#task{state = failed, pid = Error} end,
+    {noreply, update_working_task(Type, Pid, Fail, State)};
+handle_cast(_Request, State = #state{}) ->
+    {noreply, State}.
+
+%% Applies Update to the task stored under Type when that task is working
+%% and its worker is Pid; returns the state unchanged otherwise.
+update_working_task(Type, Pid, Update, #state{tasks = Tasks} = State) ->
+    case maps:get(Type, Tasks, none) of
+        #task{state = working, pid = Owner} = Task when Pid == Owner ->
+            State#state{tasks = Tasks#{Type => Update(Task)}};
+        _ ->
+            State
+    end.
+%% @private
+%% @doc Messages that are neither calls nor casts are dropped; the state is
+%% left untouched.
+-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
+    {noreply, NewState :: #state{}} |
+    {noreply, NewState :: #state{}, timeout() | hibernate} |
+    {stop, Reason :: term(), NewState :: #state{}}.
+handle_info(_Info, #state{} = Unchanged) ->
+    {noreply, Unchanged}.
+%% @private
+%% @doc gen_server terminate callback. Nothing is cleaned up here; the
+%% function only returns `ok'.
+-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+                State :: #state{}) -> term().
+terminate(_Reason, #state{}) ->
+    ok.
+%% @private
+%% @doc Code upgrade callback: the state is carried over as is.
+-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{},
+                  Extra :: term()) ->
+    {ok, NewState :: #state{}} | {error, Reason :: term()}.
+code_change(_OldVsn, #state{} = Current, _Extra) ->
+    {ok, Current}.
+%%% Internal functions
+%% Worker loop executed in the process spawned for a task.
+%%
+%% Each iteration applies JobFun to the current job state:
+%%   {ok, _, 0}            - nothing left to do; reports task_finished
+%%   {ok, NewState, Count} - reports task_progress, then sleeps long enough
+%%                           to keep the overall pace at Rate items per
+%%                           minute (measured from StartDate) and loops
+%%   {error, Error}        - reports task_error
+%% An exception raised by JobFun is reported as task_error internal_error.
+%% An `abort' message received while sleeping ends the loop.
+%%
+%% A Rate that is not a positive number is reported as task_error
+%% invalid_rate before any job step runs. The throttle arithmetic divides by
+%% Rate in a part of the `try' that is not protected by `catch', so a zero
+%% rate would otherwise kill the worker without notifying the server and
+%% leave the task recorded as working.
+work_loop(Task, _JobState, _JobFun, Rate, _StartDate, _CurrentProgress)
+  when not is_number(Rate); Rate =< 0 ->
+    gen_server:cast(?MODULE, {task_error, Task, self(), invalid_rate});
+work_loop(Task, JobState, JobFun, Rate, StartDate, CurrentProgress) ->
+    try JobFun(JobState) of
+        {ok, _NewState, 0} ->
+            gen_server:cast(?MODULE, {task_finished, Task, self()});
+        {ok, NewState, Count} ->
+            gen_server:cast(?MODULE, {task_progress, Task, self(), Count}),
+            NewProgress = CurrentProgress + Count,
+            TimeSpent = erlang:monotonic_time(second) - StartDate,
+            SleepTime = max(0, NewProgress / Rate * 60 - TimeSpent),
+            %% `receive ... after' rejects timeouts above 16#ffffffff ms,
+            %% so cap the value instead of crashing on very low rates.
+            Timeout = min(floor(SleepTime * 1000), 16#ffffffff),
+            receive
+                abort -> ok
+            after Timeout ->
+                work_loop(Task, NewState, JobFun, Rate, StartDate, NewProgress)
+            end;
+        {error, Error} ->
+            gen_server:cast(?MODULE, {task_error, Task, self(), Error})
+    catch _:_ ->
+        gen_server:cast(?MODULE, {task_error, Task, self(), internal_error})
+    end.
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index a97c7784b..e15e658c4 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -66,7 +66,8 @@ init([]) ->
supervisor(ejabberd_gen_mod_sup, gen_mod),
- worker(ejabberd_oauth)]}}.
+ worker(ejabberd_oauth),
+ worker(ejabberd_batch)]}}.
-spec stop_child(atom()) -> ok.
stop_child(Name) ->
diff --git a/src/mod_mam.erl b/src/mod_mam.erl
index 59940ec81..ffbd1d4eb 100644
--- a/src/mod_mam.erl
+++ b/src/mod_mam.erl
@@ -43,7 +43,8 @@
get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
- process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7]).
+ process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/7,
+ delete_old_messages_batch/5, delete_old_messages_status/1, delete_old_messages_abort/1]).
@@ -568,6 +569,56 @@ message_is_archived(false, #{lserver := LServer}, Pkt) ->
+%% @doc Command entry point: starts a background, rate limited removal of
+%% archived messages of the given Type ("chat", "groupchat" or "all") that
+%% are older than Days days on host Server.
+%%
+%% BatchSize is the number of messages removed per step and Rate the target
+%% number of messages per minute. Both come straight from the command caller,
+%% so they are validated here: a non-positive rate cannot be throttled (the
+%% worker divides by it) and a non-positive batch size cannot make progress.
+%%
+%% Returns `{ok, ""}' when the task was started, or `{error, Text}' when the
+%% arguments are invalid or a task for this host is already running.
+delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"chat">>;
+                                                                    Type == <<"groupchat">>;
+                                                                    Type == <<"all">> ->
+    if
+        BatchSize =< 0 ->
+            {error, "Batch size must be greater than zero"};
+        Rate =< 0 ->
+            {error, "Rate must be greater than zero"};
+        true ->
+            start_delete_old_messages_batch(Server, Type, Days, BatchSize, Rate)
+    end.
+
+%% Registers the purge task for the host with ejabberd_batch.
+start_delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) ->
+    CurrentTime = make_id(),
+    Diff = Days * 24 * 60 * 60 * 1000000,
+    TimeStamp = misc:usec_to_now(CurrentTime - Diff),
+    %% Type is restricted to three known binaries by the caller's guard.
+    TypeA = misc:binary_to_atom(Type),
+    LServer = jid:nameprep(Server),
+    Mod = gen_mod:db_mod(LServer, ?MODULE),
+    JobState = {LServer, TypeA, TimeStamp, BatchSize},
+    JobFun = fun(S) -> delete_old_messages_batch_step(Mod, S) end,
+    case ejabberd_batch:register_task({mam, LServer}, 0, Rate, JobState, JobFun) of
+        ok ->
+            {ok, ""};
+        {error, in_progress} ->
+            {error, "Operation in progress"}
+    end.
+
+%% One step of the purge: asks the storage backend to delete a batch and
+%% returns the result in the shape expected from a batch job fun. The job
+%% state is passed through unchanged.
+delete_old_messages_batch_step(Mod, {L, T, St, B} = S) ->
+    case Mod:delete_old_messages_batch(L, St, T, B) of
+        {ok, Count} ->
+            {ok, S, Count};
+        {error, _} = E ->
+            E
+    end.
+%% @doc Command entry point: returns a flat, human readable string
+%% describing the state of the batch purge task registered for host Server.
+delete_old_messages_status(Server) ->
+    TaskId = {mam, jid:nameprep(Server)},
+    {Format, Args} =
+        case ejabberd_batch:task_status(TaskId) of
+            not_started ->
+                {"Operation not started", []};
+            {failed, Steps, Error} ->
+                {"Operation failed after deleting ~p messages with error ~p",
+                 [Steps, misc:format_val(Error)]};
+            {aborted, Steps} ->
+                {"Operation was aborted after deleting ~p messages", [Steps]};
+            {working, Steps} ->
+                {"Operation in progress, deleted ~p messages", [Steps]};
+            {completed, Steps} ->
+                {"Operation was completed after deleting ~p messages", [Steps]}
+        end,
+    lists:flatten(io_lib:format(Format, Args)).
+%% @doc Command entry point: asks ejabberd_batch to abort the batch purge
+%% task of host Server and returns a short text describing the outcome.
+delete_old_messages_abort(Server) ->
+    TaskId = {mam, jid:nameprep(Server)},
+    Outcome = ejabberd_batch:abort_task(TaskId),
+    case Outcome of
+        not_started -> "No task running";
+        aborted -> "Operation aborted"
+    end.
delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
TypeBin == <<"groupchat">>;
TypeBin == <<"all">> ->
@@ -1379,6 +1430,39 @@ get_commands_spec() ->
args_example = [<<"all">>, 31],
args = [{type, binary}, {days, integer}],
result = {res, rescode}},
+ %% Batched variant of the purge command; dispatches to
+ %% ?MODULE:delete_old_messages_batch/5 with host, type, days, batch size
+ %% and rate, in that order.
+ #ejabberd_commands{name = delete_old_mam_messages_batch, tags = [purge],
+ desc = "Delete MAM messages older than DAYS",
+ longdesc = "Valid message TYPEs: "
+ "\"chat\", \"groupchat\", \"all\".",
+ module = ?MODULE, function = delete_old_messages_batch,
+ args_desc = ["Name of host where messages should be deleted",
+ "Type of messages to delete (chat, groupchat, all)",
+ "Days to keep messages",
+ "Number of messages to delete per batch",
+ "Desired rate of messages to delete per minute"],
+ args_example = [<<"localhost">>, <<"all">>, 31, 1000, 10000],
+ args = [{host, binary}, {type, binary}, {days, integer}, {batch_size, integer}, {rate, integer}],
+ result = {res, restuple},
+ result_desc = "Result tuple",
+ %% NOTE(review): on success delete_old_messages_batch/5 returns an empty
+ %% text, so this example message is illustrative only.
+ result_example = {ok, <<"Removal of 5000 messages in progress">>}},
+ %% Status query for the batched purge; dispatches to
+ %% ?MODULE:delete_old_messages_status/1 with the host name.
+ #ejabberd_commands{name = delete_old_mam_messages_status, tags = [purge],
+ desc = "Status of delete old MAM messages operation",
+ module = ?MODULE, function = delete_old_messages_status,
+ args_desc = ["Name of host where messages should be deleted"],
+ args_example = [<<"localhost">>],
+ args = [{host, binary}],
+ result = {status, string},
+ result_desc = "Status text",
+ result_example = {"Operation in progress, deleted 5000 messages"}},
+ %% Abort request for the batched purge; dispatches to
+ %% ?MODULE:delete_old_messages_abort/1 with the host name.
+ #ejabberd_commands{name = abort_delete_old_mam_messages, tags = [purge],
+ desc = "Abort currently running delete old MAM messages operation",
+ module = ?MODULE, function = delete_old_messages_abort,
+ args_desc = ["Name of host where operation should be aborted"],
+ args_example = [<<"localhost">>],
+ args = [{host, binary}],
+ result = {status, string},
+ result_desc = "Status text",
+ result_example = {"Operation aborted"}},
#ejabberd_commands{name = remove_mam_for_user, tags = [mam],
desc = "Remove mam archive for user",
module = ?MODULE, function = remove_mam_for_user,
diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl
index 8e803587e..9eb9716fb 100644
--- a/src/mod_mam_sql.erl
+++ b/src/mod_mam_sql.erl
@@ -30,7 +30,8 @@
%% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3,
- is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]).
+ is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6,
+ delete_old_messages_batch/4, count_messages_to_delete/3]).
@@ -71,6 +72,56 @@ remove_from_archive(LUser, LServer, WithJid) ->
_ -> ok
+%% @doc Counts the archived messages of ServerHost that are older than
+%% TimeStamp. Type is `all' or an atom naming the message kind to restrict
+%% the count to.
+%%
+%% Returns `{ok, Count}' with the integer count, or `error' when the query
+%% does not yield exactly one row.
+count_messages_to_delete(ServerHost, TimeStamp, Type) ->
+    TS = misc:now_to_usec(TimeStamp),
+    Res =
+        case Type of
+            all ->
+                %% The count must be marked as an output column with @(...)d,
+                %% exactly as in the per-kind query below.
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("select @(count(*))d from archive"
+                       " where timestamp < %(TS)d and %(ServerHost)H"));
+            _ ->
+                SType = misc:atom_to_binary(Type),
+                ejabberd_sql:sql_query(
+                  ServerHost,
+                  ?SQL("select @(count(*))d from archive"
+                       " where timestamp < %(TS)d"
+                       " and kind=%(SType)s"
+                       " and %(ServerHost)H"))
+        end,
+    case Res of
+        %% Each result row is a tuple of the selected columns; unwrap it so
+        %% callers receive the number itself.
+        {selected, [{Count}]} ->
+            {ok, Count};
+        _ ->
+            error
+    end.
+%% @doc Deletes at most Batch archived messages of ServerHost that are older
+%% than TimeStamp. Type is `all' or an atom naming the message kind to
+%% restrict the deletion to.
+%%
+%% Returns `{ok, Count}' with the number of rows the database reports as
+%% updated, or the `{error, _}' tuple produced by the query.
+%%
+%% NOTE(review): both statements use `DELETE ... LIMIT'. That form is not
+%% accepted by every SQL backend (PostgreSQL, for one, has no LIMIT clause
+%% on DELETE) - confirm which backends this command is meant to support.
+delete_old_messages_batch(ServerHost, TimeStamp, Type, Batch) ->
+ TS = misc:now_to_usec(TimeStamp),
+ Res =
+ case Type of
+ all ->
+ ejabberd_sql:sql_query(
+ ServerHost,
+ ?SQL("delete from archive"
+ " where timestamp < %(TS)d and %(ServerHost)H limit %(Batch)d"));
+ _ ->
+ SType = misc:atom_to_binary(Type),
+ ejabberd_sql:sql_query(
+ ServerHost,
+ ?SQL("delete from archive"
+ " where timestamp < %(TS)d"
+ " and kind=%(SType)s"
+ " and %(ServerHost)H limit %(Batch)d"))
+ end,
+ case Res of
+ {updated, Count} ->
+ {ok, Count};
+ {error, _} = Error ->
+ Error
+ end.
delete_old_messages(ServerHost, TimeStamp, Type) ->
TS = misc:now_to_usec(TimeStamp),
case Type of