diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 499 |
1 files changed, 391 insertions, 108 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 6b560c708..68fbe4cd0 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -1,7 +1,7 @@ %%%---------------------------------------------------------------------- %%% File : mod_offline.erl %%% Author : Alexey Shchepin <alexey@process-one.net> -%%% Purpose : Store and manage offline messages in Mnesia database. +%%% Purpose : Store and manage offline messages. %%% Created : 5 Jan 2003 by Alexey Shchepin <alexey@process-one.net> %%% %%% @@ -29,14 +29,16 @@ -behaviour(gen_mod). +-export([count_offline_messages/2]). + -export([start/2, - init/1, + loop/2, stop/1, store_packet/3, resend_offline_messages/2, pop_offline_messages/3, - remove_expired_messages/0, - remove_old_messages/1, + remove_expired_messages/1, + remove_old_messages/2, remove_user/2, webadmin_page/3, webadmin_user/4, @@ -53,12 +55,21 @@ -define(PROCNAME, ejabberd_offline). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). +%% default value for the maximum number of user messages +-define(MAX_USER_MESSAGES, infinity). + start(Host, Opts) -> - mnesia:create_table(offline_msg, - [{disc_only_copies, [node()]}, - {type, bag}, - {attributes, record_info(fields, offline_msg)}]), - update_table(), + case gen_mod:db_type(Opts) of + mnesia -> + mnesia:create_table(offline_msg, + [{disc_only_copies, [node()]}, + {type, bag}, + {attributes, + record_info(fields, offline_msg)}]), + update_table(); + _ -> + ok + end, ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:add(resend_offline_messages_hook, Host, @@ -75,61 +86,122 @@ start(Host, Opts) -> ?MODULE, webadmin_user_parse_query, 50), ejabberd_hooks:add(count_offline_messages, Host, ?MODULE, count_offline_messages, 50), - MaxOfflineMsgs = gen_mod:get_opt(user_max_messages, Opts, infinity), + AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, max_user_offline_messages), register(gen_mod:get_module_proc(Host, ?PROCNAME), - spawn(?MODULE, init, [MaxOfflineMsgs])). - -%% MaxOfflineMsgs is either infinity of integer > 0 -init(infinity) -> - loop(infinity); -init(MaxOfflineMsgs) - when is_integer(MaxOfflineMsgs), MaxOfflineMsgs > 0 -> - loop(MaxOfflineMsgs). + spawn(?MODULE, loop, [Host, AccessMaxOfflineMsgs])). -loop(MaxOfflineMsgs) -> +loop(Host, AccessMaxOfflineMsgs) -> receive - #offline_msg{us=US} = Msg -> - Msgs = receive_all(US, [Msg]), + #offline_msg{us = User} = Msg -> + DBType = gen_mod:db_type(Host, ?MODULE), + Msgs = receive_all(User, [Msg], DBType), Len = length(Msgs), - F = fun() -> - %% Only count messages if needed: - Count = if MaxOfflineMsgs =/= infinity -> - Len + p1_mnesia:count_records( - offline_msg, - #offline_msg{us=US, _='_'}); - true -> - 0 - end, - if - Count > MaxOfflineMsgs -> - discard_warn_sender(Msgs); - true -> - if - Len >= ?OFFLINE_TABLE_LOCK_THRESHOLD -> - mnesia:write_lock_table(offline_msg); - true -> - ok - end, - lists:foreach(fun(M) -> - mnesia:write(M) - end, Msgs) - end - end, - mnesia:transaction(F), - loop(MaxOfflineMsgs); - _ -> - loop(MaxOfflineMsgs) + MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, + User, Host), + store_offline_msg(Host, User, Msgs, Len, MaxOfflineMsgs, DBType), + loop(Host, AccessMaxOfflineMsgs); + _ -> + loop(Host, AccessMaxOfflineMsgs) + end. + +store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs, mnesia) -> + F = fun() -> + %% Only count messages if needed: + Count = if MaxOfflineMsgs =/= infinity -> + Len + p1_mnesia:count_records( + offline_msg, + #offline_msg{us=US, _='_'}); + true -> + 0 + end, + if + Count > MaxOfflineMsgs -> + discard_warn_sender(Msgs); + true -> + if + Len >= ?OFFLINE_TABLE_LOCK_THRESHOLD -> + mnesia:write_lock_table(offline_msg); + true -> + ok + end, + lists:foreach(fun(M) -> + mnesia:write(M) + end, Msgs) + end + end, + mnesia:transaction(F); +store_offline_msg(Host, User, Msgs, Len, MaxOfflineMsgs, odbc) -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_offline_messages(User, Host); + true -> 0 + end, + if + Count > MaxOfflineMsgs -> + discard_warn_sender(Msgs); + true -> + Query = lists:map( + fun(M) -> + Username = + ejabberd_odbc:escape( + (M#offline_msg.to)#jid.luser), + From = M#offline_msg.from, + To = M#offline_msg.to, + {xmlelement, Name, Attrs, Els} = + M#offline_msg.packet, + Attrs2 = jlib:replace_from_to_attrs( + jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + Packet = {xmlelement, Name, Attrs2, + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time( + M#offline_msg.timestamp), + utc, + jlib:make_jid("", Host, ""), + "Offline Storage"), + %% TODO: Delete the next three lines once XEP-0091 is Obsolete + jlib:timestamp_to_xml( + calendar:now_to_universal_time( + M#offline_msg.timestamp))]}, + XML = + ejabberd_odbc:escape( + xml:element_to_binary(Packet)), + odbc_queries:add_spool_sql(Username, XML) + end, Msgs), + odbc_queries:add_spool(Host, Query) end. -receive_all(US, Msgs) -> +%% Function copied from ejabberd_sm.erl: +get_max_user_messages(AccessRule, LUser, Host) -> + case acl:match_rule( + Host, AccessRule, jlib:make_jid(LUser, Host, "")) of + Max when is_integer(Max) -> Max; + infinity -> infinity; + _ -> ?MAX_USER_MESSAGES + end. + +receive_all(US, Msgs, DBType) -> receive #offline_msg{us=US} = Msg -> - receive_all(US, [Msg | Msgs]) + receive_all(US, [Msg | Msgs], DBType) after 0 -> - Msgs + %% FIXME: the diff between mnesia and odbc version: + %% + %% after 0 -> + %% - Msgs + %% + lists:reverse(Msgs) + %% end. + %% + %% Is it a bug in mnesia version? + case DBType of + mnesia -> + Msgs; + odbc -> + lists:reverse(Msgs) + end end. - stop(Host) -> ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, store_packet, 50), @@ -277,6 +349,10 @@ resend_offline_messages(User, Server) -> pop_offline_messages(Ls, User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), + pop_offline_messages(Ls, LUser, LServer, + gen_mod:db_type(LServer, ?MODULE)). + +pop_offline_messages(Ls, LUser, LServer, mnesia) -> US = {LUser, LServer}, F = fun() -> Rs = mnesia:wread({offline_msg, US}), @@ -295,7 +371,14 @@ pop_offline_messages(Ls, User, Server) -> {xmlelement, Name, Attrs, Els ++ [jlib:timestamp_to_xml( - calendar:now_to_universal_time( + calendar:now_to_universal_time( + R#offline_msg.timestamp), + utc, + jlib:make_jid("", LServer, ""), + "Offline Storage"), + %% TODO: Delete the next three lines once XEP-0091 is Obsolete + jlib:timestamp_to_xml( + calendar:now_to_universal_time( R#offline_msg.timestamp))]}} end, lists:filter( @@ -310,9 +393,39 @@ pop_offline_messages(Ls, User, Server) -> lists:keysort(#offline_msg.timestamp, Rs))); _ -> Ls + end; +pop_offline_messages(Ls, LUser, LServer, odbc) -> + EUser = ejabberd_odbc:escape(LUser), + case odbc_queries:get_and_del_spool_msg_t(LServer, EUser) of + {atomic, {selected, ["username","xml"], Rs}} -> + Ls ++ lists:flatmap( + fun({_, XML}) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + To = jlib:string_to_jid( + xml:get_tag_attr_s("to", El)), + From = jlib:string_to_jid( + xml:get_tag_attr_s("from", El)), + if + (To /= error) and + (From /= error) -> + [{route, From, To, El}]; + true -> + [] + end + end + end, Rs); + _ -> + Ls end. -remove_expired_messages() -> +remove_expired_messages(Server) -> + LServer = jlib:nameprep(Server), + remove_expired_messages(LServer, gen_mod:db_type(LServer, ?MODULE)). + +remove_expired_messages(_LServer, mnesia) -> TimeStamp = now(), F = fun() -> mnesia:write_lock_table(offline_msg), @@ -331,9 +444,16 @@ remove_expired_messages() -> end end, ok, offline_msg) end, - mnesia:transaction(F). + mnesia:transaction(F); +remove_expired_messages(_LServer, odbc) -> + %% TODO + {atomic, ok}. + +remove_old_messages(Days, Server) -> + LServer = jlib:nameprep(Server), + remove_old_messages(Days, LServer, gen_mod:db_type(LServer, ?MODULE)). -remove_old_messages(Days) -> +remove_old_messages(Days, _LServer, mnesia) -> {MegaSecs, Secs, _MicroSecs} = now(), S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days, MegaSecs1 = S div 1000000, @@ -348,16 +468,25 @@ remove_old_messages(Days) -> (_Rec, _Acc) -> ok end, ok, offline_msg) end, - mnesia:transaction(F). + mnesia:transaction(F); +remove_old_messages(_Days, _LServer, odbc) -> + %% TODO + {atomic, ok}. remove_user(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), + remove_user(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). + +remove_user(LUser, LServer, mnesia) -> US = {LUser, LServer}, F = fun() -> mnesia:delete({offline_msg, US}) end, - mnesia:transaction(F). + mnesia:transaction(F); +remove_user(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + odbc_queries:del_spool_msg(LServer, Username). update_table() -> Fields = record_info(fields, offline_msg), @@ -471,35 +600,76 @@ webadmin_page(_, Host, webadmin_page(Acc, _, _) -> Acc. +read_all_msgs(LUser, LServer, mnesia) -> + US = {LUser, LServer}, + lists:keysort(#offline_msg.timestamp, + mnesia:dirty_read({offline_msg, US})); +read_all_msgs(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + case catch ejabberd_odbc:sql_query( + LServer, + ["select xml from spool" + " where username='", Username, "'" + " order by seq;"]) of + {selected, ["username", "xml"], Rs} -> + lists:flatmap( + fun({XML}) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + [El] + end + end, Rs); + _ -> + [] + end. + +format_user_queue(Msgs, mnesia) -> + lists:map( + fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, + packet = {xmlelement, Name, Attrs, Els}} = Msg) -> + ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))), + {{Year, Month, Day}, {Hour, Minute, Second}} = + calendar:now_to_local_time(TimeStamp), + Time = lists:flatten( + io_lib:format( + "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", + [Year, Month, Day, Hour, Minute, Second])), + SFrom = jlib:jid_to_string(From), + STo = jlib:jid_to_string(To), + Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs), + Packet = {xmlelement, Name, Attrs2, Els}, + FPacket = ejabberd_web_admin:pretty_print_xml(Packet), + ?XE("tr", + [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]), + ?XAC("td", [{"class", "valign"}], Time), + ?XAC("td", [{"class", "valign"}], SFrom), + ?XAC("td", [{"class", "valign"}], STo), + ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])] + ) + end, Msgs); +format_user_queue(Msgs, odbc) -> + lists:map( + fun({xmlelement, _Name, _Attrs, _Els} = Msg) -> + ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))), + Packet = Msg, + FPacket = ejabberd_web_admin:pretty_print_xml(Packet), + ?XE("tr", + [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]), + ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])] + ) + end, Msgs). + user_queue(User, Server, Query, Lang) -> - US = {jlib:nodeprep(User), jlib:nameprep(Server)}, - Res = user_queue_parse_query(US, Query), - Msgs = lists:keysort(#offline_msg.timestamp, - mnesia:dirty_read({offline_msg, US})), - FMsgs = - lists:map( - fun(#offline_msg{timestamp = TimeStamp, from = From, to = To, - packet = {xmlelement, Name, Attrs, Els}} = Msg) -> - ID = jlib:encode_base64(binary_to_list(term_to_binary(Msg))), - {{Year, Month, Day}, {Hour, Minute, Second}} = - calendar:now_to_local_time(TimeStamp), - Time = lists:flatten( - io_lib:format( - "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", - [Year, Month, Day, Hour, Minute, Second])), - SFrom = jlib:jid_to_string(From), - STo = jlib:jid_to_string(To), - Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs), - Packet = {xmlelement, Name, Attrs2, Els}, - FPacket = ejabberd_web_admin:pretty_print_xml(Packet), - ?XE("tr", - [?XAE("td", [{"class", "valign"}], [?INPUT("checkbox", "selected", ID)]), - ?XAC("td", [{"class", "valign"}], Time), - ?XAC("td", [{"class", "valign"}], SFrom), - ?XAC("td", [{"class", "valign"}], STo), - ?XAE("td", [{"class", "valign"}], [?XC("pre", FPacket)])] - ) - end, Msgs), + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + US = {LUser, LServer}, + DBType = gen_mod:db_type(LServer, ?MODULE), + Res = user_queue_parse_query(LUser, LServer, Query, DBType), + MsgsAll = read_all_msgs(LUser, LServer, DBType), + Msgs = get_messages_subset(User, Server, MsgsAll, DBType), + FMsgs = format_user_queue(Msgs, DBType), [?XC("h1", io_lib:format(?T("~s's Offline Messages Queue"), [us_to_list(US)]))] ++ case Res of @@ -530,7 +700,8 @@ user_queue(User, Server, Query, Lang) -> ?INPUTT("submit", "delete", "Delete Selected") ])]. -user_queue_parse_query(US, Query) -> +user_queue_parse_query(LUser, LServer, Query, mnesia) -> + US = {LUser, LServer}, case lists:keysearch("delete", 1, Query) of {value, _} -> Msgs = lists:keysort(#offline_msg.timestamp, @@ -552,28 +723,129 @@ user_queue_parse_query(US, Query) -> ok; false -> nothing + end; +user_queue_parse_query(LUser, LServer, Query, odbc) -> + Username = ejabberd_odbc:escape(LUser), + case lists:keysearch("delete", 1, Query) of + {value, _} -> + Msgs = case catch ejabberd_odbc:sql_query( + LServer, + ["select xml, seq from spool" + " where username='", Username, "'" + " order by seq;"]) of + {selected, ["xml", "seq"], Rs} -> + lists:flatmap( + fun({XML, Seq}) -> + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + [{El, Seq}] + end + end, Rs); + _ -> + [] + end, + F = fun() -> + lists:foreach( + fun({Msg, Seq}) -> + ID = jlib:encode_base64( + binary_to_list(term_to_binary(Msg))), + case lists:member({"selected", ID}, Query) of + true -> + SSeq = ejabberd_odbc:escape(Seq), + catch ejabberd_odbc:sql_query( + LServer, + ["delete from spool" + " where username='", Username, "'" + " and seq='", SSeq, "';"]); + false -> + ok + end + end, Msgs) + end, + mnesia:transaction(F), + ok; + false -> + nothing end. us_to_list({User, Server}) -> jlib:jid_to_string({User, Server, ""}). +get_queue_length(LUser, LServer) -> + get_queue_length(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). + +get_queue_length(LUser, LServer, mnesia) -> + length(mnesia:dirty_read({offline_msg, {LUser, LServer}})); +get_queue_length(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + case catch ejabberd_odbc:sql_query( + LServer, + ["select count(*) from spool" + " where username='", Username, "';"]) of + {selected, [_], [{SCount}]} -> + list_to_integer(SCount); + _ -> + 0 + end. + +get_messages_subset(User, Host, MsgsAll, DBType) -> + Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, + max_user_offline_messages), + MaxOfflineMsgs = case get_max_user_messages(Access, User, Host) of + Number when is_integer(Number) -> Number; + _ -> 100 + end, + Length = length(MsgsAll), + get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll, DBType). + +get_messages_subset2(Max, Length, MsgsAll, _DBType) when Length =< Max*2 -> + MsgsAll; +get_messages_subset2(Max, Length, MsgsAll, mnesia) -> + FirstN = Max, + {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), + MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), + NoJID = jlib:make_jid("...", "...", ""), + IntermediateMsg = #offline_msg{timestamp = now(), from = NoJID, to = NoJID, + packet = {xmlelement, "...", [], []}}, + MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN; +get_messages_subset2(Max, Length, MsgsAll, odbc) -> + FirstN = Max, + {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), + MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), + IntermediateMsg = {xmlelement, "...", [], []}, + MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. + webadmin_user(Acc, User, Server, Lang) -> - US = {jlib:nodeprep(User), jlib:nameprep(Server)}, - QueueLen = length(mnesia:dirty_read({offline_msg, US})), + QueueLen = get_queue_length(jlib:nodeprep(User), jlib:nameprep(Server)), FQueueLen = [?AC("queue/", integer_to_list(QueueLen))], Acc ++ [?XCT("h3", "Offline Messages:")] ++ FQueueLen ++ [?C(" "), ?INPUTT("submit", "removealloffline", "Remove All Offline Messages")]. -webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> - US = {User, Server}, +delete_all_msgs(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + delete_all_msgs(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). + +delete_all_msgs(LUser, LServer, mnesia) -> + US = {LUser, LServer}, F = fun() -> - mnesia:write_lock_table(offline_msg), - lists:foreach( - fun(Msg) -> - mnesia:delete_object(Msg) - end, mnesia:dirty_read({offline_msg, US})) + mnesia:write_lock_table(offline_msg), + lists:foreach( + fun(Msg) -> + mnesia:delete_object(Msg) + end, mnesia:dirty_read({offline_msg, US})) end, - case mnesia:transaction(F) of + mnesia:transaction(F); +delete_all_msgs(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + odbc_queries:del_spool_msg(LServer, Username), + %% TODO: process the output + {atomic, ok}. + +webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> + case delete_all_msgs(User, Server) of {aborted, Reason} -> ?ERROR_MSG("Failed to remove offline messages: ~p", [Reason]), {stop, error}; @@ -584,22 +856,33 @@ webadmin_user_parse_query(_, "removealloffline", User, Server, _Query) -> webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. - -%% ------------------------------------------------ -%% mod_offline: number of messages quota management - -count_offline_messages(_Acc, User, Server) -> +count_offline_messages(User, Server) -> LUser = jlib:nodeprep(User), LServer = jlib:nameprep(Server), + DBType = gen_mod:db_type(LServer, ?MODULE), + count_offline_messages(LUser, LServer, DBType). + +count_offline_messages(LUser, LServer, mnesia) -> US = {LUser, LServer}, F = fun () -> p1_mnesia:count_records( offline_msg, #offline_msg{us=US, _='_'}) end, - N = case catch mnesia:async_dirty(F) of - I when is_integer(I) -> I; - _ -> 0 - end, - {stop, N}. + case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end; +count_offline_messages(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + case catch odbc_queries:count_records_where( + LServer, "spool", "where username='" ++ Username ++ "'") of + {selected, [_], [{Res}]} -> + list_to_integer(Res); + _ -> + 0 + end; +count_offline_messages(_Acc, User, Server) -> + N = count_offline_messages(User, Server), + {stop, N}. |