aboutsummaryrefslogtreecommitdiff
path: root/src/pubsub_odbc.patch
diff options
context:
space:
mode:
Diffstat (limited to 'src/pubsub_odbc.patch')
-rw-r--r--src/pubsub_odbc.patch1201
1 files changed, 1201 insertions, 0 deletions
diff --git a/src/pubsub_odbc.patch b/src/pubsub_odbc.patch
new file mode 100644
index 000000000..435025d4a
--- /dev/null
+++ b/src/pubsub_odbc.patch
@@ -0,0 +1,1201 @@
+--- mod_pubsub.erl 2012-10-01 10:05:28.172445600 +0200
++++ mod_pubsub_odbc.erl 2012-10-01 11:47:29.225239163 +0200
+@@ -41,7 +41,7 @@
+ %%% 6.2.3.1, 6.2.3.5, and 6.3. For information on subscription leases see
+ %%% XEP-0060 section 12.18.
+
+--module(mod_pubsub).
++-module(mod_pubsub_odbc).
+
+ -author('christophe.romain@process-one.net').
+
+@@ -59,11 +59,11 @@
+
+ -include("pubsub.hrl").
+
+--define(STDTREE, <<"tree">>).
++-define(STDTREE, <<"tree_odbc">>).
+
+--define(STDNODE, <<"flat">>).
++-define(STDNODE, <<"flat_odbc">>).
+
+--define(PEPNODE, <<"pep">>).
++-define(PEPNODE, <<"pep_odbc">>).
+
+ %% exports for hooks
+ -export([presence_probe/3, caps_update/3,
+@@ -86,8 +86,7 @@
+ %% general helpers for plugins
+ -export([subscription_to_string/1, affiliation_to_string/1,
+ string_to_subscription/1, string_to_affiliation/1,
+- extended_error/2, extended_error/3,
+- rename_default_nodeplugin/0]).
++ extended_error/2, extended_error/3, escape/1]).
+
+ %% API and gen_server callbacks
+ -export([start_link/2, start/2, stop/1, init/1,
+@@ -97,7 +96,7 @@
+ %% calls for parallel sending of last items
+ -export([send_loop/1]).
+
+--define(PROCNAME, ejabberd_mod_pubsub).
++-define(PROCNAME, ejabberd_mod_pubsub_odbc).
+
+ -define(LOOPNAME, ejabberd_mod_pubsub_loop).
+
+@@ -324,8 +323,6 @@
+ false -> ok
+ end,
+ ejabberd_router:register_route(Host),
+- update_node_database(Host, ServerHost),
+- update_state_database(Host, ServerHost),
+ put(server_host, ServerHost),
+ init_nodes(Host, ServerHost, NodeTree, Plugins),
+ State = #state{host = Host, server_host = ServerHost,
+@@ -385,421 +382,14 @@
+ ok.
+
+ init_nodes(Host, ServerHost, _NodeTree, Plugins) ->
+- case lists:member(<<"hometree">>, Plugins) of
++ case lists:member(<<"hometree_odbc">>, Plugins) of
+ true ->
+- create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree">>),
+- create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host),
+- <<"hometree">>);
++ create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree_odbc">>),
++ create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host),
++ <<"hometree_odbc">>);
+ false -> ok
+ end.
+
+-update_node_database(Host, ServerHost) ->
+- mnesia:del_table_index(pubsub_node, type),
+- mnesia:del_table_index(pubsub_node, parentid),
+- case catch mnesia:table_info(pubsub_node, attributes) of
+- [host_node, host_parent, info] ->
+- ?INFO_MSG("upgrade node pubsub tables", []),
+- F = fun () ->
+- {Result, LastIdx} = lists:foldl(fun ({pubsub_node,
+- NodeId, ParentId,
+- {nodeinfo, Items,
+- Options,
+- Entities}},
+- {RecList,
+- NodeIdx}) ->
+- ItemsList =
+- lists:foldl(fun
+- ({item,
+- IID,
+- Publisher,
+- Payload},
+- Acc) ->
+- C =
+- {unknown,
+- Publisher},
+- M =
+- {now(),
+- Publisher},
+- mnesia:write(#pubsub_item{itemid
+- =
+- {IID,
+- NodeIdx},
+- creation
+- =
+- C,
+- modification
+- =
+- M,
+- payload
+- =
+- Payload}),
+- [{Publisher,
+- IID}
+- | Acc]
+- end,
+- [],
+- Items),
+- Owners =
+- dict:fold(fun
+- (JID,
+- {entity,
+- Aff,
+- Sub},
+- Acc) ->
+- UsrItems =
+- lists:foldl(fun
+- ({P,
+- I},
+- IAcc) ->
+- case
+- P
+- of
+- JID ->
+- [I
+- | IAcc];
+- _ ->
+- IAcc
+- end
+- end,
+- [],
+- ItemsList),
+- mnesia:write({pubsub_state,
+- {JID,
+- NodeIdx},
+- UsrItems,
+- Aff,
+- Sub}),
+- case
+- Aff
+- of
+- owner ->
+- [JID
+- | Acc];
+- _ ->
+- Acc
+- end
+- end,
+- [],
+- Entities),
+- mnesia:delete({pubsub_node,
+- NodeId}),
+- {[#pubsub_node{nodeid
+- =
+- NodeId,
+- id
+- =
+- NodeIdx,
+- parents
+- =
+- [element(2,
+- ParentId)],
+- owners
+- =
+- Owners,
+- options
+- =
+- Options}
+- | RecList],
+- NodeIdx + 1}
+- end,
+- {[], 1},
+- mnesia:match_object({pubsub_node,
+- {Host,
+- '_'},
+- '_',
+- '_'})),
+- mnesia:write(#pubsub_index{index = node, last = LastIdx,
+- free = []}),
+- Result
+- end,
+- {atomic, NewRecords} = mnesia:transaction(F),
+- {atomic, ok} = mnesia:delete_table(pubsub_node),
+- {atomic, ok} = mnesia:create_table(pubsub_node,
+- [{disc_copies, [node()]},
+- {attributes,
+- record_info(fields,
+- pubsub_node)}]),
+- FNew = fun () ->
+- lists:foreach(fun (Record) -> mnesia:write(Record) end,
+- NewRecords)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- ?INFO_MSG("Pubsub node tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p",
+- [Reason])
+- end;
+- [nodeid, parentid, type, owners, options] ->
+- F = fun ({pubsub_node, NodeId, {_, Parent}, Type,
+- Owners, Options}) ->
+- #pubsub_node{nodeid = NodeId, id = 0,
+- parents = [Parent], type = Type,
+- owners = Owners, options = Options}
+- end,
+- mnesia:transform_table(pubsub_node, F,
+- [nodeid, id, parents, type, owners, options]),
+- FNew = fun () ->
+- LastIdx = lists:foldl(fun (#pubsub_node{nodeid =
+- NodeId} =
+- PubsubNode,
+- NodeIdx) ->
+- mnesia:write(PubsubNode#pubsub_node{id
+- =
+- NodeIdx}),
+- lists:foreach(fun
+- (#pubsub_state{stateid
+- =
+- StateId} =
+- State) ->
+- {JID,
+- _} =
+- StateId,
+- mnesia:delete({pubsub_state,
+- StateId}),
+- mnesia:write(State#pubsub_state{stateid
+- =
+- {JID,
+- NodeIdx}})
+- end,
+- mnesia:match_object(#pubsub_state{stateid
+- =
+- {'_',
+- NodeId},
+- _
+- =
+- '_'})),
+- lists:foreach(fun
+- (#pubsub_item{itemid
+- =
+- ItemId} =
+- Item) ->
+- {IID,
+- _} =
+- ItemId,
+- {M1,
+- M2} =
+- Item#pubsub_item.modification,
+- {C1,
+- C2} =
+- Item#pubsub_item.creation,
+- mnesia:delete({pubsub_item,
+- ItemId}),
+- mnesia:write(Item#pubsub_item{itemid
+- =
+- {IID,
+- NodeIdx},
+- modification
+- =
+- {M2,
+- M1},
+- creation
+- =
+- {C2,
+- C1}})
+- end,
+- mnesia:match_object(#pubsub_item{itemid
+- =
+- {'_',
+- NodeId},
+- _
+- =
+- '_'})),
+- NodeIdx + 1
+- end,
+- 1,
+- mnesia:match_object({pubsub_node,
+- {Host, '_'},
+- '_', '_',
+- '_', '_',
+- '_'})
+- ++
+- mnesia:match_object({pubsub_node,
+- {{'_',
+- ServerHost,
+- '_'},
+- '_'},
+- '_', '_',
+- '_', '_',
+- '_'})),
+- mnesia:write(#pubsub_index{index = node,
+- last = LastIdx, free = []})
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- rename_default_nodeplugin(),
+- ?INFO_MSG("Pubsub node tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p",
+- [Reason])
+- end;
+- [nodeid, id, parent, type, owners, options] ->
+- F = fun ({pubsub_node, NodeId, Id, Parent, Type, Owners,
+- Options}) ->
+- #pubsub_node{nodeid = NodeId, id = Id,
+- parents = [Parent], type = Type,
+- owners = Owners, options = Options}
+- end,
+- mnesia:transform_table(pubsub_node, F,
+- [nodeid, id, parents, type, owners, options]),
+- rename_default_nodeplugin();
+- _ -> ok
+- end,
+- mnesia:transaction(fun () ->
+- case catch mnesia:first(pubsub_node) of
+- {_, L} when is_binary(L) ->
+- lists:foreach(fun ({H, N})
+- when is_binary(N) ->
+- [Node] =
+- mnesia:read({pubsub_node,
+- {H,
+- N}}),
+- Type =
+- Node#pubsub_node.type,
+- BN = element(2,
+- node_call(Type,
+- path_to_node,
+- [N])),
+- BP = case [element(2,
+- node_call(Type,
+- path_to_node,
+- [P]))
+- || P
+- <- Node#pubsub_node.parents]
+- of
+- [<<>>] -> [];
+- Parents ->
+- Parents
+- end,
+- mnesia:write(Node#pubsub_node{nodeid
+- =
+- {H,
+- BN},
+- parents
+- =
+- BP}),
+- mnesia:delete({pubsub_node,
+- {H,
+- N}});
+- (_) -> ok
+- end,
+- mnesia:all_keys(pubsub_node));
+- _ -> ok
+- end
+- end).
+-
+-rename_default_nodeplugin() ->
+- lists:foreach(fun (Node) ->
+- mnesia:dirty_write(Node#pubsub_node{type =
+- <<"hometree">>})
+- end,
+- mnesia:dirty_match_object(#pubsub_node{type =
+- <<"default">>,
+- _ = '_'})).
+-
+-update_state_database(_Host, _ServerHost) ->
+- case catch mnesia:table_info(pubsub_state, attributes)
+- of
+- [stateid, items, affiliation, subscription] ->
+- ?INFO_MSG("upgrade state pubsub tables", []),
+- F = fun ({pubsub_state, {JID, NodeID}, Items, Aff, Sub},
+- Acc) ->
+- Subs = case Sub of
+- none -> [];
+- _ ->
+- {result, SubID} =
+- pubsub_subscription:subscribe_node(JID,
+- NodeID,
+- []),
+- [{Sub, SubID}]
+- end,
+- NewState = #pubsub_state{stateid = {JID, NodeID},
+- items = Items, affiliation = Aff,
+- subscriptions = Subs},
+- [NewState | Acc]
+- end,
+- {atomic, NewRecs} =
+- mnesia:transaction(fun mnesia:foldl/3,
+- [F, [], pubsub_state]),
+- {atomic, ok} = mnesia:delete_table(pubsub_state),
+- {atomic, ok} = mnesia:create_table(pubsub_state,
+- [{disc_copies, [node()]},
+- {attributes,
+- record_info(fields,
+- pubsub_state)}]),
+- FNew = fun () ->
+- lists:foreach(fun mnesia:write/1, NewRecs)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- ?INFO_MSG("Pubsub state tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub state tables:~n~p",
+- [Reason])
+- end;
+- [stateid, items, affiliation, subscriptions] ->
+- ?INFO_MSG("upgrade state pubsub table", []),
+- F = fun ({pubsub_state, {JID, Nidx}, Items, Aff, Subs},
+- Acc) ->
+- NewState = #pubsub_state{stateid = {JID, Nidx},
+- nodeidx = Nidx, items = Items,
+- affiliation = Aff,
+- subscriptions = Subs},
+- [NewState | Acc]
+- end,
+- {atomic, NewRecs} =
+- mnesia:transaction(fun mnesia:foldl/3,
+- [F, [], pubsub_state]),
+- {atomic, ok} = mnesia:delete_table(pubsub_state),
+- {atomic, ok} = mnesia:create_table(pubsub_state,
+- [{disc_copies, [node()]},
+- {attributes,
+- record_info(fields,
+- pubsub_state)}]),
+- FNew = fun () ->
+- lists:foreach(fun mnesia:write/1, NewRecs)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Res1} ->
+- ?INFO_MSG("Pubsub state tables updated correctly: ~p",
+- [Res1]);
+- {aborted, Rea1} ->
+- ?ERROR_MSG("Problem updating Pubsub state table:~n~p",
+- [Rea1])
+- end,
+- ?INFO_MSG("upgrade item pubsub table", []),
+- F = fun ({pubsub_item, {ItemId, Nidx}, C, M, P}, Acc) ->
+- NewItem = #pubsub_item{itemid = {ItemId, Nidx},
+- nodeidx = Nidx, creation = C,
+- modification = M, payload = P},
+- [NewItem | Acc]
+- end,
+- {atomic, NewRecs} =
+- mnesia:transaction(fun mnesia:foldl/3,
+- [F, [], pubsub_item]),
+- {atomic, ok} = mnesia:delete_table(pubsub_item),
+- {atomic, ok} = mnesia:create_table(pubsub_item,
+- [{disc_copies, [node()]},
+- {attributes,
+- record_info(fields,
+- pubsub_item)}]),
+- FNew = fun () ->
+- lists:foreach(fun mnesia:write/1, NewRecs)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Res2} ->
+- ?INFO_MSG("Pubsub item tables updated correctly: ~p",
+- [Res2]);
+- {aborted, Rea2} ->
+- ?ERROR_MSG("Problem updating Pubsub item table:~n~p",
+- [Rea2])
+- end;
+- _ -> ok
+- end.
+-
+ send_loop(State) ->
+ receive
+ {presence, JID, Pid} ->
+@@ -808,11 +398,15 @@
+ LJID = jlib:jid_tolower(JID),
+ BJID = jlib:jid_remove_resource(LJID),
+ lists:foreach(fun (PType) ->
+- {result, Subscriptions} = node_action(Host,
+- PType,
+- get_entity_subscriptions,
+- [Host,
+- JID]),
++ Subscriptions = case catch node_action(Host,
++ PType,
++ get_entity_subscriptions_for_send_last,
++ [Host,
++ JID])
++ of
++ {result, S} -> S;
++ _ -> []
++ end,
+ lists:foreach(fun ({Node, subscribed, _,
+ SubJID}) ->
+ if (SubJID == LJID) or
+@@ -824,24 +418,13 @@
+ type =
+ Type,
+ id =
+- NodeId,
+- options
+- =
+- Options} =
++ NodeId} =
+ Node,
+- case
+- get_option(Options,
+- send_last_published_item)
+- of
+- on_sub_and_presence ->
+- send_items(H,
+- N,
+- NodeId,
+- Type,
+- LJID,
+- last);
+- _ -> ok
+- end;
++ send_items(H, N,
++ NodeId,
++ Type,
++ LJID,
++ last);
+ true ->
+ % resource not concerned about that subscription
+ ok
+@@ -1032,8 +615,11 @@
+ children = []}];
+ disco_identity(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
+- case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
++ case get_allowed_items_call(Host, Idx, From, Type,
++ Options, Owners)
++ of
+ {result, _} ->
+ {result,
+ [#xmlel{name = <<"identity">>,
+@@ -1084,7 +670,8 @@
+ || Feature <- features(<<"pep">>)]];
+ disco_features(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
+ {result, _} ->
+ {result,
+@@ -1129,9 +716,9 @@
+ ).
+ disco_items(Host, <<>>, From) ->
+ Action = fun (#pubsub_node{nodeid = {_, NodeID},
+- options = Options, type = Type, id = Idx,
+- owners = Owners},
++ options = Options, type = Type, id = Idx},
+ Acc) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
+ {result, _} ->
+ [#xmlel{name = <<"item">>,
+@@ -1152,13 +739,14 @@
+ _ -> Acc
+ end
+ end,
+- case transaction(Host, Action, sync_dirty) of
++ case transaction_on_nodes(Host, Action, sync_dirty) of
+ {result, Items} -> Items;
+ _ -> []
+ end;
+ disco_items(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type,
+ Options, Owners)
+ of
+@@ -1262,9 +850,6 @@
+ lists:foreach(fun ({#pubsub_node{options
+ =
+ Options,
+- owners
+- =
+- Owners,
+ id =
+ NodeId},
+ subscribed, _,
+@@ -1276,7 +861,9 @@
+ presence ->
+ case
+ lists:member(BJID,
+- Owners)
++ node_owners(Host,
++ PType,
++ NodeId))
+ of
+ true ->
+ node_action(Host,
+@@ -1347,7 +934,7 @@
+ {H,
+ N},
+ type =
+- <<"hometree">>},
++ <<"hometree_odbc">>},
+ owner})
+ when N ==
+ HomeTreeBase ->
+@@ -1495,7 +1082,8 @@
+ IQ ->
+ #xmlel{attrs = QAttrs} = SubEl,
+ Node = xml:get_attr_s(<<"node">>, QAttrs),
+- Res = case iq_disco_items(Host, Node, From) of
++ Rsm = jlib:rsm_decode(IQ),
++ Res = case iq_disco_items(Host, Node, From, Rsm) of
+ {result, IQRes} ->
+ jlib:iq_to_xml(IQ#iq{type = result,
+ sub_el =
+@@ -1669,7 +1257,8 @@
+ [] -> [<<"leaf">>];
+ _ ->
+ case node_call(Type, get_items,
+- [NodeId, From])
++ [NodeId, From,
++ none])
+ of
+ {result, []} ->
+ [<<"collection">>];
+@@ -1691,7 +1280,13 @@
+ F = [#xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, ?NS_PUBSUB}],
+ children = []}
+- | lists:map(fun (T) ->
++ | lists:map(fun (<<"rsm">>) ->
++ #xmlel{name = <<"feature">>,
++ attrs =
++ [{<<"var">>,
++ ?NS_RSM}],
++ children = []};
++ (T) ->
+ #xmlel{name = <<"feature">>,
+ attrs =
+ [{<<"var">>,
+@@ -1735,10 +1330,16 @@
+ #xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, ?NS_VCARD}], children = []}]
+ ++
+- lists:map(fun (Feature) ->
++ lists:map(fun (<<"rsm">>) ->
++ #xmlel{name = <<"feature">>,
++ attrs = [{<<"var">>, ?NS_RSM}],
++ children = []};
++ (T) ->
+ #xmlel{name = <<"feature">>,
+ attrs =
+- [{<<"var">>, <<(?NS_PUBSUB)/binary, "#", Feature/binary>>}],
++ [{<<"var">>,
++ <<(?NS_PUBSUB)/binary, "#",
++ T/binary>>}],
+ children = []}
+ end,
+ features(Host, Node))};
+@@ -1748,14 +1349,15 @@
+ _ -> node_disco_info(Host, Node, From)
+ end.
+
+--spec(iq_disco_items/3 ::
++-spec(iq_disco_items/4 ::
+ (
+ Host :: mod_pubsub:host(),
+ NodeId :: <<>> | mod_pubsub:nodeId(),
+- From :: jid())
++ From :: jid(),
++ Rsm :: none | rsm_in())
+ -> {result, [xmlel()]}
+ ).
+-iq_disco_items(Host, <<>>, From) ->
++iq_disco_items(Host, <<>>, From, _RSM) ->
+ {result,
+ lists:map(fun (#pubsub_node{nodeid = {_, SubNode},
+ options = Options}) ->
+@@ -1792,7 +1394,7 @@
+ % Nodes)};
+ % Other -> Other
+ % end;
+-iq_disco_items(Host, ?NS_COMMANDS, _From) ->
++iq_disco_items(Host, ?NS_COMMANDS, _From, _RSM) ->
+ CommandItems = [#xmlel{name = <<"item">>,
+ attrs =
+ [{<<"jid">>, Host},
+@@ -1800,23 +1402,28 @@
+ {<<"name">>, <<"Get Pending">>}],
+ children = []}],
+ {result, CommandItems};
+-iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From) ->
++iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From, _RSM) ->
+ CommandItems = [], {result, CommandItems};
+-iq_disco_items(Host, Item, From) ->
++iq_disco_items(Host, Item, From, RSM) ->
+ case str:tokens(Item, <<"!">>) of
+ [_Node, _ItemID] -> {result, []};
+ [Node] ->
+ % Node = string_to_node(SNode),
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
+- NodeItems = case get_allowed_items_call(Host, Idx,
+- From, Type,
+- Options,
+- Owners)
+- of
+- {result, R} -> R;
+- _ -> []
+- end,
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
++ {NodeItems, RsmOut} = case
++ get_allowed_items_call(Host,
++ Idx,
++ From,
++ Type,
++ Options,
++ Owners,
++ RSM)
++ of
++ {result, R} -> R;
++ _ -> {[], none}
++ end,
+ Nodes = lists:map(fun (#pubsub_node{nodeid =
+ {_, SubNode},
+ options =
+@@ -1858,7 +1465,7 @@
+ children = []}
+ end,
+ NodeItems),
+- {result, Nodes ++ Items}
++ {result, Nodes ++ Items ++ jlib:rsm_encode(RsmOut)}
+ end,
+ case transaction(Host, Node, Action, sync_dirty) of
+ {result, {_, Result}} -> {result, Result};
+@@ -2009,7 +1616,8 @@
+ (_, Acc) -> Acc
+ end,
+ [], xml:remove_cdata(Els)),
+- get_items(Host, Node, From, SubId, MaxItems, ItemIDs);
++ RSM = jlib:rsm_decode(SubEl),
++ get_items(Host, Node, From, SubId, MaxItems, ItemIDs, RSM);
+ {get, <<"subscriptions">>} ->
+ get_subscriptions(Host, Node, From, Plugins);
+ {get, <<"affiliations">>} ->
+@@ -2174,7 +1782,7 @@
+ _ -> []
+ end
+ end,
+- case transaction(fun () ->
++ case transaction(Host, fun () ->
+ {result, lists:flatmap(Tr, Plugins)}
+ end,
+ sync_dirty)
+@@ -2216,8 +1824,10 @@
+
+ %%% authorization handling
+
+-send_authorization_request(#pubsub_node{owners = Owners, nodeid = {Host, Node}},
+- Subscriber) ->
++send_authorization_request(#pubsub_node{nodeid =
++ {Host, Node},
++ type = Type, id = NodeId},
++ Subscriber) ->
+ Lang = <<"en">>,
+ Stanza = #xmlel{name = <<"message">>, attrs = [],
+ children =
+@@ -2294,7 +1904,7 @@
+ ejabberd_router:route(service_jid(Host),
+ jlib:make_jid(Owner), Stanza)
+ end,
+- Owners).
++ node_owners(Host, Type, NodeId)).
+
+ find_authorization_response(Packet) ->
+ #xmlel{children = Els} = Packet,
+@@ -2353,11 +1963,10 @@
+ <<"true">> -> true;
+ _ -> false
+ end,
+- Action = fun (#pubsub_node{type = Type, owners = Owners,
+- id = NodeId}) ->
++ Action = fun (#pubsub_node{type = Type, id = NodeId}) ->
+ IsApprover =
+ lists:member(jlib:jid_tolower(jlib:jid_remove_resource(From)),
+- Owners),
++ node_owners_call(Type, NodeId)),
+ {result, Subscriptions} = node_call(Type,
+ get_subscriptions,
+ [NodeId,
+@@ -2597,7 +2206,7 @@
+ children =
+ [#xmlel{name = <<"create">>,
+ attrs = nodeAttr(Node), children = []}]}],
+- case transaction(CreateNode, transaction) of
++ case transaction(Host, CreateNode, transaction) of
+ {result, {NodeId, {Result, broadcast}}} ->
+ broadcast_created_node(Host, Node, NodeId, Type,
+ NodeOptions),
+@@ -2709,7 +2318,7 @@
+ ).
+ subscribe_node(Host, Node, From, JID, Configuration) ->
+ SubOpts = case
+- pubsub_subscription:parse_options_xform(Configuration)
++ pubsub_subscription_odbc:parse_options_xform(Configuration)
+ of
+ {result, GoodSubOpts} -> GoodSubOpts;
+ _ -> invalid
+@@ -2723,7 +2332,7 @@
+ end
+ end,
+ Action = fun (#pubsub_node{options = Options,
+- owners = Owners, type = Type, id = NodeId}) ->
++ type = Type, id = NodeId}) ->
+ Features = features(Type),
+ SubscribeFeature = lists:member(<<"subscribe">>, Features),
+ OptionsFeature = lists:member(<<"subscription-options">>, Features),
+@@ -2732,6 +2341,7 @@
+ AccessModel = get_option(Options, access_model),
+ SendLast = get_option(Options, send_last_published_item),
+ AllowedGroups = get_option(Options, roster_groups_allowed, []),
++ Owners = node_owners_call(Type, NodeId),
+ {PresenceSubscription, RosterGroup} =
+ get_presence_and_roster_permissions(Host, Subscriber,
+ Owners, AccessModel, AllowedGroups),
+@@ -3076,19 +2686,20 @@
+ Error -> Error
+ end.
+
+--spec(get_items/6 ::
++-spec(get_items/7 ::
+ (
+ Host :: mod_pubsub:host(),
+ Node :: mod_pubsub:nodeId(),
+ From :: jid(),
+ SubId :: mod_pubsub:subId(),
+ SMaxItems :: binary(),
+- ItemIDs :: [mod_pubsub:itemId()])
++ ItemIDs :: [mod_pubsub:itemId()],
++ Rsm :: none | rsm_in())
+ -> {result, [xmlel(),...]}
+ %%%
+ | {error, xmlel()}
+ ).
+-get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) ->
++get_items(Host, Node, From, SubId, SMaxItems, ItemIDs, RSM) ->
+ MaxItems = if SMaxItems == <<"">> ->
+ get_max_items_node(Host);
+ true ->
+@@ -3100,13 +2711,13 @@
+ case MaxItems of
+ {error, Error} -> {error, Error};
+ _ ->
+- Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId,
+- owners = Owners}) ->
++ Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId}) ->
+ Features = features(Type),
+ RetreiveFeature = lists:member(<<"retrieve-items">>, Features),
+ PersistentFeature = lists:member(<<"persistent-items">>, Features),
+ AccessModel = get_option(Options, access_model),
+ AllowedGroups = get_option(Options, roster_groups_allowed, []),
++ Owners = node_owners_call(Type, NodeId),
+ {PresenceSubscription, RosterGroup} =
+ get_presence_and_roster_permissions(Host, From, Owners,
+ AccessModel, AllowedGroups),
+@@ -3124,11 +2735,11 @@
+ node_call(Type, get_items,
+ [NodeId, From, AccessModel,
+ PresenceSubscription, RosterGroup,
+- SubId])
++ SubId, RSM])
+ end
+ end,
+ case transaction(Host, Node, Action, sync_dirty) of
+- {result, {_, Items}} ->
++ {result, {_, {Items, RSMOut}}} ->
+ SendItems = case ItemIDs of
+ [] -> Items;
+ _ ->
+@@ -3147,7 +2758,8 @@
+ [#xmlel{name = <<"items">>, attrs = nodeAttr(Node),
+ children =
+ itemsEls(lists:sublist(SendItems,
+- MaxItems))}]}]};
++ MaxItems))}
++ | jlib:rsm_encode(RSMOut)]}]};
+ Error -> Error
+ end
+ end.
+@@ -3170,42 +2782,68 @@
+ Error -> Error
+ end.
+
+-get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners) ->
++get_allowed_items_call(Host, NodeIdx, From, Type,
++ Options, Owners) ->
++ case get_allowed_items_call(Host, NodeIdx, From, Type,
++ Options, Owners, none)
++ of
++ {result, {I, _}} -> {result, I};
++ Error -> Error
++ end.
++
++get_allowed_items_call(Host, NodeIdx, From, Type,
++ Options, Owners, RSM) ->
+ AccessModel = get_option(Options, access_model),
+- AllowedGroups = get_option(Options, roster_groups_allowed, []),
++ AllowedGroups = get_option(Options,
++ roster_groups_allowed, []),
+ {PresenceSubscription, RosterGroup} =
+- get_presence_and_roster_permissions(Host, From, Owners, AccessModel,
+- AllowedGroups),
++ get_presence_and_roster_permissions(Host, From, Owners,
++ AccessModel, AllowedGroups),
+ node_call(Type, get_items,
+- [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined]).
++ [NodeIdx, From, AccessModel, PresenceSubscription,
++ RosterGroup, undefined, RSM]).
+
+-send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, last) ->
+- case get_cached_item(Host, NodeId) of
+- undefined ->
+- send_items(Host, Node, NodeId, Type, LJID, 1);
+- LastItem ->
+- {ModifNow, ModifUSR} =
+- LastItem#pubsub_item.modification,
+- Stanza = event_stanza_with_delay([#xmlel{name =
+- <<"items">>,
++send_items(Host, Node, NodeId, Type, {U, S, R} = LJID,
++ last) ->
++ Stanza = case get_cached_item(Host, NodeId) of
++ undefined ->
++ case node_action(Host, Type, get_last_items,
++ [NodeId, LJID, 1])
++ of
++ {result, [LastItem]} ->
++ {ModifNow, ModifUSR} =
++ LastItem#pubsub_item.modification,
++ event_stanza_with_delay([#xmlel{name = <<"items">>,
++ attrs = nodeAttr(Node),
++ children =
++ itemsEls([LastItem])}],
++ ModifNow, ModifUSR);
++ _ ->
++ event_stanza([#xmlel{name = <<"items">>,
++ attrs = nodeAttr(Node),
++ children = itemsEls([])}])
++ end;
++ LastItem ->
++ {ModifNow, ModifUSR} =
++ LastItem#pubsub_item.modification,
++ event_stanza_with_delay([#xmlel{name = <<"items">>,
+ attrs = nodeAttr(Node),
+ children =
+ itemsEls([LastItem])}],
+- ModifNow, ModifUSR),
+- case is_tuple(Host) of
+- false ->
+- ejabberd_router:route(service_jid(Host),
+- jlib:make_jid(LJID), Stanza);
+- true ->
+- case ejabberd_sm:get_session_pid(U, S, R) of
+- C2SPid when is_pid(C2SPid) ->
+- ejabberd_c2s:broadcast(C2SPid,
+- {pep_message,
+- <<((Node))/binary, "+notify">>},
+- _Sender = service_jid(Host),
+- Stanza);
+- _ -> ok
+- end
++ ModifNow, ModifUSR)
++ end,
++ case is_tuple(Host) of
++ false ->
++ ejabberd_router:route(service_jid(Host),
++ jlib:make_jid(LJID), Stanza);
++ true ->
++ case ejabberd_sm:get_session_pid(U, S, R) of
++ C2SPid when is_pid(C2SPid) ->
++ ejabberd_c2s:broadcast(C2SPid,
++ {pep_message,
++ <<((Node))/binary, "+notify">>},
++ _Sender = service_jid(Host), Stanza);
++ _ -> ok
+ end
+ end;
+ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID,
+@@ -3436,9 +3074,8 @@
+ case Entities of
+ error -> {error, ?ERR_BAD_REQUEST};
+ _ ->
+- Action = fun (#pubsub_node{owners = Owners, type = Type,
+- id = NodeId} =
+- N) ->
++ Action = fun (#pubsub_node{type = Type, id = NodeId} = N) ->
++ Owners = node_owners_call(Type, NodeId),
+ case lists:member(Owner, Owners) of
+ true ->
+ OwnerJID = jlib:make_jid(Owner),
+@@ -3545,10 +3182,10 @@
+ end.
+
+ read_sub(Subscriber, Node, NodeID, SubID, Lang) ->
+- case pubsub_subscription:get_subscription(Subscriber, NodeID, SubID) of
++ case pubsub_subscription_odbc:get_subscription(Subscriber, NodeID, SubID) of
+ {result, #pubsub_subscription{options = Options}} ->
+ {result, XdataEl} =
+- pubsub_subscription:get_options_xform(Lang, Options),
++ pubsub_subscription_odbc:get_options_xform(Lang, Options),
+ OptionsEl = #xmlel{name = <<"options">>,
+ attrs =
+ [{<<"jid">>, jlib:jid_to_string(Subscriber)},
+@@ -3593,7 +3230,7 @@
+ end.
+
+ set_options_helper(Configuration, JID, NodeID, SubID, Type) ->
+- SubOpts = case pubsub_subscription:parse_options_xform(Configuration) of
++ SubOpts = case pubsub_subscription_odbc:parse_options_xform(Configuration) of
+ {result, GoodSubOpts} -> GoodSubOpts;
+ _ -> invalid
+ end,
+@@ -3629,7 +3266,7 @@
+ write_sub(_Subscriber, _NodeID, _SubID, []) ->
+ {result, []};
+ write_sub(Subscriber, NodeID, SubID, Options) ->
+- case pubsub_subscription:set_subscription(Subscriber,
++ case pubsub_subscription_odbc:set_subscription(Subscriber,
+ NodeID, SubID, Options)
+ of
+ {result, _} -> {result, []};
+@@ -3854,8 +3491,8 @@
+ ejabberd_router:route(service_jid(Host),
+ jlib:make_jid(JID), Stanza)
+ end,
+- Action = fun (#pubsub_node{owners = Owners, type = Type,
+- id = NodeId}) ->
++ Action = fun (#pubsub_node{type = Type, id = NodeId}) ->
++ Owners = node_owners_call(Type, NodeId),
+ case lists:member(Owner, Owners) of
+ true ->
+ Result = lists:foldl(fun ({JID, Subscription,
+@@ -4347,7 +3984,7 @@
+ [{Depth, [{N, sub_with_options(N)} || N <- Nodes]}
+ || {Depth, Nodes} <- Collection]}
+ end,
+- case transaction(Action, sync_dirty) of
++ case transaction(Host, Action, sync_dirty) of
+ {result, CollSubs} ->
+ subscribed_nodes_by_jid(NotifyType, CollSubs);
+ _ -> []
+@@ -4443,10 +4080,10 @@
+ end.
+
+ sub_with_options(JID, NodeId, SubId) ->
+- case pubsub_subscription:read_subscription(JID, NodeId,
++ case pubsub_subscription_odbc:get_subscription(JID, NodeId,
+ SubId)
+ of
+- #pubsub_subscription{options = Options} ->
++ {result, #pubsub_subscription{options = Options}} ->
+ {JID, SubId, Options};
+ _ -> {JID, SubId, []}
+ end.
+@@ -4537,6 +4174,28 @@
+ Result -> Result
+ end.
+
++node_owners(Host, Type, NodeId) ->
++ case node_action(Host, Type, get_node_affiliations,
++ [NodeId])
++ of
++ {result, Affiliations} ->
++ lists:foldl(fun ({LJID, owner}, Acc) -> [LJID | Acc];
++ (_, Acc) -> Acc
++ end,
++ [], Affiliations);
++ _ -> []
++ end.
++
++node_owners_call(Type, NodeId) ->
++ case node_call(Type, get_node_affiliations, [NodeId]) of
++ {result, Affiliations} ->
++ lists:foldl(fun ({LJID, owner}, Acc) -> [LJID | Acc];
++ (_, Acc) -> Acc
++ end,
++ [], Affiliations);
++ _ -> []
++ end.
++
+ max_items(Host, Options) ->
+ case get_option(Options, persist_items) of
+ true ->
+@@ -5024,7 +4683,14 @@
+ tree_action(Host, Function, Args) ->
+ ?DEBUG("tree_action ~p ~p ~p", [Host, Function, Args]),
+ Fun = fun () -> tree_call(Host, Function, Args) end,
+- catch mnesia:sync_dirty(Fun).
++ case catch ejabberd_odbc:sql_bloc(odbc_conn(Host), Fun)
++ of
++ {atomic, Result} -> Result;
++ {aborted, Reason} ->
++ ?ERROR_MSG("transaction return internal error: ~p~n",
++ [{aborted, Reason}]),
++ {error, ?ERR_INTERNAL_SERVER_ERROR}
++ end.
+
+ node_call(Type, Function, Args) ->
+ ?DEBUG("node_call ~p ~p ~p", [Type, Function, Args]),
+@@ -5048,12 +4714,12 @@
+ node_action(Host, Type, Function, Args) ->
+ ?DEBUG("node_action ~p ~p ~p ~p",
+ [Host, Type, Function, Args]),
+- transaction(fun () -> node_call(Type, Function, Args)
++ transaction(Host, fun () -> node_call(Type, Function, Args)
+ end,
+ sync_dirty).
+
+ transaction(Host, Node, Action, Trans) ->
+- transaction(fun () ->
++ transaction(Host, fun () ->
+ case tree_call(Host, get_node, [Host, Node]) of
+ N when is_record(N, pubsub_node) ->
+ case Action(N) of
+@@ -5067,16 +4733,24 @@
+ end,
+ Trans).
+
+-transaction(Host, Action, Trans) ->
+- transaction(fun () ->
++transaction_on_nodes(Host, Action, Trans) ->
++ transaction(Host,
++ fun () ->
+ {result,
+ lists:foldl(Action, [],
+ tree_call(Host, get_nodes, [Host]))}
+ end,
+ Trans).
+
+-transaction(Fun, Trans) ->
+- case catch mnesia:Trans(Fun) of
++transaction(Host, Fun, Trans) ->
++ transaction_retry(Host, Fun, Trans, 2).
++
++transaction_retry(Host, Fun, Trans, Count) ->
++ SqlFun = case Trans of
++ transaction -> sql_transaction;
++ _ -> sql_bloc
++ end,
++ case catch ejabberd_odbc:SqlFun(odbc_conn(Host), Fun) of
+ {result, Result} -> {result, Result};
+ {error, Error} -> {error, Error};
+ {atomic, {result, Result}} -> {result, Result};
+@@ -5085,6 +4759,16 @@
+ ?ERROR_MSG("transaction return internal error: ~p~n",
+ [{aborted, Reason}]),
+ {error, ?ERR_INTERNAL_SERVER_ERROR};
++ {'EXIT', {timeout, _} = Reason} ->
++ case Count of
++ 0 ->
++ ?ERROR_MSG("transaction return internal error: ~p~n",
++ [{'EXIT', Reason}]),
++ {error, ?ERR_INTERNAL_SERVER_ERROR};
++ N ->
++ erlang:yield(),
++ transaction_retry(Host, Fun, Trans, N - 1)
++ end;
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("transaction return internal error: ~p~n",
+ [{'EXIT', Reason}]),
+@@ -5095,6 +4779,20 @@
+ {error, ?ERR_INTERNAL_SERVER_ERROR}
+ end.
+
++odbc_conn({_U, Host, _R}) -> Host;
++odbc_conn(Host) ->
++ lists:dropwhile(fun (A) -> A /= $. end, Host) --
++ <<".">>.
++
++-spec(escape/1 ::
++(
++ JID :: ljid() | binary())
++ -> binary()
++).
++escape({_U, _H, _R} = JID) ->
++ ejabberd_odbc:escape(jlib:jid_to_string(JID));
++escape(Value) -> ejabberd_odbc:escape(Value).
++
+ %%%% helpers
+
+ extended_error(Error, Ext) ->