diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 201 |
1 files changed, 109 insertions, 92 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index 5e2d80aa2..c5dc305d6 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -187,43 +187,9 @@ store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs, discard_warn_sender(Msgs); true -> lists:foreach( - fun(M) -> - Username = User, - From = M#offline_msg.from, - To = M#offline_msg.to, - #xmlel{name = Name, attrs = Attrs, - children = Els} = - M#offline_msg.packet, - Attrs2 = jlib:replace_from_to_attrs( - jlib:jid_to_string(From), - jlib:jid_to_string(To), - Attrs), - Packet = #xmlel{name = Name, - attrs = Attrs2, - children = - Els ++ - [jlib:timestamp_to_xml( - calendar:now_to_universal_time( - M#offline_msg.timestamp), - utc, - jlib:make_jid(<<"">>, Host, <<"">>), - <<"Offline Storage">>), - jlib:timestamp_to_xml( - calendar:now_to_universal_time( - M#offline_msg.timestamp))]}, - XML = xml:element_to_binary(Packet), - {MegaSecs, Secs, MicroSecs} = - M#offline_msg.timestamp, - TS = - iolist_to_binary( - io_lib:format("~6..0w~6..0w.~6..0w", - [MegaSecs, Secs, MicroSecs])), - ejabberd_riak:put( - Host, <<"offline">>, - undefined, XML, - [{<<"user_bin">>, Username}, - {<<"timestamp_bin">>, TS} - ]) + fun(#offline_msg{us = US, + timestamp = TS} = M) -> + ejabberd_riak:put(M, [{i, TS}, {'2i', [{<<"us">>, US}]}]) end, Msgs) end. @@ -244,7 +210,7 @@ receive_all(US, Msgs, DBType) -> case DBType of mnesia -> Msgs; odbc -> lists:reverse(Msgs); - riak -> lists:reverse(Msgs) + riak -> Msgs end end. @@ -474,41 +440,30 @@ pop_offline_messages(Ls, LUser, LServer, odbc) -> _ -> Ls end; pop_offline_messages(Ls, LUser, LServer, riak) -> - Username = LUser, - case ejabberd_riak:get_objects_by_index( - LServer, <<"offline">>, <<"user_bin">>, Username) of + case ejabberd_riak:get_by_index(offline_msg, + <<"us">>, {LUser, LServer}) of {ok, Rs} -> - SortedRs = - lists:sort(fun(X, Y) -> - MX = riak_object:get_metadata(X), - {ok, IX} = dict:find(<<"index">>, MX), - {value, TSX} = lists:keysearch( - <<"timestamp_bin">>, 1, - IX), - MY = riak_object:get_metadata(Y), - {ok, IY} = dict:find(<<"index">>, MY), - {value, TSY} = lists:keysearch( - <<"timestamp_bin">>, 1, - IY), - TSX =< TSY - end, Rs), - Ls ++ lists:flatmap( - fun(R) -> - Key = riak_object:key(R), - ejabberd_riak:delete(LServer, <<"offline">>, Key), - XML = riak_object:get_value(R), - case xml_stream:parse_element(XML) of - {error, _Reason} -> - []; - El -> - case offline_msg_to_route(LServer, El) of - error -> - []; - RouteMsg -> - [RouteMsg] - end - end - end, SortedRs); + try + lists:foreach( + fun(#offline_msg{timestamp = T}) -> + ok = ejabberd_riak:delete(offline_msg, T) + end, Rs), + TS = now(), + Ls ++ lists:map( + fun (R) -> + offline_msg_to_route(LServer, R) + end, + lists:filter( + fun(R) -> + case R#offline_msg.expire of + never -> true; + TimeStamp -> TS < TimeStamp + end + end, + lists:keysort(#offline_msg.timestamp, Rs))) + catch _:{badmatch, _} -> + Ls + end; _ -> Ls end. @@ -579,17 +534,8 @@ remove_user(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), odbc_queries:del_spool_msg(LServer, Username); remove_user(LUser, LServer, riak) -> - Username = LUser, - case ejabberd_riak:get_keys_by_index( - LServer, <<"offline">>, <<"user_bin">>, Username) of - {ok, Keys} -> - lists:foreach( - fun(Key) -> - ejabberd_riak:delete(LServer, <<"offline">>, Key) - end, Keys); - _ -> - ok - end. + {atomic, ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer})}. jid_to_binary(#jid{user = U, server = S, resource = R, luser = LU, lserver = LS, lresource = LR}) -> @@ -650,6 +596,8 @@ get_offline_els(LUser, LServer) -> get_offline_els(LUser, LServer, mnesia) -> Msgs = read_all_msgs(LUser, LServer, mnesia), +get_offline_els(LUser, LServer, DBType) when DBType == mnesia; DBType == riak -> + Msgs = read_all_msgs(LUser, LServer, DBType), lists:map( fun(Msg) -> {route, From, To, Packet} = offline_msg_to_route(LServer, Msg), @@ -706,6 +654,14 @@ read_all_msgs(LUser, LServer, mnesia) -> US = {LUser, LServer}, lists:keysort(#offline_msg.timestamp, mnesia:dirty_read({offline_msg, US})); +read_all_msgs(LUser, LServer, riak) -> + case ejabberd_riak:get_by_index( + offline_msg, <<"us">>, {LUser, LServer}) of + {ok, Rs} -> + lists:keysort(#offline_msg.timestamp, Rs); + _Err -> + [] + end; read_all_msgs(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch ejabberd_odbc:sql_query(LServer, @@ -723,7 +679,7 @@ read_all_msgs(LUser, LServer, odbc) -> _ -> [] end. -format_user_queue(Msgs, mnesia) -> +format_user_queue(Msgs, DBType) when DBType == mnesia; DBType == riak -> lists:map(fun (#offline_msg{timestamp = TimeStamp, from = From, to = To, packet = @@ -831,6 +787,26 @@ user_queue_parse_query(LUser, LServer, Query, mnesia) -> ok; false -> nothing end; +user_queue_parse_query(LUser, LServer, Query, riak) -> + case lists:keysearch(<<"delete">>, 1, Query) of + {value, _} -> + Msgs = read_all_msgs(LUser, LServer, riak), + lists:foreach( + fun (Msg) -> + ID = jlib:encode_base64((term_to_binary(Msg))), + case lists:member({<<"selected">>, ID}, Query) of + true -> + ejabberd_riak:delete(offline_msg, + Msg#offline_msg.timestamp); + false -> + ok + end + end, + Msgs), + ok; + false -> + nothing + end; user_queue_parse_query(LUser, LServer, Query, odbc) -> Username = ejabberd_odbc:escape(LUser), case lists:keysearch(<<"delete">>, 1, Query) of @@ -889,6 +865,14 @@ get_queue_length(LUser, LServer) -> get_queue_length(LUser, LServer, mnesia) -> length(mnesia:dirty_read({offline_msg, {LUser, LServer}})); +get_queue_length(LUser, LServer, riak) -> + case ejabberd_riak:count_by_index(offline_msg, + <<"us">>, {LUser, LServer}) of + {ok, N} -> + N; + _ -> + 0 + end; get_queue_length(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), case catch ejabberd_odbc:sql_query(LServer, @@ -917,7 +901,8 @@ get_messages_subset(User, Host, MsgsAll, DBType) -> get_messages_subset2(Max, Length, MsgsAll, _DBType) when Length =< Max * 2 -> MsgsAll; -get_messages_subset2(Max, Length, MsgsAll, mnesia) -> +get_messages_subset2(Max, Length, MsgsAll, DBType) + when DBType == mnesia; DBType == riak -> FirstN = Max, {MsgsFirstN, Msgs2} = lists:split(FirstN, MsgsAll), MsgsLastN = lists:nthtail(Length - FirstN - FirstN, @@ -965,6 +950,10 @@ delete_all_msgs(LUser, LServer, mnesia) -> mnesia:dirty_read({offline_msg, US})) end, mnesia:transaction(F); +delete_all_msgs(LUser, LServer, riak) -> + Res = ejabberd_riak:delete_by_index(offline_msg, + <<"us">>, {LUser, LServer}), + {atomic, Res}; delete_all_msgs(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), odbc_queries:del_spool_msg(LServer, Username), @@ -987,16 +976,44 @@ webadmin_user_parse_query(Acc, _Action, _User, _Server, Acc. %% Returns as integer the number of offline messages for a given user -count_offline_messages(LUser, LServer) -> +count_offline_messages(User, Server) -> + LUser = jlib:nodeprep(User), + LServer = jlib:nameprep(Server), + DBType = gen_mod:db_type(LServer, ?MODULE), + count_offline_messages(LUser, LServer, DBType). + +count_offline_messages(LUser, LServer, mnesia) -> + US = {LUser, LServer}, + F = fun () -> + p1_mnesia:count_records(offline_msg, + #offline_msg{us = US, _ = '_'}) + end, + case catch mnesia:async_dirty(F) of + I when is_integer(I) -> I; + _ -> 0 + end; +count_offline_messages(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), - case catch odbc_queries:count_records_where( - LServer, "spool", - <<"where username='", Username/binary, "'">>) of - {selected, [_], [[Res]]} -> - jlib:binary_to_integer(Res); + case catch odbc_queries:count_records_where(LServer, + <<"spool">>, + <<"where username='", + Username/binary, "'">>) + of + {selected, [_], [[Res]]} -> + jlib:binary_to_integer(Res); + _ -> 0 + end; +count_offline_messages(LUser, LServer, riak) -> + case ejabberd_riak:count_by_index( + offline_msg, <<"us">>, {LUser, LServer}) of + {ok, Res} -> + Res; _ -> 0 - end. + end; +count_offline_messages(_Acc, User, Server) -> + N = count_offline_messages(User, Server), + {stop, N}. export(_Server) -> [{offline_msg, |