aboutsummaryrefslogtreecommitdiff
path: root/src/mod_stream_mgmt.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-10 15:12:43 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-10 15:12:43 +0300
commit02064ae12afe9ebbe92196575b427436398fd680 (patch)
treee9262548102b4096020a9c50b7c230856b6cc7c8 /src/mod_stream_mgmt.erl
parentReport more TLS errors (diff)
Add support for file-based queues
It's now possible to use files as internal packet queues. The following options are introduced: * queue_type: the option can be set to `ram` (default) or `file`. The option can be set per virtual host. * queue_dir: path to the directory where queues will be allocated. The default is 'queue' directory inside Mnesia directory. This is a global option and cannot be set per virtual host.
Diffstat (limited to 'src/mod_stream_mgmt.erl')
-rw-r--r--src/mod_stream_mgmt.erl98
1 files changed, 41 insertions, 57 deletions
diff --git a/src/mod_stream_mgmt.erl b/src/mod_stream_mgmt.erl
index f0152a722..535d014f1 100644
--- a/src/mod_stream_mgmt.erl
+++ b/src/mod_stream_mgmt.erl
@@ -36,6 +36,7 @@
-include("xmpp.hrl").
-include("logger.hrl").
+-include("p1_queue.hrl").
-define(is_sm_packet(Pkt),
is_record(Pkt, sm_enable) or
@@ -44,7 +45,6 @@
is_record(Pkt, sm_r)).
-type state() :: ejabberd_c2s:state().
--type lqueue() :: {non_neg_integer(), queue:queue()}.
%%%===================================================================
%%% API
@@ -102,6 +102,7 @@ c2s_stream_init({ok, State}, Opts) ->
({max_resume_timeout, _}) -> true;
({ack_timeout, _}) -> true;
({resend_on_timeout, _}) -> true;
+ ({queue_type, _}) -> true;
(_) -> false
end, Opts),
{ok, State#{mgmt_options => MgmtOpts}};
@@ -114,6 +115,7 @@ c2s_stream_started(#{lserver := LServer, mgmt_options := Opts} = State,
ResumeTimeout = get_resume_timeout(LServer, Opts),
MaxResumeTimeout = get_max_resume_timeout(LServer, Opts, ResumeTimeout),
State1#{mgmt_state => inactive,
+ mgmt_queue_type => get_queue_type(LServer, Opts),
mgmt_max_queue => get_max_ack_queue(LServer, Opts),
mgmt_timeout => ResumeTimeout,
mgmt_max_timeout => MaxResumeTimeout,
@@ -216,9 +218,10 @@ c2s_handle_send(#{mgmt_state := MgmtState, mod := Mod,
c2s_handle_send(State, _Pkt, _Result) ->
State.
-c2s_handle_call(#{sid := {Time, _}, mod := Mod} = State,
+c2s_handle_call(#{sid := {Time, _}, mod := Mod, mgmt_queue := Queue} = State,
{resume_session, Time}, From) ->
- Mod:reply(From, {resume, State}),
+ State1 = State#{mgmt_queue => p1_queue:file_to_ram(Queue)},
+ Mod:reply(From, {resume, State1}),
{stop, State#{mgmt_state => resumed}};
c2s_handle_call(#{mod := Mod} = State, {resume_session, _}, From) ->
Mod:reply(From, {error, <<"Previous session not found">>}),
@@ -316,6 +319,7 @@ perform_stream_mgmt(Pkt, #{mgmt_xmlns := Xmlns} = State) ->
-spec handle_enable(state(), sm_enable()) -> state().
handle_enable(#{mgmt_timeout := DefaultTimeout,
+ mgmt_queue_type := QueueType,
mgmt_max_timeout := MaxTimeout,
mgmt_xmlns := Xmlns, jid := JID} = State,
#sm_enable{resume = Resume, max = Max}) ->
@@ -339,7 +343,7 @@ handle_enable(#{mgmt_timeout := DefaultTimeout,
#sm_enabled{xmlns = Xmlns}
end,
State1 = State#{mgmt_state => active,
- mgmt_queue => queue_new(),
+ mgmt_queue => p1_queue:new(QueueType),
mgmt_timeout => Timeout},
send(State1, Res).
@@ -446,7 +450,7 @@ resend_rack(#{mgmt_ack_timer := _,
mgmt_stanzas_out := NumStanzasOut,
mgmt_stanzas_req := NumStanzasReq} = State) ->
State1 = cancel_ack_timer(State),
- case NumStanzasReq < NumStanzasOut andalso not queue_is_empty(Queue) of
+ case NumStanzasReq < NumStanzasOut andalso not p1_queue:is_empty(Queue) of
true -> send_rack(State1);
false -> State1
end;
@@ -460,13 +464,13 @@ mgmt_queue_add(#{mgmt_stanzas_out := NumStanzasOut,
4294967295 -> 0;
Num -> Num + 1
end,
- Queue1 = queue_in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue),
+ Queue1 = p1_queue:in({NewNum, p1_time_compat:timestamp(), Pkt}, Queue),
State1 = State#{mgmt_queue => Queue1, mgmt_stanzas_out => NewNum},
check_queue_length(State1).
-spec mgmt_queue_drop(state(), non_neg_integer()) -> state().
mgmt_queue_drop(#{mgmt_queue := Queue} = State, NumHandled) ->
- NewQueue = queue_dropwhile(
+ NewQueue = p1_queue:dropwhile(
fun({N, _T, _E}) -> N =< NumHandled end, Queue),
State#{mgmt_queue => NewQueue}.
@@ -475,7 +479,7 @@ check_queue_length(#{mgmt_max_queue := Limit} = State)
when Limit == infinity; Limit == exceeded ->
State;
check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
- case queue_len(Queue) > Limit of
+ case p1_queue:len(Queue) > Limit of
true ->
State#{mgmt_max_queue => exceeded};
false ->
@@ -484,14 +488,14 @@ check_queue_length(#{mgmt_queue := Queue, mgmt_max_queue := Limit} = State) ->
-spec resend_unacked_stanzas(state()) -> state().
resend_unacked_stanzas(#{mgmt_state := MgmtState,
- mgmt_queue := {QueueLen, _} = Queue,
+ mgmt_queue := Queue,
jid := JID} = State)
when (MgmtState == active orelse
MgmtState == pending orelse
- MgmtState == timeout) andalso QueueLen > 0 ->
+ MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
?DEBUG("Resending ~B unacknowledged stanza(s) to ~s",
- [QueueLen, jid:encode(JID)]),
- queue_foldl(
+ [p1_queue:len(Queue), jid:encode(JID)]),
+ p1_queue:foldl(
fun({_, Time, Pkt}, AccState) ->
NewPkt = add_resent_delay_info(AccState, Pkt, Time),
send(AccState, xmpp:put_meta(NewPkt, mgmt_is_resent, true))
@@ -504,11 +508,11 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
mgmt_resend := MgmtResend,
lang := Lang, user := User,
jid := JID, lserver := LServer,
- mgmt_queue := {QueueLen, _} = Queue,
+ mgmt_queue := Queue,
resource := Resource} = State)
when (MgmtState == active orelse
MgmtState == pending orelse
- MgmtState == timeout) andalso QueueLen > 0 ->
+ MgmtState == timeout) andalso ?qlen(Queue) > 0 ->
ResendOnTimeout = case MgmtResend of
Resend when is_boolean(Resend) ->
Resend;
@@ -522,8 +526,8 @@ route_unacked_stanzas(#{mgmt_state := MgmtState,
end
end,
?DEBUG("Re-routing ~B unacknowledged stanza(s) to ~s",
- [QueueLen, jid:encode(JID)]),
- queue_foreach(
+ [p1_queue:len(Queue), jid:encode(JID)]),
+ p1_queue:foreach(
fun({_, _Time, #presence{from = From}}) ->
?DEBUG("Dropping presence stanza from ~s", [jid:encode(From)]);
({_, _Time, #iq{} = El}) ->
@@ -564,7 +568,8 @@ route_unacked_stanzas(_State) ->
-spec inherit_session_state(state(), binary()) -> {ok, state()} |
{error, binary()} |
{error, binary(), non_neg_integer()}.
-inherit_session_state(#{user := U, server := S} = State, ResumeID) ->
+inherit_session_state(#{user := U, server := S,
+ mgmt_queue_type := QueueType} = State, ResumeID) ->
case jlib:base64_to_term(ResumeID) of
{term, {R, Time}} ->
case ejabberd_sm:get_session_pid(U, S, R) of
@@ -589,8 +594,12 @@ inherit_session_state(#{user := U, server := S} = State, ResumeID) ->
mgmt_stanzas_in := NumStanzasIn,
mgmt_stanzas_out := NumStanzasOut} = OldState} ->
State1 = ejabberd_c2s:copy_state(State, OldState),
+ Queue1 = case QueueType of
+ ram -> Queue;
+ _ -> p1_queue:ram_to_file(Queue)
+ end,
State2 = State1#{mgmt_xmlns => Xmlns,
- mgmt_queue => Queue,
+ mgmt_queue => Queue1,
mgmt_timeout => Timeout,
mgmt_stanzas_in => NumStanzasIn,
mgmt_stanzas_out => NumStanzasOut,
@@ -632,44 +641,6 @@ add_resent_delay_info(_State, El, _Time) ->
send(#{mod := Mod} = State, Pkt) ->
Mod:send(State, Pkt).
--spec queue_new() -> lqueue().
-queue_new() ->
- {0, queue:new()}.
-
--spec queue_in(term(), lqueue()) -> lqueue().
-queue_in(Elem, {N, Q}) ->
- {N+1, queue:in(Elem, Q)}.
-
--spec queue_len(lqueue()) -> non_neg_integer().
-queue_len({N, _}) ->
- N.
-
--spec queue_foldl(fun((term(), T) -> T), T, lqueue()) -> T.
-queue_foldl(F, Acc, {_N, Q}) ->
- jlib:queue_foldl(F, Acc, Q).
-
--spec queue_foreach(fun((_) -> _), lqueue()) -> ok.
-queue_foreach(F, {_N, Q}) ->
- jlib:queue_foreach(F, Q).
-
--spec queue_dropwhile(fun((term()) -> boolean()), lqueue()) -> lqueue().
-queue_dropwhile(F, {N, Q}) ->
- case queue:peek(Q) of
- {value, Item} ->
- case F(Item) of
- true ->
- queue_dropwhile(F, {N-1, queue:drop(Q)});
- false ->
- {N, Q}
- end;
- empty ->
- {N, Q}
- end.
-
--spec queue_is_empty(lqueue()) -> boolean().
-queue_is_empty({N, _Q}) ->
- N == 0.
-
-spec cancel_ack_timer(state()) -> state().
cancel_ack_timer(#{mgmt_ack_timer := TRef} = State) ->
case erlang:cancel_timer(TRef) of
@@ -741,6 +712,17 @@ get_resend_on_timeout(Host, Opts) ->
Resend -> Resend
end.
+get_queue_type(Host, Opts) ->
+ VFun = mod_opt_type(queue_type),
+ case gen_mod:get_module_opt(Host, ?MODULE, queue_type, VFun) of
+ undefined ->
+ case gen_mod:get_opt(queue_type, Opts, VFun) of
+ undefined -> ejabberd_config:default_queue_type(Host);
+ Type -> Type
+ end;
+ Type -> Type
+ end.
+
mod_opt_type(max_ack_queue) ->
fun(I) when is_integer(I), I > 0 -> I;
(infinity) -> infinity
@@ -757,6 +739,8 @@ mod_opt_type(resend_on_timeout) ->
fun(B) when is_boolean(B) -> B;
(if_offline) -> if_offline
end;
+mod_opt_type(queue_type) ->
+ fun(ram) -> ram; (file) -> file end;
mod_opt_type(_) ->
[max_ack_queue, resume_timeout, max_resume_timeout, ack_timeout,
- resend_on_timeout].
+ resend_on_timeout, queue_type].