diff options
Diffstat (limited to 'src/mod_client_state.erl')
-rw-r--r-- | src/mod_client_state.erl | 506 |
1 files changed, 300 insertions, 206 deletions
diff --git a/src/mod_client_state.erl b/src/mod_client_state.erl index 9d37d4f5b..35eb6b733 100644 --- a/src/mod_client_state.erl +++ b/src/mod_client_state.erl @@ -5,7 +5,7 @@ %%% Created : 11 Sep 2014 by Holger Weiss <holger@zedat.fu-berlin.de> %%% %%% -%%% ejabberd, Copyright (C) 2014-2016 ProcessOne +%%% ejabberd, Copyright (C) 2014-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -28,281 +28,375 @@ -protocol({xep, 85, '2.1'}). -protocol({xep, 352, '0.1'}). --behavior(gen_mod). +-behaviour(gen_mod). %% gen_mod callbacks. --export([start/2, stop/1, mod_opt_type/1, depends/2]). +-export([start/2, stop/1, reload/3, mod_opt_type/1, depends/2, mod_options/1]). %% ejabberd_hooks callbacks. --export([filter_presence/4, filter_chat_states/4, filter_pep/4, filter_other/4, - flush_queue/3, add_stream_feature/2]). +-export([filter_presence/1, filter_chat_states/1, + filter_pep/1, filter_other/1, + c2s_stream_started/2, add_stream_feature/2, + c2s_authenticated_packet/2, csi_activity/2, + c2s_copy_session/2, c2s_session_resumed/1]). --include("ejabberd.hrl"). -include("logger.hrl"). --include("jlib.hrl"). +-include("xmpp.hrl"). -define(CSI_QUEUE_MAX, 100). -type csi_type() :: presence | chatstate | {pep, binary()}. +-type csi_queue() :: {non_neg_integer(), #{csi_key() => csi_element()}}. +-type csi_timestamp() :: {non_neg_integer(), erlang:timestamp()}. -type csi_key() :: {ljid(), csi_type()}. --type csi_stanza() :: {csi_key(), erlang:timestamp(), xmlel()}. --type csi_queue() :: [csi_stanza()]. +-type csi_element() :: {csi_timestamp(), stanza()}. +-type c2s_state() :: ejabberd_c2s:state(). +-type filter_acc() :: {stanza() | drop, c2s_state()}. %%-------------------------------------------------------------------- %% gen_mod callbacks. %%-------------------------------------------------------------------- - -spec start(binary(), gen_mod:opts()) -> ok. - start(Host, Opts) -> - QueuePresence = - gen_mod:get_opt(queue_presence, Opts, - fun(B) when is_boolean(B) -> B end, - true), - QueueChatStates = - gen_mod:get_opt(queue_chat_states, Opts, - fun(B) when is_boolean(B) -> B end, - true), - QueuePEP = - gen_mod:get_opt(queue_pep, Opts, - fun(B) when is_boolean(B) -> B end, - true), + QueuePresence = mod_client_state_opt:queue_presence(Opts), + QueueChatStates = mod_client_state_opt:queue_chat_states(Opts), + QueuePEP = mod_client_state_opt:queue_pep(Opts), if QueuePresence; QueueChatStates; QueuePEP -> - ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, - add_stream_feature, 50), + register_hooks(Host), if QueuePresence -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_presence, 50); true -> ok end, if QueueChatStates -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_chat_states, 50); true -> ok end, if QueuePEP -> - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, filter_pep, 50); true -> ok - end, - ejabberd_hooks:add(csi_filter_stanza, Host, ?MODULE, - filter_other, 100), - ejabberd_hooks:add(csi_flush_queue, Host, ?MODULE, - flush_queue, 50); + end; true -> ok end. -spec stop(binary()) -> ok. - stop(Host) -> - QueuePresence = - gen_mod:get_module_opt(Host, ?MODULE, queue_presence, - fun(B) when is_boolean(B) -> B end, - true), - QueueChatStates = - gen_mod:get_module_opt(Host, ?MODULE, queue_chat_states, - fun(B) when is_boolean(B) -> B end, - true), - QueuePEP = - gen_mod:get_module_opt(Host, ?MODULE, queue_pep, - fun(B) when is_boolean(B) -> B end, - true), + QueuePresence = mod_client_state_opt:queue_presence(Host), + QueueChatStates = mod_client_state_opt:queue_chat_states(Host), + QueuePEP = mod_client_state_opt:queue_pep(Host), if QueuePresence; QueueChatStates; QueuePEP -> - ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, - add_stream_feature, 50), + unregister_hooks(Host), if QueuePresence -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_presence, 50); true -> ok end, if QueueChatStates -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_chat_states, 50); true -> ok end, if QueuePEP -> - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, filter_pep, 50); true -> ok - end, - ejabberd_hooks:delete(csi_filter_stanza, Host, ?MODULE, - filter_other, 100), - ejabberd_hooks:delete(csi_flush_queue, Host, ?MODULE, - flush_queue, 50); + end; true -> ok end. --spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()]. +-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok. +reload(Host, NewOpts, _OldOpts) -> + QueuePresence = mod_client_state_opt:queue_presence(NewOpts), + QueueChatStates = mod_client_state_opt:queue_chat_states(NewOpts), + QueuePEP = mod_client_state_opt:queue_pep(NewOpts), + if QueuePresence; QueueChatStates; QueuePEP -> + register_hooks(Host); + true -> + unregister_hooks(Host) + end, + if QueuePresence -> + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, + filter_presence, 50); + true -> + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, + filter_presence, 50) + end, + if QueueChatStates -> + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, + filter_chat_states, 50); + true -> + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, + filter_chat_states, 50) + end, + if QueuePEP -> + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, + filter_pep, 50); + true -> + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, + filter_pep, 50) + end. +-spec mod_opt_type(atom()) -> econf:validator(). mod_opt_type(queue_presence) -> - fun(B) when is_boolean(B) -> B end; + econf:bool(); mod_opt_type(queue_chat_states) -> - fun(B) when is_boolean(B) -> B end; + econf:bool(); mod_opt_type(queue_pep) -> - fun(B) when is_boolean(B) -> B end; -mod_opt_type(_) -> [queue_presence, queue_chat_states, queue_pep]. + econf:bool(). --spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. +mod_options(_) -> + [{queue_presence, true}, + {queue_chat_states, true}, + {queue_pep, true}]. +-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. depends(_Host, _Opts) -> []. +-spec register_hooks(binary()) -> ok. +register_hooks(Host) -> + ejabberd_hooks:add(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), + ejabberd_hooks:add(c2s_post_auth_features, Host, ?MODULE, + add_stream_feature, 50), + ejabberd_hooks:add(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:add(csi_activity, Host, ?MODULE, + csi_activity, 50), + ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:add(c2s_session_resumed, Host, ?MODULE, + c2s_session_resumed, 50), + ejabberd_hooks:add(c2s_filter_send, Host, ?MODULE, + filter_other, 75). + +-spec unregister_hooks(binary()) -> ok. +unregister_hooks(Host) -> + ejabberd_hooks:delete(c2s_stream_started, Host, ?MODULE, + c2s_stream_started, 50), + ejabberd_hooks:delete(c2s_post_auth_features, Host, ?MODULE, + add_stream_feature, 50), + ejabberd_hooks:delete(c2s_authenticated_packet, Host, ?MODULE, + c2s_authenticated_packet, 50), + ejabberd_hooks:delete(csi_activity, Host, ?MODULE, + csi_activity, 50), + ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:delete(c2s_session_resumed, Host, ?MODULE, + c2s_session_resumed, 50), + ejabberd_hooks:delete(c2s_filter_send, Host, ?MODULE, + filter_other, 75). + %%-------------------------------------------------------------------- %% ejabberd_hooks callbacks. %%-------------------------------------------------------------------- - --spec filter_presence({term(), [xmlel()]}, binary(), jid(), xmlel()) - -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. - -filter_presence({C2SState, _OutStanzas} = Acc, Host, To, - #xmlel{name = <<"presence">>, attrs = Attrs} = Stanza) -> - case fxml:get_attr(<<"type">>, Attrs) of - {value, Type} when Type /= <<"unavailable">> -> - Acc; - _ -> - ?DEBUG("Got availability presence stanza for ~s", - [jid:to_string(To)]), - queue_add(presence, Stanza, Host, C2SState) +-spec c2s_stream_started(c2s_state(), stream_start()) -> c2s_state(). +c2s_stream_started(State, _) -> + init_csi_state(State). + +-spec c2s_authenticated_packet(c2s_state(), xmpp_element()) -> c2s_state(). +c2s_authenticated_packet(#{lserver := LServer} = C2SState, #csi{type = active}) -> + ejabberd_hooks:run_fold(csi_activity, LServer, C2SState, [active]); +c2s_authenticated_packet(#{lserver := LServer} = C2SState, #csi{type = inactive}) -> + ejabberd_hooks:run_fold(csi_activity, LServer, C2SState, [inactive]); +c2s_authenticated_packet(C2SState, _) -> + C2SState. + +-spec csi_activity(c2s_state(), active | inactive) -> c2s_state(). +csi_activity(C2SState, active) -> + C2SState1 = C2SState#{csi_state => active}, + flush_queue(C2SState1); +csi_activity(C2SState, inactive) -> + C2SState#{csi_state => inactive}. + +-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state(). +c2s_copy_session(C2SState, #{csi_queue := Q}) -> + C2SState#{csi_queue => Q}; +c2s_copy_session(C2SState, _) -> + C2SState. + +-spec c2s_session_resumed(c2s_state()) -> c2s_state(). +c2s_session_resumed(C2SState) -> + flush_queue(C2SState). + +-spec filter_presence(filter_acc()) -> filter_acc(). +filter_presence({#presence{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_presence({#presence{to = To, type = Type} = Pres, + #{csi_state := inactive} = C2SState}) + when Type == available; Type == unavailable -> + ?DEBUG("Got availability presence stanza for ~ts", [jid:encode(To)]), + enqueue_stanza(presence, Pres, C2SState); +filter_presence(Acc) -> + Acc. + +-spec filter_chat_states(filter_acc()) -> filter_acc(). +filter_chat_states({#message{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_chat_states({#message{from = From, to = To} = Msg, + #{csi_state := inactive} = C2SState} = Acc) -> + case misc:is_standalone_chat_state(Msg) of + true -> + case {From, To} of + {#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}} -> + %% Don't queue (carbon copies of) chat states from other + %% resources, as they might be used to sync the state of + %% conversations across clients. + Acc; + _ -> + ?DEBUG("Got standalone chat state notification for ~ts", + [jid:encode(To)]), + enqueue_stanza(chatstate, Msg, C2SState) + end; + false -> + Acc end; -filter_presence(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_chat_states({term(), [xmlel()]}, binary(), jid(), xmlel()) - -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. - -filter_chat_states({C2SState, _OutStanzas} = Acc, Host, To, - #xmlel{name = <<"message">>} = Stanza) -> - case jlib:is_standalone_chat_state(Stanza) of - true -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - case {jid:from_string(From), To} of - {#jid{luser = U, lserver = S}, #jid{luser = U, lserver = S}} -> - %% Don't queue (carbon copies of) chat states from other - %% resources, as they might be used to sync the state of - %% conversations across clients. - Acc; - _ -> - ?DEBUG("Got standalone chat state notification for ~s", - [jid:to_string(To)]), - queue_add(chatstate, Stanza, Host, C2SState) - end; - false -> - Acc +filter_chat_states(Acc) -> + Acc. + +-spec filter_pep(filter_acc()) -> filter_acc(). +filter_pep({#message{meta = #{csi_resend := true}}, _} = Acc) -> + Acc; +filter_pep({#message{to = To} = Msg, + #{csi_state := inactive} = C2SState} = Acc) -> + case get_pep_node(Msg) of + undefined -> + Acc; + Node -> + ?DEBUG("Got PEP notification for ~ts", [jid:encode(To)]), + enqueue_stanza({pep, Node}, Msg, C2SState) end; -filter_chat_states(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_pep({term(), [xmlel()]}, binary(), jid(), xmlel()) - -> {term(), [xmlel()]} | {stop, {term(), [xmlel()]}}. - -filter_pep({C2SState, _OutStanzas} = Acc, Host, To, - #xmlel{name = <<"message">>} = Stanza) -> - case get_pep_node(Stanza) of - {value, Node} -> - ?DEBUG("Got PEP notification for ~s", [jid:to_string(To)]), - queue_add({pep, Node}, Stanza, Host, C2SState); - false -> - Acc +filter_pep(Acc) -> + Acc. + +-spec filter_other(filter_acc()) -> filter_acc(). +filter_other({Stanza, #{jid := JID} = C2SState} = Acc) when ?is_stanza(Stanza) -> + case xmpp:get_meta(Stanza) of + #{csi_resend := true} -> + Acc; + _ -> + ?DEBUG("Won't add stanza for ~ts to CSI queue", [jid:encode(JID)]), + From = case xmpp:get_from(Stanza) of + undefined -> JID; + F -> F + end, + C2SState1 = dequeue_sender(From, C2SState), + {Stanza, C2SState1} end; -filter_pep(Acc, _Host, _To, _Stanza) -> Acc. - --spec filter_other({term(), [xmlel()]}, binary(), jid(), xmlel()) - -> {term(), [xmlel()]}. - -filter_other({C2SState, _OutStanzas}, Host, To, Stanza) -> - ?DEBUG("Won't add stanza for ~s to CSI queue", [jid:to_string(To)]), - queue_take(Stanza, Host, C2SState). - --spec flush_queue({term(), [xmlel()]}, binary(), jid()) -> {term(), [xmlel()]}. - -flush_queue({C2SState, _OutStanzas}, Host, JID) -> - ?DEBUG("Going to flush CSI queue of ~s", [jid:to_string(JID)]), - Queue = get_queue(C2SState), - NewState = set_queue([], C2SState), - {NewState, get_stanzas(Queue, Host)}. - --spec add_stream_feature([xmlel()], binary) -> [xmlel()]. - -add_stream_feature(Features, _Host) -> - Feature = #xmlel{name = <<"csi">>, - attrs = [{<<"xmlns">>, ?NS_CLIENT_STATE}], - children = []}, - [Feature | Features]. +filter_other(Acc) -> + Acc. + +-spec add_stream_feature([xmpp_element()], binary()) -> [xmpp_element()]. +add_stream_feature(Features, Host) -> + case gen_mod:is_loaded(Host, ?MODULE) of + true -> + [#feature_csi{} | Features]; + false -> + Features + end. %%-------------------------------------------------------------------- %% Internal functions. %%-------------------------------------------------------------------- - --spec queue_add(csi_type(), xmlel(), binary(), term()) - -> {stop, {term(), [xmlel()]}}. - -queue_add(Type, Stanza, Host, C2SState) -> - case get_queue(C2SState) of - Queue when length(Queue) >= ?CSI_QUEUE_MAX -> +-spec init_csi_state(c2s_state()) -> c2s_state(). +init_csi_state(C2SState) -> + C2SState#{csi_state => active, csi_queue => queue_new()}. + +-spec enqueue_stanza(csi_type(), stanza(), c2s_state()) -> filter_acc(). +enqueue_stanza(Type, Stanza, #{csi_state := inactive, + csi_queue := Q} = C2SState) -> + case queue_len(Q) >= ?CSI_QUEUE_MAX of + true -> ?DEBUG("CSI queue too large, going to flush it", []), - NewState = set_queue([], C2SState), - {stop, {NewState, get_stanzas(Queue, Host) ++ [Stanza]}}; - Queue -> - ?DEBUG("Adding stanza to CSI queue", []), - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - Key = {jid:tolower(jid:from_string(From)), Type}, - Entry = {Key, p1_time_compat:timestamp(), Stanza}, - NewQueue = lists:keystore(Key, 1, Queue, Entry), - NewState = set_queue(NewQueue, C2SState), - {stop, {NewState, []}} + C2SState1 = flush_queue(C2SState), + enqueue_stanza(Type, Stanza, C2SState1); + false -> + From = jid:tolower(xmpp:get_from(Stanza)), + Q1 = queue_in({From, Type}, Stanza, Q), + {stop, {drop, C2SState#{csi_queue => Q1}}} + end; +enqueue_stanza(_Type, Stanza, State) -> + {Stanza, State}. + +-spec dequeue_sender(jid(), c2s_state()) -> c2s_state(). +dequeue_sender(#jid{luser = U, lserver = S} = Sender, + #{jid := JID} = C2SState) -> + case maps:get(csi_queue, C2SState, undefined) of + undefined -> + %% This may happen when the module is (re)loaded in runtime + init_csi_state(C2SState); + Q -> + ?DEBUG("Flushing packets of ~ts@~ts from CSI queue of ~ts", + [U, S, jid:encode(JID)]), + {Elems, Q1} = queue_take(Sender, Q), + C2SState1 = flush_stanzas(C2SState, Elems), + C2SState1#{csi_queue => Q1} end. --spec queue_take(xmlel(), binary(), term()) -> {term(), [xmlel()]}. - -queue_take(Stanza, Host, C2SState) -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - {LUser, LServer, _LResource} = jid:tolower(jid:from_string(From)), - {Selected, Rest} = lists:partition( - fun({{{U, S, _R}, _Type}, _Time, _Stanza}) -> - U == LUser andalso S == LServer - end, get_queue(C2SState)), - NewState = set_queue(Rest, C2SState), - {NewState, get_stanzas(Selected, Host) ++ [Stanza]}. - --spec set_queue(csi_queue(), term()) -> term(). - -set_queue(Queue, C2SState) -> - ejabberd_c2s:set_aux_field(csi_queue, Queue, C2SState). - --spec get_queue(term()) -> csi_queue(). - -get_queue(C2SState) -> - case ejabberd_c2s:get_aux_field(csi_queue, C2SState) of - {ok, Queue} -> - Queue; - error -> - [] +-spec flush_queue(c2s_state()) -> c2s_state(). +flush_queue(#{csi_queue := Q, jid := JID} = C2SState) -> + ?DEBUG("Flushing CSI queue of ~ts", [jid:encode(JID)]), + C2SState1 = flush_stanzas(C2SState, queue_to_list(Q)), + C2SState1#{csi_queue => queue_new()}. + +-spec flush_stanzas(c2s_state(), + [{csi_type(), csi_timestamp(), stanza()}]) -> c2s_state(). +flush_stanzas(#{lserver := LServer} = C2SState, Elems) -> + lists:foldl( + fun({Time, Stanza}, AccState) -> + Stanza1 = add_delay_info(Stanza, LServer, Time), + ejabberd_c2s:send(AccState, Stanza1) + end, C2SState, Elems). + +-spec add_delay_info(stanza(), binary(), csi_timestamp()) -> stanza(). +add_delay_info(Stanza, LServer, {_Seq, TimeStamp}) -> + Stanza1 = misc:add_delay_info( + Stanza, jid:make(LServer), TimeStamp, + <<"Client Inactive">>), + xmpp:put_meta(Stanza1, csi_resend, true). + +-spec get_pep_node(message()) -> binary() | undefined. +get_pep_node(#message{from = #jid{luser = <<>>}}) -> + %% It's not PEP. + undefined; +get_pep_node(#message{} = Msg) -> + case xmpp:get_subtag(Msg, #ps_event{}) of + #ps_event{items = #ps_items{node = Node}} -> + Node; + _ -> + undefined end. --spec get_stanzas(csi_queue(), binary()) -> [xmlel()]. - -get_stanzas(Queue, Host) -> - lists:map(fun({_Key, Time, Stanza}) -> - jlib:add_delay_info(Stanza, Host, Time, - <<"Client Inactive">>) - end, Queue). - --spec get_pep_node(xmlel()) -> {value, binary()} | false. - -get_pep_node(#xmlel{name = <<"message">>} = Stanza) -> - From = fxml:get_tag_attr_s(<<"from">>, Stanza), - case jid:from_string(From) of - #jid{luser = <<>>} -> % It's not PEP. - false; - _ -> - case fxml:get_subtag_with_xmlns(Stanza, <<"event">>, - ?NS_PUBSUB_EVENT) of - #xmlel{children = Els} -> - case fxml:remove_cdata(Els) of - [#xmlel{name = <<"items">>, attrs = ItemsAttrs}] -> - fxml:get_attr(<<"node">>, ItemsAttrs); - _ -> - false - end; - false -> - false - end - end. +%%-------------------------------------------------------------------- +%% Queue interface +%%-------------------------------------------------------------------- +-spec queue_new() -> csi_queue(). +queue_new() -> + {0, #{}}. + +-spec queue_in(csi_key(), stanza(), csi_queue()) -> csi_queue(). +queue_in(Key, Stanza, {Seq, Q}) -> + Seq1 = Seq + 1, + Time = {Seq1, erlang:timestamp()}, + Q1 = maps:put(Key, {Time, Stanza}, Q), + {Seq1, Q1}. + +-spec queue_take(jid(), csi_queue()) -> {[csi_element()], csi_queue()}. +queue_take(#jid{luser = LUser, lserver = LServer}, {Seq, Q}) -> + {Vals, Q1} = maps:fold(fun({{U, S, _}, _} = Key, Val, {AccVals, AccQ}) + when U == LUser, S == LServer -> + {[Val | AccVals], maps:remove(Key, AccQ)}; + (_, _, Acc) -> + Acc + end, {[], Q}, Q), + {lists:keysort(1, Vals), {Seq, Q1}}. + +-spec queue_len(csi_queue()) -> non_neg_integer(). +queue_len({_, Q}) -> + maps:size(Q). + +-spec queue_to_list(csi_queue()) -> [csi_element()]. +queue_to_list({_, Q}) -> + lists:keysort(1, maps:values(Q)). |