diff options
author | Holger Weiss <holger@zedat.fu-berlin.de> | 2014-03-12 23:34:14 +0100 |
---|---|---|
committer | Holger Weiß <holger@zedat.fu-berlin.de> | 2014-03-12 23:34:14 +0100 |
commit | 7d594086c3601c3281473b4ca54714b714027019 (patch) | |
tree | 2b2cd4d2d05a6a95430f1d55ee20e0b2b3397101 /src | |
parent | Provide header with latin-1 encoding in translations to work with Erlang/OTP R17 (diff) |
Add initial XEP-0198 support (EJAB-532)
Implement partial support for XEP-0198: Stream Management. After
successful negotiation of this feature, the server requests an ACK for
each stanza transmitted to the client and responds to ACK requests
issued by the client. On session termination, the server re-routes any
unacknowledged stanzas. The length of the pending queue can be limited
by setting the "max_ack_queue" option to some integer value (default:
500). XEP-0198 support can be disabled entirely by setting the
"stream_management" option to false (default: true).
So far, stream management is implemented only for c2s connections, and
the optional stream resumption feature also described in XEP-0198 is not
(yet) supported.
This addition was originally based on a patch provided by Magnus Henoch
and updated by Grzegorz Grasza. Their code implements an early draft of
XEP-0198 for some previous version of ejabberd. It has since been
rewritten almost entirely.
Diffstat (limited to 'src')
-rw-r--r-- | src/ejabberd_c2s.erl | 403 | ||||
-rw-r--r-- | src/jlib.erl | 17 |
2 files changed, 358 insertions, 62 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index ae5fc97b8..044afd0f4 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -108,6 +108,12 @@ auth_module = unknown, ip, aux_fields = [], + sm_xmlns, + ack_queue, + max_ack_queue, + manage_stream = fun negotiate_stream_mgmt/2, + n_stanzas_in = 0, + n_stanzas_out = 0, lang}). %-define(DBGFSM, true). @@ -156,6 +162,35 @@ -define(INVALID_FROM, ?SERR_INVALID_FROM). +%% XEP-0198: + +-define(IS_STREAM_MGMT_TAG(Name), + Name == <<"enable">>; + Name == <<"a">>; + Name == <<"r">>). + +-define(IS_SUPPORTED_SM_XMLNS(Xmlns), + Xmlns == ?NS_STREAM_MGMT_2; + Xmlns == ?NS_STREAM_MGMT_3). + +-define(SM_FAILED(Condition, Xmlns), + #xmlel{name = <<"failed">>, + attrs = [{<<"xmlns">>, Xmlns}], + children = [#xmlel{name = Condition, + attrs = [{<<"xmlns">>, ?NS_STANZAS}], + children = []}]}). + +-define(SM_BAD_REQUEST(Xmlns), + ?SM_FAILED(<<"bad-request">>, Xmlns)). + +-define(SM_SERVICE_UNAVAILABLE(Xmlns), + ?SM_FAILED(<<"service-unavailable">>, Xmlns)). + +-define(SM_UNEXPECTED_REQUEST(Xmlns), + ?SM_FAILED(<<"unexpected-request">>, Xmlns)). + +-define(SM_UNSUPPORTED_VERSION(Xmlns), + ?SM_FAILED(<<"unsupported-version">>, Xmlns)). %%%---------------------------------------------------------------------- %%% API @@ -250,6 +285,16 @@ init([{SockMod, Socket}, Opts]) -> true -> TLSOpts1 end, TLSOpts = [verify_none | TLSOpts2], + MaxAckQueue = case proplists:get_value(max_ack_queue, Opts) of + Limit when is_integer(Limit), Limit > 0 -> Limit; + _ -> 500 + end, + StreamMgmtEnabled = proplists:get_value(stream_management, Opts, true), + AckQueue = if not StreamMgmtEnabled -> + none; + true -> + undefined + end, IP = peerip(SockMod, Socket), %% Check if IP is blacklisted: case is_ip_blacklisted(IP) of @@ -271,7 +316,8 @@ init([{SockMod, Socket}, Opts]) -> tls_required = StartTLSRequired, tls_enabled = TLSEnabled, tls_options = TLSOpts, streamid = new_id(), access = Access, - shaper = Shaper, ip = IP}, + shaper = Shaper, ip = IP, + ack_queue = AckQueue, max_ack_queue = MaxAckQueue}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT} end. @@ -402,6 +448,18 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> ejabberd_hooks:run_fold(roster_get_versioning_feature, Server, [], [Server]), + StreamManagementFeature = + case stream_mgmt_enabled(StateData) of + true -> + [#xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_2}], + children = []}, + #xmlel{name = <<"sm">>, + attrs = [{<<"xmlns">>, ?NS_STREAM_MGMT_3}], + children = []}]; + false -> + [] + end, StreamFeatures = [#xmlel{name = <<"bind">>, attrs = [{<<"xmlns">>, ?NS_BIND}], children = []}, @@ -410,6 +468,7 @@ wait_for_stream({xmlstreamstart, _Name, Attrs}, StateData) -> children = []}] ++ RosterVersioningFeature ++ + StreamManagementFeature ++ ejabberd_hooks:run_fold(c2s_stream_features, Server, [], [Server]), send_element(StateData, @@ -472,6 +531,9 @@ wait_for_stream({xmlstreamerror, _}, StateData) -> wait_for_stream(closed, StateData) -> {stop, normal, StateData}. +wait_for_auth({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_auth, (StateData#state.manage_stream)(El, StateData)); wait_for_auth({xmlstreamelement, El}, StateData) -> case is_auth_packet(El) of {auth, _ID, get, {U, _, _, _}} -> @@ -618,6 +680,9 @@ wait_for_auth({xmlstreamerror, _}, StateData) -> wait_for_auth(closed, StateData) -> {stop, normal, StateData}. +wait_for_feature_request({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_feature_request, (StateData#state.manage_stream)(El, StateData)); wait_for_feature_request({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -783,6 +848,9 @@ wait_for_feature_request({xmlstreamerror, _}, wait_for_feature_request(closed, StateData) -> {stop, normal, StateData}. +wait_for_sasl_response({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_sasl_response, (StateData#state.manage_stream)(El, StateData)); wait_for_sasl_response({xmlstreamelement, El}, StateData) -> #xmlel{name = Name, attrs = Attrs, children = Els} = El, @@ -913,6 +981,9 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +wait_for_bind({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_bind, (StateData#state.manage_stream)(El, StateData)); wait_for_bind({xmlstreamelement, El}, StateData) -> case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_BIND, sub_el = SubEl} = @@ -974,61 +1045,65 @@ wait_for_bind({xmlstreamerror, _}, StateData) -> wait_for_bind(closed, StateData) -> {stop, normal, StateData}. +wait_for_session({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(wait_for_session, (StateData#state.manage_stream)(El, StateData)); wait_for_session({xmlstreamelement, El}, StateData) -> + NewStateData = update_num_stanzas_in(StateData, El), case jlib:iq_query_info(El) of #iq{type = set, xmlns = ?NS_SESSION} -> - U = StateData#state.user, - R = StateData#state.resource, - JID = StateData#state.jid, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + U = NewStateData#state.user, + R = NewStateData#state.resource, + JID = NewStateData#state.jid, + case acl:match_rule(NewStateData#state.server, + NewStateData#state.access, JID) of allow -> ?INFO_MSG("(~w) Opened session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Res = jlib:make_result_iq_reply(El#xmlel{children = []}), - send_element(StateData, Res), - change_shaper(StateData, JID), + NewState = send_stanza(NewStateData, Res), + change_shaper(NewState, JID), {Fs, Ts} = ejabberd_hooks:run_fold( roster_get_subscription_lists, - StateData#state.server, + NewState#state.server, {[], []}, - [U, StateData#state.server]), + [U, NewState#state.server]), LJID = jlib:jid_tolower(jlib:jid_remove_resource(JID)), Fs1 = [LJID | Fs], Ts1 = [LJID | Ts], PrivList = ejabberd_hooks:run_fold( - privacy_get_user_list, StateData#state.server, + privacy_get_user_list, NewState#state.server, #userlist{}, - [U, StateData#state.server]), + [U, NewState#state.server]), SID = {now(), self()}, - Conn = get_conn_type(StateData), - Info = [{ip, StateData#state.ip}, {conn, Conn}, - {auth_module, StateData#state.auth_module}], + Conn = get_conn_type(NewState), + Info = [{ip, NewState#state.ip}, {conn, Conn}, + {auth_module, NewState#state.auth_module}], ejabberd_sm:open_session( - SID, U, StateData#state.server, R, Info), - NewStateData = - StateData#state{ + SID, U, NewState#state.server, R, Info), + UpdatedStateData = + NewState#state{ sid = SID, conn = Conn, pres_f = ?SETS:from_list(Fs1), pres_t = ?SETS:from_list(Ts1), privacy_list = PrivList}, fsm_next_state_pack(session_established, - NewStateData); + UpdatedStateData); _ -> ejabberd_hooks:run(forbidden_session_hook, - StateData#state.server, [JID]), + NewStateData#state.server, [JID]), ?INFO_MSG("(~w) Forbidden session for ~s", - [StateData#state.socket, + [NewStateData#state.socket, jlib:jid_to_string(JID)]), Err = jlib:make_error_reply(El, ?ERR_NOT_ALLOWED), - send_element(StateData, Err), - fsm_next_state(wait_for_session, StateData) + send_element(NewStateData, Err), + fsm_next_state(wait_for_session, NewStateData) end; _ -> - fsm_next_state(wait_for_session, StateData) + fsm_next_state(wait_for_session, NewStateData) end; wait_for_session(timeout, StateData) -> @@ -1042,6 +1117,9 @@ wait_for_session({xmlstreamerror, _}, StateData) -> wait_for_session(closed, StateData) -> {stop, normal, StateData}. +session_established({xmlstreamelement, #xmlel{name = Name} = El}, StateData) + when ?IS_STREAM_MGMT_TAG(Name) -> + fsm_next_state(session_established, (StateData#state.manage_stream)(El, StateData)); session_established({xmlstreamelement, El}, StateData) -> FromJID = StateData#state.jid, @@ -1080,9 +1158,10 @@ session_established(closed, StateData) -> %% connection) session_established2(El, StateData) -> #xmlel{name = Name, attrs = Attrs} = El, - User = StateData#state.user, - Server = StateData#state.server, - FromJID = StateData#state.jid, + NewStateData = update_num_stanzas_in(StateData, El), + User = NewStateData#state.user, + Server = NewStateData#state.server, + FromJID = NewStateData#state.jid, To = xml:get_attr_s(<<"to">>, Attrs), ToJID = case To of <<"">> -> jlib:make_jid(User, Server, <<"">>); @@ -1091,7 +1170,7 @@ session_established2(El, StateData) -> NewEl1 = jlib:remove_attr(<<"xmlns">>, El), NewEl = case xml:get_attr_s(<<"xml:lang">>, Attrs) of <<"">> -> - case StateData#state.lang of + case NewStateData#state.lang of <<"">> -> NewEl1; Lang -> xml:replace_tag_attr(<<"xml:lang">>, Lang, NewEl1) @@ -1101,13 +1180,18 @@ session_established2(El, StateData) -> NewState = case ToJID of error -> case xml:get_attr_s(<<"type">>, Attrs) of - <<"error">> -> StateData; - <<"result">> -> StateData; + <<"error">> -> NewStateData; + <<"result">> -> NewStateData; _ -> Err = jlib:make_error_reply(NewEl, ?ERR_JID_MALFORMED), - send_element(StateData, Err), - StateData + case is_stanza(Err) of + true -> + send_stanza(NewStateData, Err); + false -> + send_element(NewStateData, Err), + NewStateData + end end; _ -> case Name of @@ -1122,12 +1206,12 @@ session_established2(El, StateData) -> #jid{user = User, server = Server, resource = <<"">>} -> ?DEBUG("presence_update(~p,~n\t~p,~n\t~p)", - [FromJID, PresenceEl, StateData]), + [FromJID, PresenceEl, NewStateData]), presence_update(FromJID, PresenceEl, - StateData); + NewStateData); _ -> presence_track(FromJID, ToJID, PresenceEl, - StateData) + NewStateData) end; <<"iq">> -> case jlib:iq_query_info(NewEl) of @@ -1135,21 +1219,21 @@ session_established2(El, StateData) -> when Xmlns == (?NS_PRIVACY); Xmlns == (?NS_BLOCKING) -> process_privacy_iq(FromJID, ToJID, IQ, - StateData); + NewStateData); _ -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData + NewStateData end; <<"message">> -> ejabberd_hooks:run(user_send_packet, Server, [FromJID, ToJID, NewEl]), - check_privacy_route(FromJID, StateData, FromJID, + check_privacy_route(FromJID, NewStateData, FromJID, ToJID, NewEl), - StateData; - _ -> StateData + NewStateData; + _ -> NewStateData end end, ejabberd_hooks:run(c2s_loop_debug, @@ -1263,13 +1347,13 @@ handle_info({route, _From, _To, {broadcast, Data}}, jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), + NewState = send_stanza(StateData, PrivPushEl), fsm_next_state(StateName, - StateData#state{privacy_list = NewPL}) + NewState#state{privacy_list = NewPL}) end; {blocking, What} -> - route_blocking(What, StateData), - fsm_next_state(StateName, StateData); + NewState = route_blocking(What, StateData), + fsm_next_state(StateName, NewState); _ -> fsm_next_state(StateName, StateData) end; @@ -1515,12 +1599,12 @@ handle_info({route, From, To, jlib:replace_from_to_attrs(jlib:jid_to_string(From), jlib:jid_to_string(To), NewAttrs), FixedPacket = #xmlel{name = Name, attrs = Attrs2, children = Els}, - send_element(StateData, FixedPacket), + SentStateData = send_stanza(StateData, FixedPacket), ejabberd_hooks:run(user_receive_packet, - StateData#state.server, - [StateData#state.jid, From, To, FixedPacket]), + SentStateData#state.server, + [SentStateData#state.jid, From, To, FixedPacket]), ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - fsm_next_state(StateName, NewState); + fsm_next_state(StateName, SentStateData); true -> ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), fsm_next_state(StateName, NewState) @@ -1643,6 +1727,7 @@ terminate(_Reason, StateName, StateData) -> StateData#state.pres_i, Packet) end end, + resend_unacked_stanzas(StateData), bounce_messages(); _ -> ok @@ -1674,6 +1759,13 @@ send_element(StateData, El) when StateData#state.xml_socket -> send_element(StateData, El) -> send_text(StateData, xml:element_to_binary(El)). +send_stanza(StateData, Stanza) when StateData#state.sm_xmlns /= undefined -> + send_stanza_and_ack_req(StateData, Stanza), + ack_queue_add(StateData, Stanza); +send_stanza(StateData, Stanza) -> + send_element(StateData, Stanza), + StateData. + send_header(StateData, Server, Version, Lang) when StateData#state.xml_socket -> VersionAttr = case Version of @@ -1727,6 +1819,19 @@ is_auth_packet(El) -> _ -> false end. +is_stanza(#xmlel{name = Name, attrs = Attrs}) when Name == <<"message">>; + Name == <<"presence">>; + Name == <<"iq">> -> + case xml:get_attr(<<"xmlns">>, Attrs) of + {value, NS} when NS /= <<"jabber:client">>, + NS /= <<"jabber:server">> -> + false; + _ -> + true + end; +is_stanza(_El) -> + false. + get_auth_tags([#xmlel{name = Name, children = Els} | L], U, P, D, R) -> CData = xml:get_cdata(Els), @@ -1854,12 +1959,12 @@ presence_update(From, Packet, StateData) -> ejabberd_hooks:run(user_available_hook, NewStateData#state.server, [NewStateData#state.jid]), - if NewPriority >= 0 -> - resend_offline_messages(NewStateData), - resend_subscription_requests(NewStateData); - true -> ok - end, - presence_broadcast_first(From, NewStateData, + ResentStateData = if NewPriority >= 0 -> + resend_offline_messages(NewStateData), + resend_subscription_requests(NewStateData); + true -> NewStateData + end, + presence_broadcast_first(From, ResentStateData, Packet); true -> presence_broadcast_to_trusted(NewStateData, From, @@ -2173,10 +2278,11 @@ resend_subscription_requests(#state{user = User, PendingSubscriptions = ejabberd_hooks:run_fold(resend_subscription_requests_hook, Server, [], [User, Server]), - lists:foreach(fun (XMLPacket) -> - send_element(StateData, XMLPacket) - end, - PendingSubscriptions). + lists:foldl(fun (XMLPacket, AccStateData) -> + send_stanza(AccStateData, XMLPacket) + end, + StateData, + PendingSubscriptions). get_showtag(undefined) -> <<"unavailable">>; get_showtag(Presence) -> @@ -2352,10 +2458,185 @@ route_blocking(What, StateData) -> PrivPushEl = jlib:replace_from_to(jlib:jid_remove_resource(StateData#state.jid), StateData#state.jid, jlib:iq_to_xml(PrivPushIQ)), - send_element(StateData, PrivPushEl), %% No need to replace active privacy list here, %% blocking pushes are always accompanied by %% Privacy List pushes + send_stanza(StateData, PrivPushEl). + +%%%---------------------------------------------------------------------- +%%% XEP-0198 +%%%---------------------------------------------------------------------- + +stream_mgmt_enabled(#state{ack_queue = none}) -> + false; +stream_mgmt_enabled(_StateData) -> + true. + +negotiate_stream_mgmt(_El, #state{resource = <<"">>} = StateData) -> + %% XEP-0198 says: "For client-to-server connections, the client MUST NOT + %% attempt to enable stream management until after it has completed Resource + %% Binding". However, it also says: "Stream management errors SHOULD be + %% considered recoverable", so we won't bail out. + send_element(StateData, ?SM_UNEXPECTED_REQUEST(?NS_STREAM_MGMT_3)), + StateData; +negotiate_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when ?IS_SUPPORTED_SM_XMLNS(Xmlns) -> + case stream_mgmt_enabled(StateData) of + true -> + case Name of + <<"enable">> -> + handle_enable(StateData#state{sm_xmlns = Xmlns}); + _ -> + Res = if Name == <<"a">>; + Name == <<"r">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + false -> + send_element(StateData, ?SM_SERVICE_UNAVAILABLE(Xmlns)), + StateData + end; + _ -> + send_element(StateData, ?SM_UNSUPPORTED_VERSION(?NS_STREAM_MGMT_3)), + StateData + end. + +perform_stream_mgmt(#xmlel{name = Name, attrs = Attrs}, StateData) -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + Xmlns when Xmlns == StateData#state.sm_xmlns -> + case Name of + <<"r">> -> + handle_r(StateData); + <<"a">> -> + handle_a(StateData, Attrs); + _ -> + Res = if Name == <<"enable">> -> + ?SM_UNEXPECTED_REQUEST(Xmlns); + true -> + ?SM_BAD_REQUEST(Xmlns) + end, + send_element(StateData, Res), + StateData + end; + _ -> + send_element(StateData, + ?SM_UNSUPPORTED_VERSION(StateData#state.sm_xmlns)), + StateData + end. + +handle_enable(StateData) -> + ?INFO_MSG("Enabling XEP-0198 stream management for ~s", + [jlib:jid_to_string(StateData#state.jid)]), + Res = #xmlel{name = <<"enabled">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + send_element(StateData, Res), + StateData#state{ack_queue = queue:new(), + manage_stream = fun perform_stream_mgmt/2}. + +handle_r(StateData) -> + H = jlib:integer_to_binary(StateData#state.n_stanzas_in), + Res = #xmlel{name = <<"a">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}, + {<<"h">>, H}], + children = []}, + send_element(StateData, Res), + StateData. + +handle_a(#state{jid = JID, n_stanzas_out = NumStanzasOut} = StateData, Attrs) -> + case catch jlib:binary_to_integer(xml:get_attr_s(<<"h">>, Attrs)) of + H when is_integer(H), H > NumStanzasOut -> + ?WARNING_MSG("~s acknowledged ~B stanzas, but only ~B were sent", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, NumStanzasOut); + H when is_integer(H), H >= 0 -> + ?DEBUG("~s acknowledged ~B of ~B stanzas", + [jlib:jid_to_string(JID), H, NumStanzasOut]), + ack_queue_drop(StateData, H); + _ -> + ?WARNING_MSG("Ignoring invalid ACK element from ~s", + [jlib:jid_to_string(JID)]), + StateData + end. + +update_num_stanzas_in(StateData, El) when StateData#state.sm_xmlns /= undefined -> + NewNum = case {is_stanza(El), StateData#state.n_stanzas_in} of + {true, 4294967295} -> + 0; + {true, Num} -> + Num + 1; + {false, Num} -> + Num + end, + StateData#state{n_stanzas_in = NewNum}; +update_num_stanzas_in(StateData, _El) -> + StateData. + +send_stanza_and_ack_req(StateData, Stanza) -> + AckReq = #xmlel{name = <<"r">>, + attrs = [{<<"xmlns">>, StateData#state.sm_xmlns}], + children = []}, + StanzaS = xml:element_to_binary(Stanza), + AckReqS = xml:element_to_binary(AckReq), + send_text(StateData, [StanzaS, AckReqS]). + +ack_queue_add(StateData, El) -> + NewNum = case StateData#state.n_stanzas_out of + 4294967295 -> + 0; + Num -> + Num + 1 + end, + NewState = limit_queue_length(StateData), + NewQueue = queue:in({NewNum, El}, NewState#state.ack_queue), + NewState#state{ack_queue = NewQueue, n_stanzas_out = NewNum}. + +ack_queue_drop(StateData, NumHandled) -> + NewQueue = jlib:queue_drop_while(fun({N, _Stanza}) -> N =< NumHandled end, + StateData#state.ack_queue), + StateData#state{ack_queue = NewQueue}. + +limit_queue_length(#state{max_ack_queue = Limit} = StateData) + when Limit == infinity; + Limit == unlimited -> + StateData; +limit_queue_length(#state{jid = JID, + ack_queue = Queue, + max_ack_queue = Limit} = StateData) -> + case queue:len(Queue) >= Limit of + true -> + ?WARNING_MSG("Dropping stanza from too long ACK queue for ~s", + [jlib:jid_to_string(JID)]), + limit_queue_length(StateData#state{ack_queue = queue:drop(Queue)}); + false -> + StateData + end. + +resend_unacked_stanzas(StateData) when StateData#state.sm_xmlns /= undefined -> + Queue = StateData#state.ack_queue, + case queue:len(Queue) of + 0 -> + ok; + N -> + ?INFO_MSG("Resending ~B unacknowledged stanzas to ~s", + [N, jlib:jid_to_string(StateData#state.jid)]), + lists:foreach( + fun({Num, #xmlel{attrs = Attrs} = El}) -> + From_s = xml:get_attr_s(<<"from">>, Attrs), + From = jlib:string_to_jid(From_s), + To_s = xml:get_attr_s(<<"to">>, Attrs), + To = jlib:string_to_jid(To_s), + ?DEBUG("Resending unacknowledged stanza #~B from ~s to ~s", + [Num, From_s, To_s]), + ejabberd_router:route(From, To, El) + end, queue:to_list(Queue)) + end; +resend_unacked_stanzas(_StateData) -> ok. %%%---------------------------------------------------------------------- diff --git a/src/jlib.erl b/src/jlib.erl index 46e864b0c..a362697f4 100644 --- a/src/jlib.erl +++ b/src/jlib.erl @@ -51,7 +51,7 @@ binary_to_integer/1, binary_to_integer/2, integer_to_binary/1, integer_to_binary/2, atom_to_binary/1, binary_to_atom/1, tuple_to_binary/1, - l2i/1, i2l/1, i2l/2]). + l2i/1, i2l/1, i2l/2, queue_drop_while/2]). %% TODO: Remove once XEP-0091 is Obsolete %% TODO: Remove once XEP-0091 is Obsolete @@ -894,3 +894,18 @@ i2l(L, N) when is_binary(L) -> C when C > N -> L; _ -> i2l(<<$0, L/binary>>, N) end. + +-spec queue_drop_while(fun((term()) -> boolean()), queue()) -> queue(). + +queue_drop_while(F, Q) -> + case queue:peek(Q) of + {value, Item} -> + case F(Item) of + true -> + queue_drop_while(F, queue:drop(Q)); + _ -> + Q + end; + empty -> + Q + end. |