summaryrefslogtreecommitdiff
path: root/src/mod_muc_room.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-24 13:27:56 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2017-03-24 13:27:56 +0300
commite30d41e5f0be428d4a5f51b764d35040752a344b (patch)
treeb614fdf27661edb42f35d7d5ccfc6e83945d08a1 /src/mod_muc_room.erl
parentAvoid PID collisions (diff)
parentImprove overloaded S2S queue processing (diff)
Merge branch 'new_queue'
Conflicts: rebar.config src/mod_muc_admin.erl
Diffstat (limited to 'src/mod_muc_room.erl')
-rw-r--r--src/mod_muc_room.erl168
1 files changed, 97 insertions, 71 deletions
diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl
index dc30cc89..ed7e79ac 100644
--- a/src/mod_muc_room.erl
+++ b/src/mod_muc_room.erl
@@ -30,10 +30,10 @@
-behaviour(gen_fsm).
%% External exports
--export([start_link/9,
- start_link/7,
- start/9,
- start/7,
+-export([start_link/10,
+ start_link/8,
+ start/10,
+ start/8,
get_role/2,
get_affiliation/2,
is_occupant_or_admin/2,
@@ -93,25 +93,25 @@
%%% API
%%%----------------------------------------------------------------------
start(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
- Creator, Nick, DefRoomOpts) ->
+ Creator, Nick, DefRoomOpts, QueueType) ->
gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Creator, Nick, DefRoomOpts],
+ RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
?FSMOPTS).
-start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
+start(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
gen_fsm:start(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Opts],
+ RoomShaper, Opts, QueueType],
?FSMOPTS).
start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper,
- Creator, Nick, DefRoomOpts) ->
+ Creator, Nick, DefRoomOpts, QueueType) ->
gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Creator, Nick, DefRoomOpts],
+ RoomShaper, Creator, Nick, DefRoomOpts, QueueType],
?FSMOPTS).
-start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
+start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType) ->
gen_fsm:start_link(?MODULE, [Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Opts],
+ RoomShaper, Opts, QueueType],
?FSMOPTS).
%%%----------------------------------------------------------------------
@@ -119,15 +119,17 @@ start_link(Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts) ->
%%%----------------------------------------------------------------------
init([Host, ServerHost, Access, Room, HistorySize,
- RoomShaper, Creator, _Nick, DefRoomOpts]) ->
+ RoomShaper, Creator, _Nick, DefRoomOpts, QueueType]) ->
process_flag(trap_exit, true),
Shaper = shaper:new(RoomShaper),
+ RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
State = set_affiliation(Creator, owner,
#state{host = Host, server_host = ServerHost,
access = Access, room = Room,
- history = lqueue_new(HistorySize),
+ history = lqueue_new(HistorySize, QueueType),
jid = jid:make(Room, Host),
just_created = true,
+ room_queue = RoomQueue,
room_shaper = Shaper}),
State1 = set_opts(DefRoomOpts, State),
store_room(State1),
@@ -136,15 +138,17 @@ init([Host, ServerHost, Access, Room, HistorySize,
add_to_log(room_existence, created, State1),
add_to_log(room_existence, started, State1),
{ok, normal_state, State1};
-init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts]) ->
+init([Host, ServerHost, Access, Room, HistorySize, RoomShaper, Opts, QueueType]) ->
process_flag(trap_exit, true),
Shaper = shaper:new(RoomShaper),
+ RoomQueue = room_queue_new(ServerHost, Shaper, QueueType),
State = set_opts(Opts, #state{host = Host,
server_host = ServerHost,
access = Access,
room = Room,
- history = lqueue_new(HistorySize),
+ history = lqueue_new(HistorySize, QueueType),
jid = jid:make(Room, Host),
+ room_queue = RoomQueue,
room_shaper = Shaper}),
add_to_log(room_existence, started, State),
{ok, normal_state, State}.
@@ -175,7 +179,10 @@ normal_state({route, <<"">>,
MessageShaperInterval == 0 ->
{RoomShaper, RoomShaperInterval} =
shaper:update(StateData#state.room_shaper, Size),
- RoomQueueEmpty = queue:is_empty(StateData#state.room_queue),
+ RoomQueueEmpty = case StateData#state.room_queue of
+ undefined -> true;
+ RQ -> p1_queue:is_empty(RQ)
+ end,
if RoomShaperInterval == 0, RoomQueueEmpty ->
NewActivity = Activity#activity{
message_time = Now,
@@ -200,8 +207,8 @@ normal_state({route, <<"">>,
message_time = Now,
message_shaper = MessageShaper,
message = Packet},
- RoomQueue = queue:in({message, From},
- StateData#state.room_queue),
+ RoomQueue = p1_queue:in({message, From},
+ StateData#state.room_queue),
StateData2 = store_user_activity(From,
NewActivity,
StateData1),
@@ -584,8 +591,8 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
handle_info({process_user_presence, From}, normal_state = _StateName, StateData) ->
- RoomQueueEmpty = queue:is_empty(StateData#state.room_queue),
- RoomQueue = queue:in({presence, From}, StateData#state.room_queue),
+ RoomQueueEmpty = p1_queue:is_empty(StateData#state.room_queue),
+ RoomQueue = p1_queue:in({presence, From}, StateData#state.room_queue),
StateData1 = StateData#state{room_queue = RoomQueue},
if RoomQueueEmpty ->
StateData2 = prepare_room_queue(StateData1),
@@ -595,9 +602,9 @@ handle_info({process_user_presence, From}, normal_state = _StateName, StateData)
handle_info({process_user_message, From},
normal_state = _StateName, StateData) ->
RoomQueueEmpty =
- queue:is_empty(StateData#state.room_queue),
- RoomQueue = queue:in({message, From},
- StateData#state.room_queue),
+ p1_queue:is_empty(StateData#state.room_queue),
+ RoomQueue = p1_queue:in({message, From},
+ StateData#state.room_queue),
StateData1 = StateData#state{room_queue = RoomQueue},
if RoomQueueEmpty ->
StateData2 = prepare_room_queue(StateData1),
@@ -606,7 +613,7 @@ handle_info({process_user_message, From},
end;
handle_info(process_room_queue,
normal_state = StateName, StateData) ->
- case queue:out(StateData#state.room_queue) of
+ case p1_queue:out(StateData#state.room_queue) of
{{value, {message, From}}, RoomQueue} ->
Activity = get_user_activity(From, StateData),
Packet = Activity#activity.message,
@@ -1418,6 +1425,32 @@ get_max_users_admin_threshold(StateData) ->
fun(I) when is_integer(I), I>0 -> I end,
5).
+-spec room_queue_new(binary(), shaper:shaper(), _) -> p1_queue:queue().
+room_queue_new(ServerHost, Shaper, QueueType) ->
+ HaveRoomShaper = Shaper /= none,
+ HaveMessageShaper = gen_mod:get_module_opt(
+ ServerHost, mod_muc, user_message_shaper,
+ fun(A) when is_atom(A) -> A end,
+ none) /= none,
+ HavePresenceShaper = gen_mod:get_module_opt(
+ ServerHost, mod_muc, user_presence_shaper,
+ fun(A) when is_atom(A) -> A end,
+ none) /= none,
+ HaveMinMessageInterval = gen_mod:get_module_opt(
+ ServerHost, mod_muc, min_message_interval,
+ fun(I) when is_number(I), I>=0 -> I end,
+ 0) /= 0,
+ HaveMinPresenceInterval = gen_mod:get_module_opt(
+ ServerHost, mod_muc, min_presence_interval,
+ fun(I) when is_number(I), I>=0 -> I end,
+ 0) /= 0,
+ if HaveRoomShaper or HaveMessageShaper or HavePresenceShaper
+ or HaveMinMessageInterval or HaveMinPresenceInterval ->
+ p1_queue:new(QueueType);
+ true ->
+ undefined
+ end.
+
-spec get_user_activity(jid(), state()) -> #activity{}.
get_user_activity(JID, StateData) ->
case treap:lookup(jid:tolower(JID),
@@ -1515,7 +1548,7 @@ clean_treap(Treap, CleanPriority) ->
-spec prepare_room_queue(state()) -> state().
prepare_room_queue(StateData) ->
- case queue:out(StateData#state.room_queue) of
+ case p1_queue:out(StateData#state.room_queue) of
{{value, {message, From}}, _RoomQueue} ->
Activity = get_user_activity(From, StateData),
Packet = Activity#activity.message,
@@ -1997,38 +2030,34 @@ get_history(Nick, Packet, #state{history = History}) ->
#muc{history = #muc_history{} = MUCHistory} ->
Now = p1_time_compat:timestamp(),
Q = History#lqueue.queue,
- {NewQ, Len} = filter_history(Q, MUCHistory, Now, Nick, queue:new(), 0, 0),
- History#lqueue{queue = NewQ, len = Len};
+ filter_history(Q, Now, Nick, MUCHistory);
_ ->
- History
+ p1_queue:to_list(History#lqueue.queue)
end.
--spec filter_history(?TQUEUE, muc_history(), erlang:timestamp(), binary(),
- ?TQUEUE, non_neg_integer(), non_neg_integer()) ->
- {?TQUEUE, non_neg_integer()}.
-filter_history(Queue, #muc_history{since = Since,
- seconds = Seconds,
- maxstanzas = MaxStanzas,
- maxchars = MaxChars} = MUC,
- Now, Nick, AccQueue, NumStanzas, NumChars) ->
- case queue:out_r(Queue) of
- {{value, {_, _, _, TimeStamp, Size} = Elem}, NewQueue} ->
- NowDiff = timer:now_diff(Now, TimeStamp) div 1000000,
- Chars = Size + byte_size(Nick) + 1,
- if (NumStanzas < MaxStanzas) andalso
- (TimeStamp > Since) andalso
- (NowDiff =< Seconds) andalso
- (NumChars + Chars =< MaxChars) ->
- filter_history(NewQueue, MUC, Now, Nick,
- queue:in_r(Elem, AccQueue),
- NumStanzas + 1,
- NumChars + Chars);
- true ->
- {AccQueue, NumStanzas}
- end;
- {empty, _} ->
- {AccQueue, NumStanzas}
- end.
+-spec filter_history(p1_queue:queue(), erlang:timestamp(),
+ binary(), muc_history()) -> list().
+filter_history(Queue, Now, Nick,
+ #muc_history{since = Since,
+ seconds = Seconds,
+ maxstanzas = MaxStanzas,
+ maxchars = MaxChars}) ->
+ {History, _, _} =
+ lists:foldr(
+ fun({_, _, _, TimeStamp, Size} = Elem,
+ {Elems, NumStanzas, NumChars} = Acc) ->
+ NowDiff = timer:now_diff(Now, TimeStamp) div 1000000,
+ Chars = Size + byte_size(Nick) + 1,
+ if (NumStanzas < MaxStanzas) andalso
+ (TimeStamp > Since) andalso
+ (NowDiff =< Seconds) andalso
+ (NumChars + Chars =< MaxChars) ->
+ {[Elem|Elems], NumStanzas + 1, NumChars + Chars};
+ true ->
+ Acc
+ end
+ end, {[], 0, 0}, p1_queue:to_list(Queue)),
+ History.
-spec is_room_overcrowded(state()) -> boolean().
is_room_overcrowded(StateData) ->
@@ -2381,31 +2410,28 @@ status_codes(IsInitialPresence, _IsSelfPresence = true, StateData) ->
end;
status_codes(_IsInitialPresence, _IsSelfPresence = false, _StateData) -> [].
--spec lqueue_new(non_neg_integer()) -> lqueue().
-lqueue_new(Max) ->
- #lqueue{queue = queue:new(), len = 0, max = Max}.
+-spec lqueue_new(non_neg_integer(), ram | file) -> lqueue().
+lqueue_new(Max, Type) ->
+ #lqueue{queue = p1_queue:new(Type), max = Max}.
-spec lqueue_in(term(), lqueue()) -> lqueue().
%% If the message queue limit is set to 0, do not store messages.
lqueue_in(_Item, LQ = #lqueue{max = 0}) -> LQ;
%% Otherwise, rotate messages in the queue store.
-lqueue_in(Item,
- #lqueue{queue = Q1, len = Len, max = Max}) ->
- Q2 = queue:in(Item, Q1),
+lqueue_in(Item, #lqueue{queue = Q1, max = Max}) ->
+ Len = p1_queue:len(Q1),
+ Q2 = p1_queue:in(Item, Q1),
if Len >= Max ->
Q3 = lqueue_cut(Q2, Len - Max + 1),
- #lqueue{queue = Q3, len = Max, max = Max};
- true -> #lqueue{queue = Q2, len = Len + 1, max = Max}
+ #lqueue{queue = Q3, max = Max};
+ true -> #lqueue{queue = Q2, max = Max}
end.
--spec lqueue_cut(queue:queue(), non_neg_integer()) -> queue:queue().
+-spec lqueue_cut(p1_queue:queue(), non_neg_integer()) -> p1_queue:queue().
lqueue_cut(Q, 0) -> Q;
lqueue_cut(Q, N) ->
- {_, Q1} = queue:out(Q), lqueue_cut(Q1, N - 1).
-
--spec lqueue_to_list(lqueue()) -> list().
-lqueue_to_list(#lqueue{queue = Q1}) ->
- queue:to_list(Q1).
+ {_, Q1} = p1_queue:out(Q),
+ lqueue_cut(Q1, N - 1).
-spec add_message_to_history(binary(), jid(), message(), state()) -> state().
add_message_to_history(FromNick, FromJID, Packet, StateData) ->
@@ -2436,7 +2462,7 @@ add_message_to_history(FromNick, FromJID, Packet, StateData) ->
StateData
end.
--spec send_history(jid(), lqueue(), state()) -> ok.
+-spec send_history(jid(), list(), state()) -> ok.
send_history(JID, History, StateData) ->
lists:foreach(
fun({Nick, Packet, _HaveSubject, _TimeStamp, _Size}) ->
@@ -2445,7 +2471,7 @@ send_history(JID, History, StateData) ->
Packet,
jid:replace_resource(StateData#state.jid, Nick),
JID))
- end, lqueue_to_list(History)).
+ end, History).
-spec send_subject(jid(), state()) -> ok.
send_subject(JID, #state{subject_author = Nick} = StateData) ->