aboutsummaryrefslogtreecommitdiff
path: root/src/mod_offline_riak.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_offline_riak.erl')
-rw-r--r--src/mod_offline_riak.erl153
1 files changed, 153 insertions, 0 deletions
diff --git a/src/mod_offline_riak.erl b/src/mod_offline_riak.erl
new file mode 100644
index 000000000..217e8f828
--- /dev/null
+++ b/src/mod_offline_riak.erl
@@ -0,0 +1,153 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2016, Evgeny Khramtsov
+%%% @doc
+%%%
+%%% @end
+%%% Created : 15 Apr 2016 by Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%%-------------------------------------------------------------------
+-module(mod_offline_riak).
+
+-behaviour(mod_offline).
+
+-export([init/2, store_messages/5, pop_messages/2, remove_expired_messages/1,
+ remove_old_messages/2, remove_user/2, read_message_headers/2,
+ read_message/3, remove_message/3, read_all_messages/2,
+ remove_all_messages/2, count_messages/2, import/2]).
+
+-include("jlib.hrl").
+-include("mod_offline.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ ok.
+
+store_messages(Host, {User, _}, Msgs, Len, MaxOfflineMsgs) ->
+ Count = if MaxOfflineMsgs =/= infinity ->
+ Len + count_messages(User, Host);
+ true -> 0
+ end,
+ if
+ Count > MaxOfflineMsgs ->
+ {atomic, discard};
+ true ->
+ try
+ lists:foreach(
+ fun(#offline_msg{us = US,
+ timestamp = TS} = M) ->
+ ok = ejabberd_riak:put(
+ M, offline_msg_schema(),
+ [{i, TS}, {'2i', [{<<"us">>, US}]}])
+ end, Msgs),
+ {atomic, ok}
+ catch _:{badmatch, Err} ->
+ {atomic, Err}
+ end
+ end.
+
+pop_messages(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ try
+ lists:foreach(
+ fun(#offline_msg{timestamp = T}) ->
+ ok = ejabberd_riak:delete(offline_msg, T)
+ end, Rs),
+ {ok, lists:keysort(#offline_msg.timestamp, Rs)}
+ catch _:{badmatch, Err} ->
+ Err
+ end;
+ Err ->
+ Err
+ end.
+
+remove_expired_messages(_LServer) ->
+ %% TODO
+ {atomic, ok}.
+
+remove_old_messages(_Days, _LServer) ->
+ %% TODO
+ {atomic, ok}.
+
+remove_user(LUser, LServer) ->
+ {atomic, ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer})}.
+
+read_message_headers(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ Hdrs = lists:map(
+ fun(#offline_msg{from = From, to = To, packet = Pkt,
+ timestamp = TS}) ->
+ Seq = now_to_integer(TS),
+ NewPkt = jlib:add_delay_info(
+ Pkt, LServer, TS, <<"Offline Storage">>),
+ {Seq, From, To, NewPkt}
+ end, Rs),
+ lists:keysort(1, Hdrs);
+ _Err ->
+ []
+ end.
+
+read_message(_LUser, _LServer, I) ->
+ TS = integer_to_now(I),
+ case ejabberd_riak:get(offline_msg, offline_msg_schema(), TS) of
+ {ok, Msg} ->
+ {ok, Msg};
+ _ ->
+ error
+ end.
+
+remove_message(_LUser, _LServer, I) ->
+ TS = integer_to_now(I),
+ ejabberd_riak:delete(offline_msg, TS),
+ ok.
+
+read_all_messages(LUser, LServer) ->
+ case ejabberd_riak:get_by_index(
+ offline_msg, offline_msg_schema(),
+ <<"us">>, {LUser, LServer}) of
+ {ok, Rs} ->
+ lists:keysort(#offline_msg.timestamp, Rs);
+ _Err ->
+ []
+ end.
+
+remove_all_messages(LUser, LServer) ->
+ Res = ejabberd_riak:delete_by_index(offline_msg,
+ <<"us">>, {LUser, LServer}),
+ {atomic, Res}.
+
+count_messages(LUser, LServer) ->
+ case ejabberd_riak:count_by_index(
+ offline_msg, <<"us">>, {LUser, LServer}) of
+ {ok, Res} ->
+ Res;
+ _ ->
+ 0
+ end.
+
+import(_LServer, #offline_msg{us = US, timestamp = TS} = M) ->
+ ejabberd_riak:put(M, offline_msg_schema(),
+ [{i, TS}, {'2i', [{<<"us">>, US}]}]).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+offline_msg_schema() ->
+ {record_info(fields, offline_msg), #offline_msg{}}.
+
+now_to_integer({MS, S, US}) ->
+ (MS * 1000000 + S) * 1000000 + US.
+
+integer_to_now(Int) ->
+ Secs = Int div 1000000,
+ USec = Int rem 1000000,
+ MSec = Secs div 1000000,
+ Sec = Secs rem 1000000,
+ {MSec, Sec, USec}.