From 02064ae12afe9ebbe92196575b427436398fd680 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 10 Mar 2017 15:12:43 +0300 Subject: Add support for file-based queues It's now possible to use files as internal packet queues. The following options are introduced: * queue_type: the option can be set to `ram` (default) or `file`. The option can be set per virtual host. * queue_dir: path to the directory where queues will be allocated. The default is 'queue' directory inside Mnesia directory. This is a global option and cannot be set per virtual host. --- src/mod_stream_mgmt.erl | 98 +++++++++++++++++++++---------------------------- 1 file changed, 41 insertions(+), 57 deletions(-) (limited to 'src/mod_stream_mgmt.erl') diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl index f0152a722..535d014f1 100644 --- a/src/mod_stream_mgmt.erl +++ b/src/mod_stream_mgmt.erl @@ -36,6 +36,7 @@ -include("xmpp.hrl"). -include("logger.hrl"). +-include("p1_queue.hrl"). -define(is_sm_packet(Pkt), is_record(Pkt, sm_enable) or @@ -44,7 +45,6 @@ is_record(Pkt, sm_r)). -type state() :: ejabberd_c2s:state(). --type lqueue() :: {non_neg_integer(), queue:queue()}. %%%=================================================================== %%% API @@ -102,6 +102,7 @@ c2s_stream_init({ok, State}, Opts) -> ({max_resume_timeout, _}) -> true; ({ack_timeout, _}) -> true; ({resend_on_timeout, _}) -> true; + ({queue_type, _}) -> true; (_) -> false end, Opts), {ok, State#{mgmt_options => MgmtOpts}}; @@ -114,6 +115,7 @@ c2s_stream_started(#{lserver := LServer, mgmt_options := Opts} = State, ResumeTimeout = get_resume_timeout(LServer, Opts), MaxResumeTimeout = get_max_resume_timeout(LServer, Opts, ResumeTimeout), State1#{mgmt_state => inactive, + mgmt_queue_type => get_queue_type(LServer, Opts), mgmt_max_queue => get_max_ack_queue(LServer, Opts), mgmt_timeout => ResumeTimeout, mgmt_max_timeout => MaxResumeTimeout, @@ -216,9 +218,10 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod, c2s_handle_send(State, _Pkt, _Result) -> State. -c2s_handle_call(#{sid := {Time, _}, mod := Mod} = State, +c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State, {resume_session, Time}, From) -> - Mod:reply(From, {resume, State}), + State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)}, + Mod:reply(From, {resume, State1}), {stop, State#{mgmt_state => resumed}}; c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) -> Mod:reply(From, {error, <<"Previous session not found">>}), @@ -316,6 +319,7 @@ perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns} = State) -> -spec handle_enable(state(), sm_enable()) -> state(). handle_enable(#{mgmt_timeout := DefaultTimeout, + mgmt_queue_type := QueueType, mgmt_max_timeout := MaxTimeout, mgmt_xmlns := Xmlns, jid := JID} = State, #sm_enable{resume = Resume, max = Max}) -> @@ -339,7 +343,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout, #sm_enabled{xmlns = Xmlns} end, State1 = State#{mgmt_state => active, - mgmt_queue => queue_new(), + mgmt_queue => p1_queue:new(QueueType), mgmt_timeout => Timeout}, send(State1, Res). @@ -446,7 +450,7 @@ resend_rack(#{mgmt_ack_timer := _, mgmt_stanzas_out := NumStanzasOut, mgmt_stanzas_req := NumStanzasReq} = State) -> State1 = cancel_ack_timer(State), - case NumStanzasReq < NumStanzasOut andalso not queue_is_empty(Queue) of + case NumStanzasReq < NumStanzasOut andalso not p1_queue:is_empty(Queue) of true -> send_rack(State1); false -> State1 end; @@ -460,13 +464,13 @@ mgmt_queue_add(#{mgmt_stanzas_out := NumStanzasOut, 4294967295 -> 0; Num -> Num + 1 end, - Queue1 = queue_in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue), + Queue1 = p1_queue:in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue), State1 = State#{mgmt_queue => Queue1, mgmt_stanzas_out => NewNum}, check_queue_length(State1). -spec mgmt_queue_drop(state(), non_neg_integer()) -> state(). mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) -> - NewQueue = queue_dropwhile( + NewQueue = p1_queue:dropwhile( fun({N, _T, _E}) -> N =< NumHandled end, Queue), State#{mgmt_queue => NewQueue}. @@ -475,7 +479,7 @@ check_queue_length(#{mgmt_max_queue := Limit} = State) when Limit == infinity; Limit == exceeded -> State; check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> - case queue_len(Queue) > Limit of + case p1_queue:len(Queue) > Limit of true -> State#{mgmt_max_queue => exceeded}; false -> @@ -484,14 +488,14 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) -> -spec resend_unacked_stanzas(state()) -> state(). resend_unacked_stanzas(#{mgmt_state := MgmtState, - mgmt_queue := {QueueLen, _} = Queue, + mgmt_queue := Queue, jid := JID} = State) when (MgmtState == active orelse MgmtState == pending orelse - MgmtState == timeout) andalso QueueLen > 0 -> + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> ?DEBUG("Resending ~B unacknowledged stanza(s) to ~s", - [QueueLen, jid:encode(JID)]), - queue_foldl( + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foldl( fun({_, Time, Pkt}, AccState) -> NewPkt = add_resent_delay_info(AccState, Pkt, Time), send(AccState, xmpp:put_meta(NewPkt, mgmt_is_resent, true)) @@ -504,11 +508,11 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, mgmt_resend := MgmtResend, lang := Lang, user := User, jid := JID, lserver := LServer, - mgmt_queue := {QueueLen, _} = Queue, + mgmt_queue := Queue, resource := Resource} = State) when (MgmtState == active orelse MgmtState == pending orelse - MgmtState == timeout) andalso QueueLen > 0 -> + MgmtState == timeout) andalso ?qlen(Queue) > 0 -> ResendOnTimeout = case MgmtResend of Resend when is_boolean(Resend) -> Resend; @@ -522,8 +526,8 @@ route_unacked_stanzas(#{mgmt_state := MgmtState, end end, ?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s", - [QueueLen, jid:encode(JID)]), - queue_foreach( + [p1_queue:len(Queue), jid:encode(JID)]), + p1_queue:foreach( fun({_, _Time, #presence{from = From}}) -> ?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]); ({_, _Time, #iq{} = El}) -> @@ -564,7 +568,8 @@ route_unacked_stanzas(_State) -> -spec inherit_session_state(state(), binary()) -> {ok, state()} | {error, binary()} | {error, binary(), non_neg_integer()}. -inherit_session_state(#{user := U, server := S} = State, ResumeID) -> +inherit_session_state(#{user := U, server := S, + mgmt_queue_type := QueueType} = State, ResumeID) -> case jlib:base64_to_term(ResumeID) of {term, {R, Time}} -> case ejabberd_sm:get_session_pid(U, S, R) of @@ -589,8 +594,12 @@ inherit_session_state(#{user := U, server := S} = State, ResumeID) -> mgmt_stanzas_in := NumStanzasIn, mgmt_stanzas_out := NumStanzasOut} = OldState} -> State1 = ejabberd_c2s:copy_state(State, OldState), + Queue1 = case QueueType of + ram -> Queue; + _ -> p1_queue:ram_to_file(Queue) + end, State2 = State1#{mgmt_xmlns => Xmlns, - mgmt_queue => Queue, + mgmt_queue => Queue1, mgmt_timeout => Timeout, mgmt_stanzas_in => NumStanzasIn, mgmt_stanzas_out => NumStanzasOut, @@ -632,44 +641,6 @@ add_resent_delay_info(_State, El, _Time) -> send(#{mod := Mod} = State, Pkt) -> Mod:send(State, Pkt). --spec queue_new() -> lqueue(). -queue_new() -> - {0, queue:new()}. - --spec queue_in(term(), lqueue()) -> lqueue(). -queue_in(Elem, {N, Q}) -> - {N+1, queue:in(Elem, Q)}. - --spec queue_len(lqueue()) -> non_neg_integer(). -queue_len({N, _}) -> - N. - --spec queue_foldl(fun((term(), T) -> T), T, lqueue()) -> T. -queue_foldl(F, Acc, {_N, Q}) -> - jlib:queue_foldl(F, Acc, Q). - --spec queue_foreach(fun((_) -> _), lqueue()) -> ok. -queue_foreach(F, {_N, Q}) -> - jlib:queue_foreach(F, Q). - --spec queue_dropwhile(fun((term()) -> boolean()), lqueue()) -> lqueue(). -queue_dropwhile(F, {N, Q}) -> - case queue:peek(Q) of - {value, Item} -> - case F(Item) of - true -> - queue_dropwhile(F, {N-1, queue:drop(Q)}); - false -> - {N, Q} - end; - empty -> - {N, Q} - end. - --spec queue_is_empty(lqueue()) -> boolean(). -queue_is_empty({N, _Q}) -> - N == 0. - -spec cancel_ack_timer(state()) -> state(). cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) -> case erlang:cancel_timer(TRef) of @@ -741,6 +712,17 @@ get_resend_on_timeout(Host, Opts) -> Resend -> Resend end. +get_queue_type(Host, Opts) -> + VFun = mod_opt_type(queue_type), + case gen_mod:get_module_opt(Host, ?MODULE, queue_type, VFun) of + undefined -> + case gen_mod:get_opt(queue_type, Opts, VFun) of + undefined -> ejabberd_config:default_queue_type(Host); + Type -> Type + end; + Type -> Type + end. + mod_opt_type(max_ack_queue) -> fun(I) when is_integer(I), I > 0 -> I; (infinity) -> infinity @@ -757,6 +739,8 @@ mod_opt_type(resend_on_timeout) -> fun(B) when is_boolean(B) -> B; (if_offline) -> if_offline end; +mod_opt_type(queue_type) -> + fun(ram) -> ram; (file) -> file end; mod_opt_type(_) -> [max_ack_queue, resume_timeout, max_resume_timeout, ack_timeout, - resend_on_timeout]. + resend_on_timeout, queue_type]. -- cgit v1.2.3