aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl678
1 files changed, 504 insertions, 174 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 7f9a81a0d..fa6a961fe 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -5,7 +5,7 @@
%%% Created : 5 Jan 2003 by Alexey Shchepin <alexey@process-one.net>
%%%
%%%
-%%% ejabberd, Copyright (C) 2002-2015 ProcessOne
+%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@@ -25,36 +25,51 @@
-module(mod_offline).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
-author('alexey@process-one.net').
+
+-protocol({xep, 13, '1.2'}).
+-protocol({xep, 22, '1.4'}).
+-protocol({xep, 23, '1.3'}).
+-protocol({xep, 160, '1.0'}).
+-protocol({xep, 334, '0.2'}).
+
-define(GEN_SERVER, p1_server).
-behaviour(?GEN_SERVER).
-behaviour(gen_mod).
--export([count_offline_messages/2]).
-
-export([start/2,
- start_link/2,
+ start_link/2,
stop/1,
store_packet/3,
+ store_offline_msg/5,
resend_offline_messages/2,
pop_offline_messages/3,
get_sm_features/5,
+ get_sm_identity/5,
+ get_sm_items/5,
+ get_info/5,
+ handle_offline_query/3,
remove_expired_messages/1,
remove_old_messages/2,
remove_user/2,
- import/1,
- import/3,
- export/1,
+ import/1,
+ import/3,
+ export/1,
get_queue_length/2,
- get_offline_els/2,
+ count_offline_messages/2,
+ get_offline_els/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5]).
-%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3]).
+ handle_info/2, terminate/2, code_change/3,
+ mod_opt_type/1]).
+
+-deprecated({get_queue_length,2}).
-include("ejabberd.hrl").
-include("logger.hrl").
@@ -65,17 +80,9 @@
-include("ejabberd_web_admin.hrl").
--record(offline_msg,
- {us = {<<"">>, <<"">>} :: {binary(), binary()},
- timestamp = now() :: erlang:timestamp() | '_',
- expire = now() :: erlang:timestamp() | never | '_',
- from = #jid{} :: jid() | '_',
- to = #jid{} :: jid() | '_',
- packet = #xmlel{} :: xmlel() | '_'}).
+-include("mod_offline.hrl").
--record(state,
- {host = <<"">> :: binary(),
- access_max_offline_messages}).
+-include("ejabberd_sql_pt.hrl").
-define(PROCNAME, ejabberd_offline).
@@ -116,6 +123,8 @@ init([Host, Opts]) ->
update_table();
_ -> ok
end,
+ IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
+ no_queue),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
store_packet, 50),
ejabberd_hooks:add(resend_offline_messages_hook, Host,
@@ -128,13 +137,23 @@ init([Host, Opts]) ->
?MODULE, get_sm_features, 50),
ejabberd_hooks:add(disco_local_features, Host,
?MODULE, get_sm_features, 50),
+ ejabberd_hooks:add(disco_sm_identity, Host,
+ ?MODULE, get_sm_identity, 50),
+ ejabberd_hooks:add(disco_sm_items, Host,
+ ?MODULE, get_sm_items, 50),
+ ejabberd_hooks:add(disco_info, Host, ?MODULE, get_info, 50),
ejabberd_hooks:add(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:add(webadmin_user, Host,
?MODULE, webadmin_user, 50),
ejabberd_hooks:add(webadmin_user_parse_query, Host,
- ?MODULE, webadmin_user_parse_query, 50),
- AccessMaxOfflineMsgs = gen_mod:get_opt(access_max_user_messages, Opts, fun(A) -> A end, max_user_offline_messages),
+ ?MODULE, webadmin_user_parse_query, 50),
+ gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE,
+ ?MODULE, handle_offline_query, IQDisc),
+ AccessMaxOfflineMsgs =
+ gen_mod:get_opt(access_max_user_messages, Opts,
+ fun(A) when is_atom(A) -> A end,
+ max_user_offline_messages),
{ok,
#state{host = Host,
access_max_offline_messages = AccessMaxOfflineMsgs}}.
@@ -175,17 +194,24 @@ terminate(_Reason, State) ->
?MODULE, remove_user, 50),
ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, get_sm_features, 50),
ejabberd_hooks:delete(disco_local_features, Host, ?MODULE, get_sm_features, 50),
+ ejabberd_hooks:delete(disco_sm_identity, Host, ?MODULE, get_sm_identity, 50),
+ ejabberd_hooks:delete(disco_sm_items, Host, ?MODULE, get_sm_items, 50),
+ ejabberd_hooks:delete(disco_info, Host, ?MODULE, get_info, 50),
ejabberd_hooks:delete(webadmin_page_host, Host,
?MODULE, webadmin_page, 50),
ejabberd_hooks:delete(webadmin_user, Host,
?MODULE, webadmin_user, 50),
ejabberd_hooks:delete(webadmin_user_parse_query, Host,
?MODULE, webadmin_user_parse_query, 50),
+ gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_FLEX_OFFLINE),
ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
+store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
+ DBType = gen_mod:db_type(Host, ?MODULE),
+ store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs, DBType).
store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs,
mnesia) ->
@@ -224,7 +250,7 @@ store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) ->
M#offline_msg.timestamp,
<<"Offline Storage">>),
XML =
- ejabberd_odbc:escape(xml:element_to_binary(NewPacket)),
+ ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)),
odbc_queries:add_spool_sql(Username, XML)
end,
Msgs),
@@ -248,10 +274,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
end, Msgs)
end.
-%% Function copied from ejabberd_sm.erl:
get_max_user_messages(AccessRule, {User, Server}, Host) ->
case acl:match_rule(
- Host, AccessRule, jlib:make_jid(User, Server, <<"">>)) of
+ Host, AccessRule, jid:make(User, Server, <<"">>)) of
Max when is_integer(Max) -> Max;
infinity -> infinity;
_ -> ?MAX_USER_MESSAGES
@@ -274,27 +299,229 @@ get_sm_features(Acc, _From, _To, <<"">>, _Lang) ->
{result, I} -> I;
_ -> []
end,
- {result, Feats ++ [?NS_FEATURE_MSGOFFLINE]};
+ {result, Feats ++ [?NS_FEATURE_MSGOFFLINE, ?NS_FLEX_OFFLINE]};
get_sm_features(_Acc, _From, _To, ?NS_FEATURE_MSGOFFLINE, _Lang) ->
%% override all lesser features...
{result, []};
+get_sm_features(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+ ?NS_FLEX_OFFLINE, _Lang) ->
+ {result, [?NS_FLEX_OFFLINE]};
+
get_sm_features(Acc, _From, _To, _Node, _Lang) ->
Acc.
+get_sm_identity(_Acc, #jid{luser = U, lserver = S}, #jid{luser = U, lserver = S},
+ ?NS_FLEX_OFFLINE, _Lang) ->
+ Identity = #xmlel{name = <<"identity">>,
+ attrs = [{<<"category">>, <<"automation">>},
+ {<<"type">>, <<"message-list">>}]},
+ [Identity];
+get_sm_identity(Acc, _From, _To, _Node, _Lang) ->
+ Acc.
+
+get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
+ #jid{luser = U, lserver = S},
+ ?NS_FLEX_OFFLINE, _Lang) ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ Hdrs = read_message_headers(U, S),
+ BareJID = jid:to_string(jid:remove_resource(JID)),
+ Pid ! dont_ask_offline,
+ {result, lists:map(
+ fun({Node, From, _OfflineMsg}) ->
+ #xmlel{name = <<"item">>,
+ attrs = [{<<"jid">>, BareJID},
+ {<<"node">>, Node},
+ {<<"name">>, From}]}
+ end, Hdrs)};
+ none ->
+ {result, []}
+ end;
+get_sm_items(Acc, _From, _To, _Node, _Lang) ->
+ Acc.
+
+get_info(_Acc, #jid{luser = U, lserver = S, lresource = R},
+ #jid{luser = U, lserver = S}, ?NS_FLEX_OFFLINE, _Lang) ->
+ N = jlib:integer_to_binary(count_offline_messages(U, S)),
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ Pid ! dont_ask_offline;
+ none ->
+ ok
+ end,
+ [#xmlel{name = <<"x">>,
+ attrs = [{<<"xmlns">>, ?NS_XDATA},
+ {<<"type">>, <<"result">>}],
+ children = [#xmlel{name = <<"field">>,
+ attrs = [{<<"var">>, <<"FORM_TYPE">>},
+ {<<"type">>, <<"hidden">>}],
+ children = [#xmlel{name = <<"value">>,
+ children = [{xmlcdata,
+ ?NS_FLEX_OFFLINE}]}]},
+ #xmlel{name = <<"field">>,
+ attrs = [{<<"var">>, <<"number_of_messages">>}],
+ children = [#xmlel{name = <<"value">>,
+ children = [{xmlcdata, N}]}]}]}];
+get_info(Acc, _From, _To, _Node, _Lang) ->
+ Acc.
+
+handle_offline_query(#jid{luser = U, lserver = S} = From,
+ #jid{luser = U, lserver = S} = _To,
+ #iq{type = Type, sub_el = SubEl} = IQ) ->
+ case Type of
+ get ->
+ case fxml:get_subtag(SubEl, <<"fetch">>) of
+ #xmlel{} ->
+ handle_offline_fetch(From);
+ false ->
+ handle_offline_items_view(From, SubEl)
+ end;
+ set ->
+ case fxml:get_subtag(SubEl, <<"purge">>) of
+ #xmlel{} ->
+ delete_all_msgs(U, S);
+ false ->
+ handle_offline_items_remove(From, SubEl)
+ end
+ end,
+ IQ#iq{type = result, sub_el = []};
+handle_offline_query(_From, _To, #iq{sub_el = SubEl} = IQ) ->
+ IQ#iq{type = error, sub_el = [SubEl, ?ERR_FORBIDDEN]}.
+
+handle_offline_items_view(JID, #xmlel{children = Items}) ->
+ {U, S, R} = jid:tolower(JID),
+ lists:foreach(
+ fun(Node) ->
+ case fetch_msg_by_node(JID, Node) of
+ {ok, OfflineMsg} ->
+ case offline_msg_to_route(S, OfflineMsg) of
+ {route, From, To, El} ->
+ NewEl = set_offline_tag(El, Node),
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ Pid when is_pid(Pid) ->
+ Pid ! {route, From, To, NewEl};
+ none ->
+ ok
+ end;
+ error ->
+ ok
+ end;
+ error ->
+ ok
+ end
+ end, get_nodes_from_items(Items, <<"view">>)).
+
+handle_offline_items_remove(JID, #xmlel{children = Items}) ->
+ lists:foreach(
+ fun(Node) ->
+ remove_msg_by_node(JID, Node)
+ end, get_nodes_from_items(Items, <<"remove">>)).
+
+get_nodes_from_items(Items, Action) ->
+ lists:flatmap(
+ fun(#xmlel{name = <<"item">>, attrs = Attrs}) ->
+ case fxml:get_attr_s(<<"action">>, Attrs) of
+ Action ->
+ case fxml:get_attr_s(<<"node">>, Attrs) of
+ <<"">> ->
+ [];
+ TS ->
+ [TS]
+ end;
+ _ ->
+ []
+ end;
+ (_) ->
+ []
+ end, Items).
+
+set_offline_tag(#xmlel{children = Els} = El, Node) ->
+ OfflineEl = #xmlel{name = <<"offline">>,
+ attrs = [{<<"xmlns">>, ?NS_FLEX_OFFLINE}],
+ children = [#xmlel{name = <<"item">>,
+ attrs = [{<<"node">>, Node}]}]},
+ El#xmlel{children = [OfflineEl|Els]}.
+
+handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
+ case ejabberd_sm:get_session_pid(U, S, R) of
+ none ->
+ ok;
+ Pid when is_pid(Pid) ->
+ Pid ! dont_ask_offline,
+ lists:foreach(
+ fun({Node, _, Msg}) ->
+ case offline_msg_to_route(S, Msg) of
+ {route, From, To, El} ->
+ NewEl = set_offline_tag(El, Node),
+ Pid ! {route, From, To, NewEl};
+ error ->
+ ok
+ end
+ end, read_message_headers(U, S))
+ end.
+
+fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
+ case jid:from_string(From_s) of
+ From = #jid{} ->
+ case gen_mod:db_type(To#jid.lserver, ?MODULE) of
+ odbc ->
+ read_message(From, To, Seq, odbc);
+ DBType ->
+ case binary_to_timestamp(Seq) of
+ undefined -> ok;
+ TS -> read_message(From, To, TS, DBType)
+ end
+ end;
+ error ->
+ ok
+ end.
+
+remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
+ case jid:from_string(From_s) of
+ From = #jid{} ->
+ case gen_mod:db_type(To#jid.lserver, ?MODULE) of
+ odbc ->
+ remove_message(From, To, Seq, odbc);
+ DBType ->
+ case binary_to_timestamp(Seq) of
+ undefined -> ok;
+ TS -> remove_message(From, To, TS, DBType)
+ end
+ end;
+ error ->
+ ok
+ end.
+
need_to_store(LServer, Packet) ->
- Type = xml:get_tag_attr_s(<<"type">>, Packet),
+ Type = fxml:get_tag_attr_s(<<"type">>, Packet),
if (Type /= <<"error">>) and (Type /= <<"groupchat">>)
and (Type /= <<"headline">>) ->
- case gen_mod:get_module_opt(
- LServer, ?MODULE, store_empty_body,
- fun(V) when is_boolean(V) -> V end,
- true) of
+ case has_offline_tag(Packet) of
false ->
- xml:get_subtag(Packet, <<"body">>) /= false;
+ case check_store_hint(Packet) of
+ store ->
+ true;
+ no_store ->
+ false;
+ none ->
+ case gen_mod:get_module_opt(
+ LServer, ?MODULE, store_empty_body,
+ fun(V) when is_boolean(V) -> V;
+ (unless_chat_state) -> unless_chat_state
+ end,
+ unless_chat_state) of
+ false ->
+ fxml:get_subtag(Packet, <<"body">>) /= false;
+ unless_chat_state ->
+ not jlib:is_standalone_chat_state(Packet);
+ true ->
+ true
+ end
+ end;
true ->
- true
+ false
end;
true ->
false
@@ -303,52 +530,59 @@ need_to_store(LServer, Packet) ->
store_packet(From, To, Packet) ->
case need_to_store(To#jid.lserver, Packet) of
true ->
- case has_no_storage_hint(Packet) of
- false ->
- case check_event(From, To, Packet) of
- true ->
- #jid{luser = LUser, lserver = LServer} = To,
- TimeStamp = now(),
- #xmlel{children = Els} = Packet,
- Expire = find_x_expire(TimeStamp, Els),
- gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) !
- #offline_msg{us = {LUser, LServer},
- timestamp = TimeStamp, expire = Expire,
- from = From, to = To, packet = Packet},
- stop;
- _ -> ok
- end;
- _ -> ok
- end;
- false -> ok
+ case check_event(From, To, Packet) of
+ true ->
+ #jid{luser = LUser, lserver = LServer} = To,
+ TimeStamp = p1_time_compat:timestamp(),
+ #xmlel{children = Els} = Packet,
+ Expire = find_x_expire(TimeStamp, Els),
+ gen_mod:get_module_proc(To#jid.lserver, ?PROCNAME) !
+ #offline_msg{us = {LUser, LServer},
+ timestamp = TimeStamp, expire = Expire,
+ from = From, to = To, packet = Packet},
+ stop;
+ _ -> ok
+ end;
+ false -> ok
end.
-has_no_storage_hint(Packet) ->
- case xml:get_subtag(Packet, <<"no-store">>) of
- #xmlel{attrs = Attrs} ->
- case xml:get_attr_s(<<"xmlns">>, Attrs) of
- ?NS_HINTS ->
- true;
- _ ->
- false
- end;
- _ ->
- false
+check_store_hint(Packet) ->
+ case has_store_hint(Packet) of
+ true ->
+ store;
+ false ->
+ case has_no_store_hint(Packet) of
+ true ->
+ no_store;
+ false ->
+ none
+ end
end.
-%% Check if the packet has any content about XEP-0022 or XEP-0085
+has_store_hint(Packet) ->
+ fxml:get_subtag_with_xmlns(Packet, <<"store">>, ?NS_HINTS) =/= false.
+
+has_no_store_hint(Packet) ->
+ fxml:get_subtag_with_xmlns(Packet, <<"no-store">>, ?NS_HINTS) =/= false
+ orelse
+ fxml:get_subtag_with_xmlns(Packet, <<"no-storage">>, ?NS_HINTS) =/= false.
+
+has_offline_tag(Packet) ->
+ fxml:get_subtag_with_xmlns(Packet, <<"offline">>, ?NS_FLEX_OFFLINE) =/= false.
+
+%% Check if the packet has any content about XEP-0022
check_event(From, To, Packet) ->
#xmlel{name = Name, attrs = Attrs, children = Els} =
Packet,
case find_x_event(Els) of
false -> true;
El ->
- case xml:get_subtag(El, <<"id">>) of
+ case fxml:get_subtag(El, <<"id">>) of
false ->
- case xml:get_subtag(El, <<"offline">>) of
+ case fxml:get_subtag(El, <<"offline">>) of
false -> true;
_ ->
- ID = case xml:get_tag_attr_s(<<"id">>, Packet) of
+ ID = case fxml:get_tag_attr_s(<<"id">>, Packet) of
<<"">> ->
#xmlel{name = <<"id">>, attrs = [],
children = []};
@@ -380,12 +614,12 @@ check_event(From, To, Packet) ->
end
end.
-%% Check if the packet has subelements about XEP-0022, XEP-0085 or other
+%% Check if the packet has subelements about XEP-0022
find_x_event([]) -> false;
find_x_event([{xmlcdata, _} | Els]) ->
find_x_event(Els);
find_x_event([El | Els]) ->
- case xml:get_tag_attr_s(<<"xmlns">>, El) of
+ case fxml:get_tag_attr_s(<<"xmlns">>, El) of
?NS_EVENT -> El;
_ -> find_x_event(Els)
end.
@@ -394,9 +628,9 @@ find_x_expire(_, []) -> never;
find_x_expire(TimeStamp, [{xmlcdata, _} | Els]) ->
find_x_expire(TimeStamp, Els);
find_x_expire(TimeStamp, [El | Els]) ->
- case xml:get_tag_attr_s(<<"xmlns">>, El) of
+ case fxml:get_tag_attr_s(<<"xmlns">>, El) of
?NS_EXPIRE ->
- Val = xml:get_tag_attr_s(<<"seconds">>, El),
+ Val = fxml:get_tag_attr_s(<<"seconds">>, El),
case catch jlib:binary_to_integer(Val) of
{'EXIT', _} -> never;
Int when Int > 0 ->
@@ -411,8 +645,8 @@ find_x_expire(TimeStamp, [El | Els]) ->
end.
resend_offline_messages(User, Server) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
US = {LUser, LServer},
F = fun () ->
Rs = mnesia:wread({offline_msg, US}),
@@ -434,8 +668,8 @@ resend_offline_messages(User, Server) ->
end.
pop_offline_messages(Ls, User, Server) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
pop_offline_messages(Ls, LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
@@ -448,7 +682,7 @@ pop_offline_messages(Ls, LUser, LServer, mnesia) ->
end,
case mnesia:transaction(F) of
{atomic, Rs} ->
- TS = now(),
+ TS = p1_time_compat:timestamp(),
Ls ++
lists:map(fun (R) ->
offline_msg_to_route(LServer, R)
@@ -463,14 +697,11 @@ pop_offline_messages(Ls, LUser, LServer, mnesia) ->
_ -> Ls
end;
pop_offline_messages(Ls, LUser, LServer, odbc) ->
- EUser = ejabberd_odbc:escape(LUser),
- case odbc_queries:get_and_del_spool_msg_t(LServer,
- EUser)
- of
- {atomic, {selected, [<<"username">>, <<"xml">>], Rs}} ->
+ case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of
+ {atomic, {selected, Rs}} ->
Ls ++
- lists:flatmap(fun ([_, XML]) ->
- case xml_stream:parse_element(XML) of
+ lists:flatmap(fun ({_, XML}) ->
+ case fxml_stream:parse_element(XML) of
{error, _Reason} ->
[];
El ->
@@ -494,7 +725,7 @@ pop_offline_messages(Ls, LUser, LServer, riak) ->
fun(#offline_msg{timestamp = T}) ->
ok = ejabberd_riak:delete(offline_msg, T)
end, Rs),
- TS = now(),
+ TS = p1_time_compat:timestamp(),
Ls ++ lists:map(
fun (R) ->
offline_msg_to_route(LServer, R)
@@ -515,12 +746,12 @@ pop_offline_messages(Ls, LUser, LServer, riak) ->
end.
remove_expired_messages(Server) ->
- LServer = jlib:nameprep(Server),
+ LServer = jid:nameprep(Server),
remove_expired_messages(LServer,
gen_mod:db_type(LServer, ?MODULE)).
remove_expired_messages(_LServer, mnesia) ->
- TimeStamp = now(),
+ TimeStamp = p1_time_compat:timestamp(),
F = fun () ->
mnesia:write_lock_table(offline_msg),
mnesia:foldl(fun (Rec, _Acc) ->
@@ -540,13 +771,12 @@ remove_expired_messages(_LServer, odbc) -> {atomic, ok};
remove_expired_messages(_LServer, riak) -> {atomic, ok}.
remove_old_messages(Days, Server) ->
- LServer = jlib:nameprep(Server),
+ LServer = jid:nameprep(Server),
remove_old_messages(Days, LServer,
gen_mod:db_type(LServer, ?MODULE)).
remove_old_messages(Days, _LServer, mnesia) ->
- {MegaSecs, Secs, _MicroSecs} = now(),
- S = MegaSecs * 1000000 + Secs - 60 * 60 * 24 * Days,
+ S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days,
MegaSecs1 = S div 1000000,
Secs1 = S rem 1000000,
TimeStamp = {MegaSecs1, Secs1, 0},
@@ -579,8 +809,8 @@ remove_old_messages(_Days, _LServer, riak) ->
{atomic, ok}.
remove_user(User, Server) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
remove_user(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
@@ -589,8 +819,7 @@ remove_user(LUser, LServer, mnesia) ->
F = fun () -> mnesia:delete({offline_msg, US}) end,
mnesia:transaction(F);
remove_user(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- odbc_queries:del_spool_msg(LServer, Username);
+ odbc_queries:del_spool_msg(LServer, LUser);
remove_user(LUser, LServer, riak) ->
{atomic, ejabberd_riak:delete_by_index(offline_msg,
<<"us">>, {LUser, LServer})}.
@@ -619,7 +848,7 @@ update_table() ->
iolist_to_binary(S)},
from = jid_to_binary(From),
to = jid_to_binary(To),
- packet = xml:to_xmlel(El)}
+ packet = fxml:to_xmlel(El)}
end);
_ ->
?INFO_MSG("Recreating offline_msg table", []),
@@ -634,7 +863,7 @@ discard_warn_sender(Msgs) ->
packet = Packet}) ->
ErrText = <<"Your contact offline message queue is "
"full. The message has been discarded.">>,
- Lang = xml:get_tag_attr_s(<<"xml:lang">>, Packet),
+ Lang = fxml:get_tag_attr_s(<<"xml:lang">>, Packet),
Err = jlib:make_error_reply(Packet,
?ERRT_RESOURCE_CONSTRAINT(Lang,
ErrText)),
@@ -661,14 +890,14 @@ get_offline_els(LUser, LServer, DBType)
jlib:replace_from_to(From, To, Packet)
end, Msgs);
get_offline_els(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case catch ejabberd_odbc:sql_query(LServer,
- [<<"select xml from spool where username='">>,
- Username, <<"' order by seq;">>]) of
- {selected, [<<"xml">>], Rs} ->
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ ?SQL("select @(xml)s from spool where "
+ "username=%(LUser)s order by seq")) of
+ {selected, Rs} ->
lists:flatmap(
- fun([XML]) ->
- case xml_stream:parse_element(XML) of
+ fun({XML}) ->
+ case fxml_stream:parse_element(XML) of
#xmlel{} = El ->
case offline_msg_to_route(LServer, El) of
{route, _, _, NewEl} ->
@@ -689,14 +918,131 @@ offline_msg_to_route(LServer, #offline_msg{} = R) ->
jlib:add_delay_info(R#offline_msg.packet, LServer, R#offline_msg.timestamp,
<<"Offline Storage">>)};
offline_msg_to_route(_LServer, #xmlel{} = El) ->
- To = jlib:string_to_jid(xml:get_tag_attr_s(<<"to">>, El)),
- From = jlib:string_to_jid(xml:get_tag_attr_s(<<"from">>, El)),
+ To = jid:from_string(fxml:get_tag_attr_s(<<"to">>, El)),
+ From = jid:from_string(fxml:get_tag_attr_s(<<"from">>, El)),
if (To /= error) and (From /= error) ->
{route, From, To, El};
true ->
error
end.
+binary_to_timestamp(TS) ->
+ case catch jlib:binary_to_integer(TS) of
+ Int when is_integer(Int) ->
+ Secs = Int div 1000000,
+ USec = Int rem 1000000,
+ MSec = Secs div 1000000,
+ Sec = Secs rem 1000000,
+ {MSec, Sec, USec};
+ _ ->
+ undefined
+ end.
+
+timestamp_to_binary({MS, S, US}) ->
+ format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)).
+
+format_timestamp(TS) ->
+ iolist_to_binary(io_lib:format("~20..0s", [TS])).
+
+offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) ->
+ TS = timestamp_to_binary(Int),
+ From_s = jid:to_string(From),
+ {<<TS/binary, "+", From_s/binary>>, From_s, Msg}.
+
+read_message_headers(LUser, LServer) ->
+ DBType = gen_mod:db_type(LServer, ?MODULE),
+ read_message_headers(LUser, LServer, DBType).
+
+read_message_headers(LUser, LServer, mnesia) ->
+ Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
+ Hdrs = lists:map(fun offline_msg_to_header/1, Msgs),
+ lists:keysort(1, Hdrs);
+read_message_headers(LUser, LServer, riak) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ Hdrs = lists:map(fun offline_msg_to_header/1, Rs),
+ lists:keysort(1, Hdrs);
+ _Err ->
+ []
+ end;
+read_message_headers(LUser, LServer, odbc) ->
+ Username = ejabberd_odbc:escape(LUser),
+ case catch ejabberd_odbc:sql_query(
+ LServer, [<<"select xml, seq from spool where username ='">>,
+ Username, <<"' order by seq;">>]) of
+ {selected, [<<"xml">>, <<"seq">>], Rows} ->
+ Hdrs = lists:flatmap(
+ fun([XML, Seq]) ->
+ try
+ #xmlel{} = El = fxml_stream:parse_element(XML),
+ From = fxml:get_tag_attr_s(<<"from">>, El),
+ #jid{} = jid:from_string(From),
+ TS = format_timestamp(Seq),
+ [{<<TS/binary, "+", From/binary>>, From, El}]
+ catch _:_ -> []
+ end
+ end, Rows),
+ lists:keysort(1, Hdrs);
+ _Err ->
+ []
+ end.
+
+read_message(_From, To, TS, mnesia) ->
+ {U, S, _} = jid:tolower(To),
+ case mnesia:dirty_match_object(
+ offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of
+ [Msg|_] ->
+ {ok, Msg};
+ _ ->
+ error
+ end;
+read_message(_From, _To, TS, riak) ->
+ case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
+ {ok, Msg} ->
+ {ok, Msg};
+ _ ->
+ error
+ end;
+read_message(_From, To, Seq, odbc) ->
+ {LUser, LServer, _} = jid:tolower(To),
+ Username = ejabberd_odbc:escape(LUser),
+ SSeq = ejabberd_odbc:escape(Seq),
+ case ejabberd_odbc:sql_query(
+ LServer,
+ [<<"select xml from spool where username='">>, Username,
+ <<"' and seq='">>, SSeq, <<"';">>]) of
+ {selected, [<<"xml">>], [[RawXML]|_]} ->
+ case fxml_stream:parse_element(RawXML) of
+ #xmlel{} = El -> {ok, El};
+ {error, _} -> error
+ end;
+ _ ->
+ error
+ end.
+
+remove_message(_From, To, TS, mnesia) ->
+ {U, S, _} = jid:tolower(To),
+ Msgs = mnesia:dirty_match_object(
+ offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}),
+ lists:foreach(
+ fun(Msg) ->
+ mnesia:dirty_delete_object(Msg)
+ end, Msgs);
+remove_message(_From, _To, TS, riak) ->
+ ejabberd_riak:delete(offline_msg, TS),
+ ok;
+remove_message(_From, To, Seq, odbc) ->
+ {LUser, LServer, _} = jid:tolower(To),
+ Username = ejabberd_odbc:escape(LUser),
+ SSeq = ejabberd_odbc:escape(Seq),
+ ejabberd_odbc:sql_query(
+ LServer,
+ [<<"delete from spool where username='">>, Username,
+ <<"' and seq='">>, SSeq, <<"';">>]),
+ ok.
+
read_all_msgs(LUser, LServer, mnesia) ->
US = {LUser, LServer},
lists:keysort(#offline_msg.timestamp,
@@ -711,20 +1057,20 @@ read_all_msgs(LUser, LServer, riak) ->
[]
end;
read_all_msgs(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case catch ejabberd_odbc:sql_query(LServer,
- [<<"select xml from spool where username='">>,
- Username, <<"' order by seq;">>])
- of
- {selected, [<<"xml">>], Rs} ->
- lists:flatmap(fun ([XML]) ->
- case xml_stream:parse_element(XML) of
- {error, _Reason} -> [];
- El -> [El]
- end
- end,
- Rs);
- _ -> []
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ ?SQL("select @(xml)s from spool where "
+ "username=%(LUser)s order by seq")) of
+ {selected, Rs} ->
+ lists:flatmap(
+ fun({XML}) ->
+ case fxml_stream:parse_element(XML) of
+ {error, _Reason} -> [];
+ El -> [El]
+ end
+ end,
+ Rs);
+ _ -> []
end.
format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
@@ -742,8 +1088,8 @@ format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
[Year, Month, Day,
Hour, Minute,
Second])),
- SFrom = jlib:jid_to_string(From),
- STo = jlib:jid_to_string(To),
+ SFrom = jid:to_string(From),
+ STo = jid:to_string(To),
Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs),
Packet = #xmlel{name = Name, attrs = Attrs2,
children = Els},
@@ -772,8 +1118,8 @@ format_user_queue(Msgs, odbc) ->
Msgs).
user_queue(User, Server, Query, Lang) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
US = {LUser, LServer},
DBType = gen_mod:db_type(LServer, ?MODULE),
Res = user_queue_parse_query(LUser, LServer, Query,
@@ -866,7 +1212,7 @@ user_queue_parse_query(LUser, LServer, Query, odbc) ->
of
{selected, [<<"xml">>, <<"seq">>], Rs} ->
lists:flatmap(fun ([XML, Seq]) ->
- case xml_stream:parse_element(XML)
+ case fxml_stream:parse_element(XML)
of
{error, _Reason} -> [];
El -> [{El, Seq}]
@@ -904,33 +1250,10 @@ user_queue_parse_query(LUser, LServer, Query, odbc) ->
end.
us_to_list({User, Server}) ->
- jlib:jid_to_string({User, Server, <<"">>}).
+ jid:to_string({User, Server, <<"">>}).
get_queue_length(LUser, LServer) ->
- get_queue_length(LUser, LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-get_queue_length(LUser, LServer, mnesia) ->
- length(mnesia:dirty_read({offline_msg,
- {LUser, LServer}}));
-get_queue_length(LUser, LServer, riak) ->
- case ejabberd_riak:count_by_index(offline_msg,
- <<"us">>, {LUser, LServer}) of
- {ok, N} ->
- N;
- _ ->
- 0
- end;
-get_queue_length(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case catch ejabberd_odbc:sql_query(LServer,
- [<<"select count(*) from spool where username='">>,
- Username, <<"';">>])
- of
- {selected, [_], [[SCount]]} ->
- jlib:binary_to_integer(SCount);
- _ -> 0
- end.
+ count_offline_messages(LUser, LServer).
get_messages_subset(User, Host, MsgsAll, DBType) ->
Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
@@ -955,8 +1278,8 @@ get_messages_subset2(Max, Length, MsgsAll, DBType)
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
Msgs2),
- NoJID = jlib:make_jid(<<"...">>, <<"...">>, <<"">>),
- IntermediateMsg = #offline_msg{timestamp = now(),
+ NoJID = jid:make(<<"...">>, <<"...">>, <<"">>),
+ IntermediateMsg = #offline_msg{timestamp = p1_time_compat:timestamp(),
from = NoJID, to = NoJID,
packet =
#xmlel{name = <<"...">>, attrs = [],
@@ -972,8 +1295,8 @@ get_messages_subset2(Max, Length, MsgsAll, odbc) ->
MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN.
webadmin_user(Acc, User, Server, Lang) ->
- QueueLen = get_queue_length(jlib:nodeprep(User),
- jlib:nameprep(Server)),
+ QueueLen = count_offline_messages(jid:nodeprep(User),
+ jid:nameprep(Server)),
FQueueLen = [?AC(<<"queue/">>,
(iolist_to_binary(integer_to_list(QueueLen))))],
Acc ++
@@ -984,8 +1307,8 @@ webadmin_user(Acc, User, Server, Lang) ->
<<"Remove All Offline Messages">>)].
delete_all_msgs(User, Server) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
delete_all_msgs(LUser, LServer,
gen_mod:db_type(LServer, ?MODULE)).
@@ -1003,8 +1326,7 @@ delete_all_msgs(LUser, LServer, riak) ->
<<"us">>, {LUser, LServer}),
{atomic, Res};
delete_all_msgs(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- odbc_queries:del_spool_msg(LServer, Username),
+ odbc_queries:del_spool_msg(LServer, LUser),
{atomic, ok}.
webadmin_user_parse_query(_, <<"removealloffline">>,
@@ -1025,8 +1347,8 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
%% Returns as integer the number of offline messages for a given user
count_offline_messages(User, Server) ->
- LUser = jlib:nodeprep(User),
- LServer = jlib:nameprep(Server),
+ LUser = jid:nodeprep(User),
+ LServer = jid:nameprep(Server),
DBType = gen_mod:db_type(LServer, ?MODULE),
count_offline_messages(LUser, LServer, DBType).
@@ -1040,15 +1362,13 @@ count_offline_messages(LUser, LServer, mnesia) ->
_ -> 0
end;
count_offline_messages(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case catch odbc_queries:count_records_where(LServer,
- <<"spool">>,
- <<"where username='",
- Username/binary, "'">>)
- of
- {selected, [_], [[Res]]} ->
- jlib:binary_to_integer(Res);
- _ -> 0
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ ?SQL("select @(count(*))d from spool "
+ "where username=%(LUser)s")) of
+ {selected, [{Res}]} ->
+ Res;
+ _ -> 0
end;
count_offline_messages(LUser, LServer, riak) ->
case ejabberd_riak:count_by_index(
@@ -1098,7 +1418,7 @@ export(_Server) ->
Packet1 = jlib:replace_from_to(From, To, Packet),
Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp,
<<"Offline Storage">>),
- XML = ejabberd_odbc:escape(xml:element_to_binary(Packet2)),
+ XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)),
[[<<"delete from spool where username='">>, Username, <<"';">>],
[<<"insert into spool(username, xml) values ('">>,
Username, <<"', '">>, XML, <<"');">>]];
@@ -1109,18 +1429,18 @@ export(_Server) ->
import(LServer) ->
[{<<"select username, xml from spool;">>,
fun([LUser, XML]) ->
- El = #xmlel{} = xml_stream:parse_element(XML),
- From = #jid{} = jlib:string_to_jid(
- xml:get_attr_s(<<"from">>, El#xmlel.attrs)),
- To = #jid{} = jlib:string_to_jid(
- xml:get_attr_s(<<"to">>, El#xmlel.attrs)),
- Stamp = xml:get_path_s(El, [{elem, <<"delay">>},
+ El = #xmlel{} = fxml_stream:parse_element(XML),
+ From = #jid{} = jid:from_string(
+ fxml:get_attr_s(<<"from">>, El#xmlel.attrs)),
+ To = #jid{} = jid:from_string(
+ fxml:get_attr_s(<<"to">>, El#xmlel.attrs)),
+ Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
{attr, <<"stamp">>}]),
TS = case jlib:datetime_string_to_timestamp(Stamp) of
{_, _, _} = Now ->
Now;
undefined ->
- now()
+ p1_time_compat:timestamp()
end,
Expire = find_x_expire(TS, El#xmlel.children),
#offline_msg{us = {LUser, LServer},
@@ -1135,3 +1455,13 @@ import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) ->
[{i, TS}, {'2i', [{<<"us">>, US}]}]);
import(_, _, _) ->
pass.
+
+mod_opt_type(access_max_user_messages) ->
+ fun (A) -> A end;
+mod_opt_type(db_type) -> fun gen_mod:v_db/1;
+mod_opt_type(store_empty_body) ->
+ fun (V) when is_boolean(V) -> V;
+ (unless_chat_state) -> unless_chat_state
+ end;
+mod_opt_type(_) ->
+ [access_max_user_messages, db_type, store_empty_body].