summaryrefslogtreecommitdiff
path: root/src/mod_client_state.erl
diff options
context:
space:
mode:
authorHolger Weiss <holger@zedat.fu-berlin.de>2017-04-06 22:19:00 +0200
committerHolger Weiss <holger@zedat.fu-berlin.de>2017-04-06 22:19:00 +0200
commit0ddd2e0ebfb32865d274b016d34ca410aee7a0e5 (patch)
treed24bccc281c020c1935e4fadca4280864b65dcc4 /src/mod_client_state.erl
parentmod_client_state: Don't keep track of queue size (diff)
mod_client_state: Queue stanzas of each full JID
Keep the latest stanzas of each given full JID, rather than dropping them when stanzas from a different resource are received. This change makes sure the recipient receives the latest status of all clients of each contact. It also ensures the recipient will see the current list of occupants of joined MUC rooms.
Diffstat (limited to '')
-rw-r--r--src/mod_client_state.erl73
1 files changed, 26 insertions, 47 deletions
diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl
index a5ac611f..0d92cb2d 100644
--- a/src/mod_client_state.erl
+++ b/src/mod_client_state.erl
@@ -49,6 +49,8 @@
-type csi_type() :: presence | chatstate | {pep, binary()}.
-type csi_queue() :: {non_neg_integer(), map()}.
-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}.
+-type csi_key() :: {ljid(), csi_type()}.
+-type csi_element() :: {csi_timestamp(), stanza()}.
-type c2s_state() :: ejabberd_c2s:state().
-type filter_acc() :: {stanza() | drop, c2s_state()}.
@@ -320,25 +322,21 @@ enqueue_stanza(Type, Stanza, #{csi_state := inactive,
C2SState1 = flush_queue(C2SState),
enqueue_stanza(Type, Stanza, C2SState1);
false ->
- #jid{luser = U, lserver = S} = xmpp:get_from(Stanza),
- Q1 = queue_in({U, S}, Type, Stanza, Q),
+ From = jid:tolower(xmpp:get_from(Stanza)),
+ Q1 = queue_in({From, Type}, Stanza, Q),
{stop, {drop, C2SState#{csi_queue => Q1}}}
end;
enqueue_stanza(_Type, Stanza, State) ->
{Stanza, State}.
-spec dequeue_sender(jid(), c2s_state()) -> c2s_state().
-dequeue_sender(#jid{luser = U, lserver = S},
+dequeue_sender(#jid{luser = U, lserver = S} = Sender,
#{csi_queue := Q, jid := JID} = C2SState) ->
?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s",
[U, S, jid:encode(JID)]),
- case queue_take({U, S}, Q) of
- {Stanzas, Q1} ->
- C2SState1 = flush_stanzas(C2SState, Stanzas),
- C2SState1#{csi_queue => Q1};
- error ->
- C2SState
- end.
+ {Elems, Q1} = queue_take(Sender, Q),
+ C2SState1 = flush_stanzas(C2SState, Elems),
+ C2SState1#{csi_queue => Q1}.
-spec flush_queue(c2s_state()) -> c2s_state().
flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
@@ -350,7 +348,7 @@ flush_queue(#{csi_queue := Q, jid := JID} = C2SState) ->
[{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state().
flush_stanzas(#{lserver := LServer} = C2SState, Elems) ->
lists:foldl(
- fun({_Type, Time, Stanza}, AccState) ->
+ fun({Time, Stanza}, AccState) ->
Stanza1 = add_delay_info(Stanza, LServer, Time),
ejabberd_c2s:send(AccState, Stanza1)
end, C2SState, Elems).
@@ -381,46 +379,27 @@ get_pep_node(#message{} = Msg) ->
queue_new() ->
{0, #{}}.
--spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue().
-queue_in(Key, Type, Val, {Seq, Q}) ->
+-spec queue_in(csi_key(), csi_element(), csi_queue()) -> csi_queue().
+queue_in(Key, Val, {Seq, Q}) ->
Seq1 = Seq + 1,
Time = {Seq1, p1_time_compat:timestamp()},
- case maps:get(Key, Q, error) of
- error ->
- Q1 = maps:put(Key, [{Type, Time, Val}], Q),
- {Seq1, Q1};
- TypeVals ->
- case lists:keymember(Type, 1, TypeVals) of
- true ->
- TypeVals1 = lists:keyreplace(
- Type, 1, TypeVals, {Type, Time, Val}),
- Q1 = maps:put(Key, TypeVals1, Q),
- {Seq1, Q1};
- false ->
- TypeVals1 = [{Type, Time, Val}|TypeVals],
- Q1 = maps:put(Key, TypeVals1, Q),
- {Seq1, Q1}
- end
- end.
-
--spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error.
-queue_take(Key, {Seq, Q}) ->
- case maps:get(Key, Q, error) of
- error ->
- error;
- TypeVals ->
- Q1 = maps:remove(Key, Q),
- {lists:keysort(2, TypeVals), {Seq, Q1}}
- end.
+ Q1 = maps:put(Key, {Time, Val}, Q),
+ {Seq1, Q1}.
+
+-spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}.
+queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) ->
+ {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ})
+ when U == LUser, S == LServer ->
+ {[Val | AccVals], maps:remove(Key, AccQ)};
+ (_, _, Acc) ->
+ Acc
+ end, {[], Q}, Q),
+ {lists:keysort(1, Vals), {Seq, Q1}}.
-spec queue_len(csi_queue()) -> non_neg_integer().
queue_len({_, Q}) ->
maps:size(Q).
--spec queue_to_list(csi_queue()) -> [term()].
-queue_to_list({_, _, Q}) ->
- TypeVals = maps:fold(
- fun(_, Vals, Acc) ->
- Vals ++ Acc
- end, [], Q),
- lists:keysort(2, TypeVals).
+-spec queue_to_list(csi_queue()) -> [csi_element()].
+queue_to_list({_, Q}) ->
+ lists:keysort(1, maps:values(Q)).