aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r--src/mod_offline.erl201
1 files changed, 109 insertions, 92 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl
index 5e2d80aa2..c5dc305d6 100644
--- a/src/mod_offline.erl
+++ b/src/mod_offline.erl
@@ -187,43 +187,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs,
discard_warn_sender(Msgs);
true ->
lists:foreach(
- fun(M) ->
- Username = User,
- From = M#offline_msg.from,
- To = M#offline_msg.to,
- #xmlel{name = Name, attrs = Attrs,
- children = Els} =
- M#offline_msg.packet,
- Attrs2 = jlib:replace_from_to_attrs(
- jlib:jid_to_string(From),
- jlib:jid_to_string(To),
- Attrs),
- Packet = #xmlel{name = Name,
- attrs = Attrs2,
- children =
- Els ++
- [jlib:timestamp_to_xml(
- calendar:now_to_universal_time(
- M#offline_msg.timestamp),
- utc,
- jlib:make_jid(<<"">>, Host, <<"">>),
- <<"Offline Storage">>),
- jlib:timestamp_to_xml(
- calendar:now_to_universal_time(
- M#offline_msg.timestamp))]},
- XML = xml:element_to_binary(Packet),
- {MegaSecs, Secs, MicroSecs} =
- M#offline_msg.timestamp,
- TS =
- iolist_to_binary(
- io_lib:format("~6..0w~6..0w.~6..0w",
- [MegaSecs, Secs, MicroSecs])),
- ejabberd_riak:put(
- Host, <<"offline">>,
- undefined, XML,
- [{<<"user_bin">>, Username},
- {<<"timestamp_bin">>, TS}
- ])
+ fun(#offline_msg{us = US,
+ timestamp = TS} = M) ->
+ ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}])
end, Msgs)
end.
@@ -244,7 +210,7 @@ receive_all(US, Msgs, DBType) ->
case DBType of
mnesia -> Msgs;
odbc -> lists:reverse(Msgs);
- riak -> lists:reverse(Msgs)
+ riak -> Msgs
end
end.
@@ -474,41 +440,30 @@ pop_offline_messages(Ls, LUser, LServer, odbc) ->
_ -> Ls
end;
pop_offline_messages(Ls, LUser, LServer, riak) ->
- Username = LUser,
- case ejabberd_riak:get_objects_by_index(
- LServer, <<"offline">>, <<"user_bin">>, Username) of
+ case ejabberd_riak:get_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}) of
{ok, Rs} ->
- SortedRs =
- lists:sort(fun(X, Y) ->
- MX = riak_object:get_metadata(X),
- {ok, IX} = dict:find(<<"index">>, MX),
- {value, TSX} = lists:keysearch(
- <<"timestamp_bin">>, 1,
- IX),
- MY = riak_object:get_metadata(Y),
- {ok, IY} = dict:find(<<"index">>, MY),
- {value, TSY} = lists:keysearch(
- <<"timestamp_bin">>, 1,
- IY),
- TSX =< TSY
- end, Rs),
- Ls ++ lists:flatmap(
- fun(R) ->
- Key = riak_object:key(R),
- ejabberd_riak:delete(LServer, <<"offline">>, Key),
- XML = riak_object:get_value(R),
- case xml_stream:parse_element(XML) of
- {error, _Reason} ->
- [];
- El ->
- case offline_msg_to_route(LServer, El) of
- error ->
- [];
- RouteMsg ->
- [RouteMsg]
- end
- end
- end, SortedRs);
+ try
+ lists:foreach(
+ fun(#offline_msg{timestamp = T}) ->
+ ok = ejabberd_riak:delete(offline_msg, T)
+ end, Rs),
+ TS = now(),
+ Ls ++ lists:map(
+ fun (R) ->
+ offline_msg_to_route(LServer, R)
+ end,
+ lists:filter(
+ fun(R) ->
+ case R#offline_msg.expire of
+ never -> true;
+ TimeStamp -> TS < TimeStamp
+ end
+ end,
+ lists:keysort(#offline_msg.timestamp, Rs)))
+ catch _:{badmatch, _} ->
+ Ls
+ end;
_ ->
Ls
end.
@@ -579,17 +534,8 @@ remove_user(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username);
remove_user(LUser, LServer, riak) ->
- Username = LUser,
- case ejabberd_riak:get_keys_by_index(
- LServer, <<"offline">>, <<"user_bin">>, Username) of
- {ok, Keys} ->
- lists:foreach(
- fun(Key) ->
- ejabberd_riak:delete(LServer, <<"offline">>, Key)
- end, Keys);
- _ ->
- ok
- end.
+ {atomic, ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer})}.
jid_to_binary(#jid{user = U, server = S, resource = R,
luser = LU, lserver = LS, lresource = LR}) ->
@@ -650,6 +596,8 @@ get_offline_els(LUser, LServer) ->
get_offline_els(LUser, LServer, mnesia) ->
Msgs = read_all_msgs(LUser, LServer, mnesia),
+get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak ->
+ Msgs = read_all_msgs(LUser, LServer, DBType),
lists:map(
fun(Msg) ->
{route, From, To, Packet} = offline_msg_to_route(LServer, Msg),
@@ -706,6 +654,14 @@ read_all_msgs(LUser, LServer, mnesia) ->
US = {LUser, LServer},
lists:keysort(#offline_msg.timestamp,
mnesia:dirty_read({offline_msg, US}));
+read_all_msgs(LUser, LServer, riak) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ lists:keysort(#offline_msg.timestamp, Rs);
+ _Err ->
+ []
+ end;
read_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
@@ -723,7 +679,7 @@ read_all_msgs(LUser, LServer, odbc) ->
_ -> []
end.
-format_user_queue(Msgs, mnesia) ->
+format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak ->
lists:map(fun (#offline_msg{timestamp = TimeStamp,
from = From, to = To,
packet =
@@ -831,6 +787,26 @@ user_queue_parse_query(LUser, LServer, Query, mnesia) ->
ok;
false -> nothing
end;
+user_queue_parse_query(LUser, LServer, Query, riak) ->
+ case lists:keysearch(<<"delete">>, 1, Query) of
+ {value, _} ->
+ Msgs = read_all_msgs(LUser, LServer, riak),
+ lists:foreach(
+ fun (Msg) ->
+ ID = jlib:encode_base64((term_to_binary(Msg))),
+ case lists:member({<<"selected">>, ID}, Query) of
+ true ->
+ ejabberd_riak:delete(offline_msg,
+ Msg#offline_msg.timestamp);
+ false ->
+ ok
+ end
+ end,
+ Msgs),
+ ok;
+ false ->
+ nothing
+ end;
user_queue_parse_query(LUser, LServer, Query, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case lists:keysearch(<<"delete">>, 1, Query) of
@@ -889,6 +865,14 @@ get_queue_length(LUser, LServer) ->
get_queue_length(LUser, LServer, mnesia) ->
length(mnesia:dirty_read({offline_msg,
{LUser, LServer}}));
+get_queue_length(LUser, LServer, riak) ->
+ case ejabberd_riak:count_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}) of
+ {ok, N} ->
+ N;
+ _ ->
+ 0
+ end;
get_queue_length(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
case catch ejabberd_odbc:sql_query(LServer,
@@ -917,7 +901,8 @@ get_messages_subset(User, Host, MsgsAll, DBType) ->
get_messages_subset2(Max, Length, MsgsAll, _DBType)
when Length =< Max * 2 ->
MsgsAll;
-get_messages_subset2(Max, Length, MsgsAll, mnesia) ->
+get_messages_subset2(Max, Length, MsgsAll, DBType)
+ when DBType == mnesia; DBType == riak ->
FirstN = Max,
{MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll),
MsgsLastN = lists:nthtail(Length - FirstN - FirstN,
@@ -965,6 +950,10 @@ delete_all_msgs(LUser, LServer, mnesia) ->
mnesia:dirty_read({offline_msg, US}))
end,
mnesia:transaction(F);
+delete_all_msgs(LUser, LServer, riak) ->
+ Res = ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}),
+ {atomic, Res};
delete_all_msgs(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
odbc_queries:del_spool_msg(LServer, Username),
@@ -987,16 +976,44 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server,
Acc.
%% Returns as integer the number of offline messages for a given user
-count_offline_messages(LUser, LServer) ->
+count_offline_messages(User, Server) ->
+ LUser = jlib:nodeprep(User),
+ LServer = jlib:nameprep(Server),
+ DBType = gen_mod:db_type(LServer, ?MODULE),
+ count_offline_messages(LUser, LServer, DBType).
+
+count_offline_messages(LUser, LServer, mnesia) ->
+ US = {LUser, LServer},
+ F = fun () ->
+ p1_mnesia:count_records(offline_msg,
+ #offline_msg{us = US, _ = '_'})
+ end,
+ case catch mnesia:async_dirty(F) of
+ I when is_integer(I) -> I;
+ _ -> 0
+ end;
+count_offline_messages(LUser, LServer, odbc) ->
Username = ejabberd_odbc:escape(LUser),
- case catch odbc_queries:count_records_where(
- LServer, "spool",
- <<"where username='", Username/binary, "'">>) of
- {selected, [_], [[Res]]} ->
- jlib:binary_to_integer(Res);
+ case catch odbc_queries:count_records_where(LServer,
+ <<"spool">>,
+ <<"where username='",
+ Username/binary, "'">>)
+ of
+ {selected, [_], [[Res]]} ->
+ jlib:binary_to_integer(Res);
+ _ -> 0
+ end;
+count_offline_messages(LUser, LServer, riak) ->
+ case ejabberd_riak:count_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Res} ->
+ Res;
_ ->
0
- end.
+ end;
+count_offline_messages(_Acc, User, Server) ->
+ N = count_offline_messages(User, Server),
+ {stop, N}.
export(_Server) ->
[{offline_msg,