diff options
Diffstat (limited to 'src/mod_offline.erl')
-rw-r--r-- | src/mod_offline.erl | 111 |
1 files changed, 108 insertions, 3 deletions
diff --git a/src/mod_offline.erl b/src/mod_offline.erl index f27d35830..5e2d80aa2 100644 --- a/src/mod_offline.erl +++ b/src/mod_offline.erl @@ -175,6 +175,56 @@ store_offline_msg(Host, {User, _Server}, Msgs, Len, MaxOfflineMsgs, odbc) -> end, Msgs), odbc_queries:add_spool(Host, Query) + end; +store_offline_msg(Host, {User, _}, Msgs, Len, MaxOfflineMsgs, + riak) -> + Count = if MaxOfflineMsgs =/= infinity -> + Len + count_offline_messages(User, Host); + true -> 0 + end, + if + Count > MaxOfflineMsgs -> + discard_warn_sender(Msgs); + true -> + lists:foreach( + fun(M) -> + Username = User, + From = M#offline_msg.from, + To = M#offline_msg.to, + #xmlel{name = Name, attrs = Attrs, + children = Els} = + M#offline_msg.packet, + Attrs2 = jlib:replace_from_to_attrs( + jlib:jid_to_string(From), + jlib:jid_to_string(To), + Attrs), + Packet = #xmlel{name = Name, + attrs = Attrs2, + children = + Els ++ + [jlib:timestamp_to_xml( + calendar:now_to_universal_time( + M#offline_msg.timestamp), + utc, + jlib:make_jid(<<"">>, Host, <<"">>), + <<"Offline Storage">>), + jlib:timestamp_to_xml( + calendar:now_to_universal_time( + M#offline_msg.timestamp))]}, + XML = xml:element_to_binary(Packet), + {MegaSecs, Secs, MicroSecs} = + M#offline_msg.timestamp, + TS = + iolist_to_binary( + io_lib:format("~6..0w~6..0w.~6..0w", + [MegaSecs, Secs, MicroSecs])), + ejabberd_riak:put( + Host, <<"offline">>, + undefined, XML, + [{<<"user_bin">>, Username}, + {<<"timestamp_bin">>, TS} + ]) + end, Msgs) end. %% Function copied from ejabberd_sm.erl: @@ -193,7 +243,8 @@ receive_all(US, Msgs, DBType) -> after 0 -> case DBType of mnesia -> Msgs; - odbc -> lists:reverse(Msgs) + odbc -> lists:reverse(Msgs); + riak -> lists:reverse(Msgs) end end. @@ -421,6 +472,45 @@ pop_offline_messages(Ls, LUser, LServer, odbc) -> end, Rs); _ -> Ls + end; +pop_offline_messages(Ls, LUser, LServer, riak) -> + Username = LUser, + case ejabberd_riak:get_objects_by_index( + LServer, <<"offline">>, <<"user_bin">>, Username) of + {ok, Rs} -> + SortedRs = + lists:sort(fun(X, Y) -> + MX = riak_object:get_metadata(X), + {ok, IX} = dict:find(<<"index">>, MX), + {value, TSX} = lists:keysearch( + <<"timestamp_bin">>, 1, + IX), + MY = riak_object:get_metadata(Y), + {ok, IY} = dict:find(<<"index">>, MY), + {value, TSY} = lists:keysearch( + <<"timestamp_bin">>, 1, + IY), + TSX =< TSY + end, Rs), + Ls ++ lists:flatmap( + fun(R) -> + Key = riak_object:key(R), + ejabberd_riak:delete(LServer, <<"offline">>, Key), + XML = riak_object:get_value(R), + case xml_stream:parse_element(XML) of + {error, _Reason} -> + []; + El -> + case offline_msg_to_route(LServer, El) of + error -> + []; + RouteMsg -> + [RouteMsg] + end + end + end, SortedRs); + _ -> + Ls end. remove_expired_messages(Server) -> @@ -445,7 +535,8 @@ remove_expired_messages(_LServer, mnesia) -> ok, offline_msg) end, mnesia:transaction(F); -remove_expired_messages(_LServer, odbc) -> {atomic, ok}. +remove_expired_messages(_LServer, odbc) -> {atomic, ok}; +remove_expired_messages(_LServer, riak) -> {atomic, ok}. remove_old_messages(Days, Server) -> LServer = jlib:nameprep(Server), @@ -470,6 +561,8 @@ remove_old_messages(Days, _LServer, mnesia) -> end, mnesia:transaction(F); remove_old_messages(_Days, _LServer, odbc) -> + {atomic, ok}; +remove_old_messages(_Days, _LServer, riak) -> {atomic, ok}. remove_user(User, Server) -> @@ -484,7 +577,19 @@ remove_user(LUser, LServer, mnesia) -> mnesia:transaction(F); remove_user(LUser, LServer, odbc) -> Username = ejabberd_odbc:escape(LUser), - odbc_queries:del_spool_msg(LServer, Username). + odbc_queries:del_spool_msg(LServer, Username); +remove_user(LUser, LServer, riak) -> + Username = LUser, + case ejabberd_riak:get_keys_by_index( + LServer, <<"offline">>, <<"user_bin">>, Username) of + {ok, Keys} -> + lists:foreach( + fun(Key) -> + ejabberd_riak:delete(LServer, <<"offline">>, Key) + end, Keys); + _ -> + ok + end. jid_to_binary(#jid{user = U, server = S, resource = R, luser = LU, lserver = LS, lresource = LR}) -> |