aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_bosh.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_bosh.erl')
-rw-r--r--src/ejabberd_bosh.erl73
1 files changed, 43 insertions, 30 deletions
diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl
index 204c7b6e5..d34736a44 100644
--- a/src/ejabberd_bosh.erl
+++ b/src/ejabberd_bosh.erl
@@ -96,8 +96,8 @@
-record(state,
{host = <<"">> :: binary(),
sid = <<"">> :: binary(),
- el_ibuf = buf_new() :: ?TQUEUE,
- el_obuf = buf_new() :: ?TQUEUE,
+ el_ibuf :: p1_queue:queue(),
+ el_obuf :: p1_queue:queue(),
shaper_state = none :: shaper:shaper(),
c2s_pid :: pid() | undefined,
xmpp_ver = <<"">> :: binary(),
@@ -111,7 +111,7 @@
max_concat = unlimited :: unlimited | non_neg_integer(),
responses = gb_trees:empty() :: ?TGB_TREE,
receivers = gb_trees:empty() :: ?TGB_TREE,
- shaped_receivers = queue:new() :: ?TQUEUE,
+ shaped_receivers :: p1_queue:queue(),
ip :: inet:ip_address(),
max_requests = 1 :: non_neg_integer()}).
@@ -305,10 +305,10 @@ init([#body{attrs = Attrs}, IP, SID]) ->
false) of
true ->
JID = make_random_jid(XMPPDomain),
- {buf_new(), [{jid, JID} | Opts2]};
+ {buf_new(XMPPDomain), [{jid, JID} | Opts2]};
false ->
{buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)],
- buf_new()),
+ buf_new(XMPPDomain)),
Opts2}
end,
ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
@@ -321,10 +321,12 @@ init([#body{attrs = Attrs}, IP, SID]) ->
fun(unlimited) -> unlimited;
(N) when is_integer(N), N>0 -> N
end, unlimited),
+ ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
State = #state{host = XMPPDomain, sid = SID, ip = IP,
xmpp_ver = XMPPVer, el_ibuf = InBuf,
- max_concat = MaxConcat, el_obuf = buf_new(),
+ max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
inactivity_timeout = Inactivity,
+ shaped_receivers = ShapedReceivers,
shaper_state = ShaperState},
NewState = restart_inactivity_timer(State),
mod_bosh:open_session(SID, self()),
@@ -417,15 +419,15 @@ active(#body{attrs = Attrs, size = Size} = Req, From,
shaper:update(State#state.shaper_state, Size),
State1 = State#state{shaper_state = ShaperState},
if Pause > 0 ->
- QLen = queue:len(State1#state.shaped_receivers),
- if QLen < (?MAX_SHAPED_REQUESTS_QUEUE_LEN) ->
- TRef = start_shaper_timer(Pause),
- Q = queue:in({TRef, From, Req},
- State1#state.shaped_receivers),
- State2 = stop_inactivity_timer(State1),
- {next_state, active,
- State2#state{shaped_receivers = Q}};
- true ->
+ TRef = start_shaper_timer(Pause),
+ try p1_queue:in({TRef, From, Req},
+ State1#state.shaped_receivers) of
+ Q ->
+ State2 = stop_inactivity_timer(State1),
+ {next_state, active,
+ State2#state{shaped_receivers = Q}}
+ catch error:full ->
+ cancel_timer(TRef),
RID = get_attr(rid, Attrs),
reply_stop(State1,
#body{http_reason = <<"Too many requests">>,
@@ -572,7 +574,7 @@ handle_sync_event({send_xml, El}, _From, StateName,
reply(State2, Body#body{els = Els},
State2#state.prev_rid, From)};
none ->
- State2 = case queue:out(State1#state.shaped_receivers)
+ State2 = case p1_queue:out(State1#state.shaped_receivers)
of
{{value, {TRef, From, Body}}, Q} ->
cancel_timer(TRef),
@@ -601,7 +603,7 @@ handle_info({timeout, TRef, inactive}, _StateName,
{stop, normal, State};
handle_info({timeout, TRef, shaper_timeout}, StateName,
State) ->
- case queue:out(State#state.shaped_receivers) of
+ case p1_queue:out(State#state.shaped_receivers) of
{{value, {TRef, From, Req}}, Q} ->
(?GEN_FSM):send_event(self(), {Req, From}),
{next_state, StateName,
@@ -646,9 +648,13 @@ code_change(_OldVsn, StateName, State, _Extra) ->
print_state(State) -> State.
-route_els(#state{el_ibuf = Buf} = State) ->
- route_els(State#state{el_ibuf = buf_new()},
- buf_to_list(Buf)).
+route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) ->
+ NewBuf = p1_queue:dropwhile(
+ fun(El) ->
+ ?GEN_FSM:send_event(C2SPid, El),
+ true
+ end, Buf),
+ State#state{el_ibuf = NewBuf}.
route_els(State, Els) ->
case State#state.c2s_pid of
@@ -734,7 +740,7 @@ bounce_receivers(State, Reason) ->
RID = get_attr(rid, Attrs),
{RID, {From, Body}}
end,
- queue:to_list(State#state.shaped_receivers)),
+ p1_queue:to_list(State#state.shaped_receivers)),
lists:foldl(fun ({RID, {From, Body}}, AccState) ->
NewBody = if Reason == closed ->
#body{http_reason =
@@ -752,7 +758,7 @@ bounce_receivers(State, Reason) ->
State, Receivers ++ ShapedReceivers).
bounce_els_from_obuf(State) ->
- lists:foreach(
+ p1_queue:foreach(
fun({xmlstreamelement, El}) ->
try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of
Pkt when ?is_stanza(Pkt) ->
@@ -769,7 +775,7 @@ bounce_els_from_obuf(State) ->
end;
(_) ->
ok
- end, buf_to_list(State#state.el_obuf)).
+ end, State#state.el_obuf).
is_valid_key(<<"">>, <<"">>) -> true;
is_valid_key(PrevKey, Key) ->
@@ -1029,26 +1035,33 @@ get_attr(Attr, Attrs, Default) ->
_ -> Default
end.
-buf_new() -> queue:new().
+buf_new(Host) ->
+ buf_new(Host, unlimited).
+
+buf_new(Host, Limit) ->
+ QueueType = case gen_mod:get_module_opt(
+ Host, mod_bosh, queue_type,
+ mod_bosh:mod_opt_type(queue_type)) of
+ undefined -> ejabberd_config:default_queue_type(Host);
+ T -> T
+ end,
+ p1_queue:new(QueueType, Limit).
buf_in(Xs, Buf) ->
- lists:foldl(fun (X, Acc) -> queue:in(X, Acc) end, Buf,
- Xs).
+ lists:foldl(fun p1_queue:in/2, Buf, Xs).
buf_out(Buf, Num) when is_integer(Num), Num > 0 ->
buf_out(Buf, Num, []);
-buf_out(Buf, _) -> {queue:to_list(Buf), buf_new()}.
+buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}.
buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf};
buf_out(Buf, I, Els) ->
- case queue:out(Buf) of
+ case p1_queue:out(Buf) of
{{value, El}, NewBuf} ->
buf_out(NewBuf, I - 1, [El | Els]);
{empty, _} -> buf_out(Buf, 0, Els)
end.
-buf_to_list(Buf) -> queue:to_list(Buf).
-
cancel_timer(TRef) when is_reference(TRef) ->
(?GEN_FSM):cancel_timer(TRef);
cancel_timer(_) -> false.