summaryrefslogtreecommitdiff
path: root/src/mod_client_state.erl
diff options
context:
space:
mode:
authorHolger Weiss <holger@zedat.fu-berlin.de>2016-05-17 19:27:18 +0200
committerHolger Weiss <holger@zedat.fu-berlin.de>2016-05-17 19:27:18 +0200
commitba74c1c367cba56feb31e896bec3c07645bde8f6 (patch)
treea6c7d08d48f5da2e71c3f55e7310290d198b2a22 /src/mod_client_state.erl
parentDelete duplicated command export_sql, use export2sql instead (#1118) (diff)
Move CSI queue handling into mod_client_state
Let mod_client_state handle the queueing of stanzas, not just their classification. This simplifies the ejabberd_c2s code and gives (custom) CSI modules more flexibility.
Diffstat (limited to 'src/mod_client_state.erl')
-rw-r--r--src/mod_client_state.erl107
1 files changed, 84 insertions, 23 deletions
diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl
index 790e808f..7d23beb0 100644
--- a/src/mod_client_state.erl
+++ b/src/mod_client_state.erl
@@ -31,20 +31,22 @@
-behavior(gen_mod).
-export([start/2, stop/1, add_stream_feature/2,
- filter_presence/2, filter_chat_states/2,
+ filter_presence/3, filter_chat_states/3, filter_other/3, flush_queue/2,
mod_opt_type/1]).
-include("ejabberd.hrl").
-include("logger.hrl").
-include("jlib.hrl").
+-define(CSI_QUEUE_MAX, 100).
+
start(Host, Opts) ->
QueuePresence = gen_mod:get_opt(queue_presence, Opts,
fun(B) when is_boolean(B) -> B end,
true),
DropChatStates = gen_mod:get_opt(drop_chat_states, Opts,
- fun(B) when is_boolean(B) -> B end,
- true),
+ fun(B) when is_boolean(B) -> B end,
+ true),
if QueuePresence; DropChatStates ->
ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE,
add_stream_feature, 50),
@@ -57,10 +59,13 @@ start(Host, Opts) ->
ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
filter_chat_states, 50);
true -> ok
- end;
+ end,
+ ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE,
+ filter_other, 100),
+ ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE,
+ flush_queue, 50);
true -> ok
- end,
- ok.
+ end.
stop(Host) ->
QueuePresence = gen_mod:get_module_opt(Host, ?MODULE, queue_presence,
@@ -81,10 +86,13 @@ stop(Host) ->
ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
filter_chat_states, 50);
true -> ok
- end;
+ end,
+ ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE,
+ filter_other, 100),
+ ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE,
+ flush_queue, 50);
true -> ok
- end,
- ok.
+ end.
add_stream_feature(Features, _Host) ->
Feature = #xmlel{name = <<"csi">>,
@@ -92,30 +100,83 @@ add_stream_feature(Features, _Host) ->
children = []},
[Feature | Features].
-filter_presence(_Action, #xmlel{name = <<"presence">>, attrs = Attrs}) ->
+filter_presence({C2SState, _OutStanzas} = Acc, Host,
+ #xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) ->
case fxml:get_attr(<<"type">>, Attrs) of
{value, Type} when Type /= <<"unavailable">> ->
- ?DEBUG("Got important presence stanza", []),
- {stop, send};
+ Acc;
_ ->
?DEBUG("Got availability presence stanza", []),
- {stop, queue}
+ queue_add(presence, Stanza, Host, C2SState)
end;
-filter_presence(Action, _Stanza) -> Action.
+filter_presence(Acc, _Host, _Stanza) -> Acc.
-filter_chat_states(_Action, #xmlel{name = <<"message">>} = Stanza) ->
+filter_chat_states({C2SState, _OutStanzas} = Acc, _Host,
+ #xmlel{name = <<"message">>} = Stanza) ->
case jlib:is_standalone_chat_state(Stanza) of
- true ->
+ true -> % Drop the stanza.
?DEBUG("Got standalone chat state notification", []),
- {stop, drop};
+ {stop, {C2SState, []}};
false ->
- ?DEBUG("Got message stanza", []),
- {stop, send}
+ Acc
end;
-filter_chat_states(Action, _Stanza) -> Action.
+filter_chat_states(Acc, _Host, _Stanza) -> Acc.
+
+filter_other({C2SState, _OutStanzas}, Host, Stanza) ->
+ ?DEBUG("Won't add stanza to CSI queue", []),
+ queue_take(Stanza, Host, C2SState).
+
+flush_queue({C2SState, _OutStanzas}, Host) ->
+ ?DEBUG("Going to flush CSI queue", []),
+ Queue = get_queue(C2SState),
+ NewState = set_queue([], C2SState),
+ {stop, {NewState, get_stanzas(Queue, Host)}}.
+
+queue_add(Type, Stanza, Host, C2SState) ->
+ case get_queue(C2SState) of
+ Queue when length(Queue) >= ?CSI_QUEUE_MAX ->
+ ?DEBUG("CSI queue too large, going to flush it", []),
+ NewState = set_queue([], C2SState),
+ {stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}};
+ Queue ->
+ ?DEBUG("Adding stanza to CSI queue", []),
+ From = fxml:get_tag_attr_s(<<"from">>, Stanza),
+ Key = {jid:tolower(jid:from_string(From)), Type},
+ Entry = {Key, p1_time_compat:timestamp(), Stanza},
+ NewQueue = lists:keystore(Key, 1, Queue, Entry),
+ NewState = set_queue(NewQueue, C2SState),
+ {stop, {NewState, []}}
+ end.
+
+queue_take(Stanza, Host, C2SState) ->
+ From = fxml:get_tag_attr_s(<<"from">>, Stanza),
+ {LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)),
+ {Selected, Rest} = lists:partition(
+ fun({{{U, S, _R}, _Type}, _Time, _Stanza}) ->
+ U == LUser andalso S == LServer
+ end, get_queue(C2SState)),
+ NewState = set_queue(Rest, C2SState),
+ {stop, {NewState, get_stanzas(Selected, Host) ++ [Stanza]}}.
+
+set_queue(Queue, C2SState) ->
+ ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState).
+
+get_queue(C2SState) ->
+ case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of
+ {ok, Queue} ->
+ Queue;
+ error ->
+ []
+ end.
+
+get_stanzas(Queue, Host) ->
+ lists:map(fun({_Key, Time, Stanza}) ->
+ jlib:add_delay_info(Stanza, Host, Time,
+ <<"Client Inactive">>)
+ end, Queue).
-mod_opt_type(drop_chat_states) ->
- fun(B) when is_boolean(B) -> B end;
mod_opt_type(queue_presence) ->
fun(B) when is_boolean(B) -> B end;
-mod_opt_type(_) -> [drop_chat_states, queue_presence].
+mod_opt_type(drop_chat_states) ->
+ fun(B) when is_boolean(B) -> B end;
+mod_opt_type(_) -> [queue_presence, drop_chat_states].