aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ejabberd_c2s.erl403
-rw-r--r--src/jlib.erl17
2 files changed, 358 insertions, 62 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index ae5fc97b8..044afd0f4 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -108,6 +108,12 @@
auth_module = unknown,
ip,
aux_fields = [],
+ sm_xmlns,
+ ack_queue,
+ max_ack_queue,
+ manage_stream = fun negotiate_stream_mgmt/2,
+ n_stanzas_in = 0,
+ n_stanzas_out = 0,
lang}).
%-define(DBGFSM, true).
@@ -156,6 +162,35 @@
-define(INVALID_FROM, ?SERR_INVALID_FROM).
+%% XEP-0198:
+
+-define(IS_STREAM_MGMT_TAG(Name),
+ Name == <<"enable">>;
+ Name == <<"a">>;
+ Name == <<"r">>).
+
+-define(IS_SUPPORTED_SM_XMLNS(Xmlns),
+ Xmlns == ?NS_STREAM_MGMT_2;
+ Xmlns == ?NS_STREAM_MGMT_3).
+
+-define(SM_FAILED(Condition, Xmlns),
+ #xmlel{name = <<"failed">>,
+ attrs = [{<<"xmlns">>, Xmlns}],
+ children = [#xmlel{name = Condition,
+ attrs = [{<<"xmlns">>, ?NS_STANZAS}],
+ children = []}]}).
+
+-define(SM_BAD_REQUEST(Xmlns),
+ ?SM_FAILED(<<"bad-request">>, Xmlns)).
+
+-define(SM_SERVICE_UNAVAILABLE(Xmlns),
+ ?SM_FAILED(<<"service-unavailable">>, Xmlns)).
+
+-define(SM_UNEXPECTED_REQUEST(Xmlns),
+ ?SM_FAILED(<<"unexpected-request">>, Xmlns)).
+
+-define(SM_UNSUPPORTED_VERSION(Xmlns),
+ ?SM_FAILED(<<"unsupported-version">>, Xmlns)).
%%%----------------------------------------------------------------------
%%% API
@@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) ->
true -> TLSOpts1
end,
TLSOpts = [verify_none | TLSOpts2],
+ MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of
+ Limit when is_integer(Limit), Limit > 0 -> Limit;
+ _ -> 500
+ end,
+ StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true),
+ AckQueue = if not StreamMgmtEnabled ->
+ none;
+ true ->
+ undefined
+ end,
IP = peerip(SockMod, Socket),
%% Check if IP is blacklisted:
case is_ip_blacklisted(IP) of
@@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) ->
tls_required = StartTLSRequired,
tls_enabled = TLSEnabled, tls_options = TLSOpts,
streamid = new_id(), access = Access,
- shaper = Shaper, ip = IP},
+ shaper = Shaper, ip = IP,
+ ack_queue = AckQueue, max_ack_queue = MaxAckQueue},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}
end.
@@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
ejabberd_hooks:run_fold(roster_get_versioning_feature,
Server, [],
[Server]),
+ StreamManagementFeature =
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ [#xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}],
+ children = []},
+ #xmlel{name = <<"sm">>,
+ attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}],
+ children = []}];
+ false ->
+ []
+ end,
StreamFeatures = [#xmlel{name = <<"bind">>,
attrs = [{<<"xmlns">>, ?NS_BIND}],
children = []},
@@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) ->
children = []}]
++
RosterVersioningFeature ++
+ StreamManagementFeature ++
ejabberd_hooks:run_fold(c2s_stream_features,
Server, [], [Server]),
send_element(StateData,
@@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) ->
wait_for_stream(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData));
wait_for_auth({xmlstreamelement, El}, StateData) ->
case is_auth_packet(El) of
{auth, _ID, get, {U, _, _, _}} ->
@@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) ->
wait_for_auth(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData));
wait_for_feature_request({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _},
wait_for_feature_request(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData));
wait_for_sasl_response({xmlstreamelement, El},
StateData) ->
#xmlel{name = Name, attrs = Attrs, children = Els} = El,
@@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
+wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData));
wait_for_bind({xmlstreamelement, El}, StateData) ->
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} =
@@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) ->
wait_for_bind(closed, StateData) ->
{stop, normal, StateData}.
+wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData));
wait_for_session({xmlstreamelement, El}, StateData) ->
+ NewStateData = update_num_stanzas_in(StateData, El),
case jlib:iq_query_info(El) of
#iq{type = set, xmlns = ?NS_SESSION} ->
- U = StateData#state.user,
- R = StateData#state.resource,
- JID = StateData#state.jid,
- case acl:match_rule(StateData#state.server,
- StateData#state.access, JID) of
+ U = NewStateData#state.user,
+ R = NewStateData#state.resource,
+ JID = NewStateData#state.jid,
+ case acl:match_rule(NewStateData#state.server,
+ NewStateData#state.access, JID) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Res = jlib:make_result_iq_reply(El#xmlel{children = []}),
- send_element(StateData, Res),
- change_shaper(StateData, JID),
+ NewState = send_stanza(NewStateData, Res),
+ change_shaper(NewState, JID),
{Fs, Ts} = ejabberd_hooks:run_fold(
roster_get_subscription_lists,
- StateData#state.server,
+ NewState#state.server,
{[], []},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)),
Fs1 = [LJID | Fs],
Ts1 = [LJID | Ts],
PrivList =
ejabberd_hooks:run_fold(
- privacy_get_user_list, StateData#state.server,
+ privacy_get_user_list, NewState#state.server,
#userlist{},
- [U, StateData#state.server]),
+ [U, NewState#state.server]),
SID = {now(), self()},
- Conn = get_conn_type(StateData),
- Info = [{ip, StateData#state.ip}, {conn, Conn},
- {auth_module, StateData#state.auth_module}],
+ Conn = get_conn_type(NewState),
+ Info = [{ip, NewState#state.ip}, {conn, Conn},
+ {auth_module, NewState#state.auth_module}],
ejabberd_sm:open_session(
- SID, U, StateData#state.server, R, Info),
- NewStateData =
- StateData#state{
+ SID, U, NewState#state.server, R, Info),
+ UpdatedStateData =
+ NewState#state{
sid = SID,
conn = Conn,
pres_f = ?SETS:from_list(Fs1),
pres_t = ?SETS:from_list(Ts1),
privacy_list = PrivList},
fsm_next_state_pack(session_established,
- NewStateData);
+ UpdatedStateData);
_ ->
ejabberd_hooks:run(forbidden_session_hook,
- StateData#state.server, [JID]),
+ NewStateData#state.server, [JID]),
?INFO_MSG("(~w) Forbidden session for ~s",
- [StateData#state.socket,
+ [NewStateData#state.socket,
jlib:jid_to_string(JID)]),
Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED),
- send_element(StateData, Err),
- fsm_next_state(wait_for_session, StateData)
+ send_element(NewStateData, Err),
+ fsm_next_state(wait_for_session, NewStateData)
end;
_ ->
- fsm_next_state(wait_for_session, StateData)
+ fsm_next_state(wait_for_session, NewStateData)
end;
wait_for_session(timeout, StateData) ->
@@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) ->
wait_for_session(closed, StateData) ->
{stop, normal, StateData}.
+session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData)
+ when ?IS_STREAM_MGMT_TAG(Name) ->
+ fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData));
session_established({xmlstreamelement, El},
StateData) ->
FromJID = StateData#state.jid,
@@ -1080,9 +1158,10 @@ session_established(closed, StateData) ->
%% connection)
session_established2(El, StateData) ->
#xmlel{name = Name, attrs = Attrs} = El,
- User = StateData#state.user,
- Server = StateData#state.server,
- FromJID = StateData#state.jid,
+ NewStateData = update_num_stanzas_in(StateData, El),
+ User = NewStateData#state.user,
+ Server = NewStateData#state.server,
+ FromJID = NewStateData#state.jid,
To = xml:get_attr_s(<<"to">>, Attrs),
ToJID = case To of
<<"">> -> jlib:make_jid(User, Server, <<"">>);
@@ -1091,7 +1170,7 @@ session_established2(El, StateData) ->
NewEl1 = jlib:remove_attr(<<"xmlns">>, El),
NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of
<<"">> ->
- case StateData#state.lang of
+ case NewStateData#state.lang of
<<"">> -> NewEl1;
Lang ->
xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1)
@@ -1101,13 +1180,18 @@ session_established2(El, StateData) ->
NewState = case ToJID of
error ->
case xml:get_attr_s(<<"type">>, Attrs) of
- <<"error">> -> StateData;
- <<"result">> -> StateData;
+ <<"error">> -> NewStateData;
+ <<"result">> -> NewStateData;
_ ->
Err = jlib:make_error_reply(NewEl,
?ERR_JID_MALFORMED),
- send_element(StateData, Err),
- StateData
+ case is_stanza(Err) of
+ true ->
+ send_stanza(NewStateData, Err);
+ false ->
+ send_element(NewStateData, Err),
+ NewStateData
+ end
end;
_ ->
case Name of
@@ -1122,12 +1206,12 @@ session_established2(El, StateData) ->
#jid{user = User, server = Server,
resource = <<"">>} ->
?DEBUG("presence_update(~p,~n\t~p,~n\t~p)",
- [FromJID, PresenceEl, StateData]),
+ [FromJID, PresenceEl, NewStateData]),
presence_update(FromJID, PresenceEl,
- StateData);
+ NewStateData);
_ ->
presence_track(FromJID, ToJID, PresenceEl,
- StateData)
+ NewStateData)
end;
<<"iq">> ->
case jlib:iq_query_info(NewEl) of
@@ -1135,21 +1219,21 @@ session_established2(El, StateData) ->
when Xmlns == (?NS_PRIVACY);
Xmlns == (?NS_BLOCKING) ->
process_privacy_iq(FromJID, ToJID, IQ,
- StateData);
+ NewStateData);
_ ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData,
+ check_privacy_route(FromJID, NewStateData,
FromJID, ToJID, NewEl),
- StateData
+ NewStateData
end;
<<"message">> ->
ejabberd_hooks:run(user_send_packet, Server,
[FromJID, ToJID, NewEl]),
- check_privacy_route(FromJID, StateData, FromJID,
+ check_privacy_route(FromJID, NewStateData, FromJID,
ToJID, NewEl),
- StateData;
- _ -> StateData
+ NewStateData;
+ _ -> NewStateData
end
end,
ejabberd_hooks:run(c2s_loop_debug,
@@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}},
jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid,
jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
+ NewState = send_stanza(StateData, PrivPushEl),
fsm_next_state(StateName,
- StateData#state{privacy_list = NewPL})
+ NewState#state{privacy_list = NewPL})
end;
{blocking, What} ->
- route_blocking(What, StateData),
- fsm_next_state(StateName, StateData);
+ NewState = route_blocking(What, StateData),
+ fsm_next_state(StateName, NewState);
_ ->
fsm_next_state(StateName, StateData)
end;
@@ -1515,12 +1599,12 @@ handle_info({route, From, To,
jlib:replace_from_to_attrs(jlib:jid_to_string(From),
jlib:jid_to_string(To), NewAttrs),
FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els},
- send_element(StateData, FixedPacket),
+ SentStateData = send_stanza(StateData, FixedPacket),
ejabberd_hooks:run(user_receive_packet,
- StateData#state.server,
- [StateData#state.jid, From, To, FixedPacket]),
+ SentStateData#state.server,
+ [SentStateData#state.jid, From, To, FixedPacket]),
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- fsm_next_state(StateName, NewState);
+ fsm_next_state(StateName, SentStateData);
true ->
ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
fsm_next_state(StateName, NewState)
@@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) ->
StateData#state.pres_i, Packet)
end
end,
+ resend_unacked_stanzas(StateData),
bounce_messages();
_ ->
ok
@@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket ->
send_element(StateData, El) ->
send_text(StateData, xml:element_to_binary(El)).
+send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined ->
+ send_stanza_and_ack_req(StateData, Stanza),
+ ack_queue_add(StateData, Stanza);
+send_stanza(StateData, Stanza) ->
+ send_element(StateData, Stanza),
+ StateData.
+
send_header(StateData, Server, Version, Lang)
when StateData#state.xml_socket ->
VersionAttr = case Version of
@@ -1727,6 +1819,19 @@ is_auth_packet(El) ->
_ -> false
end.
+is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>;
+ Name == <<"presence">>;
+ Name == <<"iq">> ->
+ case xml:get_attr(<<"xmlns">>, Attrs) of
+ {value, NS} when NS /= <<"jabber:client">>,
+ NS /= <<"jabber:server">> ->
+ false;
+ _ ->
+ true
+ end;
+is_stanza(_El) ->
+ false.
+
get_auth_tags([#xmlel{name = Name, children = Els} | L],
U, P, D, R) ->
CData = xml:get_cdata(Els),
@@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) ->
ejabberd_hooks:run(user_available_hook,
NewStateData#state.server,
[NewStateData#state.jid]),
- if NewPriority >= 0 ->
- resend_offline_messages(NewStateData),
- resend_subscription_requests(NewStateData);
- true -> ok
- end,
- presence_broadcast_first(From, NewStateData,
+ ResentStateData = if NewPriority >= 0 ->
+ resend_offline_messages(NewStateData),
+ resend_subscription_requests(NewStateData);
+ true -> NewStateData
+ end,
+ presence_broadcast_first(From, ResentStateData,
Packet);
true ->
presence_broadcast_to_trusted(NewStateData, From,
@@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User,
PendingSubscriptions =
ejabberd_hooks:run_fold(resend_subscription_requests_hook,
Server, [], [User, Server]),
- lists:foreach(fun (XMLPacket) ->
- send_element(StateData, XMLPacket)
- end,
- PendingSubscriptions).
+ lists:foldl(fun (XMLPacket, AccStateData) ->
+ send_stanza(AccStateData, XMLPacket)
+ end,
+ StateData,
+ PendingSubscriptions).
get_showtag(undefined) -> <<"unavailable">>;
get_showtag(Presence) ->
@@ -2352,10 +2458,185 @@ route_blocking(What, StateData) ->
PrivPushEl =
jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid),
StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)),
- send_element(StateData, PrivPushEl),
%% No need to replace active privacy list here,
%% blocking pushes are always accompanied by
%% Privacy List pushes
+ send_stanza(StateData, PrivPushEl).
+
+%%%----------------------------------------------------------------------
+%%% XEP-0198
+%%%----------------------------------------------------------------------
+
+stream_mgmt_enabled(#state{ack_queue = none}) ->
+ false;
+stream_mgmt_enabled(_StateData) ->
+ true.
+
+negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) ->
+ %% XEP-0198 says: "For client-to-server connections, the client MUST NOT
+ %% attempt to enable stream management until after it has completed Resource
+ %% Binding". However, it also says: "Stream management errors SHOULD be
+ %% considered recoverable", so we won't bail out.
+ send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)),
+ StateData;
+negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) ->
+ case stream_mgmt_enabled(StateData) of
+ true ->
+ case Name of
+ <<"enable">> ->
+ handle_enable(StateData#state{sm_xmlns = Xmlns});
+ _ ->
+ Res = if Name == <<"a">>;
+ Name == <<"r">> ->
+ ?SM_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?SM_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ false ->
+ send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)),
+ StateData
+ end;
+ _ ->
+ send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)),
+ StateData
+ end.
+
+perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) ->
+ case xml:get_attr_s(<<"xmlns">>, Attrs) of
+ Xmlns when Xmlns == StateData#state.sm_xmlns ->
+ case Name of
+ <<"r">> ->
+ handle_r(StateData);
+ <<"a">> ->
+ handle_a(StateData, Attrs);
+ _ ->
+ Res = if Name == <<"enable">> ->
+ ?SM_UNEXPECTED_REQUEST(Xmlns);
+ true ->
+ ?SM_BAD_REQUEST(Xmlns)
+ end,
+ send_element(StateData, Res),
+ StateData
+ end;
+ _ ->
+ send_element(StateData,
+ ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)),
+ StateData
+ end.
+
+handle_enable(StateData) ->
+ ?INFO_MSG("Enabling XEP-0198 stream management for ~s",
+ [jlib:jid_to_string(StateData#state.jid)]),
+ Res = #xmlel{name = <<"enabled">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ children = []},
+ send_element(StateData, Res),
+ StateData#state{ack_queue = queue:new(),
+ manage_stream = fun perform_stream_mgmt/2}.
+
+handle_r(StateData) ->
+ H = jlib:integer_to_binary(StateData#state.n_stanzas_in),
+ Res = #xmlel{name = <<"a">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns},
+ {<<"h">>, H}],
+ children = []},
+ send_element(StateData, Res),
+ StateData.
+
+handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) ->
+ case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of
+ H when is_integer(H), H > NumStanzasOut ->
+ ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ ack_queue_drop(StateData, NumStanzasOut);
+ H when is_integer(H), H >= 0 ->
+ ?DEBUG("~s acknowledged ~B of ~B stanzas",
+ [jlib:jid_to_string(JID), H, NumStanzasOut]),
+ ack_queue_drop(StateData, H);
+ _ ->
+ ?WARNING_MSG("Ignoring invalid ACK element from ~s",
+ [jlib:jid_to_string(JID)]),
+ StateData
+ end.
+
+update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined ->
+ NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of
+ {true, 4294967295} ->
+ 0;
+ {true, Num} ->
+ Num + 1;
+ {false, Num} ->
+ Num
+ end,
+ StateData#state{n_stanzas_in = NewNum};
+update_num_stanzas_in(StateData, _El) ->
+ StateData.
+
+send_stanza_and_ack_req(StateData, Stanza) ->
+ AckReq = #xmlel{name = <<"r">>,
+ attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}],
+ children = []},
+ StanzaS = xml:element_to_binary(Stanza),
+ AckReqS = xml:element_to_binary(AckReq),
+ send_text(StateData, [StanzaS, AckReqS]).
+
+ack_queue_add(StateData, El) ->
+ NewNum = case StateData#state.n_stanzas_out of
+ 4294967295 ->
+ 0;
+ Num ->
+ Num + 1
+ end,
+ NewState = limit_queue_length(StateData),
+ NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue),
+ NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}.
+
+ack_queue_drop(StateData, NumHandled) ->
+ NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end,
+ StateData#state.ack_queue),
+ StateData#state{ack_queue = NewQueue}.
+
+limit_queue_length(#state{max_ack_queue = Limit} = StateData)
+ when Limit == infinity;
+ Limit == unlimited ->
+ StateData;
+limit_queue_length(#state{jid = JID,
+ ack_queue = Queue,
+ max_ack_queue = Limit} = StateData) ->
+ case queue:len(Queue) >= Limit of
+ true ->
+ ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s",
+ [jlib:jid_to_string(JID)]),
+ limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)});
+ false ->
+ StateData
+ end.
+
+resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined ->
+ Queue = StateData#state.ack_queue,
+ case queue:len(Queue) of
+ 0 ->
+ ok;
+ N ->
+ ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s",
+ [N, jlib:jid_to_string(StateData#state.jid)]),
+ lists:foreach(
+ fun({Num, #xmlel{attrs = Attrs} = El}) ->
+ From_s = xml:get_attr_s(<<"from">>, Attrs),
+ From = jlib:string_to_jid(From_s),
+ To_s = xml:get_attr_s(<<"to">>, Attrs),
+ To = jlib:string_to_jid(To_s),
+ ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s",
+ [Num, From_s, To_s]),
+ ejabberd_router:route(From, To, El)
+ end, queue:to_list(Queue))
+ end;
+resend_unacked_stanzas(_StateData) ->
ok.
%%%----------------------------------------------------------------------
diff --git a/src/jlib.erl b/src/jlib.erl
index 46e864b0c..a362697f4 100644
--- a/src/jlib.erl
+++ b/src/jlib.erl
@@ -51,7 +51,7 @@
binary_to_integer/1, binary_to_integer/2,
integer_to_binary/1, integer_to_binary/2,
atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1,
- l2i/1, i2l/1, i2l/2]).
+ l2i/1, i2l/1, i2l/2, queue_drop_while/2]).
%% TODO: Remove once XEP-0091 is Obsolete
%% TODO: Remove once XEP-0091 is Obsolete
@@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) ->
C when C > N -> L;
_ -> i2l(<<$0, L/binary>>, N)
end.
+
+-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue().
+
+queue_drop_while(F, Q) ->
+ case queue:peek(Q) of
+ {value, Item} ->
+ case F(Item) of
+ true ->
+ queue_drop_while(F, queue:drop(Q));
+ _ ->
+ Q
+ end;
+ empty ->
+ Q
+ end.