aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPaweł Chmielowski <pawel@process-one.net>2022-05-02 15:05:55 +0200
committerPaweł Chmielowski <pawel@process-one.net>2022-05-02 15:05:55 +0200
commit6f11210edd7623a6bb19a33e5d8fb8100facad20 (patch)
tree4e00bb6bd1a7539300f9603163f46161cc115196
parentSimplify rules for choosing jiffy version (diff)
Implement batch operations in mnesia backend
-rw-r--r--src/ejabberd_admin.erl27
-rw-r--r--src/mod_mam.erl28
-rw-r--r--src/mod_mam_mnesia.erl39
-rw-r--r--src/mod_offline_mnesia.erl44
4 files changed, 122 insertions, 16 deletions
diff --git a/src/ejabberd_admin.erl b/src/ejabberd_admin.erl
index cbad89a9c..ccd72a61f 100644
--- a/src/ejabberd_admin.erl
+++ b/src/ejabberd_admin.erl
@@ -715,13 +715,26 @@ delete_old_messages(Days) ->
delete_old_messages_batch(Server, Days, BatchSize, Rate) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, mod_offline),
- case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize},
- fun({L, Da, B} = S) ->
- case Mod:remove_old_messages_batch(L, Da, B) of
- {ok, Count} ->
- {ok, S, Count};
- {error, _} = E ->
- E
+ case ejabberd_batch:register_task({spool, LServer}, 0, Rate, {LServer, Days, BatchSize, none},
+ fun({L, Da, B, IS} = S) ->
+ case {erlang:function_exported(Mod, remove_old_messages_batch, 3),
+ erlang:function_exported(Mod, remove_old_messages_batch, 4)} of
+ {true, _} ->
+ case Mod:remove_old_messages_batch(L, Da, B) of
+ {ok, Count} ->
+ {ok, S, Count};
+ {error, _} = E ->
+ E
+ end;
+ {_, true} ->
+ case Mod:remove_old_messages_batch(L, Da, B, IS) of
+ {ok, IS2, Count} ->
+ {ok, {L, Da, B, IS2}, Count};
+ {error, _} = E ->
+ E
+ end;
+ _ ->
+ {error, not_implemented_for_backend}
end
end) of
ok ->
diff --git a/src/mod_mam.erl b/src/mod_mam.erl
index fd292736b..0288e2d52 100644
--- a/src/mod_mam.erl
+++ b/src/mod_mam.erl
@@ -578,13 +578,27 @@ delete_old_messages_batch(Server, Type, Days, BatchSize, Rate) when Type == <<"c
TypeA = misc:binary_to_atom(Type),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize},
- fun({L, T, St, B} = S) ->
- case Mod:delete_old_messages_batch(L, St, T, B) of
- {ok, Count} ->
- {ok, S, Count};
- {error, _} = E ->
- E
+
+ case ejabberd_batch:register_task({mam, LServer}, 0, Rate, {LServer, TypeA, TimeStamp, BatchSize, none},
+ fun({L, T, St, B, IS} = S) ->
+ case {erlang:function_exported(Mod, remove_old_messages_batch, 4),
+ erlang:function_exported(Mod, remove_old_messages_batch, 5)} of
+ {true, _} ->
+ case Mod:delete_old_messages_batch(L, St, T, B) of
+ {ok, Count} ->
+ {ok, S, Count};
+ {error, _} = E ->
+ E
+ end;
+ {_, true} ->
+ case Mod:remove_old_messages_batch(L, St, T, B, IS) of
+ {ok, IS2, Count} ->
+ {ok, {L, St, T, B, IS2}, Count};
+ {error, _} = E ->
+ E
+ end;
+ _ ->
+ {error, not_implemented_for_backend}
end
end) of
ok ->
diff --git a/src/mod_mam_mnesia.erl b/src/mod_mam_mnesia.erl
index dc5898fca..0fd459c67 100644
--- a/src/mod_mam_mnesia.erl
+++ b/src/mod_mam_mnesia.erl
@@ -29,7 +29,7 @@
%% API
-export([init/2, remove_user/2, remove_room/3, delete_old_messages/3,
extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/6, remove_from_archive/3,
- is_empty_for_user/2, is_empty_for_room/3]).
+ is_empty_for_user/2, is_empty_for_room/3, remove_old_messages_batch/5]).
-include_lib("stdlib/include/ms_transform.hrl").
-include_lib("xmpp/include/xmpp.hrl").
@@ -131,6 +131,43 @@ delete_old_user_messages(User, TimeStamp, Type) ->
Err
end.
+delete_batch('$end_of_table', _LServer, _TS, _Type, Num) ->
+ {Num, '$end_of_table'};
+delete_batch(LastUS, _LServer, _TS, _Type, 0) ->
+ {0, LastUS};
+delete_batch(none, LServer, TS, Type, Num) ->
+ delete_batch(mnesia:first(archive_msg), LServer, TS, Type, Num);
+delete_batch({_, LServer2} = LastUS, LServer, TS, Type, Num) when LServer /= LServer2 ->
+ delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Num);
+delete_batch(LastUS, LServer, TS, Type, Num) ->
+ Left =
+ lists:foldl(
+ fun(_, 0) ->
+ 0;
+ (#archive_msg{timestamp = TS2, type = Type2} = O, Num2) when TS2 < TS, (Type == all orelse Type == Type2) ->
+ mnesia:delete_object(O),
+ Num2 - 1;
+ (_, Num2) ->
+ Num2
+ end, Num, mnesia:wread({archive_msg, LastUS})),
+ case Left of
+ 0 -> {0, LastUS};
+ _ -> delete_batch(mnesia:next(archive_msg, LastUS), LServer, TS, Type, Left)
+ end.
+
+remove_old_messages_batch(LServer, TimeStamp, Type, Batch, LastUS) ->
+ R = mnesia:transaction(
+ fun() ->
+ {Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Type, Batch),
+ {Batch - Num, NextUS}
+ end),
+ case R of
+ {atomic, {Num, State}} ->
+ {ok, State, Num};
+ {aborted, Err} ->
+ {error, Err}
+ end.
+
extended_fields() ->
[].
diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl
index 34893cd97..28a105dcf 100644
--- a/src/mod_offline_mnesia.erl
+++ b/src/mod_offline_mnesia.erl
@@ -29,7 +29,8 @@
-export([init/2, store_message/1, pop_messages/2, remove_expired_messages/1,
remove_old_messages/2, remove_user/2, read_message_headers/2,
read_message/3, remove_message/3, read_all_messages/2,
- remove_all_messages/2, count_messages/2, import/1]).
+ remove_all_messages/2, count_messages/2, import/1,
+ remove_old_messages_batch/4]).
-export([need_transform/1, transform/1]).
-include_lib("xmpp/include/xmpp.hrl").
@@ -97,6 +98,47 @@ remove_old_messages(Days, _LServer) ->
end,
mnesia:transaction(F).
+delete_batch('$end_of_table', _LServer, _TS, Num) ->
+ {Num, '$end_of_table'};
+delete_batch(LastUS, _LServer, _TS, 0) ->
+ {0, LastUS};
+delete_batch(none, LServer, TS, Num) ->
+ delete_batch(mnesia:first(offline_msg), LServer, TS, Num);
+delete_batch({_, LServer2} = LastUS, LServer, TS, Num) when LServer /= LServer2 ->
+ delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Num);
+delete_batch(LastUS, LServer, TS, Num) ->
+ Left =
+ lists:foldl(
+ fun(_, 0) ->
+ 0;
+ (#offline_msg{timestamp = TS2} = O, Num2) when TS2 < TS ->
+ mnesia:delete_object(O),
+ Num2 - 1;
+ (_, Num2) ->
+ Num2
+ end, Num, mnesia:wread({offline_msg, LastUS})),
+ case Left of
+ 0 -> {0, LastUS};
+ _ -> delete_batch(mnesia:next(offline_msg, LastUS), LServer, TS, Left)
+ end.
+
+remove_old_messages_batch(LServer, Days, Batch, LastUS) ->
+ S = erlang:system_time(second) - 60 * 60 * 24 * Days,
+ MegaSecs1 = S div 1000000,
+ Secs1 = S rem 1000000,
+ TimeStamp = {MegaSecs1, Secs1, 0},
+ R = mnesia:transaction(
+ fun() ->
+ {Num, NextUS} = delete_batch(LastUS, LServer, TimeStamp, Batch),
+ {Batch - Num, NextUS}
+ end),
+ case R of
+ {atomic, {Num, State}} ->
+ {ok, State, Num};
+ {aborted, Err} ->
+ {error, Err}
+ end.
+
remove_user(LUser, LServer) ->
US = {LUser, LServer},
F = fun () -> mnesia:delete({offline_msg, US}) end,