diff options
Diffstat (limited to 'src/mod_mam_sql.erl')
-rw-r--r-- | src/mod_mam_sql.erl | 484 |
1 files changed, 361 insertions, 123 deletions
diff --git a/src/mod_mam_sql.erl b/src/mod_mam_sql.erl index 20ed8d4f1..9e5df26a9 100644 --- a/src/mod_mam_sql.erl +++ b/src/mod_mam_sql.erl @@ -1,26 +1,43 @@ %%%------------------------------------------------------------------- -%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net> -%%% @copyright (C) 2016, Evgeny Khramtsov -%%% @doc -%%% -%%% @end +%%% File : mod_mam_sql.erl +%%% Author : Evgeny Khramtsov <ekhramtsov@process-one.net> %%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net> -%%%------------------------------------------------------------------- +%%% +%%% +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + -module(mod_mam_sql). --compile([{parse_transform, ejabberd_sql_pt}]). -behaviour(mod_mam). %% API -export([init/2, remove_user/2, remove_room/3, delete_old_messages/3, - extended_fields/0, store/7, write_prefs/4, get_prefs/2, select/8]). + extended_fields/0, store/8, write_prefs/4, get_prefs/2, select/7, export/1, remove_from_archive/3, + is_empty_for_user/2, is_empty_for_room/3, select_with_mucsub/6]). -include_lib("stdlib/include/ms_transform.hrl"). --include("jlib.hrl"). +-include("xmpp.hrl"). -include("mod_mam.hrl"). -include("logger.hrl"). -include("ejabberd_sql_pt.hrl"). +-include("mod_muc_room.hrl"). %%%=================================================================== %%% API @@ -31,59 +48,88 @@ init(_Host, _Opts) -> remove_user(LUser, LServer) -> ejabberd_sql:sql_query( LServer, - ?SQL("delete from archive where username=%(LUser)s")), + ?SQL("delete from archive where username=%(LUser)s and %(LServer)H")), ejabberd_sql:sql_query( LServer, - ?SQL("delete from archive_prefs where username=%(LUser)s")). + ?SQL("delete from archive_prefs where username=%(LUser)s and %(LServer)H")). remove_room(LServer, LName, LHost) -> - LUser = jid:to_string({LName, LHost, <<>>}), + LUser = jid:encode({LName, LHost, <<>>}), remove_user(LUser, LServer). +remove_from_archive(LUser, LServer, none) -> + case ejabberd_sql:sql_query(LServer, + ?SQL("delete from archive where username=%(LUser)s and %(LServer)H")) of + {error, Reason} -> {error, Reason}; + _ -> ok + end; +remove_from_archive(LUser, LServer, WithJid) -> + Peer = jid:encode(jid:remove_resource(WithJid)), + case ejabberd_sql:sql_query(LServer, + ?SQL("delete from archive where username=%(LUser)s and %(LServer)H and bare_peer=%(Peer)s")) of + {error, Reason} -> {error, Reason}; + _ -> ok + end. + delete_old_messages(ServerHost, TimeStamp, Type) -> - TypeClause = if Type == all -> <<"">>; - true -> [<<" and kind='">>, jlib:atom_to_binary(Type), <<"'">>] - end, - TS = integer_to_binary(now_to_usec(TimeStamp)), - ejabberd_sql:sql_query( - ServerHost, [<<"delete from archive where timestamp<">>, - TS, TypeClause, <<";">>]), + TS = now_to_usec(TimeStamp), + case Type of + all -> + ejabberd_sql:sql_query( + ServerHost, + ?SQL("delete from archive" + " where timestamp < %(TS)d and %(ServerHost)H")); + _ -> + SType = misc:atom_to_binary(Type), + ejabberd_sql:sql_query( + ServerHost, + ?SQL("delete from archive" + " where timestamp < %(TS)d" + " and kind=%(SType)s" + " and %(ServerHost)H")) + end, ok. extended_fields() -> - [#xmlel{name = <<"field">>, - attrs = [{<<"type">>, <<"text-single">>}, - {<<"var">>, <<"withtext">>}]}]. + [{withtext, <<"">>}]. -store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir) -> - TSinteger = p1_time_compat:system_time(micro_seconds), - ID = jlib:integer_to_binary(TSinteger), +store(Pkt, LServer, {LUser, LHost}, Type, Peer, Nick, _Dir, TS) -> SUser = case Type of chat -> LUser; - groupchat -> jid:to_string({LUser, LHost, <<>>}) + groupchat -> jid:encode({LUser, LHost, <<>>}) end, - BarePeer = jid:to_string( + BarePeer = jid:encode( jid:tolower( jid:remove_resource(Peer))), - LPeer = jid:to_string( + LPeer = jid:encode( jid:tolower(Peer)), - XML = fxml:element_to_binary(Pkt), Body = fxml:get_subtag_cdata(Pkt, <<"body">>), - SType = jlib:atom_to_binary(Type), + SType = misc:atom_to_binary(Type), + XML = case mod_mam_opt:compress_xml(LServer) of + true -> + J1 = case Type of + chat -> jid:encode({LUser, LHost, <<>>}); + groupchat -> SUser + end, + xml_compress:encode(Pkt, J1, LPeer); + _ -> + fxml:element_to_binary(Pkt) + end, case ejabberd_sql:sql_query( LServer, - ?SQL("insert into archive (username, timestamp," - " peer, bare_peer, xml, txt, kind, nick) values (" - "%(SUser)s, " - "%(TSinteger)d, " - "%(LPeer)s, " - "%(BarePeer)s, " - "%(XML)s, " - "%(Body)s, " - "%(SType)s, " - "%(Nick)s)")) of + ?SQL_INSERT( + "archive", + ["username=%(SUser)s", + "server_host=%(LServer)s", + "timestamp=%(TS)d", + "peer=%(LPeer)s", + "bare_peer=%(BarePeer)s", + "xml=%(XML)s", + "txt=%(Body)s", + "kind=%(SType)s", + "nick=%(Nick)s"])) of {updated, _} -> - {ok, ID}; + ok; Err -> Err end. @@ -93,16 +139,17 @@ write_prefs(LUser, _LServer, #archive_prefs{default = Default, always = Always}, ServerHost) -> SDefault = erlang:atom_to_binary(Default, utf8), - SAlways = jlib:term_to_expr(Always), - SNever = jlib:term_to_expr(Never), + SAlways = misc:term_to_expr(Always), + SNever = misc:term_to_expr(Never), case ?SQL_UPSERT( ServerHost, "archive_prefs", ["!username=%(LUser)s", + "!server_host=%(ServerHost)s", "def=%(SDefault)s", "always=%(SAlways)s", "never=%(SNever)s"]) of - {updated, _} -> + ok -> ok; Err -> Err @@ -112,7 +159,7 @@ get_prefs(LUser, LServer) -> case ejabberd_sql:sql_query( LServer, ?SQL("select @(def)s, @(always)s, @(never)s from archive_prefs" - " where username=%(LUser)s")) of + " where username=%(LUser)s and %(LServer)H")) of {selected, [{SDefault, SAlways, SNever}]} -> Default = erlang:binary_to_existing_atom(SDefault, utf8), Always = ejabberd_sql:decode_term(SAlways), @@ -126,67 +173,167 @@ get_prefs(LUser, LServer) -> end. select(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, - Start, End, With, RSM, MsgType) -> + MAMQuery, RSM, MsgType, Flags) -> User = case MsgType of chat -> LUser; - {groupchat, _Role, _MUCState} -> jid:to_string(JidArchive) + _ -> jid:encode(JidArchive) end, - {Query, CountQuery} = make_sql_query(User, LServer, - Start, End, With, RSM), + {Query, CountQuery} = make_sql_query(User, LServer, MAMQuery, RSM, none), + do_select_query(LServer, JidRequestor, JidArchive, RSM, MsgType, Query, CountQuery, Flags). + +-spec select_with_mucsub(binary(), jid(), jid(), mam_query:result(), + #rsm_set{} | undefined, all | only_count | only_messages) -> + {[{binary(), non_neg_integer(), xmlel()}], boolean(), non_neg_integer()} | + {error, db_failure}. +select_with_mucsub(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, + MAMQuery, RSM, Flags) -> + Extra = case gen_mod:db_mod(LServer, mod_muc) of + mod_muc_sql -> + subscribers_table; + _ -> + SubRooms = case mod_muc_admin:find_hosts(LServer) of + [First|_] -> + case mod_muc:get_subscribed_rooms(First, JidRequestor) of + {ok, L} -> L; + {error, _} -> [] + end; + _ -> + [] + end, + [jid:encode(Jid) || {Jid, _} <- SubRooms] + end, + {Query, CountQuery} = make_sql_query(LUser, LServer, MAMQuery, RSM, Extra), + do_select_query(LServer, JidRequestor, JidArchive, RSM, chat, Query, CountQuery, Flags). + +do_select_query(LServer, JidRequestor, #jid{luser = LUser} = JidArchive, RSM, + MsgType, Query, CountQuery, Flags) -> % TODO from XEP-0313 v0.2: "To conserve resources, a server MAY place a % reasonable limit on how many stanzas may be pushed to a client in one % request. If a query returns a number of stanzas greater than this limit % and the client did not specify a limit using RSM then the server should % return a policy-violation error to the client." We currently don't do this % for v0.2 requests, but we do limit #rsm_in.max for v0.3 and newer. - case {ejabberd_sql:sql_query(LServer, Query), - ejabberd_sql:sql_query(LServer, CountQuery)} of + QRes = case Flags of + all -> + {ejabberd_sql:sql_query(LServer, Query), ejabberd_sql:sql_query(LServer, CountQuery)}; + only_messages -> + {ejabberd_sql:sql_query(LServer, Query), {selected, ok, [[<<"0">>]]}}; + only_count -> + {{selected, ok, []}, ejabberd_sql:sql_query(LServer, CountQuery)} + end, + case QRes of {{selected, _, Res}, {selected, _, [[Count]]}} -> - {Max, Direction} = case RSM of - #rsm_in{max = M, direction = D} -> {M, D}; - _ -> {undefined, undefined} - end, + {Max, Direction, _} = get_max_direction_id(RSM), {Res1, IsComplete} = - if Max >= 0 andalso Max /= undefined andalso length(Res) > Max -> - if Direction == before -> - {lists:nthtail(1, Res), false}; - true -> - {lists:sublist(Res, Max), false} - end; - true -> - {Res, true} - end, + if Max >= 0 andalso Max /= undefined andalso length(Res) > Max -> + if Direction == before -> + {lists:nthtail(1, Res), false}; + true -> + {lists:sublist(Res, Max), false} + end; + true -> + {Res, true} + end, + MucState = #state{config = #config{anonymous = true}}, + JidArchiveS = jid:encode(JidArchive), {lists:flatmap( - fun([TS, XML, PeerBin, Kind, Nick]) -> - try - #xmlel{} = El = fxml_stream:parse_element(XML), - Now = usec_to_now(jlib:binary_to_integer(TS)), - PeerJid = jid:tolower(jid:from_string(PeerBin)), - T = case Kind of - <<"">> -> chat; - null -> chat; - _ -> jlib:binary_to_atom(Kind) - end, - [{TS, jlib:binary_to_integer(TS), - mod_mam:msg_to_el(#archive_msg{timestamp = Now, - packet = El, - type = T, - nick = Nick, - peer = PeerJid}, - MsgType, JidRequestor, JidArchive)}] - catch _:Err -> - ?ERROR_MSG("failed to parse data from SQL: ~p. " - "The data was: " - "timestamp = ~s, xml = ~s, " - "peer = ~s, kind = ~s, nick = ~s", - [Err, TS, XML, PeerBin, Kind, Nick]), + fun([TS, XML, PeerBin, Kind, Nick]) -> + case make_archive_el(JidArchiveS, TS, XML, PeerBin, Kind, Nick, + MsgType, JidRequestor, JidArchive) of + {ok, El} -> + [{TS, binary_to_integer(TS), El}]; + {error, _} -> + [] + end; + ([User, TS, XML, PeerBin, Kind, Nick]) when User == LUser -> + case make_archive_el(JidArchiveS, TS, XML, PeerBin, Kind, Nick, + MsgType, JidRequestor, JidArchive) of + {ok, El} -> + [{TS, binary_to_integer(TS), El}]; + {error, _} -> + [] + end; + ([User, TS, XML, PeerBin, Kind, Nick]) -> + case make_archive_el(User, TS, XML, PeerBin, Kind, Nick, + {groupchat, member, MucState}, JidRequestor, + jid:decode(User)) of + {ok, El} -> + mod_mam:wrap_as_mucsub([{TS, binary_to_integer(TS), El}], + JidRequestor); + {error, _} -> [] end - end, Res1), IsComplete, jlib:binary_to_integer(Count)}; + end, Res1), IsComplete, binary_to_integer(Count)}; _ -> {[], false, 0} end. +export(_Server) -> + [{archive_prefs, + fun(Host, #archive_prefs{us = + {LUser, LServer}, + default = Default, + always = Always, + never = Never}) + when LServer == Host -> + SDefault = erlang:atom_to_binary(Default, utf8), + SAlways = misc:term_to_expr(Always), + SNever = misc:term_to_expr(Never), + [?SQL_INSERT( + "archive_prefs", + ["username=%(LUser)s", + "server_host=%(LServer)s", + "def=%(SDefault)s", + "always=%(SAlways)s", + "never=%(SNever)s"])]; + (_Host, _R) -> + [] + end}, + {archive_msg, + fun(Host, #archive_msg{us ={LUser, LServer}, + id = _ID, timestamp = TS, peer = Peer, + type = Type, nick = Nick, packet = Pkt}) + when LServer == Host -> + TStmp = now_to_usec(TS), + SUser = case Type of + chat -> LUser; + groupchat -> jid:encode({LUser, LServer, <<>>}) + end, + BarePeer = jid:encode(jid:tolower(jid:remove_resource(Peer))), + LPeer = jid:encode(jid:tolower(Peer)), + XML = fxml:element_to_binary(Pkt), + Body = fxml:get_subtag_cdata(Pkt, <<"body">>), + SType = misc:atom_to_binary(Type), + [?SQL_INSERT( + "archive", + ["username=%(SUser)s", + "server_host=%(LServer)s", + "timestamp=%(TStmp)d", + "peer=%(LPeer)s", + "bare_peer=%(BarePeer)s", + "xml=%(XML)s", + "txt=%(Body)s", + "kind=%(SType)s", + "nick=%(Nick)s"])]; + (_Host, _R) -> + [] + end}]. + +is_empty_for_user(LUser, LServer) -> + case ejabberd_sql:sql_query( + LServer, + ?SQL("select @(1)d from archive" + " where username=%(LUser)s and %(LServer)H limit 1")) of + {selected, [{1}]} -> + false; + _ -> + true + end. + +is_empty_for_room(LServer, LName, LHost) -> + LUser = jid:encode({LName, LHost, <<>>}), + is_empty_for_user(LUser, LServer). + %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -200,18 +347,13 @@ usec_to_now(Int) -> Sec = Secs rem 1000000, {MSec, Sec, USec}. -make_sql_query(User, LServer, Start, End, With, RSM) -> - {Max, Direction, ID} = case RSM of - #rsm_in{} -> - {RSM#rsm_in.max, - RSM#rsm_in.direction, - RSM#rsm_in.id}; - none -> - {none, none, <<>>} - end, - ODBCType = ejabberd_config:get_option( - {sql_type, LServer}, - ejabberd_sql:opt_type(sql_type)), +make_sql_query(User, LServer, MAMQuery, RSM, ExtraUsernames) -> + Start = proplists:get_value(start, MAMQuery), + End = proplists:get_value('end', MAMQuery), + With = proplists:get_value(with, MAMQuery), + WithText = proplists:get_value(withtext, MAMQuery), + {Max, Direction, ID} = get_max_direction_id(RSM), + ODBCType = ejabberd_option:sql_type(LServer), Escape = case ODBCType of mssql -> fun ejabberd_sql:standard_escape/1; @@ -219,38 +361,39 @@ make_sql_query(User, LServer, Start, End, With, RSM) -> _ -> fun ejabberd_sql:escape/1 end, LimitClause = if is_integer(Max), Max >= 0, ODBCType /= mssql -> - [<<" limit ">>, jlib:integer_to_binary(Max+1)]; + [<<" limit ">>, integer_to_binary(Max+1)]; true -> [] end, TopClause = if is_integer(Max), Max >= 0, ODBCType == mssql -> - [<<" TOP ">>, jlib:integer_to_binary(Max+1)]; + [<<" TOP ">>, integer_to_binary(Max+1)]; true -> [] end, - WithClause = case With of - {text, <<>>} -> - []; - {text, Txt} -> - [<<" and match (txt) against ('">>, - Escape(Txt), <<"')">>]; + WithTextClause = if is_binary(WithText), WithText /= <<>> -> + [<<" and match (txt) against ('">>, + Escape(WithText), <<"')">>]; + true -> + [] + end, + WithClause = case catch jid:tolower(With) of {_, _, <<>>} -> [<<" and bare_peer='">>, - Escape(jid:to_string(With)), + Escape(jid:encode(With)), <<"'">>]; {_, _, _} -> [<<" and peer='">>, - Escape(jid:to_string(With)), + Escape(jid:encode(With)), <<"'">>]; - none -> + _ -> [] end, - PageClause = case catch jlib:binary_to_integer(ID) of + PageClause = case catch binary_to_integer(ID) of I when is_integer(I), I >= 0 -> case Direction of before -> [<<" AND timestamp < ">>, ID]; - aft -> + 'after' -> [<<" AND timestamp > ">>, ID]; _ -> [] @@ -261,23 +404,53 @@ make_sql_query(User, LServer, Start, End, With, RSM) -> StartClause = case Start of {_, _, _} -> [<<" and timestamp >= ">>, - jlib:integer_to_binary(now_to_usec(Start))]; + integer_to_binary(now_to_usec(Start))]; _ -> [] end, EndClause = case End of {_, _, _} -> [<<" and timestamp <= ">>, - jlib:integer_to_binary(now_to_usec(End))]; + integer_to_binary(now_to_usec(End))]; _ -> [] end, SUser = Escape(User), + SServer = Escape(LServer), + + HostMatch = case ejabberd_sql:use_new_schema() of + true -> + [<<" and server_host='", SServer/binary, "'">>]; + _ -> + <<"">> + end, + + {UserSel, UserWhere} = case ExtraUsernames of + Users when is_list(Users) -> + EscUsers = [<<"'", (Escape(U))/binary, "'">> || U <- [User | Users]], + {<<" username,">>, + [<<" username in (">>, str:join(EscUsers, <<",">>), <<")">>]}; + subscribers_table -> + SJid = Escape(jid:encode({User, LServer, <<>>})), + RoomName = case ODBCType of + sqlite -> + <<"room || '@' || host">>; + _ -> + <<"concat(room, '@', host)">> + end, + {<<" username,">>, + [<<" (username = '">>, SUser, <<"'">>, + <<" or username in (select ">>, RoomName, + <<" from muc_room_subscribers where jid='">>, SJid, <<"'">>, HostMatch, <<"))">>]}; + _ -> + {<<>>, [<<" username='">>, SUser, <<"'">>]} + end, - Query = [<<"SELECT ">>, TopClause, <<" timestamp, xml, peer, kind, nick" - " FROM archive WHERE username='">>, - SUser, <<"'">>, WithClause, StartClause, EndClause, - PageClause], + Query = [<<"SELECT ">>, TopClause, UserSel, + <<" timestamp, xml, peer, kind, nick" + " FROM archive WHERE">>, UserWhere, HostMatch, + WithClause, WithTextClause, + StartClause, EndClause, PageClause], QueryPage = case Direction of @@ -285,13 +458,78 @@ make_sql_query(User, LServer, Start, End, With, RSM) -> % ID can be empty because of % XEP-0059: Result Set Management % 2.5 Requesting the Last Page in a Result Set - [<<"SELECT timestamp, xml, peer, kind, nick FROM (">>, Query, - <<" ORDER BY timestamp DESC ">>, + [<<"SELECT">>, UserSel, <<" timestamp, xml, peer, kind, nick FROM (">>, + Query, <<" ORDER BY timestamp DESC ">>, LimitClause, <<") AS t ORDER BY timestamp ASC;">>]; _ -> [Query, <<" ORDER BY timestamp ASC ">>, LimitClause, <<";">>] end, {QueryPage, - [<<"SELECT COUNT(*) FROM archive WHERE username='">>, - SUser, <<"'">>, WithClause, StartClause, EndClause, <<";">>]}. + [<<"SELECT COUNT(*) FROM archive WHERE ">>, UserWhere, + HostMatch, WithClause, WithTextClause, + StartClause, EndClause, <<";">>]}. + +-spec get_max_direction_id(rsm_set() | undefined) -> + {integer() | undefined, + before | 'after' | undefined, + binary()}. +get_max_direction_id(RSM) -> + case RSM of + #rsm_set{max = Max, before = Before} when is_binary(Before) -> + {Max, before, Before}; + #rsm_set{max = Max, 'after' = After} when is_binary(After) -> + {Max, 'after', After}; + #rsm_set{max = Max} -> + {Max, undefined, <<>>}; + _ -> + {undefined, undefined, <<>>} + end. + +-spec make_archive_el(binary(), binary(), binary(), binary(), binary(), + binary(), _, jid(), jid()) -> + {ok, xmpp_element()} | {error, invalid_jid | + invalid_timestamp | + invalid_xml}. +make_archive_el(User, TS, XML, Peer, Kind, Nick, MsgType, JidRequestor, JidArchive) -> + case xml_compress:decode(XML, User, Peer) of + #xmlel{} = El -> + try binary_to_integer(TS) of + TSInt -> + try jid:decode(Peer) of + PeerJID -> + Now = usec_to_now(TSInt), + PeerLJID = jid:tolower(PeerJID), + T = case Kind of + <<"">> -> chat; + null -> chat; + _ -> misc:binary_to_atom(Kind) + end, + mod_mam:msg_to_el( + #archive_msg{timestamp = Now, + id = TS, + packet = El, + type = T, + nick = Nick, + peer = PeerLJID}, + MsgType, JidRequestor, JidArchive) + catch _:{bad_jid, _} -> + ?ERROR_MSG("Malformed 'peer' field with value " + "'~ts' detected for user ~ts in table " + "'archive': invalid JID", + [Peer, jid:encode(JidArchive)]), + {error, invalid_jid} + end + catch _:_ -> + ?ERROR_MSG("Malformed 'timestamp' field with value '~ts' " + "detected for user ~ts in table 'archive': " + "not an integer", + [TS, jid:encode(JidArchive)]), + {error, invalid_timestamp} + end; + {error, {_, Reason}} -> + ?ERROR_MSG("Malformed 'xml' field with value '~ts' detected " + "for user ~ts in table 'archive': ~ts", + [XML, jid:encode(JidArchive), Reason]), + {error, invalid_xml} + end. |