diff options
-rwxr-xr-x | ejabberdctl.template | 2 | ||||
-rw-r--r-- | src/ejabberd_app.erl | 1 | ||||
-rw-r--r-- | src/ejabberd_auth_sql.erl | 11 | ||||
-rw-r--r-- | src/ejabberd_c2s.erl | 4 | ||||
-rw-r--r-- | src/ejabberd_logger.erl | 48 | ||||
-rw-r--r-- | src/ejabberd_oauth_sql.erl | 4 | ||||
-rw-r--r-- | src/ejabberd_router_sql.erl | 12 | ||||
-rw-r--r-- | src/ejabberd_s2s_in.erl | 4 | ||||
-rw-r--r-- | src/ejabberd_sm.erl | 7 | ||||
-rw-r--r-- | src/ejabberd_sm_sql.erl | 12 | ||||
-rw-r--r-- | src/ejabberd_sql.erl | 23 | ||||
-rw-r--r-- | src/ejabberd_sql_sup.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_sup.erl | 8 | ||||
-rw-r--r-- | src/ejabberd_system_monitor.erl | 588 | ||||
-rw-r--r-- | src/mod_bosh_sql.erl | 9 | ||||
-rw-r--r-- | src/mod_caps.erl | 4 | ||||
-rw-r--r-- | src/mod_caps_sql.erl | 4 | ||||
-rw-r--r-- | src/mod_carboncopy_sql.erl | 9 | ||||
-rw-r--r-- | src/mod_last_sql.erl | 8 | ||||
-rw-r--r-- | src/mod_muc_admin.erl | 11 | ||||
-rw-r--r-- | src/mod_muc_room.erl | 3 | ||||
-rw-r--r-- | src/mod_muc_sql.erl | 26 | ||||
-rw-r--r-- | src/mod_proxy65_sql.erl | 3 | ||||
-rw-r--r-- | src/mod_pubsub.erl | 3 | ||||
-rw-r--r-- | src/mod_push_sql.erl | 24 | ||||
-rw-r--r-- | src/node_flat_sql.erl | 2 | ||||
-rw-r--r-- | src/xmpp_stream_in.erl | 38 |
27 files changed, 424 insertions, 446 deletions
diff --git a/ejabberdctl.template b/ejabberdctl.template index 836377662..571b90b66 100755 --- a/ejabberdctl.template +++ b/ejabberdctl.template @@ -110,7 +110,7 @@ export ERL_LIBS exec_cmd() { case $EXEC_CMD in - as_install_user) su -c '"$0" "$@"' "$INSTALLUSER" -- "$@" ;; + as_install_user) su -s /bin/sh -c '"$0" "$@"' "$INSTALLUSER" -- "$@" ;; as_current_user) "$@" ;; esac } diff --git a/src/ejabberd_app.erl b/src/ejabberd_app.erl index 3743a8f04..4e9819d64 100644 --- a/src/ejabberd_app.erl +++ b/src/ejabberd_app.erl @@ -50,6 +50,7 @@ start(normal, _Args) -> ejabberd_mnesia:start(), file_queue_init(), maybe_add_nameservers(), + ejabberd_system_monitor:start(), case ejabberd_sup:start_link() of {ok, SupPid} -> register_elixir_config_hooks(), diff --git a/src/ejabberd_auth_sql.erl b/src/ejabberd_auth_sql.erl index 3f328c4a1..af4ad2821 100644 --- a/src/ejabberd_auth_sql.erl +++ b/src/ejabberd_auth_sql.erl @@ -71,8 +71,7 @@ set_password(User, Server, Password) -> case ejabberd_sql:sql_transaction(Server, F) of {atomic, _} -> ok; - {aborted, Reason} -> - ?ERROR_MSG("failed to write to SQL table: ~p", [Reason]), + {aborted, _} -> {error, db_failure} end. @@ -115,9 +114,7 @@ get_password(User, Server) -> iterationcount = IterationCount}}; {selected, []} -> error; - Err -> - ?ERROR_MSG("Failed to read password for user ~s@~s: ~p", - [User, Server, Err]), + _ -> error end. @@ -125,9 +122,7 @@ remove_user(User, Server) -> case del_user(Server, User) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete user ~s@~s: ~p", - [User, Server, Err]), + _ -> {error, db_failure} end. diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 258b5f3f2..a49987feb 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -287,8 +287,8 @@ process_terminated(#{sid := SID, sockmod := SockMod, socket := Socket, State1; process_terminated(#{sockmod := SockMod, socket := Socket, stop_reason := {tls, _}} = State, Reason) -> - ?ERROR_MSG("(~s) Failed to secure c2s connection: ~s", - [SockMod:pp(Socket), format_reason(State, Reason)]), + ?WARNING_MSG("(~s) Failed to secure c2s connection: ~s", + [SockMod:pp(Socket), format_reason(State, Reason)]), State; process_terminated(State, _Reason) -> State. diff --git a/src/ejabberd_logger.erl b/src/ejabberd_logger.erl index eee9d3b83..1181d4bcb 100644 --- a/src/ejabberd_logger.erl +++ b/src/ejabberd_logger.erl @@ -27,7 +27,8 @@ -behaviour(ejabberd_config). %% API --export([start/0, reopen_log/0, rotate_log/0, get/0, set/1, get_log_path/0, opt_type/1]). +-export([start/0, restart/0, reopen_log/0, rotate_log/0, get/0, set/1, + get_log_path/0, opt_type/1]). -include("ejabberd.hrl"). @@ -102,6 +103,11 @@ get_string_env(Name, Default) -> %% @spec () -> ok start() -> + start(4). + +-spec start(loglevel()) -> ok. +start(Level) -> + LLevel = get_lager_loglevel(Level), StartedApps = application:which_applications(5000), case lists:keyfind(logger, 1, StartedApps) of %% Elixir logger is started. We assume everything is in place @@ -109,24 +115,24 @@ start() -> {logger, _, _} -> error_logger:info_msg("Ignoring ejabberd logger options, using Elixir Logger.", []), %% Do not start lager, we rely on Elixir Logger - do_start_for_logger(); + do_start_for_logger(LLevel); _ -> - do_start() + do_start(LLevel) end. -do_start_for_logger() -> +do_start_for_logger(Level) -> application:load(sasl), application:set_env(sasl, sasl_error_logger, false), application:load(lager), application:set_env(lager, error_logger_redirect, false), application:set_env(lager, error_logger_whitelist, ['Elixir.Logger.ErrorHandler']), application:set_env(lager, crash_log, false), - application:set_env(lager, handlers, [{elixir_logger_backend, [{level, info}]}]), + application:set_env(lager, handlers, [{elixir_logger_backend, [{level, Level}]}]), ejabberd:start_app(lager), ok. %% Start lager -do_start() -> +do_start(Level) -> application:load(sasl), application:set_env(sasl, sasl_error_logger, false), application:load(lager), @@ -141,8 +147,8 @@ do_start() -> application:set_env(lager, error_logger_hwm, LogRateLimit), application:set_env( lager, handlers, - [{lager_console_backend, info}, - {lager_file_backend, [{file, ConsoleLog}, {level, info}, {date, LogRotateDate}, + [{lager_console_backend, Level}, + {lager_file_backend, [{file, ConsoleLog}, {level, Level}, {date, LogRotateDate}, {count, LogRotateCount}, {size, LogRotateSize}]}, {lager_file_backend, [{file, ErrorLog}, {level, error}, {date, LogRotateDate}, {count, LogRotateCount}, {size, LogRotateSize}]}]), @@ -156,6 +162,11 @@ do_start() -> end, gen_event:which_handlers(lager_event)), ok. +restart() -> + Level = ejabberd_config:get_option(loglevel, 4), + application:stop(lager), + start(Level). + %% @spec () -> ok reopen_log() -> %% Lager detects external log rotation automatically. @@ -187,15 +198,7 @@ get() -> %% @spec (loglevel() | {loglevel(), list()}) -> {module, module()} set(LogLevel) when is_integer(LogLevel) -> - LagerLogLevel = case LogLevel of - 0 -> none; - 1 -> critical; - 2 -> error; - 3 -> warning; - 4 -> info; - 5 -> debug; - E -> throw({wrong_loglevel, E}) - end, + LagerLogLevel = get_lager_loglevel(LogLevel), case get_lager_loglevel() of LagerLogLevel -> ok; @@ -228,6 +231,17 @@ get_lager_loglevel() -> end, none, Handlers). +get_lager_loglevel(LogLevel) -> + case LogLevel of + 0 -> none; + 1 -> critical; + 2 -> error; + 3 -> warning; + 4 -> info; + 5 -> debug; + E -> erlang:error({wrong_loglevel, E}) + end. + get_lager_handlers() -> case catch gen_event:which_handlers(lager_event) of {'EXIT',noproc} -> diff --git a/src/ejabberd_oauth_sql.erl b/src/ejabberd_oauth_sql.erl index 14eaca6a8..61e9986a6 100644 --- a/src/ejabberd_oauth_sql.erl +++ b/src/ejabberd_oauth_sql.erl @@ -57,9 +57,7 @@ store(R) -> "expire=%(Expire)d"]) of ok -> ok; - Err -> - ?ERROR_MSG("Failed to write to SQL 'oauth_token' table: ~p", - [Err]), + _ -> {error, db_failure} end. diff --git a/src/ejabberd_router_sql.erl b/src/ejabberd_router_sql.erl index c0264da31..d0ab98a6a 100644 --- a/src/ejabberd_router_sql.erl +++ b/src/ejabberd_router_sql.erl @@ -61,8 +61,7 @@ register_route(Domain, ServerHost, LocalHint, _, Pid) -> "local_hint=%(LocalHintS)s"]) of ok -> ok; - Err -> - ?ERROR_MSG("failed to update 'route' table: ~p", [Err]), + _ -> {error, db_failure} end. @@ -75,8 +74,7 @@ unregister_route(Domain, _, Pid) -> "and pid=%(PidS)s and node=%(Node)s")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'route' table: ~p", [Err]), + _ -> {error, db_failure} end. @@ -90,8 +88,7 @@ find_routes(Domain) -> fun(Row) -> row_to_route(Domain, Row) end, Rows)}; - Err -> - ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), + _ -> {error, db_failure} end. @@ -101,8 +98,7 @@ get_all_routes() -> ?SQL("select @(domain)s from route where domain <> server_host")) of {selected, Domains} -> {ok, [Domain || {Domain} <- Domains]}; - Err -> - ?ERROR_MSG("failed to select from 'route' table: ~p", [Err]), + _ -> {error, db_failure} end. diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index a949e83d6..ae81f739e 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -289,8 +289,8 @@ terminate(Reason, #{auth_domains := AuthDomains, sockmod := SockMod, socket := Socket} = State) -> case maps:get(stop_reason, State, undefined) of {tls, _} = Err -> - ?ERROR_MSG("(~s) Failed to secure inbound s2s connection: ~s", - [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]); + ?WARNING_MSG("(~s) Failed to secure inbound s2s connection: ~s", + [SockMod:pp(Socket), xmpp_stream_in:format_error(Err)]); _ -> ok end, diff --git a/src/ejabberd_sm.erl b/src/ejabberd_sm.erl index 3df1d88e0..97e599253 100644 --- a/src/ejabberd_sm.erl +++ b/src/ejabberd_sm.erl @@ -314,8 +314,11 @@ get_session_sid(User, Server, Resource) -> LResource = jid:resourceprep(Resource), Mod = get_sm_backend(LServer), case online(get_sessions(Mod, LUser, LServer, LResource)) of - [#session{sid = SID}] -> SID; - _ -> none + [] -> + none; + Ss -> + #session{sid = SID} = lists:max(Ss), + SID end. -spec get_session_sids(binary(), binary()) -> [sid()]. diff --git a/src/ejabberd_sm_sql.erl b/src/ejabberd_sm_sql.erl index 55e21040b..2dd884cd2 100644 --- a/src/ejabberd_sm_sql.erl +++ b/src/ejabberd_sm_sql.erl @@ -80,8 +80,7 @@ set_session(#session{sid = {Now, Pid}, usr = {U, LServer, R}, "info=%(InfoS)s"]) of ok -> ok; - Err -> - ?ERROR_MSG("failed to update 'sm' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -93,8 +92,7 @@ delete_session(#session{usr = {_, LServer, _}, sid = {Now, Pid}}) -> ?SQL("delete from sm where usec=%(TS)d and pid=%(PidS)s")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'sm' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -117,8 +115,7 @@ get_sessions(LServer) -> catch _:{bad_node, _} -> [] end end, Rows); - Err -> - ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), + _Err -> [] end. @@ -135,8 +132,7 @@ get_sessions(LUser, LServer) -> catch _:{bad_node, _} -> [] end end, Rows)}; - Err -> - ?ERROR_MSG("failed to select from 'sm' table: ~p", [Err]), + _Err -> {error, db_failure} end. diff --git a/src/ejabberd_sql.erl b/src/ejabberd_sql.erl index 8d30cf637..c4bf3a737 100644 --- a/src/ejabberd_sql.erl +++ b/src/ejabberd_sql.erl @@ -158,18 +158,24 @@ sql_call(Host, Msg) -> case ejabberd_sql_sup:get_random_pid(Host) of none -> {error, <<"Unknown Host">>}; Pid -> - p1_fsm:sync_send_event(Pid,{sql_cmd, Msg, - p1_time_compat:monotonic_time(milli_seconds)}, - query_timeout(Host)) + sync_send_event(Pid,{sql_cmd, Msg, + p1_time_compat:monotonic_time(milli_seconds)}, + query_timeout(Host)) end; _State -> nested_op(Msg) end. keep_alive(Host, PID) -> - p1_fsm:sync_send_event(PID, - {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, - p1_time_compat:monotonic_time(milli_seconds)}, - query_timeout(Host)). + sync_send_event(PID, + {sql_cmd, {sql_query, ?KEEPALIVE_QUERY}, + p1_time_compat:monotonic_time(milli_seconds)}, + query_timeout(Host)). + +sync_send_event(Pid, Msg, Timeout) -> + try p1_fsm:sync_send_event(Pid, Msg, Timeout) + catch _:{Reason, {p1_fsm, _, _}} -> + {error, Reason} + end. -spec sql_query_t(sql_query()) -> sql_query_result(). @@ -270,6 +276,7 @@ sqlite_file(Host) -> %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- init([Host, StartInterval]) -> + process_flag(trap_exit, true), case ejabberd_config:get_option({sql_keepalive_interval, Host}) of undefined -> ok; @@ -1077,6 +1084,8 @@ query_timeout(LServer) -> timer:seconds( ejabberd_config:get_option({sql_query_timeout, LServer}, 60)). +check_error({error, Why} = Err, _Query) when Why == killed -> + Err; check_error({error, Why} = Err, #sql_query{} = Query) -> ?ERROR_MSG("SQL query '~s' at ~p failed: ~p", [Query#sql_query.hash, Query#sql_query.loc, Why]), diff --git a/src/ejabberd_sql_sup.erl b/src/ejabberd_sql_sup.erl index f7793eb2c..759f8128e 100644 --- a/src/ejabberd_sql_sup.erl +++ b/src/ejabberd_sql_sup.erl @@ -86,7 +86,7 @@ init([Host]) -> get_pids(Host) -> Rs = mnesia:dirty_read(sql_pool, Host), - [R#sql_pool.pid || R <- Rs]. + [R#sql_pool.pid || R <- Rs, is_process_alive(R#sql_pool.pid)]. get_random_pid(Host) -> case get_pids(Host) of diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index dbbc4d5e4..3f9809a05 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -47,13 +47,6 @@ init([]) -> 5000, worker, [ejabberd_cluster]}, - SystemMonitor = - {ejabberd_system_monitor, - {ejabberd_system_monitor, start_link, []}, - permanent, - brutal_kill, - worker, - [ejabberd_system_monitor]}, S2S = {ejabberd_s2s, {ejabberd_s2s, start_link, []}, @@ -172,7 +165,6 @@ init([]) -> PKIX, ACME, Listener, - SystemMonitor, S2S, Captcha, S2SInSupervisor, diff --git a/src/ejabberd_system_monitor.erl b/src/ejabberd_system_monitor.erl index 773104f9e..7787b34ab 100644 --- a/src/ejabberd_system_monitor.erl +++ b/src/ejabberd_system_monitor.erl @@ -24,320 +24,316 @@ %%%------------------------------------------------------------------- -module(ejabberd_system_monitor). - +-behaviour(gen_event). -behaviour(ejabberd_config). -author('alexey@process-one.net'). +-author('ekhramtsov@process-one.net'). --behaviour(gen_server). - -%% API --export([start_link/0, process_command/1, register_hook/1, - unregister_hook/1, process_remote_command/1]). - --export([init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, opt_type/1]). - --include("ejabberd.hrl"). --include("logger.hrl"). - --include("xmpp.hrl"). - --record(state, {}). - -%%==================================================================== %% API -%%==================================================================== -%%-------------------------------------------------------------------- -%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} -%% Description: Starts the server -%%-------------------------------------------------------------------- -start_link() -> - LH = ejabberd_config:get_option(watchdog_large_heap, 1000000), - Opts = [{large_heap, LH}], - gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, - []). +-export([start/0, opt_type/1]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +%% We don't use ejabberd logger because lager can be overloaded +%% too and alarm_handler may get stuck. +%%-include("logger.hrl"). + +-define(CHECK_INTERVAL, timer:seconds(30)). +-define(DISK_FULL_THRES, 0.99). + +-record(state, {tref :: reference(), + mref :: reference()}). +-record(proc_stat, {qlen :: non_neg_integer(), + memory :: non_neg_integer(), + initial_call :: mfa(), + current_function :: mfa(), + ancestors :: [pid() | atom()], + application :: pid() | atom(), + name :: pid() | atom()}). +-type state() :: #state{}. +-type proc_stat() :: #proc_stat{}. + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec start() -> ok. +start() -> + gen_event:add_handler(alarm_handler, ?MODULE, []), + gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {?MODULE, []}), + application:load(os_mon), + application:set_env(os_mon, start_cpu_sup, false), + application:set_env(os_mon, start_os_sup, false), + application:set_env(os_mon, start_memsup, true), + application:set_env(os_mon, start_disksup, true), + application:set_env(os_mon, disk_almost_full_threshold, ?DISK_FULL_THRES), + ejabberd:start_app(os_mon). + +excluded_apps() -> + [os_mon, mnesia, sasl, stdlib, kernel]. + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== +init([]) -> + {ok, #state{}}. --spec process_command(stanza()) -> ok. -process_command(#message{from = From, to = To, body = Body}) -> - case To of - #jid{luser = <<"">>, lresource = <<"watchdog">>} -> - LFrom = jid:tolower(jid:remove_resource(From)), - case lists:member(LFrom, get_admin_jids()) of - true -> - BodyText = xmpp:get_text(Body), - spawn(fun () -> - process_flag(priority, high), - process_command1(From, To, BodyText) - end), - ok; - false -> ok - end; +handle_event({set_alarm, {system_memory_high_watermark, _}}, State) -> + error_logger:warning_msg( + "More than 80% of OS memory is allocated, " + "starting OOM watchdog", []), + handle_overload(State), + {ok, restart_timer(State)}; +handle_event({clear_alarm, system_memory_high_watermark}, State) -> + cancel_timer(State#state.tref), + error_logger:info_msg( + "Memory consumption is back to normal, " + "stopping OOM watchdog", []), + {ok, State#state{tref = undefined}}; +handle_event({set_alarm, {process_memory_high_watermark, Pid}}, State) -> + case proc_stat(Pid, get_app_pids()) of + #proc_stat{name = Name} = ProcStat -> + error_logger:warning_msg( + "Process ~p consumes more than 5% of OS memory (~s)", + [Name, format_proc(ProcStat)]), + handle_overload(State), + {ok, State}; _ -> - ok + {ok, State} end; -process_command(_) -> +handle_event({clear_alarm, process_memory_high_watermark}, State) -> + {ok, State}; +handle_event({set_alarm, {{disk_almost_full, MountPoint}, _}}, State) -> + error_logger:warning_msg("Disk is almost full on ~p", [MountPoint]), + {ok, State}; +handle_event({clear_alarm, {disk_almost_full, MountPoint}}, State) -> + error_logger:info_msg("Disk usage is back to normal on ~p", [MountPoint]), + {ok, State}; +handle_event(Event, State) -> + error_logger:warning_msg("unexpected event: ~p", [Event]), + {ok, State}. + +handle_call(_Request, State) -> + {ok, {error, badarg}, State}. + +handle_info({timeout, _TRef, handle_overload}, State) -> + handle_overload(State), + {ok, restart_timer(State)}; +handle_info(Info, State) -> + error_logger:warning_msg("unexpected info: ~p", [Info]), + {ok, State}. + +terminate(_Reason, _State) -> ok. -register_hook(Host) -> - ejabberd_hooks:add(local_send_to_resource_hook, Host, - ?MODULE, process_command, 50). - -unregister_hook(Host) -> - ejabberd_hooks:delete(local_send_to_resource_hook, Host, - ?MODULE, process_command, 50). - -%%==================================================================== -%% gen_server callbacks -%%==================================================================== - -%%-------------------------------------------------------------------- -%% Function: init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% Description: Initiates the server -%%-------------------------------------------------------------------- -init(Opts) -> - LH = proplists:get_value(large_heap, Opts), - process_flag(priority, high), - erlang:system_monitor(self(), [{large_heap, LH}]), - ejabberd_hooks:add(host_up, ?MODULE, register_hook, 50), - ejabberd_hooks:add(host_down, ?MODULE, unregister_hook, 60), - lists:foreach(fun register_hook/1, ?MYHOSTS), - {ok, #state{}}. - -%%-------------------------------------------------------------------- -%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% Description: Handling call messages -%%-------------------------------------------------------------------- -handle_call({get, large_heap}, _From, State) -> - {reply, get_large_heap(), State}; -handle_call({set, large_heap, NewValue}, _From, - State) -> - MonSettings = erlang:system_monitor(self(), - [{large_heap, NewValue}]), - OldLH = get_large_heap(MonSettings), - NewLH = get_large_heap(), - {reply, {lh_changed, OldLH, NewLH}, State}; -handle_call(_Request, _From, State) -> - Reply = ok, {reply, Reply, State}. - -get_large_heap() -> - MonSettings = erlang:system_monitor(), - get_large_heap(MonSettings). - -get_large_heap(MonSettings) -> - {_MonitorPid, Options} = MonSettings, - proplists:get_value(large_heap, Options). - -%%-------------------------------------------------------------------- -%% Function: handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling cast messages -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% Description: Handling all non call/cast messages -%%-------------------------------------------------------------------- -handle_info({monitor, Pid, large_heap, Info}, State) -> - spawn(fun () -> - process_flag(priority, high), - process_large_heap(Pid, Info) - end), - {noreply, State}; -handle_info(_Info, State) -> {noreply, State}. - -%%-------------------------------------------------------------------- -%% Function: terminate(Reason, State) -> void() -%% Description: This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any necessary -%% cleaning up. When it returns, the gen_server terminates with Reason. -%% The return value is ignored. -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. -%%-------------------------------------------------------------------- -%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} -%% Description: Convert process state when code is changed -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- +%%%=================================================================== %%% Internal functions -%%-------------------------------------------------------------------- - -process_large_heap(Pid, Info) -> - Host = (?MYNAME), - JIDs = get_admin_jids(), - DetailedInfo = detailed_info(Pid), - Body = str:format("(~w) The process ~w is consuming too " - "much memory:~n~p~n~s", - [node(), Pid, Info, DetailedInfo]), - From = jid:make(<<"">>, Host, <<"watchdog">>), - Hint = [#hint{type = 'no-permanent-store'}], - lists:foreach( - fun(JID) -> - send_message(From, jid:make(JID), Body, Hint) - end, JIDs). - -send_message(From, To, Body) -> - send_message(From, To, Body, []). - -send_message(From, To, Body, ExtraEls) -> - ejabberd_router:route(#message{type = chat, - from = From, - to = To, - body = xmpp:mk_text(Body), - sub_els = ExtraEls}). - -get_admin_jids() -> - ejabberd_config:get_option(watchdog_admins, []). - -detailed_info(Pid) -> - case process_info(Pid, dictionary) of - {dictionary, Dict} -> - case lists:keysearch('$ancestors', 1, Dict) of - {value, {'$ancestors', [Sup | _]}} -> - case Sup of - ejabberd_c2s_sup -> c2s_info(Pid); - ejabberd_s2s_out_sup -> s2s_out_info(Pid); - ejabberd_service_sup -> service_info(Pid); - _ -> detailed_info1(Pid) - end; - _ -> detailed_info1(Pid) - end; - _ -> detailed_info1(Pid) +%%%=================================================================== +-spec handle_overload(state()) -> ok. +handle_overload(State) -> + handle_overload(State, processes()). + +-spec handle_overload(state(), [pid()]) -> ok. +handle_overload(_State, Procs) -> + AppPids = get_app_pids(), + {TotalMsgs, ProcsNum, Apps, Stats} = overloaded_procs(AppPids, Procs), + if TotalMsgs >= 10000 -> + SortedStats = lists:reverse(lists:keysort(#proc_stat.qlen, Stats)), + error_logger:warning_msg( + "The system is overloaded with ~b messages " + "queued by ~b process(es) (~b%) " + "from the following applications: ~s; " + "the top processes are:~n~s", + [TotalMsgs, ProcsNum, + round(ProcsNum*100/length(Procs)), + format_apps(Apps), + format_top_procs(SortedStats)]), + kill(SortedStats, round(TotalMsgs/ProcsNum)); + true -> + ok + end, + lists:foreach(fun erlang:garbage_collect/1, Procs). + +-spec get_app_pids() -> map(). +get_app_pids() -> + try application:info() of + Info -> + case lists:keyfind(running, 1, Info) of + {_, Apps} -> + lists:foldl( + fun({Name, Pid}, M) when is_pid(Pid) -> + maps:put(Pid, Name, M); + (_, M) -> + M + end, #{}, Apps); + false -> + #{} + end + catch _:_ -> + #{} end. -detailed_info1(Pid) -> - io_lib:format("~p", - [[process_info(Pid, current_function), - process_info(Pid, initial_call), - process_info(Pid, message_queue_len), - process_info(Pid, links), process_info(Pid, dictionary), - process_info(Pid, heap_size), - process_info(Pid, stack_size)]]). - -c2s_info(Pid) -> - [<<"Process type: c2s">>, check_send_queue(Pid), - <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -s2s_out_info(Pid) -> - FromTo = mnesia:dirty_select(s2s, - [{{s2s, '$1', Pid, '_'}, [], ['$1']}]), - [<<"Process type: s2s_out">>, - case FromTo of - [{From, To}] -> - <<"\n", - (io_lib:format("S2S connection: from ~s to ~s", - [From, To]))/binary>>; - _ -> <<"">> - end, - check_send_queue(Pid), <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -service_info(Pid) -> - Routes = mnesia:dirty_select(route, - [{{route, '$1', Pid, '_'}, [], ['$1']}]), - [<<"Process type: s2s_out">>, - case Routes of - [Route] -> <<"\nServiced domain: ", Route/binary>>; - _ -> <<"">> - end, - check_send_queue(Pid), <<"\n">>, - io_lib:format("Command to kill this process: kill ~s ~w", - [iolist_to_binary(atom_to_list(node())), Pid])]. - -check_send_queue(Pid) -> - case {process_info(Pid, current_function), - process_info(Pid, message_queue_len)} - of - {{current_function, MFA}, {message_queue_len, MLen}} -> - if MLen > 100 -> - case MFA of - {prim_inet, send, 2} -> - <<"\nPossible reason: the process is blocked " - "trying to send data over its TCP connection.">>; - {M, F, A} -> - [<<"\nPossible reason: the process can't " - "process messages faster than they arrive. ">>, - io_lib:format("Current function is ~w:~w/~w", - [M, F, A])] - end; - true -> <<"">> - end; - _ -> <<"">> +-spec overloaded_procs(map(), [pid()]) + -> {non_neg_integer(), non_neg_integer(), dict:dict(), [proc_stat()]}. +overloaded_procs(AppPids, AllProcs) -> + lists:foldl( + fun(Pid, {TotalMsgs, ProcsNum, Apps, Stats}) -> + case proc_stat(Pid, AppPids) of + #proc_stat{qlen = QLen, application = App} = Stat + when QLen > 0 -> + {TotalMsgs + QLen, ProcsNum + 1, + dict:update_counter(App, QLen, Apps), + [Stat|Stats]}; + _ -> + {TotalMsgs, ProcsNum, Apps, Stats} + end + end, {0, 0, dict:new(), []}, AllProcs). + +-spec proc_stat(pid(), map()) -> proc_stat() | undefined. +proc_stat(Pid, AppPids) -> + case process_info(Pid, [message_queue_len, + memory, + initial_call, + current_function, + dictionary, + group_leader, + registered_name]) of + [{_, MsgLen}, {_, Mem}, {_, InitCall}, + {_, CurrFun}, {_, Dict}, {_, GL}, {_, Name}] -> + IntLen = proplists:get_value('$internal_queue_len', Dict, 0), + TrueInitCall = proplists:get_value('$initial_call', Dict, InitCall), + Ancestors = proplists:get_value('$ancestors', Dict, []), + Len = IntLen + MsgLen, + App = maps:get(GL, AppPids, kernel), + RegName = case Name of + [] -> Pid; + _ -> Name + end, + #proc_stat{qlen = Len, + memory = Mem, + initial_call = TrueInitCall, + current_function = CurrFun, + ancestors = Ancestors, + application = App, + name = RegName}; + _ -> + undefined end. -process_command1(From, To, Body) -> - process_command2(str:tokens(Body, <<" ">>), From, To). - -process_command2([<<"kill">>, SNode, SPid], From, To) -> - Node = misc:binary_to_atom(SNode), - remote_command(Node, [kill, SPid], From, To); -process_command2([<<"showlh">>, SNode], From, To) -> - Node = misc:binary_to_atom(SNode), - remote_command(Node, [showlh], From, To); -process_command2([<<"setlh">>, SNode, NewValueString], - From, To) -> - Node = misc:binary_to_atom(SNode), - NewValue = binary_to_integer(NewValueString), - remote_command(Node, [setlh, NewValue], From, To); -process_command2([<<"help">>], From, To) -> - send_message(To, From, help()); -process_command2(_, From, To) -> - send_message(To, From, help()). +-spec restart_timer(#state{}) -> #state{}. +restart_timer(State) -> + cancel_timer(State#state.tref), + TRef = erlang:start_timer(?CHECK_INTERVAL, self(), handle_overload), + State#state{tref = TRef}. + +-spec cancel_timer(reference()) -> ok. +cancel_timer(undefined) -> + ok; +cancel_timer(TRef) -> + case erlang:cancel_timer(TRef) of + false -> + receive {timeout, TRef, _} -> ok + after 0 -> ok + end; + _ -> + ok + end. -help() -> - <<"Commands:\n kill <node> <pid>\n showlh " - "<node>\n setlh <node> <integer>">>. +-spec format_apps(dict:dict()) -> io:data(). +format_apps(Apps) -> + AppList = lists:reverse(lists:keysort(2, dict:to_list(Apps))), + string:join( + [io_lib:format("~p (~b msgs)", [App, Msgs]) || {App, Msgs} <- AppList], + ", "). + +-spec format_top_procs([proc_stat()]) -> io:data(). +format_top_procs(Stats) -> + Stats1 = lists:sublist(Stats, 5), + string:join( + lists:map( + fun(#proc_stat{name = Name} = Stat) -> + [io_lib:format("** ~w: ", [Name]), format_proc(Stat)] + end,Stats1), + io_lib:nl()). + +-spec format_proc(proc_stat()) -> io:data(). +format_proc(#proc_stat{qlen = Len, memory = Mem, initial_call = InitCall, + current_function = CurrFun, ancestors = Ancs, + application = App}) -> + io_lib:format( + "msgs = ~b, memory = ~b, initial_call = ~s, " + "current_function = ~s, ancestors = ~w, application = ~w", + [Len, Mem, format_mfa(InitCall), format_mfa(CurrFun), Ancs, App]). + +-spec format_mfa(mfa()) -> io:data(). +format_mfa({M, F, A}) when is_atom(M), is_atom(F), is_integer(A) -> + io_lib:format("~s:~s/~b", [M, F, A]); +format_mfa(WTF) -> + io_lib:format("~w", [WTF]). + +-spec kill([proc_stat()], non_neg_integer()) -> ok. +kill(Stats, Threshold) -> + case ejabberd_config:get_option(oom_killer, true) of + true -> + do_kill(Stats, Threshold); + false -> + ok + end. -remote_command(Node, Args, From, To) -> - Message = case ejabberd_cluster:call(Node, ?MODULE, - process_remote_command, [Args]) - of - {badrpc, Reason} -> - io_lib:format("Command failed:~n~p", [Reason]); - Result -> Result - end, - send_message(To, From, iolist_to_binary(Message)). +-spec do_kill([proc_stat()], non_neg_integer()) -> ok. +do_kill(Stats, Threshold) -> + Killed = lists:filtermap( + fun(#proc_stat{qlen = Len, name = Name, application = App}) + when Len >= Threshold -> + case lists:member(App, excluded_apps()) of + true -> + error_logger:warning_msg( + "Unable to kill process ~p from whitelisted " + "application ~p", [Name, App]), + false; + false -> + case kill_proc(Name) of + false -> + false; + Pid -> + maybe_restart_app(App), + {true, Pid} + end + end; + (_) -> + false + end, Stats), + TotalKilled = length(Killed), + if TotalKilled > 0 -> + error_logger:error_msg( + "Killed ~b process(es) consuming more than ~b message(s) each", + [TotalKilled, Threshold]); + true -> + ok + end. -process_remote_command([kill, SPid]) -> - exit(list_to_pid(SPid), kill), <<"ok">>; -process_remote_command([showlh]) -> - Res = gen_server:call(ejabberd_system_monitor, - {get, large_heap}), - io_lib:format("Current large heap: ~p", [Res]); -process_remote_command([setlh, NewValue]) -> - {lh_changed, OldLH, NewLH} = - gen_server:call(ejabberd_system_monitor, - {set, large_heap, NewValue}), - io_lib:format("Result of set large heap: ~p --> ~p", - [OldLH, NewLH]); -process_remote_command(_) -> throw(unknown_command). +-spec kill_proc(pid() | atom()) -> false | pid(). +kill_proc(undefined) -> + false; +kill_proc(Name) when is_atom(Name) -> + kill_proc(whereis(Name)); +kill_proc(Pid) -> + exit(Pid, kill), + Pid. + +-spec maybe_restart_app(atom()) -> any(). +maybe_restart_app(lager) -> + ejabberd_logger:restart(); +maybe_restart_app(_) -> + ok. --spec opt_type(watchdog_admins) -> fun(([binary()]) -> ljid()); - (watchdog_large_heap) -> fun((pos_integer()) -> pos_integer()); +-spec opt_type(oom_killer) -> fun((boolean()) -> boolean()); (atom()) -> [atom()]. -opt_type(watchdog_admins) -> - fun (JIDs) -> - [jid:tolower(jid:decode(iolist_to_binary(S))) - || S <- JIDs] - end; -opt_type(watchdog_large_heap) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(_) -> [watchdog_admins, watchdog_large_heap]. +opt_type(oom_killer) -> + fun(B) when is_boolean(B) -> B end; +opt_type(_) -> [oom_killer]. diff --git a/src/mod_bosh_sql.erl b/src/mod_bosh_sql.erl index 621e9d317..d732521a6 100644 --- a/src/mod_bosh_sql.erl +++ b/src/mod_bosh_sql.erl @@ -59,8 +59,7 @@ open_session(SID, Pid) -> "pid=%(PidS)s"]) of ok -> ok; - Err -> - ?ERROR_MSG("failed to update 'bosh' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -69,8 +68,7 @@ close_session(SID) -> ?MYNAME, ?SQL("delete from bosh where sid=%(SID)s")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'bosh' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -84,8 +82,7 @@ find_session(SID) -> end; {selected, []} -> {error, notfound}; - Err -> - ?ERROR_MSG("failed to select 'bosh' table: ~p", [Err]), + _Err -> {error, db_failure} end. diff --git a/src/mod_caps.erl b/src/mod_caps.erl index edc93bbf1..6021da252 100644 --- a/src/mod_caps.erl +++ b/src/mod_caps.erl @@ -206,8 +206,10 @@ c2s_presence_in(C2SState, {Subscription, _} = ejabberd_hooks:run_fold( roster_get_jid_info, To#jid.lserver, {none, []}, [To#jid.luser, To#jid.lserver, From]), + ToSelf = (From#jid.luser == To#jid.luser) + and (From#jid.lserver == To#jid.lserver), Insert = (Type == available) - and ((Subscription == both) or (Subscription == to)), + and ((Subscription == both) or (Subscription == to) or ToSelf), Delete = (Type == unavailable) or (Type == error), if Insert or Delete -> LFrom = jid:tolower(From), diff --git a/src/mod_caps_sql.erl b/src/mod_caps_sql.erl index 5d4c1e934..3fa18f16b 100644 --- a/src/mod_caps_sql.erl +++ b/src/mod_caps_sql.erl @@ -63,9 +63,7 @@ caps_write(LServer, NodePair, Features) -> sql_write_features_t(NodePair, Features)) of {atomic, _} -> ok; - {aborted, Reason} -> - ?ERROR_MSG("Failed to write to SQL 'caps_features' table: ~p", - [Reason]), + {aborted, _Reason} -> {error, db_failure} end. diff --git a/src/mod_carboncopy_sql.erl b/src/mod_carboncopy_sql.erl index 1b8e1e111..cda8c60d0 100644 --- a/src/mod_carboncopy_sql.erl +++ b/src/mod_carboncopy_sql.erl @@ -48,8 +48,7 @@ enable(LUser, LServer, LResource, NS) -> "node=%(NodeS)s"]) of ok -> ok; - Err -> - ?ERROR_MSG("failed to update 'carboncopy' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -60,8 +59,7 @@ disable(LUser, LServer, LResource) -> "and %(LServer)H and resource=%(LResource)s")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'carboncopy' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -73,8 +71,7 @@ list(LUser, LServer) -> {selected, Rows} -> {ok, [{Resource, NS, binary_to_atom(Node, latin1)} || {Resource, NS, Node} <- Rows]}; - Err -> - ?ERROR_MSG("failed to select from 'carboncopy' table: ~p", [Err]), + _Err -> {error, db_failure} end. diff --git a/src/mod_last_sql.erl b/src/mod_last_sql.erl index f0889e4ec..0c86c72c4 100644 --- a/src/mod_last_sql.erl +++ b/src/mod_last_sql.erl @@ -51,9 +51,7 @@ get_last(LUser, LServer) -> error; {selected, [{TimeStamp, Status}]} -> {ok, {TimeStamp, Status}}; - Reason -> - ?ERROR_MSG("failed to get last for user ~s@~s: ~p", - [LUser, LServer, Reason]), + _Reason -> {error, db_failure} end. @@ -65,9 +63,7 @@ store_last_info(LUser, LServer, TimeStamp, Status) -> "state=%(Status)s"]) of ok -> ok; - Err -> - ?ERROR_MSG("failed to store last activity for ~s@~s: ~p", - [LUser, LServer, Err]), + _Err -> {error, db_failure} end. diff --git a/src/mod_muc_admin.erl b/src/mod_muc_admin.erl index 440deee02..39abb8334 100644 --- a/src/mod_muc_admin.erl +++ b/src/mod_muc_admin.erl @@ -866,9 +866,9 @@ get_room_occupants_number(Room, Host) -> send_direct_invitation(RoomName, RoomService, Password, Reason, UsersString) -> RoomJid = jid:make(RoomName, RoomService), XmlEl = build_invitation(Password, Reason, RoomJid), - UsersStrings = get_users_to_invite(RoomJid, UsersString), - [send_direct_invitation(RoomJid, UserStrings, XmlEl) - || UserStrings <- UsersStrings], + Users = get_users_to_invite(RoomJid, UsersString), + [send_direct_invitation(RoomJid, UserJid, XmlEl) + || UserJid <- Users], timer:sleep(1000), ok. @@ -886,8 +886,9 @@ get_users_to_invite(RoomJid, UsersString) -> orelse UserJid#jid.lserver /= OccupantJid#jid.lserver end, OccupantsJids), - case Val of - true -> {true, UserJid}; + case {UserJid#jid.luser, Val} of + {<<>>, _} -> false; + {_, true} -> {true, UserJid}; _ -> false end end, diff --git a/src/mod_muc_room.erl b/src/mod_muc_room.erl index e8d61dacb..fadfff6c5 100644 --- a/src/mod_muc_room.erl +++ b/src/mod_muc_room.erl @@ -331,7 +331,8 @@ normal_state({route, <<"">>, catch _:{xmpp_codec, Why} -> ErrTxt = xmpp:io_format_error(Why), Err = xmpp:err_bad_request(ErrTxt, Lang), - ejabberd_router:route_error(IQ0, Err) + ejabberd_router:route_error(IQ0, Err), + {next_state, normal_state, StateData} end; normal_state({route, <<"">>, #iq{} = IQ}, StateData) -> Err = xmpp:err_bad_request(), diff --git a/src/mod_muc_sql.erl b/src/mod_muc_sql.erl index 8aa6071c8..0cb09775b 100644 --- a/src/mod_muc_sql.erl +++ b/src/mod_muc_sql.erl @@ -187,12 +187,10 @@ get_rooms(LServer, Host) -> #muc_room{name_host = {Room, Host}, opts = mod_muc:opts_to_binary(OptsD2)} end, RoomOpts); - Err -> - ?ERROR_MSG("failed to get rooms subscribers: ~p", [Err]), + _Err -> [] end; - Err -> - ?ERROR_MSG("failed to get rooms: ~p", [Err]), + _Err -> [] end. @@ -266,7 +264,6 @@ register_online_room(ServerHost, Room, Host, Pid) -> ok -> ok; Err -> - ?ERROR_MSG("failed to update 'muc_online_room': ~p", [Err]), Err end. @@ -290,8 +287,7 @@ find_online_room(ServerHost, Room, Host) -> end; {selected, []} -> error; - Err -> - ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]), + _Err -> error end. @@ -302,8 +298,7 @@ count_online_rooms(ServerHost, Host) -> "where host=%(Host)s")) of {selected, [{Num}]} -> Num; - Err -> - ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]), + _Err -> 0 end. @@ -319,8 +314,7 @@ get_online_rooms(ServerHost, Host, _RSM) -> catch _:{bad_node, _} -> [] end end, Rows); - Err -> - ?ERROR_MSG("failed to select 'muc_online_room': ~p", [Err]), + _Err -> [] end. @@ -340,7 +334,6 @@ register_online_user(ServerHost, {U, S, R}, Room, Host) -> ok -> ok; Err -> - ?ERROR_MSG("failed to update 'muc_online_users': ~p", [Err]), Err end. @@ -359,8 +352,7 @@ count_online_rooms_by_user(ServerHost, U, S) -> "username=%(U)s and server=%(S)s")) of {selected, [{Num}]} -> Num; - Err -> - ?ERROR_MSG("failed to select 'muc_online_users': ~p", [Err]), + _Err -> 0 end. @@ -371,8 +363,7 @@ get_online_rooms_by_user(ServerHost, U, S) -> "username=%(U)s and server=%(S)s")) of {selected, Rows} -> Rows; - Err -> - ?ERROR_MSG("failed to select 'muc_online_users': ~p", [Err]), + _Err -> [] end. @@ -424,8 +415,7 @@ get_subscribed_rooms(LServer, Host, Jid) -> " and host=%(Host)s")) of {selected, Subs} -> [jid:make(Room, Host, <<>>) || {Room} <- Subs]; - Error -> - ?ERROR_MSG("Error when fetching subscribed rooms ~p", [Error]), + _Error -> [] end. diff --git a/src/mod_proxy65_sql.erl b/src/mod_proxy65_sql.erl index 18f8db147..715289db6 100644 --- a/src/mod_proxy65_sql.erl +++ b/src/mod_proxy65_sql.erl @@ -69,7 +69,6 @@ register_stream(SID, Pid) -> {atomic, _} -> ok; {aborted, Reason} -> - ?ERROR_MSG("failed to register stream: ~p", [Reason]), {error, Reason} end. @@ -82,7 +81,6 @@ unregister_stream(SID) -> {atomic, _} -> ok; {aborted, Reason} -> - ?ERROR_MSG("failed to unregister stream: ~p", [Reason]), {error, Reason} end. @@ -133,7 +131,6 @@ activate_stream(SID, IJID, MaxConnections, _Node) -> {aborted, {limit, _, _} = Limit} -> {error, Limit}; {aborted, Reason} -> - ?ERROR_MSG("failed to activate bytestream: ~p", [Reason]), {error, Reason} end. diff --git a/src/mod_pubsub.erl b/src/mod_pubsub.erl index efa22305c..bedbd3495 100644 --- a/src/mod_pubsub.erl +++ b/src/mod_pubsub.erl @@ -550,6 +550,9 @@ disco_items(Host, Node, From) -> %% -spec caps_add(jid(), jid(), [binary()]) -> ok. +caps_add(JID, JID, _Features) -> + %% Send the owner his last PEP items. + send_last_pep(JID, JID); caps_add(#jid{lserver = S1} = From, #jid{lserver = S2} = To, _Features) when S1 =/= S2 -> %% When a remote contact goes online while the local user is offline, the diff --git a/src/mod_push_sql.erl b/src/mod_push_sql.erl index c82d9fc02..2a1d588e9 100644 --- a/src/mod_push_sql.erl +++ b/src/mod_push_sql.erl @@ -57,8 +57,7 @@ store_session(LUser, LServer, NowTS, PushJID, Node, XData) -> "xml=%(XML)s"]) of ok -> {ok, {NowTS, PushLJID, Node, XData}}; - Err -> - ?ERROR_MSG("Failed to update 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -77,8 +76,7 @@ lookup_session(LUser, LServer, PushJID, Node) -> {ok, {NowTS, PushLJID, Node, XData}}; {selected, []} -> {error, notfound}; - Err -> - ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -95,8 +93,7 @@ lookup_session(LUser, LServer, NowTS) -> {ok, {NowTS, PushLJID, Node, XData}}; {selected, []} -> {error, notfound}; - Err -> - ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -115,8 +112,7 @@ lookup_sessions(LUser, LServer, PushJID) -> XData = decode_xdata(XML, LUser, LServer), {NowTS, PushLJID, Node, XData} end, Rows)}; - Err -> - ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -134,8 +130,7 @@ lookup_sessions(LUser, LServer) -> PushLJID = jid:tolower(jid:decode(Service)), {NowTS, PushLJID,Node, XData} end, Rows)}; - Err -> - ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -153,8 +148,7 @@ lookup_sessions(LServer) -> PushLJID = jid:tolower(jid:decode(Service)), {NowTS, PushLJID, Node, XData} end, Rows)}; - Err -> - ?ERROR_MSG("Failed to select from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -166,8 +160,7 @@ delete_session(LUser, LServer, NowTS) -> "username=%(LUser)s and %(LServer)H and timestamp=%(TS)d")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. @@ -179,8 +172,7 @@ delete_old_sessions(LServer, Time) -> "and %(LServer)H")) of {updated, _} -> ok; - Err -> - ?ERROR_MSG("failed to delete from 'push_session' table: ~p", [Err]), + _Err -> {error, db_failure} end. diff --git a/src/node_flat_sql.erl b/src/node_flat_sql.erl index afbc050a8..2773114a2 100644 --- a/src/node_flat_sql.erl +++ b/src/node_flat_sql.erl @@ -591,6 +591,7 @@ get_states(Nidx) -> fun({SJID, Aff, Subs}) -> JID = decode_jid(SJID), #pubsub_state{stateid = {JID, Nidx}, + nodeidx = Nidx, items = itemids(Nidx, JID), affiliation = decode_affiliation(Aff), subscriptions = decode_subscriptions(Subs)} @@ -997,6 +998,7 @@ raw_to_item(Nidx, {ItemId, SJID, Creation, Modification, XML}) -> El -> [El] end, #pubsub_item{itemid = {ItemId, Nidx}, + nodeidx = Nidx, creation = {decode_now(Creation), jid:remove_resource(JID)}, modification = {decode_now(Modification), JID}, payload = Payload}. diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl index caad482c5..c28bad8e1 100644 --- a/src/xmpp_stream_in.erl +++ b/src/xmpp_stream_in.erl @@ -314,16 +314,10 @@ handle_info({'$gen_event', {xmlstreamstart, Name, Attrs}}, send_pkt(State1, Err) end end); -handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) -> - %% TODO: find and fix this in fast_xml - error_logger:error_msg("unexpected event from receiver: ~p; " - "xmlstreamstart was expected", [El]), - State1 = send_header(State), - noreply( - case is_disconnected(State1) of - true -> State1; - false -> send_pkt(State1, xmpp:serr_invalid_xml()) - end); +handle_info({'$gen_event', {xmlstreamend, _}}, State) -> + noreply(process_stream_end({stream, reset}, State)); +handle_info({'$gen_event', closed}, State) -> + noreply(process_stream_end({socket, closed}, State)); handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) -> State1 = send_header(State), noreply( @@ -338,6 +332,16 @@ handle_info({'$gen_event', {xmlstreamerror, Reason}}, #{lang := Lang}= State) -> end, send_pkt(State1, Err) end); +handle_info({'$gen_event', El}, #{stream_state := wait_for_stream} = State) -> + %% TODO: find and fix this in fast_xml + error_logger:error_msg("unexpected event from receiver: ~p; " + "xmlstreamstart was expected", [El]), + State1 = send_header(State), + noreply( + case is_disconnected(State1) of + true -> State1; + false -> send_pkt(State1, xmpp:serr_invalid_xml()) + end); handle_info({'$gen_event', {xmlstreamelement, El}}, #{xmlns := NS, mod := Mod} = State) -> noreply( @@ -364,10 +368,6 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}}, noreply(try Mod:handle_cdata(Data, State) catch _:undef -> State end); -handle_info({'$gen_event', {xmlstreamend, _}}, State) -> - noreply(process_stream_end({stream, reset}, State)); -handle_info({'$gen_event', closed}, State) -> - noreply(process_stream_end({socket, closed}, State)); handle_info(timeout, #{mod := Mod} = State) -> Disconnected = is_disconnected(State), noreply(try Mod:handle_timeout(State) @@ -565,8 +565,6 @@ process_element(Pkt, #{stream_state := StateName, lang := Lang} = State) -> send_pkt(State, #sasl_failure{reason = 'aborted'}); #sasl_success{} -> State; - #compress{} when StateName == wait_for_sasl_response -> - send_pkt(State, #compress_failure{reason = 'setup-failed'}); #compress{} -> process_compress(Pkt, State); #handshake{} when StateName == wait_for_handshake -> @@ -694,7 +692,10 @@ process_stream_established(#{mod := Mod} = State) -> end. -spec process_compress(compress(), state()) -> state(). -process_compress(#compress{}, #{stream_compressed := true} = State) -> +process_compress(#compress{}, + #{stream_compressed := Compressed, + stream_authenticated := Authenticated} = State) + when Compressed or not Authenticated -> send_pkt(State, #compress_failure{reason = 'setup-failed'}); process_compress(#compress{methods = HisMethods}, #{socket := Socket, sockmod := SockMod, mod := Mod} = State) -> @@ -913,7 +914,8 @@ get_sasl_feature(_) -> []. -spec get_compress_feature(state()) -> [compression()]. -get_compress_feature(#{stream_compressed := false, mod := Mod} = State) -> +get_compress_feature(#{stream_compressed := false, mod := Mod, + stream_authenticated := true} = State) -> try Mod:compress_methods(State) of [] -> []; Ms -> [#compression{methods = Ms}] |