diff options
Diffstat (limited to 'src/mod_ack.erl')
-rw-r--r-- | src/mod_ack.erl | 506 |
1 files changed, 251 insertions, 255 deletions
diff --git a/src/mod_ack.erl b/src/mod_ack.erl index 7268b549e..ee2973f95 100644 --- a/src/mod_ack.erl +++ b/src/mod_ack.erl @@ -27,60 +27,61 @@ -module(mod_ack). -behaviour(gen_server). + -behaviour(gen_mod). %% API -export([start/2, stop/1, start_link/2]). --export([user_send_packet/3, - offline_message/3, - delayed_message/3, - remove_connection/3, +-export([user_send_packet/3, offline_message/3, + delayed_message/3, remove_connection/3, feature_inspect_packet/4]). %% gen_server callbacks --export([init/1, - handle_info/2, - handle_call/3, - handle_cast/2, - terminate/2, - code_change/3]). +-export([init/1, handle_info/2, handle_call/3, + handle_cast/2, terminate/2, code_change/3]). -include("jlib.hrl"). + -include("ejabberd.hrl"). -define(PROCNAME, ejabberd_mod_ack). --define(ACK_TIMEOUT, 60). %% seconds + +-define(ACK_TIMEOUT, 60). + -define(DICT, dict). -ifndef(NS_RECEIPTS). --define(NS_RECEIPTS, "urn:xmpp:receipts"). + +-define(NS_RECEIPTS, <<"urn:xmpp:receipts">>). + -endif. + -ifndef(NS_PING). --define(NS_PING, "urn:xmpp:ping"). + +-define(NS_PING, <<"urn:xmpp:ping">>). + -endif. + -ifndef(NS_P1_PUSHED). --define(NS_P1_PUSHED, "p1:pushed"). + +-define(NS_P1_PUSHED, <<"p1:pushed">>). + -endif. --record(state, {host, timers = ?DICT:new(), timeout}). +-record(state, {host = <<"">> :: binary(), + timers = (?DICT):new() :: dict(), + timeout = ?ACK_TIMEOUT :: non_neg_integer()}). -%%==================================================================== -%% API -%%==================================================================== start_link(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []). + gen_server:start_link({local, Proc}, ?MODULE, + [Host, Opts], []). start(Host, Opts) -> Proc = gen_mod:get_module_proc(Host, ?PROCNAME), - ChildSpec = - {Proc, - {?MODULE, start_link, [Host, Opts]}, - transient, - 1000, - worker, - [?MODULE]}, + ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]}, + transient, 1000, worker, [?MODULE]}, supervisor:start_child(ejabberd_sup, ChildSpec). stop(Host) -> @@ -89,98 +90,97 @@ stop(Host) -> supervisor:terminate_child(ejabberd_sup, Proc), supervisor:delete_child(ejabberd_sup, Proc). -%% TODO: Make ack on server receive optional ? -user_send_packet(From, To, {xmlelement, "message", _Attrs, _Els} = Packet) -> +user_send_packet(From, To, + #xmlel{name = <<"message">>} = Packet) -> case has_receipt_request(Packet) of - {true, _} -> - process_ack_request("on-sender-server", From, To, Packet); - false -> - case has_receipt_response(Packet) of - {true, ID} -> - Server = From#jid.lserver, - del_timer(Server, {message, ID}, From); - false -> - do_nothing - end + {true, _} -> + process_ack_request(<<"on-sender-server">>, From, To, + Packet); + false -> + case has_receipt_response(Packet) of + {true, ID} -> + Server = From#jid.lserver, + del_timer(Server, {message, ID}, From); + false -> do_nothing + end end; -user_send_packet(From, _To, {xmlelement, "iq", Attrs, _Els}) -> - case xml:get_attr_s("id", Attrs) of - "" -> - ok; - ID -> - Server = From#jid.lserver, - del_timer(Server, {iq, ID}, From) +user_send_packet(From, _To, + #xmlel{name = <<"iq">>, attrs = Attrs}) -> + case xml:get_attr_s(<<"id">>, Attrs) of + <<"">> -> ok; + ID -> + Server = From#jid.lserver, + del_timer(Server, {iq, ID}, From) end; -user_send_packet(_From, _To, _Packet) -> - do_nothing. +user_send_packet(_From, _To, _Packet) -> do_nothing. offline_message(From, To, Packet) -> - process_ack_request("offline", From, To, Packet), + process_ack_request(<<"offline">>, From, To, Packet), ok. delayed_message(From, To, Packet) -> - process_ack_request("delayed", From, To, Packet), + process_ack_request(<<"delayed">>, From, To, Packet), ok. feature_inspect_packet(JID, Server, - {xmlelement, "presence", _, _} = Pres, - {xmlelement, "message", Attrs, _} = El) -> + #xmlel{name = <<"presence">>} = Pres, + #xmlel{name = <<"message">>, attrs = Attrs} = El) -> HasReceipts = has_receipt_request(El), - ReceiptsSupported = are_receipts_supported(Pres), - ?DEBUG("feature_inspect_packet:~n" - "** JID: ~p~n" - "** Has receipts: ~p~n" - "** Receipts supported: ~p~n" - "** Pres: ~p~n" - "** El: ~p", + ReceiptsSupported = are_receipts_supported(Server, Pres), + ?DEBUG("feature_inspect_packet:~n** JID: ~p~n** " + "Has receipts: ~p~n** Receipts supported: " + "~p~n** Pres: ~p~n** El: ~p", [JID, HasReceipts, ReceiptsSupported, Pres, El]), - Type = xml:get_attr_s("type", Attrs), + Type = xml:get_attr_s(<<"type">>, Attrs), case HasReceipts of - _ when Type == "error" -> - ok; - {true, ID} -> - case {jlib:string_to_jid(xml:get_attr_s("from", Attrs)), - jlib:string_to_jid(xml:get_attr_s("to", Attrs))} of - {#jid{} = From, #jid{} = To} -> - Pkt = {From, To, El}, - case ReceiptsSupported of - true -> - add_timer(Server, {message, ID}, JID, Pkt); - false -> - ping(From, To, Server, JID, El); - unknown -> - process_ack_request("unreliable", From, To, El) - end; - _ -> - ?WARNING_MSG("message doesn't have 'from' or 'to'" - " attribute:~n** El: ~p", [El]) - end; - _ -> - ok + _ when Type == <<"error">> -> ok; + {true, ID} -> + case {jlib:string_to_jid(xml:get_attr_s(<<"from">>, + Attrs)), + jlib:string_to_jid(xml:get_attr_s(<<"to">>, Attrs))} + of + {#jid{} = From, #jid{} = To} -> + Pkt = {From, To, El}, + case ReceiptsSupported of + true -> add_timer(Server, {message, ID}, JID, Pkt); + false -> ping(From, To, Server, JID, El); + unknown -> + process_ack_request(<<"unreliable">>, From, To, El) + end; + _ -> + ?WARNING_MSG("message doesn't have 'from' or 'to' " + "attribute:~n** El: ~p", + [El]) + end; + _ -> ok end; feature_inspect_packet(_User, _Server, _Pres, _El) -> ok. -remove_connection({_, C2SPid}, #jid{lserver = Host}, _Info) -> - gen_server:cast(gen_mod:get_module_proc(Host, ?PROCNAME), {del, C2SPid}). +remove_connection({_, C2SPid}, #jid{lserver = Host}, + _Info) -> + gen_server:cast(gen_mod:get_module_proc(Host, + ?PROCNAME), + {del, C2SPid}). -%%==================================================================== -%% gen_server callbacks -%%==================================================================== init([Host, Opts]) -> - Timeout = timer:seconds(gen_mod:get_opt(timeout, Opts, ?ACK_TIMEOUT)), - ejabberd_hooks:add(user_send_packet, Host, - ?MODULE, user_send_packet, 20), - ejabberd_hooks:add(offline_message_hook, Host, - ?MODULE, offline_message, 20), - ejabberd_hooks:add(delayed_message_hook, Host, - ?MODULE, delayed_message, 20), + Timeout = timer:seconds(gen_mod:get_opt(timeout, Opts, + fun(I) when is_integer(I), I>0 -> + I + end, + ?ACK_TIMEOUT)), + ejabberd_hooks:add(user_send_packet, Host, ?MODULE, + user_send_packet, 20), + ejabberd_hooks:add(offline_message_hook, Host, ?MODULE, + offline_message, 20), + ejabberd_hooks:add(delayed_message_hook, Host, ?MODULE, + delayed_message, 20), ejabberd_hooks:add(feature_inspect_packet, Host, ?MODULE, feature_inspect_packet, 150), ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, remove_connection, 20), - ejabberd_hooks:add(sm_remove_migrated_connection_hook, Host, - ?MODULE, remove_connection, 20), + ejabberd_hooks:add(sm_remove_migrated_connection_hook, + Host, ?MODULE, remove_connection, 20), {ok, #state{host = Host, timeout = Timeout}}. handle_call(stop, _From, State) -> @@ -189,70 +189,74 @@ handle_call(_Req, _From, State) -> {reply, {error, badarg}, State}. handle_cast({add, ID, Pid, Packet}, State) -> - TRef = erlang:start_timer(State#state.timeout, self(), {ID, Pid}), - Timers = insert(Pid, ID, {TRef, Packet}, State#state.timers), + TRef = erlang:start_timer(State#state.timeout, self(), + {ID, Pid}), + Timers = insert(Pid, ID, {TRef, Packet}, + State#state.timers), {noreply, State#state{timers = Timers}}; handle_cast({del, ID, Pid}, State) -> case lookup(Pid, ID, State#state.timers) of - {ok, {TRef, {From, To, {xmlelement, _, Attrs, _}}}} -> - cancel_timer(TRef), - Timers = delete(Pid, ID, State#state.timers), - case ID of - {iq, _} -> - MsgID = xml:get_attr_s("id", Attrs), - Message = {xmlelement, "message", [{"id", MsgID}], - [{xmlelement, "received", - [{"xmlns", ?NS_RECEIPTS}, {"id", MsgID}], []}]}, - ejabberd_router:route(To, From, Message); - _ -> - ok - end, - {noreply, State#state{timers = Timers}}; - error -> - {noreply, State} + {ok, {TRef, {From, To, #xmlel{attrs = Attrs}}}} -> + cancel_timer(TRef), + Timers = delete(Pid, ID, State#state.timers), + case ID of + {iq, _} -> + MsgID = xml:get_attr_s(<<"id">>, Attrs), + Message = #xmlel{name = <<"message">>, + attrs = [{<<"id">>, MsgID}], + children = + [#xmlel{name = <<"received">>, + attrs = + [{<<"xmlns">>, ?NS_RECEIPTS}, + {<<"id">>, MsgID}], + children = []}]}, + ejabberd_router:route(To, From, Message); + _ -> ok + end, + {noreply, State#state{timers = Timers}}; + error -> {noreply, State} end; handle_cast({del, Pid}, State) -> - lists:foreach( - fun({_, _, {TRef, {From, To, El}}}) -> - cancel_timer(TRef), - El1 = xml:remove_subtags(El, "x", {"xmlns", ?NS_P1_PUSHED}), - El2 = xml:append_subtags( - El1, [{xmlelement, "x", [{"xmlns", ?NS_P1_PUSHED}], []}]), - ?DEBUG("Resending message:~n" - "** From: ~p~n" - "** To: ~p~n" - "** El: ~p", - [From, To, El2]), - ejabberd_router:route(From, To, El2) - end, to_list(Pid, State#state.timers)), + lists:foreach(fun ({_, _, {TRef, {From, To, El}}}) -> + cancel_timer(TRef), + El1 = xml:remove_subtags(El, <<"x">>, + {<<"xmlns">>, + ?NS_P1_PUSHED}), + El2 = xml:append_subtags(El1, + [#xmlel{name = <<"x">>, + attrs = + [{<<"xmlns">>, + ?NS_P1_PUSHED}], + children = []}]), + ?DEBUG("Resending message:~n** From: ~p~n** " + "To: ~p~n** El: ~p", + [From, To, El2]), + ejabberd_router:route(From, To, El2) + end, + to_list(Pid, State#state.timers)), Timers = delete(Pid, State#state.timers), {noreply, State#state{timers = Timers}}; -handle_cast(_Msg, State) -> - {noreply, State}. +handle_cast(_Msg, State) -> {noreply, State}. handle_info({timeout, _TRef, {ID, Pid}}, State) -> case lookup(Pid, ID, State#state.timers) of - {ok, _} -> - MRef = erlang:monitor(process, Pid), - catch ejabberd_c2s:stop(Pid), - receive - {'DOWN', MRef, process, Pid, _Reason}-> - ok - after 5 -> - catch exit(Pid, kill) - end, - erlang:demonitor(MRef, [flush]), - handle_cast({del, Pid}, State); - error -> - {noreply, State} + {ok, _} -> + MRef = erlang:monitor(process, Pid), + catch ejabberd_c2s:stop(Pid), + receive + {'DOWN', MRef, process, Pid, _Reason} -> ok + after 5 -> catch exit(Pid, kill) + end, + erlang:demonitor(MRef, [flush]), + handle_cast({del, Pid}, State); + error -> {noreply, State} end; -handle_info(_Info, State) -> - {noreply, State}. +handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, State) -> Host = State#state.host, - ejabberd_hooks:delete(user_send_packet, Host, - ?MODULE, user_send_packet, 20), + ejabberd_hooks:delete(user_send_packet, Host, ?MODULE, + user_send_packet, 20), ejabberd_hooks:delete(offline_message_hook, Host, ?MODULE, offline_message, 20), ejabberd_hooks:delete(delayed_message_hook, Host, @@ -261,160 +265,152 @@ terminate(_Reason, State) -> ?MODULE, feature_inspect_packet, 150), ejabberd_hooks:delete(sm_remove_connection_hook, Host, ?MODULE, remove_connection, 20), - ejabberd_hooks:delete(sm_remove_migrated_connection_hook, Host, - ?MODULE, remove_connection, 20), + ejabberd_hooks:delete(sm_remove_migrated_connection_hook, + Host, ?MODULE, remove_connection, 20), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%==================================================================== -%% Internal functions -%%==================================================================== process_ack_request(AckTagName, - #jid{lserver=LServer} = From, To, - {xmlelement, "message", _Attrs, _Els} = Packet) -> + #jid{lserver = LServer} = From, To, + #xmlel{name = <<"message">>} = Packet) -> case has_receipt_request(Packet) of - {true, ID} -> - BareTo = jlib:jid_remove_resource(To), - Message = {xmlelement, "message", [{"id", ID}], - [{xmlelement, AckTagName, - [{"xmlns", ?NS_RECEIPTS}, - {"server", LServer}, {"id", ID}], []}]}, - ejabberd_router:route(BareTo, From, Message); - false -> - do_nothing + {true, ID} -> + BareTo = jlib:jid_remove_resource(To), + Message = #xmlel{name = <<"message">>, + attrs = [{<<"id">>, ID}], + children = + [#xmlel{name = AckTagName, + attrs = + [{<<"xmlns">>, ?NS_RECEIPTS}, + {<<"server">>, LServer}, + {<<"id">>, ID}], + children = []}]}, + ejabberd_router:route(BareTo, From, Message); + false -> do_nothing end. has_receipt_request(Packet) -> - has_receipt(Packet, "request"). + has_receipt(Packet, <<"request">>). has_receipt_response(Packet) -> - has_receipt(Packet, "received"). - -has_receipt({xmlelement, "message", MsgAttrs, _} = Packet, Type) -> - case xml:get_attr_s("id", MsgAttrs) of - "" -> - case Type of - "request" -> false; %% Message must have an ID to ask a request for ack. - "received" -> - case xml:get_subtag(Packet, "received") of - false -> - false; - {xmlelement, _Name, Attrs, _Els} -> - case xml:get_attr_s("xmlns", Attrs) of - ?NS_RECEIPTS -> - case xml:get_attr_s("id", Attrs) of - "" -> false; - SubTagID -> {true, SubTagID} - end; - _ -> false - end - end - end; - ID -> - case xml:get_subtag(Packet, Type) of - false -> - false; - {xmlelement, _Name, Attrs, _Els} -> - case xml:get_attr_s("xmlns", Attrs) of + has_receipt(Packet, <<"received">>). + +has_receipt(#xmlel{name = <<"message">>, + attrs = MsgAttrs} = + Packet, + Type) -> + case xml:get_attr_s(<<"id">>, MsgAttrs) of + <<"">> -> + case Type of + <<"request">> -> + false; %% Message must have an ID to ask a request for ack. + <<"received">> -> + case xml:get_subtag(Packet, <<"received">>) of + false -> false; + #xmlel{attrs = Attrs} -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of ?NS_RECEIPTS -> - case xml:get_attr_s("id", Attrs) of - "" -> {true, ID}; - SubTagID -> {true, SubTagID} - end; - _ -> - false - end - end + case xml:get_attr_s(<<"id">>, Attrs) of + <<"">> -> false; + SubTagID -> {true, SubTagID} + end; + _ -> false + end + end + end; + ID -> + case xml:get_subtag(Packet, Type) of + false -> false; + #xmlel{attrs = Attrs} -> + case xml:get_attr_s(<<"xmlns">>, Attrs) of + ?NS_RECEIPTS -> + case xml:get_attr_s(<<"id">>, Attrs) of + <<"">> -> {true, ID}; + SubTagID -> {true, SubTagID} + end; + _ -> false + end + end end. -are_receipts_supported(undefined) -> - unknown; -are_receipts_supported({xmlelement, "presence", _, Els}) -> +are_receipts_supported(Server, + #xmlel{name = <<"presence">>, + children = Els}) -> case mod_caps:read_caps(Els) of - nothing -> - unknown; - Caps -> - lists:member(?NS_RECEIPTS, mod_caps:get_features(Caps)) + nothing -> unknown; + Caps -> + lists:member(?NS_RECEIPTS, mod_caps:get_features(Server, Caps)) end. ping(From, To, Server, JID, El) -> ID = randoms:get_string(), add_timer(Server, {iq, ID}, JID, {From, To, El}), - ejabberd_router:route(jlib:make_jid("", Server, ""), JID, - {xmlelement, "iq", - [{"type", "get"}, {"id", ID}], - [{xmlelement, "query", [{"xmlns", ?NS_PING}], []}]}). + ejabberd_router:route(jlib:make_jid(<<"">>, Server, + <<"">>), + JID, + #xmlel{name = <<"iq">>, + attrs = + [{<<"type">>, <<"get">>}, {<<"id">>, ID}], + children = + [#xmlel{name = <<"query">>, + attrs = [{<<"xmlns">>, ?NS_PING}], + children = []}]}). add_timer(Host, ID, JID, Packet) -> {U, S, R} = jlib:jid_tolower(JID), C2SPid = ejabberd_sm:get_session_pid(U, S, R), - gen_server:cast(gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:cast(gen_mod:get_module_proc(Host, + ?PROCNAME), {add, ID, C2SPid, Packet}). del_timer(Host, ID, JID) -> {U, S, R} = jlib:jid_tolower(JID), C2SPid = ejabberd_sm:get_session_pid(U, S, R), - gen_server:cast(gen_mod:get_module_proc(Host, ?PROCNAME), + gen_server:cast(gen_mod:get_module_proc(Host, + ?PROCNAME), {del, ID, C2SPid}). cancel_timer(TRef) -> case erlang:cancel_timer(TRef) of - false -> - receive - {timeout, TRef, _} -> - ok - after 0 -> - ok - end; - _ -> - ok + false -> + receive {timeout, TRef, _} -> ok after 0 -> ok end; + _ -> ok end. lookup(Pid, Key, Queue) -> - case ?DICT:find(Pid, Queue) of - {ok, Treap} -> - case treap:lookup(Key, Treap) of - {ok, _, Val} -> - {ok, Val}; - error -> - error - end; - error -> - error + case (?DICT):find(Pid, Queue) of + {ok, Treap} -> + case treap:lookup(Key, Treap) of + {ok, _, Val} -> {ok, Val}; + error -> error + end; + error -> error end. insert(Pid, Key, Val, Queue) -> - Treap = case ?DICT:find(Pid, Queue) of - {ok, Treap1} -> - Treap1; - error -> - nil + Treap = case (?DICT):find(Pid, Queue) of + {ok, Treap1} -> Treap1; + error -> nil end, - ?DICT:store(Pid, treap:insert(Key, now(), Val, Treap), Queue). + (?DICT):store(Pid, treap:insert(Key, now(), Val, Treap), + Queue). delete(Pid, Key, Queue) -> - case ?DICT:find(Pid, Queue) of - {ok, Treap} -> - NewTreap = treap:delete(Key, Treap), - case treap:is_empty(NewTreap) of - true -> - ?DICT:erase(Pid, Queue); - false -> - ?DICT:store(Pid, NewTreap, Queue) - end; - error -> - Queue + case (?DICT):find(Pid, Queue) of + {ok, Treap} -> + NewTreap = treap:delete(Key, Treap), + case treap:is_empty(NewTreap) of + true -> (?DICT):erase(Pid, Queue); + false -> (?DICT):store(Pid, NewTreap, Queue) + end; + error -> Queue end. -delete(Pid, Queue) -> - ?DICT:erase(Pid, Queue). +delete(Pid, Queue) -> (?DICT):erase(Pid, Queue). to_list(Pid, Queue) -> - case ?DICT:find(Pid, Queue) of - {ok, Treap} -> - treap:to_list(Treap); - error -> - [] + case (?DICT):find(Pid, Queue) of + {ok, Treap} -> treap:to_list(Treap); + error -> [] end. |