aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_c2s.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl150
1 files changed, 70 insertions, 80 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 080880bec..bda3bbd5f 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -104,7 +104,6 @@
ip,
aux_fields = [],
csi_state = active,
- csi_queue = [],
mgmt_state,
mgmt_xmlns,
mgmt_queue,
@@ -167,27 +166,32 @@
(Xmlns == ?NS_STREAM_MGMT_2) or
(Xmlns == ?NS_STREAM_MGMT_3)).
--define(MGMT_FAILED(Condition, Xmlns),
+-define(MGMT_FAILED(Condition, Attrs),
#xmlel{name = <<"failed">>,
- attrs = [{<<"xmlns">>, Xmlns}],
+ attrs = Attrs,
children = [#xmlel{name = Condition,
attrs = [{<<"xmlns">>, ?NS_STANZAS}],
children = []}]}).
-define(MGMT_BAD_REQUEST(Xmlns),
- ?MGMT_FAILED(<<"bad-request">>, Xmlns)).
-
--define(MGMT_ITEM_NOT_FOUND(Xmlns),
- ?MGMT_FAILED(<<"item-not-found">>, Xmlns)).
+ ?MGMT_FAILED(<<"bad-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_SERVICE_UNAVAILABLE(Xmlns),
- ?MGMT_FAILED(<<"service-unavailable">>, Xmlns)).
+ ?MGMT_FAILED(<<"service-unavailable">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNEXPECTED_REQUEST(Xmlns),
- ?MGMT_FAILED(<<"unexpected-request">>, Xmlns)).
+ ?MGMT_FAILED(<<"unexpected-request">>, [{<<"xmlns">>, Xmlns}])).
-define(MGMT_UNSUPPORTED_VERSION(Xmlns),
- ?MGMT_FAILED(<<"unsupported-version">>, Xmlns)).
+ ?MGMT_FAILED(<<"unsupported-version">>, [{<<"xmlns">>, Xmlns}])).
+
+-define(MGMT_ITEM_NOT_FOUND(Xmlns),
+ ?MGMT_FAILED(<<"item-not-found">>, [{<<"xmlns">>, Xmlns}])).
+
+-define(MGMT_ITEM_NOT_FOUND_H(Xmlns, NumStanzasIn),
+ ?MGMT_FAILED(<<"item-not-found">>,
+ [{<<"xmlns">>, Xmlns},
+ {<<"h">>, jlib:integer_to_binary(NumStanzasIn)}])).
%%%----------------------------------------------------------------------
%%% API
@@ -620,9 +624,9 @@ wait_for_auth({xmlstreamelement, El}, StateData) ->
{auth, _ID, set, {U, P, D, R}} ->
JID = jid:make(U, StateData#state.server, R),
case JID /= error andalso
- acl:match_rule(StateData#state.server,
- StateData#state.access, JID)
- == allow
+ acl:access_matches(StateData#state.access,
+ #{usr => jid:split(JID), ip => StateData#state.ip},
+ StateData#state.server) == allow
of
true ->
DGen = fun (PW) ->
@@ -1099,8 +1103,10 @@ open_session(StateData) ->
R = StateData#state.resource,
JID = StateData#state.jid,
Lang = StateData#state.lang,
- case acl:match_rule(StateData#state.server,
- StateData#state.access, JID) of
+ IP = StateData#state.ip,
+ case acl:access_matches(StateData#state.access,
+ #{usr => jid:split(JID), ip => IP},
+ StateData#state.server) of
allow ->
?INFO_MSG("(~w) Opened session for ~s",
[StateData#state.socket, jid:to_string(JID)]),
@@ -1147,7 +1153,7 @@ session_established({xmlstreamelement,
#xmlel{name = <<"active">>,
attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}]}},
StateData) ->
- NewStateData = csi_queue_flush(StateData),
+ NewStateData = csi_flush_queue(StateData),
fsm_next_state(session_established, NewStateData#state{csi_state = active});
session_established({xmlstreamelement,
#xmlel{name = <<"inactive">>,
@@ -1281,7 +1287,7 @@ wait_for_resume({xmlstreamelement, _El} = Event, StateData) ->
wait_for_resume(timeout, StateData) ->
?DEBUG("Timed out waiting for resumption of stream for ~s",
[jid:to_string(StateData#state.jid)]),
- {stop, normal, StateData};
+ {stop, normal, StateData#state{mgmt_state = timeout}};
wait_for_resume(Event, StateData) ->
?DEBUG("Ignoring event while waiting for resumption: ~p", [Event]),
fsm_next_state(wait_for_resume, StateData).
@@ -1792,6 +1798,18 @@ terminate(_Reason, StateName, StateData) ->
presence_broadcast(StateData, From,
StateData#state.pres_a, Packet)
end,
+ case StateData#state.mgmt_state of
+ timeout ->
+ Info = [{num_stanzas_in,
+ StateData#state.mgmt_stanzas_in}],
+ ejabberd_sm:set_offline_info(StateData#state.sid,
+ StateData#state.user,
+ StateData#state.server,
+ StateData#state.resource,
+ Info);
+ _ ->
+ ok
+ end,
handle_unacked_stanzas(StateData)
end,
bounce_messages();
@@ -1808,8 +1826,9 @@ terminate(_Reason, StateName, StateData) ->
%%%----------------------------------------------------------------------
change_shaper(StateData, JID) ->
- Shaper = acl:match_rule(StateData#state.server,
- StateData#state.shaper, JID),
+ Shaper = acl:access_matches(StateData#state.shaper,
+ #{usr => jid:split(JID), ip => StateData#state.ip},
+ StateData#state.server),
(StateData#state.sockmod):change_shaper(StateData#state.socket,
Shaper).
@@ -2727,6 +2746,8 @@ handle_resume(StateData, Attrs) ->
case inherit_session_state(StateData, PrevID) of
{ok, InheritedState} ->
{ok, InheritedState, H};
+ {error, Err, InH} ->
+ {error, ?MGMT_ITEM_NOT_FOUND_H(Xmlns, InH), Err};
{error, Err} ->
{error, ?MGMT_ITEM_NOT_FOUND(Xmlns), Err}
end;
@@ -2763,7 +2784,7 @@ handle_resume(StateData, Attrs) ->
#xmlel{name = <<"r">>,
attrs = [{<<"xmlns">>, AttrXmlns}],
children = []}),
- FlushedState = csi_queue_flush(NewState),
+ FlushedState = csi_flush_queue(NewState),
NewStateData = FlushedState#state{csi_state = active},
?INFO_MSG("Resumed session for ~s",
[jid:to_string(NewStateData#state.jid)]),
@@ -2966,7 +2987,17 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
none ->
- {error, <<"Previous session PID not found">>};
+ case ejabberd_sm:get_offline_info(Time, U, S, R) of
+ none ->
+ {error, <<"Previous session PID not found">>};
+ Info ->
+ case proplists:get_value(num_stanzas_in, Info) of
+ undefined ->
+ {error, <<"Previous session timed out">>};
+ H ->
+ {error, <<"Previous session timed out">>, H}
+ end
+ end;
OldPID ->
OldSID = {Time, OldPID},
case catch resume_session(OldSID) of
@@ -2995,7 +3026,6 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields,
csi_state = OldStateData#state.csi_state,
- csi_queue = OldStateData#state.csi_queue,
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
mgmt_queue = OldStateData#state.mgmt_queue,
mgmt_timeout = OldStateData#state.mgmt_timeout,
@@ -3028,65 +3058,25 @@ add_resent_delay_info(#state{server = From}, El, Time) ->
%%% XEP-0352
%%%----------------------------------------------------------------------
-csi_filter_stanza(#state{csi_state = CsiState, jid = JID} = StateData,
+csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData,
Stanza) ->
- Action = ejabberd_hooks:run_fold(csi_filter_stanza,
- StateData#state.server,
- send, [Stanza]),
- ?DEBUG("Going to ~p stanza for inactive client ~p",
- [Action, jid:to_string(JID)]),
- case Action of
- queue -> csi_queue_add(StateData, Stanza);
- drop -> StateData;
- send ->
- From = fxml:get_tag_attr_s(<<"from">>, Stanza),
- StateData1 = csi_queue_send(StateData, From),
- StateData2 = send_stanza(StateData1#state{csi_state = active},
- Stanza),
- StateData2#state{csi_state = CsiState}
- end.
-
-csi_queue_add(#state{csi_queue = Queue} = StateData, Stanza) ->
- case length(StateData#state.csi_queue) >= csi_max_queue(StateData) of
- true -> csi_queue_add(csi_queue_flush(StateData), Stanza);
- false ->
- From = fxml:get_tag_attr_s(<<"from">>, Stanza),
- NewQueue = lists:keystore(From, 1, Queue, {From, p1_time_compat:timestamp(), Stanza}),
- StateData#state{csi_queue = NewQueue}
- end.
-
-csi_queue_send(#state{csi_queue = Queue, csi_state = CsiState, server = Host} =
- StateData, From) ->
- case lists:keytake(From, 1, Queue) of
- {value, {From, Time, Stanza}, NewQueue} ->
- NewStanza = jlib:add_delay_info(Stanza, Host, Time,
- <<"Client Inactive">>),
- NewStateData = send_stanza(StateData#state{csi_state = active},
- NewStanza),
- NewStateData#state{csi_queue = NewQueue, csi_state = CsiState};
- false -> StateData
- end.
-
-csi_queue_flush(#state{csi_queue = Queue, csi_state = CsiState, jid = JID,
- server = Host} = StateData) ->
- ?DEBUG("Flushing CSI queue for ~s", [jid:to_string(JID)]),
- NewStateData =
- lists:foldl(fun({_From, Time, Stanza}, AccState) ->
- NewStanza =
- jlib:add_delay_info(Stanza, Host, Time,
- <<"Client Inactive">>),
- send_stanza(AccState, NewStanza)
- end, StateData#state{csi_state = active}, Queue),
- NewStateData#state{csi_queue = [], csi_state = CsiState}.
-
-%% Make sure we won't push too many messages to the XEP-0198 queue when the
-%% client becomes 'active' again. Otherwise, the client might not manage to
-%% acknowledge the message flood in time. Also, don't let the queue grow to
-%% more than 100 stanzas.
-csi_max_queue(#state{mgmt_max_queue = infinity}) -> 100;
-csi_max_queue(#state{mgmt_max_queue = Max}) when Max > 200 -> 100;
-csi_max_queue(#state{mgmt_max_queue = Max}) when Max < 2 -> 1;
-csi_max_queue(#state{mgmt_max_queue = Max}) -> Max div 2.
+ {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server,
+ {StateData, [Stanza]},
+ [Server, Stanza]),
+ StateData2 = lists:foldl(fun(CurStanza, AccState) ->
+ send_stanza(AccState, CurStanza)
+ end, StateData1#state{csi_state = active},
+ Stanzas),
+ StateData2#state{csi_state = CsiState}.
+
+csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) ->
+ {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server,
+ {StateData, []}, [Server]),
+ StateData2 = lists:foldl(fun(CurStanza, AccState) ->
+ send_stanza(AccState, CurStanza)
+ end, StateData1#state{csi_state = active},
+ Stanzas),
+ StateData2#state{csi_state = CsiState}.
%%%----------------------------------------------------------------------
%%% JID Set memory footprint reduction code