diff options
Diffstat (limited to 'src/ejabberd_bosh.erl')
-rw-r--r-- | src/ejabberd_bosh.erl | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 204c7b6e5..d34736a44 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -96,8 +96,8 @@ -record(state, {host = <<"">> :: binary(), sid = <<"">> :: binary(), - el_ibuf = buf_new() :: ?TQUEUE, - el_obuf = buf_new() :: ?TQUEUE, + el_ibuf :: p1_queue:queue(), + el_obuf :: p1_queue:queue(), shaper_state = none :: shaper:shaper(), c2s_pid :: pid() | undefined, xmpp_ver = <<"">> :: binary(), @@ -111,7 +111,7 @@ max_concat = unlimited :: unlimited | non_neg_integer(), responses = gb_trees:empty() :: ?TGB_TREE, receivers = gb_trees:empty() :: ?TGB_TREE, - shaped_receivers = queue:new() :: ?TQUEUE, + shaped_receivers :: p1_queue:queue(), ip :: inet:ip_address(), max_requests = 1 :: non_neg_integer()}). @@ -305,10 +305,10 @@ init([#body{attrs = Attrs}, IP, SID]) -> false) of true -> JID = make_random_jid(XMPPDomain), - {buf_new(), [{jid, JID} | Opts2]}; + {buf_new(XMPPDomain), [{jid, JID} | Opts2]}; false -> {buf_in([make_xmlstreamstart(XMPPDomain, XMPPVer)], - buf_new()), + buf_new(XMPPDomain)), Opts2} end, ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, @@ -321,10 +321,12 @@ init([#body{attrs = Attrs}, IP, SID]) -> fun(unlimited) -> unlimited; (N) when is_integer(N), N>0 -> N end, unlimited), + ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), State = #state{host = XMPPDomain, sid = SID, ip = IP, xmpp_ver = XMPPVer, el_ibuf = InBuf, - max_concat = MaxConcat, el_obuf = buf_new(), + max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), inactivity_timeout = Inactivity, + shaped_receivers = ShapedReceivers, shaper_state = ShaperState}, NewState = restart_inactivity_timer(State), mod_bosh:open_session(SID, self()), @@ -417,15 +419,15 @@ active(#body{attrs = Attrs, size = Size} = Req, From, shaper:update(State#state.shaper_state, Size), State1 = State#state{shaper_state = ShaperState}, if Pause > 0 -> - QLen = queue:len(State1#state.shaped_receivers), - if QLen < (?MAX_SHAPED_REQUESTS_QUEUE_LEN) -> - TRef = start_shaper_timer(Pause), - Q = queue:in({TRef, From, Req}, - State1#state.shaped_receivers), - State2 = stop_inactivity_timer(State1), - {next_state, active, - State2#state{shaped_receivers = Q}}; - true -> + TRef = start_shaper_timer(Pause), + try p1_queue:in({TRef, From, Req}, + State1#state.shaped_receivers) of + Q -> + State2 = stop_inactivity_timer(State1), + {next_state, active, + State2#state{shaped_receivers = Q}} + catch error:full -> + cancel_timer(TRef), RID = get_attr(rid, Attrs), reply_stop(State1, #body{http_reason = <<"Too many requests">>, @@ -572,7 +574,7 @@ handle_sync_event({send_xml, El}, _From, StateName, reply(State2, Body#body{els = Els}, State2#state.prev_rid, From)}; none -> - State2 = case queue:out(State1#state.shaped_receivers) + State2 = case p1_queue:out(State1#state.shaped_receivers) of {{value, {TRef, From, Body}}, Q} -> cancel_timer(TRef), @@ -601,7 +603,7 @@ handle_info({timeout, TRef, inactive}, _StateName, {stop, normal, State}; handle_info({timeout, TRef, shaper_timeout}, StateName, State) -> - case queue:out(State#state.shaped_receivers) of + case p1_queue:out(State#state.shaped_receivers) of {{value, {TRef, From, Req}}, Q} -> (?GEN_FSM):send_event(self(), {Req, From}), {next_state, StateName, @@ -646,9 +648,13 @@ code_change(_OldVsn, StateName, State, _Extra) -> print_state(State) -> State. -route_els(#state{el_ibuf = Buf} = State) -> - route_els(State#state{el_ibuf = buf_new()}, - buf_to_list(Buf)). +route_els(#state{el_ibuf = Buf, c2s_pid = C2SPid} = State) -> + NewBuf = p1_queue:dropwhile( + fun(El) -> + ?GEN_FSM:send_event(C2SPid, El), + true + end, Buf), + State#state{el_ibuf = NewBuf}. route_els(State, Els) -> case State#state.c2s_pid of @@ -734,7 +740,7 @@ bounce_receivers(State, Reason) -> RID = get_attr(rid, Attrs), {RID, {From, Body}} end, - queue:to_list(State#state.shaped_receivers)), + p1_queue:to_list(State#state.shaped_receivers)), lists:foldl(fun ({RID, {From, Body}}, AccState) -> NewBody = if Reason == closed -> #body{http_reason = @@ -752,7 +758,7 @@ bounce_receivers(State, Reason) -> State, Receivers ++ ShapedReceivers). bounce_els_from_obuf(State) -> - lists:foreach( + p1_queue:foreach( fun({xmlstreamelement, El}) -> try xmpp:decode(El, ?NS_CLIENT, [ignore_els]) of Pkt when ?is_stanza(Pkt) -> @@ -769,7 +775,7 @@ bounce_els_from_obuf(State) -> end; (_) -> ok - end, buf_to_list(State#state.el_obuf)). + end, State#state.el_obuf). is_valid_key(<<"">>, <<"">>) -> true; is_valid_key(PrevKey, Key) -> @@ -1029,26 +1035,33 @@ get_attr(Attr, Attrs, Default) -> _ -> Default end. -buf_new() -> queue:new(). +buf_new(Host) -> + buf_new(Host, unlimited). + +buf_new(Host, Limit) -> + QueueType = case gen_mod:get_module_opt( + Host, mod_bosh, queue_type, + mod_bosh:mod_opt_type(queue_type)) of + undefined -> ejabberd_config:default_queue_type(Host); + T -> T + end, + p1_queue:new(QueueType, Limit). buf_in(Xs, Buf) -> - lists:foldl(fun (X, Acc) -> queue:in(X, Acc) end, Buf, - Xs). + lists:foldl(fun p1_queue:in/2, Buf, Xs). buf_out(Buf, Num) when is_integer(Num), Num > 0 -> buf_out(Buf, Num, []); -buf_out(Buf, _) -> {queue:to_list(Buf), buf_new()}. +buf_out(Buf, _) -> {p1_queue:to_list(Buf), p1_queue:clear(Buf)}. buf_out(Buf, 0, Els) -> {lists:reverse(Els), Buf}; buf_out(Buf, I, Els) -> - case queue:out(Buf) of + case p1_queue:out(Buf) of {{value, El}, NewBuf} -> buf_out(NewBuf, I - 1, [El | Els]); {empty, _} -> buf_out(Buf, 0, Els) end. -buf_to_list(Buf) -> queue:to_list(Buf). - cancel_timer(TRef) when is_reference(TRef) -> (?GEN_FSM):cancel_timer(TRef); cancel_timer(_) -> false. |