aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mod_mam.erl6
-rw-r--r--src/mod_offline.erl298
-rw-r--r--src/mod_stream_mgmt.erl38
3 files changed, 260 insertions, 82 deletions
diff --git a/src/mod_mam.erl b/src/mod_mam.erl
index 5e20184fa..73a00180e 100644
--- a/src/mod_mam.erl
+++ b/src/mod_mam.erl
@@ -42,7 +42,7 @@
get_room_config/4, set_room_option/3, offline_message/1, export/1,
mod_options/1, remove_mam_for_user_with_peer/3, remove_mam_for_user/2,
is_empty_for_user/2, is_empty_for_room/3, check_create_room/4,
- process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2]).
+ process_iq/3, store_mam_message/7, make_id/0, wrap_as_mucsub/2, select/6]).
-include("xmpp.hrl").
-include("logger.hrl").
@@ -112,7 +112,7 @@ start(Host, Opts) ->
ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
user_send_packet_strip_tag, 500),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
- offline_message, 50),
+ offline_message, 49),
ejabberd_hooks:add(muc_filter_message, Host, ?MODULE,
muc_filter_message, 50),
ejabberd_hooks:add(muc_process_iq, Host, ?MODULE,
@@ -188,7 +188,7 @@ stop(Host) ->
ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
user_send_packet_strip_tag, 500),
ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE,
- offline_message, 50),
+ offline_message, 49),
ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE,
muc_filter_message, 50),
ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE,
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 4a1a4cca2..6a9114a92 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -61,7 +61,8 @@
c2s_copy_session/2,
webadmin_page/3,
webadmin_user/4,
- webadmin_user_parse_query/5]).
+ webadmin_user_parse_query/5,
+ user_unset_presence/4]).
-export([mod_opt_type/1, mod_options/1, depends/2]).
@@ -131,6 +132,8 @@ start(Host, Opts) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
+ ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
+ user_unset_presence, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query).
@@ -153,6 +156,8 @@ stop(Host) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
+ ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
+ user_unset_presence, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
reload(Host, NewOpts, OldOpts) ->
@@ -165,17 +170,28 @@ reload(Host, NewOpts, OldOpts) ->
end.
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
-store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
- case get_max_user_messages(User, Server) of
- infinity ->
- Mod:store_message(Msg);
- Limit ->
- Num = count_offline_messages(User, Server),
- if Num < Limit ->
+store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
+ case (not xmpp:get_meta(Pkt, activity_marker, false)) andalso
+ use_mam_for_user(User, Server) of
+ true ->
+ case xmpp:get_meta(Pkt, first_from_queue, false) of
+ true ->
+ store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
+ _ ->
+ ok
+ end;
+ _ ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case get_max_user_messages(User, Server) of
+ infinity ->
Mod:store_message(Msg);
- true ->
- {error, full}
+ Limit ->
+ Num = count_offline_messages(User, Server),
+ if Num < Limit ->
+ Mod:store_message(Msg);
+ true ->
+ {error, full}
+ end
end
end.
@@ -298,34 +314,44 @@ handle_offline_query(#iq{lang = Lang} = IQ) ->
-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
handle_offline_items_view(JID, Items) ->
{U, S, R} = jid:tolower(JID),
- lists:foldl(
- fun(#offline_item{node = Node, action = view}, Acc) ->
- case fetch_msg_by_node(JID, Node) of
- {ok, OfflineMsg} ->
- case offline_msg_to_route(S, OfflineMsg) of
- {route, El} ->
- NewEl = set_offline_tag(El, Node),
- case ejabberd_sm:get_session_pid(U, S, R) of
- Pid when is_pid(Pid) ->
- Pid ! {route, NewEl};
- none ->
- ok
- end,
- Acc or true;
- error ->
- Acc or false
- end;
- error ->
- Acc or false
- end
- end, false, Items).
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = view}, Acc) ->
+ case fetch_msg_by_node(JID, Node) of
+ {ok, OfflineMsg} ->
+ case offline_msg_to_route(S, OfflineMsg) of
+ {route, El} ->
+ NewEl = set_offline_tag(El, Node),
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ Pid ! {route, NewEl};
+ none ->
+ ok
+ end,
+ Acc or true;
+ error ->
+ Acc or false
+ end;
+ error ->
+ Acc or false
+ end
+ end, false, Items) end.
-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
handle_offline_items_remove(JID, Items) ->
- lists:foldl(
- fun(#offline_item{node = Node, action = remove}, Acc) ->
- Acc or remove_msg_by_node(JID, Node)
- end, false, Items).
+ {U, S, _R} = jid:tolower(JID),
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = remove}, Acc) ->
+ Acc or remove_msg_by_node(JID, Node)
+ end, false, Items)
+ end.
-spec set_offline_tag(message(), binary()) -> message().
set_offline_tag(Msg, Node) ->
@@ -334,11 +360,11 @@ set_offline_tag(Msg, Node) ->
-spec handle_offline_fetch(jid()) -> ok.
handle_offline_fetch(#jid{luser = U, lserver = S} = JID) ->
ejabberd_sm:route(JID, {resend_offline, false}),
- lists:foreach(
- fun({Node, El}) ->
- El1 = set_offline_tag(El, Node),
- ejabberd_router:route(El1)
- end, read_messages(U, S)).
+ lists:foreach(
+ fun({Node, El}) ->
+ El1 = set_offline_tag(El, Node),
+ ejabberd_router:route(El1)
+ end, read_messages(U, S)).
-spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}.
fetch_msg_by_node(To, Seq) ->
@@ -508,15 +534,26 @@ c2s_self_presence(Acc) ->
-spec route_offline_messages(c2s_state()) -> ok.
route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- case Mod:pop_messages(LUser, LServer) of
- {ok, OffMsgs} ->
- lists:foreach(
- fun(OffMsg) ->
- route_offline_message(State, OffMsg)
- end, OffMsgs);
- _ ->
- ok
- end.
+ Msgs = case Mod:pop_messages(LUser, LServer) of
+ {ok, OffMsgs} ->
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ lists:map(
+ fun({_, #message{from = From, to = To} = Msg}) ->
+ #offline_msg{from = From, to = To,
+ us = {LUser, LServer},
+ packet = Msg}
+ end, read_mam_messages(LUser, LServer, OffMsgs));
+ _ ->
+ OffMsgs
+ end;
+ _ ->
+ []
+ end,
+ lists:foreach(
+ fun(OffMsg) ->
+ route_offline_message(State, OffMsg)
+ end, Msgs).
-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
route_offline_message(#{lserver := LServer} = State,
@@ -574,6 +611,31 @@ remove_user(User, Server) ->
Mod:remove_user(LUser, LServer),
ok.
+-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
+user_unset_presence(User, Server, _Resource, _Status) ->
+ case use_mam_for_user(User, Server) of
+ true ->
+ case ejabberd_sm:get_user_present_resources(User, Server) of
+ [] ->
+ TimeStamp = erlang:system_time(microsecond),
+ store_last_activity_marker(User, Server, TimeStamp);
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end.
+
+store_last_activity_marker(User, Server, Timestamp) ->
+ Jid = jid:make(User, Server, <<>>),
+ Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error},
+ activity_marker, true),
+
+ Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
+ timestamp = misc:usec_to_now(Timestamp),
+ packet = Pkt},
+ store_offline_msg(Msg).
+
%% Helper functions:
-spec check_if_message_should_be_bounced(message()) -> boolean().
@@ -641,25 +703,123 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
+ Res = read_db_messages(LUser, LServer),
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ read_mam_messages(LUser, LServer, Res);
+ _ ->
+ Res
+ end.
+
+read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(LServer),
lists:flatmap(
- fun({Seq, From, To, TS, El}) ->
- Node = integer_to_binary(Seq),
- try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
- Pkt ->
- Node = integer_to_binary(Seq),
- Pkt1 = add_delay_info(Pkt, LServer, TS),
- Pkt2 = xmpp:set_from_to(Pkt1, From, To),
- [{Node, Pkt2}]
- catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("failed to decode packet ~p "
- "of user ~s: ~s",
- [El, jid:encode(To),
- xmpp:format_error(Why)]),
- []
- end
- end, Mod:read_message_headers(LUser, LServer)).
+ fun({Seq, From, To, TS, El}) ->
+ Node = integer_to_binary(Seq),
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("failed to decode packet ~p "
+ "of user ~s: ~s",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, Mod:read_message_headers(LUser, LServer)).
+
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Timestamp, ExtraMsgs} = lists:foldl(
+ fun({_Node, #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg}, {T, E}) ->
+ case xmpp:get_subtag(Msg, #delay{}) of
+ #delay{stamp = Time} ->
+ if T == none orelse T > Time ->
+ {Time, E};
+ true ->
+ {T, E}
+ end
+ end;
+ (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt},
+ {T, E}) ->
+ try xmpp:decode(Pkt) of
+ #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg ->
+ TS2 = case TS of
+ undefined ->
+ case xmpp:get_subtag(Msg, #delay{}) of
+ #delay{stamp = TS0} ->
+ TS0;
+ _ ->
+ erlang:timestamp()
+ end
+ end,
+ if T == none orelse T > TS2 ->
+ {TS2, E};
+ true ->
+ {T, E}
+ end;
+ Decoded ->
+ Pkt1 = add_delay_info(Decoded, LServer, TS),
+ {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
+ catch _:{xmpp_codec, _Why} ->
+ {T, E}
+ end;
+ ({_Node, Msg}, {T, E}) ->
+ {T, [Msg | E]}
+ end, {none, []}, ReadMsgs),
+ Start = case {Timestamp, ExtraMsgs} of
+ {none, [First|_]} ->
+ case xmpp:get_subtag(First, #delay{}) of
+ #delay{stamp = {Mega, Sec, Micro}} ->
+ {Mega, Sec, Micro+1};
+ _ ->
+ none
+ end;
+ {none, _} ->
+ none;
+ _ ->
+ Timestamp
+ end,
+ AllMsgs = case Start of
+ none ->
+ ExtraMsgs;
+ _ ->
+ MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+ Number when is_integer(Number) -> Number;
+ _ -> 100
+ end,
+ JID = jid:make(LUser, LServer, <<>>),
+ {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
+ [{start, Start}],
+ #rsm_set{max = MaxOfflineMsgs,
+ before = <<"9999999999999999">>},
+ chat),
+ MamMsgs2 = lists:map(
+ fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
+ add_delay_info(MM, LServer, MMT)
+ end, MamMsgs),
+
+ ExtraMsgs ++ MamMsgs2
+ end,
+ AllMsgs2 = lists:sort(
+ fun(A, B) ->
+ case {xmpp:get_subtag(A, #delay{}), xmpp:get_subtag(B, #delay{})} of
+ {#delay{stamp = TA}, #delay{stamp = TB}} ->
+ TA < TB;
+ _ ->
+ true
+ end
+ end, AllMsgs),
+ {AllMsgs3, _} = lists:mapfoldl(
+ fun(Msg, Counter) ->
+ {{Counter, Msg}, Counter + 1}
+ end, 1, AllMsgs2),
+ AllMsgs3.
format_user_queue(Hdrs) ->
lists:map(
@@ -873,6 +1033,9 @@ import(LServer, {sql, _}, DBType, <<"spool">>,
Mod = gen_mod:db_mod(DBType, ?MODULE),
Mod:import(OffMsg).
+use_mam_for_user(_User, Server) ->
+ gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage).
+
mod_opt_type(access_max_user_messages) ->
fun acl:shaper_rules_validator/1;
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
@@ -880,6 +1043,8 @@ mod_opt_type(store_groupchat) ->
fun(V) when is_boolean(V) -> V end;
mod_opt_type(bounce_groupchat) ->
fun(V) when is_boolean(V) -> V end;
+mod_opt_type(use_mam_for_storage) ->
+ fun(V) when is_boolean(V) -> V end;
mod_opt_type(store_empty_body) ->
fun (V) when is_boolean(V) -> V;
(unless_chat_state) -> unless_chat_state
@@ -889,5 +1054,6 @@ mod_options(Host) ->
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
{access_max_user_messages, max_user_offline_messages},
{store_empty_body, unless_chat_state},
+ {use_mam_for_storage, false},
{bounce_groupchat, false},
{store_groupchat, false}].
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index 1a4308c58..34ce4e53d 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -591,22 +591,25 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
end,
?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
[p1_queue:len(Queue), jid:encode(JID)]),
- p1_queue:foreach(
- fun({_, _Time, #presence{from = From}}) ->
- ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]);
- ({_, _Time, #iq{} = El}) ->
+ p1_queue:foldl(
+ fun({_, _Time, #presence{from = From}}, Acc) ->
+ ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]),
+ Acc;
+ ({_, _Time, #iq{} = El}, Acc) ->
Txt = <<"User session terminated">>,
ejabberd_router:route_error(
- El, xmpp:err_service_unavailable(Txt, Lang));
- ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}) ->
+ El, xmpp:err_service_unavailable(Txt, Lang)),
+ Acc;
+ ({_, _Time, #message{from = From, meta = #{carbon_copy := true}}}, Acc) ->
%% XEP-0280 says: "When a receiving server attempts to deliver a
%% forked message, and that message bounces with an error for
%% any reason, the receiving server MUST NOT forward that error
%% back to the original sender." Resending such a stanza could
%% easily lead to unexpected results as well.
?DEBUG("Dropping forwarded message stanza from ~s",
- [jid:encode(From)]);
- ({_, Time, #message{} = Msg}) ->
+ [jid:encode(From)]),
+ Acc;
+ ({_, Time, #message{} = Msg}, Acc) ->
case ejabberd_hooks:run_fold(message_is_archived,
LServer, false,
[State, Msg]) of
@@ -615,17 +618,26 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
[jid:encode(xmpp:get_from(Msg))]);
false when ResendOnTimeout ->
NewEl = add_resent_delay_info(State, Msg, Time),
- ejabberd_router:route(NewEl);
+ NewEl2 = case Acc of
+ first_resend ->
+ xmpp:put_meta(NewEl, first_from_queue, true);
+ _ ->
+ NewEl
+ end,
+ ejabberd_router:route(NewEl2),
+ false;
false ->
Txt = <<"User session terminated">>,
ejabberd_router:route_error(
- Msg, xmpp:err_service_unavailable(Txt, Lang))
+ Msg, xmpp:err_service_unavailable(Txt, Lang)),
+ Acc
end;
- ({_, _Time, El}) ->
+ ({_, _Time, El}, Acc) ->
%% Raw element of type 'error' resulting from a validation error
%% We cannot pass it to the router, it will generate an error
- ?DEBUG("Do not route raw element from ack queue: ~p", [El])
- end, Queue);
+ ?DEBUG("Do not route raw element from ack queue: ~p", [El]),
+ Acc
+ end, first_resend, Queue);
route_unacked_stanzas(_State) ->
ok.