aboutsummaryrefslogtreecommitdiff
path: root/src/mod_mam.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_mam.erl')
-rw-r--r--src/mod_mam.erl1421
1 files changed, 1421 insertions, 0 deletions
diff --git a/src/mod_mam.erl b/src/mod_mam.erl
new file mode 100644
index 000000000..38642c0c6
--- /dev/null
+++ b/src/mod_mam.erl
@@ -0,0 +1,1421 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%% @doc
+%%% Message Archive Management (XEP-0313)
+%%% @end
+%%% Created : 4 Jul 2013 by Evgeniy Khramtsov <ekhramtsov@process-one.net>
+%%%
+%%%
+%%% ejabberd, Copyright (C) 2013-2016 ProcessOne
+%%%
+%%% This program is free software; you can redistribute it and/or
+%%% modify it under the terms of the GNU General Public License as
+%%% published by the Free Software Foundation; either version 2 of the
+%%% License, or (at your option) any later version.
+%%%
+%%% This program is distributed in the hope that it will be useful,
+%%% but WITHOUT ANY WARRANTY; without even the implied warranty of
+%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+%%% General Public License for more details.
+%%%
+%%% You should have received a copy of the GNU General Public License along
+%%% with this program; if not, write to the Free Software Foundation, Inc.,
+%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+%%%
+%%%-------------------------------------------------------------------
+-module(mod_mam).
+
+-protocol({xep, 313, '0.4'}).
+-protocol({xep, 334, '0.2'}).
+
+-behaviour(gen_mod).
+
+%% API
+-export([start/2, stop/1]).
+
+-export([user_send_packet/4, user_receive_packet/5,
+ process_iq_v0_2/3, process_iq_v0_3/3, disco_sm_features/5,
+ remove_user/2, remove_user/3, mod_opt_type/1, muc_process_iq/4,
+ muc_filter_message/5, message_is_archived/5, delete_old_messages/2,
+ get_commands_spec/0]).
+
+-include_lib("stdlib/include/ms_transform.hrl").
+-include("jlib.hrl").
+-include("logger.hrl").
+-include("mod_muc_room.hrl").
+-include("ejabberd_commands.hrl").
+
+-define(DEF_PAGE_SIZE, 50).
+-define(MAX_PAGE_SIZE, 250).
+
+-define(BIN_GREATER_THAN(A, B),
+ ((A > B andalso byte_size(A) == byte_size(B))
+ orelse byte_size(A) > byte_size(B))).
+-define(BIN_LESS_THAN(A, B),
+ ((A < B andalso byte_size(A) == byte_size(B))
+ orelse byte_size(A) < byte_size(B))).
+
+-record(archive_msg,
+ {us = {<<"">>, <<"">>} :: {binary(), binary()} | '$2',
+ id = <<>> :: binary() | '_',
+ timestamp = p1_time_compat:timestamp() :: erlang:timestamp() | '_' | '$1',
+ peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3' | undefined,
+ bare_peer = {<<"">>, <<"">>, <<"">>} :: ljid() | '_' | '$3',
+ packet = #xmlel{} :: xmlel() | '_',
+ nick = <<"">> :: binary(),
+ type = chat :: chat | groupchat}).
+
+-record(archive_prefs,
+ {us = {<<"">>, <<"">>} :: {binary(), binary()},
+ default = never :: never | always | roster,
+ always = [] :: [ljid()],
+ never = [] :: [ljid()]}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+start(Host, Opts) ->
+ IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
+ one_queue),
+ DBType = gen_mod:db_type(Host, Opts),
+ init_db(DBType, Host),
+ init_cache(DBType, Opts),
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host,
+ ?NS_MAM_TMP, ?MODULE, process_iq_v0_2, IQDisc),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
+ ?NS_MAM_TMP, ?MODULE, process_iq_v0_2, IQDisc),
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host,
+ ?NS_MAM_0, ?MODULE, process_iq_v0_3, IQDisc),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
+ ?NS_MAM_0, ?MODULE, process_iq_v0_3, IQDisc),
+ gen_iq_handler:add_iq_handler(ejabberd_local, Host,
+ ?NS_MAM_1, ?MODULE, process_iq_v0_3, IQDisc),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host,
+ ?NS_MAM_1, ?MODULE, process_iq_v0_3, IQDisc),
+ ejabberd_hooks:add(user_receive_packet, Host, ?MODULE,
+ user_receive_packet, 500),
+ ejabberd_hooks:add(user_send_packet, Host, ?MODULE,
+ user_send_packet, 500),
+ ejabberd_hooks:add(muc_filter_message, Host, ?MODULE,
+ muc_filter_message, 50),
+ ejabberd_hooks:add(muc_process_iq, Host, ?MODULE,
+ muc_process_iq, 50),
+ ejabberd_hooks:add(disco_sm_features, Host, ?MODULE,
+ disco_sm_features, 50),
+ ejabberd_hooks:add(remove_user, Host, ?MODULE,
+ remove_user, 50),
+ ejabberd_hooks:add(anonymous_purge_hook, Host, ?MODULE,
+ remove_user, 50),
+ case gen_mod:get_opt(assume_mam_usage, Opts,
+ fun(if_enabled) -> if_enabled;
+ (on_request) -> on_request;
+ (never) -> never
+ end, never) of
+ never ->
+ ok;
+ _ ->
+ ejabberd_hooks:add(message_is_archived, Host, ?MODULE,
+ message_is_archived, 50)
+ end,
+ ejabberd_commands:register_commands(get_commands_spec()),
+ ok.
+
+init_db(mnesia, _Host) ->
+ mnesia:create_table(archive_msg,
+ [{disc_only_copies, [node()]},
+ {type, bag},
+ {attributes, record_info(fields, archive_msg)}]),
+ mnesia:create_table(archive_prefs,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, archive_prefs)}]);
+init_db(_, _) ->
+ ok.
+
+init_cache(_DBType, Opts) ->
+ MaxSize = gen_mod:get_opt(cache_size, Opts,
+ fun(I) when is_integer(I), I>0 -> I end,
+ 1000),
+ LifeTime = gen_mod:get_opt(cache_life_time, Opts,
+ fun(I) when is_integer(I), I>0 -> I end,
+ timer:hours(1) div 1000),
+ cache_tab:new(archive_prefs, [{max_size, MaxSize},
+ {life_time, LifeTime}]).
+
+stop(Host) ->
+ ejabberd_hooks:delete(user_send_packet, Host, ?MODULE,
+ user_send_packet, 500),
+ ejabberd_hooks:delete(user_receive_packet, Host, ?MODULE,
+ user_receive_packet, 500),
+ ejabberd_hooks:delete(muc_filter_message, Host, ?MODULE,
+ muc_filter_message, 50),
+ ejabberd_hooks:delete(muc_process_iq, Host, ?MODULE,
+ muc_process_iq, 50),
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_TMP),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_TMP),
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_0),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_0),
+ gen_iq_handler:remove_iq_handler(ejabberd_local, Host, ?NS_MAM_1),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_MAM_1),
+ ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE,
+ disco_sm_features, 50),
+ ejabberd_hooks:delete(remove_user, Host, ?MODULE,
+ remove_user, 50),
+ ejabberd_hooks:delete(anonymous_purge_hook, Host,
+ ?MODULE, remove_user, 50),
+ case gen_mod:get_module_opt(Host, ?MODULE, assume_mam_usage,
+ fun(if_enabled) -> if_enabled;
+ (on_request) -> on_request;
+ (never) -> never
+ end, never) of
+ never ->
+ ok;
+ _ ->
+ ejabberd_hooks:delete(message_is_archived, Host, ?MODULE,
+ message_is_archived, 50)
+ end,
+ ejabberd_commands:unregister_commands(get_commands_spec()),
+ ok.
+
+remove_user(User, Server) ->
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
+ remove_user(LUser, LServer,
+ gen_mod:db_type(LServer, ?MODULE)).
+
+remove_user(LUser, LServer, mnesia) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ mnesia:delete({archive_msg, US}),
+ mnesia:delete({archive_prefs, US})
+ end,
+ mnesia:transaction(F);
+remove_user(LUser, LServer, odbc) ->
+ SUser = ejabberd_odbc:escape(LUser),
+ ejabberd_odbc:sql_query(
+ LServer,
+ [<<"delete from archive where username='">>, SUser, <<"';">>]),
+ ejabberd_odbc:sql_query(
+ LServer,
+ [<<"delete from archive_prefs where username='">>, SUser, <<"';">>]).
+
+user_receive_packet(Pkt, C2SState, JID, Peer, To) ->
+ LUser = JID#jid.luser,
+ LServer = JID#jid.lserver,
+ IsBareCopy = is_bare_copy(JID, To),
+ case should_archive(Pkt, LServer) of
+ true when not IsBareCopy ->
+ NewPkt = strip_my_archived_tag(Pkt, LServer),
+ case store_msg(C2SState, NewPkt, LUser, LServer, Peer, recv) of
+ {ok, ID} ->
+ Archived = #xmlel{name = <<"archived">>,
+ attrs = [{<<"by">>, LServer},
+ {<<"xmlns">>, ?NS_MAM_TMP},
+ {<<"id">>, ID}]},
+ StanzaID = #xmlel{name = <<"stanza-id">>,
+ attrs = [{<<"by">>, LServer},
+ {<<"xmlns">>, ?NS_SID_0},
+ {<<"id">>, ID}]},
+ NewEls = [Archived, StanzaID|NewPkt#xmlel.children],
+ NewPkt#xmlel{children = NewEls};
+ _ ->
+ NewPkt
+ end;
+ _ ->
+ Pkt
+ end.
+
+user_send_packet(Pkt, C2SState, JID, Peer) ->
+ LUser = JID#jid.luser,
+ LServer = JID#jid.lserver,
+ case should_archive(Pkt, LServer) of
+ true ->
+ NewPkt = strip_my_archived_tag(Pkt, LServer),
+ store_msg(C2SState, jlib:replace_from_to(JID, Peer, NewPkt),
+ LUser, LServer, Peer, send),
+ NewPkt;
+ false ->
+ Pkt
+ end.
+
+muc_filter_message(Pkt, #state{config = Config} = MUCState,
+ RoomJID, From, FromNick) ->
+ if Config#config.mam ->
+ LServer = RoomJID#jid.lserver,
+ NewPkt = strip_my_archived_tag(Pkt, LServer),
+ StorePkt = strip_x_jid_tags(NewPkt),
+ case store_muc(MUCState, StorePkt, RoomJID, From, FromNick) of
+ {ok, ID} ->
+ Archived = #xmlel{name = <<"archived">>,
+ attrs = [{<<"by">>, LServer},
+ {<<"xmlns">>, ?NS_MAM_TMP},
+ {<<"id">>, ID}]},
+ StanzaID = #xmlel{name = <<"stanza-id">>,
+ attrs = [{<<"by">>, LServer},
+ {<<"xmlns">>, ?NS_SID_0},
+ {<<"id">>, ID}]},
+ NewEls = [Archived, StanzaID|NewPkt#xmlel.children],
+ NewPkt#xmlel{children = NewEls};
+ _ ->
+ NewPkt
+ end;
+ true ->
+ Pkt
+ end.
+
+% Query archive v0.2
+process_iq_v0_2(#jid{lserver = LServer} = From,
+ #jid{lserver = LServer} = To,
+ #iq{type = get, sub_el = #xmlel{name = <<"query">>} = SubEl} = IQ) ->
+ Fs = parse_query_v0_2(SubEl),
+ process_iq(LServer, From, To, IQ, SubEl, Fs, chat);
+process_iq_v0_2(From, To, IQ) ->
+ process_iq(From, To, IQ).
+
+% Query archive v0.3
+process_iq_v0_3(#jid{lserver = LServer} = From,
+ #jid{lserver = LServer} = To,
+ #iq{type = set, sub_el = #xmlel{name = <<"query">>} = SubEl} = IQ) ->
+ process_iq(LServer, From, To, IQ, SubEl, get_xdata_fields(SubEl), chat);
+process_iq_v0_3(#jid{lserver = LServer},
+ #jid{lserver = LServer},
+ #iq{type = get, sub_el = #xmlel{name = <<"query">>}} = IQ) ->
+ process_iq(LServer, IQ);
+process_iq_v0_3(From, To, IQ) ->
+ process_iq(From, To, IQ).
+
+muc_process_iq(#iq{type = set,
+ sub_el = #xmlel{name = <<"query">>,
+ attrs = Attrs} = SubEl} = IQ,
+ MUCState, From, To) ->
+ case fxml:get_attr_s(<<"xmlns">>, Attrs) of
+ NS when NS == ?NS_MAM_0; NS == ?NS_MAM_1 ->
+ muc_process_iq(IQ, MUCState, From, To, get_xdata_fields(SubEl));
+ _ ->
+ IQ
+ end;
+muc_process_iq(#iq{type = get,
+ sub_el = #xmlel{name = <<"query">>,
+ attrs = Attrs} = SubEl} = IQ,
+ MUCState, From, To) ->
+ case fxml:get_attr_s(<<"xmlns">>, Attrs) of
+ ?NS_MAM_TMP ->
+ muc_process_iq(IQ, MUCState, From, To, parse_query_v0_2(SubEl));
+ NS when NS == ?NS_MAM_0; NS == ?NS_MAM_1 ->
+ LServer = MUCState#state.server_host,
+ process_iq(LServer, IQ);
+ _ ->
+ IQ
+ end;
+muc_process_iq(IQ, _MUCState, _From, _To) ->
+ IQ.
+
+get_xdata_fields(SubEl) ->
+ case {fxml:get_subtag_with_xmlns(SubEl, <<"x">>, ?NS_XDATA),
+ fxml:get_subtag_with_xmlns(SubEl, <<"set">>, ?NS_RSM)} of
+ {#xmlel{} = XData, false} ->
+ jlib:parse_xdata_submit(XData);
+ {#xmlel{} = XData, #xmlel{}} ->
+ [{<<"set">>, SubEl} | jlib:parse_xdata_submit(XData)];
+ {false, #xmlel{}} ->
+ [{<<"set">>, SubEl}];
+ {false, false} ->
+ []
+ end.
+
+disco_sm_features(empty, From, To, Node, Lang) ->
+ disco_sm_features({result, []}, From, To, Node, Lang);
+disco_sm_features({result, OtherFeatures},
+ #jid{luser = U, lserver = S},
+ #jid{luser = U, lserver = S}, <<>>, _Lang) ->
+ {result, [?NS_MAM_TMP, ?NS_MAM_0, ?NS_MAM_1 | OtherFeatures]};
+disco_sm_features(Acc, _From, _To, _Node, _Lang) ->
+ Acc.
+
+message_is_archived(true, _C2SState, _Peer, _JID, _Pkt) ->
+ true;
+message_is_archived(false, C2SState, Peer,
+ #jid{luser = LUser, lserver = LServer}, Pkt) ->
+ Res = case gen_mod:get_module_opt(LServer, ?MODULE, assume_mam_usage,
+ fun(if_enabled) -> if_enabled;
+ (on_request) -> on_request;
+ (never) -> never
+ end, never) of
+ if_enabled ->
+ get_prefs(LUser, LServer);
+ on_request ->
+ DBType = gen_mod:db_type(LServer, ?MODULE),
+ cache_tab:lookup(archive_prefs, {LUser, LServer},
+ fun() ->
+ get_prefs(LUser, LServer, DBType)
+ end);
+ never ->
+ error
+ end,
+ case Res of
+ {ok, Prefs} ->
+ should_archive(strip_my_archived_tag(Pkt, LServer), LServer)
+ andalso should_archive_peer(C2SState, Prefs, Peer);
+ error ->
+ false
+ end.
+
+delete_old_messages(TypeBin, Days) when TypeBin == <<"chat">>;
+ TypeBin == <<"groupchat">>;
+ TypeBin == <<"all">> ->
+ Diff = Days * 24 * 60 * 60 * 1000000,
+ TimeStamp = usec_to_now(p1_time_compat:system_time(micro_seconds) - Diff),
+ Type = jlib:binary_to_atom(TypeBin),
+ {Results, _} =
+ lists:foldl(fun(Host, {Results, MnesiaDone}) ->
+ case {gen_mod:db_type(Host, ?MODULE), MnesiaDone} of
+ {mnesia, true} ->
+ {Results, true};
+ {mnesia, false} ->
+ Res = delete_old_messages(TimeStamp, Type,
+ global, mnesia),
+ {[Res|Results], true};
+ {DBType, _} ->
+ Res = delete_old_messages(TimeStamp, Type,
+ Host, DBType),
+ {[Res|Results], MnesiaDone}
+ end
+ end, {[], false}, ?MYHOSTS),
+ case lists:filter(fun(Res) -> Res /= ok end, Results) of
+ [] -> ok;
+ [NotOk|_] -> NotOk
+ end;
+delete_old_messages(_TypeBin, _Days) ->
+ unsupported_type.
+
+delete_old_messages(TimeStamp, Type, global, mnesia) ->
+ MS = ets:fun2ms(fun(#archive_msg{timestamp = MsgTS,
+ type = MsgType} = Msg)
+ when MsgTS < TimeStamp,
+ MsgType == Type orelse Type == all ->
+ Msg
+ end),
+ OldMsgs = mnesia:dirty_select(archive_msg, MS),
+ lists:foreach(fun(Rec) ->
+ ok = mnesia:dirty_delete_object(Rec)
+ end, OldMsgs);
+delete_old_messages(_TimeStamp, _Type, _Host, _DBType) ->
+ %% TODO
+ not_implemented.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+
+process_iq(LServer, #iq{sub_el = #xmlel{attrs = Attrs}} = IQ) ->
+ NS = case fxml:get_attr_s(<<"xmlns">>, Attrs) of
+ ?NS_MAM_0 ->
+ ?NS_MAM_0;
+ _ ->
+ ?NS_MAM_1
+ end,
+ CommonFields = [#xmlel{name = <<"field">>,
+ attrs = [{<<"type">>, <<"hidden">>},
+ {<<"var">>, <<"FORM_TYPE">>}],
+ children = [#xmlel{name = <<"value">>,
+ children = [{xmlcdata, NS}]}]},
+ #xmlel{name = <<"field">>,
+ attrs = [{<<"type">>, <<"jid-single">>},
+ {<<"var">>, <<"with">>}]},
+ #xmlel{name = <<"field">>,
+ attrs = [{<<"type">>, <<"text-single">>},
+ {<<"var">>, <<"start">>}]},
+ #xmlel{name = <<"field">>,
+ attrs = [{<<"type">>, <<"text-single">>},
+ {<<"var">>, <<"end">>}]}],
+ Fields = case gen_mod:db_type(LServer, ?MODULE) of
+ odbc ->
+ WithText = #xmlel{name = <<"field">>,
+ attrs = [{<<"type">>, <<"text-single">>},
+ {<<"var">>, <<"withtext">>}]},
+ [WithText|CommonFields];
+ _ ->
+ CommonFields
+ end,
+ Form = #xmlel{name = <<"x">>,
+ attrs = [{<<"xmlns">>, ?NS_XDATA}, {<<"type">>, <<"form">>}],
+ children = Fields},
+ IQ#iq{type = result,
+ sub_el = [#xmlel{name = <<"query">>,
+ attrs = [{<<"xmlns">>, NS}],
+ children = [Form]}]}.
+
+% Preference setting (both v0.2 & v0.3)
+process_iq(#jid{luser = LUser, lserver = LServer},
+ #jid{lserver = LServer},
+ #iq{type = set, sub_el = #xmlel{name = <<"prefs">>} = SubEl} = IQ) ->
+ try {case fxml:get_tag_attr_s(<<"default">>, SubEl) of
+ <<"always">> -> always;
+ <<"never">> -> never;
+ <<"roster">> -> roster
+ end,
+ lists:foldl(
+ fun(#xmlel{name = <<"always">>, children = Els}, {A, N}) ->
+ {get_jids(Els) ++ A, N};
+ (#xmlel{name = <<"never">>, children = Els}, {A, N}) ->
+ {A, get_jids(Els) ++ N};
+ (_, {A, N}) ->
+ {A, N}
+ end, {[], []}, SubEl#xmlel.children)} of
+ {Default, {Always0, Never0}} ->
+ Always = lists:usort(Always0),
+ Never = lists:usort(Never0),
+ case write_prefs(LUser, LServer, LServer, Default, Always, Never) of
+ ok ->
+ NewPrefs = prefs_el(Default, Always, Never, IQ#iq.xmlns),
+ IQ#iq{type = result, sub_el = [NewPrefs]};
+ _Err ->
+ IQ#iq{type = error,
+ sub_el = [SubEl, ?ERR_INTERNAL_SERVER_ERROR]}
+ end
+ catch _:_ ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]}
+ end;
+process_iq(#jid{luser = LUser, lserver = LServer},
+ #jid{lserver = LServer},
+ #iq{type = get, sub_el = #xmlel{name = <<"prefs">>}} = IQ) ->
+ Prefs = get_prefs(LUser, LServer),
+ PrefsEl = prefs_el(Prefs#archive_prefs.default,
+ Prefs#archive_prefs.always,
+ Prefs#archive_prefs.never,
+ IQ#iq.xmlns),
+ IQ#iq{type = result, sub_el = [PrefsEl]};
+process_iq(_, _, #iq{sub_el = SubEl} = IQ) ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_NOT_ALLOWED]}.
+
+process_iq(LServer, #jid{luser = LUser} = From, To, IQ, SubEl, Fs, MsgType) ->
+ case MsgType of
+ chat ->
+ maybe_activate_mam(LUser, LServer);
+ {groupchat, _Role, _MUCState} ->
+ ok
+ end,
+ case catch lists:foldl(
+ fun({<<"start">>, [Data|_]}, {_, End, With, RSM}) ->
+ {{_, _, _} = jlib:datetime_string_to_timestamp(Data),
+ End, With, RSM};
+ ({<<"end">>, [Data|_]}, {Start, _, With, RSM}) ->
+ {Start,
+ {_, _, _} = jlib:datetime_string_to_timestamp(Data),
+ With, RSM};
+ ({<<"with">>, [Data|_]}, {Start, End, _, RSM}) ->
+ {Start, End, jid:tolower(jid:from_string(Data)), RSM};
+ ({<<"withtext">>, [Data|_]}, {Start, End, _, RSM}) ->
+ {Start, End, {text, Data}, RSM};
+ ({<<"set">>, El}, {Start, End, With, _}) ->
+ {Start, End, With, jlib:rsm_decode(El)};
+ (_, Acc) ->
+ Acc
+ end, {none, [], none, none}, Fs) of
+ {'EXIT', _} ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_BAD_REQUEST]};
+ {_Start, _End, _With, #rsm_in{index = Index}} when is_integer(Index) ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_FEATURE_NOT_IMPLEMENTED]};
+ {Start, End, With, RSM} ->
+ NS = fxml:get_tag_attr_s(<<"xmlns">>, SubEl),
+ select_and_send(LServer, From, To, Start, End,
+ With, limit_max(RSM, NS), IQ, MsgType)
+ end.
+
+muc_process_iq(#iq{lang = Lang, sub_el = SubEl} = IQ, MUCState, From, To, Fs) ->
+ case may_enter_room(From, MUCState) of
+ true ->
+ LServer = MUCState#state.server_host,
+ Role = mod_muc_room:get_role(From, MUCState),
+ process_iq(LServer, From, To, IQ, SubEl, Fs,
+ {groupchat, Role, MUCState});
+ false ->
+ Text = <<"Only members may query archives of this room">>,
+ Error = ?ERRT_FORBIDDEN(Lang, Text),
+ IQ#iq{type = error, sub_el = [SubEl, Error]}
+ end.
+
+parse_query_v0_2(Query) ->
+ lists:flatmap(
+ fun (#xmlel{name = <<"start">>} = El) ->
+ [{<<"start">>, [fxml:get_tag_cdata(El)]}];
+ (#xmlel{name = <<"end">>} = El) ->
+ [{<<"end">>, [fxml:get_tag_cdata(El)]}];
+ (#xmlel{name = <<"with">>} = El) ->
+ [{<<"with">>, [fxml:get_tag_cdata(El)]}];
+ (#xmlel{name = <<"withtext">>} = El) ->
+ [{<<"withtext">>, [fxml:get_tag_cdata(El)]}];
+ (#xmlel{name = <<"set">>}) ->
+ [{<<"set">>, Query}];
+ (_) ->
+ []
+ end, Query#xmlel.children).
+
+should_archive(#xmlel{name = <<"message">>} = Pkt, LServer) ->
+ case fxml:get_attr_s(<<"type">>, Pkt#xmlel.attrs) of
+ <<"error">> ->
+ false;
+ <<"groupchat">> ->
+ false;
+ _ ->
+ case is_resent(Pkt, LServer) of
+ true ->
+ false;
+ false ->
+ case check_store_hint(Pkt) of
+ store ->
+ true;
+ no_store ->
+ false;
+ none ->
+ case fxml:get_subtag_cdata(Pkt, <<"body">>) of
+ <<>> ->
+ %% Empty body
+ false;
+ _ ->
+ true
+ end
+ end
+ end
+ end;
+should_archive(#xmlel{}, _LServer) ->
+ false.
+
+strip_my_archived_tag(Pkt, LServer) ->
+ NewEls = lists:filter(
+ fun(#xmlel{name = Tag, attrs = Attrs})
+ when Tag == <<"archived">>; Tag == <<"stanza-id">> ->
+ case catch jid:nameprep(
+ fxml:get_attr_s(
+ <<"by">>, Attrs)) of
+ LServer ->
+ false;
+ _ ->
+ true
+ end;
+ (_) ->
+ true
+ end, Pkt#xmlel.children),
+ Pkt#xmlel{children = NewEls}.
+
+strip_x_jid_tags(Pkt) ->
+ NewEls = lists:filter(
+ fun(#xmlel{name = <<"x">>} = XEl) ->
+ not lists:any(fun(ItemEl) ->
+ fxml:get_tag_attr(<<"jid">>, ItemEl)
+ /= false
+ end, fxml:get_subtags(XEl, <<"item">>));
+ (_) ->
+ true
+ end, Pkt#xmlel.children),
+ Pkt#xmlel{children = NewEls}.
+
+should_archive_peer(C2SState,
+ #archive_prefs{default = Default,
+ always = Always,
+ never = Never},
+ Peer) ->
+ LPeer = jid:tolower(Peer),
+ case lists:member(LPeer, Always) of
+ true ->
+ true;
+ false ->
+ case lists:member(LPeer, Never) of
+ true ->
+ false;
+ false ->
+ case Default of
+ always -> true;
+ never -> false;
+ roster ->
+ case ejabberd_c2s:get_subscription(
+ LPeer, C2SState) of
+ both -> true;
+ from -> true;
+ to -> true;
+ _ -> false
+ end
+ end
+ end
+ end.
+
+should_archive_muc(Pkt) ->
+ case fxml:get_attr_s(<<"type">>, Pkt#xmlel.attrs) of
+ <<"groupchat">> ->
+ case check_store_hint(Pkt) of
+ store ->
+ true;
+ no_store ->
+ false;
+ none ->
+ case fxml:get_subtag_cdata(Pkt, <<"body">>) of
+ <<>> ->
+ case fxml:get_subtag_cdata(Pkt, <<"subject">>) of
+ <<>> ->
+ false;
+ _ ->
+ true
+ end;
+ _ ->
+ true
+ end
+ end;
+ _ ->
+ false
+ end.
+
+check_store_hint(Pkt) ->
+ case has_store_hint(Pkt) of
+ true ->
+ store;
+ false ->
+ case has_no_store_hint(Pkt) of
+ true ->
+ no_store;
+ false ->
+ none
+ end
+ end.
+
+has_store_hint(Message) ->
+ fxml:get_subtag_with_xmlns(Message, <<"store">>, ?NS_HINTS)
+ /= false.
+
+has_no_store_hint(Message) ->
+ fxml:get_subtag_with_xmlns(Message, <<"no-store">>, ?NS_HINTS)
+ /= false orelse
+ fxml:get_subtag_with_xmlns(Message, <<"no-storage">>, ?NS_HINTS)
+ /= false orelse
+ fxml:get_subtag_with_xmlns(Message, <<"no-permanent-store">>, ?NS_HINTS)
+ /= false orelse
+ fxml:get_subtag_with_xmlns(Message, <<"no-permanent-storage">>, ?NS_HINTS)
+ /= false.
+
+is_resent(Pkt, LServer) ->
+ case fxml:get_subtag_with_xmlns(Pkt, <<"stanza-id">>, ?NS_SID_0) of
+ #xmlel{attrs = Attrs} ->
+ case fxml:get_attr(<<"by">>, Attrs) of
+ {value, LServer} ->
+ true;
+ _ ->
+ false
+ end;
+ false ->
+ false
+ end.
+
+may_enter_room(From,
+ #state{config = #config{members_only = false}} = MUCState) ->
+ mod_muc_room:get_affiliation(From, MUCState) /= outcast;
+may_enter_room(From, MUCState) ->
+ mod_muc_room:is_occupant_or_admin(From, MUCState).
+
+store_msg(C2SState, Pkt, LUser, LServer, Peer, Dir) ->
+ Prefs = get_prefs(LUser, LServer),
+ case should_archive_peer(C2SState, Prefs, Peer) of
+ true ->
+ US = {LUser, LServer},
+ store(Pkt, LServer, US, chat, Peer, <<"">>, Dir,
+ gen_mod:db_type(LServer, ?MODULE));
+ false ->
+ pass
+ end.
+
+store_muc(MUCState, Pkt, RoomJID, Peer, Nick) ->
+ case should_archive_muc(Pkt) of
+ true ->
+ LServer = MUCState#state.server_host,
+ {U, S, _} = jid:tolower(RoomJID),
+ store(Pkt, LServer, {U, S}, groupchat, Peer, Nick, recv,
+ gen_mod:db_type(LServer, ?MODULE));
+ false ->
+ pass
+ end.
+
+store(Pkt, _, {LUser, LServer}, Type, Peer, Nick, _Dir, mnesia) ->
+ LPeer = {PUser, PServer, _} = jid:tolower(Peer),
+ TS = p1_time_compat:timestamp(),
+ ID = jlib:integer_to_binary(now_to_usec(TS)),
+ case mnesia:dirty_write(
+ #archive_msg{us = {LUser, LServer},
+ id = ID,
+ timestamp = TS,
+ peer = LPeer,
+ bare_peer = {PUser, PServer, <<>>},
+ type = Type,
+ nick = Nick,
+ packet = Pkt}) of
+ ok ->
+ {ok, ID};
+ Err ->
+ Err
+ end;
+store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, odbc) ->
+ TSinteger = p1_time_compat:system_time(micro_seconds),
+ ID = TS = jlib:integer_to_binary(TSinteger),
+ SUser = case Type of
+ chat -> LUser;
+ groupchat -> jid:to_string({LUser, LHost, <<>>})
+ end,
+ BarePeer = jid:to_string(
+ jid:tolower(
+ jid:remove_resource(Peer))),
+ LPeer = jid:to_string(
+ jid:tolower(Peer)),
+ XML = fxml:element_to_binary(Pkt),
+ Body = fxml:get_subtag_cdata(Pkt, <<"body">>),
+ case ejabberd_odbc:sql_query(
+ LServer,
+ [<<"insert into archive (username, timestamp, "
+ "peer, bare_peer, xml, txt, kind, nick) values (">>,
+ <<"'">>, ejabberd_odbc:escape(SUser), <<"', ">>,
+ <<"'">>, TS, <<"', ">>,
+ <<"'">>, ejabberd_odbc:escape(LPeer), <<"', ">>,
+ <<"'">>, ejabberd_odbc:escape(BarePeer), <<"', ">>,
+ <<"'">>, ejabberd_odbc:escape(XML), <<"', ">>,
+ <<"'">>, ejabberd_odbc:escape(Body), <<"', ">>,
+ <<"'">>, jlib:atom_to_binary(Type), <<"', ">>,
+ <<"'">>, ejabberd_odbc:escape(Nick), <<"');">>]) of
+ {updated, _} ->
+ {ok, ID};
+ Err ->
+ Err
+ end.
+
+write_prefs(LUser, LServer, Host, Default, Always, Never) ->
+ DBType = case gen_mod:db_type(Host, ?MODULE) of
+ odbc -> {odbc, Host};
+ DB -> DB
+ end,
+ Prefs = #archive_prefs{us = {LUser, LServer},
+ default = Default,
+ always = Always,
+ never = Never},
+ cache_tab:dirty_insert(
+ archive_prefs, {LUser, LServer}, Prefs,
+ fun() -> write_prefs(LUser, LServer, Prefs, DBType) end).
+
+write_prefs(_LUser, _LServer, Prefs, mnesia) ->
+ mnesia:dirty_write(Prefs);
+write_prefs(LUser, _LServer, #archive_prefs{default = Default,
+ never = Never,
+ always = Always},
+ {odbc, Host}) ->
+ SUser = ejabberd_odbc:escape(LUser),
+ SDefault = erlang:atom_to_binary(Default, utf8),
+ SAlways = ejabberd_odbc:encode_term(Always),
+ SNever = ejabberd_odbc:encode_term(Never),
+ case update(Host, <<"archive_prefs">>,
+ [<<"username">>, <<"def">>, <<"always">>, <<"never">>],
+ [SUser, SDefault, SAlways, SNever],
+ [<<"username='">>, SUser, <<"'">>]) of
+ {updated, _} ->
+ ok;
+ Err ->
+ Err
+ end.
+
+get_prefs(LUser, LServer) ->
+ DBType = gen_mod:db_type(LServer, ?MODULE),
+ Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
+ fun() -> get_prefs(LUser, LServer,
+ DBType)
+ end),
+ case Res of
+ {ok, Prefs} ->
+ Prefs;
+ error ->
+ ActivateOpt = gen_mod:get_module_opt(
+ LServer, ?MODULE, request_activates_archiving,
+ fun(B) when is_boolean(B) -> B end, false),
+ case ActivateOpt of
+ true ->
+ #archive_prefs{us = {LUser, LServer}, default = never};
+ false ->
+ Default = gen_mod:get_module_opt(
+ LServer, ?MODULE, default,
+ fun(always) -> always;
+ (never) -> never;
+ (roster) -> roster
+ end, never),
+ #archive_prefs{us = {LUser, LServer}, default = Default}
+ end
+ end.
+
+get_prefs(LUser, LServer, mnesia) ->
+ case mnesia:dirty_read(archive_prefs, {LUser, LServer}) of
+ [Prefs] ->
+ {ok, Prefs};
+ _ ->
+ error
+ end;
+get_prefs(LUser, LServer, odbc) ->
+ case ejabberd_odbc:sql_query(
+ LServer,
+ [<<"select def, always, never from archive_prefs ">>,
+ <<"where username='">>,
+ ejabberd_odbc:escape(LUser), <<"';">>]) of
+ {selected, _, [[SDefault, SAlways, SNever]]} ->
+ Default = erlang:binary_to_existing_atom(SDefault, utf8),
+ Always = ejabberd_odbc:decode_term(SAlways),
+ Never = ejabberd_odbc:decode_term(SNever),
+ {ok, #archive_prefs{us = {LUser, LServer},
+ default = Default,
+ always = Always,
+ never = Never}};
+ _ ->
+ error
+ end.
+
+prefs_el(Default, Always, Never, NS) ->
+ Default1 = jlib:atom_to_binary(Default),
+ JFun = fun(L) ->
+ [#xmlel{name = <<"jid">>,
+ children = [{xmlcdata, jid:to_string(J)}]}
+ || J <- L]
+ end,
+ Always1 = #xmlel{name = <<"always">>,
+ children = JFun(Always)},
+ Never1 = #xmlel{name = <<"never">>,
+ children = JFun(Never)},
+ #xmlel{name = <<"prefs">>,
+ attrs = [{<<"xmlns">>, NS},
+ {<<"default">>, Default1}],
+ children = [Always1, Never1]}.
+
+maybe_activate_mam(LUser, LServer) ->
+ ActivateOpt = gen_mod:get_module_opt(LServer, ?MODULE,
+ request_activates_archiving,
+ fun(B) when is_boolean(B) -> B end,
+ false),
+ case ActivateOpt of
+ true ->
+ Res = cache_tab:lookup(archive_prefs, {LUser, LServer},
+ fun() ->
+ get_prefs(LUser, LServer,
+ gen_mod:db_type(LServer,
+ ?MODULE))
+ end),
+ case Res of
+ {ok, _Prefs} ->
+ ok;
+ error ->
+ Default = gen_mod:get_module_opt(LServer, ?MODULE, default,
+ fun(always) -> always;
+ (never) -> never;
+ (roster) -> roster
+ end, never),
+ write_prefs(LUser, LServer, LServer, Default, [], [])
+ end;
+ false ->
+ ok
+ end.
+
+select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType) ->
+ DBType = case gen_mod:db_type(LServer, ?MODULE) of
+ odbc -> {odbc, LServer};
+ DB -> DB
+ end,
+ select_and_send(LServer, From, To, Start, End, With, RSM, IQ,
+ MsgType, DBType).
+
+select_and_send(LServer, From, To, Start, End, With, RSM, IQ, MsgType, DBType) ->
+ {Msgs, IsComplete, Count} = select_and_start(LServer, From, To, Start, End,
+ With, RSM, MsgType, DBType),
+ SortedMsgs = lists:keysort(2, Msgs),
+ send(From, To, SortedMsgs, RSM, Count, IsComplete, IQ).
+
+select_and_start(LServer, From, To, Start, End, With, RSM, MsgType, DBType) ->
+ case MsgType of
+ chat ->
+ select(LServer, From, From, Start, End, With, RSM, MsgType, DBType);
+ {groupchat, _Role, _MUCState} ->
+ select(LServer, From, To, Start, End, With, RSM, MsgType, DBType)
+ end.
+
+select(_LServer, JidRequestor, JidArchive, Start, End, _With, RSM,
+ {groupchat, _Role, #state{config = #config{mam = false},
+ history = History}} = MsgType,
+ _DBType) ->
+ #lqueue{len = L, queue = Q} = History,
+ {Msgs0, _} =
+ lists:mapfoldl(
+ fun({Nick, Pkt, _HaveSubject, UTCDateTime, _Size}, I) ->
+ Now = datetime_to_now(UTCDateTime, I),
+ TS = now_to_usec(Now),
+ case match_interval(Now, Start, End) and
+ match_rsm(Now, RSM) of
+ true ->
+ {[{jlib:integer_to_binary(TS), TS,
+ msg_to_el(#archive_msg{
+ type = groupchat,
+ timestamp = Now,
+ peer = undefined,
+ nick = Nick,
+ packet = Pkt},
+ MsgType, JidRequestor, JidArchive)}],
+ I+1};
+ false ->
+ {[], I+1}
+ end
+ end, 0, queue:to_list(Q)),
+ Msgs = lists:flatten(Msgs0),
+ case RSM of
+ #rsm_in{max = Max, direction = before} ->
+ {NewMsgs, IsComplete} = filter_by_max(lists:reverse(Msgs), Max),
+ {NewMsgs, IsComplete, L};
+ #rsm_in{max = Max} ->
+ {NewMsgs, IsComplete} = filter_by_max(Msgs, Max),
+ {NewMsgs, IsComplete, L};
+ _ ->
+ {Msgs, true, L}
+ end;
+select(_LServer, JidRequestor,
+ #jid{luser = LUser, lserver = LServer} = JidArchive,
+ Start, End, With, RSM, MsgType, mnesia) ->
+ MS = make_matchspec(LUser, LServer, Start, End, With),
+ Msgs = mnesia:dirty_select(archive_msg, MS),
+ SortedMsgs = lists:keysort(#archive_msg.timestamp, Msgs),
+ {FilteredMsgs, IsComplete} = filter_by_rsm(SortedMsgs, RSM),
+ Count = length(Msgs),
+ {lists:map(
+ fun(Msg) ->
+ {Msg#archive_msg.id,
+ jlib:binary_to_integer(Msg#archive_msg.id),
+ msg_to_el(Msg, MsgType, JidRequestor, JidArchive)}
+ end, FilteredMsgs), IsComplete, Count};
+select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive,
+ Start, End, With, RSM, MsgType, {odbc, Host}) ->
+ User = case MsgType of
+ chat -> LUser;
+ {groupchat, _Role, _MUCState} -> jid:to_string(JidArchive)
+ end,
+ {Query, CountQuery} = make_sql_query(User, LServer,
+ Start, End, With, RSM),
+ % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a
+ % reasonable limit on how many stanzas may be pushed to a client in one
+ % request. If a query returns a number of stanzas greater than this limit
+ % and the client did not specify a limit using RSM then the server should
+ % return a policy-violation error to the client." We currently don't do this
+ % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer.
+ case {ejabberd_odbc:sql_query(Host, Query),
+ ejabberd_odbc:sql_query(Host, CountQuery)} of
+ {{selected, _, Res}, {selected, _, [[Count]]}} ->
+ {Max, Direction} = case RSM of
+ #rsm_in{max = M, direction = D} -> {M, D};
+ _ -> {undefined, undefined}
+ end,
+ {Res1, IsComplete} =
+ if Max >= 0 andalso Max /= undefined andalso length(Res) > Max ->
+ if Direction == before ->
+ {lists:nthtail(1, Res), false};
+ true ->
+ {lists:sublist(Res, Max), false}
+ end;
+ true ->
+ {Res, true}
+ end,
+ {lists:flatmap(
+ fun([TS, XML, PeerBin, Kind, Nick]) ->
+ try
+ #xmlel{} = El = fxml_stream:parse_element(XML),
+ Now = usec_to_now(jlib:binary_to_integer(TS)),
+ PeerJid = jid:tolower(jid:from_string(PeerBin)),
+ T = case Kind of
+ <<"">> -> chat;
+ null -> chat;
+ _ -> jlib:binary_to_atom(Kind)
+ end,
+ [{TS, jlib:binary_to_integer(TS),
+ msg_to_el(#archive_msg{timestamp = Now,
+ packet = El,
+ type = T,
+ nick = Nick,
+ peer = PeerJid},
+ MsgType, JidRequestor, JidArchive)}]
+ catch _:Err ->
+ ?ERROR_MSG("failed to parse data from SQL: ~p. "
+ "The data was: "
+ "timestamp = ~s, xml = ~s, "
+ "peer = ~s, kind = ~s, nick = ~s",
+ [Err, TS, XML, PeerBin, Kind, Nick]),
+ []
+ end
+ end, Res1), IsComplete, jlib:binary_to_integer(Count)};
+ _ ->
+ {[], false, 0}
+ end.
+
+msg_to_el(#archive_msg{timestamp = TS, packet = Pkt1, nick = Nick, peer = Peer},
+ MsgType, JidRequestor, #jid{lserver = LServer} = JidArchive) ->
+ Pkt2 = maybe_update_from_to(Pkt1, JidRequestor, JidArchive, Peer, MsgType,
+ Nick),
+ Pkt3 = #xmlel{name = <<"forwarded">>,
+ attrs = [{<<"xmlns">>, ?NS_FORWARD}],
+ children = [fxml:replace_tag_attr(
+ <<"xmlns">>, <<"jabber:client">>, Pkt2)]},
+ jlib:add_delay_info(Pkt3, LServer, TS).
+
+maybe_update_from_to(#xmlel{children = Els} = Pkt, JidRequestor, JidArchive,
+ Peer, {groupchat, Role,
+ #state{config = #config{anonymous = Anon}}},
+ Nick) ->
+ ExposeJID = case {Peer, JidRequestor} of
+ {undefined, _JidRequestor} ->
+ false;
+ {{U, S, _R}, #jid{luser = U, lserver = S}} ->
+ true;
+ {_Peer, _JidRequestor} when not Anon; Role == moderator ->
+ true;
+ {_Peer, _JidRequestor} ->
+ false
+ end,
+ Items = case ExposeJID of
+ true ->
+ [#xmlel{name = <<"x">>,
+ attrs = [{<<"xmlns">>, ?NS_MUC_USER}],
+ children =
+ [#xmlel{name = <<"item">>,
+ attrs = [{<<"jid">>,
+ jid:to_string(Peer)}]}]}];
+ false ->
+ []
+ end,
+ Pkt1 = Pkt#xmlel{children = Items ++ Els},
+ Pkt2 = jlib:replace_from(jid:replace_resource(JidArchive, Nick), Pkt1),
+ jlib:remove_attr(<<"to">>, Pkt2);
+maybe_update_from_to(Pkt, _JidRequestor, _JidArchive, _Peer, chat, _Nick) ->
+ Pkt.
+
+is_bare_copy(#jid{luser = U, lserver = S, lresource = R}, To) ->
+ PrioRes = ejabberd_sm:get_user_present_resources(U, S),
+ MaxRes = case catch lists:max(PrioRes) of
+ {_Prio, Res} when is_binary(Res) ->
+ Res;
+ _ ->
+ undefined
+ end,
+ IsBareTo = case To of
+ #jid{lresource = <<"">>} ->
+ true;
+ #jid{lresource = LRes} ->
+ %% Unavailable resources are handled like bare JIDs.
+ lists:keyfind(LRes, 2, PrioRes) =:= false
+ end,
+ case {IsBareTo, R} of
+ {true, MaxRes} ->
+ ?DEBUG("Recipient of message to bare JID has top priority: ~s@~s/~s",
+ [U, S, R]),
+ false;
+ {true, _R} ->
+ %% The message was sent to our bare JID, and we currently have
+ %% multiple resources with the same highest priority, so the session
+ %% manager routes the message to each of them. We store the message
+ %% only from the resource where R equals MaxRes.
+ ?DEBUG("Additional recipient of message to bare JID: ~s@~s/~s",
+ [U, S, R]),
+ true;
+ {false, _R} ->
+ false
+ end.
+
+send(From, To, Msgs, RSM, Count, IsComplete, #iq{sub_el = SubEl} = IQ) ->
+ QID = fxml:get_tag_attr_s(<<"queryid">>, SubEl),
+ NS = fxml:get_tag_attr_s(<<"xmlns">>, SubEl),
+ QIDAttr = if QID /= <<>> ->
+ [{<<"queryid">>, QID}];
+ true ->
+ []
+ end,
+ CompleteAttr = if NS == ?NS_MAM_TMP ->
+ [];
+ NS == ?NS_MAM_0; NS == ?NS_MAM_1 ->
+ [{<<"complete">>, jlib:atom_to_binary(IsComplete)}]
+ end,
+ Els = lists:map(
+ fun({ID, _IDInt, El}) ->
+ #xmlel{name = <<"message">>,
+ children = [#xmlel{name = <<"result">>,
+ attrs = [{<<"xmlns">>, NS},
+ {<<"id">>, ID}|QIDAttr],
+ children = [El]}]}
+ end, Msgs),
+ RSMOut = make_rsm_out(Msgs, RSM, Count, QIDAttr ++ CompleteAttr, NS),
+ if NS == ?NS_MAM_TMP; NS == ?NS_MAM_1 ->
+ lists:foreach(
+ fun(El) ->
+ ejabberd_router:route(To, From, El)
+ end, Els),
+ IQ#iq{type = result, sub_el = RSMOut};
+ NS == ?NS_MAM_0 ->
+ ejabberd_router:route(
+ To, From, jlib:iq_to_xml(IQ#iq{type = result, sub_el = []})),
+ lists:foreach(
+ fun(El) ->
+ ejabberd_router:route(To, From, El)
+ end, Els),
+ ejabberd_router:route(
+ To, From, #xmlel{name = <<"message">>,
+ children = RSMOut}),
+ ignore
+ end.
+
+
+make_rsm_out([], _, Count, Attrs, NS) ->
+ Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
+ true -> <<"fin">>
+ end,
+ [#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|Attrs],
+ children = jlib:rsm_encode(#rsm_out{count = Count})}];
+make_rsm_out([{FirstID, _, _}|_] = Msgs, _, Count, Attrs, NS) ->
+ {LastID, _, _} = lists:last(Msgs),
+ Tag = if NS == ?NS_MAM_TMP -> <<"query">>;
+ true -> <<"fin">>
+ end,
+ [#xmlel{name = Tag, attrs = [{<<"xmlns">>, NS}|Attrs],
+ children = jlib:rsm_encode(
+ #rsm_out{first = FirstID, count = Count,
+ last = LastID})}].
+
+filter_by_rsm(Msgs, none) ->
+ {Msgs, true};
+filter_by_rsm(_Msgs, #rsm_in{max = Max}) when Max < 0 ->
+ {[], true};
+filter_by_rsm(Msgs, #rsm_in{max = Max, direction = Direction, id = ID}) ->
+ NewMsgs = case Direction of
+ aft when ID /= <<"">> ->
+ lists:filter(
+ fun(#archive_msg{id = I}) ->
+ ?BIN_GREATER_THAN(I, ID)
+ end, Msgs);
+ before when ID /= <<"">> ->
+ lists:foldl(
+ fun(#archive_msg{id = I} = Msg, Acc)
+ when ?BIN_LESS_THAN(I, ID) ->
+ [Msg|Acc];
+ (_, Acc) ->
+ Acc
+ end, [], Msgs);
+ before when ID == <<"">> ->
+ lists:reverse(Msgs);
+ _ ->
+ Msgs
+ end,
+ filter_by_max(NewMsgs, Max).
+
+filter_by_max(Msgs, undefined) ->
+ {Msgs, true};
+filter_by_max(Msgs, Len) when is_integer(Len), Len >= 0 ->
+ {lists:sublist(Msgs, Len), length(Msgs) =< Len};
+filter_by_max(_Msgs, _Junk) ->
+ {[], true}.
+
+limit_max(RSM, ?NS_MAM_TMP) ->
+ RSM; % XEP-0313 v0.2 doesn't require clients to support RSM.
+limit_max(#rsm_in{max = Max} = RSM, _NS) when not is_integer(Max) ->
+ RSM#rsm_in{max = ?DEF_PAGE_SIZE};
+limit_max(#rsm_in{max = Max} = RSM, _NS) when Max > ?MAX_PAGE_SIZE ->
+ RSM#rsm_in{max = ?MAX_PAGE_SIZE};
+limit_max(RSM, _NS) ->
+ RSM.
+
+match_interval(Now, Start, End) ->
+ (Now >= Start) and (Now =< End).
+
+match_rsm(Now, #rsm_in{id = ID, direction = aft}) when ID /= <<"">> ->
+ Now1 = (catch usec_to_now(jlib:binary_to_integer(ID))),
+ Now > Now1;
+match_rsm(Now, #rsm_in{id = ID, direction = before}) when ID /= <<"">> ->
+ Now1 = (catch usec_to_now(jlib:binary_to_integer(ID))),
+ Now < Now1;
+match_rsm(_Now, _) ->
+ true.
+
+make_matchspec(LUser, LServer, Start, End, {_, _, <<>>} = With) ->
+ ets:fun2ms(
+ fun(#archive_msg{timestamp = TS,
+ us = US,
+ bare_peer = BPeer} = Msg)
+ when Start =< TS, End >= TS,
+ US == {LUser, LServer},
+ BPeer == With ->
+ Msg
+ end);
+make_matchspec(LUser, LServer, Start, End, {_, _, _} = With) ->
+ ets:fun2ms(
+ fun(#archive_msg{timestamp = TS,
+ us = US,
+ peer = Peer} = Msg)
+ when Start =< TS, End >= TS,
+ US == {LUser, LServer},
+ Peer == With ->
+ Msg
+ end);
+make_matchspec(LUser, LServer, Start, End, none) ->
+ ets:fun2ms(
+ fun(#archive_msg{timestamp = TS,
+ us = US,
+ peer = Peer} = Msg)
+ when Start =< TS, End >= TS,
+ US == {LUser, LServer} ->
+ Msg
+ end).
+
+make_sql_query(User, _LServer, Start, End, With, RSM) ->
+ {Max, Direction, ID} = case RSM of
+ #rsm_in{} ->
+ {RSM#rsm_in.max,
+ RSM#rsm_in.direction,
+ RSM#rsm_in.id};
+ none ->
+ {none, none, <<>>}
+ end,
+ LimitClause = if is_integer(Max), Max >= 0 ->
+ [<<" limit ">>, jlib:integer_to_binary(Max+1)];
+ true ->
+ []
+ end,
+ WithClause = case With of
+ {text, <<>>} ->
+ [];
+ {text, Txt} ->
+ [<<" and match (txt) against ('">>,
+ ejabberd_odbc:escape(Txt), <<"')">>];
+ {_, _, <<>>} ->
+ [<<" and bare_peer='">>,
+ ejabberd_odbc:escape(jid:to_string(With)),
+ <<"'">>];
+ {_, _, _} ->
+ [<<" and peer='">>,
+ ejabberd_odbc:escape(jid:to_string(With)),
+ <<"'">>];
+ none ->
+ []
+ end,
+ PageClause = case catch jlib:binary_to_integer(ID) of
+ I when is_integer(I), I >= 0 ->
+ case Direction of
+ before ->
+ [<<" AND timestamp < ">>, ID];
+ aft ->
+ [<<" AND timestamp > ">>, ID];
+ _ ->
+ []
+ end;
+ _ ->
+ []
+ end,
+ StartClause = case Start of
+ {_, _, _} ->
+ [<<" and timestamp >= ">>,
+ jlib:integer_to_binary(now_to_usec(Start))];
+ _ ->
+ []
+ end,
+ EndClause = case End of
+ {_, _, _} ->
+ [<<" and timestamp <= ">>,
+ jlib:integer_to_binary(now_to_usec(End))];
+ _ ->
+ []
+ end,
+ SUser = ejabberd_odbc:escape(User),
+
+ Query = [<<"SELECT timestamp, xml, peer, kind, nick"
+ " FROM archive WHERE username='">>,
+ SUser, <<"'">>, WithClause, StartClause, EndClause,
+ PageClause],
+
+ QueryPage =
+ case Direction of
+ before ->
+ % ID can be empty because of
+ % XEP-0059: Result Set Management
+ % 2.5 Requesting the Last Page in a Result Set
+ [<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query,
+ <<" ORDER BY timestamp DESC ">>,
+ LimitClause, <<") AS t ORDER BY timestamp ASC;">>];
+ _ ->
+ [Query, <<" ORDER BY timestamp ASC ">>,
+ LimitClause, <<";">>]
+ end,
+ {QueryPage,
+ [<<"SELECT COUNT(*) FROM archive WHERE username='">>,
+ SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}.
+
+now_to_usec({MSec, Sec, USec}) ->
+ (MSec*1000000 + Sec)*1000000 + USec.
+
+usec_to_now(Int) ->
+ Secs = Int div 1000000,
+ USec = Int rem 1000000,
+ MSec = Secs div 1000000,
+ Sec = Secs rem 1000000,
+ {MSec, Sec, USec}.
+
+datetime_to_now(DateTime, USecs) ->
+ Seconds = calendar:datetime_to_gregorian_seconds(DateTime) -
+ calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}),
+ {Seconds div 1000000, Seconds rem 1000000, USecs}.
+
+get_jids(Els) ->
+ lists:flatmap(
+ fun(#xmlel{name = <<"jid">>} = El) ->
+ J = jid:from_string(fxml:get_tag_cdata(El)),
+ [jid:tolower(jid:remove_resource(J)),
+ jid:tolower(J)];
+ (_) ->
+ []
+ end, Els).
+
+update(LServer, Table, Fields, Vals, Where) ->
+ UPairs = lists:zipwith(fun (A, B) ->
+ <<A/binary, "='", B/binary, "'">>
+ end,
+ Fields, Vals),
+ case ejabberd_odbc:sql_query(LServer,
+ [<<"update ">>, Table, <<" set ">>,
+ join(UPairs, <<", ">>), <<" where ">>, Where,
+ <<";">>])
+ of
+ {updated, 1} -> {updated, 1};
+ _ ->
+ ejabberd_odbc:sql_query(LServer,
+ [<<"insert into ">>, Table, <<"(">>,
+ join(Fields, <<", ">>), <<") values ('">>,
+ join(Vals, <<"', '">>), <<"');">>])
+ end.
+
+%% Almost a copy of string:join/2.
+join([], _Sep) -> [];
+join([H | T], Sep) -> [H, [[Sep, X] || X <- T]].
+
+get_commands_spec() ->
+ [#ejabberd_commands{name = delete_old_mam_messages, tags = [purge],
+ desc = "Delete MAM messages older than DAYS",
+ longdesc = "Valid message TYPEs: "
+ "\"chat\", \"groupchat\", \"all\".",
+ module = ?MODULE, function = delete_old_messages,
+ args = [{type, binary}, {days, integer}],
+ result = {res, rescode}}].
+
+mod_opt_type(assume_mam_usage) ->
+ fun(if_enabled) -> if_enabled;
+ (on_request) -> on_request;
+ (never) -> never
+ end;
+mod_opt_type(cache_life_time) ->
+ fun (I) when is_integer(I), I > 0 -> I end;
+mod_opt_type(cache_size) ->
+ fun (I) when is_integer(I), I > 0 -> I end;
+mod_opt_type(db_type) -> fun gen_mod:v_db/1;
+mod_opt_type(default) ->
+ fun (always) -> always;
+ (never) -> never;
+ (roster) -> roster
+ end;
+mod_opt_type(iqdisc) -> fun gen_iq_handler:check_type/1;
+mod_opt_type(request_activates_archiving) ->
+ fun (B) when is_boolean(B) -> B end;
+mod_opt_type(_) ->
+ [assume_mam_usage, cache_life_time, cache_size, db_type, default, iqdisc,
+ request_activates_archiving].