aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
authorPaweł Chmielowski <pchmielowski@process-one.net>2019-04-26 19:59:06 +0200
committerPaweł Chmielowski <pchmielowski@process-one.net>2019-04-26 19:59:06 +0200
commitb76f90fe396ee7a1ed5c5f7006431879929fc2a1 (patch)
treeb6f5a1c67af18564fd00cec00404d931c1d2db1b /src/mod_offline.erl
parentReturn "Bad request" error when origin in websocket connection doesn't match (diff)
Add mod_offline option for fetching data from mam instead of from spool table
This commit introduces `use_mam_for_storage` option that take boolean argument. Enabling it will make mod_offline not use spool table for storing offline message, but instead will use mam archive to retrieve messages stored when offline. Enabling this option have couple drawback currently, only messages that were stored in mam will be available, most of flexible message retrieval queries don't work (those that allow retrieval/deletion of messages by id).
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl298
1 files changed, 232 insertions, 66 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 4a1a4cca2..6a9114a92 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -61,7 +61,8 @@
c2s_copy_session/2,
webadmin_page/3,
webadmin_user/4,
- webadmin_user_parse_query/5]).
+ webadmin_user_parse_query/5,
+ user_unset_presence/4]).
-export([mod_opt_type/1, mod_options/1, depends/2]).
@@ -131,6 +132,8 @@ start(Host, Opts) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
+ ejabberd_hooks:add(unset_presence_hook, Host, ?MODULE,
+ user_unset_presence, 50),
gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
?MODULE, handle_offline_query).
@@ -153,6 +156,8 @@ stop(Host) ->
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
+ ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE,
+ user_unset_presence, 50),
gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE).
reload(Host, NewOpts, OldOpts) ->
@@ -165,17 +170,28 @@ reload(Host, NewOpts, OldOpts) ->
end.
-spec store_offline_msg(#offline_msg{}) -> ok | {error, full | any()}.
-store_offline_msg(#offline_msg{us = {User, Server}} = Msg) ->
- Mod = gen_mod:db_mod(Server, ?MODULE),
- case get_max_user_messages(User, Server) of
- infinity ->
- Mod:store_message(Msg);
- Limit ->
- Num = count_offline_messages(User, Server),
- if Num < Limit ->
+store_offline_msg(#offline_msg{us = {User, Server}, packet = Pkt} = Msg) ->
+ case (not xmpp:get_meta(Pkt, activity_marker, false)) andalso
+ use_mam_for_user(User, Server) of
+ true ->
+ case xmpp:get_meta(Pkt, first_from_queue, false) of
+ true ->
+ store_last_activity_marker(User, Server, xmpp:get_meta(Pkt, stanza_id));
+ _ ->
+ ok
+ end;
+ _ ->
+ Mod = gen_mod:db_mod(Server, ?MODULE),
+ case get_max_user_messages(User, Server) of
+ infinity ->
Mod:store_message(Msg);
- true ->
- {error, full}
+ Limit ->
+ Num = count_offline_messages(User, Server),
+ if Num < Limit ->
+ Mod:store_message(Msg);
+ true ->
+ {error, full}
+ end
end
end.
@@ -298,34 +314,44 @@ handle_offline_query(#iq{lang = Lang} = IQ) ->
-spec handle_offline_items_view(jid(), [offline_item()]) -> boolean().
handle_offline_items_view(JID, Items) ->
{U, S, R} = jid:tolower(JID),
- lists:foldl(
- fun(#offline_item{node = Node, action = view}, Acc) ->
- case fetch_msg_by_node(JID, Node) of
- {ok, OfflineMsg} ->
- case offline_msg_to_route(S, OfflineMsg) of
- {route, El} ->
- NewEl = set_offline_tag(El, Node),
- case ejabberd_sm:get_session_pid(U, S, R) of
- Pid when is_pid(Pid) ->
- Pid ! {route, NewEl};
- none ->
- ok
- end,
- Acc or true;
- error ->
- Acc or false
- end;
- error ->
- Acc or false
- end
- end, false, Items).
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = view}, Acc) ->
+ case fetch_msg_by_node(JID, Node) of
+ {ok, OfflineMsg} ->
+ case offline_msg_to_route(S, OfflineMsg) of
+ {route, El} ->
+ NewEl = set_offline_tag(El, Node),
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ Pid ! {route, NewEl};
+ none ->
+ ok
+ end,
+ Acc or true;
+ error ->
+ Acc or false
+ end;
+ error ->
+ Acc or false
+ end
+ end, false, Items) end.
-spec handle_offline_items_remove(jid(), [offline_item()]) -> boolean().
handle_offline_items_remove(JID, Items) ->
- lists:foldl(
- fun(#offline_item{node = Node, action = remove}, Acc) ->
- Acc or remove_msg_by_node(JID, Node)
- end, false, Items).
+ {U, S, _R} = jid:tolower(JID),
+ case use_mam_for_user(U, S) of
+ true ->
+ false;
+ _ ->
+ lists:foldl(
+ fun(#offline_item{node = Node, action = remove}, Acc) ->
+ Acc or remove_msg_by_node(JID, Node)
+ end, false, Items)
+ end.
-spec set_offline_tag(message(), binary()) -> message().
set_offline_tag(Msg, Node) ->
@@ -334,11 +360,11 @@ set_offline_tag(Msg, Node) ->
-spec handle_offline_fetch(jid()) -> ok.
handle_offline_fetch(#jid{luser = U, lserver = S} = JID) ->
ejabberd_sm:route(JID, {resend_offline, false}),
- lists:foreach(
- fun({Node, El}) ->
- El1 = set_offline_tag(El, Node),
- ejabberd_router:route(El1)
- end, read_messages(U, S)).
+ lists:foreach(
+ fun({Node, El}) ->
+ El1 = set_offline_tag(El, Node),
+ ejabberd_router:route(El1)
+ end, read_messages(U, S)).
-spec fetch_msg_by_node(jid(), binary()) -> error | {ok, #offline_msg{}}.
fetch_msg_by_node(To, Seq) ->
@@ -508,15 +534,26 @@ c2s_self_presence(Acc) ->
-spec route_offline_messages(c2s_state()) -> ok.
route_offline_messages(#{jid := #jid{luser = LUser, lserver = LServer}} = State) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
- case Mod:pop_messages(LUser, LServer) of
- {ok, OffMsgs} ->
- lists:foreach(
- fun(OffMsg) ->
- route_offline_message(State, OffMsg)
- end, OffMsgs);
- _ ->
- ok
- end.
+ Msgs = case Mod:pop_messages(LUser, LServer) of
+ {ok, OffMsgs} ->
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ lists:map(
+ fun({_, #message{from = From, to = To} = Msg}) ->
+ #offline_msg{from = From, to = To,
+ us = {LUser, LServer},
+ packet = Msg}
+ end, read_mam_messages(LUser, LServer, OffMsgs));
+ _ ->
+ OffMsgs
+ end;
+ _ ->
+ []
+ end,
+ lists:foreach(
+ fun(OffMsg) ->
+ route_offline_message(State, OffMsg)
+ end, Msgs).
-spec route_offline_message(c2s_state(), #offline_msg{}) -> ok.
route_offline_message(#{lserver := LServer} = State,
@@ -574,6 +611,31 @@ remove_user(User, Server) ->
Mod:remove_user(LUser, LServer),
ok.
+-spec user_unset_presence(binary(), binary(), binary(), binary()) -> any().
+user_unset_presence(User, Server, _Resource, _Status) ->
+ case use_mam_for_user(User, Server) of
+ true ->
+ case ejabberd_sm:get_user_present_resources(User, Server) of
+ [] ->
+ TimeStamp = erlang:system_time(microsecond),
+ store_last_activity_marker(User, Server, TimeStamp);
+ _ ->
+ ok
+ end;
+ _ ->
+ ok
+ end.
+
+store_last_activity_marker(User, Server, Timestamp) ->
+ Jid = jid:make(User, Server, <<>>),
+ Pkt = xmpp:put_meta(#message{id = <<"ActivityMarker">>, type = error},
+ activity_marker, true),
+
+ Msg = #offline_msg{us = {User, Server}, from = Jid, to = Jid,
+ timestamp = misc:usec_to_now(Timestamp),
+ packet = Pkt},
+ store_offline_msg(Msg).
+
%% Helper functions:
-spec check_if_message_should_be_bounced(message()) -> boolean().
@@ -641,25 +703,123 @@ offline_msg_to_route(LServer, #offline_msg{from = From, to = To} = R) ->
-spec read_messages(binary(), binary()) -> [{binary(), message()}].
read_messages(LUser, LServer) ->
+ Res = read_db_messages(LUser, LServer),
+ case use_mam_for_user(LUser, LServer) of
+ true ->
+ read_mam_messages(LUser, LServer, Res);
+ _ ->
+ Res
+ end.
+
+read_db_messages(LUser, LServer) ->
Mod = gen_mod:db_mod(LServer, ?MODULE),
CodecOpts = ejabberd_config:codec_options(LServer),
lists:flatmap(
- fun({Seq, From, To, TS, El}) ->
- Node = integer_to_binary(Seq),
- try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
- Pkt ->
- Node = integer_to_binary(Seq),
- Pkt1 = add_delay_info(Pkt, LServer, TS),
- Pkt2 = xmpp:set_from_to(Pkt1, From, To),
- [{Node, Pkt2}]
- catch _:{xmpp_codec, Why} ->
- ?ERROR_MSG("failed to decode packet ~p "
- "of user ~s: ~s",
- [El, jid:encode(To),
- xmpp:format_error(Why)]),
- []
- end
- end, Mod:read_message_headers(LUser, LServer)).
+ fun({Seq, From, To, TS, El}) ->
+ Node = integer_to_binary(Seq),
+ try xmpp:decode(El, ?NS_CLIENT, CodecOpts) of
+ Pkt ->
+ Node = integer_to_binary(Seq),
+ Pkt1 = add_delay_info(Pkt, LServer, TS),
+ Pkt2 = xmpp:set_from_to(Pkt1, From, To),
+ [{Node, Pkt2}]
+ catch _:{xmpp_codec, Why} ->
+ ?ERROR_MSG("failed to decode packet ~p "
+ "of user ~s: ~s",
+ [El, jid:encode(To),
+ xmpp:format_error(Why)]),
+ []
+ end
+ end, Mod:read_message_headers(LUser, LServer)).
+
+read_mam_messages(LUser, LServer, ReadMsgs) ->
+ {Timestamp, ExtraMsgs} = lists:foldl(
+ fun({_Node, #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg}, {T, E}) ->
+ case xmpp:get_subtag(Msg, #delay{}) of
+ #delay{stamp = Time} ->
+ if T == none orelse T > Time ->
+ {Time, E};
+ true ->
+ {T, E}
+ end
+ end;
+ (#offline_msg{from = From, to = To, timestamp = TS, packet = Pkt},
+ {T, E}) ->
+ try xmpp:decode(Pkt) of
+ #message{id = <<"ActivityMarker">>,
+ body = [], type = error} = Msg ->
+ TS2 = case TS of
+ undefined ->
+ case xmpp:get_subtag(Msg, #delay{}) of
+ #delay{stamp = TS0} ->
+ TS0;
+ _ ->
+ erlang:timestamp()
+ end
+ end,
+ if T == none orelse T > TS2 ->
+ {TS2, E};
+ true ->
+ {T, E}
+ end;
+ Decoded ->
+ Pkt1 = add_delay_info(Decoded, LServer, TS),
+ {T, [xmpp:set_from_to(Pkt1, From, To) | E]}
+ catch _:{xmpp_codec, _Why} ->
+ {T, E}
+ end;
+ ({_Node, Msg}, {T, E}) ->
+ {T, [Msg | E]}
+ end, {none, []}, ReadMsgs),
+ Start = case {Timestamp, ExtraMsgs} of
+ {none, [First|_]} ->
+ case xmpp:get_subtag(First, #delay{}) of
+ #delay{stamp = {Mega, Sec, Micro}} ->
+ {Mega, Sec, Micro+1};
+ _ ->
+ none
+ end;
+ {none, _} ->
+ none;
+ _ ->
+ Timestamp
+ end,
+ AllMsgs = case Start of
+ none ->
+ ExtraMsgs;
+ _ ->
+ MaxOfflineMsgs = case get_max_user_messages(LUser, LServer) of
+ Number when is_integer(Number) -> Number;
+ _ -> 100
+ end,
+ JID = jid:make(LUser, LServer, <<>>),
+ {MamMsgs, _, _} = mod_mam:select(LServer, JID, JID,
+ [{start, Start}],
+ #rsm_set{max = MaxOfflineMsgs,
+ before = <<"9999999999999999">>},
+ chat),
+ MamMsgs2 = lists:map(
+ fun({_, _, #forwarded{sub_els = [MM | _], delay = #delay{stamp = MMT}}}) ->
+ add_delay_info(MM, LServer, MMT)
+ end, MamMsgs),
+
+ ExtraMsgs ++ MamMsgs2
+ end,
+ AllMsgs2 = lists:sort(
+ fun(A, B) ->
+ case {xmpp:get_subtag(A, #delay{}), xmpp:get_subtag(B, #delay{})} of
+ {#delay{stamp = TA}, #delay{stamp = TB}} ->
+ TA < TB;
+ _ ->
+ true
+ end
+ end, AllMsgs),
+ {AllMsgs3, _} = lists:mapfoldl(
+ fun(Msg, Counter) ->
+ {{Counter, Msg}, Counter + 1}
+ end, 1, AllMsgs2),
+ AllMsgs3.
format_user_queue(Hdrs) ->
lists:map(
@@ -873,6 +1033,9 @@ import(LServer, {sql, _}, DBType, <<"spool">>,
Mod = gen_mod:db_mod(DBType, ?MODULE),
Mod:import(OffMsg).
+use_mam_for_user(_User, Server) ->
+ gen_mod:get_module_opt(Server, ?MODULE, use_mam_for_storage).
+
mod_opt_type(access_max_user_messages) ->
fun acl:shaper_rules_validator/1;
mod_opt_type(db_type) -> fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
@@ -880,6 +1043,8 @@ mod_opt_type(store_groupchat) ->
fun(V) when is_boolean(V) -> V end;
mod_opt_type(bounce_groupchat) ->
fun(V) when is_boolean(V) -> V end;
+mod_opt_type(use_mam_for_storage) ->
+ fun(V) when is_boolean(V) -> V end;
mod_opt_type(store_empty_body) ->
fun (V) when is_boolean(V) -> V;
(unless_chat_state) -> unless_chat_state
@@ -889,5 +1054,6 @@ mod_options(Host) ->
[{db_type, ejabberd_config:default_db(Host, ?MODULE)},
{access_max_user_messages, max_user_offline_messages},
{store_empty_body, unless_chat_state},
+ {use_mam_for_storage, false},
{bounce_groupchat, false},
{store_groupchat, false}].