aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-04-15 13:44:33 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-04-15 13:44:33 +0300
commit901d2e0aed83d195a4d1cf2929114b07dcac0dd8 (patch)
treed8ee99068d01d1d98d9d87069e4b5092fb2e58f5
parentClean mod_blocking.erl from DB specific code (diff)
Clean mod_offline.erl from DB specific code
-rw-r--r--src/mod_offline.erl936
-rw-r--r--src/mod_offline_mnesia.erl232
-rw-r--r--src/mod_offline_riak.erl153
-rw-r--r--src/mod_offline_sql.erl252
4 files changed, 809 insertions, 764 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 54eda165c..2cdd82ae8 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -25,8 +25,6 @@
-module(mod_offline).
--compile([{parse_transform, ejabberd_sql_pt}]).
-
-author('alexey@process-one.net').
-protocol({xep, 13, '1.2'}).
@@ -61,6 +59,7 @@
get_queue_length/2,
count_offline_messages/2,
get_offline_els/2,
+ find_x_expire/2,
webadmin_page/3,
webadmin_user/4,
webadmin_user_parse_query/5]).
@@ -82,8 +81,6 @@
-include("mod_offline.hrl").
--include("ejabberd_sql_pt.hrl").
-
-define(PROCNAME, ejabberd_offline).
-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
@@ -91,6 +88,25 @@
%% default value for the maximum number of user messages
-define(MAX_USER_MESSAGES, infinity).
+-type us() :: {binary(), binary()}.
+-callback init(binary(), gen_mod:opts()) -> any().
+-callback import(binary(), #offline_msg{}) -> ok | pass.
+-callback store_messages(binary(), us(), [#offline_msg{}],
+ non_neg_integer(), non_neg_integer()) ->
+ {atomic, any()}.
+-callback pop_messages(binary(), binary()) ->
+ {atomic, [#offline_msg{}]} | {aborted, any()}.
+-callback remove_expired_messages(binary()) -> {atomic, any()}.
+-callback remove_old_messages(non_neg_integer(), binary()) -> {atomic, any()}.
+-callback remove_user(binary(), binary()) -> {atomic, any()}.
+-callback read_message_headers(binary(), binary()) -> any().
+-callback read_message(binary(), binary(), non_neg_integer()) ->
+ {ok, #offline_msg{}} | error.
+-callback remove_message(binary(), binary(), non_neg_integer()) -> ok.
+-callback read_all_messages(binary(), binary()) -> [#offline_msg{}].
+-callback remove_all_messages(binary(), binary()) -> {atomic, any()}.
+-callback count_messages(binary(), binary()) -> non_neg_integer().
+
start_link(Host, Opts) ->
Proc = gen_mod:get_module_proc(Host, ?PROCNAME),
?GEN_SERVER:start_link({local, Proc}, ?MODULE,
@@ -115,14 +131,8 @@ stop(Host) ->
%%====================================================================
init([Host, Opts]) ->
- case gen_mod:db_type(Host, Opts) of
- mnesia ->
- mnesia:create_table(offline_msg,
- [{disc_only_copies, [node()]}, {type, bag},
- {attributes, record_info(fields, offline_msg)}]),
- update_table();
- _ -> ok
- end,
+ Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
+ Mod:init(Host, Opts),
IQDisc = gen_mod:get_opt(iqdisc, Opts, fun gen_iq_handler:check_type/1,
no_queue),
ejabberd_hooks:add(offline_message_hook, Host, ?MODULE,
@@ -174,7 +184,7 @@ handle_info(#offline_msg{us = UserServer} = Msg, State) ->
Len = length(Msgs),
MaxOfflineMsgs = get_max_user_messages(AccessMaxOfflineMsgs,
UserServer, Host),
- store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs, DBType),
+ store_offline_msg(Host, UserServer, Msgs, Len, MaxOfflineMsgs),
{noreply, State};
handle_info(_Info, State) ->
@@ -210,68 +220,12 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) -> {ok, State}.
store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs) ->
- DBType = gen_mod:db_type(Host, ?MODULE),
- store_offline_msg(Host, US, Msgs, Len, MaxOfflineMsgs, DBType).
-
-store_offline_msg(_Host, US, Msgs, Len, MaxOfflineMsgs,
- mnesia) ->
- F = fun () ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_mnesia_records(US);
- true -> 0
- end,
- if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs);
- true ->
- if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
- mnesia:write_lock_table(offline_msg);
- true -> ok
- end,
- lists:foreach(fun (M) -> mnesia:write(M) end, Msgs)
- end
- end,
- mnesia:transaction(F);
-store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_offline_messages(User, Host);
- true -> 0
- end,
- if Count > MaxOfflineMsgs -> discard_warn_sender(Msgs);
- true ->
- Query = lists:map(fun (M) ->
- Username =
- ejabberd_odbc:escape((M#offline_msg.to)#jid.luser),
- From = M#offline_msg.from,
- To = M#offline_msg.to,
- Packet =
- jlib:replace_from_to(From, To,
- M#offline_msg.packet),
- NewPacket =
- jlib:add_delay_info(Packet, Host,
- M#offline_msg.timestamp,
- <<"Offline Storage">>),
- XML =
- ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)),
- odbc_queries:add_spool_sql(Username, XML)
- end,
- Msgs),
- odbc_queries:add_spool(Host, Query)
- end;
-store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
- riak) ->
- Count = if MaxOfflineMsgs =/= infinity ->
- Len + count_offline_messages(User, Host);
- true -> 0
- end,
- if
- Count > MaxOfflineMsgs ->
- discard_warn_sender(Msgs);
- true ->
- lists:foreach(
- fun(#offline_msg{us = US,
- timestamp = TS} = M) ->
- ejabberd_riak:put(M, offline_msg_schema(),
- [{i, TS}, {'2i', [{<<"us">>, US}]}])
- end, Msgs)
+ Mod = gen_mod:db_mod(Host, ?MODULE),
+ case Mod:store_messages(Host, US, Msgs, Len, MaxOfflineMsgs) of
+ {atomic, discard} ->
+ discard_warn_sender(Msgs);
+ _ ->
+ ok
end.
get_max_user_messages(AccessRule, {User, Server}, Host) ->
@@ -330,11 +284,11 @@ get_sm_items(_Acc, #jid{luser = U, lserver = S, lresource = R} = JID,
BareJID = jid:to_string(jid:remove_resource(JID)),
Pid ! dont_ask_offline,
{result, lists:map(
- fun({Node, From, _OfflineMsg}) ->
+ fun({Node, From, _To, _El}) ->
#xmlel{name = <<"item">>,
attrs = [{<<"jid">>, BareJID},
{<<"node">>, Node},
- {<<"name">>, From}]}
+ {<<"name">>, jid:to_string(From)}]}
end, Hdrs)};
none ->
{result, []}
@@ -452,46 +406,31 @@ handle_offline_fetch(#jid{luser = U, lserver = S, lresource = R}) ->
Pid when is_pid(Pid) ->
Pid ! dont_ask_offline,
lists:foreach(
- fun({Node, _, Msg}) ->
- case offline_msg_to_route(S, Msg) of
- {route, From, To, El} ->
- NewEl = set_offline_tag(El, Node),
- Pid ! {route, From, To, NewEl};
- error ->
- ok
- end
+ fun({Node, From, To, El}) ->
+ NewEl = set_offline_tag(El, Node),
+ Pid ! {route, From, To, NewEl}
end, read_message_headers(U, S))
end.
-fetch_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
- case jid:from_string(From_s) of
- From = #jid{} ->
- case gen_mod:db_type(To#jid.lserver, ?MODULE) of
- odbc ->
- read_message(From, To, Seq, odbc);
- DBType ->
- case binary_to_timestamp(Seq) of
- undefined -> ok;
- TS -> read_message(From, To, TS, DBType)
- end
- end;
- error ->
- ok
+fetch_msg_by_node(To, Seq) ->
+ case catch binary_to_integer(Seq) of
+ I when is_integer(I), I >= 0 ->
+ LUser = To#jid.luser,
+ LServer = To#jid.lserver,
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:read_message(LUser, LServer, I);
+ _ ->
+ error
end.
-remove_msg_by_node(To, <<Seq:20/binary, "+", From_s/binary>>) ->
- case jid:from_string(From_s) of
- From = #jid{} ->
- case gen_mod:db_type(To#jid.lserver, ?MODULE) of
- odbc ->
- remove_message(From, To, Seq, odbc);
- DBType ->
- case binary_to_timestamp(Seq) of
- undefined -> ok;
- TS -> remove_message(From, To, TS, DBType)
- end
- end;
- error ->
+remove_msg_by_node(To, Seq) ->
+ case catch binary_to_integer(Seq) of
+ I when is_integer(I), I>= 0 ->
+ LUser = To#jid.luser,
+ LServer = To#jid.lserver,
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:remove_message(LUser, LServer, I);
+ _ ->
ok
end.
@@ -648,21 +587,11 @@ find_x_expire(TimeStamp, [El | Els]) ->
resend_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
- US = {LUser, LServer},
- F = fun () ->
- Rs = mnesia:wread({offline_msg, US}),
- mnesia:delete({offline_msg, US}),
- Rs
- end,
- case mnesia:transaction(F) of
- {atomic, Rs} ->
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ case Mod:pop_messages(LUser, LServer) of
+ {ok, Rs} ->
lists:foreach(fun (R) ->
- ejabberd_sm !
- {route, R#offline_msg.from, R#offline_msg.to,
- jlib:add_delay_info(R#offline_msg.packet,
- LServer,
- R#offline_msg.timestamp,
- <<"Offline Storage">>)}
+ ejabberd_sm ! offline_msg_to_route(LServer, R)
end,
lists:keysort(#offline_msg.timestamp, Rs));
_ -> ok
@@ -671,190 +600,47 @@ resend_offline_messages(User, Server) ->
pop_offline_messages(Ls, User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
- pop_offline_messages(Ls, LUser, LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-pop_offline_messages(Ls, LUser, LServer, mnesia) ->
- US = {LUser, LServer},
- F = fun () ->
- Rs = mnesia:wread({offline_msg, US}),
- mnesia:delete({offline_msg, US}),
- Rs
- end,
- case mnesia:transaction(F) of
- {atomic, Rs} ->
- TS = p1_time_compat:timestamp(),
- Ls ++
- lists:map(fun (R) ->
- offline_msg_to_route(LServer, R)
- end,
- lists:filter(fun (R) ->
- case R#offline_msg.expire of
- never -> true;
- TimeStamp -> TS < TimeStamp
- end
- end,
- lists:keysort(#offline_msg.timestamp, Rs)));
- _ -> Ls
- end;
-pop_offline_messages(Ls, LUser, LServer, odbc) ->
- case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of
- {atomic, {selected, Rs}} ->
- Ls ++
- lists:flatmap(fun ({_, XML}) ->
- case fxml_stream:parse_element(XML) of
- {error, _Reason} ->
- [];
- El ->
- case offline_msg_to_route(LServer, El) of
- error ->
- [];
- RouteMsg ->
- [RouteMsg]
- end
- end
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ case Mod:pop_messages(LUser, LServer) of
+ {ok, Rs} ->
+ TS = p1_time_compat:timestamp(),
+ Ls ++
+ lists:map(fun (R) ->
+ offline_msg_to_route(LServer, R)
end,
- Rs);
- _ -> Ls
- end;
-pop_offline_messages(Ls, LUser, LServer, riak) ->
- case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
- <<"us">>, {LUser, LServer}) of
- {ok, Rs} ->
- try
- lists:foreach(
- fun(#offline_msg{timestamp = T}) ->
- ok = ejabberd_riak:delete(offline_msg, T)
- end, Rs),
- TS = p1_time_compat:timestamp(),
- Ls ++ lists:map(
- fun (R) ->
- offline_msg_to_route(LServer, R)
- end,
- lists:filter(
- fun(R) ->
- case R#offline_msg.expire of
- never -> true;
- TimeStamp -> TS < TimeStamp
- end
- end,
- lists:keysort(#offline_msg.timestamp, Rs)))
- catch _:{badmatch, _} ->
- Ls
- end;
+ lists:filter(
+ fun(#offline_msg{packet = Pkt} = R) ->
+ #xmlel{children = Els} = Pkt,
+ Expire = case R#offline_msg.expire of
+ undefined ->
+ find_x_expire(TS, Els);
+ Exp ->
+ Exp
+ end,
+ case Expire of
+ never -> true;
+ TimeStamp -> TS < TimeStamp
+ end
+ end, Rs));
_ ->
Ls
end.
remove_expired_messages(Server) ->
LServer = jid:nameprep(Server),
- remove_expired_messages(LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-remove_expired_messages(_LServer, mnesia) ->
- TimeStamp = p1_time_compat:timestamp(),
- F = fun () ->
- mnesia:write_lock_table(offline_msg),
- mnesia:foldl(fun (Rec, _Acc) ->
- case Rec#offline_msg.expire of
- never -> ok;
- TS ->
- if TS < TimeStamp ->
- mnesia:delete_object(Rec);
- true -> ok
- end
- end
- end,
- ok, offline_msg)
- end,
- mnesia:transaction(F);
-remove_expired_messages(_LServer, odbc) -> {atomic, ok};
-remove_expired_messages(_LServer, riak) -> {atomic, ok}.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:remove_expired_messages(LServer).
remove_old_messages(Days, Server) ->
LServer = jid:nameprep(Server),
- remove_old_messages(Days, LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-remove_old_messages(Days, _LServer, mnesia) ->
- S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days,
- MegaSecs1 = S div 1000000,
- Secs1 = S rem 1000000,
- TimeStamp = {MegaSecs1, Secs1, 0},
- F = fun () ->
- mnesia:write_lock_table(offline_msg),
- mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec,
- _Acc)
- when TS < TimeStamp ->
- mnesia:delete_object(Rec);
- (_Rec, _Acc) -> ok
- end,
- ok, offline_msg)
- end,
- mnesia:transaction(F);
-
-remove_old_messages(Days, LServer, odbc) ->
- case catch ejabberd_odbc:sql_query(
- LServer,
- [<<"DELETE FROM spool"
- " WHERE created_at < "
- "DATE_SUB(CURDATE(), INTERVAL ">>,
- integer_to_list(Days), <<" DAY);">>]) of
- {updated, N} ->
- ?INFO_MSG("~p message(s) deleted from offline spool", [N]);
- _Error ->
- ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error])
- end,
- {atomic, ok};
-remove_old_messages(_Days, _LServer, riak) ->
- {atomic, ok}.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:remove_old_messages(Days, LServer).
remove_user(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
- remove_user(LUser, LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-remove_user(LUser, LServer, mnesia) ->
- US = {LUser, LServer},
- F = fun () -> mnesia:delete({offline_msg, US}) end,
- mnesia:transaction(F);
-remove_user(LUser, LServer, odbc) ->
- odbc_queries:del_spool_msg(LServer, LUser);
-remove_user(LUser, LServer, riak) ->
- {atomic, ejabberd_riak:delete_by_index(offline_msg,
- <<"us">>, {LUser, LServer})}.
-
-jid_to_binary(#jid{user = U, server = S, resource = R,
- luser = LU, lserver = LS, lresource = LR}) ->
- #jid{user = iolist_to_binary(U),
- server = iolist_to_binary(S),
- resource = iolist_to_binary(R),
- luser = iolist_to_binary(LU),
- lserver = iolist_to_binary(LS),
- lresource = iolist_to_binary(LR)}.
-
-update_table() ->
- Fields = record_info(fields, offline_msg),
- case mnesia:table_info(offline_msg, attributes) of
- Fields ->
- ejabberd_config:convert_table_to_binary(
- offline_msg, Fields, bag,
- fun(#offline_msg{us = {U, _}}) -> U end,
- fun(#offline_msg{us = {U, S},
- from = From,
- to = To,
- packet = El} = R) ->
- R#offline_msg{us = {iolist_to_binary(U),
- iolist_to_binary(S)},
- from = jid_to_binary(From),
- to = jid_to_binary(To),
- packet = fxml:to_xmlel(El)}
- end);
- _ ->
- ?INFO_MSG("Recreating offline_msg table", []),
- mnesia:transform_table(offline_msg, ignore, Fields)
- end.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:remove_user(LUser, LServer).
%% Helper functions:
@@ -880,255 +666,71 @@ webadmin_page(_, Host,
webadmin_page(Acc, _, _) -> Acc.
get_offline_els(LUser, LServer) ->
- get_offline_els(LUser, LServer, gen_mod:db_type(LServer, ?MODULE)).
-
-get_offline_els(LUser, LServer, DBType)
- when DBType == mnesia; DBType == riak ->
- Msgs = read_all_msgs(LUser, LServer, DBType),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Hdrs = Mod:read_message_headers(LUser, LServer),
lists:map(
- fun(Msg) ->
- {route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
- jlib:replace_from_to(From, To, Packet)
- end, Msgs);
-get_offline_els(LUser, LServer, odbc) ->
- case catch ejabberd_odbc:sql_query(
- LServer,
- ?SQL("select @(xml)s from spool where "
- "username=%(LUser)s order by seq")) of
- {selected, Rs} ->
- lists:flatmap(
- fun({XML}) ->
- case fxml_stream:parse_element(XML) of
- #xmlel{} = El ->
- case offline_msg_to_route(LServer, El) of
- {route, _, _, NewEl} ->
- [NewEl];
- error ->
- []
- end;
- _ ->
- []
- end
- end, Rs);
- _ ->
- []
- end.
+ fun({_Seq, From, To, Packet}) ->
+ jlib:replace_from_to(From, To, Packet)
+ end, Hdrs).
offline_msg_to_route(LServer, #offline_msg{} = R) ->
- {route, R#offline_msg.from, R#offline_msg.to,
- jlib:add_delay_info(R#offline_msg.packet, LServer, R#offline_msg.timestamp,
- <<"Offline Storage">>)};
-offline_msg_to_route(_LServer, #xmlel{} = El) ->
- To = jid:from_string(fxml:get_tag_attr_s(<<"to">>, El)),
- From = jid:from_string(fxml:get_tag_attr_s(<<"from">>, El)),
- if (To /= error) and (From /= error) ->
- {route, From, To, El};
- true ->
- error
- end.
-
-binary_to_timestamp(TS) ->
- case catch jlib:binary_to_integer(TS) of
- Int when is_integer(Int) ->
- Secs = Int div 1000000,
- USec = Int rem 1000000,
- MSec = Secs div 1000000,
- Sec = Secs rem 1000000,
- {MSec, Sec, USec};
- _ ->
- undefined
- end.
-
-timestamp_to_binary({MS, S, US}) ->
- format_timestamp(integer_to_list((MS * 1000000 + S) * 1000000 + US)).
-
-format_timestamp(TS) ->
- iolist_to_binary(io_lib:format("~20..0s", [TS])).
-
-offline_msg_to_header(#offline_msg{from = From, timestamp = Int} = Msg) ->
- TS = timestamp_to_binary(Int),
- From_s = jid:to_string(From),
- {<<TS/binary, "+", From_s/binary>>, From_s, Msg}.
+ El = case R#offline_msg.timestamp of
+ undefined ->
+ R#offline_msg.packet;
+ TS ->
+ jlib:add_delay_info(R#offline_msg.packet, LServer, TS,
+ <<"Offline Storage">>)
+ end,
+ {route, R#offline_msg.from, R#offline_msg.to, El}.
read_message_headers(LUser, LServer) ->
- DBType = gen_mod:db_type(LServer, ?MODULE),
- read_message_headers(LUser, LServer, DBType).
-
-read_message_headers(LUser, LServer, mnesia) ->
- Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
- Hdrs = lists:map(fun offline_msg_to_header/1, Msgs),
- lists:keysort(1, Hdrs);
-read_message_headers(LUser, LServer, riak) ->
- case ejabberd_riak:get_by_index(
- offline_msg, offline_msg_schema(),
- <<"us">>, {LUser, LServer}) of
- {ok, Rs} ->
- Hdrs = lists:map(fun offline_msg_to_header/1, Rs),
- lists:keysort(1, Hdrs);
- _Err ->
- []
- end;
-read_message_headers(LUser, LServer, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case catch ejabberd_odbc:sql_query(
- LServer, [<<"select xml, seq from spool where username ='">>,
- Username, <<"' order by seq;">>]) of
- {selected, [<<"xml">>, <<"seq">>], Rows} ->
- Hdrs = lists:flatmap(
- fun([XML, Seq]) ->
- try
- #xmlel{} = El = fxml_stream:parse_element(XML),
- From = fxml:get_tag_attr_s(<<"from">>, El),
- #jid{} = jid:from_string(From),
- TS = format_timestamp(Seq),
- [{<<TS/binary, "+", From/binary>>, From, El}]
- catch _:_ -> []
- end
- end, Rows),
- lists:keysort(1, Hdrs);
- _Err ->
- []
- end.
-
-read_message(_From, To, TS, mnesia) ->
- {U, S, _} = jid:tolower(To),
- case mnesia:dirty_match_object(
- offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}) of
- [Msg|_] ->
- {ok, Msg};
- _ ->
- error
- end;
-read_message(_From, _To, TS, riak) ->
- case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
- {ok, Msg} ->
- {ok, Msg};
- _ ->
- error
- end;
-read_message(_From, To, Seq, odbc) ->
- {LUser, LServer, _} = jid:tolower(To),
- Username = ejabberd_odbc:escape(LUser),
- SSeq = ejabberd_odbc:escape(Seq),
- case ejabberd_odbc:sql_query(
- LServer,
- [<<"select xml from spool where username='">>, Username,
- <<"' and seq='">>, SSeq, <<"';">>]) of
- {selected, [<<"xml">>], [[RawXML]|_]} ->
- case fxml_stream:parse_element(RawXML) of
- #xmlel{} = El -> {ok, El};
- {error, _} -> error
- end;
- _ ->
- error
- end.
-
-remove_message(_From, To, TS, mnesia) ->
- {U, S, _} = jid:tolower(To),
- Msgs = mnesia:dirty_match_object(
- offline_msg, #offline_msg{us = {U, S}, timestamp = TS, _ = '_'}),
- lists:foreach(
- fun(Msg) ->
- mnesia:dirty_delete_object(Msg)
- end, Msgs);
-remove_message(_From, _To, TS, riak) ->
- ejabberd_riak:delete(offline_msg, TS),
- ok;
-remove_message(_From, To, Seq, odbc) ->
- {LUser, LServer, _} = jid:tolower(To),
- Username = ejabberd_odbc:escape(LUser),
- SSeq = ejabberd_odbc:escape(Seq),
- ejabberd_odbc:sql_query(
- LServer,
- [<<"delete from spool where username='">>, Username,
- <<"' and seq='">>, SSeq, <<"';">>]),
- ok.
-
-read_all_msgs(LUser, LServer, mnesia) ->
- US = {LUser, LServer},
- lists:keysort(#offline_msg.timestamp,
- mnesia:dirty_read({offline_msg, US}));
-read_all_msgs(LUser, LServer, riak) ->
- case ejabberd_riak:get_by_index(
- offline_msg, offline_msg_schema(),
- <<"us">>, {LUser, LServer}) of
- {ok, Rs} ->
- lists:keysort(#offline_msg.timestamp, Rs);
- _Err ->
- []
- end;
-read_all_msgs(LUser, LServer, odbc) ->
- case catch ejabberd_odbc:sql_query(
- LServer,
- ?SQL("select @(xml)s from spool where "
- "username=%(LUser)s order by seq")) of
- {selected, Rs} ->
- lists:flatmap(
- fun({XML}) ->
- case fxml_stream:parse_element(XML) of
- {error, _Reason} -> [];
- El -> [El]
- end
- end,
- Rs);
- _ -> []
- end.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ lists:map(
+ fun({Seq, From, To, El}) ->
+ Node = integer_to_binary(Seq),
+ {Node, From, To, El}
+ end, Mod:read_message_headers(LUser, LServer)).
-format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
- lists:map(fun (#offline_msg{timestamp = TimeStamp,
- from = From, to = To,
- packet =
- #xmlel{name = Name, attrs = Attrs,
- children = Els}} =
- Msg) ->
- ID = jlib:encode_base64((term_to_binary(Msg))),
- {{Year, Month, Day}, {Hour, Minute, Second}} =
- calendar:now_to_local_time(TimeStamp),
- Time =
- iolist_to_binary(io_lib:format("~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
- [Year, Month, Day,
- Hour, Minute,
- Second])),
- SFrom = jid:to_string(From),
- STo = jid:to_string(To),
- Attrs2 = jlib:replace_from_to_attrs(SFrom, STo, Attrs),
- Packet = #xmlel{name = Name, attrs = Attrs2,
- children = Els},
- FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
- ?XE(<<"tr">>,
- [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
- [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
- ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time),
- ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom),
- ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo),
- ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
- [?XC(<<"pre">>, FPacket)])])
- end,
- Msgs);
-format_user_queue(Msgs, odbc) ->
- lists:map(fun (#xmlel{} = Msg) ->
- ID = jlib:encode_base64((term_to_binary(Msg))),
- Packet = Msg,
- FPacket = ejabberd_web_admin:pretty_print_xml(Packet),
- ?XE(<<"tr">>,
- [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
- [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
- ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
- [?XC(<<"pre">>, FPacket)])])
- end,
- Msgs).
+format_user_queue(Hdrs) ->
+ lists:map(
+ fun({Seq, From, To, El}) ->
+ ID = integer_to_binary(Seq),
+ FPacket = ejabberd_web_admin:pretty_print_xml(El),
+ SFrom = jid:to_string(From),
+ STo = jid:to_string(To),
+ Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
+ {attr, <<"stamp">>}]),
+ Time = case jlib:datetime_string_to_timestamp(Stamp) of
+ {_, _, _} = Now ->
+ {{Year, Month, Day}, {Hour, Minute, Second}} =
+ calendar:now_to_local_time(Now),
+ iolist_to_binary(
+ io_lib:format(
+ "~w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w",
+ [Year, Month, Day, Hour, Minute,
+ Second]));
+ _ ->
+ <<"">>
+ end,
+ ?XE(<<"tr">>,
+ [?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+ [?INPUT(<<"checkbox">>, <<"selected">>, ID)]),
+ ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], Time),
+ ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], SFrom),
+ ?XAC(<<"td">>, [{<<"class">>, <<"valign">>}], STo),
+ ?XAE(<<"td">>, [{<<"class">>, <<"valign">>}],
+ [?XC(<<"pre">>, FPacket)])])
+ end, Hdrs).
user_queue(User, Server, Query, Lang) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
US = {LUser, LServer},
- DBType = gen_mod:db_type(LServer, ?MODULE),
- Res = user_queue_parse_query(LUser, LServer, Query,
- DBType),
- MsgsAll = read_all_msgs(LUser, LServer, DBType),
- Msgs = get_messages_subset(US, Server, MsgsAll,
- DBType),
- FMsgs = format_user_queue(Msgs, DBType),
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Res = user_queue_parse_query(LUser, LServer, Query),
+ HdrsAll = Mod:read_message_headers(LUser, LServer),
+ Hdrs = get_messages_subset(US, Server, HdrsAll),
+ FMsgs = format_user_queue(Hdrs),
[?XC(<<"h1">>,
list_to_binary(io_lib:format(?T(<<"~s's Offline Messages Queue">>),
[us_to_list(US)])))]
@@ -1158,96 +760,24 @@ user_queue(User, Server, Query, Lang) ->
?INPUTT(<<"submit">>, <<"delete">>,
<<"Delete Selected">>)])].
-user_queue_parse_query(LUser, LServer, Query, mnesia) ->
- US = {LUser, LServer},
- case lists:keysearch(<<"delete">>, 1, Query) of
- {value, _} ->
- Msgs = lists:keysort(#offline_msg.timestamp,
- mnesia:dirty_read({offline_msg, US})),
- F = fun () ->
- lists:foreach(fun (Msg) ->
- ID =
- jlib:encode_base64((term_to_binary(Msg))),
- case lists:member({<<"selected">>,
- ID},
- Query)
- of
- true -> mnesia:delete_object(Msg);
- false -> ok
- end
- end,
- Msgs)
- end,
- mnesia:transaction(F),
- ok;
- false -> nothing
- end;
-user_queue_parse_query(LUser, LServer, Query, riak) ->
+user_queue_parse_query(LUser, LServer, Query) ->
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
case lists:keysearch(<<"delete">>, 1, Query) of
- {value, _} ->
- Msgs = read_all_msgs(LUser, LServer, riak),
- lists:foreach(
- fun (Msg) ->
- ID = jlib:encode_base64((term_to_binary(Msg))),
- case lists:member({<<"selected">>, ID}, Query) of
- true ->
- ejabberd_riak:delete(offline_msg,
- Msg#offline_msg.timestamp);
- false ->
- ok
- end
- end,
- Msgs),
- ok;
- false ->
- nothing
- end;
-user_queue_parse_query(LUser, LServer, Query, odbc) ->
- Username = ejabberd_odbc:escape(LUser),
- case lists:keysearch(<<"delete">>, 1, Query) of
- {value, _} ->
- Msgs = case catch ejabberd_odbc:sql_query(LServer,
- [<<"select xml, seq from spool where username='">>,
- Username,
- <<"' order by seq;">>])
- of
- {selected, [<<"xml">>, <<"seq">>], Rs} ->
- lists:flatmap(fun ([XML, Seq]) ->
- case fxml_stream:parse_element(XML)
- of
- {error, _Reason} -> [];
- El -> [{El, Seq}]
- end
- end,
- Rs);
- _ -> []
- end,
- F = fun () ->
- lists:foreach(fun ({Msg, Seq}) ->
- ID =
- jlib:encode_base64((term_to_binary(Msg))),
- case lists:member({<<"selected">>,
- ID},
- Query)
- of
- true ->
- SSeq =
- ejabberd_odbc:escape(Seq),
- catch
- ejabberd_odbc:sql_query(LServer,
- [<<"delete from spool where username='">>,
- Username,
- <<"' and seq='">>,
- SSeq,
- <<"';">>]);
- false -> ok
- end
- end,
- Msgs)
- end,
- mnesia:transaction(F),
- ok;
- false -> nothing
+ {value, _} ->
+ case lists:keyfind(<<"selected">>, 1, Query) of
+ {_, Seq} ->
+ case catch binary_to_integer(Seq) of
+ I when is_integer(I), I>=0 ->
+ Mod:remove_message(LUser, LServer, I),
+ ok;
+ _ ->
+ nothing
+ end;
+ false ->
+ nothing
+ end;
+ _ ->
+ nothing
end.
us_to_list({User, Server}) ->
@@ -1256,7 +786,7 @@ us_to_list({User, Server}) ->
get_queue_length(LUser, LServer) ->
count_offline_messages(LUser, LServer).
-get_messages_subset(User, Host, MsgsAll, DBType) ->
+get_messages_subset(User, Host, MsgsAll) ->
Access = gen_mod:get_module_opt(Host, ?MODULE, access_max_user_messages,
fun(A) when is_atom(A) -> A end,
max_user_offline_messages),
@@ -1267,33 +797,20 @@ get_messages_subset(User, Host, MsgsAll, DBType) ->
_ -> 100
end,
Length = length(MsgsAll),
- get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll,
- DBType).
+ get_messages_subset2(MaxOfflineMsgs, Length, MsgsAll).
-get_messages_subset2(Max, Length, MsgsAll, _DBType)
- when Length =< Max * 2 ->
+get_messages_subset2(Max, Length, MsgsAll) when Length =< Max * 2 ->
MsgsAll;
-get_messages_subset2(Max, Length, MsgsAll, DBType)
- when DBType == mnesia; DBType == riak ->
+get_messages_subset2(Max, Length, MsgsAll) ->
FirstN = Max,
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
Msgs2),
NoJID = jid:make(<<"...">>, <<"...">>, <<"">>),
- IntermediateMsg = #offline_msg{timestamp = p1_time_compat:timestamp(),
- from = NoJID, to = NoJID,
- packet =
- #xmlel{name = <<"...">>, attrs = [],
- children = []}},
- MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN;
-get_messages_subset2(Max, Length, MsgsAll, odbc) ->
- FirstN = Max,
- {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
- MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
- Msgs2),
+ Seq = <<"0">>,
IntermediateMsg = #xmlel{name = <<"...">>, attrs = [],
children = []},
- MsgsFirstN ++ [IntermediateMsg] ++ MsgsLastN.
+ MsgsFirstN ++ [{Seq, NoJID, NoJID, IntermediateMsg}] ++ MsgsLastN.
webadmin_user(Acc, User, Server, Lang) ->
QueueLen = count_offline_messages(jid:nodeprep(User),
@@ -1310,25 +827,8 @@ webadmin_user(Acc, User, Server, Lang) ->
delete_all_msgs(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
- delete_all_msgs(LUser, LServer,
- gen_mod:db_type(LServer, ?MODULE)).
-
-delete_all_msgs(LUser, LServer, mnesia) ->
- US = {LUser, LServer},
- F = fun () ->
- mnesia:write_lock_table(offline_msg),
- lists:foreach(fun (Msg) -> mnesia:delete_object(Msg)
- end,
- mnesia:dirty_read({offline_msg, US}))
- end,
- mnesia:transaction(F);
-delete_all_msgs(LUser, LServer, riak) ->
- Res = ejabberd_riak:delete_by_index(offline_msg,
- <<"us">>, {LUser, LServer}),
- {atomic, Res};
-delete_all_msgs(LUser, LServer, odbc) ->
- odbc_queries:del_spool_msg(LServer, LUser),
- {atomic, ok}.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:remove_all_messages(LUser, LServer).
webadmin_user_parse_query(_, <<"removealloffline">>,
User, Server, _Query) ->
@@ -1350,112 +850,20 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
count_offline_messages(User, Server) ->
LUser = jid:nodeprep(User),
LServer = jid:nameprep(Server),
- DBType = gen_mod:db_type(LServer, ?MODULE),
- count_offline_messages(LUser, LServer, DBType).
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:count_messages(LUser, LServer).
-count_offline_messages(LUser, LServer, mnesia) ->
- US = {LUser, LServer},
- F = fun () ->
- count_mnesia_records(US)
- end,
- case catch mnesia:async_dirty(F) of
- I when is_integer(I) -> I;
- _ -> 0
- end;
-count_offline_messages(LUser, LServer, odbc) ->
- case catch ejabberd_odbc:sql_query(
- LServer,
- ?SQL("select @(count(*))d from spool "
- "where username=%(LUser)s")) of
- {selected, [{Res}]} ->
- Res;
- _ -> 0
- end;
-count_offline_messages(LUser, LServer, riak) ->
- case ejabberd_riak:count_by_index(
- offline_msg, <<"us">>, {LUser, LServer}) of
- {ok, Res} ->
- Res;
- _ ->
- 0
- end.
-
-%% Return the number of records matching a given match expression.
-%% This function is intended to be used inside a Mnesia transaction.
-%% The count has been written to use the fewest possible memory by
-%% getting the record by small increment and by using continuation.
--define(BATCHSIZE, 100).
-
-count_mnesia_records(US) ->
- MatchExpression = #offline_msg{us = US, _ = '_'},
- case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}],
- ?BATCHSIZE, read) of
- {Result, Cont} ->
- Count = length(Result),
- count_records_cont(Cont, Count);
- '$end_of_table' ->
- 0
- end.
-
-count_records_cont(Cont, Count) ->
- case mnesia:select(Cont) of
- {Result, Cont} ->
- NewCount = Count + length(Result),
- count_records_cont(Cont, NewCount);
- '$end_of_table' ->
- Count
- end.
-
-offline_msg_schema() ->
- {record_info(fields, offline_msg), #offline_msg{}}.
-
-export(_Server) ->
- [{offline_msg,
- fun(Host, #offline_msg{us = {LUser, LServer},
- timestamp = TimeStamp, from = From, to = To,
- packet = Packet})
- when LServer == Host ->
- Username = ejabberd_odbc:escape(LUser),
- Packet1 = jlib:replace_from_to(From, To, Packet),
- Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp,
- <<"Offline Storage">>),
- XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)),
- [[<<"delete from spool where username='">>, Username, <<"';">>],
- [<<"insert into spool(username, xml) values ('">>,
- Username, <<"', '">>, XML, <<"');">>]];
- (_Host, _R) ->
- []
- end}].
+export(LServer) ->
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:export(LServer).
import(LServer) ->
- [{<<"select username, xml from spool;">>,
- fun([LUser, XML]) ->
- El = #xmlel{} = fxml_stream:parse_element(XML),
- From = #jid{} = jid:from_string(
- fxml:get_attr_s(<<"from">>, El#xmlel.attrs)),
- To = #jid{} = jid:from_string(
- fxml:get_attr_s(<<"to">>, El#xmlel.attrs)),
- Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
- {attr, <<"stamp">>}]),
- TS = case jlib:datetime_string_to_timestamp(Stamp) of
- {_, _, _} = Now ->
- Now;
- undefined ->
- p1_time_compat:timestamp()
- end,
- Expire = find_x_expire(TS, El#xmlel.children),
- #offline_msg{us = {LUser, LServer},
- from = From, to = To,
- timestamp = TS, expire = Expire}
- end}].
-
-import(_LServer, mnesia, #offline_msg{} = Msg) ->
- mnesia:dirty_write(Msg);
-import(_LServer, riak, #offline_msg{us = US, timestamp = TS} = M) ->
- ejabberd_riak:put(M, offline_msg_schema(),
- [{i, TS}, {'2i', [{<<"us">>, US}]}]);
-import(_, _, _) ->
- pass.
+ Mod = gen_mod:db_mod(LServer, ?MODULE),
+ Mod:import(LServer).
+
+import(LServer, DBType, Data) ->
+ Mod = gen_mod:db_mod(DBType, ?MODULE),
+ Mod:import(LServer, Data).
mod_opt_type(access_max_user_messages) ->
fun (A) -> A end;
diff --git a/src/mod_offline_mnesia.erl b/src/mod_offline_mnesia.erl
new file mode 100644
index 000000000..6a1d9e309
--- /dev/null
+++ b/src/mod_offline_mnesia.erl
@@ -0,0 +1,232 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_mnesia).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+ remove_old_messages/2, remove_user/2, read_message_headers/2,
+ read_message/3, remove_message/3, read_all_messages/2,
+ remove_all_messages/2, count_messages/2, import/2]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+-include("logger.hrl").
+
+-define(OFFLINE_TABLE_LOCK_THRESHOLD, 1000).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ mnesia:create_table(offline_msg,
+ [{disc_only_copies, [node()]}, {type, bag},
+ {attributes, record_info(fields, offline_msg)}]),
+ update_table().
+
+store_messages(_Host, US, Msgs, Len, MaxOfflineMsgs) ->
+ F = fun () ->
+ Count = if MaxOfflineMsgs =/= infinity ->
+ Len + count_mnesia_records(US);
+ true -> 0
+ end,
+ if Count > MaxOfflineMsgs -> discard;
+ true ->
+ if Len >= (?OFFLINE_TABLE_LOCK_THRESHOLD) ->
+ mnesia:write_lock_table(offline_msg);
+ true -> ok
+ end,
+ lists:foreach(fun (M) -> mnesia:write(M) end, Msgs)
+ end
+ end,
+ mnesia:transaction(F).
+
+pop_messages(LUser, LServer) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ Rs = mnesia:wread({offline_msg, US}),
+ mnesia:delete({offline_msg, US}),
+ Rs
+ end,
+ case mnesia:transaction(F) of
+ {atomic, L} ->
+ {ok, lists:keysort(#offline_msg.timestamp, L)};
+ {aborted, Reason} ->
+ {error, Reason}
+ end.
+
+remove_expired_messages(_LServer) ->
+ TimeStamp = p1_time_compat:timestamp(),
+ F = fun () ->
+ mnesia:write_lock_table(offline_msg),
+ mnesia:foldl(fun (Rec, _Acc) ->
+ case Rec#offline_msg.expire of
+ never -> ok;
+ TS ->
+ if TS < TimeStamp ->
+ mnesia:delete_object(Rec);
+ true -> ok
+ end
+ end
+ end,
+ ok, offline_msg)
+ end,
+ mnesia:transaction(F).
+
+remove_old_messages(Days, _LServer) ->
+ S = p1_time_compat:system_time(seconds) - 60 * 60 * 24 * Days,
+ MegaSecs1 = S div 1000000,
+ Secs1 = S rem 1000000,
+ TimeStamp = {MegaSecs1, Secs1, 0},
+ F = fun () ->
+ mnesia:write_lock_table(offline_msg),
+ mnesia:foldl(fun (#offline_msg{timestamp = TS} = Rec,
+ _Acc)
+ when TS < TimeStamp ->
+ mnesia:delete_object(Rec);
+ (_Rec, _Acc) -> ok
+ end,
+ ok, offline_msg)
+ end,
+ mnesia:transaction(F).
+
+remove_user(LUser, LServer) ->
+ US = {LUser, LServer},
+ F = fun () -> mnesia:delete({offline_msg, US}) end,
+ mnesia:transaction(F).
+
+read_message_headers(LUser, LServer) ->
+ Msgs = mnesia:dirty_read({offline_msg, {LUser, LServer}}),
+ Hdrs = lists:map(
+ fun(#offline_msg{from = From, to = To, packet = Pkt,
+ timestamp = TS}) ->
+ Seq = now_to_integer(TS),
+ NewPkt = jlib:add_delay_info(Pkt, LServer, TS,
+ <<"Offline Storage">>),
+ {Seq, From, To, NewPkt}
+ end, Msgs),
+ lists:keysort(1, Hdrs).
+
+read_message(LUser, LServer, I) ->
+ US = {LUser, LServer},
+ TS = integer_to_now(I),
+ case mnesia:dirty_match_object(
+ offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}) of
+ [Msg|_] ->
+ {ok, Msg};
+ _ ->
+ error
+ end.
+
+remove_message(LUser, LServer, I) ->
+ US = {LUser, LServer},
+ TS = integer_to_now(I),
+ Msgs = mnesia:dirty_match_object(
+ offline_msg, #offline_msg{us = US, timestamp = TS, _ = '_'}),
+ lists:foreach(
+ fun(Msg) ->
+ mnesia:dirty_delete_object(Msg)
+ end, Msgs).
+
+read_all_messages(LUser, LServer) ->
+ US = {LUser, LServer},
+ lists:keysort(#offline_msg.timestamp,
+ mnesia:dirty_read({offline_msg, US})).
+
+remove_all_messages(LUser, LServer) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ mnesia:write_lock_table(offline_msg),
+ lists:foreach(fun (Msg) -> mnesia:delete_object(Msg) end,
+ mnesia:dirty_read({offline_msg, US}))
+ end,
+ mnesia:transaction(F).
+
+count_messages(LUser, LServer) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ count_mnesia_records(US)
+ end,
+ case catch mnesia:async_dirty(F) of
+ I when is_integer(I) -> I;
+ _ -> 0
+ end.
+
+import(_LServer, #offline_msg{} = Msg) ->
+ mnesia:dirty_write(Msg).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+%% Return the number of records matching a given match expression.
+%% This function is intended to be used inside a Mnesia transaction.
+%% The count has been written to use the fewest possible memory by
+%% getting the record by small increment and by using continuation.
+-define(BATCHSIZE, 100).
+
+count_mnesia_records(US) ->
+ MatchExpression = #offline_msg{us = US, _ = '_'},
+ case mnesia:select(offline_msg, [{MatchExpression, [], [[]]}],
+ ?BATCHSIZE, read) of
+ {Result, Cont} ->
+ Count = length(Result),
+ count_records_cont(Cont, Count);
+ '$end_of_table' ->
+ 0
+ end.
+
+count_records_cont(Cont, Count) ->
+ case mnesia:select(Cont) of
+ {Result, Cont} ->
+ NewCount = Count + length(Result),
+ count_records_cont(Cont, NewCount);
+ '$end_of_table' ->
+ Count
+ end.
+
+jid_to_binary(#jid{user = U, server = S, resource = R,
+ luser = LU, lserver = LS, lresource = LR}) ->
+ #jid{user = iolist_to_binary(U),
+ server = iolist_to_binary(S),
+ resource = iolist_to_binary(R),
+ luser = iolist_to_binary(LU),
+ lserver = iolist_to_binary(LS),
+ lresource = iolist_to_binary(LR)}.
+
+now_to_integer({MS, S, US}) ->
+ (MS * 1000000 + S) * 1000000 + US.
+
+integer_to_now(Int) ->
+ Secs = Int div 1000000,
+ USec = Int rem 1000000,
+ MSec = Secs div 1000000,
+ Sec = Secs rem 1000000,
+ {MSec, Sec, USec}.
+
+update_table() ->
+ Fields = record_info(fields, offline_msg),
+ case mnesia:table_info(offline_msg, attributes) of
+ Fields ->
+ ejabberd_config:convert_table_to_binary(
+ offline_msg, Fields, bag,
+ fun(#offline_msg{us = {U, _}}) -> U end,
+ fun(#offline_msg{us = {U, S},
+ from = From,
+ to = To,
+ packet = El} = R) ->
+ R#offline_msg{us = {iolist_to_binary(U),
+ iolist_to_binary(S)},
+ from = jid_to_binary(From),
+ to = jid_to_binary(To),
+ packet = fxml:to_xmlel(El)}
+ end);
+ _ ->
+ ?INFO_MSG("Recreating offline_msg table", []),
+ mnesia:transform_table(offline_msg, ignore, Fields)
+ end.
diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl
new file mode 100644
index 000000000..217e8f828
--- /dev/null
+++ b/src/mod_offline_riak.erl
@@ -0,0 +1,153 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_riak).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+ remove_old_messages/2, remove_user/2, read_message_headers/2,
+ read_message/3, remove_message/3, read_all_messages/2,
+ remove_all_messages/2, count_messages/2, import/2]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ ok.
+
+store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) ->
+ Count = if MaxOfflineMsgs =/= infinity ->
+ Len + count_messages(User, Host);
+ true -> 0
+ end,
+ if
+ Count > MaxOfflineMsgs ->
+ {atomic, discard};
+ true ->
+ try
+ lists:foreach(
+ fun(#offline_msg{us = US,
+ timestamp = TS} = M) ->
+ ok = ejabberd_riak:put(
+ M, offline_msg_schema(),
+ [{i, TS}, {'2i', [{<<"us">>, US}]}])
+ end, Msgs),
+ {atomic, ok}
+ catch _:{badmatch, Err} ->
+ {atomic, Err}
+ end
+ end.
+
+pop_messages(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ try
+ lists:foreach(
+ fun(#offline_msg{timestamp = T}) ->
+ ok = ejabberd_riak:delete(offline_msg, T)
+ end, Rs),
+ {ok, lists:keysort(#offline_msg.timestamp, Rs)}
+ catch _:{badmatch, Err} ->
+ Err
+ end;
+ Err ->
+ Err
+ end.
+
+remove_expired_messages(_LServer) ->
+ %% TODO
+ {atomic, ok}.
+
+remove_old_messages(_Days, _LServer) ->
+ %% TODO
+ {atomic, ok}.
+
+remove_user(LUser, LServer) ->
+ {atomic, ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer})}.
+
+read_message_headers(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ Hdrs = lists:map(
+ fun(#offline_msg{from = From, to = To, packet = Pkt,
+ timestamp = TS}) ->
+ Seq = now_to_integer(TS),
+ NewPkt = jlib:add_delay_info(
+ Pkt, LServer, TS, <<"Offline Storage">>),
+ {Seq, From, To, NewPkt}
+ end, Rs),
+ lists:keysort(1, Hdrs);
+ _Err ->
+ []
+ end.
+
+read_message(_LUser, _LServer, I) ->
+ TS = integer_to_now(I),
+ case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
+ {ok, Msg} ->
+ {ok, Msg};
+ _ ->
+ error
+ end.
+
+remove_message(_LUser, _LServer, I) ->
+ TS = integer_to_now(I),
+ ejabberd_riak:delete(offline_msg, TS),
+ ok.
+
+read_all_messages(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ lists:keysort(#offline_msg.timestamp, Rs);
+ _Err ->
+ []
+ end.
+
+remove_all_messages(LUser, LServer) ->
+ Res = ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}),
+ {atomic, Res}.
+
+count_messages(LUser, LServer) ->
+ case ejabberd_riak:count_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Res} ->
+ Res;
+ _ ->
+ 0
+ end.
+
+import(_LServer, #offline_msg{us = US, timestamp = TS} = M) ->
+ ejabberd_riak:put(M, offline_msg_schema(),
+ [{i, TS}, {'2i', [{<<"us">>, US}]}]).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+offline_msg_schema() ->
+ {record_info(fields, offline_msg), #offline_msg{}}.
+
+now_to_integer({MS, S, US}) ->
+ (MS * 1000000 + S) * 1000000 + US.
+
+integer_to_now(Int) ->
+ Secs = Int div 1000000,
+ USec = Int rem 1000000,
+ MSec = Secs div 1000000,
+ Sec = Secs rem 1000000,
+ {MSec, Sec, USec}.
diff --git a/src/mod_offline_sql.erl b/src/mod_offline_sql.erl
new file mode 100644
index 000000000..37b90163d
--- /dev/null
+++ b/src/mod_offline_sql.erl
@@ -0,0 +1,252 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_sql).
+
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+ remove_old_messages/2, remove_user/2, read_message_headers/2,
+ read_message/3, remove_message/3, read_all_messages/2,
+ remove_all_messages/2, count_messages/2, import/1, import/2,
+ export/1]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ ok.
+
+store_messages(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs) ->
+ Count = if MaxOfflineMsgs =/= infinity ->
+ Len + count_messages(User, Host);
+ true -> 0
+ end,
+ if Count > MaxOfflineMsgs -> {atomic, discard};
+ true ->
+ Query = lists:map(
+ fun(M) ->
+ Username =
+ ejabberd_odbc:escape((M#offline_msg.to)#jid.luser),
+ From = M#offline_msg.from,
+ To = M#offline_msg.to,
+ Packet =
+ jlib:replace_from_to(From, To,
+ M#offline_msg.packet),
+ NewPacket =
+ jlib:add_delay_info(Packet, Host,
+ M#offline_msg.timestamp,
+ <<"Offline Storage">>),
+ XML =
+ ejabberd_odbc:escape(fxml:element_to_binary(NewPacket)),
+ odbc_queries:add_spool_sql(Username, XML)
+ end,
+ Msgs),
+ odbc_queries:add_spool(Host, Query)
+ end.
+
+pop_messages(LUser, LServer) ->
+ case odbc_queries:get_and_del_spool_msg_t(LServer, LUser) of
+ {atomic, {selected, Rs}} ->
+ {ok, lists:flatmap(
+ fun({_, XML}) ->
+ case xml_to_offline_msg(XML) of
+ {ok, Msg} ->
+ [Msg];
+ _Err ->
+ []
+ end
+ end, Rs)};
+ Err ->
+ {error, Err}
+ end.
+
+remove_expired_messages(_LServer) ->
+ %% TODO
+ {atomic, ok}.
+
+remove_old_messages(Days, LServer) ->
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ [<<"DELETE FROM spool"
+ " WHERE created_at < "
+ "DATE_SUB(CURDATE(), INTERVAL ">>,
+ integer_to_list(Days), <<" DAY);">>]) of
+ {updated, N} ->
+ ?INFO_MSG("~p message(s) deleted from offline spool", [N]);
+ _Error ->
+ ?ERROR_MSG("Cannot delete message in offline spool: ~p", [_Error])
+ end,
+ {atomic, ok}.
+
+remove_user(LUser, LServer) ->
+ odbc_queries:del_spool_msg(LServer, LUser).
+
+read_message_headers(LUser, LServer) ->
+ Username = ejabberd_odbc:escape(LUser),
+ case catch ejabberd_odbc:sql_query(
+ LServer, [<<"select xml, seq from spool where username ='">>,
+ Username, <<"' order by seq;">>]) of
+ {selected, [<<"xml">>, <<"seq">>], Rows} ->
+ lists:flatmap(
+ fun([XML, Seq]) ->
+ case xml_to_offline_msg(XML) of
+ {ok, #offline_msg{from = From,
+ to = To,
+ packet = El}} ->
+ Seq0 = binary_to_integer(Seq),
+ [{Seq0, From, To, El}];
+ _ ->
+ []
+ end
+ end, Rows);
+ _Err ->
+ []
+ end.
+
+read_message(LUser, LServer, Seq) ->
+ Username = ejabberd_odbc:escape(LUser),
+ SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)),
+ case ejabberd_odbc:sql_query(
+ LServer,
+ [<<"select xml from spool where username='">>, Username,
+ <<"' and seq='">>, SSeq, <<"';">>]) of
+ {selected, [<<"xml">>], [[RawXML]|_]} ->
+ case xml_to_offline_msg(RawXML) of
+ {ok, Msg} ->
+ {ok, Msg};
+ _ ->
+ error
+ end;
+ _ ->
+ error
+ end.
+
+remove_message(LUser, LServer, Seq) ->
+ Username = ejabberd_odbc:escape(LUser),
+ SSeq = ejabberd_odbc:escape(integer_to_binary(Seq)),
+ ejabberd_odbc:sql_query(
+ LServer,
+ [<<"delete from spool where username='">>, Username,
+ <<"' and seq='">>, SSeq, <<"';">>]),
+ ok.
+
+read_all_messages(LUser, LServer) ->
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ ?SQL("select @(xml)s from spool where "
+ "username=%(LUser)s order by seq")) of
+ {selected, Rs} ->
+ lists:flatmap(
+ fun({XML}) ->
+ case xml_to_offline_msg(XML) of
+ {ok, Msg} -> [Msg];
+ _ -> []
+ end
+ end, Rs);
+ _ ->
+ []
+ end.
+
+remove_all_messages(LUser, LServer) ->
+ odbc_queries:del_spool_msg(LServer, LUser),
+ {atomic, ok}.
+
+count_messages(LUser, LServer) ->
+ case catch ejabberd_odbc:sql_query(
+ LServer,
+ ?SQL("select @(count(*))d from spool "
+ "where username=%(LUser)s")) of
+ {selected, [{Res}]} ->
+ Res;
+ _ -> 0
+ end.
+
+export(_Server) ->
+ [{offline_msg,
+ fun(Host, #offline_msg{us = {LUser, LServer},
+ timestamp = TimeStamp, from = From, to = To,
+ packet = Packet})
+ when LServer == Host ->
+ Username = ejabberd_odbc:escape(LUser),
+ Packet1 = jlib:replace_from_to(From, To, Packet),
+ Packet2 = jlib:add_delay_info(Packet1, LServer, TimeStamp,
+ <<"Offline Storage">>),
+ XML = ejabberd_odbc:escape(fxml:element_to_binary(Packet2)),
+ [[<<"delete from spool where username='">>, Username, <<"';">>],
+ [<<"insert into spool(username, xml) values ('">>,
+ Username, <<"', '">>, XML, <<"');">>]];
+ (_Host, _R) ->
+ []
+ end}].
+
+import(LServer) ->
+ [{<<"select username, xml from spool;">>,
+ fun([LUser, XML]) ->
+ El = #xmlel{} = fxml_stream:parse_element(XML),
+ From = #jid{} = jid:from_string(
+ fxml:get_attr_s(<<"from">>, El#xmlel.attrs)),
+ To = #jid{} = jid:from_string(
+ fxml:get_attr_s(<<"to">>, El#xmlel.attrs)),
+ Stamp = fxml:get_path_s(El, [{elem, <<"delay">>},
+ {attr, <<"stamp">>}]),
+ TS = case jlib:datetime_string_to_timestamp(Stamp) of
+ {_, _, _} = Now ->
+ Now;
+ undefined ->
+ p1_time_compat:timestamp()
+ end,
+ Expire = mod_offline:find_x_expire(TS, El#xmlel.children),
+ #offline_msg{us = {LUser, LServer},
+ from = From, to = To,
+ packet = El,
+ timestamp = TS, expire = Expire}
+ end}].
+
+import(_, _) ->
+ pass.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+xml_to_offline_msg(XML) ->
+ case fxml_stream:parse_element(XML) of
+ #xmlel{} = El ->
+ el_to_offline_msg(El);
+ Err ->
+ ?ERROR_MSG("got ~p when parsing XML packet ~s",
+ [Err, XML]),
+ Err
+ end.
+
+el_to_offline_msg(El) ->
+ To_s = fxml:get_tag_attr_s(<<"to">>, El),
+ From_s = fxml:get_tag_attr_s(<<"from">>, El),
+ To = jid:from_string(To_s),
+ From = jid:from_string(From_s),
+ if To == error ->
+ ?ERROR_MSG("failed to get 'to' JID from offline XML ~p", [El]),
+ {error, bad_jid_to};
+ From == error ->
+ ?ERROR_MSG("failed to get 'from' JID from offline XML ~p", [El]),
+ {error, bad_jid_from};
+ true ->
+ {ok, #offline_msg{us = {To#jid.luser, To#jid.lserver},
+ from = From,
+ to = To,
+ timestamp = undefined,
+ expire = undefined,
+ packet = El}}
+ end.