From 216a0c97b962d101311dc4d44875685f4c7cb008 Mon Sep 17 00:00:00 2001 From: Christophe Romain Date: Wed, 27 Sep 2017 20:39:54 +0200 Subject: PubSub: add RSM support for mnesia backend --- src/node_flat.erl | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 3 deletions(-) (limited to 'src/node_flat.erl') diff --git a/src/node_flat.erl b/src/node_flat.erl index 7347747e8..2c11edde7 100644 --- a/src/node_flat.erl +++ b/src/node_flat.erl @@ -728,9 +728,53 @@ del_state(#pubsub_state{stateid = {Key, Nidx}, items = Items}) -> %% mod_pubsub module.

%%

PubSub plugins can store the items where they wants (for example in a %% relational database), or they can even decide not to persist any items.

-get_items(Nidx, _From, _RSM) -> - Items = mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx), - {result, {lists:keysort(#pubsub_item.creation, Items), undefined}}. +get_items(Nidx, _From, undefined) -> + RItems = lists:keysort(#pubsub_item.creation, + mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)), + Count = length(RItems), + if Count =< ?MAXITEMS -> + {result, {RItems, undefined}}; + true -> + ItemsPage = lists:sublist(RItems, ?MAXITEMS), + Rsm = rsm_page(Count, 0, 0, ItemsPage), + {result, {ItemsPage, Rsm}} + end; + +get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex, + 'after' = After, before = Before}) -> + RItems = lists:keysort(#pubsub_item.creation, + mnesia:index_read(pubsub_item, Nidx, #pubsub_item.nodeidx)), + Count = length(RItems), + Limit = case Max of + undefined -> ?MAXITEMS; + _ -> Max + end, + {Offset, ItemsPage} = + case {IncIndex, Before, After} of + {I, undefined, undefined} -> + SubList = lists:nthtail(I, RItems), + {I, lists:sublist(SubList, Limit)}; + {_, <<>>, undefined} -> + %% 2.5 Requesting the Last Page in a Result Set + SubList = lists:reverse(RItems), + {0, lists:sublist(SubList, Limit)}; + {_, Stamp, undefined} -> + BeforeNow = encode_stamp(Stamp), + SubList = lists:dropwhile( + fun(#pubsub_item{creation = {Now, _}}) -> + Now >= BeforeNow + end, lists:reverse(RItems)), + {0, lists:sublist(SubList, Limit)}; + {_, undefined, Stamp} -> + AfterNow = encode_stamp(Stamp), + SubList = lists:dropwhile( + fun(#pubsub_item{creation = {Now, _}}) -> + Now =< AfterNow + end, RItems), + {0, lists:sublist(SubList, Limit)} + end, + Rsm = rsm_page(Count, IncIndex, Offset, ItemsPage), + {result, {ItemsPage, Rsm}}. get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM) -> SubKey = jid:tolower(JID), @@ -880,6 +924,26 @@ first_in_list(Pred, [H | T]) -> _ -> first_in_list(Pred, T) end. +rsm_page(Count, Index, Offset, Items) -> + FirstItem = hd(Items), + LastItem = lists:last(Items), + First = decode_stamp(element(1, FirstItem#pubsub_item.creation)), + Last = decode_stamp(element(1, LastItem#pubsub_item.creation)), + #rsm_set{count = Count, index = Index, + first = #rsm_first{index = Offset, data = First}, + last = Last}. + +encode_stamp(Stamp) -> + case catch xmpp_util:decode_timestamp(Stamp) of + {MS,S,US} -> {MS,S,US}; + _ -> Stamp + end. +decode_stamp(Stamp) -> + case catch xmpp_util:encode_timestamp(Stamp) of + TimeStamp when is_binary(TimeStamp) -> TimeStamp; + _ -> Stamp + end. + transform({pubsub_state, {Id, Nidx}, Is, A, Ss}) -> {pubsub_state, {Id, Nidx}, Nidx, Is, A, Ss}; transform({pubsub_item, {Id, Nidx}, C, M, P}) -> -- cgit v1.2.3