diff options
Diffstat (limited to 'src/pubsub_odbc.patch')
-rw-r--r-- | src/pubsub_odbc.patch | 1201 |
1 files changed, 1201 insertions, 0 deletions
diff --git a/src/pubsub_odbc.patch b/src/pubsub_odbc.patch new file mode 100644 index 000000000..435025d4a --- /dev/null +++ b/src/pubsub_odbc.patch @@ -0,0 +1,1201 @@ +--- mod_pubsub.erl 2012-10-01 10:05:28.172445600 +0200 ++++ mod_pubsub_odbc.erl 2012-10-01 11:47:29.225239163 +0200 +@@ -41,7 +41,7 @@ + %%% 6.2.3.1, 6.2.3.5, and 6.3. For information on subscription leases see + %%% XEP-0060 section 12.18. + +--module(mod_pubsub). ++-module(mod_pubsub_odbc). + + -author('christophe.romain@process-one.net'). + +@@ -59,11 +59,11 @@ + + -include("pubsub.hrl"). + +--define(STDTREE, <<"tree">>). ++-define(STDTREE, <<"tree_odbc">>). + +--define(STDNODE, <<"flat">>). ++-define(STDNODE, <<"flat_odbc">>). + +--define(PEPNODE, <<"pep">>). ++-define(PEPNODE, <<"pep_odbc">>). + + %% exports for hooks + -export([presence_probe/3, caps_update/3, +@@ -86,8 +86,7 @@ + %% general helpers for plugins + -export([subscription_to_string/1, affiliation_to_string/1, + string_to_subscription/1, string_to_affiliation/1, +- extended_error/2, extended_error/3, +- rename_default_nodeplugin/0]). ++ extended_error/2, extended_error/3, escape/1]). + + %% API and gen_server callbacks + -export([start_link/2, start/2, stop/1, init/1, +@@ -97,7 +96,7 @@ + %% calls for parallel sending of last items + -export([send_loop/1]). + +--define(PROCNAME, ejabberd_mod_pubsub). ++-define(PROCNAME, ejabberd_mod_pubsub_odbc). + + -define(LOOPNAME, ejabberd_mod_pubsub_loop). + +@@ -324,8 +323,6 @@ + false -> ok + end, + ejabberd_router:register_route(Host), +- update_node_database(Host, ServerHost), +- update_state_database(Host, ServerHost), + put(server_host, ServerHost), + init_nodes(Host, ServerHost, NodeTree, Plugins), + State = #state{host = Host, server_host = ServerHost, +@@ -385,421 +382,14 @@ + ok. + + init_nodes(Host, ServerHost, _NodeTree, Plugins) -> +- case lists:member(<<"hometree">>, Plugins) of ++ case lists:member(<<"hometree_odbc">>, Plugins) of + true -> +- create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree">>), +- create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host), +- <<"hometree">>); ++ create_node(Host, ServerHost, <<"/home">>, service_jid(Host), <<"hometree_odbc">>), ++ create_node(Host, ServerHost, <<"/home/", ServerHost/binary>>, service_jid(Host), ++ <<"hometree_odbc">>); + false -> ok + end. + +-update_node_database(Host, ServerHost) -> +- mnesia:del_table_index(pubsub_node, type), +- mnesia:del_table_index(pubsub_node, parentid), +- case catch mnesia:table_info(pubsub_node, attributes) of +- [host_node, host_parent, info] -> +- ?INFO_MSG("upgrade node pubsub tables", []), +- F = fun () -> +- {Result, LastIdx} = lists:foldl(fun ({pubsub_node, +- NodeId, ParentId, +- {nodeinfo, Items, +- Options, +- Entities}}, +- {RecList, +- NodeIdx}) -> +- ItemsList = +- lists:foldl(fun +- ({item, +- IID, +- Publisher, +- Payload}, +- Acc) -> +- C = +- {unknown, +- Publisher}, +- M = +- {now(), +- Publisher}, +- mnesia:write(#pubsub_item{itemid +- = +- {IID, +- NodeIdx}, +- creation +- = +- C, +- modification +- = +- M, +- payload +- = +- Payload}), +- [{Publisher, +- IID} +- | Acc] +- end, +- [], +- Items), +- Owners = +- dict:fold(fun +- (JID, +- {entity, +- Aff, +- Sub}, +- Acc) -> +- UsrItems = +- lists:foldl(fun +- ({P, +- I}, +- IAcc) -> +- case +- P +- of +- JID -> +- [I +- | IAcc]; +- _ -> +- IAcc +- end +- end, +- [], +- ItemsList), +- mnesia:write({pubsub_state, +- {JID, +- NodeIdx}, +- UsrItems, +- Aff, +- Sub}), +- case +- Aff +- of +- owner -> +- [JID +- | Acc]; +- _ -> +- Acc +- end +- end, +- [], +- Entities), +- mnesia:delete({pubsub_node, +- NodeId}), +- {[#pubsub_node{nodeid +- = +- NodeId, +- id +- = +- NodeIdx, +- parents +- = +- [element(2, +- ParentId)], +- owners +- = +- Owners, +- options +- = +- Options} +- | RecList], +- NodeIdx + 1} +- end, +- {[], 1}, +- mnesia:match_object({pubsub_node, +- {Host, +- '_'}, +- '_', +- '_'})), +- mnesia:write(#pubsub_index{index = node, last = LastIdx, +- free = []}), +- Result +- end, +- {atomic, NewRecords} = mnesia:transaction(F), +- {atomic, ok} = mnesia:delete_table(pubsub_node), +- {atomic, ok} = mnesia:create_table(pubsub_node, +- [{disc_copies, [node()]}, +- {attributes, +- record_info(fields, +- pubsub_node)}]), +- FNew = fun () -> +- lists:foreach(fun (Record) -> mnesia:write(Record) end, +- NewRecords) +- end, +- case mnesia:transaction(FNew) of +- {atomic, Result} -> +- ?INFO_MSG("Pubsub node tables updated correctly: ~p", +- [Result]); +- {aborted, Reason} -> +- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", +- [Reason]) +- end; +- [nodeid, parentid, type, owners, options] -> +- F = fun ({pubsub_node, NodeId, {_, Parent}, Type, +- Owners, Options}) -> +- #pubsub_node{nodeid = NodeId, id = 0, +- parents = [Parent], type = Type, +- owners = Owners, options = Options} +- end, +- mnesia:transform_table(pubsub_node, F, +- [nodeid, id, parents, type, owners, options]), +- FNew = fun () -> +- LastIdx = lists:foldl(fun (#pubsub_node{nodeid = +- NodeId} = +- PubsubNode, +- NodeIdx) -> +- mnesia:write(PubsubNode#pubsub_node{id +- = +- NodeIdx}), +- lists:foreach(fun +- (#pubsub_state{stateid +- = +- StateId} = +- State) -> +- {JID, +- _} = +- StateId, +- mnesia:delete({pubsub_state, +- StateId}), +- mnesia:write(State#pubsub_state{stateid +- = +- {JID, +- NodeIdx}}) +- end, +- mnesia:match_object(#pubsub_state{stateid +- = +- {'_', +- NodeId}, +- _ +- = +- '_'})), +- lists:foreach(fun +- (#pubsub_item{itemid +- = +- ItemId} = +- Item) -> +- {IID, +- _} = +- ItemId, +- {M1, +- M2} = +- Item#pubsub_item.modification, +- {C1, +- C2} = +- Item#pubsub_item.creation, +- mnesia:delete({pubsub_item, +- ItemId}), +- mnesia:write(Item#pubsub_item{itemid +- = +- {IID, +- NodeIdx}, +- modification +- = +- {M2, +- M1}, +- creation +- = +- {C2, +- C1}}) +- end, +- mnesia:match_object(#pubsub_item{itemid +- = +- {'_', +- NodeId}, +- _ +- = +- '_'})), +- NodeIdx + 1 +- end, +- 1, +- mnesia:match_object({pubsub_node, +- {Host, '_'}, +- '_', '_', +- '_', '_', +- '_'}) +- ++ +- mnesia:match_object({pubsub_node, +- {{'_', +- ServerHost, +- '_'}, +- '_'}, +- '_', '_', +- '_', '_', +- '_'})), +- mnesia:write(#pubsub_index{index = node, +- last = LastIdx, free = []}) +- end, +- case mnesia:transaction(FNew) of +- {atomic, Result} -> +- rename_default_nodeplugin(), +- ?INFO_MSG("Pubsub node tables updated correctly: ~p", +- [Result]); +- {aborted, Reason} -> +- ?ERROR_MSG("Problem updating Pubsub node tables:~n~p", +- [Reason]) +- end; +- [nodeid, id, parent, type, owners, options] -> +- F = fun ({pubsub_node, NodeId, Id, Parent, Type, Owners, +- Options}) -> +- #pubsub_node{nodeid = NodeId, id = Id, +- parents = [Parent], type = Type, +- owners = Owners, options = Options} +- end, +- mnesia:transform_table(pubsub_node, F, +- [nodeid, id, parents, type, owners, options]), +- rename_default_nodeplugin(); +- _ -> ok +- end, +- mnesia:transaction(fun () -> +- case catch mnesia:first(pubsub_node) of +- {_, L} when is_binary(L) -> +- lists:foreach(fun ({H, N}) +- when is_binary(N) -> +- [Node] = +- mnesia:read({pubsub_node, +- {H, +- N}}), +- Type = +- Node#pubsub_node.type, +- BN = element(2, +- node_call(Type, +- path_to_node, +- [N])), +- BP = case [element(2, +- node_call(Type, +- path_to_node, +- [P])) +- || P +- <- Node#pubsub_node.parents] +- of +- [<<>>] -> []; +- Parents -> +- Parents +- end, +- mnesia:write(Node#pubsub_node{nodeid +- = +- {H, +- BN}, +- parents +- = +- BP}), +- mnesia:delete({pubsub_node, +- {H, +- N}}); +- (_) -> ok +- end, +- mnesia:all_keys(pubsub_node)); +- _ -> ok +- end +- end). +- +-rename_default_nodeplugin() -> +- lists:foreach(fun (Node) -> +- mnesia:dirty_write(Node#pubsub_node{type = +- <<"hometree">>}) +- end, +- mnesia:dirty_match_object(#pubsub_node{type = +- <<"default">>, +- _ = '_'})). +- +-update_state_database(_Host, _ServerHost) -> +- case catch mnesia:table_info(pubsub_state, attributes) +- of +- [stateid, items, affiliation, subscription] -> +- ?INFO_MSG("upgrade state pubsub tables", []), +- F = fun ({pubsub_state, {JID, NodeID}, Items, Aff, Sub}, +- Acc) -> +- Subs = case Sub of +- none -> []; +- _ -> +- {result, SubID} = +- pubsub_subscription:subscribe_node(JID, +- NodeID, +- []), +- [{Sub, SubID}] +- end, +- NewState = #pubsub_state{stateid = {JID, NodeID}, +- items = Items, affiliation = Aff, +- subscriptions = Subs}, +- [NewState | Acc] +- end, +- {atomic, NewRecs} = +- mnesia:transaction(fun mnesia:foldl/3, +- [F, [], pubsub_state]), +- {atomic, ok} = mnesia:delete_table(pubsub_state), +- {atomic, ok} = mnesia:create_table(pubsub_state, +- [{disc_copies, [node()]}, +- {attributes, +- record_info(fields, +- pubsub_state)}]), +- FNew = fun () -> +- lists:foreach(fun mnesia:write/1, NewRecs) +- end, +- case mnesia:transaction(FNew) of +- {atomic, Result} -> +- ?INFO_MSG("Pubsub state tables updated correctly: ~p", +- [Result]); +- {aborted, Reason} -> +- ?ERROR_MSG("Problem updating Pubsub state tables:~n~p", +- [Reason]) +- end; +- [stateid, items, affiliation, subscriptions] -> +- ?INFO_MSG("upgrade state pubsub table", []), +- F = fun ({pubsub_state, {JID, Nidx}, Items, Aff, Subs}, +- Acc) -> +- NewState = #pubsub_state{stateid = {JID, Nidx}, +- nodeidx = Nidx, items = Items, +- affiliation = Aff, +- subscriptions = Subs}, +- [NewState | Acc] +- end, +- {atomic, NewRecs} = +- mnesia:transaction(fun mnesia:foldl/3, +- [F, [], pubsub_state]), +- {atomic, ok} = mnesia:delete_table(pubsub_state), +- {atomic, ok} = mnesia:create_table(pubsub_state, +- [{disc_copies, [node()]}, +- {attributes, +- record_info(fields, +- pubsub_state)}]), +- FNew = fun () -> +- lists:foreach(fun mnesia:write/1, NewRecs) +- end, +- case mnesia:transaction(FNew) of +- {atomic, Res1} -> +- ?INFO_MSG("Pubsub state tables updated correctly: ~p", +- [Res1]); +- {aborted, Rea1} -> +- ?ERROR_MSG("Problem updating Pubsub state table:~n~p", +- [Rea1]) +- end, +- ?INFO_MSG("upgrade item pubsub table", []), +- F = fun ({pubsub_item, {ItemId, Nidx}, C, M, P}, Acc) -> +- NewItem = #pubsub_item{itemid = {ItemId, Nidx}, +- nodeidx = Nidx, creation = C, +- modification = M, payload = P}, +- [NewItem | Acc] +- end, +- {atomic, NewRecs} = +- mnesia:transaction(fun mnesia:foldl/3, +- [F, [], pubsub_item]), +- {atomic, ok} = mnesia:delete_table(pubsub_item), +- {atomic, ok} = mnesia:create_table(pubsub_item, +- [{disc_copies, [node()]}, +- {attributes, +- record_info(fields, +- pubsub_item)}]), +- FNew = fun () -> +- lists:foreach(fun mnesia:write/1, NewRecs) +- end, +- case mnesia:transaction(FNew) of +- {atomic, Res2} -> +- ?INFO_MSG("Pubsub item tables updated correctly: ~p", +- [Res2]); +- {aborted, Rea2} -> +- ?ERROR_MSG("Problem updating Pubsub item table:~n~p", +- [Rea2]) +- end; +- _ -> ok +- end. +- + send_loop(State) -> + receive + {presence, JID, Pid} -> +@@ -808,11 +398,15 @@ + LJID = jlib:jid_tolower(JID), + BJID = jlib:jid_remove_resource(LJID), + lists:foreach(fun (PType) -> +- {result, Subscriptions} = node_action(Host, +- PType, +- get_entity_subscriptions, +- [Host, +- JID]), ++ Subscriptions = case catch node_action(Host, ++ PType, ++ get_entity_subscriptions_for_send_last, ++ [Host, ++ JID]) ++ of ++ {result, S} -> S; ++ _ -> [] ++ end, + lists:foreach(fun ({Node, subscribed, _, + SubJID}) -> + if (SubJID == LJID) or +@@ -824,24 +418,13 @@ + type = + Type, + id = +- NodeId, +- options +- = +- Options} = ++ NodeId} = + Node, +- case +- get_option(Options, +- send_last_published_item) +- of +- on_sub_and_presence -> +- send_items(H, +- N, +- NodeId, +- Type, +- LJID, +- last); +- _ -> ok +- end; ++ send_items(H, N, ++ NodeId, ++ Type, ++ LJID, ++ last); + true -> + % resource not concerned about that subscription + ok +@@ -1032,8 +615,11 @@ + children = []}]; + disco_identity(Host, Node, From) -> + Action = fun (#pubsub_node{id = Idx, type = Type, +- options = Options, owners = Owners}) -> +- case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of ++ options = Options}) -> ++ Owners = node_owners_call(Type, Idx), ++ case get_allowed_items_call(Host, Idx, From, Type, ++ Options, Owners) ++ of + {result, _} -> + {result, + [#xmlel{name = <<"identity">>, +@@ -1084,7 +670,8 @@ + || Feature <- features(<<"pep">>)]]; + disco_features(Host, Node, From) -> + Action = fun (#pubsub_node{id = Idx, type = Type, +- options = Options, owners = Owners}) -> ++ options = Options}) -> ++ Owners = node_owners_call(Type, Idx), + case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of + {result, _} -> + {result, +@@ -1129,9 +716,9 @@ + ). + disco_items(Host, <<>>, From) -> + Action = fun (#pubsub_node{nodeid = {_, NodeID}, +- options = Options, type = Type, id = Idx, +- owners = Owners}, ++ options = Options, type = Type, id = Idx}, + Acc) -> ++ Owners = node_owners_call(Type, Idx), + case get_allowed_items_call(Host, Idx, From, Type, Options, Owners) of + {result, _} -> + [#xmlel{name = <<"item">>, +@@ -1152,13 +739,14 @@ + _ -> Acc + end + end, +- case transaction(Host, Action, sync_dirty) of ++ case transaction_on_nodes(Host, Action, sync_dirty) of + {result, Items} -> Items; + _ -> [] + end; + disco_items(Host, Node, From) -> + Action = fun (#pubsub_node{id = Idx, type = Type, +- options = Options, owners = Owners}) -> ++ options = Options}) -> ++ Owners = node_owners_call(Type, Idx), + case get_allowed_items_call(Host, Idx, From, Type, + Options, Owners) + of +@@ -1262,9 +850,6 @@ + lists:foreach(fun ({#pubsub_node{options + = + Options, +- owners +- = +- Owners, + id = + NodeId}, + subscribed, _, +@@ -1276,7 +861,9 @@ + presence -> + case + lists:member(BJID, +- Owners) ++ node_owners(Host, ++ PType, ++ NodeId)) + of + true -> + node_action(Host, +@@ -1347,7 +934,7 @@ + {H, + N}, + type = +- <<"hometree">>}, ++ <<"hometree_odbc">>}, + owner}) + when N == + HomeTreeBase -> +@@ -1495,7 +1082,8 @@ + IQ -> + #xmlel{attrs = QAttrs} = SubEl, + Node = xml:get_attr_s(<<"node">>, QAttrs), +- Res = case iq_disco_items(Host, Node, From) of ++ Rsm = jlib:rsm_decode(IQ), ++ Res = case iq_disco_items(Host, Node, From, Rsm) of + {result, IQRes} -> + jlib:iq_to_xml(IQ#iq{type = result, + sub_el = +@@ -1669,7 +1257,8 @@ + [] -> [<<"leaf">>]; + _ -> + case node_call(Type, get_items, +- [NodeId, From]) ++ [NodeId, From, ++ none]) + of + {result, []} -> + [<<"collection">>]; +@@ -1691,7 +1280,13 @@ + F = [#xmlel{name = <<"feature">>, + attrs = [{<<"var">>, ?NS_PUBSUB}], + children = []} +- | lists:map(fun (T) -> ++ | lists:map(fun (<<"rsm">>) -> ++ #xmlel{name = <<"feature">>, ++ attrs = ++ [{<<"var">>, ++ ?NS_RSM}], ++ children = []}; ++ (T) -> + #xmlel{name = <<"feature">>, + attrs = + [{<<"var">>, +@@ -1735,10 +1330,16 @@ + #xmlel{name = <<"feature">>, + attrs = [{<<"var">>, ?NS_VCARD}], children = []}] + ++ +- lists:map(fun (Feature) -> ++ lists:map(fun (<<"rsm">>) -> ++ #xmlel{name = <<"feature">>, ++ attrs = [{<<"var">>, ?NS_RSM}], ++ children = []}; ++ (T) -> + #xmlel{name = <<"feature">>, + attrs = +- [{<<"var">>, <<(?NS_PUBSUB)/binary, "#", Feature/binary>>}], ++ [{<<"var">>, ++ <<(?NS_PUBSUB)/binary, "#", ++ T/binary>>}], + children = []} + end, + features(Host, Node))}; +@@ -1748,14 +1349,15 @@ + _ -> node_disco_info(Host, Node, From) + end. + +--spec(iq_disco_items/3 :: ++-spec(iq_disco_items/4 :: + ( + Host :: mod_pubsub:host(), + NodeId :: <<>> | mod_pubsub:nodeId(), +- From :: jid()) ++ From :: jid(), ++ Rsm :: none | rsm_in()) + -> {result, [xmlel()]} + ). +-iq_disco_items(Host, <<>>, From) -> ++iq_disco_items(Host, <<>>, From, _RSM) -> + {result, + lists:map(fun (#pubsub_node{nodeid = {_, SubNode}, + options = Options}) -> +@@ -1792,7 +1394,7 @@ + % Nodes)}; + % Other -> Other + % end; +-iq_disco_items(Host, ?NS_COMMANDS, _From) -> ++iq_disco_items(Host, ?NS_COMMANDS, _From, _RSM) -> + CommandItems = [#xmlel{name = <<"item">>, + attrs = + [{<<"jid">>, Host}, +@@ -1800,23 +1402,28 @@ + {<<"name">>, <<"Get Pending">>}], + children = []}], + {result, CommandItems}; +-iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From) -> ++iq_disco_items(_Host, ?NS_PUBSUB_GET_PENDING, _From, _RSM) -> + CommandItems = [], {result, CommandItems}; +-iq_disco_items(Host, Item, From) -> ++iq_disco_items(Host, Item, From, RSM) -> + case str:tokens(Item, <<"!">>) of + [_Node, _ItemID] -> {result, []}; + [Node] -> + % Node = string_to_node(SNode), + Action = fun (#pubsub_node{id = Idx, type = Type, +- options = Options, owners = Owners}) -> +- NodeItems = case get_allowed_items_call(Host, Idx, +- From, Type, +- Options, +- Owners) +- of +- {result, R} -> R; +- _ -> [] +- end, ++ options = Options}) -> ++ Owners = node_owners_call(Type, Idx), ++ {NodeItems, RsmOut} = case ++ get_allowed_items_call(Host, ++ Idx, ++ From, ++ Type, ++ Options, ++ Owners, ++ RSM) ++ of ++ {result, R} -> R; ++ _ -> {[], none} ++ end, + Nodes = lists:map(fun (#pubsub_node{nodeid = + {_, SubNode}, + options = +@@ -1858,7 +1465,7 @@ + children = []} + end, + NodeItems), +- {result, Nodes ++ Items} ++ {result, Nodes ++ Items ++ jlib:rsm_encode(RsmOut)} + end, + case transaction(Host, Node, Action, sync_dirty) of + {result, {_, Result}} -> {result, Result}; +@@ -2009,7 +1616,8 @@ + (_, Acc) -> Acc + end, + [], xml:remove_cdata(Els)), +- get_items(Host, Node, From, SubId, MaxItems, ItemIDs); ++ RSM = jlib:rsm_decode(SubEl), ++ get_items(Host, Node, From, SubId, MaxItems, ItemIDs, RSM); + {get, <<"subscriptions">>} -> + get_subscriptions(Host, Node, From, Plugins); + {get, <<"affiliations">>} -> +@@ -2174,7 +1782,7 @@ + _ -> [] + end + end, +- case transaction(fun () -> ++ case transaction(Host, fun () -> + {result, lists:flatmap(Tr, Plugins)} + end, + sync_dirty) +@@ -2216,8 +1824,10 @@ + + %%% authorization handling + +-send_authorization_request(#pubsub_node{owners = Owners, nodeid = {Host, Node}}, +- Subscriber) -> ++send_authorization_request(#pubsub_node{nodeid = ++ {Host, Node}, ++ type = Type, id = NodeId}, ++ Subscriber) -> + Lang = <<"en">>, + Stanza = #xmlel{name = <<"message">>, attrs = [], + children = +@@ -2294,7 +1904,7 @@ + ejabberd_router:route(service_jid(Host), + jlib:make_jid(Owner), Stanza) + end, +- Owners). ++ node_owners(Host, Type, NodeId)). + + find_authorization_response(Packet) -> + #xmlel{children = Els} = Packet, +@@ -2353,11 +1963,10 @@ + <<"true">> -> true; + _ -> false + end, +- Action = fun (#pubsub_node{type = Type, owners = Owners, +- id = NodeId}) -> ++ Action = fun (#pubsub_node{type = Type, id = NodeId}) -> + IsApprover = + lists:member(jlib:jid_tolower(jlib:jid_remove_resource(From)), +- Owners), ++ node_owners_call(Type, NodeId)), + {result, Subscriptions} = node_call(Type, + get_subscriptions, + [NodeId, +@@ -2597,7 +2206,7 @@ + children = + [#xmlel{name = <<"create">>, + attrs = nodeAttr(Node), children = []}]}], +- case transaction(CreateNode, transaction) of ++ case transaction(Host, CreateNode, transaction) of + {result, {NodeId, {Result, broadcast}}} -> + broadcast_created_node(Host, Node, NodeId, Type, + NodeOptions), +@@ -2709,7 +2318,7 @@ + ). + subscribe_node(Host, Node, From, JID, Configuration) -> + SubOpts = case +- pubsub_subscription:parse_options_xform(Configuration) ++ pubsub_subscription_odbc:parse_options_xform(Configuration) + of + {result, GoodSubOpts} -> GoodSubOpts; + _ -> invalid +@@ -2723,7 +2332,7 @@ + end + end, + Action = fun (#pubsub_node{options = Options, +- owners = Owners, type = Type, id = NodeId}) -> ++ type = Type, id = NodeId}) -> + Features = features(Type), + SubscribeFeature = lists:member(<<"subscribe">>, Features), + OptionsFeature = lists:member(<<"subscription-options">>, Features), +@@ -2732,6 +2341,7 @@ + AccessModel = get_option(Options, access_model), + SendLast = get_option(Options, send_last_published_item), + AllowedGroups = get_option(Options, roster_groups_allowed, []), ++ Owners = node_owners_call(Type, NodeId), + {PresenceSubscription, RosterGroup} = + get_presence_and_roster_permissions(Host, Subscriber, + Owners, AccessModel, AllowedGroups), +@@ -3076,19 +2686,20 @@ + Error -> Error + end. + +--spec(get_items/6 :: ++-spec(get_items/7 :: + ( + Host :: mod_pubsub:host(), + Node :: mod_pubsub:nodeId(), + From :: jid(), + SubId :: mod_pubsub:subId(), + SMaxItems :: binary(), +- ItemIDs :: [mod_pubsub:itemId()]) ++ ItemIDs :: [mod_pubsub:itemId()], ++ Rsm :: none | rsm_in()) + -> {result, [xmlel(),...]} + %%% + | {error, xmlel()} + ). +-get_items(Host, Node, From, SubId, SMaxItems, ItemIDs) -> ++get_items(Host, Node, From, SubId, SMaxItems, ItemIDs, RSM) -> + MaxItems = if SMaxItems == <<"">> -> + get_max_items_node(Host); + true -> +@@ -3100,13 +2711,13 @@ + case MaxItems of + {error, Error} -> {error, Error}; + _ -> +- Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId, +- owners = Owners}) -> ++ Action = fun (#pubsub_node{options = Options, type = Type, id = NodeId}) -> + Features = features(Type), + RetreiveFeature = lists:member(<<"retrieve-items">>, Features), + PersistentFeature = lists:member(<<"persistent-items">>, Features), + AccessModel = get_option(Options, access_model), + AllowedGroups = get_option(Options, roster_groups_allowed, []), ++ Owners = node_owners_call(Type, NodeId), + {PresenceSubscription, RosterGroup} = + get_presence_and_roster_permissions(Host, From, Owners, + AccessModel, AllowedGroups), +@@ -3124,11 +2735,11 @@ + node_call(Type, get_items, + [NodeId, From, AccessModel, + PresenceSubscription, RosterGroup, +- SubId]) ++ SubId, RSM]) + end + end, + case transaction(Host, Node, Action, sync_dirty) of +- {result, {_, Items}} -> ++ {result, {_, {Items, RSMOut}}} -> + SendItems = case ItemIDs of + [] -> Items; + _ -> +@@ -3147,7 +2758,8 @@ + [#xmlel{name = <<"items">>, attrs = nodeAttr(Node), + children = + itemsEls(lists:sublist(SendItems, +- MaxItems))}]}]}; ++ MaxItems))} ++ | jlib:rsm_encode(RSMOut)]}]}; + Error -> Error + end + end. +@@ -3170,42 +2782,68 @@ + Error -> Error + end. + +-get_allowed_items_call(Host, NodeIdx, From, Type, Options, Owners) -> ++get_allowed_items_call(Host, NodeIdx, From, Type, ++ Options, Owners) -> ++ case get_allowed_items_call(Host, NodeIdx, From, Type, ++ Options, Owners, none) ++ of ++ {result, {I, _}} -> {result, I}; ++ Error -> Error ++ end. ++ ++get_allowed_items_call(Host, NodeIdx, From, Type, ++ Options, Owners, RSM) -> + AccessModel = get_option(Options, access_model), +- AllowedGroups = get_option(Options, roster_groups_allowed, []), ++ AllowedGroups = get_option(Options, ++ roster_groups_allowed, []), + {PresenceSubscription, RosterGroup} = +- get_presence_and_roster_permissions(Host, From, Owners, AccessModel, +- AllowedGroups), ++ get_presence_and_roster_permissions(Host, From, Owners, ++ AccessModel, AllowedGroups), + node_call(Type, get_items, +- [NodeIdx, From, AccessModel, PresenceSubscription, RosterGroup, undefined]). ++ [NodeIdx, From, AccessModel, PresenceSubscription, ++ RosterGroup, undefined, RSM]). + +-send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, last) -> +- case get_cached_item(Host, NodeId) of +- undefined -> +- send_items(Host, Node, NodeId, Type, LJID, 1); +- LastItem -> +- {ModifNow, ModifUSR} = +- LastItem#pubsub_item.modification, +- Stanza = event_stanza_with_delay([#xmlel{name = +- <<"items">>, ++send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, ++ last) -> ++ Stanza = case get_cached_item(Host, NodeId) of ++ undefined -> ++ case node_action(Host, Type, get_last_items, ++ [NodeId, LJID, 1]) ++ of ++ {result, [LastItem]} -> ++ {ModifNow, ModifUSR} = ++ LastItem#pubsub_item.modification, ++ event_stanza_with_delay([#xmlel{name = <<"items">>, ++ attrs = nodeAttr(Node), ++ children = ++ itemsEls([LastItem])}], ++ ModifNow, ModifUSR); ++ _ -> ++ event_stanza([#xmlel{name = <<"items">>, ++ attrs = nodeAttr(Node), ++ children = itemsEls([])}]) ++ end; ++ LastItem -> ++ {ModifNow, ModifUSR} = ++ LastItem#pubsub_item.modification, ++ event_stanza_with_delay([#xmlel{name = <<"items">>, + attrs = nodeAttr(Node), + children = + itemsEls([LastItem])}], +- ModifNow, ModifUSR), +- case is_tuple(Host) of +- false -> +- ejabberd_router:route(service_jid(Host), +- jlib:make_jid(LJID), Stanza); +- true -> +- case ejabberd_sm:get_session_pid(U, S, R) of +- C2SPid when is_pid(C2SPid) -> +- ejabberd_c2s:broadcast(C2SPid, +- {pep_message, +- <<((Node))/binary, "+notify">>}, +- _Sender = service_jid(Host), +- Stanza); +- _ -> ok +- end ++ ModifNow, ModifUSR) ++ end, ++ case is_tuple(Host) of ++ false -> ++ ejabberd_router:route(service_jid(Host), ++ jlib:make_jid(LJID), Stanza); ++ true -> ++ case ejabberd_sm:get_session_pid(U, S, R) of ++ C2SPid when is_pid(C2SPid) -> ++ ejabberd_c2s:broadcast(C2SPid, ++ {pep_message, ++ <<((Node))/binary, "+notify">>}, ++ _Sender = service_jid(Host), Stanza); ++ _ -> ok + end + end; + send_items(Host, Node, NodeId, Type, {U, S, R} = LJID, +@@ -3436,9 +3074,8 @@ + case Entities of + error -> {error, ?ERR_BAD_REQUEST}; + _ -> +- Action = fun (#pubsub_node{owners = Owners, type = Type, +- id = NodeId} = +- N) -> ++ Action = fun (#pubsub_node{type = Type, id = NodeId} = N) -> ++ Owners = node_owners_call(Type, NodeId), + case lists:member(Owner, Owners) of + true -> + OwnerJID = jlib:make_jid(Owner), +@@ -3545,10 +3182,10 @@ + end. + + read_sub(Subscriber, Node, NodeID, SubID, Lang) -> +- case pubsub_subscription:get_subscription(Subscriber, NodeID, SubID) of ++ case pubsub_subscription_odbc:get_subscription(Subscriber, NodeID, SubID) of + {result, #pubsub_subscription{options = Options}} -> + {result, XdataEl} = +- pubsub_subscription:get_options_xform(Lang, Options), ++ pubsub_subscription_odbc:get_options_xform(Lang, Options), + OptionsEl = #xmlel{name = <<"options">>, + attrs = + [{<<"jid">>, jlib:jid_to_string(Subscriber)}, +@@ -3593,7 +3230,7 @@ + end. + + set_options_helper(Configuration, JID, NodeID, SubID, Type) -> +- SubOpts = case pubsub_subscription:parse_options_xform(Configuration) of ++ SubOpts = case pubsub_subscription_odbc:parse_options_xform(Configuration) of + {result, GoodSubOpts} -> GoodSubOpts; + _ -> invalid + end, +@@ -3629,7 +3266,7 @@ + write_sub(_Subscriber, _NodeID, _SubID, []) -> + {result, []}; + write_sub(Subscriber, NodeID, SubID, Options) -> +- case pubsub_subscription:set_subscription(Subscriber, ++ case pubsub_subscription_odbc:set_subscription(Subscriber, + NodeID, SubID, Options) + of + {result, _} -> {result, []}; +@@ -3854,8 +3491,8 @@ + ejabberd_router:route(service_jid(Host), + jlib:make_jid(JID), Stanza) + end, +- Action = fun (#pubsub_node{owners = Owners, type = Type, +- id = NodeId}) -> ++ Action = fun (#pubsub_node{type = Type, id = NodeId}) -> ++ Owners = node_owners_call(Type, NodeId), + case lists:member(Owner, Owners) of + true -> + Result = lists:foldl(fun ({JID, Subscription, +@@ -4347,7 +3984,7 @@ + [{Depth, [{N, sub_with_options(N)} || N <- Nodes]} + || {Depth, Nodes} <- Collection]} + end, +- case transaction(Action, sync_dirty) of ++ case transaction(Host, Action, sync_dirty) of + {result, CollSubs} -> + subscribed_nodes_by_jid(NotifyType, CollSubs); + _ -> [] +@@ -4443,10 +4080,10 @@ + end. + + sub_with_options(JID, NodeId, SubId) -> +- case pubsub_subscription:read_subscription(JID, NodeId, ++ case pubsub_subscription_odbc:get_subscription(JID, NodeId, + SubId) + of +- #pubsub_subscription{options = Options} -> ++ {result, #pubsub_subscription{options = Options}} -> + {JID, SubId, Options}; + _ -> {JID, SubId, []} + end. +@@ -4537,6 +4174,28 @@ + Result -> Result + end. + ++node_owners(Host, Type, NodeId) -> ++ case node_action(Host, Type, get_node_affiliations, ++ [NodeId]) ++ of ++ {result, Affiliations} -> ++ lists:foldl(fun ({LJID, owner}, Acc) -> [LJID | Acc]; ++ (_, Acc) -> Acc ++ end, ++ [], Affiliations); ++ _ -> [] ++ end. ++ ++node_owners_call(Type, NodeId) -> ++ case node_call(Type, get_node_affiliations, [NodeId]) of ++ {result, Affiliations} -> ++ lists:foldl(fun ({LJID, owner}, Acc) -> [LJID | Acc]; ++ (_, Acc) -> Acc ++ end, ++ [], Affiliations); ++ _ -> [] ++ end. ++ + max_items(Host, Options) -> + case get_option(Options, persist_items) of + true -> +@@ -5024,7 +4683,14 @@ + tree_action(Host, Function, Args) -> + ?DEBUG("tree_action ~p ~p ~p", [Host, Function, Args]), + Fun = fun () -> tree_call(Host, Function, Args) end, +- catch mnesia:sync_dirty(Fun). ++ case catch ejabberd_odbc:sql_bloc(odbc_conn(Host), Fun) ++ of ++ {atomic, Result} -> Result; ++ {aborted, Reason} -> ++ ?ERROR_MSG("transaction return internal error: ~p~n", ++ [{aborted, Reason}]), ++ {error, ?ERR_INTERNAL_SERVER_ERROR} ++ end. + + node_call(Type, Function, Args) -> + ?DEBUG("node_call ~p ~p ~p", [Type, Function, Args]), +@@ -5048,12 +4714,12 @@ + node_action(Host, Type, Function, Args) -> + ?DEBUG("node_action ~p ~p ~p ~p", + [Host, Type, Function, Args]), +- transaction(fun () -> node_call(Type, Function, Args) ++ transaction(Host, fun () -> node_call(Type, Function, Args) + end, + sync_dirty). + + transaction(Host, Node, Action, Trans) -> +- transaction(fun () -> ++ transaction(Host, fun () -> + case tree_call(Host, get_node, [Host, Node]) of + N when is_record(N, pubsub_node) -> + case Action(N) of +@@ -5067,16 +4733,24 @@ + end, + Trans). + +-transaction(Host, Action, Trans) -> +- transaction(fun () -> ++transaction_on_nodes(Host, Action, Trans) -> ++ transaction(Host, ++ fun () -> + {result, + lists:foldl(Action, [], + tree_call(Host, get_nodes, [Host]))} + end, + Trans). + +-transaction(Fun, Trans) -> +- case catch mnesia:Trans(Fun) of ++transaction(Host, Fun, Trans) -> ++ transaction_retry(Host, Fun, Trans, 2). ++ ++transaction_retry(Host, Fun, Trans, Count) -> ++ SqlFun = case Trans of ++ transaction -> sql_transaction; ++ _ -> sql_bloc ++ end, ++ case catch ejabberd_odbc:SqlFun(odbc_conn(Host), Fun) of + {result, Result} -> {result, Result}; + {error, Error} -> {error, Error}; + {atomic, {result, Result}} -> {result, Result}; +@@ -5085,6 +4759,16 @@ + ?ERROR_MSG("transaction return internal error: ~p~n", + [{aborted, Reason}]), + {error, ?ERR_INTERNAL_SERVER_ERROR}; ++ {'EXIT', {timeout, _} = Reason} -> ++ case Count of ++ 0 -> ++ ?ERROR_MSG("transaction return internal error: ~p~n", ++ [{'EXIT', Reason}]), ++ {error, ?ERR_INTERNAL_SERVER_ERROR}; ++ N -> ++ erlang:yield(), ++ transaction_retry(Host, Fun, Trans, N - 1) ++ end; + {'EXIT', Reason} -> + ?ERROR_MSG("transaction return internal error: ~p~n", + [{'EXIT', Reason}]), +@@ -5095,6 +4779,20 @@ + {error, ?ERR_INTERNAL_SERVER_ERROR} + end. + ++odbc_conn({_U, Host, _R}) -> Host; ++odbc_conn(Host) -> ++ lists:dropwhile(fun (A) -> A /= $. end, Host) -- ++ <<".">>. ++ ++-spec(escape/1 :: ++( ++ JID :: ljid() | binary()) ++ -> binary() ++). ++escape({_U, _H, _R} = JID) -> ++ ejabberd_odbc:escape(jlib:jid_to_string(JID)); ++escape(Value) -> ejabberd_odbc:escape(Value). ++ + %%%% helpers + + extended_error(Error, Ext) -> |