aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xejabberdctl.template2
-rw-r--r--src/ejabberd_app.erl1
-rw-r--r--src/ejabberd_auth_sql.erl11
-rw-r--r--src/ejabberd_c2s.erl4
-rw-r--r--src/ejabberd_logger.erl48
-rw-r--r--src/ejabberd_oauth_sql.erl4
-rw-r--r--src/ejabberd_router_sql.erl12
-rw-r--r--src/ejabberd_s2s_in.erl4
-rw-r--r--src/ejabberd_sm.erl7
-rw-r--r--src/ejabberd_sm_sql.erl12
-rw-r--r--src/ejabberd_sql.erl23
-rw-r--r--src/ejabberd_sql_sup.erl2
-rw-r--r--src/ejabberd_sup.erl8
-rw-r--r--src/ejabberd_system_monitor.erl588
-rw-r--r--src/mod_bosh_sql.erl9
-rw-r--r--src/mod_caps.erl4
-rw-r--r--src/mod_caps_sql.erl4
-rw-r--r--src/mod_carboncopy_sql.erl9
-rw-r--r--src/mod_last_sql.erl8
-rw-r--r--src/mod_muc_admin.erl11
-rw-r--r--src/mod_muc_room.erl3
-rw-r--r--src/mod_muc_sql.erl26
-rw-r--r--src/mod_proxy65_sql.erl3
-rw-r--r--src/mod_pubsub.erl3
-rw-r--r--src/mod_push_sql.erl24
-rw-r--r--src/node_flat_sql.erl2
-rw-r--r--src/xmpp_stream_in.erl38
27 files changed, 424 insertions, 446 deletions
diff --git a/ejabberdctl.template b/ejabberdctl.template
index 836377662..571b90b66 100755
--- a/ejabberdctl.template
+++ b/ejabberdctl.template
@@ -110,7 +110,7 @@ export ERL_LIBS
exec_cmd()
{
case $EXEC_CMD in
- as_install_user) su -c '"$0" "$@"' "$INSTALLUSER" -- "$@" ;;
+ as_install_user) su -s /bin/sh -c '"$0" "$@"' "$INSTALLUSER" -- "$@" ;;
as_current_user) "$@" ;;
esac
}
diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl
index 3743a8f04..4e9819d64 100644
--- a/src/ejabberd_app.erl
+++ b/src/ejabberd_app.erl
@@ -50,6 +50,7 @@ start(normal, _Args) ->
ejabberd_mnesia:start(),
file_queue_init(),
maybe_add_nameservers(),
+ ejabberd_system_monitor:start(),
case ejabberd_sup:start_link() of
{ok, SupPid} ->
register_elixir_config_hooks(),
diff --git a/src/ejabberd_auth_sql.erl b/src/ejabberd_auth_sql.erl
index 3f328c4a1..af4ad2821 100644
--- a/src/ejabberd_auth_sql.erl
+++ b/src/ejabberd_auth_sql.erl
@@ -71,8 +71,7 @@ set_password(User, Server, Password) ->
case ejabberd_sql:sql_transaction(Server, F) of
{atomic, _} ->
ok;
- {aborted, Reason} ->
- ?ERROR_MSG("failed to write to SQL table: ~p", [Reason]),
+ {aborted, _} ->
{error, db_failure}
end.
@@ -115,9 +114,7 @@ get_password(User, Server) ->
iterationcount = IterationCount}};
{selected, []} ->
error;
- Err ->
- ?ERROR_MSG("Failed to read password for user ~s@~s: ~p",
- [User, Server, Err]),
+ _ ->
error
end.
@@ -125,9 +122,7 @@ remove_user(User, Server) ->
case del_user(Server, User) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete user ~s@~s: ~p",
- [User, Server, Err]),
+ _ ->
{error, db_failure}
end.
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 258b5f3f2..a49987feb 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -287,8 +287,8 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket,
State1;
process_terminated(#{sockmod := SockMod, socket := Socket,
stop_reason := {tls, _}} = State, Reason) ->
- ?ERROR_MSG("(~s) Failed to secure c2s connection: ~s",
- [SockMod:pp(Socket), format_reason(State, Reason)]),
+ ?WARNING_MSG("(~s) Failed to secure c2s connection: ~s",
+ [SockMod:pp(Socket), format_reason(State, Reason)]),
State;
process_terminated(State, _Reason) ->
State.
diff --git a/src/ejabberd_logger.erl b/src/ejabberd_logger.erl
index eee9d3b83..1181d4bcb 100644
--- a/src/ejabberd_logger.erl
+++ b/src/ejabberd_logger.erl
@@ -27,7 +27,8 @@
-behaviour(ejabberd_config).
%% API
--export([start/0, reopen_log/0, rotate_log/0, get/0, set/1, get_log_path/0, opt_type/1]).
+-export([start/0, restart/0, reopen_log/0, rotate_log/0, get/0, set/1,
+ get_log_path/0, opt_type/1]).
-include("ejabberd.hrl").
@@ -102,6 +103,11 @@ get_string_env(Name, Default) ->
%% @spec () -> ok
start() ->
+ start(4).
+
+-spec start(loglevel()) -> ok.
+start(Level) ->
+ LLevel = get_lager_loglevel(Level),
StartedApps = application:which_applications(5000),
case lists:keyfind(logger, 1, StartedApps) of
%% Elixir logger is started. We assume everything is in place
@@ -109,24 +115,24 @@ start() ->
{logger, _, _} ->
error_logger:info_msg("Ignoring ejabberd logger options, using Elixir Logger.", []),
%% Do not start lager, we rely on Elixir Logger
- do_start_for_logger();
+ do_start_for_logger(LLevel);
_ ->
- do_start()
+ do_start(LLevel)
end.
-do_start_for_logger() ->
+do_start_for_logger(Level) ->
application:load(sasl),
application:set_env(sasl, sasl_error_logger, false),
application:load(lager),
application:set_env(lager, error_logger_redirect, false),
application:set_env(lager, error_logger_whitelist, ['Elixir.Logger.ErrorHandler']),
application:set_env(lager, crash_log, false),
- application:set_env(lager, handlers, [{elixir_logger_backend, [{level, info}]}]),
+ application:set_env(lager, handlers, [{elixir_logger_backend, [{level, Level}]}]),
ejabberd:start_app(lager),
ok.
%% Start lager
-do_start() ->
+do_start(Level) ->
application:load(sasl),
application:set_env(sasl, sasl_error_logger, false),
application:load(lager),
@@ -141,8 +147,8 @@ do_start() ->
application:set_env(lager, error_logger_hwm, LogRateLimit),
application:set_env(
lager, handlers,
- [{lager_console_backend, info},
- {lager_file_backend, [{file, ConsoleLog}, {level, info}, {date, LogRotateDate},
+ [{lager_console_backend, Level},
+ {lager_file_backend, [{file, ConsoleLog}, {level, Level}, {date, LogRotateDate},
{count, LogRotateCount}, {size, LogRotateSize}]},
{lager_file_backend, [{file, ErrorLog}, {level, error}, {date, LogRotateDate},
{count, LogRotateCount}, {size, LogRotateSize}]}]),
@@ -156,6 +162,11 @@ do_start() ->
end, gen_event:which_handlers(lager_event)),
ok.
+restart() ->
+ Level = ejabberd_config:get_option(loglevel, 4),
+ application:stop(lager),
+ start(Level).
+
%% @spec () -> ok
reopen_log() ->
%% Lager detects external log rotation automatically.
@@ -187,15 +198,7 @@ get() ->
%% @spec (loglevel() | {loglevel(), list()}) -> {module, module()}
set(LogLevel) when is_integer(LogLevel) ->
- LagerLogLevel = case LogLevel of
- 0 -> none;
- 1 -> critical;
- 2 -> error;
- 3 -> warning;
- 4 -> info;
- 5 -> debug;
- E -> throw({wrong_loglevel, E})
- end,
+ LagerLogLevel = get_lager_loglevel(LogLevel),
case get_lager_loglevel() of
LagerLogLevel ->
ok;
@@ -228,6 +231,17 @@ get_lager_loglevel() ->
end,
none, Handlers).
+get_lager_loglevel(LogLevel) ->
+ case LogLevel of
+ 0 -> none;
+ 1 -> critical;
+ 2 -> error;
+ 3 -> warning;
+ 4 -> info;
+ 5 -> debug;
+ E -> erlang:error({wrong_loglevel, E})
+ end.
+
get_lager_handlers() ->
case catch gen_event:which_handlers(lager_event) of
{'EXIT',noproc} ->
diff --git a/src/ejabberd_oauth_sql.erl b/src/ejabberd_oauth_sql.erl
index 14eaca6a8..61e9986a6 100644
--- a/src/ejabberd_oauth_sql.erl
+++ b/src/ejabberd_oauth_sql.erl
@@ -57,9 +57,7 @@ store(R) ->
"expire=%(Expire)d"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("Failed to write to SQL 'oauth_token' table: ~p",
- [Err]),
+ _ ->
{error, db_failure}
end.
diff --git a/src/ejabberd_router_sql.erl b/src/ejabberd_router_sql.erl
index c0264da31..d0ab98a6a 100644
--- a/src/ejabberd_router_sql.erl
+++ b/src/ejabberd_router_sql.erl
@@ -61,8 +61,7 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) ->
"local_hint=%(LocalHintS)s"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("failed to update 'route' table: ~p", [Err]),
+ _ ->
{error, db_failure}
end.
@@ -75,8 +74,7 @@ unregister_route(Domain, _, Pid) ->
"and pid=%(PidS)s and node=%(Node)s")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'route' table: ~p", [Err]),
+ _ ->
{error, db_failure}
end.
@@ -90,8 +88,7 @@ find_routes(Domain) ->
fun(Row) ->
row_to_route(Domain, Row)
end, Rows)};
- Err ->
- ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
+ _ ->
{error, db_failure}
end.
@@ -101,8 +98,7 @@ get_all_routes() ->
?SQL("select @(domain)s from route where domain <> server_host")) of
{selected, Domains} ->
{ok, [Domain || {Domain} <- Domains]};
- Err ->
- ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]),
+ _ ->
{error, db_failure}
end.
diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl
index a949e83d6..ae81f739e 100644
--- a/src/ejabberd_s2s_in.erl
+++ b/src/ejabberd_s2s_in.erl
@@ -289,8 +289,8 @@ terminate(Reason, #{auth_domains := AuthDomains,
sockmod := SockMod, socket := Socket} = State) ->
case maps:get(stop_reason, State, undefined) of
{tls, _} = Err ->
- ?ERROR_MSG("(~s) Failed to secure inbound s2s connection: ~s",
- [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]);
+ ?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s",
+ [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]);
_ ->
ok
end,
diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl
index 3df1d88e0..97e599253 100644
--- a/src/ejabberd_sm.erl
+++ b/src/ejabberd_sm.erl
@@ -314,8 +314,11 @@ get_session_sid(User, Server, Resource) ->
LResource = jid:resourceprep(Resource),
Mod = get_sm_backend(LServer),
case online(get_sessions(Mod, LUser, LServer, LResource)) of
- [#session{sid = SID}] -> SID;
- _ -> none
+ [] ->
+ none;
+ Ss ->
+ #session{sid = SID} = lists:max(Ss),
+ SID
end.
-spec get_session_sids(binary(), binary()) -> [sid()].
diff --git a/src/ejabberd_sm_sql.erl b/src/ejabberd_sm_sql.erl
index 55e21040b..2dd884cd2 100644
--- a/src/ejabberd_sm_sql.erl
+++ b/src/ejabberd_sm_sql.erl
@@ -80,8 +80,7 @@ set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R},
"info=%(InfoS)s"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -93,8 +92,7 @@ delete_session(#session{usr = {_, LServer, _}, sid = {Now, Pid}}) ->
?SQL("delete from sm where usec=%(TS)d and pid=%(PidS)s")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -117,8 +115,7 @@ get_sessions(LServer) ->
catch _:{bad_node, _} -> []
end
end, Rows);
- Err ->
- ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]),
+ _Err ->
[]
end.
@@ -135,8 +132,7 @@ get_sessions(LUser, LServer) ->
catch _:{bad_node, _} -> []
end
end, Rows)};
- Err ->
- ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl
index 8d30cf637..c4bf3a737 100644
--- a/src/ejabberd_sql.erl
+++ b/src/ejabberd_sql.erl
@@ -158,18 +158,24 @@ sql_call(Host, Msg) ->
case ejabberd_sql_sup:get_random_pid(Host) of
none -> {error, <<"Unknown Host">>};
Pid ->
- p1_fsm:sync_send_event(Pid,{sql_cmd, Msg,
- p1_time_compat:monotonic_time(milli_seconds)},
- query_timeout(Host))
+ sync_send_event(Pid,{sql_cmd, Msg,
+ p1_time_compat:monotonic_time(milli_seconds)},
+ query_timeout(Host))
end;
_State -> nested_op(Msg)
end.
keep_alive(Host, PID) ->
- p1_fsm:sync_send_event(PID,
- {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
- p1_time_compat:monotonic_time(milli_seconds)},
- query_timeout(Host)).
+ sync_send_event(PID,
+ {sql_cmd, {sql_query, ?KEEPALIVE_QUERY},
+ p1_time_compat:monotonic_time(milli_seconds)},
+ query_timeout(Host)).
+
+sync_send_event(Pid, Msg, Timeout) ->
+ try p1_fsm:sync_send_event(Pid, Msg, Timeout)
+ catch _:{Reason, {p1_fsm, _, _}} ->
+ {error, Reason}
+ end.
-spec sql_query_t(sql_query()) -> sql_query_result().
@@ -270,6 +276,7 @@ sqlite_file(Host) ->
%%% Callback functions from gen_fsm
%%%----------------------------------------------------------------------
init([Host, StartInterval]) ->
+ process_flag(trap_exit, true),
case ejabberd_config:get_option({sql_keepalive_interval, Host}) of
undefined ->
ok;
@@ -1077,6 +1084,8 @@ query_timeout(LServer) ->
timer:seconds(
ejabberd_config:get_option({sql_query_timeout, LServer}, 60)).
+check_error({error, Why} = Err, _Query) when Why == killed ->
+ Err;
check_error({error, Why} = Err, #sql_query{} = Query) ->
?ERROR_MSG("SQL query '~s' at ~p failed: ~p",
[Query#sql_query.hash, Query#sql_query.loc, Why]),
diff --git a/src/ejabberd_sql_sup.erl b/src/ejabberd_sql_sup.erl
index f7793eb2c..759f8128e 100644
--- a/src/ejabberd_sql_sup.erl
+++ b/src/ejabberd_sql_sup.erl
@@ -86,7 +86,7 @@ init([Host]) ->
get_pids(Host) ->
Rs = mnesia:dirty_read(sql_pool, Host),
- [R#sql_pool.pid || R <- Rs].
+ [R#sql_pool.pid || R <- Rs, is_process_alive(R#sql_pool.pid)].
get_random_pid(Host) ->
case get_pids(Host) of
diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl
index dbbc4d5e4..3f9809a05 100644
--- a/src/ejabberd_sup.erl
+++ b/src/ejabberd_sup.erl
@@ -47,13 +47,6 @@ init([]) ->
5000,
worker,
[ejabberd_cluster]},
- SystemMonitor =
- {ejabberd_system_monitor,
- {ejabberd_system_monitor, start_link, []},
- permanent,
- brutal_kill,
- worker,
- [ejabberd_system_monitor]},
S2S =
{ejabberd_s2s,
{ejabberd_s2s, start_link, []},
@@ -172,7 +165,6 @@ init([]) ->
PKIX,
ACME,
Listener,
- SystemMonitor,
S2S,
Captcha,
S2SInSupervisor,
diff --git a/src/ejabberd_system_monitor.erl b/src/ejabberd_system_monitor.erl
index 773104f9e..7787b34ab 100644
--- a/src/ejabberd_system_monitor.erl
+++ b/src/ejabberd_system_monitor.erl
@@ -24,320 +24,316 @@
%%%-------------------------------------------------------------------
-module(ejabberd_system_monitor).
-
+-behaviour(gen_event).
-behaviour(ejabberd_config).
-author('alexey@process-one.net').
+-author('ekhramtsov@process-one.net').
--behaviour(gen_server).
-
-%% API
--export([start_link/0, process_command/1, register_hook/1,
- unregister_hook/1, process_remote_command/1]).
-
--export([init/1, handle_call/3, handle_cast/2,
- handle_info/2, terminate/2, code_change/3, opt_type/1]).
-
--include("ejabberd.hrl").
--include("logger.hrl").
-
--include("xmpp.hrl").
-
--record(state, {}).
-
-%%====================================================================
%% API
-%%====================================================================
-%%--------------------------------------------------------------------
-%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
-%% Description: Starts the server
-%%--------------------------------------------------------------------
-start_link() ->
- LH = ejabberd_config:get_option(watchdog_large_heap, 1000000),
- Opts = [{large_heap, LH}],
- gen_server:start_link({local, ?MODULE}, ?MODULE, Opts,
- []).
+-export([start/0, opt_type/1]).
+
+%% gen_event callbacks
+-export([init/1, handle_event/2, handle_call/2,
+ handle_info/2, terminate/2, code_change/3]).
+
+%% We don't use ejabberd logger because lager can be overloaded
+%% too and alarm_handler may get stuck.
+%%-include("logger.hrl").
+
+-define(CHECK_INTERVAL, timer:seconds(30)).
+-define(DISK_FULL_THRES, 0.99).
+
+-record(state, {tref :: reference(),
+ mref :: reference()}).
+-record(proc_stat, {qlen :: non_neg_integer(),
+ memory :: non_neg_integer(),
+ initial_call :: mfa(),
+ current_function :: mfa(),
+ ancestors :: [pid() | atom()],
+ application :: pid() | atom(),
+ name :: pid() | atom()}).
+-type state() :: #state{}.
+-type proc_stat() :: #proc_stat{}.
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+-spec start() -> ok.
+start() ->
+ gen_event:add_handler(alarm_handler, ?MODULE, []),
+ gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {?MODULE, []}),
+ application:load(os_mon),
+ application:set_env(os_mon, start_cpu_sup, false),
+ application:set_env(os_mon, start_os_sup, false),
+ application:set_env(os_mon, start_memsup, true),
+ application:set_env(os_mon, start_disksup, true),
+ application:set_env(os_mon, disk_almost_full_threshold, ?DISK_FULL_THRES),
+ ejabberd:start_app(os_mon).
+
+excluded_apps() ->
+ [os_mon, mnesia, sasl, stdlib, kernel].
+
+%%%===================================================================
+%%% gen_event callbacks
+%%%===================================================================
+init([]) ->
+ {ok, #state{}}.
--spec process_command(stanza()) -> ok.
-process_command(#message{from = From, to = To, body = Body}) ->
- case To of
- #jid{luser = <<"">>, lresource = <<"watchdog">>} ->
- LFrom = jid:tolower(jid:remove_resource(From)),
- case lists:member(LFrom, get_admin_jids()) of
- true ->
- BodyText = xmpp:get_text(Body),
- spawn(fun () ->
- process_flag(priority, high),
- process_command1(From, To, BodyText)
- end),
- ok;
- false -> ok
- end;
+handle_event({set_alarm, {system_memory_high_watermark, _}}, State) ->
+ error_logger:warning_msg(
+ "More than 80% of OS memory is allocated, "
+ "starting OOM watchdog", []),
+ handle_overload(State),
+ {ok, restart_timer(State)};
+handle_event({clear_alarm, system_memory_high_watermark}, State) ->
+ cancel_timer(State#state.tref),
+ error_logger:info_msg(
+ "Memory consumption is back to normal, "
+ "stopping OOM watchdog", []),
+ {ok, State#state{tref = undefined}};
+handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) ->
+ case proc_stat(Pid, get_app_pids()) of
+ #proc_stat{name = Name} = ProcStat ->
+ error_logger:warning_msg(
+ "Process ~p consumes more than 5% of OS memory (~s)",
+ [Name, format_proc(ProcStat)]),
+ handle_overload(State),
+ {ok, State};
_ ->
- ok
+ {ok, State}
end;
-process_command(_) ->
+handle_event({clear_alarm, process_memory_high_watermark}, State) ->
+ {ok, State};
+handle_event({set_alarm, {{disk_almost_full, MountPoint}, _}}, State) ->
+ error_logger:warning_msg("Disk is almost full on ~p", [MountPoint]),
+ {ok, State};
+handle_event({clear_alarm, {disk_almost_full, MountPoint}}, State) ->
+ error_logger:info_msg("Disk usage is back to normal on ~p", [MountPoint]),
+ {ok, State};
+handle_event(Event, State) ->
+ error_logger:warning_msg("unexpected event: ~p", [Event]),
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, {error, badarg}, State}.
+
+handle_info({timeout, _TRef, handle_overload}, State) ->
+ handle_overload(State),
+ {ok, restart_timer(State)};
+handle_info(Info, State) ->
+ error_logger:warning_msg("unexpected info: ~p", [Info]),
+ {ok, State}.
+
+terminate(_Reason, _State) ->
ok.
-register_hook(Host) ->
- ejabberd_hooks:add(local_send_to_resource_hook, Host,
- ?MODULE, process_command, 50).
-
-unregister_hook(Host) ->
- ejabberd_hooks:delete(local_send_to_resource_hook, Host,
- ?MODULE, process_command, 50).
-
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
-
-%%--------------------------------------------------------------------
-%% Function: init(Args) -> {ok, State} |
-%% {ok, State, Timeout} |
-%% ignore |
-%% {stop, Reason}
-%% Description: Initiates the server
-%%--------------------------------------------------------------------
-init(Opts) ->
- LH = proplists:get_value(large_heap, Opts),
- process_flag(priority, high),
- erlang:system_monitor(self(), [{large_heap, LH}]),
- ejabberd_hooks:add(host_up, ?MODULE, register_hook, 50),
- ejabberd_hooks:add(host_down, ?MODULE, unregister_hook, 60),
- lists:foreach(fun register_hook/1, ?MYHOSTS),
- {ok, #state{}}.
-
-%%--------------------------------------------------------------------
-%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
-%% {reply, Reply, State, Timeout} |
-%% {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, Reply, State} |
-%% {stop, Reason, State}
-%% Description: Handling call messages
-%%--------------------------------------------------------------------
-handle_call({get, large_heap}, _From, State) ->
- {reply, get_large_heap(), State};
-handle_call({set, large_heap, NewValue}, _From,
- State) ->
- MonSettings = erlang:system_monitor(self(),
- [{large_heap, NewValue}]),
- OldLH = get_large_heap(MonSettings),
- NewLH = get_large_heap(),
- {reply, {lh_changed, OldLH, NewLH}, State};
-handle_call(_Request, _From, State) ->
- Reply = ok, {reply, Reply, State}.
-
-get_large_heap() ->
- MonSettings = erlang:system_monitor(),
- get_large_heap(MonSettings).
-
-get_large_heap(MonSettings) ->
- {_MonitorPid, Options} = MonSettings,
- proplists:get_value(large_heap, Options).
-
-%%--------------------------------------------------------------------
-%% Function: handle_cast(Msg, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling cast messages
-%%--------------------------------------------------------------------
-handle_cast(_Msg, State) -> {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: handle_info(Info, State) -> {noreply, State} |
-%% {noreply, State, Timeout} |
-%% {stop, Reason, State}
-%% Description: Handling all non call/cast messages
-%%--------------------------------------------------------------------
-handle_info({monitor, Pid, large_heap, Info}, State) ->
- spawn(fun () ->
- process_flag(priority, high),
- process_large_heap(Pid, Info)
- end),
- {noreply, State};
-handle_info(_Info, State) -> {noreply, State}.
-
-%%--------------------------------------------------------------------
-%% Function: terminate(Reason, State) -> void()
-%% Description: This function is called by a gen_server when it is about to
-%% terminate. It should be the opposite of Module:init/1 and do any necessary
-%% cleaning up. When it returns, the gen_server terminates with Reason.
-%% The return value is ignored.
-%%--------------------------------------------------------------------
-terminate(_Reason, _State) -> ok.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
-%%--------------------------------------------------------------------
-%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
-%% Description: Convert process state when code is changed
-%%--------------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) -> {ok, State}.
-
-%%--------------------------------------------------------------------
+%%%===================================================================
%%% Internal functions
-%%--------------------------------------------------------------------
-
-process_large_heap(Pid, Info) ->
- Host = (?MYNAME),
- JIDs = get_admin_jids(),
- DetailedInfo = detailed_info(Pid),
- Body = str:format("(~w) The process ~w is consuming too "
- "much memory:~n~p~n~s",
- [node(), Pid, Info, DetailedInfo]),
- From = jid:make(<<"">>, Host, <<"watchdog">>),
- Hint = [#hint{type = 'no-permanent-store'}],
- lists:foreach(
- fun(JID) ->
- send_message(From, jid:make(JID), Body, Hint)
- end, JIDs).
-
-send_message(From, To, Body) ->
- send_message(From, To, Body, []).
-
-send_message(From, To, Body, ExtraEls) ->
- ejabberd_router:route(#message{type = chat,
- from = From,
- to = To,
- body = xmpp:mk_text(Body),
- sub_els = ExtraEls}).
-
-get_admin_jids() ->
- ejabberd_config:get_option(watchdog_admins, []).
-
-detailed_info(Pid) ->
- case process_info(Pid, dictionary) of
- {dictionary, Dict} ->
- case lists:keysearch('$ancestors', 1, Dict) of
- {value, {'$ancestors', [Sup | _]}} ->
- case Sup of
- ejabberd_c2s_sup -> c2s_info(Pid);
- ejabberd_s2s_out_sup -> s2s_out_info(Pid);
- ejabberd_service_sup -> service_info(Pid);
- _ -> detailed_info1(Pid)
- end;
- _ -> detailed_info1(Pid)
- end;
- _ -> detailed_info1(Pid)
+%%%===================================================================
+-spec handle_overload(state()) -> ok.
+handle_overload(State) ->
+ handle_overload(State, processes()).
+
+-spec handle_overload(state(), [pid()]) -> ok.
+handle_overload(_State, Procs) ->
+ AppPids = get_app_pids(),
+ {TotalMsgs, ProcsNum, Apps, Stats} = overloaded_procs(AppPids, Procs),
+ if TotalMsgs >= 10000 ->
+ SortedStats = lists:reverse(lists:keysort(#proc_stat.qlen, Stats)),
+ error_logger:warning_msg(
+ "The system is overloaded with ~b messages "
+ "queued by ~b process(es) (~b%) "
+ "from the following applications: ~s; "
+ "the top processes are:~n~s",
+ [TotalMsgs, ProcsNum,
+ round(ProcsNum*100/length(Procs)),
+ format_apps(Apps),
+ format_top_procs(SortedStats)]),
+ kill(SortedStats, round(TotalMsgs/ProcsNum));
+ true ->
+ ok
+ end,
+ lists:foreach(fun erlang:garbage_collect/1, Procs).
+
+-spec get_app_pids() -> map().
+get_app_pids() ->
+ try application:info() of
+ Info ->
+ case lists:keyfind(running, 1, Info) of
+ {_, Apps} ->
+ lists:foldl(
+ fun({Name, Pid}, M) when is_pid(Pid) ->
+ maps:put(Pid, Name, M);
+ (_, M) ->
+ M
+ end, #{}, Apps);
+ false ->
+ #{}
+ end
+ catch _:_ ->
+ #{}
end.
-detailed_info1(Pid) ->
- io_lib:format("~p",
- [[process_info(Pid, current_function),
- process_info(Pid, initial_call),
- process_info(Pid, message_queue_len),
- process_info(Pid, links), process_info(Pid, dictionary),
- process_info(Pid, heap_size),
- process_info(Pid, stack_size)]]).
-
-c2s_info(Pid) ->
- [<<"Process type: c2s">>, check_send_queue(Pid),
- <<"\n">>,
- io_lib:format("Command to kill this process: kill ~s ~w",
- [iolist_to_binary(atom_to_list(node())), Pid])].
-
-s2s_out_info(Pid) ->
- FromTo = mnesia:dirty_select(s2s,
- [{{s2s, '$1', Pid, '_'}, [], ['$1']}]),
- [<<"Process type: s2s_out">>,
- case FromTo of
- [{From, To}] ->
- <<"\n",
- (io_lib:format("S2S connection: from ~s to ~s",
- [From, To]))/binary>>;
- _ -> <<"">>
- end,
- check_send_queue(Pid), <<"\n">>,
- io_lib:format("Command to kill this process: kill ~s ~w",
- [iolist_to_binary(atom_to_list(node())), Pid])].
-
-service_info(Pid) ->
- Routes = mnesia:dirty_select(route,
- [{{route, '$1', Pid, '_'}, [], ['$1']}]),
- [<<"Process type: s2s_out">>,
- case Routes of
- [Route] -> <<"\nServiced domain: ", Route/binary>>;
- _ -> <<"">>
- end,
- check_send_queue(Pid), <<"\n">>,
- io_lib:format("Command to kill this process: kill ~s ~w",
- [iolist_to_binary(atom_to_list(node())), Pid])].
-
-check_send_queue(Pid) ->
- case {process_info(Pid, current_function),
- process_info(Pid, message_queue_len)}
- of
- {{current_function, MFA}, {message_queue_len, MLen}} ->
- if MLen > 100 ->
- case MFA of
- {prim_inet, send, 2} ->
- <<"\nPossible reason: the process is blocked "
- "trying to send data over its TCP connection.">>;
- {M, F, A} ->
- [<<"\nPossible reason: the process can't "
- "process messages faster than they arrive. ">>,
- io_lib:format("Current function is ~w:~w/~w",
- [M, F, A])]
- end;
- true -> <<"">>
- end;
- _ -> <<"">>
+-spec overloaded_procs(map(), [pid()])
+ -> {non_neg_integer(), non_neg_integer(), dict:dict(), [proc_stat()]}.
+overloaded_procs(AppPids, AllProcs) ->
+ lists:foldl(
+ fun(Pid, {TotalMsgs, ProcsNum, Apps, Stats}) ->
+ case proc_stat(Pid, AppPids) of
+ #proc_stat{qlen = QLen, application = App} = Stat
+ when QLen > 0 ->
+ {TotalMsgs + QLen, ProcsNum + 1,
+ dict:update_counter(App, QLen, Apps),
+ [Stat|Stats]};
+ _ ->
+ {TotalMsgs, ProcsNum, Apps, Stats}
+ end
+ end, {0, 0, dict:new(), []}, AllProcs).
+
+-spec proc_stat(pid(), map()) -> proc_stat() | undefined.
+proc_stat(Pid, AppPids) ->
+ case process_info(Pid, [message_queue_len,
+ memory,
+ initial_call,
+ current_function,
+ dictionary,
+ group_leader,
+ registered_name]) of
+ [{_, MsgLen}, {_, Mem}, {_, InitCall},
+ {_, CurrFun}, {_, Dict}, {_, GL}, {_, Name}] ->
+ IntLen = proplists:get_value('$internal_queue_len', Dict, 0),
+ TrueInitCall = proplists:get_value('$initial_call', Dict, InitCall),
+ Ancestors = proplists:get_value('$ancestors', Dict, []),
+ Len = IntLen + MsgLen,
+ App = maps:get(GL, AppPids, kernel),
+ RegName = case Name of
+ [] -> Pid;
+ _ -> Name
+ end,
+ #proc_stat{qlen = Len,
+ memory = Mem,
+ initial_call = TrueInitCall,
+ current_function = CurrFun,
+ ancestors = Ancestors,
+ application = App,
+ name = RegName};
+ _ ->
+ undefined
end.
-process_command1(From, To, Body) ->
- process_command2(str:tokens(Body, <<" ">>), From, To).
-
-process_command2([<<"kill">>, SNode, SPid], From, To) ->
- Node = misc:binary_to_atom(SNode),
- remote_command(Node, [kill, SPid], From, To);
-process_command2([<<"showlh">>, SNode], From, To) ->
- Node = misc:binary_to_atom(SNode),
- remote_command(Node, [showlh], From, To);
-process_command2([<<"setlh">>, SNode, NewValueString],
- From, To) ->
- Node = misc:binary_to_atom(SNode),
- NewValue = binary_to_integer(NewValueString),
- remote_command(Node, [setlh, NewValue], From, To);
-process_command2([<<"help">>], From, To) ->
- send_message(To, From, help());
-process_command2(_, From, To) ->
- send_message(To, From, help()).
+-spec restart_timer(#state{}) -> #state{}.
+restart_timer(State) ->
+ cancel_timer(State#state.tref),
+ TRef = erlang:start_timer(?CHECK_INTERVAL, self(), handle_overload),
+ State#state{tref = TRef}.
+
+-spec cancel_timer(reference()) -> ok.
+cancel_timer(undefined) ->
+ ok;
+cancel_timer(TRef) ->
+ case erlang:cancel_timer(TRef) of
+ false ->
+ receive {timeout, TRef, _} -> ok
+ after 0 -> ok
+ end;
+ _ ->
+ ok
+ end.
-help() ->
- <<"Commands:\n kill <node> <pid>\n showlh "
- "<node>\n setlh <node> <integer>">>.
+-spec format_apps(dict:dict()) -> io:data().
+format_apps(Apps) ->
+ AppList = lists:reverse(lists:keysort(2, dict:to_list(Apps))),
+ string:join(
+ [io_lib:format("~p (~b msgs)", [App, Msgs]) || {App, Msgs} <- AppList],
+ ", ").
+
+-spec format_top_procs([proc_stat()]) -> io:data().
+format_top_procs(Stats) ->
+ Stats1 = lists:sublist(Stats, 5),
+ string:join(
+ lists:map(
+ fun(#proc_stat{name = Name} = Stat) ->
+ [io_lib:format("** ~w: ", [Name]), format_proc(Stat)]
+ end,Stats1),
+ io_lib:nl()).
+
+-spec format_proc(proc_stat()) -> io:data().
+format_proc(#proc_stat{qlen = Len, memory = Mem, initial_call = InitCall,
+ current_function = CurrFun, ancestors = Ancs,
+ application = App}) ->
+ io_lib:format(
+ "msgs = ~b, memory = ~b, initial_call = ~s, "
+ "current_function = ~s, ancestors = ~w, application = ~w",
+ [Len, Mem, format_mfa(InitCall), format_mfa(CurrFun), Ancs, App]).
+
+-spec format_mfa(mfa()) -> io:data().
+format_mfa({M, F, A}) when is_atom(M), is_atom(F), is_integer(A) ->
+ io_lib:format("~s:~s/~b", [M, F, A]);
+format_mfa(WTF) ->
+ io_lib:format("~w", [WTF]).
+
+-spec kill([proc_stat()], non_neg_integer()) -> ok.
+kill(Stats, Threshold) ->
+ case ejabberd_config:get_option(oom_killer, true) of
+ true ->
+ do_kill(Stats, Threshold);
+ false ->
+ ok
+ end.
-remote_command(Node, Args, From, To) ->
- Message = case ejabberd_cluster:call(Node, ?MODULE,
- process_remote_command, [Args])
- of
- {badrpc, Reason} ->
- io_lib:format("Command failed:~n~p", [Reason]);
- Result -> Result
- end,
- send_message(To, From, iolist_to_binary(Message)).
+-spec do_kill([proc_stat()], non_neg_integer()) -> ok.
+do_kill(Stats, Threshold) ->
+ Killed = lists:filtermap(
+ fun(#proc_stat{qlen = Len, name = Name, application = App})
+ when Len >= Threshold ->
+ case lists:member(App, excluded_apps()) of
+ true ->
+ error_logger:warning_msg(
+ "Unable to kill process ~p from whitelisted "
+ "application ~p", [Name, App]),
+ false;
+ false ->
+ case kill_proc(Name) of
+ false ->
+ false;
+ Pid ->
+ maybe_restart_app(App),
+ {true, Pid}
+ end
+ end;
+ (_) ->
+ false
+ end, Stats),
+ TotalKilled = length(Killed),
+ if TotalKilled > 0 ->
+ error_logger:error_msg(
+ "Killed ~b process(es) consuming more than ~b message(s) each",
+ [TotalKilled, Threshold]);
+ true ->
+ ok
+ end.
-process_remote_command([kill, SPid]) ->
- exit(list_to_pid(SPid), kill), <<"ok">>;
-process_remote_command([showlh]) ->
- Res = gen_server:call(ejabberd_system_monitor,
- {get, large_heap}),
- io_lib:format("Current large heap: ~p", [Res]);
-process_remote_command([setlh, NewValue]) ->
- {lh_changed, OldLH, NewLH} =
- gen_server:call(ejabberd_system_monitor,
- {set, large_heap, NewValue}),
- io_lib:format("Result of set large heap: ~p --> ~p",
- [OldLH, NewLH]);
-process_remote_command(_) -> throw(unknown_command).
+-spec kill_proc(pid() | atom()) -> false | pid().
+kill_proc(undefined) ->
+ false;
+kill_proc(Name) when is_atom(Name) ->
+ kill_proc(whereis(Name));
+kill_proc(Pid) ->
+ exit(Pid, kill),
+ Pid.
+
+-spec maybe_restart_app(atom()) -> any().
+maybe_restart_app(lager) ->
+ ejabberd_logger:restart();
+maybe_restart_app(_) ->
+ ok.
--spec opt_type(watchdog_admins) -> fun(([binary()]) -> ljid());
- (watchdog_large_heap) -> fun((pos_integer()) -> pos_integer());
+-spec opt_type(oom_killer) -> fun((boolean()) -> boolean());
(atom()) -> [atom()].
-opt_type(watchdog_admins) ->
- fun (JIDs) ->
- [jid:tolower(jid:decode(iolist_to_binary(S)))
- || S <- JIDs]
- end;
-opt_type(watchdog_large_heap) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(_) -> [watchdog_admins, watchdog_large_heap].
+opt_type(oom_killer) ->
+ fun(B) when is_boolean(B) -> B end;
+opt_type(_) -> [oom_killer].
diff --git a/src/mod_bosh_sql.erl b/src/mod_bosh_sql.erl
index 621e9d317..d732521a6 100644
--- a/src/mod_bosh_sql.erl
+++ b/src/mod_bosh_sql.erl
@@ -59,8 +59,7 @@ open_session(SID, Pid) ->
"pid=%(PidS)s"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("failed to update 'bosh' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -69,8 +68,7 @@ close_session(SID) ->
?MYNAME, ?SQL("delete from bosh where sid=%(SID)s")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'bosh' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -84,8 +82,7 @@ find_session(SID) ->
end;
{selected, []} ->
{error, notfound};
- Err ->
- ?ERROR_MSG("failed to select 'bosh' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
diff --git a/src/mod_caps.erl b/src/mod_caps.erl
index edc93bbf1..6021da252 100644
--- a/src/mod_caps.erl
+++ b/src/mod_caps.erl
@@ -206,8 +206,10 @@ c2s_presence_in(C2SState,
{Subscription, _} = ejabberd_hooks:run_fold(
roster_get_jid_info, To#jid.lserver,
{none, []}, [To#jid.luser, To#jid.lserver, From]),
+ ToSelf = (From#jid.luser == To#jid.luser)
+ and (From#jid.lserver == To#jid.lserver),
Insert = (Type == available)
- and ((Subscription == both) or (Subscription == to)),
+ and ((Subscription == both) or (Subscription == to) or ToSelf),
Delete = (Type == unavailable) or (Type == error),
if Insert or Delete ->
LFrom = jid:tolower(From),
diff --git a/src/mod_caps_sql.erl b/src/mod_caps_sql.erl
index 5d4c1e934..3fa18f16b 100644
--- a/src/mod_caps_sql.erl
+++ b/src/mod_caps_sql.erl
@@ -63,9 +63,7 @@ caps_write(LServer, NodePair, Features) ->
sql_write_features_t(NodePair, Features)) of
{atomic, _} ->
ok;
- {aborted, Reason} ->
- ?ERROR_MSG("Failed to write to SQL 'caps_features' table: ~p",
- [Reason]),
+ {aborted, _Reason} ->
{error, db_failure}
end.
diff --git a/src/mod_carboncopy_sql.erl b/src/mod_carboncopy_sql.erl
index 1b8e1e111..cda8c60d0 100644
--- a/src/mod_carboncopy_sql.erl
+++ b/src/mod_carboncopy_sql.erl
@@ -48,8 +48,7 @@ enable(LUser, LServer, LResource, NS) ->
"node=%(NodeS)s"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("failed to update 'carboncopy' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -60,8 +59,7 @@ disable(LUser, LServer, LResource) ->
"and %(LServer)H and resource=%(LResource)s")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'carboncopy' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -73,8 +71,7 @@ list(LUser, LServer) ->
{selected, Rows} ->
{ok, [{Resource, NS, binary_to_atom(Node, latin1)}
|| {Resource, NS, Node} <- Rows]};
- Err ->
- ?ERROR_MSG("failed to select from 'carboncopy' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
diff --git a/src/mod_last_sql.erl b/src/mod_last_sql.erl
index f0889e4ec..0c86c72c4 100644
--- a/src/mod_last_sql.erl
+++ b/src/mod_last_sql.erl
@@ -51,9 +51,7 @@ get_last(LUser, LServer) ->
error;
{selected, [{TimeStamp, Status}]} ->
{ok, {TimeStamp, Status}};
- Reason ->
- ?ERROR_MSG("failed to get last for user ~s@~s: ~p",
- [LUser, LServer, Reason]),
+ _Reason ->
{error, db_failure}
end.
@@ -65,9 +63,7 @@ store_last_info(LUser, LServer, TimeStamp, Status) ->
"state=%(Status)s"]) of
ok ->
ok;
- Err ->
- ?ERROR_MSG("failed to store last activity for ~s@~s: ~p",
- [LUser, LServer, Err]),
+ _Err ->
{error, db_failure}
end.
diff --git a/src/mod_muc_admin.erl b/src/mod_muc_admin.erl
index 440deee02..39abb8334 100644
--- a/src/mod_muc_admin.erl
+++ b/src/mod_muc_admin.erl
@@ -866,9 +866,9 @@ get_room_occupants_number(Room, Host) ->
send_direct_invitation(RoomName, RoomService, Password, Reason, UsersString) ->
RoomJid = jid:make(RoomName, RoomService),
XmlEl = build_invitation(Password, Reason, RoomJid),
- UsersStrings = get_users_to_invite(RoomJid, UsersString),
- [send_direct_invitation(RoomJid, UserStrings, XmlEl)
- || UserStrings <- UsersStrings],
+ Users = get_users_to_invite(RoomJid, UsersString),
+ [send_direct_invitation(RoomJid, UserJid, XmlEl)
+ || UserJid <- Users],
timer:sleep(1000),
ok.
@@ -886,8 +886,9 @@ get_users_to_invite(RoomJid, UsersString) ->
orelse UserJid#jid.lserver /= OccupantJid#jid.lserver
end,
OccupantsJids),
- case Val of
- true -> {true, UserJid};
+ case {UserJid#jid.luser, Val} of
+ {<<>>, _} -> false;
+ {_, true} -> {true, UserJid};
_ -> false
end
end,
diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl
index e8d61dacb..fadfff6c5 100644
--- a/src/mod_muc_room.erl
+++ b/src/mod_muc_room.erl
@@ -331,7 +331,8 @@ normal_state({route, <<"">>,
catch _:{xmpp_codec, Why} ->
ErrTxt = xmpp:io_format_error(Why),
Err = xmpp:err_bad_request(ErrTxt, Lang),
- ejabberd_router:route_error(IQ0, Err)
+ ejabberd_router:route_error(IQ0, Err),
+ {next_state, normal_state, StateData}
end;
normal_state({route, <<"">>, #iq{} = IQ}, StateData) ->
Err = xmpp:err_bad_request(),
diff --git a/src/mod_muc_sql.erl b/src/mod_muc_sql.erl
index 8aa6071c8..0cb09775b 100644
--- a/src/mod_muc_sql.erl
+++ b/src/mod_muc_sql.erl
@@ -187,12 +187,10 @@ get_rooms(LServer, Host) ->
#muc_room{name_host = {Room, Host},
opts = mod_muc:opts_to_binary(OptsD2)}
end, RoomOpts);
- Err ->
- ?ERROR_MSG("failed to get rooms subscribers: ~p", [Err]),
+ _Err ->
[]
end;
- Err ->
- ?ERROR_MSG("failed to get rooms: ~p", [Err]),
+ _Err ->
[]
end.
@@ -266,7 +264,6 @@ register_online_room(ServerHost, Room, Host, Pid) ->
ok ->
ok;
Err ->
- ?ERROR_MSG("failed to update 'muc_online_room': ~p", [Err]),
Err
end.
@@ -290,8 +287,7 @@ find_online_room(ServerHost, Room, Host) ->
end;
{selected, []} ->
error;
- Err ->
- ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]),
+ _Err ->
error
end.
@@ -302,8 +298,7 @@ count_online_rooms(ServerHost, Host) ->
"where host=%(Host)s")) of
{selected, [{Num}]} ->
Num;
- Err ->
- ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]),
+ _Err ->
0
end.
@@ -319,8 +314,7 @@ get_online_rooms(ServerHost, Host, _RSM) ->
catch _:{bad_node, _} -> []
end
end, Rows);
- Err ->
- ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]),
+ _Err ->
[]
end.
@@ -340,7 +334,6 @@ register_online_user(ServerHost, {U, S, R}, Room, Host) ->
ok ->
ok;
Err ->
- ?ERROR_MSG("failed to update 'muc_online_users': ~p", [Err]),
Err
end.
@@ -359,8 +352,7 @@ count_online_rooms_by_user(ServerHost, U, S) ->
"username=%(U)s and server=%(S)s")) of
{selected, [{Num}]} ->
Num;
- Err ->
- ?ERROR_MSG("failed to select 'muc_online_users': ~p", [Err]),
+ _Err ->
0
end.
@@ -371,8 +363,7 @@ get_online_rooms_by_user(ServerHost, U, S) ->
"username=%(U)s and server=%(S)s")) of
{selected, Rows} ->
Rows;
- Err ->
- ?ERROR_MSG("failed to select 'muc_online_users': ~p", [Err]),
+ _Err ->
[]
end.
@@ -424,8 +415,7 @@ get_subscribed_rooms(LServer, Host, Jid) ->
" and host=%(Host)s")) of
{selected, Subs} ->
[jid:make(Room, Host, <<>>) || {Room} <- Subs];
- Error ->
- ?ERROR_MSG("Error when fetching subscribed rooms ~p", [Error]),
+ _Error ->
[]
end.
diff --git a/src/mod_proxy65_sql.erl b/src/mod_proxy65_sql.erl
index 18f8db147..715289db6 100644
--- a/src/mod_proxy65_sql.erl
+++ b/src/mod_proxy65_sql.erl
@@ -69,7 +69,6 @@ register_stream(SID, Pid) ->
{atomic, _} ->
ok;
{aborted, Reason} ->
- ?ERROR_MSG("failed to register stream: ~p", [Reason]),
{error, Reason}
end.
@@ -82,7 +81,6 @@ unregister_stream(SID) ->
{atomic, _} ->
ok;
{aborted, Reason} ->
- ?ERROR_MSG("failed to unregister stream: ~p", [Reason]),
{error, Reason}
end.
@@ -133,7 +131,6 @@ activate_stream(SID, IJID, MaxConnections, _Node) ->
{aborted, {limit, _, _} = Limit} ->
{error, Limit};
{aborted, Reason} ->
- ?ERROR_MSG("failed to activate bytestream: ~p", [Reason]),
{error, Reason}
end.
diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl
index efa22305c..bedbd3495 100644
--- a/src/mod_pubsub.erl
+++ b/src/mod_pubsub.erl
@@ -550,6 +550,9 @@ disco_items(Host, Node, From) ->
%%
-spec caps_add(jid(), jid(), [binary()]) -> ok.
+caps_add(JID, JID, _Features) ->
+ %% Send the owner his last PEP items.
+ send_last_pep(JID, JID);
caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features)
when S1 =/= S2 ->
%% When a remote contact goes online while the local user is offline, the
diff --git a/src/mod_push_sql.erl b/src/mod_push_sql.erl
index c82d9fc02..2a1d588e9 100644
--- a/src/mod_push_sql.erl
+++ b/src/mod_push_sql.erl
@@ -57,8 +57,7 @@ store_session(LUser, LServer, NowTS, PushJID, Node, XData) ->
"xml=%(XML)s"]) of
ok ->
{ok, {NowTS, PushLJID, Node, XData}};
- Err ->
- ?ERROR_MSG("Failed to update 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -77,8 +76,7 @@ lookup_session(LUser, LServer, PushJID, Node) ->
{ok, {NowTS, PushLJID, Node, XData}};
{selected, []} ->
{error, notfound};
- Err ->
- ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -95,8 +93,7 @@ lookup_session(LUser, LServer, NowTS) ->
{ok, {NowTS, PushLJID, Node, XData}};
{selected, []} ->
{error, notfound};
- Err ->
- ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -115,8 +112,7 @@ lookup_sessions(LUser, LServer, PushJID) ->
XData = decode_xdata(XML, LUser, LServer),
{NowTS, PushLJID, Node, XData}
end, Rows)};
- Err ->
- ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -134,8 +130,7 @@ lookup_sessions(LUser, LServer) ->
PushLJID = jid:tolower(jid:decode(Service)),
{NowTS, PushLJID,Node, XData}
end, Rows)};
- Err ->
- ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -153,8 +148,7 @@ lookup_sessions(LServer) ->
PushLJID = jid:tolower(jid:decode(Service)),
{NowTS, PushLJID, Node, XData}
end, Rows)};
- Err ->
- ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -166,8 +160,7 @@ delete_session(LUser, LServer, NowTS) ->
"username=%(LUser)s and %(LServer)H and timestamp=%(TS)d")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
@@ -179,8 +172,7 @@ delete_old_sessions(LServer, Time) ->
"and %(LServer)H")) of
{updated, _} ->
ok;
- Err ->
- ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]),
+ _Err ->
{error, db_failure}
end.
diff --git a/src/node_flat_sql.erl b/src/node_flat_sql.erl
index afbc050a8..2773114a2 100644
--- a/src/node_flat_sql.erl
+++ b/src/node_flat_sql.erl
@@ -591,6 +591,7 @@ get_states(Nidx) ->
fun({SJID, Aff, Subs}) ->
JID = decode_jid(SJID),
#pubsub_state{stateid = {JID, Nidx},
+ nodeidx = Nidx,
items = itemids(Nidx, JID),
affiliation = decode_affiliation(Aff),
subscriptions = decode_subscriptions(Subs)}
@@ -997,6 +998,7 @@ raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) ->
El -> [El]
end,
#pubsub_item{itemid = {ItemId, Nidx},
+ nodeidx = Nidx,
creation = {decode_now(Creation), jid:remove_resource(JID)},
modification = {decode_now(Modification), JID},
payload = Payload}.
diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl
index caad482c5..c28bad8e1 100644
--- a/src/xmpp_stream_in.erl
+++ b/src/xmpp_stream_in.erl
@@ -314,16 +314,10 @@ handle_info({'$gen_event', {xmlstreamstart, Name, Attrs}},
send_pkt(State1, Err)
end
end);
-handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) ->
- %% TODO: find and fix this in fast_xml
- error_logger:error_msg("unexpected event from receiver: ~p; "
- "xmlstreamstart was expected", [El]),
- State1 = send_header(State),
- noreply(
- case is_disconnected(State1) of
- true -> State1;
- false -> send_pkt(State1, xmpp:serr_invalid_xml())
- end);
+handle_info({'$gen_event', {xmlstreamend, _}}, State) ->
+ noreply(process_stream_end({stream, reset}, State));
+handle_info({'$gen_event', closed}, State) ->
+ noreply(process_stream_end({socket, closed}, State));
handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) ->
State1 = send_header(State),
noreply(
@@ -338,6 +332,16 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) ->
end,
send_pkt(State1, Err)
end);
+handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) ->
+ %% TODO: find and fix this in fast_xml
+ error_logger:error_msg("unexpected event from receiver: ~p; "
+ "xmlstreamstart was expected", [El]),
+ State1 = send_header(State),
+ noreply(
+ case is_disconnected(State1) of
+ true -> State1;
+ false -> send_pkt(State1, xmpp:serr_invalid_xml())
+ end);
handle_info({'$gen_event', {xmlstreamelement, El}},
#{xmlns := NS, mod := Mod} = State) ->
noreply(
@@ -364,10 +368,6 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}},
noreply(try Mod:handle_cdata(Data, State)
catch _:undef -> State
end);
-handle_info({'$gen_event', {xmlstreamend, _}}, State) ->
- noreply(process_stream_end({stream, reset}, State));
-handle_info({'$gen_event', closed}, State) ->
- noreply(process_stream_end({socket, closed}, State));
handle_info(timeout, #{mod := Mod} = State) ->
Disconnected = is_disconnected(State),
noreply(try Mod:handle_timeout(State)
@@ -565,8 +565,6 @@ process_element(Pkt, #{stream_state := StateName, lang := Lang} = State) ->
send_pkt(State, #sasl_failure{reason = 'aborted'});
#sasl_success{} ->
State;
- #compress{} when StateName == wait_for_sasl_response ->
- send_pkt(State, #compress_failure{reason = 'setup-failed'});
#compress{} ->
process_compress(Pkt, State);
#handshake{} when StateName == wait_for_handshake ->
@@ -694,7 +692,10 @@ process_stream_established(#{mod := Mod} = State) ->
end.
-spec process_compress(compress(), state()) -> state().
-process_compress(#compress{}, #{stream_compressed := true} = State) ->
+process_compress(#compress{},
+ #{stream_compressed := Compressed,
+ stream_authenticated := Authenticated} = State)
+ when Compressed or not Authenticated ->
send_pkt(State, #compress_failure{reason = 'setup-failed'});
process_compress(#compress{methods = HisMethods},
#{socket := Socket, sockmod := SockMod, mod := Mod} = State) ->
@@ -913,7 +914,8 @@ get_sasl_feature(_) ->
[].
-spec get_compress_feature(state()) -> [compression()].
-get_compress_feature(#{stream_compressed := false, mod := Mod} = State) ->
+get_compress_feature(#{stream_compressed := false, mod := Mod,
+ stream_authenticated := true} = State) ->
try Mod:compress_methods(State) of
[] -> [];
Ms -> [#compression{methods = Ms}]