aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHolger Weiss <holger@zedat.fu-berlin.de>2014-03-12 23:34:14 +0100
committerHolger Weiß <holger@zedat.fu-berlin.de>2014-03-12 23:34:14 +0100
commit7d594086c3601c3281473b4ca54714b714027019 (patch)
tree2b2cd4d2d05a6a95430f1d55ee20e0b2b3397101 /src
parentProvide header with latin-1 encoding in translations to work with Erlang/OTP R17 (diff)
Add initial XEP-0198 support (EJAB-532)
Implement partial support for XEP-0198: Stream Management. After successful negotiation of this feature, the server requests an ACK for each stanza transmitted to the client and responds to ACK requests issued by the client. On session termination, the server re-routes any unacknowledged stanzas. The length of the pending queue can be limited by setting the "max_ack_queue" option to some integer value (default: 500). XEP-0198 support can be disabled entirely by setting the "stream_management" option to false (default: true). So far, stream management is implemented only for c2s connections, and the optional stream resumption feature also described in XEP-0198 is not (yet) supported. This addition was originally based on a patch provided by Magnus Henoch and updated by Grzegorz Grasza. Their code implements an early draft of XEP-0198 for some previous version of ejabberd. It has since been rewritten almost entirely.
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_c2s.erl403
-rw-r--r--src/jlib.erl17
2 files changed, 358 insertions, 62 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index ae5fc97b8..044afd0f4 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -108,6 +108,12 @@
auth_module = unknown,
ip,
aux_fields = [],
+ sm_xmlns,
+ ack_queue,
+ max_ack_queue,
+ manage_stream = fun negotiate_stream_mgmt/2,
+ n_stanzas_in = 0,
+ n_stanzas_out = 0,
lang}).
%-define(DBGFSM, true).
@@ -156,6 +162,35 @@
-define(INVALID_FROM, ?SERR_INVALID_FROM).
+%% XEP-0198:
+
+-define(IS_STREAM_MGMT_TAG(Name),
+ Name == <<"enable">>;
+ Name == <<"a">>;
+ Name == <<"r">>).
+
+-define(IS_SUPPORTED_SM_XMLNS(Xmlns),
+ Xmlns == ?NS_STREAM_MGMT_2;
+ Xmlns == ?NS_STREAM_MGMT_3).
+
+-define(SM_FAILED(Condition, Xmlns),
+ #xmlel{name = <<"failed">>,
+ attrs = [{<<"xmlns">>, Xmlns}],
+ children = [#xmlel{name = Condition,
+ attrs = [{<<"xmlns">>, ?NS_STANZAS}],
+ children = []}]}).
+
+-define(SM_BAD_REQUEST(Xmlns),
+ ?SM_FAILED(<<"bad-request">>, Xmlns)).
+
+-define(SM_SERVICE_UNAVAILABLE(Xmlns),
+ ?SM_FAILED(<<"service-unavailable">>, Xmlns)).
+
+-define(SM_UNEXPECTED_REQUEST(Xmlns),
+ ?SM_FAILED(<<"unexpected-request">>, Xmlns)).
+
+-define(SM_UNSUPPORTED_VERSION(Xmlns),
+ ?SM_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
@@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts1
end,
TLSOpts = [verify_none | TLSOpts2],
+ MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
+ Limit when is_integer(Limit), Limit > 0 -> Limit;
+ _ -> 500
+ end,
+ StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
+ AckQueue = if not StreamMgmtEnabled ->
+ none;
+ true ->
+ undefined
+ end,
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) ->
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
streamid = new_id(), access = Access,
- shaper = Shaper, ip = IP},
+ shaper = Shaper, ip = IP,
+ ack_queue = AckQueue, max_ack_queue = MaxAckQueue},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
ejabberd_hooks:run_fold(roster_get_versioning_feature,
Server, [],
[Server]),
+ StreamManagementFeature =
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ [#xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}],
+ children = []},
+ #xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}],
+ children = []}];
+ false ->
+ []
+ end,
StreamFeatures = [#xmlel{name = <<"bind">>,
attrs = [{<<"xmlns">>, ?NS_BIND}],
children = []},
@@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
children = []}]
++
RosterVersioningFeature ++
+ StreamManagementFeature ++
ejabberd_hooks:run_fold(c2s_stream_features,
Server, [], [Server]),
send_element(StateData,
@@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) ->
wait_for_stream(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData));
wait_for_auth({xmlstreamelement, El}, StateData) ->
case is_auth_packet(El) of
{auth, _ID, get, {U, _, _, _}} ->
@@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _},
wait_for_feature_request(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData));
wait_for_sasl_response({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
+wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData));
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) ->
wait_for_bind(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData));
wait_for_session({xmlstreamelement, El}, StateData) ->
+ NewStateData = update_num_stanzas_in(StateData, El),
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_SESSION} ->
- U = StateData#state.user,
- R = StateData#state.resource,
- JID = StateData#state.jid,
- case acl:match_rule(StateData#state.server,
- StateData#state.access, JID) of
+ U = NewStateData#state.user,
+ R = NewStateData#state.resource,
+ JID = NewStateData#state.jid,
+ case acl:match_rule(NewStateData#state.server,
+ NewStateData#state.access, JID) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Res = jlib:make_result_iq_reply(El#xmlel{children = []}),
- send_element(StateData, Res),
- change_shaper(StateData, JID),
+ NewState = send_stanza(NewStateData, Res),
+ change_shaper(NewState, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
- StateData#state.server,
+ NewState#state.server,
{[], []},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
Fs1 = [LJID | Fs],
Ts1 = [LJID | Ts],
PrivList =
ejabberd_hooks:run_fold(
- privacy_get_user_list, StateData#state.server,
+ privacy_get_user_list, NewState#state.server,
#userlist{},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
SID = {now(), self()},
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
+ Conn = get_conn_type(NewState),
+ Info = [{ip, NewState#state.ip}, {conn, Conn},
+ {auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
- SID, U, StateData#state.server, R, Info),
- NewStateData =
- StateData#state{
+ SID, U, NewState#state.server, R, Info),
+ UpdatedStateData =
+ NewState#state{
sid = SID,
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList},
fsm_next_state_pack(session_established,
- NewStateData);
+ UpdatedStateData);
_ ->
ejabberd_hooks:run(forbidden_session_hook,
- StateData#state.server, [JID]),
+ NewStateData#state.server, [JID]),
?INFO_MSG("(~w) Forbidden session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED),
- send_element(StateData, Err),
- fsm_next_state(wait_for_session, StateData)
+ send_element(NewStateData, Err),
+ fsm_next_state(wait_for_session, NewStateData)
end;
_ ->
- fsm_next_state(wait_for_session, StateData)
+ fsm_next_state(wait_for_session, NewStateData)
end;
wait_for_session(timeout, StateData) ->
@@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) ->
wait_for_session(closed, StateData) ->
{stop, normal, StateData}.
+session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData));
session_established({xmlstreamelement, El},
StateData) ->
FromJID = StateData#state.jid,
@@ -1080,9 +1158,10 @@ session_established(closed, StateData) ->
%% connection)
session_established2(El, StateData) ->
#xmlel{name = Name, attrs = Attrs} = El,
- User = StateData#state.user,
- Server = StateData#state.server,
- FromJID = StateData#state.jid,
+ NewStateData = update_num_stanzas_in(StateData, El),
+ User = NewStateData#state.user,
+ Server = NewStateData#state.server,
+ FromJID = NewStateData#state.jid,
To = xml:get_attr_s(<<"to">>, Attrs),
ToJID = case To of
<<"">> -> jlib:make_jid(User, Server, <<"">>);
@@ -1091,7 +1170,7 @@ session_established2(El, StateData) ->
NewEl1 = jlib:remove_attr(<<"xmlns">>, El),
NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of
<<"">> ->
- case StateData#state.lang of
+ case NewStateData#state.lang of
<<"">> -> NewEl1;
Lang ->
xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1)
@@ -1101,13 +1180,18 @@ session_established2(El, StateData) ->
NewState = case ToJID of
error ->
case xml:get_attr_s(<<"type">>, Attrs) of
- <<"error">> -> StateData;
- <<"result">> -> StateData;
+ <<"error">> -> NewStateData;
+ <<"result">> -> NewStateData;
_ ->
Err = jlib:make_error_reply(NewEl,
?ERR_JID_MALFORMED),
- send_element(StateData, Err),
- StateData
+ case is_stanza(Err) of
+ true ->
+ send_stanza(NewStateData, Err);
+ false ->
+ send_element(NewStateData, Err),
+ NewStateData
+ end
end;
_ ->
case Name of
@@ -1122,12 +1206,12 @@ session_established2(El, StateData) ->
#jid{user = User, server = Server,
resource = <<"">>} ->
?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
- [FromJID, PresenceEl, StateData]),
+ [FromJID, PresenceEl, NewStateData]),
presence_update(FromJID, PresenceEl,
- StateData);
+ NewStateData);
_ ->
presence_track(FromJID, ToJID, PresenceEl,
- StateData)
+ NewStateData)
end;
<<"iq">> ->
case jlib:iq_query_info(NewEl) of
@@ -1135,21 +1219,21 @@ session_established2(El, StateData) ->
when Xmlns == (?NS_PRIVACY);
Xmlns == (?NS_BLOCKING) ->
process_privacy_iq(FromJID, ToJID, IQ,
- StateData);
+ NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData,
+ check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl),
- StateData
+ NewStateData
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData, FromJID,
+ check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl),
- StateData;
- _ -> StateData
+ NewStateData;
+ _ -> NewStateData
end
end,
ejabberd_hooks:run(c2s_loop_debug,
@@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}},
jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid,
jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
+ NewState = send_stanza(StateData, PrivPushEl),
fsm_next_state(StateName,
- StateData#state{privacy_list = NewPL})
+ NewState#state{privacy_list = NewPL})
end;
{blocking, What} ->
- route_blocking(What, StateData),
- fsm_next_state(StateName, StateData);
+ NewState = route_blocking(What, StateData),
+ fsm_next_state(StateName, NewState);
_ ->
fsm_next_state(StateName, StateData)
end;
@@ -1515,12 +1599,12 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
- send_element(StateData, FixedPacket),
+ SentStateData = send_stanza(StateData, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
- StateData#state.server,
- [StateData#state.jid, From, To, FixedPacket]),
+ SentStateData#state.server,
+ [SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, NewState);
+ fsm_next_state(StateName, SentStateData);
true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState)
@@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) ->
StateData#state.pres_i, Packet)
end
end,
+ resend_unacked_stanzas(StateData),
bounce_messages();
_ ->
ok
@@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket ->
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
+send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined ->
+ send_stanza_and_ack_req(StateData, Stanza),
+ ack_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) ->
+ send_element(StateData, Stanza),
+ StateData.
+
send_header(StateData, Server, Version, Lang)
when StateData#state.xml_socket ->
VersionAttr = case Version of
@@ -1727,6 +1819,19 @@ is_auth_packet(El) ->
_ -> false
end.
+is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>;
+ Name == <<"presence">>;
+ Name == <<"iq">> ->
+ case xml:get_attr(<<"xmlns">>, Attrs) of
+ {value, NS} when NS /= <<"jabber:client">>,
+ NS /= <<"jabber:server">> ->
+ false;
+ _ ->
+ true
+ end;
+is_stanza(_El) ->
+ false.
+
get_auth_tags([#xmlel{name = Name, children = Els} | L],
U, P, D, R) ->
CData = xml:get_cdata(Els),
@@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) ->
ejabberd_hooks:run(user_available_hook,
NewStateData#state.server,
[NewStateData#state.jid]),
- if NewPriority >= 0 ->
- resend_offline_messages(NewStateData),
- resend_subscription_requests(NewStateData);
- true -> ok
- end,
- presence_broadcast_first(From, NewStateData,
+ ResentStateData = if NewPriority >= 0 ->
+ resend_offline_messages(NewStateData),
+ resend_subscription_requests(NewStateData);
+ true -> NewStateData
+ end,
+ presence_broadcast_first(From, ResentStateData,
Packet);
true ->
presence_broadcast_to_trusted(NewStateData, From,
@@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User,
PendingSubscriptions =
ejabberd_hooks:run_fold(resend_subscription_requests_hook,
Server, [], [User, Server]),
- lists:foreach(fun (XMLPacket) ->
- send_element(StateData, XMLPacket)
- end,
- PendingSubscriptions).
+ lists:foldl(fun (XMLPacket, AccStateData) ->
+ send_stanza(AccStateData, XMLPacket)
+ end,
+ StateData,
+ PendingSubscriptions).
get_showtag(undefined) -> <<"unavailable">>;
get_showtag(Presence) ->
@@ -2352,10 +2458,185 @@ route_blocking(What, StateData) ->
PrivPushEl =
jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
%% No need to replace active privacy list here,
%% blocking pushes are always accompanied by
%% Privacy List pushes
+ send_stanza(StateData, PrivPushEl).
+
+%%%----------------------------------------------------------------------
+%%% XEP-0198
+%%%----------------------------------------------------------------------
+
+stream_mgmt_enabled(#state{ack_queue = none}) ->
+ false;
+stream_mgmt_enabled(_StateData) ->
+ true.
+
+negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
+ %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
+ %% attempt to enable stream management until after it has completed Resource
+ %% Binding". However, it also says: "Stream management errors SHOULD be
+ %% considered recoverable", so we won't bail out.
+ send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
+ StateData;
+negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case Name of
+ <<"enable">> ->
+ handle_enable(StateData#state{sm_xmlns = Xmlns});
+ _ ->
+ Res = if Name == <<"a">>;
+ Name == <<"r">> ->
+ ?SM_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?SM_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ false ->
+ send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)),
+ StateData
+ end;
+ _ ->
+ send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
+ StateData
+ end.
+
+perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when Xmlns == StateData#state.sm_xmlns ->
+ case Name of
+ <<"r">> ->
+ handle_r(StateData);
+ <<"a">> ->
+ handle_a(StateData, Attrs);
+ _ ->
+ Res = if Name == <<"enable">> ->
+ ?SM_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?SM_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ _ ->
+ send_element(StateData,
+ ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)),
+ StateData
+ end.
+
+handle_enable(StateData) ->
+ ?INFO_MSG("Enabling XEP-0198 stream management for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ Res = #xmlel{name = <<"enabled">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ children = []},
+ send_element(StateData, Res),
+ StateData#state{ack_queue = queue:new(),
+ manage_stream = fun perform_stream_mgmt/2}.
+
+handle_r(StateData) ->
+ H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
+ Res = #xmlel{name = <<"a">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns},
+ {<<"h">>, H}],
+ children = []},
+ send_element(StateData, Res),
+ StateData.
+
+handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
+ case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
+ H when is_integer(H), H > NumStanzasOut ->
+ ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ ack_queue_drop(StateData, NumStanzasOut);
+ H when is_integer(H), H >= 0 ->
+ ?DEBUG("~s acknowledged ~B of ~B stanzas",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ ack_queue_drop(StateData, H);
+ _ ->
+ ?WARNING_MSG("Ignoring invalid ACK element from ~s",
+ [jlib:jid_to_string(JID)]),
+ StateData
+ end.
+
+update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined ->
+ NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
+ {true, 4294967295} ->
+ 0;
+ {true, Num} ->
+ Num + 1;
+ {false, Num} ->
+ Num
+ end,
+ StateData#state{n_stanzas_in = NewNum};
+update_num_stanzas_in(StateData, _El) ->
+ StateData.
+
+send_stanza_and_ack_req(StateData, Stanza) ->
+ AckReq = #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ children = []},
+ StanzaS = xml:element_to_binary(Stanza),
+ AckReqS = xml:element_to_binary(AckReq),
+ send_text(StateData, [StanzaS, AckReqS]).
+
+ack_queue_add(StateData, El) ->
+ NewNum = case StateData#state.n_stanzas_out of
+ 4294967295 ->
+ 0;
+ Num ->
+ Num + 1
+ end,
+ NewState = limit_queue_length(StateData),
+ NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue),
+ NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}.
+
+ack_queue_drop(StateData, NumHandled) ->
+ NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
+ StateData#state.ack_queue),
+ StateData#state{ack_queue = NewQueue}.
+
+limit_queue_length(#state{max_ack_queue = Limit} = StateData)
+ when Limit == infinity;
+ Limit == unlimited ->
+ StateData;
+limit_queue_length(#state{jid = JID,
+ ack_queue = Queue,
+ max_ack_queue = Limit} = StateData) ->
+ case queue:len(Queue) >= Limit of
+ true ->
+ ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
+ [jlib:jid_to_string(JID)]),
+ limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)});
+ false ->
+ StateData
+ end.
+
+resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
+ Queue = StateData#state.ack_queue,
+ case queue:len(Queue) of
+ 0 ->
+ ok;
+ N ->
+ ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s",
+ [N, jlib:jid_to_string(StateData#state.jid)]),
+ lists:foreach(
+ fun({Num, #xmlel{attrs = Attrs} = El}) ->
+ From_s = xml:get_attr_s(<<"from">>, Attrs),
+ From = jlib:string_to_jid(From_s),
+ To_s = xml:get_attr_s(<<"to">>, Attrs),
+ To = jlib:string_to_jid(To_s),
+ ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s",
+ [Num, From_s, To_s]),
+ ejabberd_router:route(From, To, El)
+ end, queue:to_list(Queue))
+ end;
+resend_unacked_stanzas(_StateData) ->
ok.
%%%----------------------------------------------------------------------
diff --git a/src/jlib.erl b/src/jlib.erl
index 46e864b0c..a362697f4 100644
--- a/src/jlib.erl
+++ b/src/jlib.erl
@@ -51,7 +51,7 @@
binary_to_integer/1, binary_to_integer/2,
integer_to_binary/1, integer_to_binary/2,
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
- l2i/1, i2l/1, i2l/2]).
+ l2i/1, i2l/1, i2l/2, queue_drop_while/2]).
%% TODO: Remove once XEP-0091 is Obsolete
%% TODO: Remove once XEP-0091 is Obsolete
@@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) ->
C when C > N -> L;
_ -> i2l(<<$0, L/binary>>, N)
end.
+
+-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue().
+
+queue_drop_while(F, Q) ->
+ case queue:peek(Q) of
+ {value, Item} ->
+ case F(Item) of
+ true ->
+ queue_drop_while(F, queue:drop(Q));
+ _ ->
+ Q
+ end;
+ empty ->
+ Q
+ end.