diff options
Diffstat (limited to 'src/mod_push.erl')
-rw-r--r-- | src/mod_push.erl | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/src/mod_push.erl b/src/mod_push.erl new file mode 100644 index 000000000..2ca0bf525 --- /dev/null +++ b/src/mod_push.erl @@ -0,0 +1,596 @@ +%%%---------------------------------------------------------------------- +%%% File : mod_push.erl +%%% Author : Holger Weiss <holger@zedat.fu-berlin.de> +%%% Purpose : Push Notifications (XEP-0357) +%%% Created : 15 Jul 2017 by Holger Weiss <holger@zedat.fu-berlin.de> +%%% +%%% +%%% ejabberd, Copyright (C) 2017 ProcessOne +%%% +%%% This program is free software; you can redistribute it and/or +%%% modify it under the terms of the GNU General Public License as +%%% published by the Free Software Foundation; either version 2 of the +%%% License, or (at your option) any later version. +%%% +%%% This program is distributed in the hope that it will be useful, +%%% but WITHOUT ANY WARRANTY; without even the implied warranty of +%%% MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +%%% General Public License for more details. +%%% +%%% You should have received a copy of the GNU General Public License along +%%% with this program; if not, write to the Free Software Foundation, Inc., +%%% 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +%%% +%%%---------------------------------------------------------------------- + +-module(mod_push). +-author('holger@zedat.fu-berlin.de'). +-protocol({xep, 357, '0.2'}). + +-behavior(gen_mod). + +%% gen_mod callbacks. +-export([start/2, stop/1, reload/3, mod_opt_type/1, depends/2]). + +%% ejabberd_hooks callbacks. +-export([disco_sm_features/5, c2s_session_pending/1, c2s_copy_session/2, + c2s_handle_cast/2, c2s_stanza/3, mam_message/6, offline_message/1, + remove_user/2]). + +%% gen_iq_handler callback. +-export([process_iq/1]). + +%% ejabberd command. +-export([get_commands_spec/0, delete_old_sessions/1]). + +%% API (used by mod_push_keepalive). +-export([notify/1, notify/3, notify/5]). + +-include("ejabberd.hrl"). +-include("ejabberd_commands.hrl"). +-include("logger.hrl"). +-include("xmpp.hrl"). + +-define(PUSH_CACHE, push_cache). + +-type c2s_state() :: ejabberd_c2s:state(). +-type timestamp() :: erlang:timestamp(). +-type push_session() :: {timestamp(), ljid(), binary(), xdata()}. + +-callback init(binary(), gen_mod:opts()) + -> any(). +-callback store_session(binary(), binary(), timestamp(), jid(), binary(), + xdata()) + -> {ok, push_session()} | error. +-callback lookup_session(binary(), binary(), jid(), binary()) + -> {ok, push_session()} | error. +-callback lookup_session(binary(), binary(), timestamp()) + -> {ok, push_session()} | error. +-callback lookup_sessions(binary(), binary(), jid()) + -> {ok, [push_session()]} | error. +-callback lookup_sessions(binary(), binary()) + -> {ok, [push_session()]} | error. +-callback lookup_sessions(binary()) + -> {ok, [push_session()]} | error. +-callback delete_session(binary(), binary(), timestamp()) + -> ok | error. +-callback delete_old_sessions(binary() | global, erlang:timestamp()) + -> any(). +-callback use_cache(binary()) + -> boolean(). +-callback cache_nodes(binary()) + -> [node()]. + +-optional_callbacks([use_cache/1, cache_nodes/1]). + +%%-------------------------------------------------------------------- +%% gen_mod callbacks. +%%-------------------------------------------------------------------- +-spec start(binary(), gen_mod:opts()) -> ok. +start(Host, Opts) -> + IQDisc = gen_mod:get_opt(iqdisc, Opts, gen_iq_handler:iqdisc(Host)), + Mod = gen_mod:db_mod(Host, Opts, ?MODULE), + Mod:init(Host, Opts), + init_cache(Mod, Host, Opts), + register_iq_handlers(Host, IQDisc), + register_hooks(Host), + ejabberd_commands:register_commands(get_commands_spec()). + +-spec stop(binary()) -> ok. +stop(Host) -> + unregister_hooks(Host), + unregister_iq_handlers(Host), + ejabberd_commands:unregister_commands(get_commands_spec()). + +-spec reload(binary(), gen_mod:opts(), gen_mod:opts()) -> ok. +reload(Host, NewOpts, OldOpts) -> + NewMod = gen_mod:db_mod(Host, NewOpts, ?MODULE), + OldMod = gen_mod:db_mod(Host, OldOpts, ?MODULE), + if NewMod /= OldMod -> + NewMod:init(Host, NewOpts); + true -> + ok + end, + case gen_mod:is_equal_opt(iqdisc, NewOpts, OldOpts, + gen_iq_handler:iqdisc(Host)) of + {false, IQDisc, _} -> + register_iq_handlers(Host, IQDisc); + true -> + ok + end. + +-spec depends(binary(), gen_mod:opts()) -> [{module(), hard | soft}]. +depends(_Host, _Opts) -> + []. + +-spec mod_opt_type(atom()) -> fun((term()) -> term()) | [atom()]. +mod_opt_type(db_type) -> + fun(T) -> ejabberd_config:v_db(?MODULE, T) end; +mod_opt_type(O) when O == cache_life_time; O == cache_size -> + fun(I) when is_integer(I), I > 0 -> I; + (infinity) -> infinity + end; +mod_opt_type(O) when O == use_cache; O == cache_missed -> + fun (B) when is_boolean(B) -> B end; +mod_opt_type(iqdisc) -> + fun gen_iq_handler:check_type/1; +mod_opt_type(_) -> + [db_type, cache_life_time, cache_size, use_cache, cache_missed, iqdisc]. + +%%-------------------------------------------------------------------- +%% ejabberd command callback. +%%-------------------------------------------------------------------- +-spec get_commands_spec() -> [ejabberd_commands()]. +get_commands_spec() -> + [#ejabberd_commands{name = delete_old_push_sessions, tags = [purge], + desc = "Remove push sessions older than DAYS", + module = ?MODULE, function = delete_old_sessions, + args = [{days, integer}], + result = {res, rescode}}]. + +-spec delete_old_sessions(non_neg_integer()) -> ok | any(). +delete_old_sessions(Days) -> + CurrentTime = p1_time_compat:system_time(micro_seconds), + Diff = Days * 24 * 60 * 60 * 1000000, + TimeStamp = misc:usec_to_now(CurrentTime - Diff), + DBTypes = lists:usort( + lists:map( + fun(Host) -> + case gen_mod:db_type(Host, ?MODULE) of + sql -> {sql, Host}; + Other -> {Other, global} + end + end, ?MYHOSTS)), + Results = lists:map( + fun({DBType, Host}) -> + Mod = gen_mod:db_mod(DBType, ?MODULE), + Mod:delete_old_sessions(Host, TimeStamp) + end, DBTypes), + ets_cache:clear(?PUSH_CACHE, ejabberd_cluster:get_nodes()), + case lists:filter(fun(Res) -> Res /= ok end, Results) of + [] -> + ?INFO_MSG("Deleted push sessions older than ~B days", [Days]), + ok; + [NotOk | _] -> + ?ERROR_MSG("Error while deleting old push sessions: ~p", [NotOk]), + NotOk + end. + +%%-------------------------------------------------------------------- +%% Register/unregister hooks. +%%-------------------------------------------------------------------- +-spec register_hooks(binary()) -> ok. +register_hooks(Host) -> + ejabberd_hooks:add(disco_sm_features, Host, ?MODULE, + disco_sm_features, 50), + ejabberd_hooks:add(c2s_session_pending, Host, ?MODULE, + c2s_session_pending, 50), + ejabberd_hooks:add(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE, + c2s_handle_cast, 50), + ejabberd_hooks:add(c2s_handle_send, Host, ?MODULE, + c2s_stanza, 50), + ejabberd_hooks:add(store_mam_message, Host, ?MODULE, + mam_message, 50), + ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, + offline_message, 50), + ejabberd_hooks:add(remove_user, Host, ?MODULE, + remove_user, 50). + +-spec unregister_hooks(binary()) -> ok. +unregister_hooks(Host) -> + ejabberd_hooks:delete(disco_sm_features, Host, ?MODULE, + disco_sm_features, 50), + ejabberd_hooks:delete(c2s_session_pending, Host, ?MODULE, + c2s_session_pending, 50), + ejabberd_hooks:delete(c2s_copy_session, Host, ?MODULE, + c2s_copy_session, 50), + ejabberd_hooks:delete(c2s_handle_cast, Host, ?MODULE, + c2s_handle_cast, 50), + ejabberd_hooks:delete(c2s_handle_send, Host, ?MODULE, + c2s_stanza, 50), + ejabberd_hooks:delete(store_mam_message, Host, ?MODULE, + mam_message, 50), + ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, + offline_message, 50), + ejabberd_hooks:delete(remove_user, Host, ?MODULE, + remove_user, 50). + +%%-------------------------------------------------------------------- +%% Service discovery. +%%-------------------------------------------------------------------- +-spec disco_sm_features(empty | {result, [binary()]} | {error, stanza_error()}, + jid(), jid(), binary(), binary()) + -> {result, [binary()]} | {error, stanza_error()}. +disco_sm_features(empty, From, To, Node, Lang) -> + disco_sm_features({result, []}, From, To, Node, Lang); +disco_sm_features({result, OtherFeatures}, + #jid{luser = U, lserver = S}, + #jid{luser = U, lserver = S}, <<"">>, _Lang) -> + {result, [?NS_PUSH_0 | OtherFeatures]}; +disco_sm_features(Acc, _From, _To, _Node, _Lang) -> + Acc. + +%%-------------------------------------------------------------------- +%% IQ handlers. +%%-------------------------------------------------------------------- +-spec register_iq_handlers(binary(), gen_iq_handler:type()) -> ok. +register_iq_handlers(Host, IQDisc) -> + gen_iq_handler:add_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0, + ?MODULE, process_iq, IQDisc). + +-spec unregister_iq_handlers(binary()) -> ok. +unregister_iq_handlers(Host) -> + gen_iq_handler:remove_iq_handler(ejabberd_sm, Host, ?NS_PUSH_0). + +-spec process_iq(iq()) -> iq(). +process_iq(#iq{type = get, lang = Lang} = IQ) -> + Txt = <<"Value 'get' of 'type' attribute is not allowed">>, + xmpp:make_error(IQ, xmpp:err_not_allowed(Txt, Lang)); +process_iq(#iq{lang = Lang, sub_els = [#push_enable{node = <<>>}]} = IQ) -> + Txt = <<"Enabling push without 'node' attribute is not supported">>, + xmpp:make_error(IQ, xmpp:err_feature_not_implemented(Txt, Lang)); +process_iq(#iq{from = #jid{lserver = LServer} = JID, + to = #jid{lserver = LServer}, + sub_els = [#push_enable{jid = PushJID, + node = Node, + xdata = XData}]} = IQ) -> + case enable(JID, PushJID, Node, XData) of + ok -> + xmpp:make_iq_result(IQ); + error -> + xmpp:make_error(IQ, xmpp:err_internal_server_error()) + end; +process_iq(#iq{from = #jid{lserver = LServer} = JID, + to = #jid{lserver = LServer}, + sub_els = [#push_disable{jid = PushJID, + node = Node}]} = IQ) -> + case disable(JID, PushJID, Node) of + ok -> + xmpp:make_iq_result(IQ); + error -> + xmpp:make_error(IQ, xmpp:err_item_not_found()) + end; +process_iq(IQ) -> + xmpp:make_error(IQ, xmpp:err_not_allowed()). + +-spec enable(jid(), jid(), binary(), xdata()) -> ok | error. +enable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, + PushJID, Node, XData) -> + case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of + {TS, PID} -> + case store_session(LUser, LServer, TS, PushJID, Node, XData) of + {ok, _} -> + ?INFO_MSG("Enabling push notifications for ~s", + [jid:encode(JID)]), + ejabberd_c2s:cast(PID, push_enable); + error -> + ?ERROR_MSG("Cannot enable push for ~s: database error", + [jid:encode(JID)]), + error + end; + none -> + ?WARNING_MSG("Cannot enable push for ~s: session not found", + [jid:encode(JID)]), + error + end. + +-spec disable(jid(), jid(), binary() | undefined) -> ok | error. +disable(#jid{luser = LUser, lserver = LServer, lresource = LResource} = JID, + PushJID, Node) -> + case ejabberd_sm:get_session_sid(LUser, LServer, LResource) of + {_TS, PID} -> + ?INFO_MSG("Disabling push notifications for ~s", + [jid:encode(JID)]), + ejabberd_c2s:cast(PID, push_disable); + none -> + ?WARNING_MSG("Session not found while disabling push for ~s", + [jid:encode(JID)]) + end, + if Node /= undefined -> + delete_session(LUser, LServer, PushJID, Node); + true -> + delete_sessions(LUser, LServer, PushJID) + end. + +%%-------------------------------------------------------------------- +%% Hook callbacks. +%%-------------------------------------------------------------------- +-spec c2s_stanza(c2s_state(), xmpp_element() | xmlel(), term()) -> c2s_state(). +c2s_stanza(#{push_enabled := true, mgmt_state := pending} = State, + _Pkt, _SendResult) -> + notify(State), + State; +c2s_stanza(State, _Pkt, _SendResult) -> + State. + +-spec mam_message(message() | drop, binary(), binary(), jid(), + chat | groupchat, recv | send) -> message(). +mam_message(#message{meta = #{push_notified := true}} = Pkt, + _LUser, _LServer, _Peer, _Type, _Dir) -> + Pkt; +mam_message(#message{} = Pkt, LUser, LServer, _Peer, chat, _Dir) -> + case lookup_sessions(LUser, LServer) of + {ok, [_|_] = Clients} -> + case drop_online_sessions(LUser, LServer, Clients) of + [_|_] = Clients1 -> + ?DEBUG("Notifying ~s@~s of MAM message", [LUser, LServer]), + notify(LUser, LServer, Clients1); + [] -> + ok + end; + _ -> + ok + end, + xmpp:put_meta(Pkt, push_notified, true); +mam_message(Pkt, _LUser, _LServer, _Peer, _Type, _Dir) -> + Pkt. + +-spec offline_message({any(), message()}) -> {any(), message()}. +offline_message({_Action, #message{meta = #{push_notified := true}}} = Acc) -> + Acc; +offline_message({Action, #message{to = #jid{luser = LUser, + lserver = LServer}} = Pkt}) -> + case lookup_sessions(LUser, LServer) of + {ok, [_|_] = Clients} -> + ?DEBUG("Notifying ~s@~s of offline message", [LUser, LServer]), + notify(LUser, LServer, Clients); + _ -> + ok + end, + {Action, xmpp:put_meta(Pkt, push_notified, true)}. + +-spec c2s_session_pending(c2s_state()) -> c2s_state(). +c2s_session_pending(#{push_enabled := true, mgmt_queue := Queue} = State) -> + case p1_queue:len(Queue) of + Len when Len > 0 -> + ?DEBUG("Notifying client of unacknowledged messages", []), + notify(State), + State; + 0 -> + State + end; +c2s_session_pending(State) -> + State. + +-spec c2s_copy_session(c2s_state(), c2s_state()) -> c2s_state(). +c2s_copy_session(State, #{push_enabled := true}) -> + State#{push_enabled => true}; +c2s_copy_session(State, _) -> + State. + +-spec c2s_handle_cast(c2s_state(), any()) -> c2s_state() | {stop, c2s_state()}. +c2s_handle_cast(State, push_enable) -> + {stop, State#{push_enabled => true}}; +c2s_handle_cast(State, push_disable) -> + {stop, maps:remove(push_enabled, State)}; +c2s_handle_cast(State, _Msg) -> + State. + +-spec remove_user(binary(), binary()) -> ok | error. +remove_user(LUser, LServer) -> + ?INFO_MSG("Removing any push sessions of ~s@~s", [LUser, LServer]), + Mod = gen_mod:db_mod(LServer, ?MODULE), + LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer) end, + delete_sessions(LUser, LServer, LookupFun, Mod). + +%%-------------------------------------------------------------------- +%% Generate push notifications. +%%-------------------------------------------------------------------- +-spec notify(c2s_state()) -> ok. +notify(#{jid := #jid{luser = LUser, lserver = LServer}, sid := {TS, _}}) -> + case lookup_session(LUser, LServer, TS) of + {ok, Client} -> + notify(LUser, LServer, [Client]); + error -> + ok + end. + +-spec notify(binary(), binary(), [push_session()]) -> ok. +notify(LUser, LServer, Clients) -> + lists:foreach( + fun({TS, PushLJID, Node, XData}) -> + HandleResponse = fun(#iq{type = result}) -> + ok; + (#iq{type = error}) -> + delete_session(LUser, LServer, TS); + (timeout) -> + ok % Hmm. + end, + notify(LServer, PushLJID, Node, XData, HandleResponse) + end, Clients). + +-spec notify(binary(), ljid(), binary(), xdata(), + fun((iq() | timeout) -> any())) -> ok. +notify(LServer, PushLJID, Node, XData, HandleResponse) -> + From = jid:make(LServer), + Item = #ps_item{xml_els = [xmpp:encode(#push_notification{})]}, + PubSub = #pubsub{publish = #ps_publish{node = Node, items = [Item]}, + publish_options = XData}, + IQ = #iq{type = set, + from = From, + to = jid:make(PushLJID), + id = randoms:get_string(), + sub_els = [PubSub]}, + ejabberd_local:route_iq(IQ, HandleResponse), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions. +%%-------------------------------------------------------------------- +-spec store_session(binary(), binary(), timestamp(), jid(), binary(), xdata()) + -> {ok, push_session()} | error. +store_session(LUser, LServer, TS, PushJID, Node, XData) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + delete_session(LUser, LServer, PushJID, Node), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)), + ets_cache:update( + ?PUSH_CACHE, + {LUser, LServer, TS}, {ok, {TS, PushJID, Node, XData}}, + fun() -> + Mod:store_session(LUser, LServer, TS, PushJID, Node, + XData) + end, cache_nodes(Mod, LServer)); + false -> + Mod:store_session(LUser, LServer, TS, PushJID, Node, XData) + end. + +-spec lookup_session(binary(), binary(), timestamp()) + -> {ok, push_session()} | error. +lookup_session(LUser, LServer, TS) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?PUSH_CACHE, {LUser, LServer, TS}, + fun() -> Mod:lookup_session(LUser, LServer, TS) end); + false -> + Mod:lookup_session(LUser, LServer, TS) + end. + +-spec lookup_sessions(binary(), binary()) -> {ok, [push_session()]} | error. +lookup_sessions(LUser, LServer) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case use_cache(Mod, LServer) of + true -> + ets_cache:lookup( + ?PUSH_CACHE, {LUser, LServer}, + fun() -> Mod:lookup_sessions(LUser, LServer) end); + false -> + Mod:lookup_sessions(LUser, LServer) + end. + +-spec delete_session(binary(), binary(), timestamp()) -> ok | error. +delete_session(LUser, LServer, TS) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + ok = Mod:delete_session(LUser, LServer, TS), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)), + ets_cache:delete(?PUSH_CACHE, {LUser, LServer, TS}, + cache_nodes(Mod, LServer)); + false -> + ok + end. + +-spec delete_session(binary(), binary(), jid(), binary()) -> ok | error. +delete_session(LUser, LServer, PushJID, Node) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + case Mod:lookup_session(LUser, LServer, PushJID, Node) of + {ok, {TS, _, _, _}} -> + delete_session(LUser, LServer, TS); + error -> + error + end. + +-spec delete_sessions(binary(), binary(), jid()) -> ok | error. +delete_sessions(LUser, LServer, PushJID) -> + Mod = gen_mod:db_mod(LServer, ?MODULE), + LookupFun = fun() -> Mod:lookup_sessions(LUser, LServer, PushJID) end, + delete_sessions(LUser, LServer, LookupFun, Mod). + +-spec delete_sessions(binary(), binary(), fun(() -> ok | error), module()) + -> ok | error. +delete_sessions(LUser, LServer, LookupFun, Mod) -> + case LookupFun() of + {ok, Clients} -> + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, {LUser, LServer}, + cache_nodes(Mod, LServer)); + false -> + ok + end, + lists:foreach( + fun({TS, _, _, _}) -> + ok = Mod:delete_session(LUser, LServer, TS), + case use_cache(Mod, LServer) of + true -> + ets_cache:delete(?PUSH_CACHE, + {LUser, LServer, TS}, + cache_nodes(Mod, LServer)); + false -> + ok + end + end, Clients); + error -> + error + end. + +-spec drop_online_sessions(binary(), binary(), [push_session()]) + -> [push_session()]. +drop_online_sessions(LUser, LServer, Clients) -> + SessIDs = ejabberd_sm:get_session_sids(LUser, LServer), + [Client || {TS, _, _, _} = Client <- Clients, + lists:keyfind(TS, 1, SessIDs) == false]. + +%%-------------------------------------------------------------------- +%% Caching. +%%-------------------------------------------------------------------- +-spec init_cache(module(), binary(), gen_mod:opts()) -> ok. +init_cache(Mod, Host, Opts) -> + case use_cache(Mod, Host) of + true -> + CacheOpts = cache_opts(Host, Opts), + ets_cache:new(?PUSH_CACHE, CacheOpts); + false -> + ets_cache:delete(?PUSH_CACHE) + end. + +-spec cache_opts(binary(), gen_mod:opts()) -> [proplists:property()]. +cache_opts(Host, Opts) -> + MaxSize = gen_mod:get_opt( + cache_size, Opts, + ejabberd_config:cache_size(Host)), + CacheMissed = gen_mod:get_opt( + cache_missed, Opts, + ejabberd_config:cache_missed(Host)), + LifeTime = case gen_mod:get_opt( + cache_life_time, Opts, + ejabberd_config:cache_life_time(Host)) of + infinity -> infinity; + I -> timer:seconds(I) + end, + [{max_size, MaxSize}, {cache_missed, CacheMissed}, {life_time, LifeTime}]. + +-spec use_cache(module(), binary()) -> boolean(). +use_cache(Mod, Host) -> + case erlang:function_exported(Mod, use_cache, 1) of + true -> Mod:use_cache(Host); + false -> + gen_mod:get_module_opt( + Host, ?MODULE, use_cache, + ejabberd_config:use_cache(Host)) + end. + +-spec cache_nodes(module(), binary()) -> [node()]. +cache_nodes(Mod, Host) -> + case erlang:function_exported(Mod, cache_nodes, 1) of + true -> Mod:cache_nodes(Host); + false -> ejabberd_cluster:get_nodes() + end. |