aboutsummaryrefslogtreecommitdiff
path: root/src/node_flat.erl
diff options
context:
space:
mode:
authorChristophe Romain <christophe.romain@process-one.net>2017-09-27 20:39:54 +0200
committerChristophe Romain <christophe.romain@process-one.net>2017-09-27 20:39:54 +0200
commit216a0c97b962d101311dc4d44875685f4c7cb008 (patch)
tree35c1f3af4a93b249511fb8b07ba16bf172165151 /src/node_flat.erl
parentPubSub: enforce controls on publish and delete (diff)
PubSub: add RSM support for mnesia backend
Diffstat (limited to 'src/node_flat.erl')
-rw-r--r--src/node_flat.erl70
1 files changed, 67 insertions, 3 deletions
diff --git a/src/node_flat.erl b/src/node_flat.erl
index 7347747e8..2c11edde7 100644
--- a/src/node_flat.erl
+++ b/src/node_flat.erl
@@ -728,9 +728,53 @@ del_state(#pubsub_state{stateid = {Key, Nidx}, items = Items}) ->
%% mod_pubsub module.</p>
%% <p>PubSub plugins can store the items where they wants (for example in a
%% relational database), or they can even decide not to persist any items.</p>
-get_items(Nidx, _From, _RSM) ->
- Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx),
- {result, {lists:keysort(#pubsub_item.creation, Items), undefined}}.
+get_items(Nidx, _From, undefined) ->
+ RItems = lists:keysort(#pubsub_item.creation,
+ mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)),
+ Count = length(RItems),
+ if Count =< ?MAXITEMS ->
+ {result, {RItems, undefined}};
+ true ->
+ ItemsPage = lists:sublist(RItems, ?MAXITEMS),
+ Rsm = rsm_page(Count, 0, 0, ItemsPage),
+ {result, {ItemsPage, Rsm}}
+ end;
+
+get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
+ 'after' = After, before = Before}) ->
+ RItems = lists:keysort(#pubsub_item.creation,
+ mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)),
+ Count = length(RItems),
+ Limit = case Max of
+ undefined -> ?MAXITEMS;
+ _ -> Max
+ end,
+ {Offset, ItemsPage} =
+ case {IncIndex, Before, After} of
+ {I, undefined, undefined} ->
+ SubList = lists:nthtail(I, RItems),
+ {I, lists:sublist(SubList, Limit)};
+ {_, <<>>, undefined} ->
+ %% 2.5 Requesting the Last Page in a Result Set
+ SubList = lists:reverse(RItems),
+ {0, lists:sublist(SubList, Limit)};
+ {_, Stamp, undefined} ->
+ BeforeNow = encode_stamp(Stamp),
+ SubList = lists:dropwhile(
+ fun(#pubsub_item{creation = {Now, _}}) ->
+ Now >= BeforeNow
+ end, lists:reverse(RItems)),
+ {0, lists:sublist(SubList, Limit)};
+ {_, undefined, Stamp} ->
+ AfterNow = encode_stamp(Stamp),
+ SubList = lists:dropwhile(
+ fun(#pubsub_item{creation = {Now, _}}) ->
+ Now =< AfterNow
+ end, RItems),
+ {0, lists:sublist(SubList, Limit)}
+ end,
+ Rsm = rsm_page(Count, IncIndex, Offset, ItemsPage),
+ {result, {ItemsPage, Rsm}}.
get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) ->
SubKey = jid:tolower(JID),
@@ -880,6 +924,26 @@ first_in_list(Pred, [H | T]) ->
_ -> first_in_list(Pred, T)
end.
+rsm_page(Count, Index, Offset, Items) ->
+ FirstItem = hd(Items),
+ LastItem = lists:last(Items),
+ First = decode_stamp(element(1, FirstItem#pubsub_item.creation)),
+ Last = decode_stamp(element(1, LastItem#pubsub_item.creation)),
+ #rsm_set{count = Count, index = Index,
+ first = #rsm_first{index = Offset, data = First},
+ last = Last}.
+
+encode_stamp(Stamp) ->
+ case catch xmpp_util:decode_timestamp(Stamp) of
+ {MS,S,US} -> {MS,S,US};
+ _ -> Stamp
+ end.
+decode_stamp(Stamp) ->
+ case catch xmpp_util:encode_timestamp(Stamp) of
+ TimeStamp when is_binary(TimeStamp) -> TimeStamp;
+ _ -> Stamp
+ end.
+
transform({pubsub_state, {Id, Nidx}, Is, A, Ss}) ->
{pubsub_state, {Id, Nidx}, Nidx, Is, A, Ss};
transform({pubsub_item, {Id, Nidx}, C, M, P}) ->