diff options
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r-- | src/ejabberd_c2s.erl | 150 |
1 files changed, 70 insertions, 80 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 080880bec..bda3bbd5f 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -104,7 +104,6 @@ ip, aux_fields = [], csi_state = active, - csi_queue = [], mgmt_state, mgmt_xmlns, mgmt_queue, @@ -167,27 +166,32 @@ (Xmlns == ?NS_STREAM_MGMT_2) or (Xmlns == ?NS_STREAM_MGMT_3)). --define(MGMT_FAILED(Condition, Xmlns), +-define(MGMT_FAILED(Condition, Attrs), #xmlel{name = <<"failed">>, - attrs = [{<<"xmlns">>, Xmlns}], + attrs = Attrs, children = [#xmlel{name = Condition, attrs = [{<<"xmlns">>, ?NS_STANZAS}], children = []}]}). -define(MGMT_BAD_REQUEST(Xmlns), - ?MGMT_FAILED(<<"bad-request">>, Xmlns)). - --define(MGMT_ITEM_NOT_FOUND(Xmlns), - ?MGMT_FAILED(<<"item-not-found">>, Xmlns)). + ?MGMT_FAILED(<<"bad-request">>, [{<<"xmlns">>, Xmlns}])). -define(MGMT_SERVICE_UNAVAILABLE(Xmlns), - ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)). + ?MGMT_FAILED(<<"service-unavailable">>, [{<<"xmlns">>, Xmlns}])). -define(MGMT_UNEXPECTED_REQUEST(Xmlns), - ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)). + ?MGMT_FAILED(<<"unexpected-request">>, [{<<"xmlns">>, Xmlns}])). -define(MGMT_UNSUPPORTED_VERSION(Xmlns), - ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)). + ?MGMT_FAILED(<<"unsupported-version">>, [{<<"xmlns">>, Xmlns}])). + +-define(MGMT_ITEM_NOT_FOUND(Xmlns), + ?MGMT_FAILED(<<"item-not-found">>, [{<<"xmlns">>, Xmlns}])). + +-define(MGMT_ITEM_NOT_FOUND_H(Xmlns, NumStanzasIn), + ?MGMT_FAILED(<<"item-not-found">>, + [{<<"xmlns">>, Xmlns}, + {<<"h">>, jlib:integer_to_binary(NumStanzasIn)}])). %%%---------------------------------------------------------------------- %%% API @@ -620,9 +624,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) -> {auth, _ID, set, {U, P, D, R}} -> JID = jid:make(U, StateData#state.server, R), case JID /= error andalso - acl:match_rule(StateData#state.server, - StateData#state.access, JID) - == allow + acl:access_matches(StateData#state.access, + #{usr => jid:split(JID), ip => StateData#state.ip}, + StateData#state.server) == allow of true -> DGen = fun (PW) -> @@ -1099,8 +1103,10 @@ open_session(StateData) -> R = StateData#state.resource, JID = StateData#state.jid, Lang = StateData#state.lang, - case acl:match_rule(StateData#state.server, - StateData#state.access, JID) of + IP = StateData#state.ip, + case acl:access_matches(StateData#state.access, + #{usr => jid:split(JID), ip => IP}, + StateData#state.server) of allow -> ?INFO_MSG("(~w) Opened session for ~s", [StateData#state.socket, jid:to_string(JID)]), @@ -1147,7 +1153,7 @@ session_established({xmlstreamelement, #xmlel{name = <<"active">>, attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}}, StateData) -> - NewStateData = csi_queue_flush(StateData), + NewStateData = csi_flush_queue(StateData), fsm_next_state(session_established, NewStateData#state{csi_state = active}); session_established({xmlstreamelement, #xmlel{name = <<"inactive">>, @@ -1281,7 +1287,7 @@ wait_for_resume({xmlstreamelement, _El} = Event, StateData) -> wait_for_resume(timeout, StateData) -> ?DEBUG("Timed out waiting for resumption of stream for ~s", [jid:to_string(StateData#state.jid)]), - {stop, normal, StateData}; + {stop, normal, StateData#state{mgmt_state = timeout}}; wait_for_resume(Event, StateData) -> ?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]), fsm_next_state(wait_for_resume, StateData). @@ -1792,6 +1798,18 @@ terminate(_Reason, StateName, StateData) -> presence_broadcast(StateData, From, StateData#state.pres_a, Packet) end, + case StateData#state.mgmt_state of + timeout -> + Info = [{num_stanzas_in, + StateData#state.mgmt_stanzas_in}], + ejabberd_sm:set_offline_info(StateData#state.sid, + StateData#state.user, + StateData#state.server, + StateData#state.resource, + Info); + _ -> + ok + end, handle_unacked_stanzas(StateData) end, bounce_messages(); @@ -1808,8 +1826,9 @@ terminate(_Reason, StateName, StateData) -> %%%---------------------------------------------------------------------- change_shaper(StateData, JID) -> - Shaper = acl:match_rule(StateData#state.server, - StateData#state.shaper, JID), + Shaper = acl:access_matches(StateData#state.shaper, + #{usr => jid:split(JID), ip => StateData#state.ip}, + StateData#state.server), (StateData#state.sockmod):change_shaper(StateData#state.socket, Shaper). @@ -2727,6 +2746,8 @@ handle_resume(StateData, Attrs) -> case inherit_session_state(StateData, PrevID) of {ok, InheritedState} -> {ok, InheritedState, H}; + {error, Err, InH} -> + {error, ?MGMT_ITEM_NOT_FOUND_H(Xmlns, InH), Err}; {error, Err} -> {error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err} end; @@ -2763,7 +2784,7 @@ handle_resume(StateData, Attrs) -> #xmlel{name = <<"r">>, attrs = [{<<"xmlns">>, AttrXmlns}], children = []}), - FlushedState = csi_queue_flush(NewState), + FlushedState = csi_flush_queue(NewState), NewStateData = FlushedState#state{csi_state = active}, ?INFO_MSG("Resumed session for ~s", [jid:to_string(NewStateData#state.jid)]), @@ -2966,7 +2987,17 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of none -> - {error, <<"Previous session PID not found">>}; + case ejabberd_sm:get_offline_info(Time, U, S, R) of + none -> + {error, <<"Previous session PID not found">>}; + Info -> + case proplists:get_value(num_stanzas_in, Info) of + undefined -> + {error, <<"Previous session timed out">>}; + H -> + {error, <<"Previous session timed out">>, H} + end + end; OldPID -> OldSID = {Time, OldPID}, case catch resume_session(OldSID) of @@ -2995,7 +3026,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, csi_state = OldStateData#state.csi_state, - csi_queue = OldStateData#state.csi_queue, mgmt_xmlns = OldStateData#state.mgmt_xmlns, mgmt_queue = OldStateData#state.mgmt_queue, mgmt_timeout = OldStateData#state.mgmt_timeout, @@ -3028,65 +3058,25 @@ add_resent_delay_info(#state{server = From}, El, Time) -> %%% XEP-0352 %%%---------------------------------------------------------------------- -csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData, +csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData, Stanza) -> - Action = ejabberd_hooks:run_fold(csi_filter_stanza, - StateData#state.server, - send, [Stanza]), - ?DEBUG("Going to ~p stanza for inactive client ~p", - [Action, jid:to_string(JID)]), - case Action of - queue -> csi_queue_add(StateData, Stanza); - drop -> StateData; - send -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - StateData1 = csi_queue_send(StateData, From), - StateData2 = send_stanza(StateData1#state{csi_state = active}, - Stanza), - StateData2#state{csi_state = CsiState} - end. - -csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) -> - case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of - true -> csi_queue_add(csi_queue_flush(StateData), Stanza); - false -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - NewQueue = lists:keystore(From, 1, Queue, {From, p1_time_compat:timestamp(), Stanza}), - StateData#state{csi_queue = NewQueue} - end. - -csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} = - StateData, From) -> - case lists:keytake(From, 1, Queue) of - {value, {From, Time, Stanza}, NewQueue} -> - NewStanza = jlib:add_delay_info(Stanza, Host, Time, - <<"Client Inactive">>), - NewStateData = send_stanza(StateData#state{csi_state = active}, - NewStanza), - NewStateData#state{csi_queue = NewQueue, csi_state = CsiState}; - false -> StateData - end. - -csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID, - server = Host} = StateData) -> - ?DEBUG("Flushing CSI queue for ~s", [jid:to_string(JID)]), - NewStateData = - lists:foldl(fun({_From, Time, Stanza}, AccState) -> - NewStanza = - jlib:add_delay_info(Stanza, Host, Time, - <<"Client Inactive">>), - send_stanza(AccState, NewStanza) - end, StateData#state{csi_state = active}, Queue), - NewStateData#state{csi_queue = [], csi_state = CsiState}. - -%% Make sure we won't push too many messages to the XEP-0198 queue when the -%% client becomes 'active' again. Otherwise, the client might not manage to -%% acknowledge the message flood in time. Also, don't let the queue grow to -%% more than 100 stanzas. -csi_max_queue(#state{mgmt_max_queue = infinity}) -> 100; -csi_max_queue(#state{mgmt_max_queue = Max}) when Max > 200 -> 100; -csi_max_queue(#state{mgmt_max_queue = Max}) when Max < 2 -> 1; -csi_max_queue(#state{mgmt_max_queue = Max}) -> Max div 2. + {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, + {StateData, [Stanza]}, + [Server, Stanza]), + StateData2 = lists:foldl(fun(CurStanza, AccState) -> + send_stanza(AccState, CurStanza) + end, StateData1#state{csi_state = active}, + Stanzas), + StateData2#state{csi_state = CsiState}. + +csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) -> + {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, + {StateData, []}, [Server]), + StateData2 = lists:foldl(fun(CurStanza, AccState) -> + send_stanza(AccState, CurStanza) + end, StateData1#state{csi_state = active}, + Stanzas), + StateData2#state{csi_state = CsiState}. %%%---------------------------------------------------------------------- %%% JID Set memory footprint reduction code |