diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-01-09 17:02:17 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-01-09 17:02:17 +0300 |
commit | 1e55e018e534aa82541c5f460063a237192b768c (patch) | |
tree | 9584ed46fe2b18770343399254b0ba15ff591e51 /src/mod_client_state.erl | |
parent | Get rid of "jlib.hrl" header in some files (diff) |
Adopt remaining code to support new hooks
Diffstat (limited to 'src/mod_client_state.erl')
-rw-r--r-- | src/mod_client_state.erl | 331 |
1 files changed, 209 insertions, 122 deletions
diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index a838088f..175929a5 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -34,8 +34,11 @@ -export([start/2, stop/1, mod_opt_type/1, depends/2]). %% ejabberd_hooks callbacks. --export([filter_presence/4, filter_chat_states/4, filter_pep/4, filter_other/4, - flush_queue/3, add_stream_feature/2]). +-export([filter_presence/1, filter_chat_states/1, + filter_pep/1, filter_other/1, + c2s_stream_started/2, add_stream_feature/2, + c2s_copy_session/2, c2s_authenticated_packet/2, + c2s_session_resumed/1]). -include("ejabberd.hrl"). -include("logger.hrl"). @@ -44,9 +47,10 @@ -define(CSI_QUEUE_MAX, 100). -type csi_type() :: presence | chatstate | {pep, binary()}. --type csi_key() :: {ljid(), csi_type()}. --type csi_stanza() :: {csi_key(), erlang:timestamp(), xmlel()}. --type csi_queue() :: [csi_stanza()]. +-type csi_queue() :: {non_neg_integer(), non_neg_integer(), map()}. +-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}. +-type c2s_state() :: ejabberd_c2s:state(). +-type filter_acc() :: {stanza() | drop, c2s_state()}. %%-------------------------------------------------------------------- %% gen_mod callbacks. @@ -68,27 +72,33 @@ start(Host, Opts) -> fun(B) when is_boolean(B) -> B end, true), if QueuePresence; QueueChatStates; QueuePEP -> + ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), + ejabberd_hooks:add(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:add(c2s_session_resumed, Host, ?MODULE, + c2s_session_resumed, 50), if QueuePresence -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_presence, 50); true -> ok end, if QueueChatStates -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_chat_states, 50); true -> ok end, if QueuePEP -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_pep, 50); true -> ok end, - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, - filter_other, 100), - ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE, - flush_queue, 50); + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, + filter_other, 75); true -> ok end. @@ -108,27 +118,33 @@ stop(Host) -> fun(B) when is_boolean(B) -> B end, true), if QueuePresence; QueueChatStates; QueuePEP -> + ejabberd_hooks:delete(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, add_stream_feature, 50), + ejabberd_hooks:delete(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:delete(c2s_session_resumed, Host, ?MODULE, + c2s_session_resumed, 50), if QueuePresence -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_presence, 50); true -> ok end, if QueueChatStates -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_chat_states, 50); true -> ok end, if QueuePEP -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_pep, 50); true -> ok end, - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, - filter_other, 100), - ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE, - flush_queue, 50); + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, + filter_other, 75); true -> ok end. @@ -150,29 +166,46 @@ depends(_Host, _Opts) -> %%-------------------------------------------------------------------- %% ejabberd_hooks callbacks. %%-------------------------------------------------------------------- - --spec filter_presence({ejabberd_c2s:state(), [stanza()]}, binary(), jid(), stanza()) - -> {ejabberd_c2s:state(), [stanza()]} | - {stop, {ejabberd_c2s:state(), [stanza()]}}. - -filter_presence({C2SState, _OutStanzas} = Acc, Host, To, - #presence{type = Type} = Stanza) -> - if Type == available; Type == unavailable -> - ?DEBUG("Got availability presence stanza for ~s", - [jid:to_string(To)]), - queue_add(presence, Stanza, Host, C2SState); - true -> - Acc - end; -filter_presence(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_chat_states({ejabberd_c2s:state(), [stanza()]}, binary(), jid(), stanza()) - -> {ejabberd_c2s:state(), [stanza()]} | - {stop, {ejabberd_c2s:state(), [stanza()]}}. - -filter_chat_states({C2SState, _OutStanzas} = Acc, Host, To, - #message{from = From} = Stanza) -> - case xmpp_util:is_standalone_chat_state(Stanza) of +-spec c2s_stream_started(c2s_state(), stream_start()) -> c2s_state(). +c2s_stream_started(State, _) -> + State#{csi_state => active, csi_queue => queue_new()}. + +-spec c2s_authenticated_packet(c2s_state(), xmpp_element()) -> c2s_state(). +c2s_authenticated_packet(C2SState, #csi{type = active}) -> + C2SState1 = C2SState#{csi_state => active}, + flush_queue(C2SState1); +c2s_authenticated_packet(C2SState, #csi{type = inactive}) -> + C2SState#{csi_state => inactive}; +c2s_authenticated_packet(C2SState, _) -> + C2SState. + +-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state(). +c2s_copy_session(C2SState, #{csi_state := State, csi_queue := Q}) -> + C2SState#{csi_state => State, csi_queue => Q}; +c2s_copy_session(C2SState, _) -> + C2SState. + +-spec c2s_session_resumed(c2s_state()) -> c2s_state(). +c2s_session_resumed(C2SState) -> + flush_queue(C2SState). + +-spec filter_presence(filter_acc()) -> filter_acc(). +filter_presence({#presence{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_presence({#presence{to = To, type = Type} = Pres, + #{csi_state := inactive} = C2SState}) + when Type == available; Type == unavailable -> + ?DEBUG("Got availability presence stanza for ~s", [jid:to_string(To)]), + enqueue_stanza(presence, Pres, C2SState); +filter_presence(Acc) -> + Acc. + +-spec filter_chat_states(filter_acc()) -> filter_acc(). +filter_chat_states({#message{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_chat_states({#message{from = From, to = To} = Msg, + #{csi_state := inactive} = C2SState} = Acc) -> + case xmpp_util:is_standalone_chat_state(Msg) of true -> case {From, To} of {#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}} -> @@ -181,105 +214,109 @@ filter_chat_states({C2SState, _OutStanzas} = Acc, Host, To, %% conversations across clients. Acc; _ -> - ?DEBUG("Got standalone chat state notification for ~s", - [jid:to_string(To)]), - queue_add(chatstate, Stanza, Host, C2SState) + ?DEBUG("Got standalone chat state notification for ~s", + [jid:to_string(To)]), + enqueue_stanza(chatstate, Msg, C2SState) end; false -> Acc end; -filter_chat_states(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_pep({ejabberd_c2s:state(), [stanza()]}, binary(), jid(), stanza()) - -> {ejabberd_c2s:state(), [stanza()]} | - {stop, {ejabberd_c2s:state(), [stanza()]}}. - -filter_pep({C2SState, _OutStanzas} = Acc, Host, To, #message{} = Stanza) -> - case get_pep_node(Stanza) of +filter_chat_states(Acc) -> + Acc. + +-spec filter_pep(filter_acc()) -> filter_acc(). +filter_pep({#message{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_pep({#message{to = To} = Msg, + #{csi_state := inactive} = C2SState} = Acc) -> + case get_pep_node(Msg) of undefined -> Acc; Node -> ?DEBUG("Got PEP notification for ~s", [jid:to_string(To)]), - queue_add({pep, Node}, Stanza, Host, C2SState) + enqueue_stanza({pep, Node}, Msg, C2SState) end; -filter_pep(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_other({ejabberd_c2s:state(), [stanza()]}, binary(), jid(), stanza()) - -> {ejabberd_c2s:state(), [stanza()]}. - -filter_other({C2SState, _OutStanzas}, Host, To, Stanza) -> - ?DEBUG("Won't add stanza for ~s to CSI queue", [jid:to_string(To)]), - queue_take(Stanza, Host, C2SState). +filter_pep(Acc) -> + Acc. --spec flush_queue({ejabberd_c2s:state(), [stanza()]}, binary(), jid()) - -> {ejabberd_c2s:state(), [stanza()]}. - -flush_queue({C2SState, _OutStanzas}, Host, JID) -> - ?DEBUG("Going to flush CSI queue of ~s", [jid:to_string(JID)]), - Queue = get_queue(C2SState), - NewState = set_queue([], C2SState), - {NewState, get_stanzas(Queue, Host)}. - --spec add_stream_feature([stanza()], binary) -> [stanza()]. +-spec filter_other(filter_acc()) -> filter_acc(). +filter_other({Stanza, #{jid := JID} = C2SState} = Acc) when ?is_stanza(Stanza) -> + case xmpp:get_meta(Stanza) of + #{csi_resend := true} -> + Acc; + _ -> + ?DEBUG("Won't add stanza for ~s to CSI queue", [jid:to_string(JID)]), + From = xmpp:get_from(Stanza), + C2SState1 = dequeue_sender(From, C2SState), + {Stanza, C2SState1} + end; +filter_other(Acc) -> + Acc. -add_stream_feature(Features, _Host) -> - [#feature_csi{xmlns = <<"urn:xmpp:csi:0">>} | Features]. +-spec add_stream_feature([xmpp_element()], binary) -> [xmpp_element()]. +add_stream_feature(Features, Host) -> + case gen_mod:is_loaded(Host, ?MODULE) of + true -> + [#feature_csi{xmlns = <<"urn:xmpp:csi:0">>} | Features]; + false -> + Features + end. %%-------------------------------------------------------------------- %% Internal functions. %%-------------------------------------------------------------------- - --spec queue_add(csi_type(), stanza(), binary(), term()) - -> {stop, {term(), [stanza()]}}. - -queue_add(Type, Stanza, Host, C2SState) -> - case get_queue(C2SState) of - Queue when length(Queue) >= ?CSI_QUEUE_MAX -> - ?DEBUG("CSI queue too large, going to flush it", []), - NewState = set_queue([], C2SState), - {stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}}; - Queue -> - ?DEBUG("Adding stanza to CSI queue", []), - From = xmpp:get_from(Stanza), - Key = {jid:tolower(From), Type}, - Entry = {Key, p1_time_compat:timestamp(), Stanza}, - NewQueue = lists:keystore(Key, 1, Queue, Entry), - NewState = set_queue(NewQueue, C2SState), - {stop, {NewState, []}} +-spec enqueue_stanza(csi_type(), stanza(), c2s_state()) -> filter_acc(). +enqueue_stanza(Type, Stanza, #{csi_state := inactive, + csi_queue := Q} = C2SState) -> + case queue_len(Q) >= ?CSI_QUEUE_MAX of + true -> + ?DEBUG("CSI queue too large, going to flush it", []), + C2SState1 = flush_queue(C2SState), + enqueue_stanza(Type, Stanza, C2SState1); + false -> + #jid{luser = U, lserver = S} = xmpp:get_from(Stanza), + Q1 = queue_in({U, S}, Type, Stanza, Q), + {stop, {drop, C2SState#{csi_queue => Q1}}} + end; +enqueue_stanza(_Type, Stanza, State) -> + {Stanza, State}. + +-spec dequeue_sender(jid(), c2s_state()) -> c2s_state(). +dequeue_sender(#jid{luser = U, lserver = S}, + #{csi_queue := Q, jid := JID} = C2SState) -> + ?DEBUG("Flushing packets of ~s@~s from CSI queue of ~s", + [U, S, jid:to_string(JID)]), + case queue_take({U, S}, Q) of + {Stanzas, Q1} -> + C2SState1 = flush_stanzas(C2SState, Stanzas), + C2SState1#{csi_queue => Q1}; + error -> + C2SState end. --spec queue_take(stanza(), binary(), term()) -> {term(), [stanza()]}. - -queue_take(Stanza, Host, C2SState) -> - From = xmpp:get_from(Stanza), - {LUser, LServer, _LResource} = jid:tolower(From), - {Selected, Rest} = lists:partition( - fun({{{U, S, _R}, _Type}, _Time, _Stanza}) -> - U == LUser andalso S == LServer - end, get_queue(C2SState)), - NewState = set_queue(Rest, C2SState), - {NewState, get_stanzas(Selected, Host) ++ [Stanza]}. - --spec set_queue(csi_queue(), ejabberd_c2s:state()) -> ejabberd_c2s:state(). - -set_queue(Queue, C2SState) -> - C2SState#{csi_queue => Queue}. - --spec get_queue(ejabberd_c2s:state()) -> csi_queue(). - -get_queue(C2SState) -> - maps:get(csi_queue, C2SState, []). - --spec get_stanzas(csi_queue(), binary()) -> [stanza()]. - -get_stanzas(Queue, Host) -> - lists:map(fun({_Key, Time, Stanza}) -> - xmpp_util:add_delay_info(Stanza, jid:make(Host), Time, - <<"Client Inactive">>) - end, Queue). +-spec flush_queue(c2s_state()) -> c2s_state(). +flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> + ?DEBUG("Flushing CSI queue of ~s", [jid:to_string(JID)]), + C2SState1 = flush_stanzas(C2SState, queue_to_list(Q)), + C2SState1#{csi_queue => queue_new()}. + +-spec flush_stanzas(c2s_state(), + [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state(). +flush_stanzas(#{lserver := LServer} = C2SState, Elems) -> + lists:foldl( + fun({_Type, Time, Stanza}, AccState) -> + Stanza1 = add_delay_info(Stanza, LServer, Time), + ejabberd_c2s:send(AccState, Stanza1) + end, C2SState, Elems). + +-spec add_delay_info(stanza(), binary(), csi_timestamp()) -> stanza(). +add_delay_info(Stanza, LServer, {_Seq, TimeStamp}) -> + Stanza1 = xmpp_util:add_delay_info( + Stanza, jid:make(LServer), TimeStamp, + <<"Client Inactive">>), + xmpp:put_meta(Stanza1, csi_resend, true). -spec get_pep_node(message()) -> binary() | undefined. - get_pep_node(#message{from = #jid{luser = <<>>}}) -> %% It's not PEP. undefined; @@ -290,3 +327,53 @@ get_pep_node(#message{} = Msg) -> _ -> undefined end. + +%%-------------------------------------------------------------------- +%% Queue interface +%%-------------------------------------------------------------------- +-spec queue_new() -> csi_queue(). +queue_new() -> + {0, 0, #{}}. + +-spec queue_in(term(), term(), term(), csi_queue()) -> csi_queue(). +queue_in(Key, Type, Val, {N, Seq, Q}) -> + Seq1 = Seq + 1, + Time = {Seq1, p1_time_compat:timestamp()}, + try maps:get(Key, Q) of + TypeVals -> + case lists:keymember(Type, 1, TypeVals) of + true -> + TypeVals1 = lists:keyreplace( + Type, 1, TypeVals, {Type, Time, Val}), + Q1 = maps:put(Key, TypeVals1, Q), + {N, Seq1, Q1}; + false -> + TypeVals1 = [{Type, Time, Val}|TypeVals], + Q1 = maps:put(Key, TypeVals1, Q), + {N + 1, Seq1, Q1} + end + catch _:{badkey, _} -> + Q1 = maps:put(Key, [{Type, Time, Val}], Q), + {N + 1, Seq1, Q1} + end. + +-spec queue_take(term(), csi_queue()) -> {list(), csi_queue()} | error. +queue_take(Key, {N, Seq, Q}) -> + case maps:take(Key, Q) of + {TypeVals, Q1} -> + {lists:keysort(2, TypeVals), {N-length(TypeVals), Seq, Q1}}; + error -> + error + end. + +-spec queue_len(csi_queue()) -> non_neg_integer(). +queue_len({N, _, _}) -> + N. + +-spec queue_to_list(csi_queue()) -> [term()]. +queue_to_list({_, _, Q}) -> + TypeVals = maps:fold( + fun(_, Vals, Acc) -> + Vals ++ Acc + end, [], Q), + lists:keysort(2, TypeVals). |