aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristophe Romain <christophe.romain@process-one.net>2017-09-27 10:51:37 +0200
committerChristophe Romain <christophe.romain@process-one.net>2017-09-27 10:51:37 +0200
commit07a193d4dc8e4419f47fe21a45ef53fced8f9656 (patch)
tree2f0a859d62f9481123714539b9c84b9848b6b6b4 /src
parentPubSub: fix node_options, default options only apply on first plugin (diff)
PubSub: fix RSM support (#1994)(#2Â014)
Diffstat (limited to 'src')
-rw-r--r--src/mod_pubsub.erl14
-rw-r--r--src/node_flat_sql.erl203
2 files changed, 113 insertions, 104 deletions
diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl
index 95ab5ec4b..669422d15 100644
--- a/src/mod_pubsub.erl
+++ b/src/mod_pubsub.erl
@@ -1986,8 +1986,14 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
Owners = node_owners_call(Host, Type, Nidx, O),
{PS, RG} = get_presence_and_roster_permissions(
Host, From, Owners, AccessModel, AllowedGroups),
- node_call(Host, Type, get_items,
- [Nidx, From, AccessModel, PS, RG, SubId, RSM])
+ case ItemIds of
+ [ItemId] ->
+ node_call(Host, Type, get_item,
+ [Nidx, ItemId, From, AccessModel, PS, RG, undefined]);
+ _ ->
+ node_call(Host, Type, get_items,
+ [Nidx, From, AccessModel, PS, RG, SubId, RSM])
+ end
end
end,
case transaction(Host, Node, Action, sync_dirty) of
@@ -2005,6 +2011,10 @@ get_items(Host, Node, From, SubId, SMaxItems, ItemIds, RSM) ->
#pubsub{items = #ps_items{node = Node,
items = itemsEls(lists:sublist(SendItems, MaxItems))},
rsm = RsmOut}};
+ {result, {_, Item}} ->
+ {result,
+ #pubsub{items = #ps_items{node = Node,
+ items = itemsEls([Item])}}};
Error ->
Error
end.
diff --git a/src/node_flat_sql.erl b/src/node_flat_sql.erl
index 72281a970..110e31092 100644
--- a/src/node_flat_sql.erl
+++ b/src/node_flat_sql.erl
@@ -647,95 +647,60 @@ del_state(Nidx, JID) ->
" where jid=%(J)s and nodeid=%(Nidx)d")),
ok.
-get_items(Nidx, From, undefined) ->
- MaxItems = case ejabberd_sql:sql_query_t(
- ?SQL("select @(val)s from pubsub_node_option "
- "where nodeid=%(Nidx)d and name='max_items'")) of
- {selected, [{Value}]} ->
- misc:expr_to_term(Value);
- _ ->
- ?MAXITEMS
- end,
- get_items(Nidx, From, #rsm_set{max = MaxItems});
+get_items(Nidx, _From, undefined) ->
+ SNidx = misc:i2l(Nidx),
+ case ejabberd_sql:sql_query_t(
+ [<<"select itemid, publisher, creation, modification, payload",
+ " from pubsub_item where nodeid='", SNidx/binary, "'">>]) of
+ {selected, _, AllItems} ->
+ Count = length(AllItems),
+ if Count =< ?MAXITEMS ->
+ {result, {[raw_to_item(Nidx, RItem) || RItem <- AllItems], undefined}};
+ true ->
+ RItems = lists:sublist(AllItems, ?MAXITEMS),
+ Rsm = rsm_page(Count, 0, 0, RItems),
+ {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}}
+ end;
+ _ ->
+ {result, {[], undefined}}
+ end;
get_items(Nidx, _From, #rsm_set{max = Max, index = IncIndex,
'after' = After, before = Before}) ->
- {Way, Order} = if After == <<>> -> {<<"is not">>, <<"desc">>};
- After /= undefined -> {<<"<">>, <<"desc">>};
- Before == <<>> -> {<<"is not">>, <<"asc">>};
- Before /= undefined -> {<<">">>, <<"asc">>};
- true -> {<<"is not">>, <<"desc">>}
- end,
- SNidx = misc:i2l(Nidx),
- I = if After /= undefined -> After;
- Before /= undefined -> Before;
- true -> undefined
- end,
- [AttrName, Id] =
- case I of
- undefined when IncIndex =/= undefined ->
- case ejabberd_sql:sql_query_t(
- [<<"select creation from pubsub_item pi "
- "where exists ( select count(*) as count1 "
- "from pubsub_item where nodeid='">>, SNidx,
- <<"' and creation > pi.creation having count1 = ">>,
- integer_to_binary(IncIndex), <<" );">>]) of
- {selected, [_], [[O]]} ->
- [<<"creation">>, <<"'", O/binary, "'">>];
- _ ->
- [<<"creation">>, <<"null">>]
- end;
- undefined ->
- [<<"creation">>, <<"null">>];
- <<>> ->
- [<<"creation">>, <<"null">>];
- I ->
- [A, B] = str:tokens(ejabberd_sql:escape(I), <<"@">>),
- [A, <<"'", B/binary, "'">>]
- end,
- Count = case ejabberd_sql:sql_query_t(
- [<<"select count(*) from pubsub_item where nodeid='">>,
- SNidx, <<"';">>]) of
- {selected, [_], [[C]]} -> binary_to_integer(C);
+ Count = case catch ejabberd_sql:sql_query_t(
+ ?SQL("select @(count(itemid))d from pubsub_item"
+ " where nodeid=%(Nidx)d")) of
+ {selected, [{C}]} -> C;
+ _ -> 0
+ end,
+ Offset = case {IncIndex, Before, After} of
+ {I, undefined, undefined} when is_integer(I) -> I;
_ -> 0
+ end,
+ Limit = case Max of
+ undefined -> ?MAXITEMS;
+ _ -> Max
end,
+ Filters = rsm_filters(misc:i2l(Nidx), Before, After),
Query = fun(mssql, _) ->
ejabberd_sql:sql_query_t(
- [<<"select top ">>, integer_to_binary(Max),
- <<" itemid, publisher, creation, modification, payload "
- "from pubsub_item where nodeid='">>, SNidx,
- <<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
- AttrName, <<" ">>, Order, <<";">>]);
+ [<<"select top ", (integer_to_binary(Limit))/binary,
+ " itemid, publisher, creation, modification, payload",
+ " from pubsub_item", Filters/binary>>]);
+ %OFFSET 10 ROWS FETCH NEXT 10 ROWS ONLY;
(_, _) ->
ejabberd_sql:sql_query_t(
- [<<"select itemid, publisher, creation, modification, payload "
- "from pubsub_item where nodeid='">>, SNidx,
- <<"' and ">>, AttrName, <<" ">>, Way, <<" ">>, Id, <<" order by ">>,
- AttrName, <<" ">>, Order, <<" limit ">>,
- integer_to_binary(Max), <<" ;">>])
+ [<<"select itemid, publisher, creation, modification, payload",
+ " from pubsub_item", Filters/binary,
+ " limit ", (integer_to_binary(Limit))/binary,
+ " offset ", (integer_to_binary(Offset))/binary>>])
end,
case ejabberd_sql:sql_query_t(Query) of
+ {selected, _, []} ->
+ {result, {[], #rsm_set{count = Count}}};
{selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
<<"modification">>, <<"payload">>], RItems} ->
- case RItems of
- [[_, _, _, F, _]|_] ->
- Index = case catch ejabberd_sql:sql_query_t(
- [<<"select count(*) from pubsub_item "
- "where nodeid='">>, SNidx, <<"' and ">>,
- AttrName, <<" > '">>, F, <<"';">>]) of
- {selected, [_], [[In]]} -> binary_to_integer(In);
- _ -> 0
- end,
- [_, _, _, L, _] = lists:last(RItems),
- RsmOut = #rsm_set{count = Count,
- index = Index,
- first = #rsm_first{
- index = Index,
- data = <<"creation@", F/binary>>},
- last = <<"creation@", L/binary>>},
- {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], RsmOut}};
- [] ->
- {result, {[], #rsm_set{count = Count}}}
- end;
+ Rsm = rsm_page(Count, IncIndex, Offset, RItems),
+ {result, {[raw_to_item(Nidx, RItem) || RItem <- RItems], Rsm}};
_ ->
{result, {[], undefined}}
end.
@@ -773,24 +738,24 @@ get_items(Nidx, JID, AccessModel, PresenceSubscription, RosterGroup, _SubId, RSM
get_items(Nidx, JID, RSM)
end.
-get_last_items(Nidx, _From, Count) ->
- Limit = misc:i2l(Count),
+get_last_items(Nidx, _From, Limit) ->
SNidx = misc:i2l(Nidx),
Query = fun(mssql, _) ->
ejabberd_sql:sql_query_t(
- [<<"select top ">>, Limit,
- <<" itemid, publisher, creation, modification, payload "
- "from pubsub_item where nodeid='">>, SNidx,
- <<"' order by modification desc ;">>]);
+ [<<"select top ", (integer_to_binary(Limit))/binary,
+ " itemid, publisher, creation, modification, payload",
+ " from pubsub_item where nodeid='", SNidx/binary,
+ "' order by modification desc">>]);
(_, _) ->
ejabberd_sql:sql_query_t(
- [<<"select itemid, publisher, creation, modification, payload "
- "from pubsub_item where nodeid='">>, SNidx,
- <<"' order by modification desc limit ">>, Limit, <<";">>])
+ [<<"select itemid, publisher, creation, modification, payload",
+ " from pubsub_item where nodeid='", SNidx/binary,
+ "' order by modification desc ",
+ " limit ", (integer_to_binary(Limit))/binary>>])
end,
case catch ejabberd_sql:sql_query_t(Query) of
- {selected,
- [<<"itemid">>, <<"publisher">>, <<"creation">>, <<"modification">>, <<"payload">>], RItems} ->
+ {selected, [<<"itemid">>, <<"publisher">>, <<"creation">>,
+ <<"modification">>, <<"payload">>], RItems} ->
{result, [raw_to_item(Nidx, RItem) || RItem <- RItems]};
_ ->
{result, []}
@@ -798,9 +763,9 @@ get_last_items(Nidx, _From, Count) ->
get_item(Nidx, ItemId) ->
case catch ejabberd_sql:sql_query_t(
- ?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
- " @(modification)s, @(payload)s from pubsub_item"
- " where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
+ ?SQL("select @(itemid)s, @(publisher)s, @(creation)s,"
+ " @(modification)s, @(payload)s from pubsub_item"
+ " where nodeid=%(Nidx)d and itemid=%(ItemId)s"))
of
{selected, [RItem]} ->
{result, raw_to_item(Nidx, RItem)};
@@ -850,11 +815,8 @@ set_item(Item) ->
P = encode_jid(JID),
Payload = Item#pubsub_item.payload,
XML = str:join([fxml:element_to_binary(X) || X<-Payload], <<>>),
- S = fun ({T1, T2, T3}) ->
- str:join([misc:i2l(T1, 6), misc:i2l(T2, 6), misc:i2l(T3, 6)], <<":">>)
- end,
- SM = S(M),
- SC = S(C),
+ SM = encode_now(M),
+ SC = encode_now(C),
?SQL_UPSERT_T(
"pubsub_item",
["!nodeid=%(Nidx)d",
@@ -1029,15 +991,52 @@ raw_to_item(Nidx, [ItemId, SJID, Creation, Modification, XML]) ->
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML});
raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
JID = decode_jid(SJID),
- ToTime = fun (Str) ->
- [T1, T2, T3] = str:tokens(Str, <<":">>),
- {misc:l2i(T1), misc:l2i(T2), misc:l2i(T3)}
- end,
Payload = case fxml_stream:parse_element(XML) of
{error, _Reason} -> [];
El -> [El]
end,
#pubsub_item{itemid = {ItemId, Nidx},
- creation = {ToTime(Creation), jid:remove_resource(JID)},
- modification = {ToTime(Modification), JID},
+ creation = {decode_now(Creation), jid:remove_resource(JID)},
+ modification = {decode_now(Modification), JID},
payload = Payload}.
+
+rsm_filters(SNidx, undefined, undefined) ->
+ <<" where nodeid='", SNidx/binary, "'",
+ " order by creation asc">>;
+rsm_filters(SNidx, undefined, After) ->
+ <<" where nodeid='", SNidx/binary, "'",
+ " and creation>'", (encode_stamp(After))/binary, "'",
+ " order by creation asc">>;
+rsm_filters(SNidx, <<>>, undefined) ->
+ %% 2.5 Requesting the Last Page in a Result Set
+ Now = p1_time_compat:timestamp(),
+ <<" where nodeid='", SNidx/binary, "'",
+ " and creation<'", (encode_now(Now))/binary, "'",
+ " order by creation desc">>;
+rsm_filters(SNidx, Before, undefined) ->
+ <<" where nodeid='", SNidx/binary, "'",
+ " and creation<'", (encode_stamp(Before))/binary, "'",
+ " order by creation desc">>.
+
+rsm_page(Count, Index, Offset, Items) ->
+ First = decode_stamp(lists:nth(3, hd(Items))),
+ Last = decode_stamp(lists:nth(3, lists:last(Items))),
+ #rsm_set{count = Count, index = Index,
+ first = #rsm_first{index = Offset, data = First},
+ last = Last}.
+
+encode_stamp(Stamp) ->
+ case jlib:datetime_string_to_timestamp(Stamp) of
+ {MS,S,US} -> encode_now({MS,S,US});
+ _ -> Stamp
+ end.
+decode_stamp(Stamp) ->
+ jlib:now_to_utc_string(decode_now(Stamp)).
+
+encode_now({T1, T2, T3}) ->
+ <<(misc:i2l(T1, 6))/binary, ":",
+ (misc:i2l(T2, 6))/binary, ":",
+ (misc:i2l(T3, 6))/binary>>.
+decode_now(NowStr) ->
+ [MS, S, US] = binary:split(NowStr, <<":">>, [global]),
+ {binary_to_integer(MS), binary_to_integer(S), binary_to_integer(US)}.