diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-03-24 13:27:56 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2017-03-24 13:27:56 +0300 |
commit | e30d41e5f0be428d4a5f51b764d35040752a344b (patch) | |
tree | b614fdf27661edb42f35d7d5ccfc6e83945d08a1 /src/mod_muc_room.erl | |
parent | Avoid PID collisions (diff) | |
parent | Improve overloaded S2S queue processing (diff) |
Merge branch 'new_queue'
Conflicts:
rebar.config
src/mod_muc_admin.erl
Diffstat (limited to 'src/mod_muc_room.erl')
-rw-r--r-- | src/mod_muc_room.erl | 168 |
1 files changed, 97 insertions, 71 deletions
diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index dc30cc89..ed7e79ac 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -30,10 +30,10 @@ -behaviour(gen_fsm). %% External exports --export([start_link/9, - start_link/7, - start/9, - start/7, +-export([start_link/10, + start_link/8, + start/10, + start/8, get_role/2, get_affiliation/2, is_occupant_or_admin/2, @@ -93,25 +93,25 @@ %%% API %%%---------------------------------------------------------------------- start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - Creator, Nick, DefRoomOpts) -> + Creator, Nick, DefRoomOpts, QueueType) -> gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], + RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). -start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> +start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Opts], + RoomShaper, Opts, QueueType], ?FSMOPTS). start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, - Creator, Nick, DefRoomOpts) -> + Creator, Nick, DefRoomOpts, QueueType) -> gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, Nick, DefRoomOpts], + RoomShaper, Creator, Nick, DefRoomOpts, QueueType], ?FSMOPTS). -start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> +start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) -> gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Opts], + RoomShaper, Opts, QueueType], ?FSMOPTS). %%%---------------------------------------------------------------------- @@ -119,15 +119,17 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) -> %%%---------------------------------------------------------------------- init([Host, ServerHost, Access, Room, HistorySize, - RoomShaper, Creator, _Nick, DefRoomOpts]) -> + RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) -> process_flag(trap_exit, true), Shaper = shaper:new(RoomShaper), + RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_affiliation(Creator, owner, #state{host = Host, server_host = ServerHost, access = Access, room = Room, - history = lqueue_new(HistorySize), + history = lqueue_new(HistorySize, QueueType), jid = jid:make(Room, Host), just_created = true, + room_queue = RoomQueue, room_shaper = Shaper}), State1 = set_opts(DefRoomOpts, State), store_room(State1), @@ -136,15 +138,17 @@ init([Host, ServerHost, Access, Room, HistorySize, add_to_log(room_existence, created, State1), add_to_log(room_existence, started, State1), {ok, normal_state, State1}; -init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) -> +init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) -> process_flag(trap_exit, true), Shaper = shaper:new(RoomShaper), + RoomQueue = room_queue_new(ServerHost, Shaper, QueueType), State = set_opts(Opts, #state{host = Host, server_host = ServerHost, access = Access, room = Room, - history = lqueue_new(HistorySize), + history = lqueue_new(HistorySize, QueueType), jid = jid:make(Room, Host), + room_queue = RoomQueue, room_shaper = Shaper}), add_to_log(room_existence, started, State), {ok, normal_state, State}. @@ -175,7 +179,10 @@ normal_state({route, <<"">>, MessageShaperInterval == 0 -> {RoomShaper, RoomShaperInterval} = shaper:update(StateData#state.room_shaper, Size), - RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), + RoomQueueEmpty = case StateData#state.room_queue of + undefined -> true; + RQ -> p1_queue:is_empty(RQ) + end, if RoomShaperInterval == 0, RoomQueueEmpty -> NewActivity = Activity#activity{ message_time = Now, @@ -200,8 +207,8 @@ normal_state({route, <<"">>, message_time = Now, message_shaper = MessageShaper, message = Packet}, - RoomQueue = queue:in({message, From}, - StateData#state.room_queue), + RoomQueue = p1_queue:in({message, From}, + StateData#state.room_queue), StateData2 = store_user_activity(From, NewActivity, StateData1), @@ -584,8 +591,8 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. handle_info({process_user_presence, From}, normal_state = _StateName, StateData) -> - RoomQueueEmpty = queue:is_empty(StateData#state.room_queue), - RoomQueue = queue:in({presence, From}, StateData#state.room_queue), + RoomQueueEmpty = p1_queue:is_empty(StateData#state.room_queue), + RoomQueue = p1_queue:in({presence, From}, StateData#state.room_queue), StateData1 = StateData#state{room_queue = RoomQueue}, if RoomQueueEmpty -> StateData2 = prepare_room_queue(StateData1), @@ -595,9 +602,9 @@ handle_info({process_user_presence, From}, normal_state = _StateName, StateData) handle_info({process_user_message, From}, normal_state = _StateName, StateData) -> RoomQueueEmpty = - queue:is_empty(StateData#state.room_queue), - RoomQueue = queue:in({message, From}, - StateData#state.room_queue), + p1_queue:is_empty(StateData#state.room_queue), + RoomQueue = p1_queue:in({message, From}, + StateData#state.room_queue), StateData1 = StateData#state{room_queue = RoomQueue}, if RoomQueueEmpty -> StateData2 = prepare_room_queue(StateData1), @@ -606,7 +613,7 @@ handle_info({process_user_message, From}, end; handle_info(process_room_queue, normal_state = StateName, StateData) -> - case queue:out(StateData#state.room_queue) of + case p1_queue:out(StateData#state.room_queue) of {{value, {message, From}}, RoomQueue} -> Activity = get_user_activity(From, StateData), Packet = Activity#activity.message, @@ -1418,6 +1425,32 @@ get_max_users_admin_threshold(StateData) -> fun(I) when is_integer(I), I>0 -> I end, 5). +-spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue(). +room_queue_new(ServerHost, Shaper, QueueType) -> + HaveRoomShaper = Shaper /= none, + HaveMessageShaper = gen_mod:get_module_opt( + ServerHost, mod_muc, user_message_shaper, + fun(A) when is_atom(A) -> A end, + none) /= none, + HavePresenceShaper = gen_mod:get_module_opt( + ServerHost, mod_muc, user_presence_shaper, + fun(A) when is_atom(A) -> A end, + none) /= none, + HaveMinMessageInterval = gen_mod:get_module_opt( + ServerHost, mod_muc, min_message_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) /= 0, + HaveMinPresenceInterval = gen_mod:get_module_opt( + ServerHost, mod_muc, min_presence_interval, + fun(I) when is_number(I), I>=0 -> I end, + 0) /= 0, + if HaveRoomShaper or HaveMessageShaper or HavePresenceShaper + or HaveMinMessageInterval or HaveMinPresenceInterval -> + p1_queue:new(QueueType); + true -> + undefined + end. + -spec get_user_activity(jid(), state()) -> #activity{}. get_user_activity(JID, StateData) -> case treap:lookup(jid:tolower(JID), @@ -1515,7 +1548,7 @@ clean_treap(Treap, CleanPriority) -> -spec prepare_room_queue(state()) -> state(). prepare_room_queue(StateData) -> - case queue:out(StateData#state.room_queue) of + case p1_queue:out(StateData#state.room_queue) of {{value, {message, From}}, _RoomQueue} -> Activity = get_user_activity(From, StateData), Packet = Activity#activity.message, @@ -1997,38 +2030,34 @@ get_history(Nick, Packet, #state{history = History}) -> #muc{history = #muc_history{} = MUCHistory} -> Now = p1_time_compat:timestamp(), Q = History#lqueue.queue, - {NewQ, Len} = filter_history(Q, MUCHistory, Now, Nick, queue:new(), 0, 0), - History#lqueue{queue = NewQ, len = Len}; + filter_history(Q, Now, Nick, MUCHistory); _ -> - History + p1_queue:to_list(History#lqueue.queue) end. --spec filter_history(?TQUEUE, muc_history(), erlang:timestamp(), binary(), - ?TQUEUE, non_neg_integer(), non_neg_integer()) -> - {?TQUEUE, non_neg_integer()}. -filter_history(Queue, #muc_history{since = Since, - seconds = Seconds, - maxstanzas = MaxStanzas, - maxchars = MaxChars} = MUC, - Now, Nick, AccQueue, NumStanzas, NumChars) -> - case queue:out_r(Queue) of - {{value, {_, _, _, TimeStamp, Size} = Elem}, NewQueue} -> - NowDiff = timer:now_diff(Now, TimeStamp) div 1000000, - Chars = Size + byte_size(Nick) + 1, - if (NumStanzas < MaxStanzas) andalso - (TimeStamp > Since) andalso - (NowDiff =< Seconds) andalso - (NumChars + Chars =< MaxChars) -> - filter_history(NewQueue, MUC, Now, Nick, - queue:in_r(Elem, AccQueue), - NumStanzas + 1, - NumChars + Chars); - true -> - {AccQueue, NumStanzas} - end; - {empty, _} -> - {AccQueue, NumStanzas} - end. +-spec filter_history(p1_queue:queue(), erlang:timestamp(), + binary(), muc_history()) -> list(). +filter_history(Queue, Now, Nick, + #muc_history{since = Since, + seconds = Seconds, + maxstanzas = MaxStanzas, + maxchars = MaxChars}) -> + {History, _, _} = + lists:foldr( + fun({_, _, _, TimeStamp, Size} = Elem, + {Elems, NumStanzas, NumChars} = Acc) -> + NowDiff = timer:now_diff(Now, TimeStamp) div 1000000, + Chars = Size + byte_size(Nick) + 1, + if (NumStanzas < MaxStanzas) andalso + (TimeStamp > Since) andalso + (NowDiff =< Seconds) andalso + (NumChars + Chars =< MaxChars) -> + {[Elem|Elems], NumStanzas + 1, NumChars + Chars}; + true -> + Acc + end + end, {[], 0, 0}, p1_queue:to_list(Queue)), + History. -spec is_room_overcrowded(state()) -> boolean(). is_room_overcrowded(StateData) -> @@ -2381,31 +2410,28 @@ status_codes(IsInitialPresence, _IsSelfPresence = true, StateData) -> end; status_codes(_IsInitialPresence, _IsSelfPresence = false, _StateData) -> []. --spec lqueue_new(non_neg_integer()) -> lqueue(). -lqueue_new(Max) -> - #lqueue{queue = queue:new(), len = 0, max = Max}. +-spec lqueue_new(non_neg_integer(), ram | file) -> lqueue(). +lqueue_new(Max, Type) -> + #lqueue{queue = p1_queue:new(Type), max = Max}. -spec lqueue_in(term(), lqueue()) -> lqueue(). %% If the message queue limit is set to 0, do not store messages. lqueue_in(_Item, LQ = #lqueue{max = 0}) -> LQ; %% Otherwise, rotate messages in the queue store. -lqueue_in(Item, - #lqueue{queue = Q1, len = Len, max = Max}) -> - Q2 = queue:in(Item, Q1), +lqueue_in(Item, #lqueue{queue = Q1, max = Max}) -> + Len = p1_queue:len(Q1), + Q2 = p1_queue:in(Item, Q1), if Len >= Max -> Q3 = lqueue_cut(Q2, Len - Max + 1), - #lqueue{queue = Q3, len = Max, max = Max}; - true -> #lqueue{queue = Q2, len = Len + 1, max = Max} + #lqueue{queue = Q3, max = Max}; + true -> #lqueue{queue = Q2, max = Max} end. --spec lqueue_cut(queue:queue(), non_neg_integer()) -> queue:queue(). +-spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue(). lqueue_cut(Q, 0) -> Q; lqueue_cut(Q, N) -> - {_, Q1} = queue:out(Q), lqueue_cut(Q1, N - 1). - --spec lqueue_to_list(lqueue()) -> list(). -lqueue_to_list(#lqueue{queue = Q1}) -> - queue:to_list(Q1). + {_, Q1} = p1_queue:out(Q), + lqueue_cut(Q1, N - 1). -spec add_message_to_history(binary(), jid(), message(), state()) -> state(). add_message_to_history(FromNick, FromJID, Packet, StateData) -> @@ -2436,7 +2462,7 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) -> StateData end. --spec send_history(jid(), lqueue(), state()) -> ok. +-spec send_history(jid(), list(), state()) -> ok. send_history(JID, History, StateData) -> lists:foreach( fun({Nick, Packet, _HaveSubject, _TimeStamp, _Size}) -> @@ -2445,7 +2471,7 @@ send_history(JID, History, StateData) -> Packet, jid:replace_resource(StateData#state.jid, Nick), JID)) - end, lqueue_to_list(History)). + end, History). -spec send_subject(jid(), state()) -> ok. send_subject(JID, #state{subject_author = Nick} = StateData) -> |