diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 678 |
1 files changed, 504 insertions, 174 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 7f9a81a0d..fa6a961fe 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -5,7 +5,7 @@ %%% Created : 5 Jan 2003 by Alexey Shchepin <alexey@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2015 ProcessOne +%%% ejabberd, Copyright (C) 2002-2016 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -25,36 +25,51 @@ -module(mod_offline). +-compile([{parse_transform, ejabberd_sql_pt}]). + -author('alexey@process-one.net'). + +-protocol({xep, 13, '1.2'}). +-protocol({xep, 22, '1.4'}). +-protocol({xep, 23, '1.3'}). +-protocol({xep, 160, '1.0'}). +-protocol({xep, 334, '0.2'}). + -define(GEN_SERVER, p1_server). -behaviour(?GEN_SERVER). -behaviour(gen_mod). --export([count_offline_messages/2]). - -export([start/2, - start_link/2, + start_link/2, stop/1, store_packet/3, + store_offline_msg/5, resend_offline_messages/2, pop_offline_messages/3, get_sm_features/5, + get_sm_identity/5, + get_sm_items/5, + get_info/5, + handle_offline_query/3, remove_expired_messages/1, remove_old_messages/2, remove_user/2, - import/1, - import/3, - export/1, + import/1, + import/3, + export/1, get_queue_length/2, - get_offline_els/2, + count_offline_messages/2, + get_offline_els/2, webadmin_page/3, webadmin_user/4, webadmin_user_parse_query/5]). -%% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3]). + handle_info/2, terminate/2, code_change/3, + mod_opt_type/1]). + +-deprecated({get_queue_length,2}). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -65,17 +80,9 @@ -include("ejabberd_web_admin.hrl"). --record(offline_msg, - {us = {<<"">>, <<"">>} :: {binary(), binary()}, - timestamp = now() :: erlang:timestamp() | '_', - expire = now() :: erlang:timestamp() | never | '_', - from = #jid{} :: jid() | '_', - to = #jid{} :: jid() | '_', - packet = #xmlel{} :: xmlel() | '_'}). +-include("mod_offline.hrl"). --record(state, - {host = <<"">> :: binary(), - access_max_offline_messages}). +-include("ejabberd_sql_pt.hrl"). -define(PROCNAME, ejabberd_offline). @@ -116,6 +123,8 @@ init([Host, Opts]) -> update_table(); _ -> ok end, + IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1, + no_queue), ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, store_packet, 50), ejabberd_hooks:add(resend_offline_messages_hook, Host, @@ -128,13 +137,23 @@ init([Host, Opts]) -> ?MODULE, get_sm_features, 50), ejabberd_hooks:add(disco_local_features, Host, ?MODULE, get_sm_features, 50), + ejabberd_hooks:add(disco_sm_identity, Host, + ?MODULE, get_sm_identity, 50), + ejabberd_hooks:add(disco_sm_items, Host, + ?MODULE, get_sm_items, 50), + ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50), ejabberd_hooks:add(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:add(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:add(webadmin_user_parse_query, Host, - ?MODULE, webadmin_user_parse_query, 50), - AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, fun(A) -> A end, max_user_offline_messages), + ?MODULE, webadmin_user_parse_query, 50), + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE, + ?MODULE, handle_offline_query, IQDisc), + AccessMaxOfflineMsgs = + gen_mod:get_opt(access_max_user_messages, Opts, + fun(A) when is_atom(A) -> A end, + max_user_offline_messages), {ok, #state{host = Host, access_max_offline_messages = AccessMaxOfflineMsgs}}. @@ -175,17 +194,24 @@ terminate(_Reason, State) -> ?MODULE, remove_user, 50), ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50), ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50), + ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50), + ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50), + ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50), ejabberd_hooks:delete(webadmin_page_host, Host, ?MODULE, webadmin_page, 50), ejabberd_hooks:delete(webadmin_user, Host, ?MODULE, webadmin_user, 50), ejabberd_hooks:delete(webadmin_user_parse_query, Host, ?MODULE, webadmin_user_parse_query, 50), + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. +store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) -> + DBType = gen_mod:db_type(Host, ?MODULE), + store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs, DBType). store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs, mnesia) -> @@ -224,7 +250,7 @@ store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) -> M#offline_msg.timestamp, <<"Offline Storage">>), XML = - ejabberd_odbc:escape(xml:element_to_binary(NewPacket)), + ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)), odbc_queries:add_spool_sql(Username, XML) end, Msgs), @@ -248,10 +274,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs, end, Msgs) end. -%% Function copied from ejabberd_sm.erl: get_max_user_messages(AccessRule, {User, Server}, Host) -> case acl:match_rule( - Host, AccessRule, jlib:make_jid(User, Server, <<"">>)) of + Host, AccessRule, jid:make(User, Server, <<"">>)) of Max when is_integer(Max) -> Max; infinity -> infinity; _ -> ?MAX_USER_MESSAGES @@ -274,27 +299,229 @@ get_sm_features(Acc, _From, _To, <<"">>, _Lang) -> {result, I} -> I; _ -> [] end, - {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]}; + {result, Feats ++ [?NS_FEATURE_MSGOFFLINE, ?NS_FLEX_OFFLINE]}; get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) -> %% override all lesser features... {result, []}; +get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, + ?NS_FLEX_OFFLINE, _Lang) -> + {result, [?NS_FLEX_OFFLINE]}; + get_sm_features(Acc, _From, _To, _Node, _Lang) -> Acc. +get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}, + ?NS_FLEX_OFFLINE, _Lang) -> + Identity = #xmlel{name = <<"identity">>, + attrs = [{<<"category">>, <<"automation">>}, + {<<"type">>, <<"message-list">>}]}, + [Identity]; +get_sm_identity(Acc, _From, _To, _Node, _Lang) -> + Acc. + +get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID, + #jid{luser = U, lserver = S}, + ?NS_FLEX_OFFLINE, _Lang) -> + case ejabberd_sm:get_session_pid(U, S, R) of + Pid when is_pid(Pid) -> + Hdrs = read_message_headers(U, S), + BareJID = jid:to_string(jid:remove_resource(JID)), + Pid ! dont_ask_offline, + {result, lists:map( + fun({Node, From, _OfflineMsg}) -> + #xmlel{name = <<"item">>, + attrs = [{<<"jid">>, BareJID}, + {<<"node">>, Node}, + {<<"name">>, From}]} + end, Hdrs)}; + none -> + {result, []} + end; +get_sm_items(Acc, _From, _To, _Node, _Lang) -> + Acc. + +get_info(_Acc, #jid{luser = U, lserver = S, lresource = R}, + #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) -> + N = jlib:integer_to_binary(count_offline_messages(U, S)), + case ejabberd_sm:get_session_pid(U, S, R) of + Pid when is_pid(Pid) -> + Pid ! dont_ask_offline; + none -> + ok + end, + [#xmlel{name = <<"x">>, + attrs = [{<<"xmlns">>, ?NS_XDATA}, + {<<"type">>, <<"result">>}], + children = [#xmlel{name = <<"field">>, + attrs = [{<<"var">>, <<"FORM_TYPE">>}, + {<<"type">>, <<"hidden">>}], + children = [#xmlel{name = <<"value">>, + children = [{xmlcdata, + ?NS_FLEX_OFFLINE}]}]}, + #xmlel{name = <<"field">>, + attrs = [{<<"var">>, <<"number_of_messages">>}], + children = [#xmlel{name = <<"value">>, + children = [{xmlcdata, N}]}]}]}]; +get_info(Acc, _From, _To, _Node, _Lang) -> + Acc. + +handle_offline_query(#jid{luser = U, lserver = S} = From, + #jid{luser = U, lserver = S} = _To, + #iq{type = Type, sub_el = SubEl} = IQ) -> + case Type of + get -> + case fxml:get_subtag(SubEl, <<"fetch">>) of + #xmlel{} -> + handle_offline_fetch(From); + false -> + handle_offline_items_view(From, SubEl) + end; + set -> + case fxml:get_subtag(SubEl, <<"purge">>) of + #xmlel{} -> + delete_all_msgs(U, S); + false -> + handle_offline_items_remove(From, SubEl) + end + end, + IQ#iq{type = result, sub_el = []}; +handle_offline_query(_From, _To, #iq{sub_el = SubEl} = IQ) -> + IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}. + +handle_offline_items_view(JID, #xmlel{children = Items}) -> + {U, S, R} = jid:tolower(JID), + lists:foreach( + fun(Node) -> + case fetch_msg_by_node(JID, Node) of + {ok, OfflineMsg} -> + case offline_msg_to_route(S, OfflineMsg) of + {route, From, To, El} -> + NewEl = set_offline_tag(El, Node), + case ejabberd_sm:get_session_pid(U, S, R) of + Pid when is_pid(Pid) -> + Pid ! {route, From, To, NewEl}; + none -> + ok + end; + error -> + ok + end; + error -> + ok + end + end, get_nodes_from_items(Items, <<"view">>)). + +handle_offline_items_remove(JID, #xmlel{children = Items}) -> + lists:foreach( + fun(Node) -> + remove_msg_by_node(JID, Node) + end, get_nodes_from_items(Items, <<"remove">>)). + +get_nodes_from_items(Items, Action) -> + lists:flatmap( + fun(#xmlel{name = <<"item">>, attrs = Attrs}) -> + case fxml:get_attr_s(<<"action">>, Attrs) of + Action -> + case fxml:get_attr_s(<<"node">>, Attrs) of + <<"">> -> + []; + TS -> + [TS] + end; + _ -> + [] + end; + (_) -> + [] + end, Items). + +set_offline_tag(#xmlel{children = Els} = El, Node) -> + OfflineEl = #xmlel{name = <<"offline">>, + attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}], + children = [#xmlel{name = <<"item">>, + attrs = [{<<"node">>, Node}]}]}, + El#xmlel{children = [OfflineEl|Els]}. + +handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) -> + case ejabberd_sm:get_session_pid(U, S, R) of + none -> + ok; + Pid when is_pid(Pid) -> + Pid ! dont_ask_offline, + lists:foreach( + fun({Node, _, Msg}) -> + case offline_msg_to_route(S, Msg) of + {route, From, To, El} -> + NewEl = set_offline_tag(El, Node), + Pid ! {route, From, To, NewEl}; + error -> + ok + end + end, read_message_headers(U, S)) + end. + +fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) -> + case jid:from_string(From_s) of + From = #jid{} -> + case gen_mod:db_type(To#jid.lserver, ?MODULE) of + odbc -> + read_message(From, To, Seq, odbc); + DBType -> + case binary_to_timestamp(Seq) of + undefined -> ok; + TS -> read_message(From, To, TS, DBType) + end + end; + error -> + ok + end. + +remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) -> + case jid:from_string(From_s) of + From = #jid{} -> + case gen_mod:db_type(To#jid.lserver, ?MODULE) of + odbc -> + remove_message(From, To, Seq, odbc); + DBType -> + case binary_to_timestamp(Seq) of + undefined -> ok; + TS -> remove_message(From, To, TS, DBType) + end + end; + error -> + ok + end. + need_to_store(LServer, Packet) -> - Type = xml:get_tag_attr_s(<<"type">>, Packet), + Type = fxml:get_tag_attr_s(<<"type">>, Packet), if (Type /= <<"error">>) and (Type /= <<"groupchat">>) and (Type /= <<"headline">>) -> - case gen_mod:get_module_opt( - LServer, ?MODULE, store_empty_body, - fun(V) when is_boolean(V) -> V end, - true) of + case has_offline_tag(Packet) of false -> - xml:get_subtag(Packet, <<"body">>) /= false; + case check_store_hint(Packet) of + store -> + true; + no_store -> + false; + none -> + case gen_mod:get_module_opt( + LServer, ?MODULE, store_empty_body, + fun(V) when is_boolean(V) -> V; + (unless_chat_state) -> unless_chat_state + end, + unless_chat_state) of + false -> + fxml:get_subtag(Packet, <<"body">>) /= false; + unless_chat_state -> + not jlib:is_standalone_chat_state(Packet); + true -> + true + end + end; true -> - true + false end; true -> false @@ -303,52 +530,59 @@ need_to_store(LServer, Packet) -> store_packet(From, To, Packet) -> case need_to_store(To#jid.lserver, Packet) of true -> - case has_no_storage_hint(Packet) of - false -> - case check_event(From, To, Packet) of - true -> - #jid{luser = LUser, lserver = LServer} = To, - TimeStamp = now(), - #xmlel{children = Els} = Packet, - Expire = find_x_expire(TimeStamp, Els), - gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) ! - #offline_msg{us = {LUser, LServer}, - timestamp = TimeStamp, expire = Expire, - from = From, to = To, packet = Packet}, - stop; - _ -> ok - end; - _ -> ok - end; - false -> ok + case check_event(From, To, Packet) of + true -> + #jid{luser = LUser, lserver = LServer} = To, + TimeStamp = p1_time_compat:timestamp(), + #xmlel{children = Els} = Packet, + Expire = find_x_expire(TimeStamp, Els), + gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) ! + #offline_msg{us = {LUser, LServer}, + timestamp = TimeStamp, expire = Expire, + from = From, to = To, packet = Packet}, + stop; + _ -> ok + end; + false -> ok end. -has_no_storage_hint(Packet) -> - case xml:get_subtag(Packet, <<"no-store">>) of - #xmlel{attrs = Attrs} -> - case xml:get_attr_s(<<"xmlns">>, Attrs) of - ?NS_HINTS -> - true; - _ -> - false - end; - _ -> - false +check_store_hint(Packet) -> + case has_store_hint(Packet) of + true -> + store; + false -> + case has_no_store_hint(Packet) of + true -> + no_store; + false -> + none + end end. -%% Check if the packet has any content about XEP-0022 or XEP-0085 +has_store_hint(Packet) -> + fxml:get_subtag_with_xmlns(Packet, <<"store">>, ?NS_HINTS) =/= false. + +has_no_store_hint(Packet) -> + fxml:get_subtag_with_xmlns(Packet, <<"no-store">>, ?NS_HINTS) =/= false + orelse + fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false. + +has_offline_tag(Packet) -> + fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false. + +%% Check if the packet has any content about XEP-0022 check_event(From, To, Packet) -> #xmlel{name = Name, attrs = Attrs, children = Els} = Packet, case find_x_event(Els) of false -> true; El -> - case xml:get_subtag(El, <<"id">>) of + case fxml:get_subtag(El, <<"id">>) of false -> - case xml:get_subtag(El, <<"offline">>) of + case fxml:get_subtag(El, <<"offline">>) of false -> true; _ -> - ID = case xml:get_tag_attr_s(<<"id">>, Packet) of + ID = case fxml:get_tag_attr_s(<<"id">>, Packet) of <<"">> -> #xmlel{name = <<"id">>, attrs = [], children = []}; @@ -380,12 +614,12 @@ check_event(From, To, Packet) -> end end. -%% Check if the packet has subelements about XEP-0022, XEP-0085 or other +%% Check if the packet has subelements about XEP-0022 find_x_event([]) -> false; find_x_event([{xmlcdata, _} | Els]) -> find_x_event(Els); find_x_event([El | Els]) -> - case xml:get_tag_attr_s(<<"xmlns">>, El) of + case fxml:get_tag_attr_s(<<"xmlns">>, El) of ?NS_EVENT -> El; _ -> find_x_event(Els) end. @@ -394,9 +628,9 @@ find_x_expire(_, []) -> never; find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) -> find_x_expire(TimeStamp, Els); find_x_expire(TimeStamp, [El | Els]) -> - case xml:get_tag_attr_s(<<"xmlns">>, El) of + case fxml:get_tag_attr_s(<<"xmlns">>, El) of ?NS_EXPIRE -> - Val = xml:get_tag_attr_s(<<"seconds">>, El), + Val = fxml:get_tag_attr_s(<<"seconds">>, El), case catch jlib:binary_to_integer(Val) of {'EXIT', _} -> never; Int when Int > 0 -> @@ -411,8 +645,8 @@ find_x_expire(TimeStamp, [El | Els]) -> end. resend_offline_messages(User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), US = {LUser, LServer}, F = fun () -> Rs = mnesia:wread({offline_msg, US}), @@ -434,8 +668,8 @@ resend_offline_messages(User, Server) -> end. pop_offline_messages(Ls, User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), pop_offline_messages(Ls, LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). @@ -448,7 +682,7 @@ pop_offline_messages(Ls, LUser, LServer, mnesia) -> end, case mnesia:transaction(F) of {atomic, Rs} -> - TS = now(), + TS = p1_time_compat:timestamp(), Ls ++ lists:map(fun (R) -> offline_msg_to_route(LServer, R) @@ -463,14 +697,11 @@ pop_offline_messages(Ls, LUser, LServer, mnesia) -> _ -> Ls end; pop_offline_messages(Ls, LUser, LServer, odbc) -> - EUser = ejabberd_odbc:escape(LUser), - case odbc_queries:get_and_del_spool_msg_t(LServer, - EUser) - of - {atomic, {selected, [<<"username">>, <<"xml">>], Rs}} -> + case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of + {atomic, {selected, Rs}} -> Ls ++ - lists:flatmap(fun ([_, XML]) -> - case xml_stream:parse_element(XML) of + lists:flatmap(fun ({_, XML}) -> + case fxml_stream:parse_element(XML) of {error, _Reason} -> []; El -> @@ -494,7 +725,7 @@ pop_offline_messages(Ls, LUser, LServer, riak) -> fun(#offline_msg{timestamp = T}) -> ok = ejabberd_riak:delete(offline_msg, T) end, Rs), - TS = now(), + TS = p1_time_compat:timestamp(), Ls ++ lists:map( fun (R) -> offline_msg_to_route(LServer, R) @@ -515,12 +746,12 @@ pop_offline_messages(Ls, LUser, LServer, riak) -> end. remove_expired_messages(Server) -> - LServer = jlib:nameprep(Server), + LServer = jid:nameprep(Server), remove_expired_messages(LServer, gen_mod:db_type(LServer, ?MODULE)). remove_expired_messages(_LServer, mnesia) -> - TimeStamp = now(), + TimeStamp = p1_time_compat:timestamp(), F = fun () -> mnesia:write_lock_table(offline_msg), mnesia:foldl(fun (Rec, _Acc) -> @@ -540,13 +771,12 @@ remove_expired_messages(_LServer, odbc) -> {atomic, ok}; remove_expired_messages(_LServer, riak) -> {atomic, ok}. remove_old_messages(Days, Server) -> - LServer = jlib:nameprep(Server), + LServer = jid:nameprep(Server), remove_old_messages(Days, LServer, gen_mod:db_type(LServer, ?MODULE)). remove_old_messages(Days, _LServer, mnesia) -> - {MegaSecs, Secs, _MicroSecs} = now(), - S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days, + S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days, MegaSecs1 = S div 1000000, Secs1 = S rem 1000000, TimeStamp = {MegaSecs1, Secs1, 0}, @@ -579,8 +809,8 @@ remove_old_messages(_Days, _LServer, riak) -> {atomic, ok}. remove_user(User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), remove_user(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). @@ -589,8 +819,7 @@ remove_user(LUser, LServer, mnesia) -> F = fun () -> mnesia:delete({offline_msg, US}) end, mnesia:transaction(F); remove_user(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - odbc_queries:del_spool_msg(LServer, Username); + odbc_queries:del_spool_msg(LServer, LUser); remove_user(LUser, LServer, riak) -> {atomic, ejabberd_riak:delete_by_index(offline_msg, <<"us">>, {LUser, LServer})}. @@ -619,7 +848,7 @@ update_table() -> iolist_to_binary(S)}, from = jid_to_binary(From), to = jid_to_binary(To), - packet = xml:to_xmlel(El)} + packet = fxml:to_xmlel(El)} end); _ -> ?INFO_MSG("Recreating offline_msg table", []), @@ -634,7 +863,7 @@ discard_warn_sender(Msgs) -> packet = Packet}) -> ErrText = <<"Your contact offline message queue is " "full. The message has been discarded.">>, - Lang = xml:get_tag_attr_s(<<"xml:lang">>, Packet), + Lang = fxml:get_tag_attr_s(<<"xml:lang">>, Packet), Err = jlib:make_error_reply(Packet, ?ERRT_RESOURCE_CONSTRAINT(Lang, ErrText)), @@ -661,14 +890,14 @@ get_offline_els(LUser, LServer, DBType) jlib:replace_from_to(From, To, Packet) end, Msgs); get_offline_els(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case catch ejabberd_odbc:sql_query(LServer, - [<<"select xml from spool where username='">>, - Username, <<"' order by seq;">>]) of - {selected, [<<"xml">>], Rs} -> + case catch ejabberd_odbc:sql_query( + LServer, + ?SQL("select @(xml)s from spool where " + "username=%(LUser)s order by seq")) of + {selected, Rs} -> lists:flatmap( - fun([XML]) -> - case xml_stream:parse_element(XML) of + fun({XML}) -> + case fxml_stream:parse_element(XML) of #xmlel{} = El -> case offline_msg_to_route(LServer, El) of {route, _, _, NewEl} -> @@ -689,14 +918,131 @@ offline_msg_to_route(LServer, #offline_msg{} = R) -> jlib:add_delay_info(R#offline_msg.packet, LServer, R#offline_msg.timestamp, <<"Offline Storage">>)}; offline_msg_to_route(_LServer, #xmlel{} = El) -> - To = jlib:string_to_jid(xml:get_tag_attr_s(<<"to">>, El)), - From = jlib:string_to_jid(xml:get_tag_attr_s(<<"from">>, El)), + To = jid:from_string(fxml:get_tag_attr_s(<<"to">>, El)), + From = jid:from_string(fxml:get_tag_attr_s(<<"from">>, El)), if (To /= error) and (From /= error) -> {route, From, To, El}; true -> error end. +binary_to_timestamp(TS) -> + case catch jlib:binary_to_integer(TS) of + Int when is_integer(Int) -> + Secs = Int div 1000000, + USec = Int rem 1000000, + MSec = Secs div 1000000, + Sec = Secs rem 1000000, + {MSec, Sec, USec}; + _ -> + undefined + end. + +timestamp_to_binary({MS, S, US}) -> + format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)). + +format_timestamp(TS) -> + iolist_to_binary(io_lib:format("~20..0s", [TS])). + +offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) -> + TS = timestamp_to_binary(Int), + From_s = jid:to_string(From), + {<<TS/binary, "+", From_s/binary>>, From_s, Msg}. + +read_message_headers(LUser, LServer) -> + DBType = gen_mod:db_type(LServer, ?MODULE), + read_message_headers(LUser, LServer, DBType). + +read_message_headers(LUser, LServer, mnesia) -> + Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}), + Hdrs = lists:map(fun offline_msg_to_header/1, Msgs), + lists:keysort(1, Hdrs); +read_message_headers(LUser, LServer, riak) -> + case ejabberd_riak:get_by_index( + offline_msg, offline_msg_schema(), + <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + Hdrs = lists:map(fun offline_msg_to_header/1, Rs), + lists:keysort(1, Hdrs); + _Err -> + [] + end; +read_message_headers(LUser, LServer, odbc) -> + Username = ejabberd_odbc:escape(LUser), + case catch ejabberd_odbc:sql_query( + LServer, [<<"select xml, seq from spool where username ='">>, + Username, <<"' order by seq;">>]) of + {selected, [<<"xml">>, <<"seq">>], Rows} -> + Hdrs = lists:flatmap( + fun([XML, Seq]) -> + try + #xmlel{} = El = fxml_stream:parse_element(XML), + From = fxml:get_tag_attr_s(<<"from">>, El), + #jid{} = jid:from_string(From), + TS = format_timestamp(Seq), + [{<<TS/binary, "+", From/binary>>, From, El}] + catch _:_ -> [] + end + end, Rows), + lists:keysort(1, Hdrs); + _Err -> + [] + end. + +read_message(_From, To, TS, mnesia) -> + {U, S, _} = jid:tolower(To), + case mnesia:dirty_match_object( + offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of + [Msg|_] -> + {ok, Msg}; + _ -> + error + end; +read_message(_From, _To, TS, riak) -> + case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of + {ok, Msg} -> + {ok, Msg}; + _ -> + error + end; +read_message(_From, To, Seq, odbc) -> + {LUser, LServer, _} = jid:tolower(To), + Username = ejabberd_odbc:escape(LUser), + SSeq = ejabberd_odbc:escape(Seq), + case ejabberd_odbc:sql_query( + LServer, + [<<"select xml from spool where username='">>, Username, + <<"' and seq='">>, SSeq, <<"';">>]) of + {selected, [<<"xml">>], [[RawXML]|_]} -> + case fxml_stream:parse_element(RawXML) of + #xmlel{} = El -> {ok, El}; + {error, _} -> error + end; + _ -> + error + end. + +remove_message(_From, To, TS, mnesia) -> + {U, S, _} = jid:tolower(To), + Msgs = mnesia:dirty_match_object( + offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}), + lists:foreach( + fun(Msg) -> + mnesia:dirty_delete_object(Msg) + end, Msgs); +remove_message(_From, _To, TS, riak) -> + ejabberd_riak:delete(offline_msg, TS), + ok; +remove_message(_From, To, Seq, odbc) -> + {LUser, LServer, _} = jid:tolower(To), + Username = ejabberd_odbc:escape(LUser), + SSeq = ejabberd_odbc:escape(Seq), + ejabberd_odbc:sql_query( + LServer, + [<<"delete from spool where username='">>, Username, + <<"' and seq='">>, SSeq, <<"';">>]), + ok. + read_all_msgs(LUser, LServer, mnesia) -> US = {LUser, LServer}, lists:keysort(#offline_msg.timestamp, @@ -711,20 +1057,20 @@ read_all_msgs(LUser, LServer, riak) -> [] end; read_all_msgs(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case catch ejabberd_odbc:sql_query(LServer, - [<<"select xml from spool where username='">>, - Username, <<"' order by seq;">>]) - of - {selected, [<<"xml">>], Rs} -> - lists:flatmap(fun ([XML]) -> - case xml_stream:parse_element(XML) of - {error, _Reason} -> []; - El -> [El] - end - end, - Rs); - _ -> [] + case catch ejabberd_odbc:sql_query( + LServer, + ?SQL("select @(xml)s from spool where " + "username=%(LUser)s order by seq")) of + {selected, Rs} -> + lists:flatmap( + fun({XML}) -> + case fxml_stream:parse_element(XML) of + {error, _Reason} -> []; + El -> [El] + end + end, + Rs); + _ -> [] end. format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak -> @@ -742,8 +1088,8 @@ format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak -> [Year, Month, Day, Hour, Minute, Second])), - SFrom = jlib:jid_to_string(From), - STo = jlib:jid_to_string(To), + SFrom = jid:to_string(From), + STo = jid:to_string(To), Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs), Packet = #xmlel{name = Name, attrs = Attrs2, children = Els}, @@ -772,8 +1118,8 @@ format_user_queue(Msgs, odbc) -> Msgs). user_queue(User, Server, Query, Lang) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), US = {LUser, LServer}, DBType = gen_mod:db_type(LServer, ?MODULE), Res = user_queue_parse_query(LUser, LServer, Query, @@ -866,7 +1212,7 @@ user_queue_parse_query(LUser, LServer, Query, odbc) -> of {selected, [<<"xml">>, <<"seq">>], Rs} -> lists:flatmap(fun ([XML, Seq]) -> - case xml_stream:parse_element(XML) + case fxml_stream:parse_element(XML) of {error, _Reason} -> []; El -> [{El, Seq}] @@ -904,33 +1250,10 @@ user_queue_parse_query(LUser, LServer, Query, odbc) -> end. us_to_list({User, Server}) -> - jlib:jid_to_string({User, Server, <<"">>}). + jid:to_string({User, Server, <<"">>}). get_queue_length(LUser, LServer) -> - get_queue_length(LUser, LServer, - gen_mod:db_type(LServer, ?MODULE)). - -get_queue_length(LUser, LServer, mnesia) -> - length(mnesia:dirty_read({offline_msg, - {LUser, LServer}})); -get_queue_length(LUser, LServer, riak) -> - case ejabberd_riak:count_by_index(offline_msg, - <<"us">>, {LUser, LServer}) of - {ok, N} -> - N; - _ -> - 0 - end; -get_queue_length(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case catch ejabberd_odbc:sql_query(LServer, - [<<"select count(*) from spool where username='">>, - Username, <<"';">>]) - of - {selected, [_], [[SCount]]} -> - jlib:binary_to_integer(SCount); - _ -> 0 - end. + count_offline_messages(LUser, LServer). get_messages_subset(User, Host, MsgsAll, DBType) -> Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages, @@ -955,8 +1278,8 @@ get_messages_subset2(Max, Length, MsgsAll, DBType) {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), MsgsLastN = lists:nthtail(Length - FirstN - FirstN, Msgs2), - NoJID = jlib:make_jid(<<"...">>, <<"...">>, <<"">>), - IntermediateMsg = #offline_msg{timestamp = now(), + NoJID = jid:make(<<"...">>, <<"...">>, <<"">>), + IntermediateMsg = #offline_msg{timestamp = p1_time_compat:timestamp(), from = NoJID, to = NoJID, packet = #xmlel{name = <<"...">>, attrs = [], @@ -972,8 +1295,8 @@ get_messages_subset2(Max, Length, MsgsAll, odbc) -> MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN. webadmin_user(Acc, User, Server, Lang) -> - QueueLen = get_queue_length(jlib:nodeprep(User), - jlib:nameprep(Server)), + QueueLen = count_offline_messages(jid:nodeprep(User), + jid:nameprep(Server)), FQueueLen = [?AC(<<"queue/">>, (iolist_to_binary(integer_to_list(QueueLen))))], Acc ++ @@ -984,8 +1307,8 @@ webadmin_user(Acc, User, Server, Lang) -> <<"Remove All Offline Messages">>)]. delete_all_msgs(User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), delete_all_msgs(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)). @@ -1003,8 +1326,7 @@ delete_all_msgs(LUser, LServer, riak) -> <<"us">>, {LUser, LServer}), {atomic, Res}; delete_all_msgs(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - odbc_queries:del_spool_msg(LServer, Username), + odbc_queries:del_spool_msg(LServer, LUser), {atomic, ok}. webadmin_user_parse_query(_, <<"removealloffline">>, @@ -1025,8 +1347,8 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, %% Returns as integer the number of offline messages for a given user count_offline_messages(User, Server) -> - LUser = jlib:nodeprep(User), - LServer = jlib:nameprep(Server), + LUser = jid:nodeprep(User), + LServer = jid:nameprep(Server), DBType = gen_mod:db_type(LServer, ?MODULE), count_offline_messages(LUser, LServer, DBType). @@ -1040,15 +1362,13 @@ count_offline_messages(LUser, LServer, mnesia) -> _ -> 0 end; count_offline_messages(LUser, LServer, odbc) -> - Username = ejabberd_odbc:escape(LUser), - case catch odbc_queries:count_records_where(LServer, - <<"spool">>, - <<"where username='", - Username/binary, "'">>) - of - {selected, [_], [[Res]]} -> - jlib:binary_to_integer(Res); - _ -> 0 + case catch ejabberd_odbc:sql_query( + LServer, + ?SQL("select @(count(*))d from spool " + "where username=%(LUser)s")) of + {selected, [{Res}]} -> + Res; + _ -> 0 end; count_offline_messages(LUser, LServer, riak) -> case ejabberd_riak:count_by_index( @@ -1098,7 +1418,7 @@ export(_Server) -> Packet1 = jlib:replace_from_to(From, To, Packet), Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp, <<"Offline Storage">>), - XML = ejabberd_odbc:escape(xml:element_to_binary(Packet2)), + XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)), [[<<"delete from spool where username='">>, Username, <<"';">>], [<<"insert into spool(username, xml) values ('">>, Username, <<"', '">>, XML, <<"');">>]]; @@ -1109,18 +1429,18 @@ export(_Server) -> import(LServer) -> [{<<"select username, xml from spool;">>, fun([LUser, XML]) -> - El = #xmlel{} = xml_stream:parse_element(XML), - From = #jid{} = jlib:string_to_jid( - xml:get_attr_s(<<"from">>, El#xmlel.attrs)), - To = #jid{} = jlib:string_to_jid( - xml:get_attr_s(<<"to">>, El#xmlel.attrs)), - Stamp = xml:get_path_s(El, [{elem, <<"delay">>}, + El = #xmlel{} = fxml_stream:parse_element(XML), + From = #jid{} = jid:from_string( + fxml:get_attr_s(<<"from">>, El#xmlel.attrs)), + To = #jid{} = jid:from_string( + fxml:get_attr_s(<<"to">>, El#xmlel.attrs)), + Stamp = fxml:get_path_s(El, [{elem, <<"delay">>}, {attr, <<"stamp">>}]), TS = case jlib:datetime_string_to_timestamp(Stamp) of {_, _, _} = Now -> Now; undefined -> - now() + p1_time_compat:timestamp() end, Expire = find_x_expire(TS, El#xmlel.children), #offline_msg{us = {LUser, LServer}, @@ -1135,3 +1455,13 @@ import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) -> [{i, TS}, {'2i', [{<<"us">>, US}]}]); import(_, _, _) -> pass. + +mod_opt_type(access_max_user_messages) -> + fun (A) -> A end; +mod_opt_type(db_type) -> fun gen_mod:v_db/1; +mod_opt_type(store_empty_body) -> + fun (V) when is_boolean(V) -> V; + (unless_chat_state) -> unless_chat_state + end; +mod_opt_type(_) -> + [access_max_user_messages, db_type, store_empty_body]. |