aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl1384
1 files changed, 874 insertions, 510 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 87a136853..c3fca8868 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -5,7 +5,7 @@
%%% Created : 5 Jan 2003 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
+%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@@ -33,47 +33,43 @@
-protocol({xep, 160, '1.0'}).
-protocol({xep, 334, '0.2'}).
--define(GEN_SERVER, p1_server).
--behaviour(?GEN_SERVER).
-
-behaviour(gen_mod).
-export([start/2,
- start_link/2,
stop/1,
- store_packet/3,
- store_offline_msg/5,
- resend_offline_messages/2,
- pop_offline_messages/3,
+ reload/3,
+ store_packet/1,
+ store_offline_msg/1,
+ c2s_self_presence/1,
get_sm_features/5,
get_sm_identity/5,
get_sm_items/5,
get_info/5,
- handle_offline_query/3,
+ handle_offline_query/1,
remove_expired_messages/1,
remove_old_messages/2,
remove_user/2,
- import/1,
- import/3,
+ import_info/0,
+ import_start/2,
+ import/5,
export/1,
get_queue_length/2,
count_offline_messages/2,
get_offline_els/2,
find_x_expire/2,
+ c2s_handle_info/2,
+ c2s_copy_session/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5]).
--export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3,
- mod_opt_type/1, depends/2]).
+-export([mod_opt_type/1, mod_options/1, depends/2]).
-deprecated({get_queue_length,2}).
--include("ejabberd.hrl").
-include("logger.hrl").
--include("jlib.hrl").
+-include("xmpp.hrl").
-include("ejabberd_http.hrl").
@@ -81,70 +77,51 @@
-include("mod_offline.hrl").
--define(PROCNAME, ejabberd_offline).
+-include("translate.hrl").
-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
%% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity).
--type us() :: {binary(), binary()}.
+-define(SPOOL_COUNTER_CACHE, offline_msg_counter_cache).
+
+-type c2s_state() :: ejabberd_c2s:state().
+
-callback init(binary(), gen_mod:opts()) -> any().
--callback import(binary(), #offline_msg{}) -> ok | pass.
--callback store_messages(binary(), us(), [#offline_msg{}],
- non_neg_integer(), non_neg_integer()) ->
- {atomic, any()}.
+-callback import(#offline_msg{}) -> ok.
+-callback store_message(#offline_msg{}) -> ok | {error, any()}.
-callback pop_messages(binary(), binary()) ->
- {atomic, [#offline_msg{}]} | {aborted, any()}.
+ {ok, [#offline_msg{}]} | {error, any()}.
-callback remove_expired_messages(binary()) -> {atomic, any()}.
-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
--callback remove_user(binary(), binary()) -> {atomic, any()}.
--callback read_message_headers(binary(), binary()) -> any().
+-callback remove_user(binary(), binary()) -> any().
+-callback read_message_headers(binary(), binary()) ->
+ [{non_neg_integer(), jid(), jid(), undefined | erlang:timestamp(), xmlel()}] | error.
-callback read_message(binary(), binary(), non_neg_integer()) ->
{ok, #offline_msg{}} | error.
--callback remove_message(binary(), binary(), non_neg_integer()) -> ok.
+-callback remove_message(binary(), binary(), non_neg_integer()) -> ok | {error, any()}.
-callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
--callback count_messages(binary(), binary()) -> non_neg_integer().
-
-start_link(Host, Opts) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- ?GEN_SERVER:start_link({local, Proc}, ?MODULE,
- [Host, Opts], []).
-
-start(Host, Opts) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
- transient, 1000, worker, [?MODULE]},
- supervisor:start_child(ejabberd_sup, ChildSpec).
+-callback count_messages(binary(), binary()) -> {ets_cache:tag(), non_neg_integer()}.
+-callback use_cache(binary()) -> boolean().
+-callback cache_nodes(binary()) -> [node()].
-stop(Host) ->
- Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
- catch ?GEN_SERVER:call(Proc, stop),
- supervisor:terminate_child(ejabberd_sup, Proc),
- supervisor:delete_child(ejabberd_sup, Proc),
- ok.
+-optional_callbacks([remove_expired_messages/1, remove_old_messages/2,
+ use_cache/1, cache_nodes/1]).
depends(_Host, _Opts) ->
[].
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-init([Host, Opts]) ->
- Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+start(Host, Opts) ->
+ Mod = gen_mod:db_mod(Opts, ?MODULE),
Mod:init(Host, Opts),
- IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
- no_queue),
+ init_cache(Mod, Host, Opts),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
- ejabberd_hooks:add(resend_offline_messages_hook, Host,
- ?MODULE, pop_offline_messages, 50),
+ ejabberd_hooks:add(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
ejabberd_hooks:add(remove_user, Host,
?MODULE, remove_user, 50),
- ejabberd_hooks:add(anonymous_purge_hook, Host,
- ?MODULE, remove_user, 50),
ejabberd_hooks:add(disco_sm_features, Host,
?MODULE, get_sm_features, 50),
ejabberd_hooks:add(disco_local_features, Host,
@@ -154,6 +131,8 @@ init([Host, Opts]) ->
ejabberd_hooks:add(disco_sm_items, Host,
?MODULE, get_sm_items, 50),
ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50),
+ ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50),
+ ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, c2s_copy_session, 50),
ejabberd_hooks:add(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:add(webadmin_user, Host,
@@ -161,95 +140,117 @@ init([Host, Opts]) ->
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
- ?MODULE, handle_offline_query, IQDisc),
- AccessMaxOfflineMsgs =
- gen_mod:get_opt(access_max_user_messages, Opts,
- fun acl:shaper_rules_validator/1,
- max_user_offline_messages),
- {ok,
- #state{host = Host,
- access_max_offline_messages = AccessMaxOfflineMsgs}}.
-
-
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State}.
+ ?MODULE, handle_offline_query).
-
-handle_cast(_Msg, State) -> {noreply, State}.
-
-
-handle_info(#offline_msg{us = UserServer} = Msg, State) ->
- #state{host = Host,
- access_max_offline_messages = AccessMaxOfflineMsgs} = State,
- DBType = gen_mod:db_type(Host, ?MODULE),
- Msgs = receive_all(UserServer, [Msg], DBType),
- Len = length(Msgs),
- MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
- UserServer, Host),
- store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
- {noreply, State};
-
-handle_info(_Info, State) ->
- ?ERROR_MSG("got unexpected info: ~p", [_Info]),
- {noreply, State}.
-
-
-terminate(_Reason, State) ->
- Host = State#state.host,
+stop(Host) ->
ejabberd_hooks:delete(offline_message_hook, Host,
?MODULE, store_packet, 50),
- ejabberd_hooks:delete(resend_offline_messages_hook,
- Host, ?MODULE, pop_offline_messages, 50),
+ ejabberd_hooks:delete(c2s_self_presence, Host, ?MODULE, c2s_self_presence, 50),
ejabberd_hooks:delete(remove_user, Host, ?MODULE,
remove_user, 50),
- ejabberd_hooks:delete(anonymous_purge_hook, Host,
- ?MODULE, remove_user, 50),
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50),
ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50),
ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50),
ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50),
+ ejabberd_hooks:delete(c2s_handle_info, Host, ?MODULE, c2s_handle_info, 50),
+ ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, c2s_copy_session, 50),
ejabberd_hooks:delete(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:delete(webadmin_user, Host,
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
- gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
- ok.
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
+
+reload(Host, NewOpts, OldOpts) ->
+ NewMod = gen_mod:db_mod(NewOpts, ?MODULE),
+ OldMod = gen_mod:db_mod(OldOpts, ?MODULE),
+ init_cache(NewMod, Host, NewOpts),
+ if NewMod /= OldMod ->
+ NewMod:init(Host, NewOpts);
+ true ->
+ ok
+ end.
+init_cache(Mod, Host, Opts) ->
+ CacheOpts = [{max_size, mod_offline_opt:cache_size(Opts)},
+ {life_time, mod_offline_opt:cache_life_time(Opts)},
+ {cache_missed, false}],
+ case use_cache(Mod, Host) of
+ true ->
+ ets_cache:new(?SPOOL_COUNTER_CACHE, CacheOpts);
+ false ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE)
+ end.
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
+-spec use_cache(module(), binary()) -> boolean().
+use_cache(Mod, Host) ->
+ case erlang:function_exported(Mod, use_cache, 1) of
+ true -> Mod:use_cache(Host);
+ false -> mod_offline_opt:use_cache(Host)
+ end.
-store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
- Mod = gen_mod:db_mod(Host, ?MODULE),
- case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of
- {atomic, discard} ->
- discard_warn_sender(Msgs);
- _ ->
+-spec cache_nodes(module(), binary()) -> [node()].
+cache_nodes(Mod, Host) ->
+ case erlang:function_exported(Mod, cache_nodes, 1) of
+ true -> Mod:cache_nodes(Host);
+ false -> ejabberd_cluster:get_nodes()
+ end.
+
+-spec flush_cache(module(), binary(), binary()) -> ok.
+flush_cache(Mod, User, Server) ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:delete(?SPOOL_COUNTER_CACHE,
+ {User, Server},
+ cache_nodes(Mod, Server));
+ false ->
ok
end.
-get_max_user_messages(AccessRule, {User, Server}, Host) ->
- case acl:match_rule(
- Host, AccessRule, jid:make(User, Server, <<"">>)) of
+-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
+store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
+ UseMam = use_mam_for_user(User, Server),
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case UseMam andalso xmpp:get_meta(Pkt, mam_archived, false) of
+ true ->
+ case count_offline_messages(User, Server) of
+ 0 ->
+ store_message_in_db(Mod, Msg);
+ _ ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
+ end
+ end;
+ false ->
+ case get_max_user_messages(User, Server) of
+ infinity ->
+ store_message_in_db(Mod, Msg);
+ Limit ->
+ Num = count_offline_messages(User, Server),
+ if Num < Limit ->
+ store_message_in_db(Mod, Msg);
+ true ->
+ {error, full}
+ end
+ end
+ end.
+
+get_max_user_messages(User, Server) ->
+ Access = mod_offline_opt:access_max_user_messages(Server),
+ case ejabberd_shaper:match(Server, Access, jid:make(User, Server)) of
Max when is_integer(Max) -> Max;
infinity -> infinity;
_ -> ?MAX_USER_MESSAGES
end.
-receive_all(US, Msgs, DBType) ->
- receive
- #offline_msg{us = US} = Msg ->
- receive_all(US, [Msg | Msgs], DBType)
- after 0 ->
- case DBType of
- mnesia -> Msgs;
- sql -> lists:reverse(Msgs);
- riak -> Msgs
- end
- end.
-
get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
Feats = case Acc of
{result, I} -> I;
@@ -268,152 +269,157 @@ get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}
get_sm_features(Acc, _From, _To, _Node, _Lang) ->
Acc.
-get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+get_sm_identity(Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
- Identity = #xmlel{name = <<"identity">>,
- attrs = [{<<"category">>, <<"automation">>},
- {<<"type">>, <<"message-list">>}]},
- [Identity];
+ [#identity{category = <<"automation">>,
+ type = <<"message-list">>}|Acc];
get_sm_identity(Acc, _From, _To, _Node, _Lang) ->
Acc.
-get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
+get_sm_items(_Acc, #jid{luser = U, lserver = S} = JID,
#jid{luser = U, lserver = S},
?NS_FLEX_OFFLINE, _Lang) ->
- case ejabberd_sm:get_session_pid(U, S, R) of
- Pid when is_pid(Pid) ->
- Hdrs = read_message_headers(U, S),
- BareJID = jid:to_string(jid:remove_resource(JID)),
- Pid ! dont_ask_offline,
+ ejabberd_sm:route(JID, {resend_offline, false}),
+ Mod = gen_mod:db_mod(S, ?MODULE),
+ Hdrs = case Mod:read_message_headers(U, S) of
+ L when is_list(L) ->
+ L;
+ _ ->
+ []
+ end,
+ BareJID = jid:remove_resource(JID),
{result, lists:map(
- fun({Node, From, _To, _El}) ->
- #xmlel{name = <<"item">>,
- attrs = [{<<"jid">>, BareJID},
- {<<"node">>, Node},
- {<<"name">>, jid:to_string(From)}]}
+ fun({Seq, From, _To, _TS, _El}) ->
+ Node = integer_to_binary(Seq),
+ #disco_item{jid = BareJID,
+ node = Node,
+ name = jid:encode(From)}
end, Hdrs)};
- none ->
- {result, []}
- end;
get_sm_items(Acc, _From, _To, _Node, _Lang) ->
Acc.
-get_info(_Acc, #jid{luser = U, lserver = S, lresource = R},
- #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) ->
- N = jlib:integer_to_binary(count_offline_messages(U, S)),
- case ejabberd_sm:get_session_pid(U, S, R) of
- Pid when is_pid(Pid) ->
- Pid ! dont_ask_offline;
- none ->
- ok
- end,
- [#xmlel{name = <<"x">>,
- attrs = [{<<"xmlns">>, ?NS_XDATA},
- {<<"type">>, <<"result">>}],
- children = [#xmlel{name = <<"field">>,
- attrs = [{<<"var">>, <<"FORM_TYPE">>},
- {<<"type">>, <<"hidden">>}],
- children = [#xmlel{name = <<"value">>,
- children = [{xmlcdata,
- ?NS_FLEX_OFFLINE}]}]},
- #xmlel{name = <<"field">>,
- attrs = [{<<"var">>, <<"number_of_messages">>}],
- children = [#xmlel{name = <<"value">>,
- children = [{xmlcdata, N}]}]}]}];
+-spec get_info([xdata()], binary(), module(), binary(), binary()) -> [xdata()];
+ ([xdata()], jid(), jid(), binary(), binary()) -> [xdata()].
+get_info(_Acc, #jid{luser = U, lserver = S} = JID,
+ #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, Lang) ->
+ ejabberd_sm:route(JID, {resend_offline, false}),
+ [#xdata{type = result,
+ fields = flex_offline:encode(
+ [{number_of_messages, count_offline_messages(U, S)}],
+ Lang)}];
get_info(Acc, _From, _To, _Node, _Lang) ->
Acc.
-handle_offline_query(#jid{luser = U, lserver = S} = From,
- #jid{luser = U, lserver = S} = _To,
- #iq{type = Type, sub_el = SubEl} = IQ) ->
- case Type of
- get ->
- case fxml:get_subtag(SubEl, <<"fetch">>) of
- #xmlel{} ->
- handle_offline_fetch(From);
- false ->
- handle_offline_items_view(From, SubEl)
+-spec c2s_handle_info(c2s_state(), term()) -> c2s_state().
+c2s_handle_info(State, {resend_offline, Flag}) ->
+ {stop, State#{resend_offline => Flag}};
+c2s_handle_info(State, _) ->
+ State.
+
+-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state().
+c2s_copy_session(State, #{resend_offline := Flag}) ->
+ State#{resend_offline => Flag};
+c2s_copy_session(State, _) ->
+ State.
+
+-spec handle_offline_query(iq()) -> iq().
+handle_offline_query(#iq{from = #jid{luser = U1, lserver = S1},
+ to = #jid{luser = U2, lserver = S2},
+ lang = Lang,
+ sub_els = [#offline{}]} = IQ)
+ when {U1, S1} /= {U2, S2} ->
+ Txt = ?T("Query to another users is forbidden"),
+ xmpp:make_error(IQ, xmpp:err_forbidden(Txt, Lang));
+handle_offline_query(#iq{from = #jid{luser = U, lserver = S} = From,
+ to = #jid{luser = U, lserver = S} = _To,
+ type = Type, lang = Lang,
+ sub_els = [#offline{} = Offline]} = IQ) ->
+ case {Type, Offline} of
+ {get, #offline{fetch = true, items = [], purge = false}} ->
+ %% TODO: report database errors
+ handle_offline_fetch(From),
+ xmpp:make_iq_result(IQ);
+ {get, #offline{fetch = false, items = [_|_] = Items, purge = false}} ->
+ case handle_offline_items_view(From, Items) of
+ true -> xmpp:make_iq_result(IQ);
+ false -> xmpp:make_error(IQ, xmpp:err_item_not_found())
end;
- set ->
- case fxml:get_subtag(SubEl, <<"purge">>) of
- #xmlel{} ->
- delete_all_msgs(U, S);
- false ->
- handle_offline_items_remove(From, SubEl)
- end
- end,
- IQ#iq{type = result, sub_el = []};
-handle_offline_query(_From, _To, #iq{sub_el = SubEl, lang = Lang} = IQ) ->
- Txt = <<"Query to another users is forbidden">>,
- IQ#iq{type = error, sub_el = [SubEl, ?ERRT_FORBIDDEN(Lang, Txt)]}.
+ {set, #offline{fetch = false, items = [], purge = true}} ->
+ case delete_all_msgs(U, S) of
+ {atomic, ok} ->
+ xmpp:make_iq_result(IQ);
+ _Err ->
+ Txt = ?T("Database failure"),
+ xmpp:make_error(IQ, xmpp:err_internal_server_error(Txt, Lang))
+ end;
+ {set, #offline{fetch = false, items = [_|_] = Items, purge = false}} ->
+ case handle_offline_items_remove(From, Items) of
+ true -> xmpp:make_iq_result(IQ);
+ false -> xmpp:make_error(IQ, xmpp:err_item_not_found())
+ end;
+ _ ->
+ xmpp:make_error(IQ, xmpp:err_bad_request())
+ end;
+handle_offline_query(#iq{lang = Lang} = IQ) ->
+ Txt = ?T("No module is handling this query"),
+ xmpp:make_error(IQ, xmpp:err_service_unavailable(Txt, Lang)).
-handle_offline_items_view(JID, #xmlel{children = Items}) ->
+-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
+handle_offline_items_view(JID, Items) ->
{U, S, R} = jid:tolower(JID),
- lists:foreach(
- fun(Node) ->
- case fetch_msg_by_node(JID, Node) of
- {ok, OfflineMsg} ->
- case offline_msg_to_route(S, OfflineMsg) of
- {route, From, To, El} ->
- NewEl = set_offline_tag(El, Node),
- case ejabberd_sm:get_session_pid(U, S, R) of
- Pid when is_pid(Pid) ->
- Pid ! {route, From, To, NewEl};
- none ->
- ok
- end;
- error ->
- ok
- end;
- error ->
- ok
- end
- end, get_nodes_from_items(Items, <<"view">>)).
-
-handle_offline_items_remove(JID, #xmlel{children = Items}) ->
- lists:foreach(
- fun(Node) ->
- remove_msg_by_node(JID, Node)
- end, get_nodes_from_items(Items, <<"remove">>)).
-
-get_nodes_from_items(Items, Action) ->
- lists:flatmap(
- fun(#xmlel{name = <<"item">>, attrs = Attrs}) ->
- case fxml:get_attr_s(<<"action">>, Attrs) of
- Action ->
- case fxml:get_attr_s(<<"node">>, Attrs) of
- <<"">> ->
- [];
- TS ->
- [TS]
- end;
- _ ->
- []
- end;
- (_) ->
- []
- end, Items).
-
-set_offline_tag(#xmlel{children = Els} = El, Node) ->
- OfflineEl = #xmlel{name = <<"offline">>,
- attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}],
- children = [#xmlel{name = <<"item">>,
- attrs = [{<<"node">>, Node}]}]},
- El#xmlel{children = [OfflineEl|Els]}.
-
-handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
- case ejabberd_sm:get_session_pid(U, S, R) of
- none ->
- ok;
- Pid when is_pid(Pid) ->
- Pid ! dont_ask_offline,
- lists:foreach(
- fun({Node, From, To, El}) ->
- NewEl = set_offline_tag(El, Node),
- Pid ! {route, From, To, NewEl}
- end, read_message_headers(U, S))
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = view}, Acc) ->
+ case fetch_msg_by_node(JID, Node) of
+ {ok, OfflineMsg} ->
+ case offline_msg_to_route(S, OfflineMsg) of
+ {route, El} ->
+ NewEl = set_offline_tag(El, Node),
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ ejabberd_c2s:route(Pid, {route, NewEl});
+ none ->
+ ok
+ end,
+ Acc or true;
+ error ->
+ Acc or false
+ end;
+ error ->
+ Acc or false
+ end
+ end, false, Items) end.
+
+-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
+handle_offline_items_remove(JID, Items) ->
+ {U, S, _R} = jid:tolower(JID),
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = remove}, Acc) ->
+ Acc or remove_msg_by_node(JID, Node)
+ end, false, Items)
end.
+-spec set_offline_tag(message(), binary()) -> message().
+set_offline_tag(Msg, Node) ->
+ xmpp:set_subtag(Msg, #offline{items = [#offline_item{node = Node}]}).
+
+-spec handle_offline_fetch(jid()) -> ok.
+handle_offline_fetch(#jid{luser = U, lserver = S} = JID) ->
+ ejabberd_sm:route(JID, {resend_offline, false}),
+ lists:foreach(
+ fun({Node, El}) ->
+ El1 = set_offline_tag(El, Node),
+ ejabberd_router:route(El1)
+ end, read_messages(U, S)).
+
+-spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}.
fetch_msg_by_node(To, Seq) ->
case catch binary_to_integer(Seq) of
I when is_integer(I), I >= 0 ->
@@ -425,70 +431,117 @@ fetch_msg_by_node(To, Seq) ->
error
end.
+-spec remove_msg_by_node(jid(), binary()) -> boolean().
remove_msg_by_node(To, Seq) ->
case catch binary_to_integer(Seq) of
I when is_integer(I), I>= 0 ->
LUser = To#jid.luser,
LServer = To#jid.lserver,
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_message(LUser, LServer, I);
+ Mod:remove_message(LUser, LServer, I),
+ flush_cache(Mod, LUser, LServer),
+ true;
_ ->
- ok
+ false
end.
-need_to_store(LServer, Packet) ->
- case has_offline_tag(Packet) of
+-spec need_to_store(binary(), message()) -> boolean().
+need_to_store(_LServer, #message{type = error}) -> false;
+need_to_store(LServer, #message{type = Type} = Packet) ->
+ case xmpp:has_subtag(Packet, #offline{}) of
false ->
- case {check_store_hint(Packet),
- fxml:get_tag_attr_s(<<"type">>, Packet)} of
- {_Hint, <<"error">>} ->
- false;
- {store, _Type} ->
- true;
- {no_store, _Type} ->
- false;
- {none, <<"groupchat">>} ->
- false;
- {none, <<"headline">>} ->
- false;
- {none, _Type} ->
- case gen_mod:get_module_opt(
- LServer, ?MODULE, store_empty_body,
- fun(V) when is_boolean(V) -> V;
- (unless_chat_state) -> unless_chat_state
- end,
- unless_chat_state) of
- true ->
+ case misc:unwrap_mucsub_message(Packet) of
+ #message{type = groupchat} = Msg ->
+ need_to_store(LServer, Msg#message{type = chat});
+ #message{} = Msg ->
+ need_to_store(LServer, Msg);
+ _ ->
+ case check_store_hint(Packet) of
+ store ->
true;
- false ->
- fxml:get_subtag(Packet, <<"body">>) /= false;
- unless_chat_state ->
- not jlib:is_standalone_chat_state(Packet)
+ no_store ->
+ false;
+ none ->
+ Store = case Type of
+ groupchat ->
+ mod_offline_opt:store_groupchat(LServer);
+ headline ->
+ false;
+ _ ->
+ true
+ end,
+ case {Store, mod_offline_opt:store_empty_body(LServer)} of
+ {false, _} ->
+ false;
+ {_, true} ->
+ true;
+ {_, false} ->
+ Packet#message.body /= [];
+ {_, unless_chat_state} ->
+ not misc:is_standalone_chat_state(Packet)
+ end
end
end;
true ->
false
end.
-store_packet(From, To, Packet) ->
+-spec store_packet({any(), message()}) -> {any(), message()}.
+store_packet({_Action, #message{from = From, to = To} = Packet} = Acc) ->
case need_to_store(To#jid.lserver, Packet) of
true ->
- case check_event(From, To, Packet) of
+ case check_event(Packet) of
true ->
#jid{luser = LUser, lserver = LServer} = To,
- TimeStamp = p1_time_compat:timestamp(),
- #xmlel{children = Els} = Packet,
- Expire = find_x_expire(TimeStamp, Els),
- gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) !
- #offline_msg{us = {LUser, LServer},
- timestamp = TimeStamp, expire = Expire,
- from = From, to = To, packet = Packet},
- stop;
- _ -> ok
+ case ejabberd_hooks:run_fold(store_offline_message, LServer,
+ Packet, []) of
+ drop ->
+ Acc;
+ NewPacket ->
+ TimeStamp = erlang:timestamp(),
+ Expire = find_x_expire(TimeStamp, NewPacket),
+ OffMsg = #offline_msg{us = {LUser, LServer},
+ timestamp = TimeStamp,
+ expire = Expire,
+ from = From,
+ to = To,
+ packet = NewPacket},
+ case store_offline_msg(OffMsg) of
+ ok ->
+ {offlined, NewPacket};
+ {error, Reason} ->
+ discard_warn_sender(Packet, Reason),
+ stop
+ end
+ end;
+ _ ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end;
+ false ->
+ maybe_update_cache(To, Packet),
+ Acc
+ end.
+
+-spec maybe_update_cache(jid(), message()) -> ok.
+maybe_update_cache(#jid{lserver = Server, luser = User}, Packet) ->
+ case xmpp:get_meta(Packet, mam_archived, false) of
+ true ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case use_mam_for_user(User, Server) andalso use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ _ ->
+ ok
end;
- false -> ok
+ _ ->
+ ok
end.
+-spec check_store_hint(message()) -> store | no_store | none.
check_store_hint(Packet) ->
case has_store_hint(Packet) of
true ->
@@ -502,164 +555,191 @@ check_store_hint(Packet) ->
end
end.
+-spec has_store_hint(message()) -> boolean().
has_store_hint(Packet) ->
- fxml:get_subtag_with_xmlns(Packet, <<"store">>, ?NS_HINTS) =/= false.
+ xmpp:has_subtag(Packet, #hint{type = 'store'}).
+-spec has_no_store_hint(message()) -> boolean().
has_no_store_hint(Packet) ->
- fxml:get_subtag_with_xmlns(Packet, <<"no-store">>, ?NS_HINTS) =/= false
- orelse
- fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false.
-
-has_offline_tag(Packet) ->
- fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false.
+ xmpp:has_subtag(Packet, #hint{type = 'no-store'})
+ orelse
+ xmpp:has_subtag(Packet, #hint{type = 'no-storage'}).
%% Check if the packet has any content about XEP-0022
-check_event(From, To, Packet) ->
- #xmlel{name = Name, attrs = Attrs, children = Els} =
- Packet,
- case find_x_event(Els) of
- false -> true;
- El ->
- case fxml:get_subtag(El, <<"id">>) of
- false ->
- case fxml:get_subtag(El, <<"offline">>) of
- false -> true;
- _ ->
- ID = case fxml:get_tag_attr_s(<<"id">>, Packet) of
- <<"">> ->
- #xmlel{name = <<"id">>, attrs = [],
- children = []};
- S ->
- #xmlel{name = <<"id">>, attrs = [],
- children = [{xmlcdata, S}]}
- end,
- ejabberd_router:route(To, From,
- #xmlel{name = Name, attrs = Attrs,
- children =
- [#xmlel{name = <<"x">>,
- attrs =
- [{<<"xmlns">>,
- ?NS_EVENT}],
- children =
- [ID,
- #xmlel{name
- =
- <<"offline">>,
- attrs
- =
- [],
- children
- =
- []}]}]}),
- true
- end;
- _ -> false
- end
+-spec check_event(message()) -> boolean().
+check_event(#message{from = From, to = To, id = ID, type = Type} = Msg) ->
+ case xmpp:get_subtag(Msg, #xevent{}) of
+ false ->
+ true;
+ #xevent{id = undefined, offline = false} ->
+ true;
+ #xevent{id = undefined, offline = true} ->
+ NewMsg = #message{from = To, to = From, id = ID, type = Type,
+ sub_els = [#xevent{id = ID, offline = true}]},
+ ejabberd_router:route(NewMsg),
+ true;
+ _ ->
+ false
end.
-%% Check if the packet has subelements about XEP-0022
-find_x_event([]) -> false;
-find_x_event([{xmlcdata, _} | Els]) ->
- find_x_event(Els);
-find_x_event([El | Els]) ->
- case fxml:get_tag_attr_s(<<"xmlns">>, El) of
- ?NS_EVENT -> El;
- _ -> find_x_event(Els)
+-spec find_x_expire(erlang:timestamp(), message()) -> erlang:timestamp() | never.
+find_x_expire(TimeStamp, Msg) ->
+ case xmpp:get_subtag(Msg, #expire{seconds = 0}) of
+ #expire{seconds = Int} ->
+ {MegaSecs, Secs, MicroSecs} = TimeStamp,
+ S = MegaSecs * 1000000 + Secs + Int,
+ MegaSecs1 = S div 1000000,
+ Secs1 = S rem 1000000,
+ {MegaSecs1, Secs1, MicroSecs};
+ false ->
+ never
end.
-find_x_expire(_, []) -> never;
-find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) ->
- find_x_expire(TimeStamp, Els);
-find_x_expire(TimeStamp, [El | Els]) ->
- case fxml:get_tag_attr_s(<<"xmlns">>, El) of
- ?NS_EXPIRE ->
- Val = fxml:get_tag_attr_s(<<"seconds">>, El),
- case catch jlib:binary_to_integer(Val) of
- {'EXIT', _} -> never;
- Int when Int > 0 ->
- {MegaSecs, Secs, MicroSecs} = TimeStamp,
- S = MegaSecs * 1000000 + Secs + Int,
- MegaSecs1 = S div 1000000,
- Secs1 = S rem 1000000,
- {MegaSecs1, Secs1, MicroSecs};
- _ -> never
- end;
- _ -> find_x_expire(TimeStamp, Els)
- end.
+c2s_self_presence({_Pres, #{resend_offline := false}} = Acc) ->
+ Acc;
+c2s_self_presence({#presence{type = available} = NewPres, State} = Acc) ->
+ NewPrio = get_priority_from_presence(NewPres),
+ LastPrio = case maps:get(pres_last, State, undefined) of
+ undefined -> -1;
+ LastPres -> get_priority_from_presence(LastPres)
+ end,
+ if LastPrio < 0 andalso NewPrio >= 0 ->
+ route_offline_messages(State);
+ true ->
+ ok
+ end,
+ Acc;
+c2s_self_presence(Acc) ->
+ Acc.
-resend_offline_messages(User, Server) ->
- LUser = jid:nodeprep(User),
- LServer = jid:nameprep(Server),
+-spec route_offline_messages(c2s_state()) -> ok.
+route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- case Mod:pop_messages(LUser, LServer) of
- {ok, Rs} ->
- lists:foreach(fun (R) ->
- ejabberd_sm ! offline_msg_to_route(LServer, R)
- end,
- lists:keysort(#offline_msg.timestamp, Rs));
- _ -> ok
+ Msgs = case Mod:pop_messages(LUser, LServer) of
+ {ok, OffMsgs} ->
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ flush_cache(Mod, LUser, LServer),
+ lists:map(
+ fun({_, #message{from = From, to = To} = Msg}) ->
+ #offline_msg{from = From, to = To,
+ us = {LUser, LServer},
+ packet = Msg}
+ end, read_mam_messages(LUser, LServer, OffMsgs));
+ _ ->
+ flush_cache(Mod, LUser, LServer),
+ OffMsgs
+ end;
+ _ ->
+ []
+ end,
+ lists:foreach(
+ fun(OffMsg) ->
+ route_offline_message(State, OffMsg)
+ end, Msgs).
+
+-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
+route_offline_message(#{lserver := LServer} = State,
+ #offline_msg{expire = Expire} = OffMsg) ->
+ case offline_msg_to_route(LServer, OffMsg) of
+ error ->
+ ok;
+ {route, Msg} ->
+ case is_message_expired(Expire, Msg) of
+ true ->
+ ok;
+ false ->
+ case privacy_check_packet(State, Msg, in) of
+ allow -> ejabberd_router:route(Msg);
+ deny -> ok
+ end
+ end
end.
-pop_offline_messages(Ls, User, Server) ->
- LUser = jid:nodeprep(User),
- LServer = jid:nameprep(Server),
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- case Mod:pop_messages(LUser, LServer) of
- {ok, Rs} ->
- TS = p1_time_compat:timestamp(),
- Ls ++
- lists:map(fun (R) ->
- offline_msg_to_route(LServer, R)
- end,
- lists:filter(
- fun(#offline_msg{packet = Pkt} = R) ->
- #xmlel{children = Els} = Pkt,
- Expire = case R#offline_msg.expire of
- undefined ->
- find_x_expire(TS, Els);
- Exp ->
- Exp
- end,
- case Expire of
- never -> true;
- TimeStamp -> TS < TimeStamp
- end
- end, Rs));
- _ ->
- Ls
- end.
+-spec is_message_expired(erlang:timestamp() | never, message()) -> boolean().
+is_message_expired(Expire, Msg) ->
+ TS = erlang:timestamp(),
+ Expire1 = case Expire of
+ undefined -> find_x_expire(TS, Msg);
+ _ -> Expire
+ end,
+ Expire1 /= never andalso Expire1 =< TS.
+
+-spec privacy_check_packet(c2s_state(), stanza(), in | out) -> allow | deny.
+privacy_check_packet(#{lserver := LServer} = State, Pkt, Dir) ->
+ ejabberd_hooks:run_fold(privacy_check_packet,
+ LServer, allow, [State, Pkt, Dir]).
remove_expired_messages(Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_expired_messages(LServer).
+ case erlang:function_exported(Mod, remove_expired_messages, 1) of
+ true ->
+ Ret = Mod:remove_expired_messages(LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
+ end.
remove_old_messages(Days, Server) ->
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_old_messages(Days, LServer).
+ case erlang:function_exported(Mod, remove_old_messages, 2) of
+ true ->
+ Ret = Mod:remove_old_messages(Days, LServer),
+ ets_cache:clear(?SPOOL_COUNTER_CACHE),
+ Ret;
+ false ->
+ erlang:error(not_implemented)
+ end.
+-spec remove_user(binary(), binary()) -> ok.
remove_user(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_user(LUser, LServer).
+ Mod:remove_user(LUser, LServer),
+ flush_cache(Mod, LUser, LServer).
%% Helper functions:
+-spec check_if_message_should_be_bounced(message()) -> boolean().
+check_if_message_should_be_bounced(Packet) ->
+ case Packet of
+ #message{type = groupchat, to = #jid{lserver = LServer}} ->
+ mod_offline_opt:bounce_groupchat(LServer);
+ #message{to = #jid{lserver = LServer}} ->
+ case misc:is_mucsub_message(Packet) of
+ true ->
+ mod_offline_opt:bounce_groupchat(LServer);
+ _ ->
+ true
+ end;
+ _ ->
+ true
+ end.
+
%% Warn senders that their messages have been discarded:
-discard_warn_sender(Msgs) ->
- lists:foreach(fun (#offline_msg{from = From, to = To,
- packet = Packet}) ->
- ErrText = <<"Your contact offline message queue is "
- "full. The message has been discarded.">>,
- Lang = fxml:get_tag_attr_s(<<"xml:lang">>, Packet),
- Err = jlib:make_error_reply(Packet,
- ?ERRT_RESOURCE_CONSTRAINT(Lang,
- ErrText)),
- ejabberd_router:route(To, From, Err)
+
+-spec discard_warn_sender(message(), full | any()) -> ok.
+discard_warn_sender(Packet, Reason) ->
+ case check_if_message_should_be_bounced(Packet) of
+ true ->
+ Lang = xmpp:get_lang(Packet),
+ Err = case Reason of
+ full ->
+ ErrText = ?T("Your contact offline message queue is "
+ "full. The message has been discarded."),
+ xmpp:err_resource_constraint(ErrText, Lang);
+ _ ->
+ ErrText = ?T("Database failure"),
+ xmpp:err_internal_server_error(ErrText, Lang)
end,
- Msgs).
+ ejabberd_router:route_error(Packet, Err);
+ _ ->
+ ok
+ end.
webadmin_page(_, Host,
#request{us = _US, path = [<<"user">>, U, <<"queue">>],
@@ -669,51 +749,223 @@ webadmin_page(_, Host,
webadmin_page(Acc, _, _) -> Acc.
get_offline_els(LUser, LServer) ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Hdrs = Mod:read_message_headers(LUser, LServer),
- lists:map(
- fun({_Seq, From, To, Packet}) ->
- jlib:replace_from_to(From, To, Packet)
- end, Hdrs).
+ [Packet || {_Seq, Packet} <- read_messages(LUser, LServer)].
+
+-spec offline_msg_to_route(binary(), #offline_msg{}) ->
+ {route, message()} | error.
+offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
+ CodecOpts = ejabberd_config:codec_options(),
+ try xmpp:decode(R#offline_msg.packet, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Pkt1 = xmpp:set_from_to(Pkt, From, To),
+ Pkt2 = add_delay_info(Pkt1, LServer, R#offline_msg.timestamp),
+ {route, Pkt2}
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode packet ~p of user ~ts: ~ts",
+ [R#offline_msg.packet, jid:encode(To),
+ xmpp:format_error(Why)]),
+ error
+ end.
-offline_msg_to_route(LServer, #offline_msg{} = R) ->
- El = case R#offline_msg.timestamp of
- undefined ->
- R#offline_msg.packet;
- TS ->
- jlib:add_delay_info(R#offline_msg.packet, LServer, TS,
- <<"Offline Storage">>)
- end,
- {route, R#offline_msg.from, R#offline_msg.to, El}.
+-spec read_messages(binary(), binary()) -> [{binary(), message()}].
+read_messages(LUser, LServer) ->
+ Res = case read_db_messages(LUser, LServer) of
+ error ->
+ [];
+ L when is_list(L) ->
+ L
+ end,
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ read_mam_messages(LUser, LServer, Res);
+ _ ->
+ Res
+ end.
-read_message_headers(LUser, LServer) ->
+-spec read_db_messages(binary(), binary()) -> [{binary(), message()}] | error.
+read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- lists:map(
- fun({Seq, From, To, El}) ->
- Node = integer_to_binary(Seq),
- {Node, From, To, El}
- end, Mod:read_message_headers(LUser, LServer)).
+ CodecOpts = ejabberd_config:codec_options(),
+ case Mod:read_message_headers(LUser, LServer) of
+ error ->
+ error;
+ L ->
+ lists:flatmap(
+ fun({Seq, From, To, TS, El}) ->
+ Node = integer_to_binary(Seq),
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("Failed to decode packet ~p "
+ "of user ~ts: ~ts",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, L)
+ end.
+
+-spec parse_marker_messages(binary(), [#offline_msg{} | {any(), message()}]) ->
+ {integer() | none, [message()]}.
+parse_marker_messages(LServer, ReadMsgs) ->
+ {Timestamp, ExtraMsgs} = lists:foldl(
+ fun({_Node, #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg}, {T, E}) ->
+ case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = Time} ->
+ if T == none orelse T > Time ->
+ {Time, E};
+ true ->
+ {T, E}
+ end
+ end;
+ (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt},
+ {T, E}) ->
+ try xmpp:decode(Pkt) of
+ #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg ->
+ TS2 = case TS of
+ undefined ->
+ case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = TS0} ->
+ TS0;
+ _ ->
+ erlang:timestamp()
+ end;
+ _ ->
+ TS
+ end,
+ if T == none orelse T > TS2 ->
+ {TS2, E};
+ true ->
+ {T, E}
+ end;
+ Decoded ->
+ Pkt1 = add_delay_info(Decoded, LServer, TS),
+ {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
+ catch _:{xmpp_codec, _Why} ->
+ {T, E}
+ end;
+ ({_Node, Msg}, {T, E}) ->
+ {T, [Msg | E]}
+ end, {none, []}, ReadMsgs),
+ Start = case {Timestamp, ExtraMsgs} of
+ {none, [First|_]} ->
+ case xmpp:get_subtag(First, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = {Mega, Sec, Micro}} ->
+ {Mega, Sec, Micro+1};
+ _ ->
+ none
+ end;
+ {none, _} ->
+ none;
+ _ ->
+ Timestamp
+ end,
+ {Start, ExtraMsgs}.
+
+-spec read_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}]) ->
+ [{integer(), message()}].
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
+ AllMsgs = case Start of
+ none ->
+ ExtraMsgs;
+ _ ->
+ MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+ Number when is_integer(Number) ->
+ max(0, Number - length(ExtraMsgs));
+ infinity ->
+ undefined
+ end,
+ JID = jid:make(LUser, LServer, <<>>),
+ {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
+ [{start, Start}],
+ #rsm_set{max = MaxOfflineMsgs,
+ before = <<"9999999999999999">>},
+ chat, only_messages),
+ MamMsgs2 = lists:map(
+ fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
+ add_delay_info(MM, LServer, MMT)
+ end, MamMsgs),
+
+ ExtraMsgs ++ MamMsgs2
+ end,
+ AllMsgs2 = lists:sort(
+ fun(A, B) ->
+ DA = case xmpp:get_subtag(A, #stanza_id{by = #jid{}}) of
+ #stanza_id{id = IDA} ->
+ IDA;
+ _ -> case xmpp:get_subtag(A, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = STA} ->
+ integer_to_binary(misc:now_to_usec(STA));
+ _ ->
+ <<"unknown">>
+ end
+ end,
+ DB = case xmpp:get_subtag(B, #stanza_id{by = #jid{}}) of
+ #stanza_id{id = IDB} ->
+ IDB;
+ _ -> case xmpp:get_subtag(B, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = STB} ->
+ integer_to_binary(misc:now_to_usec(STB));
+ _ ->
+ <<"unknown">>
+ end
+ end,
+ DA < DB
+ end, AllMsgs),
+ {AllMsgs3, _} = lists:mapfoldl(
+ fun(Msg, Counter) ->
+ {{Counter, Msg}, Counter + 1}
+ end, 1, AllMsgs2),
+ AllMsgs3.
+
+-spec count_mam_messages(binary(), binary(), [#offline_msg{} | {any(), message()}] | error) ->
+ {cache, integer()} | {nocache, integer()}.
+count_mam_messages(_LUser, _LServer, error) ->
+ {nocache, 0};
+count_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Start, ExtraMsgs} = parse_marker_messages(LServer, ReadMsgs),
+ case Start of
+ none ->
+ {cache, length(ExtraMsgs)};
+ _ ->
+ MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+ Number when is_integer(Number) -> Number - length(ExtraMsgs);
+ infinity -> undefined
+ end,
+ JID = jid:make(LUser, LServer, <<>>),
+ {_, _, Count} = mod_mam:select(LServer, JID, JID,
+ [{start, Start}],
+ #rsm_set{max = MaxOfflineMsgs,
+ before = <<"9999999999999999">>},
+ chat, only_count),
+ {cache, Count + length(ExtraMsgs)}
+ end.
format_user_queue(Hdrs) ->
lists:map(
- fun({Seq, From, To, El}) ->
+ fun({Seq, From, To, TS, El}) ->
ID = integer_to_binary(Seq),
FPacket = ejabberd_web_admin:pretty_print_xml(El),
- SFrom = jid:to_string(From),
- STo = jid:to_string(To),
- Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
- {attr, <<"stamp">>}]),
- Time = case jlib:datetime_string_to_timestamp(Stamp) of
+ SFrom = jid:encode(From),
+ STo = jid:encode(To),
+ Time = case TS of
+ undefined ->
+ Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
+ {attr, <<"stamp">>}]),
+ try xmpp_util:decode_timestamp(Stamp) of
+ {_, _, _} = Now -> format_time(Now)
+ catch _:_ ->
+ <<"">>
+ end;
{_, _, _} = Now ->
- {{Year, Month, Day}, {Hour, Minute, Second}} =
- calendar:now_to_local_time(Now),
- iolist_to_binary(
- io_lib:format(
- "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
- [Year, Month, Day, Hour, Minute,
- Second]));
- _ ->
- <<"">>
+ format_time(Now)
end,
?XE(<<"tr">>,
[?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
@@ -725,33 +977,35 @@ format_user_queue(Hdrs) ->
[?XC(<<"pre">>, FPacket)])])
end, Hdrs).
+format_time(Now) ->
+ {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:now_to_local_time(Now),
+ str:format("~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
+ [Year, Month, Day, Hour, Minute, Second]).
+
user_queue(User, Server, Query, Lang) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
US = {LUser, LServer},
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Res = user_queue_parse_query(LUser, LServer, Query),
- HdrsAll = Mod:read_message_headers(LUser, LServer),
- Hdrs = get_messages_subset(US, Server, HdrsAll),
+ user_queue_parse_query(LUser, LServer, Query),
+ HdrsAll = case Mod:read_message_headers(LUser, LServer) of
+ error -> [];
+ L -> L
+ end,
+ Hdrs = get_messages_subset(User, Server, HdrsAll),
FMsgs = format_user_queue(Hdrs),
- [?XC(<<"h1">>,
- list_to_binary(io_lib:format(?T(<<"~s's Offline Messages Queue">>),
- [us_to_list(US)])))]
- ++
- case Res of
- ok -> [?XREST(<<"Submitted">>)];
- nothing -> []
- end
- ++
+ PageTitle = str:format(translate:translate(Lang, ?T("~ts's Offline Messages Queue")), [us_to_list(US)]),
+ (?H1GL(PageTitle, <<"mod-offline">>, <<"mod_offline">>))
+ ++ [?XREST(?T("Submitted"))] ++
[?XAE(<<"form">>,
[{<<"action">>, <<"">>}, {<<"method">>, <<"post">>}],
[?XE(<<"table">>,
[?XE(<<"thead">>,
[?XE(<<"tr">>,
- [?X(<<"td">>), ?XCT(<<"td">>, <<"Time">>),
- ?XCT(<<"td">>, <<"From">>),
- ?XCT(<<"td">>, <<"To">>),
- ?XCT(<<"td">>, <<"Packet">>)])]),
+ [?X(<<"td">>), ?XCT(<<"td">>, ?T("Time")),
+ ?XCT(<<"td">>, ?T("From")),
+ ?XCT(<<"td">>, ?T("To")),
+ ?XCT(<<"td">>, ?T("Packet"))])]),
?XE(<<"tbody">>,
if FMsgs == [] ->
[?XE(<<"tr">>,
@@ -761,41 +1015,45 @@ user_queue(User, Server, Query, Lang) ->
end)]),
?BR,
?INPUTT(<<"submit">>, <<"delete">>,
- <<"Delete Selected">>)])].
+ ?T("Delete Selected"))])].
user_queue_parse_query(LUser, LServer, Query) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
case lists:keysearch(<<"delete">>, 1, Query) of
{value, _} ->
- case lists:keyfind(<<"selected">>, 1, Query) of
- {_, Seq} ->
- case catch binary_to_integer(Seq) of
- I when is_integer(I), I>=0 ->
- Mod:remove_message(LUser, LServer, I),
- ok;
- _ ->
- nothing
- end;
+ case user_queue_parse_query(LUser, LServer, Query, Mod, false) of
+ true ->
+ flush_cache(Mod, LUser, LServer);
false ->
- nothing
+ ok
end;
_ ->
- nothing
+ ok
+ end.
+
+user_queue_parse_query(LUser, LServer, Query, Mod, Acc) ->
+ case lists:keytake(<<"selected">>, 1, Query) of
+ {value, {_, Seq}, Query2} ->
+ NewAcc = case catch binary_to_integer(Seq) of
+ I when is_integer(I), I>=0 ->
+ Mod:remove_message(LUser, LServer, I),
+ true;
+ _ ->
+ Acc
+ end,
+ user_queue_parse_query(LUser, LServer, Query2, Mod, NewAcc);
+ false ->
+ Acc
end.
us_to_list({User, Server}) ->
- jid:to_string({User, Server, <<"">>}).
+ jid:encode({User, Server, <<"">>}).
get_queue_length(LUser, LServer) ->
count_offline_messages(LUser, LServer).
get_messages_subset(User, Host, MsgsAll) ->
- Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
- fun(A) when is_atom(A) -> A end,
- max_user_offline_messages),
- MaxOfflineMsgs = case get_max_user_messages(Access,
- User, Host)
- of
+ MaxOfflineMsgs = case get_max_user_messages(User, Host) of
Number when is_integer(Number) -> Number;
_ -> 100
end,
@@ -809,7 +1067,7 @@ get_messages_subset2(Max, Length, MsgsAll) ->
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
Msgs2),
- NoJID = jid:make(<<"...">>, <<"...">>, <<"">>),
+ NoJID = jid:make(<<"...">>, <<"...">>),
Seq = <<"0">>,
IntermediateMsg = #xmlel{name = <<"...">>, attrs = [],
children = []},
@@ -819,61 +1077,167 @@ webadmin_user(Acc, User, Server, Lang) ->
QueueLen = count_offline_messages(jid:nodeprep(User),
jid:nameprep(Server)),
FQueueLen = [?AC(<<"queue/">>,
- (iolist_to_binary(integer_to_list(QueueLen))))],
+ (integer_to_binary(QueueLen)))],
Acc ++
- [?XCT(<<"h3">>, <<"Offline Messages:">>)] ++
+ [?XCT(<<"h3">>, ?T("Offline Messages:"))] ++
FQueueLen ++
[?C(<<" ">>),
?INPUTT(<<"submit">>, <<"removealloffline">>,
- <<"Remove All Offline Messages">>)].
+ ?T("Remove All Offline Messages"))].
+-spec delete_all_msgs(binary(), binary()) -> {atomic, any()}.
delete_all_msgs(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:remove_all_messages(LUser, LServer).
+ Ret = Mod:remove_all_messages(LUser, LServer),
+ flush_cache(Mod, LUser, LServer),
+ Ret.
webadmin_user_parse_query(_, <<"removealloffline">>,
User, Server, _Query) ->
case delete_all_msgs(User, Server) of
- {aborted, Reason} ->
- ?ERROR_MSG("Failed to remove offline messages: ~p",
- [Reason]),
- {stop, error};
- {atomic, ok} ->
- ?INFO_MSG("Removed all offline messages for ~s@~s",
- [User, Server]),
- {stop, ok}
+ {atomic, ok} ->
+ ?INFO_MSG("Removed all offline messages for ~ts@~ts",
+ [User, Server]),
+ {stop, ok};
+ Err ->
+ ?ERROR_MSG("Failed to remove offline messages: ~p",
+ [Err]),
+ {stop, error}
end;
webadmin_user_parse_query(Acc, _Action, _User, _Server,
_Query) ->
Acc.
%% Returns as integer the number of offline messages for a given user
+-spec count_offline_messages(binary(), binary()) -> non_neg_integer().
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:count_messages(LUser, LServer).
+ case use_mam_for_user(User, Server) of
+ true ->
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Res = read_db_messages(LUser, LServer),
+ count_mam_messages(LUser, LServer, Res)
+ end);
+ false ->
+ Res = read_db_messages(LUser, LServer),
+ ets_cache:untag(count_mam_messages(LUser, LServer, Res))
+ end;
+ _ ->
+ case use_cache(Mod, LServer) of
+ true ->
+ ets_cache:lookup(
+ ?SPOOL_COUNTER_CACHE, {LUser, LServer},
+ fun() ->
+ Mod:count_messages(LUser, LServer)
+ end);
+ false ->
+ ets_cache:untag(Mod:count_messages(LUser, LServer))
+ end
+ end.
+
+-spec store_message_in_db(module(), #offline_msg{}) -> ok | {error, any()}.
+store_message_in_db(Mod, #offline_msg{us = {User, Server}} = Msg) ->
+ case Mod:store_message(Msg) of
+ ok ->
+ case use_cache(Mod, Server) of
+ true ->
+ ets_cache:incr(
+ ?SPOOL_COUNTER_CACHE,
+ {User, Server}, 1,
+ cache_nodes(Mod, Server));
+ false ->
+ ok
+ end;
+ Err ->
+ Err
+ end.
+
+-spec add_delay_info(message(), binary(),
+ undefined | erlang:timestamp()) -> message().
+add_delay_info(Packet, LServer, TS) ->
+ NewTS = case TS of
+ undefined -> erlang:timestamp();
+ _ -> TS
+ end,
+ Packet1 = xmpp:put_meta(Packet, from_offline, true),
+ misc:add_delay_info(Packet1, jid:make(LServer), NewTS,
+ <<"Offline storage">>).
+
+-spec get_priority_from_presence(presence()) -> integer().
+get_priority_from_presence(#presence{priority = Prio}) ->
+ case Prio of
+ undefined -> 0;
+ _ -> Prio
+ end.
export(LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
Mod:export(LServer).
-import(LServer) ->
- Mod = gen_mod:db_mod(LServer, ?MODULE),
- Mod:import(LServer).
+import_info() ->
+ [{<<"spool">>, 4}].
-import(LServer, DBType, Data) ->
+import_start(LServer, DBType) ->
Mod = gen_mod:db_mod(DBType, ?MODULE),
- Mod:import(LServer, Data).
+ Mod:import(LServer, []).
+
+import(LServer, {sql, _}, DBType, <<"spool">>,
+ [LUser, XML, _Seq, _TimeStamp]) ->
+ El = fxml_stream:parse_element(XML),
+ #message{from = From, to = To} = Msg = xmpp:decode(El, ?NS_CLIENT, [ignore_els]),
+ TS = case xmpp:get_subtag(Msg, #delay{stamp = {0,0,0}}) of
+ #delay{stamp = {MegaSecs, Secs, _}} ->
+ {MegaSecs, Secs, 0};
+ false ->
+ erlang:timestamp()
+ end,
+ US = {LUser, LServer},
+ Expire = find_x_expire(TS, Msg),
+ OffMsg = #offline_msg{us = US, packet = El,
+ from = From, to = To,
+ timestamp = TS, expire = Expire},
+ Mod = gen_mod:db_mod(DBType, ?MODULE),
+ Mod:import(OffMsg).
+
+use_mam_for_user(_User, Server) ->
+ mod_offline_opt:use_mam_for_storage(Server).
mod_opt_type(access_max_user_messages) ->
- fun acl:shaper_rules_validator/1;
-mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
+ econf:shaper();
+mod_opt_type(store_groupchat) ->
+ econf:bool();
+mod_opt_type(bounce_groupchat) ->
+ econf:bool();
+mod_opt_type(use_mam_for_storage) ->
+ econf:bool();
mod_opt_type(store_empty_body) ->
- fun (V) when is_boolean(V) -> V;
- (unless_chat_state) -> unless_chat_state
- end;
-mod_opt_type(_) ->
- [access_max_user_messages, db_type, store_empty_body].
+ econf:either(
+ unless_chat_state,
+ econf:bool());
+mod_opt_type(db_type) ->
+ econf:db_type(?MODULE);
+mod_opt_type(use_cache) ->
+ econf:bool();
+mod_opt_type(cache_size) ->
+ econf:pos_int(infinity);
+mod_opt_type(cache_life_time) ->
+ econf:timeout(second, infinity).
+
+mod_options(Host) ->
+ [{db_type, ejabberd_config:default_db(Host, ?MODULE)},
+ {access_max_user_messages, max_user_offline_messages},
+ {store_empty_body, unless_chat_state},
+ {use_mam_for_storage, false},
+ {bounce_groupchat, false},
+ {store_groupchat, false},
+ {use_cache, ejabberd_option:use_cache(Host)},
+ {cache_size, ejabberd_option:cache_size(Host)},
+ {cache_life_time, ejabberd_option:cache_life_time(Host)}].