diff options
Diffstat (limited to 'src/mod_offline_mnesia.erl')
-rw-r--r-- | src/mod_offline_mnesia.erl | 232 |
1 files changed, 232 insertions, 0 deletions
diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl new file mode 100644 index 000000000..6a1d9e309 --- /dev/null +++ b/src/mod_offline_mnesia.erl @@ -0,0 +1,232 @@ +%%%------------------------------------------------------------------- +%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> +%%% @copyright (C) 2016, Evgeny Khramtsov +%%% @doc +%%% +%%% @end +%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net> +%%%------------------------------------------------------------------- +-module(mod_offline_mnesia). + +-behaviour(mod_offline). + +-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1, + remove_old_messages/2, remove_user/2, read_message_headers/2, + read_message/3, remove_message/3, read_all_messages/2, + remove_all_messages/2, count_messages/2, import/2]). + +-include("jlib.hrl"). +-include("mod_offline.hrl"). +-include("logger.hrl"). + +-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). + +%%%=================================================================== +%%% API +%%%=================================================================== +init(_Host, _Opts) -> + mnesia:create_table(offline_msg, + [{disc_only_copies, [node()]}, {type, bag}, + {attributes, record_info(fields, offline_msg)}]), + update_table(). + +store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) -> + F = fun () -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_mnesia_records(US); + true -> 0 + end, + if Count > MaxOfflineMsgs -> discard; + true -> + if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) -> + mnesia:write_lock_table(offline_msg); + true -> ok + end, + lists:foreach(fun (M) -> mnesia:write(M) end, Msgs) + end + end, + mnesia:transaction(F). + +pop_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + Rs = mnesia:wread({offline_msg, US}), + mnesia:delete({offline_msg, US}), + Rs + end, + case mnesia:transaction(F) of + {atomic, L} -> + {ok, lists:keysort(#offline_msg.timestamp, L)}; + {aborted, Reason} -> + {error, Reason} + end. + +remove_expired_messages(_LServer) -> + TimeStamp = p1_time_compat:timestamp(), + F = fun () -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl(fun (Rec, _Acc) -> + case Rec#offline_msg.expire of + never -> ok; + TS -> + if TS < TimeStamp -> + mnesia:delete_object(Rec); + true -> ok + end + end + end, + ok, offline_msg) + end, + mnesia:transaction(F). + +remove_old_messages(Days, _LServer) -> + S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + TimeStamp = {MegaSecs1, Secs1, 0}, + F = fun () -> + mnesia:write_lock_table(offline_msg), + mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec, + _Acc) + when TS < TimeStamp -> + mnesia:delete_object(Rec); + (_Rec, _Acc) -> ok + end, + ok, offline_msg) + end, + mnesia:transaction(F). + +remove_user(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> mnesia:delete({offline_msg, US}) end, + mnesia:transaction(F). + +read_message_headers(LUser, LServer) -> + Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}), + Hdrs = lists:map( + fun(#offline_msg{from = From, to = To, packet = Pkt, + timestamp = TS}) -> + Seq = now_to_integer(TS), + NewPkt = jlib:add_delay_info(Pkt, LServer, TS, + <<"Offline Storage">>), + {Seq, From, To, NewPkt} + end, Msgs), + lists:keysort(1, Hdrs). + +read_message(LUser, LServer, I) -> + US = {LUser, LServer}, + TS = integer_to_now(I), + case mnesia:dirty_match_object( + offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of + [Msg|_] -> + {ok, Msg}; + _ -> + error + end. + +remove_message(LUser, LServer, I) -> + US = {LUser, LServer}, + TS = integer_to_now(I), + Msgs = mnesia:dirty_match_object( + offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}), + lists:foreach( + fun(Msg) -> + mnesia:dirty_delete_object(Msg) + end, Msgs). + +read_all_messages(LUser, LServer) -> + US = {LUser, LServer}, + lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})). + +remove_all_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + mnesia:write_lock_table(offline_msg), + lists:foreach(fun (Msg) -> mnesia:delete_object(Msg) end, + mnesia:dirty_read({offline_msg, US})) + end, + mnesia:transaction(F). + +count_messages(LUser, LServer) -> + US = {LUser, LServer}, + F = fun () -> + count_mnesia_records(US) + end, + case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end. + +import(_LServer, #offline_msg{} = Msg) -> + mnesia:dirty_write(Msg). + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +%% Return the number of records matching a given match expression. +%% This function is intended to be used inside a Mnesia transaction. +%% The count has been written to use the fewest possible memory by +%% getting the record by small increment and by using continuation. +-define(BATCHSIZE, 100). + +count_mnesia_records(US) -> + MatchExpression = #offline_msg{us = US, _ = '_'}, + case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}], + ?BATCHSIZE, read) of + {Result, Cont} -> + Count = length(Result), + count_records_cont(Cont, Count); + '$end_of_table' -> + 0 + end. + +count_records_cont(Cont, Count) -> + case mnesia:select(Cont) of + {Result, Cont} -> + NewCount = Count + length(Result), + count_records_cont(Cont, NewCount); + '$end_of_table' -> + Count + end. + +jid_to_binary(#jid{user = U, server = S, resource = R, + luser = LU, lserver = LS, lresource = LR}) -> + #jid{user = iolist_to_binary(U), + server = iolist_to_binary(S), + resource = iolist_to_binary(R), + luser = iolist_to_binary(LU), + lserver = iolist_to_binary(LS), + lresource = iolist_to_binary(LR)}. + +now_to_integer({MS, S, US}) -> + (MS * 1000000 + S) * 1000000 + US. + +integer_to_now(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}. + +update_table() -> + Fields = record_info(fields, offline_msg), + case mnesia:table_info(offline_msg, attributes) of + Fields -> + ejabberd_config:convert_table_to_binary( + offline_msg, Fields, bag, + fun(#offline_msg{us = {U, _}}) -> U end, + fun(#offline_msg{us = {U, S}, + from = From, + to = To, + packet = El} = R) -> + R#offline_msg{us = {iolist_to_binary(U), + iolist_to_binary(S)}, + from = jid_to_binary(From), + to = jid_to_binary(To), + packet = fxml:to_xmlel(El)} + end); + _ -> + ?INFO_MSG("Recreating offline_msg table", []), + mnesia:transform_table(offline_msg, ignore, Fields) + end. |