diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 1384 |
1 files changed, 874 insertions, 510 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 87a136853..c3fca8868 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -5,7 +5,7 @@ %%% Created : 5 Jan 2003 by Alexey Shchepin <alexey@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -33,47 +33,43 @@ -protocol({xep, 160, '1.0'}). -protocol({xep, 334, '0.2'}). --define(GEN_SERVER, p1_server). --behaviour(?GEN_SERVER). - -behaviour(gen_mod). -export([start/2, - start_link/2, stop/1, - store_packet/3, - store_offline_msg/5, - resend_offline_messages/2, - pop_offline_messages/3, + reload/3, + store_packet/1, + store_offline_msg/1, + c2s_self_presence/1, get_sm_features/5, get_sm_identity/5, get_sm_items/5, get_info/5, - handle_offline_query/3, + handle_offline_query/1, remove_expired_messages/1, remove_old_messages/2, remove_user/2, - import/1, - import/3, + import_info/0, + import_start/2, + import/5, export/1, get_queue_length/2, count_offline_messages/2, get_offline_els/2, find_x_expire/2, + c2s_handle_info/2, + c2s_copy_session/2, webadmin_page/3, webadmin_user/4, webadmin_user_parse_query/5]). --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, - mod_opt_type/1, depends/2]). +-export([mod_opt_type/1, mod_options/1, depends/2]). -deprecated({get_queue_length,2}). --include("ejabberd.hrl"). -include("logger.hrl"). --include("jlib.hrl"). +-include("xmpp.hrl"). -include("ejabberd_http.hrl"). @@ -81,70 +77,51 @@ -include("mod_offline.hrl"). --define(PROCNAME, ejabberd_offline). +-include("translate.hrl"). -define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000). %% default value for the maximum number of user messages -define(MAX_USER_MESSAGES, infinity). --type us() :: {binary(), binary()}. +-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache). + +-type c2s_state() :: ejabberd_c2s:state(). + -callback init(binary(), gen_mod:opts()) -> any(). --callback import(binary(), #offline_msg{}) -> ok | pass. --callback store_messages(binary(), us(), [#offline_msg{}], - non_neg_integer(), non_neg_integer()) -> - {atomic, any()}. +-callback import(#offline_msg{}) -> ok. +-callback store_message(#offline_msg{}) -> ok | {error, any()}. -callback pop_messages(binary(), binary()) -> - {atomic, [#offline_msg{}]} | {aborted, any()}. + {ok, [#offline_msg{}]} | {error, any()}. -callback remove_expired_messages(binary()) -> {atomic, any()}. -callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}. --callback remove_user(binary(), binary()) -> {atomic, any()}. --callback read_message_headers(binary(), binary()) -> any(). +-callback remove_user(binary(), binary()) -> any(). +-callback read_message_headers(binary(), binary()) -> + [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error. -callback read_message(binary(), binary(), non_neg_integer()) -> {ok, #offline_msg{}} | error. --callback remove_message(binary(), binary(), non_neg_integer()) -> ok. +-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}. -callback read_all_messages(binary(), binary()) -> [#offline_msg{}]. -callback remove_all_messages(binary(), binary()) -> {atomic, any()}. --callback count_messages(binary(), binary()) -> non_neg_integer(). - -start_link(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - ?GEN_SERVER:start_link({local, Proc}, ?MODULE, - [Host, Opts], []). - -start(Host, Opts) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, - transient, 1000, worker, [?MODULE]}, - supervisor:start_child(ejabberd_sup, ChildSpec). +-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}. +-callback use_cache(binary()) -> boolean(). +-callback cache_nodes(binary()) -> [node()]. -stop(Host) -> - Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - catch ?GEN_SERVER:call(Proc, stop), - supervisor:terminate_child(ejabberd_sup, Proc), - supervisor:delete_child(ejabberd_sup, Proc), - ok. +-optional_callbacks([remove_expired_messages/1, remove_old_messages/2, + use_cache/1, cache_nodes/1]). depends(_Host, _Opts) -> []. -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -init([Host, Opts]) -> - Mod = gen_mod:db_mod(Host, Opts, ?MODULE), +start(Host, Opts) -> + Mod = gen_mod:db_mod(Opts, ?MODULE), Mod:init(Host, Opts), - IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, - no_queue), + init_cache(Mod, Host, Opts), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), - ejabberd_hooks:add(resend_offline_messages_hook, Host, - ?MODULE, pop_offline_messages, 50), + ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), ejabberd_hooks:add(remove_user, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:add(anonymous_purge_hook, Host, - ?MODULE, remove_user, 50), ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:add(disco_local_features, Host, @@ -154,6 +131,8 @@ init([Host, Opts]) -> ejabberd_hooks:add(disco_sm_items, Host, ?MODULE, get_sm_items, 50), ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), + ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, c2s_copy_session, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, @@ -161,95 +140,117 @@ init([Host, Opts]) -> ejabberd_hooks:add(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, - ?MODULE, handle_offline_query, IQDisc), - AccessMaxOfflineMsgs = - gen_mod:get_opt(access_max_user_messages, Opts, - fun acl:shaper_rules_validator/1, - max_user_offline_messages), - {ok, - #state{host = Host, - access_max_offline_messages = AccessMaxOfflineMsgs}}. - - -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. + ?MODULE, handle_offline_query). - -handle_cast(_Msg, State) -> {noreply, State}. - - -handle_info(#offline_msg{us = UserServer} = Msg, State) -> - #state{host = Host, - access_max_offline_messages = AccessMaxOfflineMsgs} = State, - DBType = gen_mod:db_type(Host, ?MODULE), - Msgs = receive_all(UserServer, [Msg], DBType), - Len = length(Msgs), - MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs, - UserServer, Host), - store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs), - {noreply, State}; - -handle_info(_Info, State) -> - ?ERROR_MSG("got unexpected info: ~p", [_Info]), - {noreply, State}. - - -terminate(_Reason, State) -> - Host = State#state.host, +stop(Host) -> ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, store_packet, 50), - ejabberd_hooks:delete(resend_offline_messages_hook, - Host, ?MODULE, pop_offline_messages, 50), + ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50), ejabberd_hooks:delete(remove_user, Host, ?MODULE, remove_user, 50), - ejabberd_hooks:delete(anonymous_purge_hook, Host, - ?MODULE, remove_user, 50), ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50), ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50), ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50), + ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50), + ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, c2s_copy_session, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:delete(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), - gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE), - ok. + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE). + +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(OldOpts, ?MODULE), + init_cache(NewMod, Host, NewOpts), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end. +init_cache(Mod, Host, Opts) -> + CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)}, + {life_time, mod_offline_opt:cache_life_time(Opts)}, + {cache_missed, false}], + case use_cache(Mod, Host) of + true -> + ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts); + false -> + ets_cache:delete(?SPOOL_COUNTER_CACHE) + end. -code_change(_OldVsn, State, _Extra) -> {ok, State}. +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> mod_offline_opt:use_cache(Host) + end. -store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) -> - Mod = gen_mod:db_mod(Host, ?MODULE), - case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of - {atomic, discard} -> - discard_warn_sender(Msgs); - _ -> +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. + +-spec flush_cache(module(), binary(), binary()) -> ok. +flush_cache(Mod, User, Server) -> + case use_cache(Mod, Server) of + true -> + ets_cache:delete(?SPOOL_COUNTER_CACHE, + {User, Server}, + cache_nodes(Mod, Server)); + false -> ok end. -get_max_user_messages(AccessRule, {User, Server}, Host) -> - case acl:match_rule( - Host, AccessRule, jid:make(User, Server, <<"">>)) of +-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}. +store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) -> + UseMam = use_mam_for_user(User, Server), + Mod = gen_mod:db_mod(Server, ?MODULE), + case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of + true -> + case count_offline_messages(User, Server) of + 0 -> + store_message_in_db(Mod, Msg); + _ -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok + end + end; + false -> + case get_max_user_messages(User, Server) of + infinity -> + store_message_in_db(Mod, Msg); + Limit -> + Num = count_offline_messages(User, Server), + if Num < Limit -> + store_message_in_db(Mod, Msg); + true -> + {error, full} + end + end + end. + +get_max_user_messages(User, Server) -> + Access = mod_offline_opt:access_max_user_messages(Server), + case ejabberd_shaper:match(Server, Access, jid:make(User, Server)) of Max when is_integer(Max) -> Max; infinity -> infinity; _ -> ?MAX_USER_MESSAGES end. -receive_all(US, Msgs, DBType) -> - receive - #offline_msg{us = US} = Msg -> - receive_all(US, [Msg | Msgs], DBType) - after 0 -> - case DBType of - mnesia -> Msgs; - sql -> lists:reverse(Msgs); - riak -> Msgs - end - end. - get_sm_features(Acc, _From, _To, <<"">>, _Lang) -> Feats = case Acc of {result, I} -> I; @@ -268,152 +269,157 @@ get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S} get_sm_features(Acc, _From, _To, _Node, _Lang) -> Acc. -get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, +get_sm_identity(Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) -> - Identity = #xmlel{name = <<"identity">>, - attrs = [{<<"category">>, <<"automation">>}, - {<<"type">>, <<"message-list">>}]}, - [Identity]; + [#identity{category = <<"automation">>, + type = <<"message-list">>}|Acc]; get_sm_identity(Acc, _From, _To, _Node, _Lang) -> Acc. -get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID, +get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID, #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) -> - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Hdrs = read_message_headers(U, S), - BareJID = jid:to_string(jid:remove_resource(JID)), - Pid ! dont_ask_offline, + ejabberd_sm:route(JID, {resend_offline, false}), + Mod = gen_mod:db_mod(S, ?MODULE), + Hdrs = case Mod:read_message_headers(U, S) of + L when is_list(L) -> + L; + _ -> + [] + end, + BareJID = jid:remove_resource(JID), {result, lists:map( - fun({Node, From, _To, _El}) -> - #xmlel{name = <<"item">>, - attrs = [{<<"jid">>, BareJID}, - {<<"node">>, Node}, - {<<"name">>, jid:to_string(From)}]} + fun({Seq, From, _To, _TS, _El}) -> + Node = integer_to_binary(Seq), + #disco_item{jid = BareJID, + node = Node, + name = jid:encode(From)} end, Hdrs)}; - none -> - {result, []} - end; get_sm_items(Acc, _From, _To, _Node, _Lang) -> Acc. -get_info(_Acc, #jid{luser = U, lserver = S, lresource = R}, - #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) -> - N = jlib:integer_to_binary(count_offline_messages(U, S)), - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Pid ! dont_ask_offline; - none -> - ok - end, - [#xmlel{name = <<"x">>, - attrs = [{<<"xmlns">>, ?NS_XDATA}, - {<<"type">>, <<"result">>}], - children = [#xmlel{name = <<"field">>, - attrs = [{<<"var">>, <<"FORM_TYPE">>}, - {<<"type">>, <<"hidden">>}], - children = [#xmlel{name = <<"value">>, - children = [{xmlcdata, - ?NS_FLEX_OFFLINE}]}]}, - #xmlel{name = <<"field">>, - attrs = [{<<"var">>, <<"number_of_messages">>}], - children = [#xmlel{name = <<"value">>, - children = [{xmlcdata, N}]}]}]}]; +-spec get_info([xdata()], binary(), module(), binary(), binary()) -> [xdata()]; + ([xdata()], jid(), jid(), binary(), binary()) -> [xdata()]. +get_info(_Acc, #jid{luser = U, lserver = S} = JID, + #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, Lang) -> + ejabberd_sm:route(JID, {resend_offline, false}), + [#xdata{type = result, + fields = flex_offline:encode( + [{number_of_messages, count_offline_messages(U, S)}], + Lang)}]; get_info(Acc, _From, _To, _Node, _Lang) -> Acc. -handle_offline_query(#jid{luser = U, lserver = S} = From, - #jid{luser = U, lserver = S} = _To, - #iq{type = Type, sub_el = SubEl} = IQ) -> - case Type of - get -> - case fxml:get_subtag(SubEl, <<"fetch">>) of - #xmlel{} -> - handle_offline_fetch(From); - false -> - handle_offline_items_view(From, SubEl) +-spec c2s_handle_info(c2s_state(), term()) -> c2s_state(). +c2s_handle_info(State, {resend_offline, Flag}) -> + {stop, State#{resend_offline => Flag}}; +c2s_handle_info(State, _) -> + State. + +-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state(). +c2s_copy_session(State, #{resend_offline := Flag}) -> + State#{resend_offline => Flag}; +c2s_copy_session(State, _) -> + State. + +-spec handle_offline_query(iq()) -> iq(). +handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1}, + to = #jid{luser = U2, lserver = S2}, + lang = Lang, + sub_els = [#offline{}]} = IQ) + when {U1, S1} /= {U2, S2} -> + Txt = ?T("Query to another users is forbidden"), + xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang)); +handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From, + to = #jid{luser = U, lserver = S} = _To, + type = Type, lang = Lang, + sub_els = [#offline{} = Offline]} = IQ) -> + case {Type, Offline} of + {get, #offline{fetch = true, items = [], purge = false}} -> + %% TODO: report database errors + handle_offline_fetch(From), + xmpp:make_iq_result(IQ); + {get, #offline{fetch = false, items = [_|_] = Items, purge = false}} -> + case handle_offline_items_view(From, Items) of + true -> xmpp:make_iq_result(IQ); + false -> xmpp:make_error(IQ, xmpp:err_item_not_found()) end; - set -> - case fxml:get_subtag(SubEl, <<"purge">>) of - #xmlel{} -> - delete_all_msgs(U, S); - false -> - handle_offline_items_remove(From, SubEl) - end - end, - IQ#iq{type = result, sub_el = []}; -handle_offline_query(_From, _To, #iq{sub_el = SubEl, lang = Lang} = IQ) -> - Txt = <<"Query to another users is forbidden">>, - IQ#iq{type = error, sub_el = [SubEl, ?ERRT_FORBIDDEN(Lang, Txt)]}. + {set, #offline{fetch = false, items = [], purge = true}} -> + case delete_all_msgs(U, S) of + {atomic, ok} -> + xmpp:make_iq_result(IQ); + _Err -> + Txt = ?T("Database failure"), + xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang)) + end; + {set, #offline{fetch = false, items = [_|_] = Items, purge = false}} -> + case handle_offline_items_remove(From, Items) of + true -> xmpp:make_iq_result(IQ); + false -> xmpp:make_error(IQ, xmpp:err_item_not_found()) + end; + _ -> + xmpp:make_error(IQ, xmpp:err_bad_request()) + end; +handle_offline_query(#iq{lang = Lang} = IQ) -> + Txt = ?T("No module is handling this query"), + xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)). -handle_offline_items_view(JID, #xmlel{children = Items}) -> +-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean(). +handle_offline_items_view(JID, Items) -> {U, S, R} = jid:tolower(JID), - lists:foreach( - fun(Node) -> - case fetch_msg_by_node(JID, Node) of - {ok, OfflineMsg} -> - case offline_msg_to_route(S, OfflineMsg) of - {route, From, To, El} -> - NewEl = set_offline_tag(El, Node), - case ejabberd_sm:get_session_pid(U, S, R) of - Pid when is_pid(Pid) -> - Pid ! {route, From, To, NewEl}; - none -> - ok - end; - error -> - ok - end; - error -> - ok - end - end, get_nodes_from_items(Items, <<"view">>)). - -handle_offline_items_remove(JID, #xmlel{children = Items}) -> - lists:foreach( - fun(Node) -> - remove_msg_by_node(JID, Node) - end, get_nodes_from_items(Items, <<"remove">>)). - -get_nodes_from_items(Items, Action) -> - lists:flatmap( - fun(#xmlel{name = <<"item">>, attrs = Attrs}) -> - case fxml:get_attr_s(<<"action">>, Attrs) of - Action -> - case fxml:get_attr_s(<<"node">>, Attrs) of - <<"">> -> - []; - TS -> - [TS] - end; - _ -> - [] - end; - (_) -> - [] - end, Items). - -set_offline_tag(#xmlel{children = Els} = El, Node) -> - OfflineEl = #xmlel{name = <<"offline">>, - attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}], - children = [#xmlel{name = <<"item">>, - attrs = [{<<"node">>, Node}]}]}, - El#xmlel{children = [OfflineEl|Els]}. - -handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) -> - case ejabberd_sm:get_session_pid(U, S, R) of - none -> - ok; - Pid when is_pid(Pid) -> - Pid ! dont_ask_offline, - lists:foreach( - fun({Node, From, To, El}) -> - NewEl = set_offline_tag(El, Node), - Pid ! {route, From, To, NewEl} - end, read_message_headers(U, S)) + case use_mam_for_user(U, S) of + true -> + false; + _ -> + lists:foldl( + fun(#offline_item{node = Node, action = view}, Acc) -> + case fetch_msg_by_node(JID, Node) of + {ok, OfflineMsg} -> + case offline_msg_to_route(S, OfflineMsg) of + {route, El} -> + NewEl = set_offline_tag(El, Node), + case ejabberd_sm:get_session_pid(U, S, R) of + Pid when is_pid(Pid) -> + ejabberd_c2s:route(Pid, {route, NewEl}); + none -> + ok + end, + Acc or true; + error -> + Acc or false + end; + error -> + Acc or false + end + end, false, Items) end. + +-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean(). +handle_offline_items_remove(JID, Items) -> + {U, S, _R} = jid:tolower(JID), + case use_mam_for_user(U, S) of + true -> + false; + _ -> + lists:foldl( + fun(#offline_item{node = Node, action = remove}, Acc) -> + Acc or remove_msg_by_node(JID, Node) + end, false, Items) end. +-spec set_offline_tag(message(), binary()) -> message(). +set_offline_tag(Msg, Node) -> + xmpp:set_subtag(Msg, #offline{items = [#offline_item{node = Node}]}). + +-spec handle_offline_fetch(jid()) -> ok. +handle_offline_fetch(#jid{luser = U, lserver = S} = JID) -> + ejabberd_sm:route(JID, {resend_offline, false}), + lists:foreach( + fun({Node, El}) -> + El1 = set_offline_tag(El, Node), + ejabberd_router:route(El1) + end, read_messages(U, S)). + +-spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}. fetch_msg_by_node(To, Seq) -> case catch binary_to_integer(Seq) of I when is_integer(I), I >= 0 -> @@ -425,70 +431,117 @@ fetch_msg_by_node(To, Seq) -> error end. +-spec remove_msg_by_node(jid(), binary()) -> boolean(). remove_msg_by_node(To, Seq) -> case catch binary_to_integer(Seq) of I when is_integer(I), I>= 0 -> LUser = To#jid.luser, LServer = To#jid.lserver, Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_message(LUser, LServer, I); + Mod:remove_message(LUser, LServer, I), + flush_cache(Mod, LUser, LServer), + true; _ -> - ok + false end. -need_to_store(LServer, Packet) -> - case has_offline_tag(Packet) of +-spec need_to_store(binary(), message()) -> boolean(). +need_to_store(_LServer, #message{type = error}) -> false; +need_to_store(LServer, #message{type = Type} = Packet) -> + case xmpp:has_subtag(Packet, #offline{}) of false -> - case {check_store_hint(Packet), - fxml:get_tag_attr_s(<<"type">>, Packet)} of - {_Hint, <<"error">>} -> - false; - {store, _Type} -> - true; - {no_store, _Type} -> - false; - {none, <<"groupchat">>} -> - false; - {none, <<"headline">>} -> - false; - {none, _Type} -> - case gen_mod:get_module_opt( - LServer, ?MODULE, store_empty_body, - fun(V) when is_boolean(V) -> V; - (unless_chat_state) -> unless_chat_state - end, - unless_chat_state) of - true -> + case misc:unwrap_mucsub_message(Packet) of + #message{type = groupchat} = Msg -> + need_to_store(LServer, Msg#message{type = chat}); + #message{} = Msg -> + need_to_store(LServer, Msg); + _ -> + case check_store_hint(Packet) of + store -> true; - false -> - fxml:get_subtag(Packet, <<"body">>) /= false; - unless_chat_state -> - not jlib:is_standalone_chat_state(Packet) + no_store -> + false; + none -> + Store = case Type of + groupchat -> + mod_offline_opt:store_groupchat(LServer); + headline -> + false; + _ -> + true + end, + case {Store, mod_offline_opt:store_empty_body(LServer)} of + {false, _} -> + false; + {_, true} -> + true; + {_, false} -> + Packet#message.body /= []; + {_, unless_chat_state} -> + not misc:is_standalone_chat_state(Packet) + end end end; true -> false end. -store_packet(From, To, Packet) -> +-spec store_packet({any(), message()}) -> {any(), message()}. +store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) -> case need_to_store(To#jid.lserver, Packet) of true -> - case check_event(From, To, Packet) of + case check_event(Packet) of true -> #jid{luser = LUser, lserver = LServer} = To, - TimeStamp = p1_time_compat:timestamp(), - #xmlel{children = Els} = Packet, - Expire = find_x_expire(TimeStamp, Els), - gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) ! - #offline_msg{us = {LUser, LServer}, - timestamp = TimeStamp, expire = Expire, - from = From, to = To, packet = Packet}, - stop; - _ -> ok + case ejabberd_hooks:run_fold(store_offline_message, LServer, + Packet, []) of + drop -> + Acc; + NewPacket -> + TimeStamp = erlang:timestamp(), + Expire = find_x_expire(TimeStamp, NewPacket), + OffMsg = #offline_msg{us = {LUser, LServer}, + timestamp = TimeStamp, + expire = Expire, + from = From, + to = To, + packet = NewPacket}, + case store_offline_msg(OffMsg) of + ok -> + {offlined, NewPacket}; + {error, Reason} -> + discard_warn_sender(Packet, Reason), + stop + end + end; + _ -> + maybe_update_cache(To, Packet), + Acc + end; + false -> + maybe_update_cache(To, Packet), + Acc + end. + +-spec maybe_update_cache(jid(), message()) -> ok. +maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) -> + case xmpp:get_meta(Packet, mam_archived, false) of + true -> + Mod = gen_mod:db_mod(Server, ?MODULE), + case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + _ -> + ok end; - false -> ok + _ -> + ok end. +-spec check_store_hint(message()) -> store | no_store | none. check_store_hint(Packet) -> case has_store_hint(Packet) of true -> @@ -502,164 +555,191 @@ check_store_hint(Packet) -> end end. +-spec has_store_hint(message()) -> boolean(). has_store_hint(Packet) -> - fxml:get_subtag_with_xmlns(Packet, <<"store">>, ?NS_HINTS) =/= false. + xmpp:has_subtag(Packet, #hint{type = 'store'}). +-spec has_no_store_hint(message()) -> boolean(). has_no_store_hint(Packet) -> - fxml:get_subtag_with_xmlns(Packet, <<"no-store">>, ?NS_HINTS) =/= false - orelse - fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false. - -has_offline_tag(Packet) -> - fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false. + xmpp:has_subtag(Packet, #hint{type = 'no-store'}) + orelse + xmpp:has_subtag(Packet, #hint{type = 'no-storage'}). %% Check if the packet has any content about XEP-0022 -check_event(From, To, Packet) -> - #xmlel{name = Name, attrs = Attrs, children = Els} = - Packet, - case find_x_event(Els) of - false -> true; - El -> - case fxml:get_subtag(El, <<"id">>) of - false -> - case fxml:get_subtag(El, <<"offline">>) of - false -> true; - _ -> - ID = case fxml:get_tag_attr_s(<<"id">>, Packet) of - <<"">> -> - #xmlel{name = <<"id">>, attrs = [], - children = []}; - S -> - #xmlel{name = <<"id">>, attrs = [], - children = [{xmlcdata, S}]} - end, - ejabberd_router:route(To, From, - #xmlel{name = Name, attrs = Attrs, - children = - [#xmlel{name = <<"x">>, - attrs = - [{<<"xmlns">>, - ?NS_EVENT}], - children = - [ID, - #xmlel{name - = - <<"offline">>, - attrs - = - [], - children - = - []}]}]}), - true - end; - _ -> false - end +-spec check_event(message()) -> boolean(). +check_event(#message{from = From, to = To, id = ID, type = Type} = Msg) -> + case xmpp:get_subtag(Msg, #xevent{}) of + false -> + true; + #xevent{id = undefined, offline = false} -> + true; + #xevent{id = undefined, offline = true} -> + NewMsg = #message{from = To, to = From, id = ID, type = Type, + sub_els = [#xevent{id = ID, offline = true}]}, + ejabberd_router:route(NewMsg), + true; + _ -> + false end. -%% Check if the packet has subelements about XEP-0022 -find_x_event([]) -> false; -find_x_event([{xmlcdata, _} | Els]) -> - find_x_event(Els); -find_x_event([El | Els]) -> - case fxml:get_tag_attr_s(<<"xmlns">>, El) of - ?NS_EVENT -> El; - _ -> find_x_event(Els) +-spec find_x_expire(erlang:timestamp(), message()) -> erlang:timestamp() | never. +find_x_expire(TimeStamp, Msg) -> + case xmpp:get_subtag(Msg, #expire{seconds = 0}) of + #expire{seconds = Int} -> + {MegaSecs, Secs, MicroSecs} = TimeStamp, + S = MegaSecs * 1000000 + Secs + Int, + MegaSecs1 = S div 1000000, + Secs1 = S rem 1000000, + {MegaSecs1, Secs1, MicroSecs}; + false -> + never end. -find_x_expire(_, []) -> never; -find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) -> - find_x_expire(TimeStamp, Els); -find_x_expire(TimeStamp, [El | Els]) -> - case fxml:get_tag_attr_s(<<"xmlns">>, El) of - ?NS_EXPIRE -> - Val = fxml:get_tag_attr_s(<<"seconds">>, El), - case catch jlib:binary_to_integer(Val) of - {'EXIT', _} -> never; - Int when Int > 0 -> - {MegaSecs, Secs, MicroSecs} = TimeStamp, - S = MegaSecs * 1000000 + Secs + Int, - MegaSecs1 = S div 1000000, - Secs1 = S rem 1000000, - {MegaSecs1, Secs1, MicroSecs}; - _ -> never - end; - _ -> find_x_expire(TimeStamp, Els) - end. +c2s_self_presence({_Pres, #{resend_offline := false}} = Acc) -> + Acc; +c2s_self_presence({#presence{type = available} = NewPres, State} = Acc) -> + NewPrio = get_priority_from_presence(NewPres), + LastPrio = case maps:get(pres_last, State, undefined) of + undefined -> -1; + LastPres -> get_priority_from_presence(LastPres) + end, + if LastPrio < 0 andalso NewPrio >= 0 -> + route_offline_messages(State); + true -> + ok + end, + Acc; +c2s_self_presence(Acc) -> + Acc. -resend_offline_messages(User, Server) -> - LUser = jid:nodeprep(User), - LServer = jid:nameprep(Server), +-spec route_offline_messages(c2s_state()) -> ok. +route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - case Mod:pop_messages(LUser, LServer) of - {ok, Rs} -> - lists:foreach(fun (R) -> - ejabberd_sm ! offline_msg_to_route(LServer, R) - end, - lists:keysort(#offline_msg.timestamp, Rs)); - _ -> ok + Msgs = case Mod:pop_messages(LUser, LServer) of + {ok, OffMsgs} -> + case use_mam_for_user(LUser, LServer) of + true -> + flush_cache(Mod, LUser, LServer), + lists:map( + fun({_, #message{from = From, to = To} = Msg}) -> + #offline_msg{from = From, to = To, + us = {LUser, LServer}, + packet = Msg} + end, read_mam_messages(LUser, LServer, OffMsgs)); + _ -> + flush_cache(Mod, LUser, LServer), + OffMsgs + end; + _ -> + [] + end, + lists:foreach( + fun(OffMsg) -> + route_offline_message(State, OffMsg) + end, Msgs). + +-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok. +route_offline_message(#{lserver := LServer} = State, + #offline_msg{expire = Expire} = OffMsg) -> + case offline_msg_to_route(LServer, OffMsg) of + error -> + ok; + {route, Msg} -> + case is_message_expired(Expire, Msg) of + true -> + ok; + false -> + case privacy_check_packet(State, Msg, in) of + allow -> ejabberd_router:route(Msg); + deny -> ok + end + end end. -pop_offline_messages(Ls, User, Server) -> - LUser = jid:nodeprep(User), - LServer = jid:nameprep(Server), - Mod = gen_mod:db_mod(LServer, ?MODULE), - case Mod:pop_messages(LUser, LServer) of - {ok, Rs} -> - TS = p1_time_compat:timestamp(), - Ls ++ - lists:map(fun (R) -> - offline_msg_to_route(LServer, R) - end, - lists:filter( - fun(#offline_msg{packet = Pkt} = R) -> - #xmlel{children = Els} = Pkt, - Expire = case R#offline_msg.expire of - undefined -> - find_x_expire(TS, Els); - Exp -> - Exp - end, - case Expire of - never -> true; - TimeStamp -> TS < TimeStamp - end - end, Rs)); - _ -> - Ls - end. +-spec is_message_expired(erlang:timestamp() | never, message()) -> boolean(). +is_message_expired(Expire, Msg) -> + TS = erlang:timestamp(), + Expire1 = case Expire of + undefined -> find_x_expire(TS, Msg); + _ -> Expire + end, + Expire1 /= never andalso Expire1 =< TS. + +-spec privacy_check_packet(c2s_state(), stanza(), in | out) -> allow | deny. +privacy_check_packet(#{lserver := LServer} = State, Pkt, Dir) -> + ejabberd_hooks:run_fold(privacy_check_packet, + LServer, allow, [State, Pkt, Dir]). remove_expired_messages(Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_expired_messages(LServer). + case erlang:function_exported(Mod, remove_expired_messages, 1) of + true -> + Ret = Mod:remove_expired_messages(LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) + end. remove_old_messages(Days, Server) -> LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_old_messages(Days, LServer). + case erlang:function_exported(Mod, remove_old_messages, 2) of + true -> + Ret = Mod:remove_old_messages(Days, LServer), + ets_cache:clear(?SPOOL_COUNTER_CACHE), + Ret; + false -> + erlang:error(not_implemented) + end. +-spec remove_user(binary(), binary()) -> ok. remove_user(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_user(LUser, LServer). + Mod:remove_user(LUser, LServer), + flush_cache(Mod, LUser, LServer). %% Helper functions: +-spec check_if_message_should_be_bounced(message()) -> boolean(). +check_if_message_should_be_bounced(Packet) -> + case Packet of + #message{type = groupchat, to = #jid{lserver = LServer}} -> + mod_offline_opt:bounce_groupchat(LServer); + #message{to = #jid{lserver = LServer}} -> + case misc:is_mucsub_message(Packet) of + true -> + mod_offline_opt:bounce_groupchat(LServer); + _ -> + true + end; + _ -> + true + end. + %% Warn senders that their messages have been discarded: -discard_warn_sender(Msgs) -> - lists:foreach(fun (#offline_msg{from = From, to = To, - packet = Packet}) -> - ErrText = <<"Your contact offline message queue is " - "full. The message has been discarded.">>, - Lang = fxml:get_tag_attr_s(<<"xml:lang">>, Packet), - Err = jlib:make_error_reply(Packet, - ?ERRT_RESOURCE_CONSTRAINT(Lang, - ErrText)), - ejabberd_router:route(To, From, Err) + +-spec discard_warn_sender(message(), full | any()) -> ok. +discard_warn_sender(Packet, Reason) -> + case check_if_message_should_be_bounced(Packet) of + true -> + Lang = xmpp:get_lang(Packet), + Err = case Reason of + full -> + ErrText = ?T("Your contact offline message queue is " + "full. The message has been discarded."), + xmpp:err_resource_constraint(ErrText, Lang); + _ -> + ErrText = ?T("Database failure"), + xmpp:err_internal_server_error(ErrText, Lang) end, - Msgs). + ejabberd_router:route_error(Packet, Err); + _ -> + ok + end. webadmin_page(_, Host, #request{us = _US, path = [<<"user">>, U, <<"queue">>], @@ -669,51 +749,223 @@ webadmin_page(_, Host, webadmin_page(Acc, _, _) -> Acc. get_offline_els(LUser, LServer) -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - Hdrs = Mod:read_message_headers(LUser, LServer), - lists:map( - fun({_Seq, From, To, Packet}) -> - jlib:replace_from_to(From, To, Packet) - end, Hdrs). + [Packet || {_Seq, Packet} <- read_messages(LUser, LServer)]. + +-spec offline_msg_to_route(binary(), #offline_msg{}) -> + {route, message()} | error. +offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) -> + CodecOpts = ejabberd_config:codec_options(), + try xmpp:decode(R#offline_msg.packet, ?NS_CLIENT, CodecOpts) of + Pkt -> + Pkt1 = xmpp:set_from_to(Pkt, From, To), + Pkt2 = add_delay_info(Pkt1, LServer, R#offline_msg.timestamp), + {route, Pkt2} + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("Failed to decode packet ~p of user ~ts: ~ts", + [R#offline_msg.packet, jid:encode(To), + xmpp:format_error(Why)]), + error + end. -offline_msg_to_route(LServer, #offline_msg{} = R) -> - El = case R#offline_msg.timestamp of - undefined -> - R#offline_msg.packet; - TS -> - jlib:add_delay_info(R#offline_msg.packet, LServer, TS, - <<"Offline Storage">>) - end, - {route, R#offline_msg.from, R#offline_msg.to, El}. +-spec read_messages(binary(), binary()) -> [{binary(), message()}]. +read_messages(LUser, LServer) -> + Res = case read_db_messages(LUser, LServer) of + error -> + []; + L when is_list(L) -> + L + end, + case use_mam_for_user(LUser, LServer) of + true -> + read_mam_messages(LUser, LServer, Res); + _ -> + Res + end. -read_message_headers(LUser, LServer) -> +-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error. +read_db_messages(LUser, LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), - lists:map( - fun({Seq, From, To, El}) -> - Node = integer_to_binary(Seq), - {Node, From, To, El} - end, Mod:read_message_headers(LUser, LServer)). + CodecOpts = ejabberd_config:codec_options(), + case Mod:read_message_headers(LUser, LServer) of + error -> + error; + L -> + lists:flatmap( + fun({Seq, From, To, TS, El}) -> + Node = integer_to_binary(Seq), + try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of + Pkt -> + Node = integer_to_binary(Seq), + Pkt1 = add_delay_info(Pkt, LServer, TS), + Pkt2 = xmpp:set_from_to(Pkt1, From, To), + [{Node, Pkt2}] + catch _:{xmpp_codec, Why} -> + ?ERROR_MSG("Failed to decode packet ~p " + "of user ~ts: ~ts", + [El, jid:encode(To), + xmpp:format_error(Why)]), + [] + end + end, L) + end. + +-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) -> + {integer() | none, [message()]}. +parse_marker_messages(LServer, ReadMsgs) -> + {Timestamp, ExtraMsgs} = lists:foldl( + fun({_Node, #message{id = <<"ActivityMarker">>, + body = [], type = error} = Msg}, {T, E}) -> + case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of + #delay{stamp = Time} -> + if T == none orelse T > Time -> + {Time, E}; + true -> + {T, E} + end + end; + (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt}, + {T, E}) -> + try xmpp:decode(Pkt) of + #message{id = <<"ActivityMarker">>, + body = [], type = error} = Msg -> + TS2 = case TS of + undefined -> + case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of + #delay{stamp = TS0} -> + TS0; + _ -> + erlang:timestamp() + end; + _ -> + TS + end, + if T == none orelse T > TS2 -> + {TS2, E}; + true -> + {T, E} + end; + Decoded -> + Pkt1 = add_delay_info(Decoded, LServer, TS), + {T, [xmpp:set_from_to(Pkt1, From, To) | E]} + catch _:{xmpp_codec, _Why} -> + {T, E} + end; + ({_Node, Msg}, {T, E}) -> + {T, [Msg | E]} + end, {none, []}, ReadMsgs), + Start = case {Timestamp, ExtraMsgs} of + {none, [First|_]} -> + case xmpp:get_subtag(First, #delay{stamp = {0,0,0}}) of + #delay{stamp = {Mega, Sec, Micro}} -> + {Mega, Sec, Micro+1}; + _ -> + none + end; + {none, _} -> + none; + _ -> + Timestamp + end, + {Start, ExtraMsgs}. + +-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) -> + [{integer(), message()}]. +read_mam_messages(LUser, LServer, ReadMsgs) -> + {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), + AllMsgs = case Start of + none -> + ExtraMsgs; + _ -> + MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of + Number when is_integer(Number) -> + max(0, Number - length(ExtraMsgs)); + infinity -> + undefined + end, + JID = jid:make(LUser, LServer, <<>>), + {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID, + [{start, Start}], + #rsm_set{max = MaxOfflineMsgs, + before = <<"9999999999999999">>}, + chat, only_messages), + MamMsgs2 = lists:map( + fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) -> + add_delay_info(MM, LServer, MMT) + end, MamMsgs), + + ExtraMsgs ++ MamMsgs2 + end, + AllMsgs2 = lists:sort( + fun(A, B) -> + DA = case xmpp:get_subtag(A, #stanza_id{by = #jid{}}) of + #stanza_id{id = IDA} -> + IDA; + _ -> case xmpp:get_subtag(A, #delay{stamp = {0,0,0}}) of + #delay{stamp = STA} -> + integer_to_binary(misc:now_to_usec(STA)); + _ -> + <<"unknown">> + end + end, + DB = case xmpp:get_subtag(B, #stanza_id{by = #jid{}}) of + #stanza_id{id = IDB} -> + IDB; + _ -> case xmpp:get_subtag(B, #delay{stamp = {0,0,0}}) of + #delay{stamp = STB} -> + integer_to_binary(misc:now_to_usec(STB)); + _ -> + <<"unknown">> + end + end, + DA < DB + end, AllMsgs), + {AllMsgs3, _} = lists:mapfoldl( + fun(Msg, Counter) -> + {{Counter, Msg}, Counter + 1} + end, 1, AllMsgs2), + AllMsgs3. + +-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) -> + {cache, integer()} | {nocache, integer()}. +count_mam_messages(_LUser, _LServer, error) -> + {nocache, 0}; +count_mam_messages(LUser, LServer, ReadMsgs) -> + {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs), + case Start of + none -> + {cache, length(ExtraMsgs)}; + _ -> + MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of + Number when is_integer(Number) -> Number - length(ExtraMsgs); + infinity -> undefined + end, + JID = jid:make(LUser, LServer, <<>>), + {_, _, Count} = mod_mam:select(LServer, JID, JID, + [{start, Start}], + #rsm_set{max = MaxOfflineMsgs, + before = <<"9999999999999999">>}, + chat, only_count), + {cache, Count + length(ExtraMsgs)} + end. format_user_queue(Hdrs) -> lists:map( - fun({Seq, From, To, El}) -> + fun({Seq, From, To, TS, El}) -> ID = integer_to_binary(Seq), FPacket = ejabberd_web_admin:pretty_print_xml(El), - SFrom = jid:to_string(From), - STo = jid:to_string(To), - Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, - {attr, <<"stamp">>}]), - Time = case jlib:datetime_string_to_timestamp(Stamp) of + SFrom = jid:encode(From), + STo = jid:encode(To), + Time = case TS of + undefined -> + Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, + {attr, <<"stamp">>}]), + try xmpp_util:decode_timestamp(Stamp) of + {_, _, _} = Now -> format_time(Now) + catch _:_ -> + <<"">> + end; {_, _, _} = Now -> - {{Year, Month, Day}, {Hour, Minute, Second}} = - calendar:now_to_local_time(Now), - iolist_to_binary( - io_lib:format( - "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", - [Year, Month, Day, Hour, Minute, - Second])); - _ -> - <<"">> + format_time(Now) end, ?XE(<<"tr">>, [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}], @@ -725,33 +977,35 @@ format_user_queue(Hdrs) -> [?XC(<<"pre">>, FPacket)])]) end, Hdrs). +format_time(Now) -> + {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:now_to_local_time(Now), + str:format("~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w", + [Year, Month, Day, Hour, Minute, Second]). + user_queue(User, Server, Query, Lang) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), US = {LUser, LServer}, Mod = gen_mod:db_mod(LServer, ?MODULE), - Res = user_queue_parse_query(LUser, LServer, Query), - HdrsAll = Mod:read_message_headers(LUser, LServer), - Hdrs = get_messages_subset(US, Server, HdrsAll), + user_queue_parse_query(LUser, LServer, Query), + HdrsAll = case Mod:read_message_headers(LUser, LServer) of + error -> []; + L -> L + end, + Hdrs = get_messages_subset(User, Server, HdrsAll), FMsgs = format_user_queue(Hdrs), - [?XC(<<"h1">>, - list_to_binary(io_lib:format(?T(<<"~s's Offline Messages Queue">>), - [us_to_list(US)])))] - ++ - case Res of - ok -> [?XREST(<<"Submitted">>)]; - nothing -> [] - end - ++ + PageTitle = str:format(translate:translate(Lang, ?T("~ts's Offline Messages Queue")), [us_to_list(US)]), + (?H1GL(PageTitle, <<"mod-offline">>, <<"mod_offline">>)) + ++ [?XREST(?T("Submitted"))] ++ [?XAE(<<"form">>, [{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}], [?XE(<<"table">>, [?XE(<<"thead">>, [?XE(<<"tr">>, - [?X(<<"td">>), ?XCT(<<"td">>, <<"Time">>), - ?XCT(<<"td">>, <<"From">>), - ?XCT(<<"td">>, <<"To">>), - ?XCT(<<"td">>, <<"Packet">>)])]), + [?X(<<"td">>), ?XCT(<<"td">>, ?T("Time")), + ?XCT(<<"td">>, ?T("From")), + ?XCT(<<"td">>, ?T("To")), + ?XCT(<<"td">>, ?T("Packet"))])]), ?XE(<<"tbody">>, if FMsgs == [] -> [?XE(<<"tr">>, @@ -761,41 +1015,45 @@ user_queue(User, Server, Query, Lang) -> end)]), ?BR, ?INPUTT(<<"submit">>, <<"delete">>, - <<"Delete Selected">>)])]. + ?T("Delete Selected"))])]. user_queue_parse_query(LUser, LServer, Query) -> Mod = gen_mod:db_mod(LServer, ?MODULE), case lists:keysearch(<<"delete">>, 1, Query) of {value, _} -> - case lists:keyfind(<<"selected">>, 1, Query) of - {_, Seq} -> - case catch binary_to_integer(Seq) of - I when is_integer(I), I>=0 -> - Mod:remove_message(LUser, LServer, I), - ok; - _ -> - nothing - end; + case user_queue_parse_query(LUser, LServer, Query, Mod, false) of + true -> + flush_cache(Mod, LUser, LServer); false -> - nothing + ok end; _ -> - nothing + ok + end. + +user_queue_parse_query(LUser, LServer, Query, Mod, Acc) -> + case lists:keytake(<<"selected">>, 1, Query) of + {value, {_, Seq}, Query2} -> + NewAcc = case catch binary_to_integer(Seq) of + I when is_integer(I), I>=0 -> + Mod:remove_message(LUser, LServer, I), + true; + _ -> + Acc + end, + user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc); + false -> + Acc end. us_to_list({User, Server}) -> - jid:to_string({User, Server, <<"">>}). + jid:encode({User, Server, <<"">>}). get_queue_length(LUser, LServer) -> count_offline_messages(LUser, LServer). get_messages_subset(User, Host, MsgsAll) -> - Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, - fun(A) when is_atom(A) -> A end, - max_user_offline_messages), - MaxOfflineMsgs = case get_max_user_messages(Access, - User, Host) - of + MaxOfflineMsgs = case get_max_user_messages(User, Host) of Number when is_integer(Number) -> Number; _ -> 100 end, @@ -809,7 +1067,7 @@ get_messages_subset2(Max, Length, MsgsAll) -> {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - NoJID = jid:make(<<"...">>, <<"...">>, <<"">>), + NoJID = jid:make(<<"...">>, <<"...">>), Seq = <<"0">>, IntermediateMsg = #xmlel{name = <<"...">>, attrs = [], children = []}, @@ -819,61 +1077,167 @@ webadmin_user(Acc, User, Server, Lang) -> QueueLen = count_offline_messages(jid:nodeprep(User), jid:nameprep(Server)), FQueueLen = [?AC(<<"queue/">>, - (iolist_to_binary(integer_to_list(QueueLen))))], + (integer_to_binary(QueueLen)))], Acc ++ - [?XCT(<<"h3">>, <<"Offline Messages:">>)] ++ + [?XCT(<<"h3">>, ?T("Offline Messages:"))] ++ FQueueLen ++ [?C(<<" ">>), ?INPUTT(<<"submit">>, <<"removealloffline">>, - <<"Remove All Offline Messages">>)]. + ?T("Remove All Offline Messages"))]. +-spec delete_all_msgs(binary(), binary()) -> {atomic, any()}. delete_all_msgs(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:remove_all_messages(LUser, LServer). + Ret = Mod:remove_all_messages(LUser, LServer), + flush_cache(Mod, LUser, LServer), + Ret. webadmin_user_parse_query(_, <<"removealloffline">>, User, Server, _Query) -> case delete_all_msgs(User, Server) of - {aborted, Reason} -> - ?ERROR_MSG("Failed to remove offline messages: ~p", - [Reason]), - {stop, error}; - {atomic, ok} -> - ?INFO_MSG("Removed all offline messages for ~s@~s", - [User, Server]), - {stop, ok} + {atomic, ok} -> + ?INFO_MSG("Removed all offline messages for ~ts@~ts", + [User, Server]), + {stop, ok}; + Err -> + ?ERROR_MSG("Failed to remove offline messages: ~p", + [Err]), + {stop, error} end; webadmin_user_parse_query(Acc, _Action, _User, _Server, _Query) -> Acc. %% Returns as integer the number of offline messages for a given user +-spec count_offline_messages(binary(), binary()) -> non_neg_integer(). count_offline_messages(User, Server) -> LUser = jid:nodeprep(User), LServer = jid:nameprep(Server), Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:count_messages(LUser, LServer). + case use_mam_for_user(User, Server) of + true -> + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Res = read_db_messages(LUser, LServer), + count_mam_messages(LUser, LServer, Res) + end); + false -> + Res = read_db_messages(LUser, LServer), + ets_cache:untag(count_mam_messages(LUser, LServer, Res)) + end; + _ -> + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?SPOOL_COUNTER_CACHE, {LUser, LServer}, + fun() -> + Mod:count_messages(LUser, LServer) + end); + false -> + ets_cache:untag(Mod:count_messages(LUser, LServer)) + end + end. + +-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}. +store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) -> + case Mod:store_message(Msg) of + ok -> + case use_cache(Mod, Server) of + true -> + ets_cache:incr( + ?SPOOL_COUNTER_CACHE, + {User, Server}, 1, + cache_nodes(Mod, Server)); + false -> + ok + end; + Err -> + Err + end. + +-spec add_delay_info(message(), binary(), + undefined | erlang:timestamp()) -> message(). +add_delay_info(Packet, LServer, TS) -> + NewTS = case TS of + undefined -> erlang:timestamp(); + _ -> TS + end, + Packet1 = xmpp:put_meta(Packet, from_offline, true), + misc:add_delay_info(Packet1, jid:make(LServer), NewTS, + <<"Offline storage">>). + +-spec get_priority_from_presence(presence()) -> integer(). +get_priority_from_presence(#presence{priority = Prio}) -> + case Prio of + undefined -> 0; + _ -> Prio + end. export(LServer) -> Mod = gen_mod:db_mod(LServer, ?MODULE), Mod:export(LServer). -import(LServer) -> - Mod = gen_mod:db_mod(LServer, ?MODULE), - Mod:import(LServer). +import_info() -> + [{<<"spool">>, 4}]. -import(LServer, DBType, Data) -> +import_start(LServer, DBType) -> Mod = gen_mod:db_mod(DBType, ?MODULE), - Mod:import(LServer, Data). + Mod:import(LServer, []). + +import(LServer, {sql, _}, DBType, <<"spool">>, + [LUser, XML, _Seq, _TimeStamp]) -> + El = fxml_stream:parse_element(XML), + #message{from = From, to = To} = Msg = xmpp:decode(El, ?NS_CLIENT, [ignore_els]), + TS = case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of + #delay{stamp = {MegaSecs, Secs, _}} -> + {MegaSecs, Secs, 0}; + false -> + erlang:timestamp() + end, + US = {LUser, LServer}, + Expire = find_x_expire(TS, Msg), + OffMsg = #offline_msg{us = US, packet = El, + from = From, to = To, + timestamp = TS, expire = Expire}, + Mod = gen_mod:db_mod(DBType, ?MODULE), + Mod:import(OffMsg). + +use_mam_for_user(_User, Server) -> + mod_offline_opt:use_mam_for_storage(Server). mod_opt_type(access_max_user_messages) -> - fun acl:shaper_rules_validator/1; -mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end; + econf:shaper(); +mod_opt_type(store_groupchat) -> + econf:bool(); +mod_opt_type(bounce_groupchat) -> + econf:bool(); +mod_opt_type(use_mam_for_storage) -> + econf:bool(); mod_opt_type(store_empty_body) -> - fun (V) when is_boolean(V) -> V; - (unless_chat_state) -> unless_chat_state - end; -mod_opt_type(_) -> - [access_max_user_messages, db_type, store_empty_body]. + econf:either( + unless_chat_state, + econf:bool()); +mod_opt_type(db_type) -> + econf:db_type(?MODULE); +mod_opt_type(use_cache) -> + econf:bool(); +mod_opt_type(cache_size) -> + econf:pos_int(infinity); +mod_opt_type(cache_life_time) -> + econf:timeout(second, infinity). + +mod_options(Host) -> + [{db_type, ejabberd_config:default_db(Host, ?MODULE)}, + {access_max_user_messages, max_user_offline_messages}, + {store_empty_body, unless_chat_state}, + {use_mam_for_storage, false}, + {bounce_groupchat, false}, + {store_groupchat, false}, + {use_cache, ejabberd_option:use_cache(Host)}, + {cache_size, ejabberd_option:cache_size(Host)}, + {cache_life_time, ejabberd_option:cache_life_time(Host)}]. |