aboutsummaryrefslogtreecommitdiff
path: root/src/pubsub_odbc.patch
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2013-04-08 11:12:54 +0200
committerChristophe Romain <christophe.romain@process-one.net>2013-06-13 11:11:02 +0200
commit4d8f7706240a1603468968f47fc7b150b788d62f (patch)
tree92d55d789cc7ac979b3c9e161ffb7f908eba043a /src/pubsub_odbc.patch
parentFix Guide: ejabberd_service expects a shaper_rule, not a shaper (diff)
Switch to rebar build tool
Use dynamic Rebar configuration Make iconv dependency optional Disable transient_supervisors compile option Add hipe compilation support Only compile ibrowse and lhttpc when needed Make it possible to generate an OTP application release Add --enable-debug compile option Add --enable-all compiler option Add --enable-tools configure option Add --with-erlang configure option. Add --enable-erlang-version-check configure option. Add lager support Improve the test suite
Diffstat (limited to 'src/pubsub_odbc.patch')
-rw-r--r--src/pubsub_odbc.patch1198
1 files changed, 1198 insertions, 0 deletions
diff --git a/src/pubsub_odbc.patch b/src/pubsub_odbc.patch
new file mode 100644
index 000000000..fdde477c0
--- /dev/null
+++ b/src/pubsub_odbc.patch
@@ -0,0 +1,1198 @@
+--- mod_pubsub.erl 2013-06-06 11:08:12.333599362 +0200
++++ mod_pubsub_odbc.erl 2013-06-06 11:31:06.640173557 +0200
+@@ -43,7 +43,7 @@
+ %%% 6.2.3.1, 6.2.3.5, and 6.3. For information on subscription leases see
+ %%% XEP-0060 section 12.18.
+
+--module(mod_pubsub).
++-module(mod_pubsub_odbc).
+
+ -author('christophe.romain@process-one.net').
+
+@@ -61,11 +61,11 @@
+
+ -include("pubsub.hrl").
+
+--define(STDTREE, <<"tree">>).
++-define(STDTREE, <<"tree_odbc">>).
+
+--define(STDNODE, <<"flat">>).
++-define(STDNODE, <<"flat_odbc">>).
+
+--define(PEPNODE, <<"pep">>).
++-define(PEPNODE, <<"pep_odbc">>).
+
+ %% exports for hooks
+ -export([presence_probe/3, caps_update/3,
+@@ -100,7 +100,7 @@
+ -export([subscription_to_string/1, affiliation_to_string/1,
+ string_to_subscription/1, string_to_affiliation/1,
+ extended_error/2, extended_error/3,
+- rename_default_nodeplugin/0]).
++ escape/1]).
+
+ %% API and gen_server callbacks
+ -export([start_link/2, start/2, stop/1, init/1,
+@@ -110,7 +110,7 @@
+ %% calls for parallel sending of last items
+ -export([send_loop/1]).
+
+--define(PROCNAME, ejabberd_mod_pubsub).
++-define(PROCNAME, ejabberd_mod_pubsub_odbc).
+
+ -define(LOOPNAME, ejabberd_mod_pubsub_loop).
+
+@@ -349,8 +349,6 @@
+ false -> ok
+ end,
+ ejabberd_router:register_route(Host),
+- update_node_database(Host, ServerHost),
+- update_state_database(Host, ServerHost),
+ put(server_host, ServerHost),
+ init_nodes(Host, ServerHost, NodeTree, Plugins),
+ State = #state{host = Host, server_host = ServerHost,
+@@ -423,359 +421,14 @@
+ ok.
+
+ init_nodes(Host, ServerHost, _NodeTree, Plugins) ->
+- case lists:member(<<"hometree">>, Plugins) of
++ case lists:member(<<"hometree_odbc">>, Plugins) of
+ true ->
+- create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree">>),
++ create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree_odbc">>),
+ create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host),
+- <<"hometree">>);
++ <<"hometree_odbc">>);
+ false -> ok
+ end.
+
+-update_node_database(Host, ServerHost) ->
+- mnesia:del_table_index(pubsub_node, type),
+- mnesia:del_table_index(pubsub_node, parentid),
+- case catch mnesia:table_info(pubsub_node, attributes) of
+- [host_node, host_parent, info] ->
+- ?INFO_MSG("upgrade node pubsub tables", []),
+- F = fun () ->
+- {Result, LastIdx} = lists:foldl(fun ({pubsub_node,
+- NodeId, ParentId,
+- {nodeinfo, Items,
+- Options,
+- Entities}},
+- {RecList,
+- NodeIdx}) ->
+- ItemsList =
+- lists:foldl(fun
+- ({item,
+- IID,
+- Publisher,
+- Payload},
+- Acc) ->
+- C =
+- {unknown,
+- Publisher},
+- M =
+- {now(),
+- Publisher},
+- mnesia:write(#pubsub_item{itemid
+- =
+- {IID,
+- NodeIdx},
+- creation
+- =
+- C,
+- modification
+- =
+- M,
+- payload
+- =
+- Payload}),
+- [{Publisher,
+- IID}
+- | Acc]
+- end,
+- [],
+- Items),
+- Owners =
+- dict:fold(fun
+- (JID,
+- {entity,
+- Aff,
+- Sub},
+- Acc) ->
+- UsrItems =
+- lists:foldl(fun
+- ({P,
+- I},
+- IAcc) ->
+- case
+- P
+- of
+- JID ->
+- [I
+- | IAcc];
+- _ ->
+- IAcc
+- end
+- end,
+- [],
+- ItemsList),
+- mnesia:write({pubsub_state,
+- {JID,
+- NodeIdx},
+- UsrItems,
+- Aff,
+- Sub}),
+- case
+- Aff
+- of
+- owner ->
+- [JID
+- | Acc];
+- _ ->
+- Acc
+- end
+- end,
+- [],
+- Entities),
+- mnesia:delete({pubsub_node,
+- NodeId}),
+- {[#pubsub_node{nodeid
+- =
+- NodeId,
+- id
+- =
+- NodeIdx,
+- parents
+- =
+- [element(2,
+- ParentId)],
+- owners
+- =
+- Owners,
+- options
+- =
+- Options}
+- | RecList],
+- NodeIdx + 1}
+- end,
+- {[], 1},
+- mnesia:match_object({pubsub_node,
+- {Host,
+- '_'},
+- '_',
+- '_'})),
+- mnesia:write(#pubsub_index{index = node, last = LastIdx,
+- free = []}),
+- Result
+- end,
+- {atomic, NewRecords} = mnesia:transaction(F),
+- {atomic, ok} = mnesia:delete_table(pubsub_node),
+- {atomic, ok} = mnesia:create_table(pubsub_node,
+- [{disc_copies, [node()]},
+- {attributes,
+- record_info(fields,
+- pubsub_node)}]),
+- FNew = fun () ->
+- lists:foreach(fun (Record) -> mnesia:write(Record) end,
+- NewRecords)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- ?INFO_MSG("Pubsub node tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p",
+- [Reason])
+- end;
+- [nodeid, parentid, type, owners, options] ->
+- F = fun ({pubsub_node, NodeId, {_, Parent}, Type,
+- Owners, Options}) ->
+- #pubsub_node{nodeid = NodeId, id = 0,
+- parents = [Parent], type = Type,
+- owners = Owners, options = Options}
+- end,
+- mnesia:transform_table(pubsub_node, F,
+- [nodeid, id, parents, type, owners, options]),
+- FNew = fun () ->
+- LastIdx = lists:foldl(fun (#pubsub_node{nodeid =
+- NodeId} =
+- PubsubNode,
+- NodeIdx) ->
+- mnesia:write(PubsubNode#pubsub_node{id
+- =
+- NodeIdx}),
+- lists:foreach(fun
+- (#pubsub_state{stateid
+- =
+- StateId} =
+- State) ->
+- {JID,
+- _} =
+- StateId,
+- mnesia:delete({pubsub_state,
+- StateId}),
+- mnesia:write(State#pubsub_state{stateid
+- =
+- {JID,
+- NodeIdx}})
+- end,
+- mnesia:match_object(#pubsub_state{stateid
+- =
+- {'_',
+- NodeId},
+- _
+- =
+- '_'})),
+- lists:foreach(fun
+- (#pubsub_item{itemid
+- =
+- ItemId} =
+- Item) ->
+- {IID,
+- _} =
+- ItemId,
+- {M1,
+- M2} =
+- Item#pubsub_item.modification,
+- {C1,
+- C2} =
+- Item#pubsub_item.creation,
+- mnesia:delete({pubsub_item,
+- ItemId}),
+- mnesia:write(Item#pubsub_item{itemid
+- =
+- {IID,
+- NodeIdx},
+- modification
+- =
+- {M2,
+- M1},
+- creation
+- =
+- {C2,
+- C1}})
+- end,
+- mnesia:match_object(#pubsub_item{itemid
+- =
+- {'_',
+- NodeId},
+- _
+- =
+- '_'})),
+- NodeIdx + 1
+- end,
+- 1,
+- mnesia:match_object({pubsub_node,
+- {Host, '_'},
+- '_', '_',
+- '_', '_',
+- '_'})
+- ++
+- mnesia:match_object({pubsub_node,
+- {{'_',
+- ServerHost,
+- '_'},
+- '_'},
+- '_', '_',
+- '_', '_',
+- '_'})),
+- mnesia:write(#pubsub_index{index = node,
+- last = LastIdx, free = []})
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- rename_default_nodeplugin(),
+- ?INFO_MSG("Pubsub node tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p",
+- [Reason])
+- end;
+- [nodeid, id, parent, type, owners, options] ->
+- F = fun ({pubsub_node, NodeId, Id, Parent, Type, Owners,
+- Options}) ->
+- #pubsub_node{nodeid = NodeId, id = Id,
+- parents = [Parent], type = Type,
+- owners = Owners, options = Options}
+- end,
+- mnesia:transform_table(pubsub_node, F,
+- [nodeid, id, parents, type, owners, options]),
+- rename_default_nodeplugin();
+- _ -> ok
+- end,
+- mnesia:transaction(fun () ->
+- case catch mnesia:first(pubsub_node) of
+- {_, L} when is_binary(L) ->
+- lists:foreach(fun ({H, N})
+- when is_binary(N) ->
+- [Node] =
+- mnesia:read({pubsub_node,
+- {H,
+- N}}),
+- Type =
+- Node#pubsub_node.type,
+- BN = element(2,
+- node_call(Type,
+- path_to_node,
+- [N])),
+- BP = case [element(2,
+- node_call(Type,
+- path_to_node,
+- [P]))
+- || P
+- <- Node#pubsub_node.parents]
+- of
+- [<<>>] -> [];
+- Parents ->
+- Parents
+- end,
+- mnesia:write(Node#pubsub_node{nodeid
+- =
+- {H,
+- BN},
+- parents
+- =
+- BP}),
+- mnesia:delete({pubsub_node,
+- {H,
+- N}});
+- (_) -> ok
+- end,
+- mnesia:all_keys(pubsub_node));
+- _ -> ok
+- end
+- end).
+-
+-rename_default_nodeplugin() ->
+- lists:foreach(fun (Node) ->
+- mnesia:dirty_write(Node#pubsub_node{type =
+- <<"hometree">>})
+- end,
+- mnesia:dirty_match_object(#pubsub_node{type =
+- <<"default">>,
+- _ = '_'})).
+-
+-update_state_database(_Host, _ServerHost) ->
+- case catch mnesia:table_info(pubsub_state, attributes) of
+- [stateid, items, affiliation, subscription] ->
+- ?INFO_MSG("upgrade state pubsub tables", []),
+- F = fun ({pubsub_state, {JID, NodeID}, Items, Aff, Sub}, Acc) ->
+- Subs = case Sub of
+- none ->
+- [];
+- _ ->
+- {result, SubID} = pubsub_subscription:subscribe_node(JID, NodeID, []),
+- [{Sub, SubID}]
+- end,
+- NewState = #pubsub_state{stateid = {JID, NodeID},
+- items = Items,
+- affiliation = Aff,
+- subscriptions = Subs},
+- [NewState | Acc]
+- end,
+- {atomic, NewRecs} = mnesia:transaction(fun mnesia:foldl/3,
+- [F, [], pubsub_state]),
+- {atomic, ok} = mnesia:delete_table(pubsub_state),
+- {atomic, ok} = mnesia:create_table(pubsub_state,
+- [{disc_copies, [node()]},
+- {attributes, record_info(fields, pubsub_state)}]),
+- FNew = fun () ->
+- lists:foreach(fun mnesia:write/1, NewRecs)
+- end,
+- case mnesia:transaction(FNew) of
+- {atomic, Result} ->
+- ?INFO_MSG("Pubsub state tables updated correctly: ~p",
+- [Result]);
+- {aborted, Reason} ->
+- ?ERROR_MSG("Problem updating Pubsub state tables:~n~p",
+- [Reason])
+- end;
+- _ ->
+- ok
+- end.
+-
+ send_loop(State) ->
+ receive
+ {presence, JID, Pid} ->
+@@ -784,11 +437,13 @@
+ LJID = jlib:jid_tolower(JID),
+ BJID = jlib:jid_remove_resource(LJID),
+ lists:foreach(fun (PType) ->
+- {result, Subscriptions} = node_action(Host,
++ {result, Subscriptions} = case catch node_action(Host,
+ PType,
+- get_entity_subscriptions,
+- [Host,
+- JID]),
++ get_entity_subscriptions_for_send_last,
++ [Host, JID]) of
++ {result, S} -> S;
++ _ -> []
++ end,
+ lists:foreach(fun ({Node, subscribed, _,
+ SubJID}) ->
+ if (SubJID == LJID) or
+@@ -800,24 +455,14 @@
+ type =
+ Type,
+ id =
+- NodeId,
+- options
+- =
+- Options} =
++ NodeId} =
+ Node,
+- case
+- get_option(Options,
+- send_last_published_item)
+- of
+- on_sub_and_presence ->
+- send_items(H,
++ send_items(H,
+ N,
+ NodeId,
+ Type,
+ LJID,
+ last);
+- _ -> ok
+- end;
+ true ->
+ % resource not concerned about that subscription
+ ok
+@@ -1008,7 +653,8 @@
+ children = []}];
+ disco_identity(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
+ {result, _} ->
+ {result,
+@@ -1060,7 +706,8 @@
+ || Feature <- features(<<"pep">>)]];
+ disco_features(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
+ {result, _} ->
+ {result,
+@@ -1105,9 +752,9 @@
+ ).
+ disco_items(Host, <<>>, From) ->
+ Action = fun (#pubsub_node{nodeid = {_, NodeID},
+- options = Options, type = Type, id = Idx,
+- owners = Owners},
++ options = Options, type = Type, id = Idx},
+ Acc) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of
+ {result, _} ->
+ [#xmlel{name = <<"item">>,
+@@ -1128,13 +775,14 @@
+ _ -> Acc
+ end
+ end,
+- case transaction(Host, Action, sync_dirty) of
++ case transaction_on_nodes(Host, Action, sync_dirty) of
+ {result, Items} -> Items;
+ _ -> []
+ end;
+ disco_items(Host, Node, From) ->
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
+ case get_allowed_items_call(Host, Idx, From, Type,
+ Options, Owners)
+ of
+@@ -1238,9 +886,6 @@
+ lists:foreach(fun ({#pubsub_node{options
+ =
+ Options,
+- owners
+- =
+- Owners,
+ id =
+ NodeId},
+ subscribed, _,
+@@ -1252,7 +897,7 @@
+ presence ->
+ case
+ lists:member(BJID,
+- Owners)
++ node_owners(Host, PType, NodeId))
+ of
+ true ->
+ node_action(Host,
+@@ -1512,7 +1157,8 @@
+ IQ ->
+ #xmlel{attrs = QAttrs} = SubEl,
+ Node = xml:get_attr_s(<<"node">>, QAttrs),
+- Res = case iq_disco_items(Host, Node, From) of
++ Rsm = jlib:rsm_decode(IQ),
++ Res = case iq_disco_items(Host, Node, From, Rsm) of
+ {result, IQRes} ->
+ jlib:iq_to_xml(IQ#iq{type = result,
+ sub_el =
+@@ -1639,7 +1285,7 @@
+ % [] ->
+ % [<<"leaf">>]; %% No sub-nodes: it's a leaf node
+ % _ ->
+-% case node_call(Type, get_items, [NodeId, From]) of
++% case node_call(Type, get_items, [NodeId, From, none]) of
+ % {result, []} -> [<<"collection">>];
+ % {result, _} -> [<<"leaf">>, <<"collection">>];
+ % _ -> []
+@@ -1661,7 +1307,11 @@
+ % [#xmlel{name = <<"feature">>,
+ % attrs = [{<<"var">>, ?NS_PUBSUB}],
+ % children = []}
+-% | lists:map(fun (T) ->
++% | lists:map(fun
++% (<<"rsm">>)->
++% #xmlel{name = <<"feature">>,
++% attrs = [{<<"var">>, ?NS_RSM}]};
++% (T) ->
+ % #xmlel{name = <<"feature">>,
+ % attrs =
+ % [{<<"var">>,
+@@ -1686,7 +1336,7 @@
+ [] -> [<<"leaf">>];
+ _ ->
+ case node_call(Type, get_items,
+- [NodeId, From])
++ [NodeId, From, none])
+ of
+ {result, []} ->
+ [<<"collection">>];
+@@ -1708,7 +1358,11 @@
+ F = [#xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, ?NS_PUBSUB}],
+ children = []}
+- | lists:map(fun (T) ->
++ | lists:map(fun
++ (<<"rsm">>)->
++ #xmlel{name = <<"feature">>,
++ attrs = [{<<"var">>, ?NS_RSM}]};
++ (T) ->
+ #xmlel{name = <<"feature">>,
+ attrs =
+ [{<<"var">>,
+@@ -1752,7 +1406,11 @@
+ #xmlel{name = <<"feature">>,
+ attrs = [{<<"var">>, ?NS_VCARD}], children = []}]
+ ++
+- lists:map(fun (Feature) ->
++ lists:map(fun
++ (<<"rsm">>)->
++ #xmlel{name = <<"feature">>,
++ attrs = [{<<"var">>, ?NS_RSM}]};
++ (Feature) ->
+ #xmlel{name = <<"feature">>,
+ attrs =
+ [{<<"var">>, <<(?NS_PUBSUB)/binary, "#", Feature/binary>>}],
+@@ -1765,14 +1423,15 @@
+ _ -> node_disco_info(Host, Node, From)
+ end.
+
+--spec(iq_disco_items/3 ::
++-spec(iq_disco_items/4 ::
+ (
+ Host :: mod_pubsub:host(),
+ NodeId :: <<>> | mod_pubsub:nodeId(),
+- From :: jid())
++ From :: jid(),
++ Rsm :: any())
+ -> {result, [xmlel()]}
+ ).
+-iq_disco_items(Host, <<>>, From) ->
++iq_disco_items(Host, <<>>, From, _RSM) ->
+ {result,
+ lists:map(fun (#pubsub_node{nodeid = {_, SubNode},
+ options = Options}) ->
+@@ -1809,7 +1468,7 @@
+ % Nodes)};
+ % Other -> Other
+ % end;
+-iq_disco_items(Host, ?NS_COMMANDS, _From) ->
++iq_disco_items(Host, ?NS_COMMANDS, _From, _RSM) ->
+ CommandItems = [#xmlel{name = <<"item">>,
+ attrs =
+ [{<<"jid">>, Host},
+@@ -1817,22 +1476,19 @@
+ {<<"name">>, <<"Get Pending">>}],
+ children = []}],
+ {result, CommandItems};
+-iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From) ->
++iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From, _RSM) ->
+ CommandItems = [], {result, CommandItems};
+-iq_disco_items(Host, Item, From) ->
++iq_disco_items(Host, Item, From, RSM) ->
+ case str:tokens(Item, <<"!">>) of
+ [_Node, _ItemID] -> {result, []};
+ [Node] ->
+ % Node = string_to_node(SNode),
+ Action = fun (#pubsub_node{id = Idx, type = Type,
+- options = Options, owners = Owners}) ->
+- NodeItems = case get_allowed_items_call(Host, Idx,
+- From, Type,
+- Options,
+- Owners)
+- of
++ options = Options}) ->
++ Owners = node_owners_call(Type, Idx),
++ {NodeItems, RsmOut} = case get_allowed_items_call(Host, Idx, From, Type, Options, Owners, RSM) of
+ {result, R} -> R;
+- _ -> []
++ _ -> {[], none}
+ end,
+ Nodes = lists:map(fun (#pubsub_node{nodeid =
+ {_, SubNode},
+@@ -1875,7 +1531,7 @@
+ children = []}
+ end,
+ NodeItems),
+- {result, Nodes ++ Items}
++ {result, Nodes ++ Items ++ jlib:rsm_encode(RsmOut)}
+ end,
+ case transaction(Host, Node, Action, sync_dirty) of
+ {result, {_, Result}} -> {result, Result};
+@@ -2026,7 +1682,8 @@
+ (_, Acc) -> Acc
+ end,
+ [], xml:remove_cdata(Els)),
+- get_items(Host, Node, From, SubId, MaxItems, ItemIDs);
++ RSM = jlib:rsm_decode(SubEl),
++ get_items(Host, Node, From, SubId, MaxItems, ItemIDs, RSM);
+ {get, <<"subscriptions">>} ->
+ get_subscriptions(Host, Node, From, Plugins);
+ {get, <<"affiliations">>} ->
+@@ -2061,7 +1718,9 @@
+ ).
+ iq_pubsub_owner(Host, ServerHost, From, IQType, SubEl, Lang) ->
+ #xmlel{children = SubEls} = SubEl,
+- Action = xml:remove_cdata(SubEls),
++ Action = lists:filter(fun(#xmlel{name = <<"set">>, _ = '_'}) -> false;
++ (_) -> true
++ end, xml:remove_cdata(SubEls)),
+ case Action of
+ [#xmlel{name = Name, attrs = Attrs, children = Els}] ->
+ Node = xml:get_attr_s(<<"node">>, Attrs),
+@@ -2195,7 +1854,8 @@
+ _ -> []
+ end
+ end,
+- case transaction(fun () ->
++ case transaction(Host,
++ fun () ->
+ {result, lists:flatmap(Tr, Plugins)}
+ end,
+ sync_dirty)
+@@ -2240,7 +1900,8 @@
+
+ %%% authorization handling
+
+-send_authorization_request(#pubsub_node{owners = Owners, nodeid = {Host, Node}},
++send_authorization_request(#pubsub_node{nodeid = {Host, Node},
++ type = Type, id = NodeId},
+ Subscriber) ->
+ Lang = <<"en">>,
+ Stanza = #xmlel{name = <<"message">>, attrs = [],
+@@ -2318,7 +1979,7 @@
+ ejabberd_router:route(service_jid(Host),
+ jlib:make_jid(Owner), Stanza)
+ end,
+- Owners).
++ node_owners(Host, Type, NodeId)).
+
+ find_authorization_response(Packet) ->
+ #xmlel{children = Els} = Packet,
+@@ -2383,11 +2044,11 @@
+ <<"true">> -> true;
+ _ -> false
+ end,
+- Action = fun (#pubsub_node{type = Type, owners = Owners,
++ Action = fun (#pubsub_node{type = Type,
+ id = NodeId}) ->
+ IsApprover =
+ lists:member(jlib:jid_tolower(jlib:jid_remove_resource(From)),
+- Owners),
++ node_owners_call(Type, NodeId)),
+ {result, Subscriptions} = node_call(Type,
+ get_subscriptions,
+ [NodeId,
+@@ -2642,7 +2303,7 @@
+ children = [#xmlel{name = <<"create">>,
+ attrs = nodeAttr(Node),
+ children = []}]}],
+- case transaction(CreateNode, transaction) of
++ case transaction(Host, CreateNode, transaction) of
+ {result, {NodeId, SubsByDepth, {Result, broadcast}}} ->
+ broadcast_created_node(Host, Node, NodeId, Type, NodeOptions, SubsByDepth),
+ ejabberd_hooks:run(pubsub_create_node, ServerHost, [ServerHost, Host, Node, NodeId, NodeOptions]),
+@@ -2779,7 +2440,7 @@
+ %%</ul>
+ subscribe_node(Host, Node, From, JID, Configuration) ->
+ SubOpts = case
+- pubsub_subscription:parse_options_xform(Configuration)
++ pubsub_subscription_odbc:parse_options_xform(Configuration)
+ of
+ {result, GoodSubOpts} -> GoodSubOpts;
+ _ -> invalid
+@@ -2793,7 +2454,7 @@
+ end
+ end,
+ Action = fun (#pubsub_node{options = Options,
+- owners = Owners, type = Type, id = NodeId}) ->
++ type = Type, id = NodeId}) ->
+ Features = features(Type),
+ SubscribeFeature = lists:member(<<"subscribe">>, Features),
+ OptionsFeature = lists:member(<<"subscription-options">>, Features),
+@@ -2802,6 +2463,7 @@
+ AccessModel = get_option(Options, access_model),
+ SendLast = get_option(Options, send_last_published_item),
+ AllowedGroups = get_option(Options, roster_groups_allowed, []),
++ Owners = node_owners_call(Type, NodeId),
+ {PresenceSubscription, RosterGroup} =
+ get_presence_and_roster_permissions(Host, Subscriber,
+ Owners, AccessModel, AllowedGroups),
+@@ -2956,12 +2618,9 @@
+ Features = features(Type),
+ PublishFeature = lists:member(<<"publish">>, Features),
+ PublishModel = get_option(Options, publish_model),
++ MaxItems = max_items(Host, Options),
+ DeliverPayloads = get_option(Options, deliver_payloads),
+ PersistItems = get_option(Options, persist_items),
+- MaxItems = case PersistItems of
+- false -> 0;
+- true -> max_items(Host, Options)
+- end,
+ PayloadCount = payload_xmlelements(Payload),
+ PayloadSize = byte_size(term_to_binary(Payload)) - 2,
+ PayloadMaxSize = get_option(Options, max_payload_size),
+@@ -3017,7 +2676,7 @@
+ false ->
+ ok
+ end,
+- set_cached_item(Host, NodeId, ItemId, Publisher, Payload),
++ set_cached_item(Host, NodeId, ItemId, Publisher, Payload),
+ case Result of
+ default -> {result, Reply};
+ _ -> {result, Result}
+@@ -3210,19 +2869,20 @@
+ %% <p>The permission are not checked in this function.</p>
+ %% @todo We probably need to check that the user doing the query has the right
+ %% to read the items.
+--spec(get_items/6 ::
++-spec(get_items/7 ::
+ (
+ Host :: mod_pubsub:host(),
+ Node :: mod_pubsub:nodeId(),
+ From :: jid(),
+ SubId :: mod_pubsub:subId(),
+ SMaxItems :: binary(),
+- ItemIDs :: [mod_pubsub:itemId()])
++ ItemIDs :: [mod_pubsub:itemId()],
++ Rsm :: any())
+ -> {result, [xmlel(),...]}
+ %%%
+ | {error, xmlel()}
+ ).
+-get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) ->
++get_items(Host, Node, From, SubId, SMaxItems, ItemIDs, RSM) ->
+ MaxItems = if SMaxItems == <<"">> ->
+ get_max_items_node(Host);
+ true ->
+@@ -3234,13 +2894,13 @@
+ case MaxItems of
+ {error, Error} -> {error, Error};
+ _ ->
+- Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId,
+- owners = Owners}) ->
++ Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId}) ->
+ Features = features(Type),
+ RetreiveFeature = lists:member(<<"retrieve-items">>, Features),
+ PersistentFeature = lists:member(<<"persistent-items">>, Features),
+ AccessModel = get_option(Options, access_model),
+ AllowedGroups = get_option(Options, roster_groups_allowed, []),
++ Owners = node_owners_call(Type, NodeId),
+ {PresenceSubscription, RosterGroup} =
+ get_presence_and_roster_permissions(Host, From, Owners,
+ AccessModel, AllowedGroups),
+@@ -3258,11 +2918,11 @@
+ node_call(Type, get_items,
+ [NodeId, From, AccessModel,
+ PresenceSubscription, RosterGroup,
+- SubId])
++ SubId, RSM])
+ end
+ end,
+ case transaction(Host, Node, Action, sync_dirty) of
+- {result, {_, Items}} ->
++ {result, {_, {Items, RSMOut}}} ->
+ SendItems = case ItemIDs of
+ [] -> Items;
+ _ ->
+@@ -3280,8 +2940,8 @@
+ children =
+ [#xmlel{name = <<"items">>, attrs = nodeAttr(Node),
+ children =
+- itemsEls(lists:sublist(SendItems,
+- MaxItems))}]}]};
++ itemsEls(lists:sublist(SendItems, MaxItems))}
++ | jlib:rsm_encode(RSMOut)]}]};
+ Error -> Error
+ end
+ end.
+@@ -3305,13 +2965,18 @@
+ end.
+
+ get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners) ->
++ case get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners, none) of
++ {result, {I, _}} -> {result, I};
++ Error -> Error
++ end.
++get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners, RSM) ->
+ AccessModel = get_option(Options, access_model),
+ AllowedGroups = get_option(Options, roster_groups_allowed, []),
+ {PresenceSubscription, RosterGroup} =
+ get_presence_and_roster_permissions(Host, From, Owners, AccessModel,
+ AllowedGroups),
+ node_call(Type, get_items,
+- [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined]).
++ [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined, RSM]).
+
+ %% @spec (Host, Node, NodeId, Type, LJID, Number) -> any()
+ %% Host = pubsubHost()
+@@ -3322,35 +2987,32 @@
+ %% Number = last | integer()
+ %% @doc <p>Resend the items of a node to the user.</p>
+ %% @todo use cache-last-item feature
+-send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, last) ->
+- case get_cached_item(Host, NodeId) of
+- undefined ->
+- send_items(Host, Node, NodeId, Type, LJID, 1);
++send_items(Host, Node, NodeId, Type, LJID, last) ->
++ Stanza = case get_cached_item(Host, NodeId) of
++ undefined ->
++ % special ODBC optimization, works only with node_hometree_odbc, node_flat_odbc and node_pep_odbc
++ case node_action(Host, Type, get_last_items, [NodeId, LJID, 1]) of
++ {result, [LastItem]} ->
++ {ModifNow, ModifUSR} = LastItem#pubsub_item.modification,
++ event_stanza_with_delay(
++ [#xmlel{name = <<"items">>, attrs = nodeAttr(Node),
++ children = itemsEls([LastItem])}], ModifNow, ModifUSR);
++ _ ->
++ event_stanza(
++ [#xmlel{name = <<"items">>, attrs = nodeAttr(Node),
++ children = itemsEls([])}])
++ end;
+ LastItem ->
+ {ModifNow, ModifUSR} =
+ LastItem#pubsub_item.modification,
+- Stanza = event_stanza_with_delay([#xmlel{name =
++ event_stanza_with_delay([#xmlel{name =
+ <<"items">>,
+ attrs = nodeAttr(Node),
+ children =
+ itemsEls([LastItem])}],
+- ModifNow, ModifUSR),
+- case is_tuple(Host) of
+- false ->
+- ejabberd_router:route(service_jid(Host),
+- jlib:make_jid(LJID), Stanza);
+- true ->
+- case ejabberd_sm:get_session_pid(U, S, R) of
+- C2SPid when is_pid(C2SPid) ->
+- ejabberd_c2s:broadcast(C2SPid,
+- {pep_message,
+- <<((Node))/binary, "+notify">>},
+- _Sender = service_jid(Host),
+- Stanza);
+- _ -> ok
+- end
+- end
+- end;
++ ModifNow, ModifUSR)
++ end,
++ ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza);
+ send_items(Host, Node, NodeId, Type, {U, S, R} = LJID,
+ Number) ->
+ ToSend = case node_action(Host, Type, get_items,
+@@ -3378,7 +3040,8 @@
+ attrs = nodeAttr(Node),
+ children = itemsEls(ToSend)}])
+ end,
+- case is_tuple(Host) of
++ ejabberd_router:route(service_jid(Host), jlib:make_jid(LJID), Stanza).
++
+ %% @spec (Host, JID, Plugins) -> {error, Reason} | {result, Response}
+ %% Host = host()
+ %% JID = jid()
+@@ -3386,20 +3049,6 @@
+ %% Reason = stanzaError()
+ %% Response = [pubsubIQResponse()]
+ %% @doc <p>Return the list of affiliations as an XMPP response.</p>
+- false ->
+- ejabberd_router:route(service_jid(Host),
+- jlib:make_jid(LJID), Stanza);
+- true ->
+- case ejabberd_sm:get_session_pid(U, S, R) of
+- C2SPid when is_pid(C2SPid) ->
+- ejabberd_c2s:broadcast(C2SPid,
+- {pep_message,
+- <<((Node))/binary, "+notify">>},
+- _Sender = service_jid(Host), Stanza);
+- _ -> ok
+- end
+- end.
+-
+ -spec(get_affiliations/4 ::
+ (
+ Host :: mod_pubsub:host(),
+@@ -3586,9 +3235,10 @@
+ case Entities of
+ error -> {error, ?ERR_BAD_REQUEST};
+ _ ->
+- Action = fun (#pubsub_node{owners = Owners, type = Type,
++ Action = fun (#pubsub_node{type = Type,
+ id = NodeId} =
+ N) ->
++ Owners = node_owners_call(Type, NodeId),
+ case lists:member(Owner, Owners) of
+ true ->
+ OwnerJID = jlib:make_jid(Owner),
+@@ -3601,42 +3251,7 @@
+ _ -> Entities
+ end,
+ lists:foreach(fun ({JID, Affiliation}) ->
+- node_call(Type,
+- set_affiliation,
+- [NodeId, JID,
+- Affiliation]),
+- case Affiliation of
+- owner ->
+- NewOwner =
+- jlib:jid_tolower(jlib:jid_remove_resource(JID)),
+- NewOwners =
+- [NewOwner
+- | Owners],
+- tree_call(Host,
+- set_node,
+- [N#pubsub_node{owners
+- =
+- NewOwners}]);
+- none ->
+- OldOwner =
+- jlib:jid_tolower(jlib:jid_remove_resource(JID)),
+- case
+- lists:member(OldOwner,
+- Owners)
+- of
+- true ->
+- NewOwners =
+- Owners --
+- [OldOwner],
+- tree_call(Host,
+- set_node,
+- [N#pubsub_node{owners
+- =
+- NewOwners}]);
+- _ -> ok
+- end;
+- _ -> ok
+- end
++ node_call(Type, set_affiliation, [NodeId, JID, Affiliation])
+ end,
+ FilteredEntities),
+ {result, []};
+@@ -3695,11 +3310,11 @@
+ end.
+
+ read_sub(Subscriber, Node, NodeID, SubID, Lang) ->
+- case pubsub_subscription:get_subscription(Subscriber, NodeID, SubID) of
++ case pubsub_subscription_odbc:get_subscription(Subscriber, NodeID, SubID) of
+ {error, notfound} ->
+ {error, extended_error(?ERR_NOT_ACCEPTABLE, <<"invalid-subid">>)};
+ {result, #pubsub_subscription{options = Options}} ->
+- {result, XdataEl} = pubsub_subscription:get_options_xform(Lang, Options),
++ {result, XdataEl} = pubsub_subscription_odbc:get_options_xform(Lang, Options),
+ OptionsEl = #xmlel{name = <<"options">>,
+ attrs =
+ [{<<"jid">>, jlib:jid_to_string(Subscriber)},
+@@ -3733,7 +3348,7 @@
+ end.
+
+ set_options_helper(Configuration, JID, NodeID, SubID, Type) ->
+- SubOpts = case pubsub_subscription:parse_options_xform(Configuration) of
++ SubOpts = case pubsub_subscription_odbc:parse_options_xform(Configuration) of
+ {result, GoodSubOpts} -> GoodSubOpts;
+ _ -> invalid
+ end,
+@@ -3765,7 +3380,7 @@
+ write_sub(_Subscriber, _NodeID, _SubID, invalid) ->
+ {error, extended_error(?ERR_BAD_REQUEST, <<"invalid-options">>)};
+ write_sub(Subscriber, NodeID, SubID, Options) ->
+- case pubsub_subscription:set_subscription(Subscriber, NodeID, SubID, Options) of
++ case pubsub_subscription_odbc:set_subscription(Subscriber, NodeID, SubID, Options) of
+ {error, notfound} ->
+ {error, extended_error(?ERR_NOT_ACCEPTABLE, <<"invalid-subid">>)};
+ {result, _} ->
+@@ -3986,9 +3601,9 @@
+ ejabberd_router:route(service_jid(Host),
+ jlib:make_jid(JID), Stanza)
+ end,
+- Action = fun (#pubsub_node{owners = Owners, type = Type,
++ Action = fun (#pubsub_node{type = Type,
+ id = NodeId}) ->
+- case lists:member(Owner, Owners) of
++ case lists:member(Owner, node_owners_call(Type, NodeId)) of
+ true ->
+ Result = lists:foldl(fun ({JID, Subscription,
+ SubId},
+@@ -4405,7 +4020,7 @@
+ {Depth, [{N, get_node_subs(N)} || N <- Nodes]}
+ end, tree_call(Host, get_parentnodes_tree, [Host, Node, service_jid(Host)]))}
+ end,
+- case transaction(Action, sync_dirty) of
++ case transaction(Host, Action, sync_dirty) of
+ {result, CollSubs} -> CollSubs;
+ _ -> []
+ end.
+@@ -4419,9 +4034,9 @@
+
+ get_options_for_subs(NodeID, Subs) ->
+ lists:foldl(fun({JID, subscribed, SubID}, Acc) ->
+- case pubsub_subscription:read_subscription(JID, NodeID, SubID) of
++ case pubsub_subscription_odbc:get_subscription(JID, NodeID, SubID) of
+ {error, notfound} -> [{JID, SubID, []} | Acc];
+- #pubsub_subscription{options = Options} -> [{JID, SubID, Options} | Acc];
++ {result, #pubsub_subscription{options = Options}} -> [{JID, SubID, Options} | Acc];
+ _ -> Acc
+ end;
+ (_, Acc) ->
+@@ -5121,6 +4736,30 @@
+ _ -> features()
+ end.
+
++%% @spec (Host, Type, NodeId) -> [ljid()]
++%% NodeId = pubsubNodeId()
++%% @doc <p>Return list of node owners.</p>
++node_owners(Host, Type, NodeId) ->
++ case node_action(Host, Type, get_node_affiliations, [NodeId]) of
++ {result, Affiliations} ->
++ lists:foldl(
++ fun({LJID, owner}, Acc) -> [LJID|Acc];
++ (_, Acc) -> Acc
++ end, [], Affiliations);
++ _ ->
++ []
++ end.
++node_owners_call(Type, NodeId) ->
++ case node_call(Type, get_node_affiliations, [NodeId]) of
++ {result, Affiliations} ->
++ lists:foldl(
++ fun({LJID, owner}, Acc) -> [LJID|Acc];
++ (_, Acc) -> Acc
++ end, [], Affiliations);
++ _ ->
++ []
++ end.
++
+ %% @doc <p>node tree plugin call.</p>
+ tree_call({_User, Server, _Resource}, Function, Args) ->
+ tree_call(Server, Function, Args);
+@@ -5140,7 +4779,13 @@
+ tree_action(Host, Function, Args) ->
+ ?DEBUG("tree_action ~p ~p ~p", [Host, Function, Args]),
+ Fun = fun () -> tree_call(Host, Function, Args) end,
+- catch mnesia:sync_dirty(Fun).
++ case catch ejabberd_odbc:sql_bloc(odbc_conn(Host), Fun) of
++ {atomic, Result} ->
++ Result;
++ {aborted, Reason} ->
++ ?ERROR_MSG("transaction return internal error: ~p~n",[{aborted, Reason}]),
++ {error, ?ERR_INTERNAL_SERVER_ERROR}
++ end.
+
+ %% @doc <p>node plugin call.</p>
+ node_call(Type, Function, Args) ->
+@@ -5165,13 +4810,12 @@
+ node_action(Host, Type, Function, Args) ->
+ ?DEBUG("node_action ~p ~p ~p ~p",
+ [Host, Type, Function, Args]),
+- transaction(fun () -> node_call(Type, Function, Args)
+- end,
++ transaction(Host, fun () -> node_call(Type, Function, Args) end,
+ sync_dirty).
+
+ %% @doc <p>plugin transaction handling.</p>
+ transaction(Host, Node, Action, Trans) ->
+- transaction(fun () ->
++ transaction(Host, fun () ->
+ case tree_call(Host, get_node, [Host, Node]) of
+ N when is_record(N, pubsub_node) ->
+ case Action(N) of
+@@ -5185,16 +4829,22 @@
+ end,
+ Trans).
+
+-transaction(Host, Action, Trans) ->
+- transaction(fun () ->
++transaction_on_nodes(Host, Action, Trans) ->
++ transaction(Host, fun () ->
+ {result,
+ lists:foldl(Action, [],
+ tree_call(Host, get_nodes, [Host]))}
+ end,
+ Trans).
+
+-transaction(Fun, Trans) ->
+- case catch mnesia:Trans(Fun) of
++transaction(Host, Fun, Trans) ->
++ transaction_retry(Host, Fun, Trans, 2).
++transaction_retry(Host, Fun, Trans, Count) ->
++ SqlFun = case Trans of
++ transaction -> sql_transaction;
++ _ -> sql_bloc
++ end,
++ case catch ejabberd_odbc:SqlFun(odbc_conn(Host), Fun) of
+ {result, Result} -> {result, Result};
+ {error, Error} -> {error, Error};
+ {atomic, {result, Result}} -> {result, Result};
+@@ -5203,6 +4853,15 @@
+ ?ERROR_MSG("transaction return internal error: ~p~n",
+ [{aborted, Reason}]),
+ {error, ?ERR_INTERNAL_SERVER_ERROR};
++ {'EXIT', {timeout, _} = Reason} ->
++ case Count of
++ 0 ->
++ ?ERROR_MSG("transaction return internal error: ~p~n", [{'EXIT', Reason}]),
++ {error, ?ERR_INTERNAL_SERVER_ERROR};
++ N ->
++ erlang:yield(),
++ transaction_retry(Host, Fun, Trans, N-1)
++ end;
+ {'EXIT', Reason} ->
+ ?ERROR_MSG("transaction return internal error: ~p~n",
+ [{'EXIT', Reason}]),
+@@ -5213,6 +4872,16 @@
+ {error, ?ERR_INTERNAL_SERVER_ERROR}
+ end.
+
++odbc_conn({_U, Host, _R})-> Host;
++odbc_conn(<<$., Host/binary>>) -> Host;
++odbc_conn(<<_, Host/binary>>) -> odbc_conn(Host).
++
++%% escape value for database storage
++escape({_U, _H, _R}=JID)->
++ ejabberd_odbc:escape(jlib:jid_to_string(JID));
++escape(Value)->
++ ejabberd_odbc:escape(Value).
++
+ %%%% helpers
+
+ %% Add pubsub-specific error element