aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_listener.erl
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-18 12:53:36 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-18 12:53:36 +0300
commit03de853e4fdcf852ae75a86922c08bb1a0950e6d (patch)
treec65a36565472dc5aeafff2f155a0b5ce80e4d62d /src/ejabberd_listener.erl
parentAdd ability to configure test to use new sql schema (diff)
Refactor ejabberd_listener
Diffstat (limited to 'src/ejabberd_listener.erl')
-rw-r--r--src/ejabberd_listener.erl672
1 files changed, 381 insertions, 291 deletions
diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl
index 3c5192c4a..7a85ac444 100644
--- a/src/ejabberd_listener.erl
+++ b/src/ejabberd_listener.erl
@@ -24,26 +24,36 @@
%%%----------------------------------------------------------------------
-module(ejabberd_listener).
-
+-behaviour(supervisor).
-behaviour(ejabberd_config).
-author('alexey@process-one.net').
+-author('ekhramtsov@process-one.net').
-export([start_link/0, init/1, start/3, init/3,
start_listeners/0, start_listener/3, stop_listeners/0,
- stop_listener/2, parse_listener_portip/2,
- add_listener/3, delete_listener/2, transform_options/1,
- validate_cfg/1, opt_type/1, config_reloaded/0]).
+ stop_listener/2, add_listener/3, delete_listener/2,
+ transform_options/1, validate_cfg/1, opt_type/1,
+ config_reloaded/0]).
+%% Legacy API
+-export([parse_listener_portip/2]).
-include("logger.hrl").
--callback start({gen_tcp, inet:socket()}, [proplists:property()]) ->
+-type transport() :: tcp | udp.
+-type endpoint() :: {inet:port_number(), inet:ip_address(), transport()}.
+-type listen_opts() :: [proplists:property()].
+-type listener() :: {endpoint(), module(), listen_opts()}.
+
+-callback start({gen_tcp, inet:socket()}, listen_opts()) ->
{ok, pid()} | {error, any()} | ignore.
--callback start_link({gen_tcp, inet:socket()}, [proplists:property()]) ->
+-callback start_link({gen_tcp, inet:socket()}, listen_opts()) ->
{ok, pid()} | {error, any()} | ignore.
-callback accept(pid()) -> any().
--callback listen_opt_type(atom()) -> fun((atom()) -> term()) | [atom()].
+-callback listen_opt_type(atom()) -> fun((term()) -> term()).
+-callback listen_options() -> listen_opts().
+
+-optional_callbacks([listen_opt_type/1]).
-%% We do not block on send anymore.
-define(TCP_SEND_TIMEOUT, 15000).
start_link() ->
@@ -52,68 +62,54 @@ start_link() ->
init(_) ->
ets:new(?MODULE, [named_table, public]),
ejabberd_hooks:add(config_reloaded, ?MODULE, config_reloaded, 50),
- {ok, {{one_for_one, 10, 1}, listeners_childspec()}}.
+ case listeners_childspec() of
+ {ok, Specs} -> {ok, {{one_for_one, 10, 1}, Specs}};
+ {error, _} = Err -> Err
+ end.
+-spec listeners_childspec() -> {ok, [supervisor:child_spec()]} | {error, any()}.
listeners_childspec() ->
Ls = ejabberd_config:get_option(listen, []),
- Specs = lists:map(
- fun({Port, Module, Opts}) ->
- ets:insert(?MODULE, {Port, Module, Opts}),
- {Port,
- {?MODULE, start, [Port, Module, Opts]},
- transient,
- brutal_kill,
- worker,
- [?MODULE]}
- end, Ls),
- report_duplicated_portips(Ls),
- Specs.
+ case add_certfiles(Ls) of
+ ok ->
+ {ok, lists:map(
+ fun({EndPoint, Module, Opts}) ->
+ ets:insert(?MODULE, {EndPoint, Module, Opts}),
+ {EndPoint,
+ {?MODULE, start, [EndPoint, Module, Opts]},
+ transient, brutal_kill, worker, [?MODULE]}
+ end, Ls)};
+ {error, _} = Err ->
+ Err
+ end.
+-spec start_listeners() -> ok.
start_listeners() ->
lists:foreach(
fun(Spec) ->
supervisor:start_child(?MODULE, Spec)
end, listeners_childspec()).
-report_duplicated_portips(L) ->
- LKeys = [Port || {Port, _, _} <- L],
- LNoDupsKeys = proplists:get_keys(L),
- case LKeys -- LNoDupsKeys of
- [] -> ok;
- Dups ->
- ?CRITICAL_MSG("In the ejabberd configuration there are duplicated "
- "Port number + IP address:~n ~p",
- [Dups])
- end.
-
-start(Port, Module, Opts) ->
- NewOpts = validate_module_options(Module, Opts),
- start_dependent(Port, Module, NewOpts).
-
-%% @spec(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage}
-start_dependent(Port, Module, Opts) ->
- proc_lib:start_link(?MODULE, init, [Port, Module, Opts]).
-
-init(PortIP, Module, RawOpts) ->
- {Port, IPT, IPV, Proto, OptsClean} = parse_listener_portip(PortIP, RawOpts),
- {Opts, SockOpts} = prepare_opts(IPT, IPV, OptsClean),
- if Proto == udp ->
- init_udp(PortIP, Module, Opts, SockOpts, Port);
- true ->
- init_tcp(PortIP, Module, Opts, SockOpts, Port)
- end.
+-spec start(endpoint(), module(), listen_opts()) -> term().
+start(EndPoint, Module, Opts) ->
+ proc_lib:start_link(?MODULE, init, [EndPoint, Module, Opts]).
-init_udp(PortIP, Module, Opts, SockOpts, Port) ->
+-spec init(endpoint(), module(), listen_opts()) -> ok.
+init(EndPoint, Module, AllOpts) ->
+ {ModuleOpts, SockOpts} = split_opts(AllOpts),
+ init(EndPoint, Module, ModuleOpts, SockOpts).
+
+-spec init(endpoint(), module(), listen_opts(), [gen_tcp:option()]) -> ok.
+init({Port, _, udp} = EndPoint, Module, Opts, SockOpts) ->
case gen_udp:open(Port, [binary,
{active, false},
{reuseaddr, true} |
SockOpts]) of
{ok, Socket} ->
- %% Inform my parent that this port was opened successfully
proc_lib:init_ack({ok, self()}),
application:ensure_started(ejabberd),
?INFO_MSG("Start accepting UDP connections at ~s for ~p",
- [format_portip(PortIP), Module]),
+ [format_endpoint(EndPoint), Module]),
case erlang:function_exported(Module, udp_init, 2) of
false ->
udp_recv(Socket, Module, Opts);
@@ -129,18 +125,17 @@ init_udp(PortIP, Module, Opts, SockOpts, Port) ->
end
end;
{error, Reason} = Err ->
- report_socket_error(Reason, PortIP, Module),
+ report_socket_error(Reason, EndPoint, Module),
proc_lib:init_ack(Err)
- end.
-
-init_tcp(PortIP, Module, Opts, SockOpts, Port) ->
- case listen_tcp(PortIP, Module, SockOpts, Port) of
+ end;
+init({_, _, tcp} = EndPoint, Module, Opts, SockOpts) ->
+ case listen_tcp(EndPoint, Module, SockOpts) of
{ok, ListenSocket} ->
proc_lib:init_ack({ok, self()}),
application:ensure_started(ejabberd),
Sup = start_module_sup(Module, Opts),
?INFO_MSG("Start accepting TCP connections at ~s for ~p",
- [format_portip(PortIP), Module]),
+ [format_endpoint(EndPoint), Module]),
case erlang:function_exported(Module, tcp_init, 2) of
false ->
accept(ListenSocket, Module, Opts, Sup);
@@ -159,7 +154,9 @@ init_tcp(PortIP, Module, Opts, SockOpts, Port) ->
proc_lib:init_ack(Err)
end.
-listen_tcp(PortIP, Module, SockOpts, Port) ->
+-spec listen_tcp(endpoint(), module(), [gen_tcp:option()]) ->
+ {ok, inet:socket()} | {error, system_limit | inet:posix()}.
+listen_tcp({Port, _, _} = EndPoint, Module, SockOpts) ->
Res = gen_tcp:listen(Port, [binary,
{packet, 0},
{active, false},
@@ -173,79 +170,31 @@ listen_tcp(PortIP, Module, SockOpts, Port) ->
{ok, ListenSocket} ->
{ok, ListenSocket};
{error, Reason} = Err ->
- report_socket_error(Reason, PortIP, Module),
+ report_socket_error(Reason, EndPoint, Module),
Err
end.
-parse_listener_portip(PortIP, Opts) ->
- {IPOpt, Opts2} = strip_ip_option(Opts),
- {IPVOpt, OptsClean} = case proplists:get_bool(inet6, Opts2) of
- true -> {inet6, proplists:delete(inet6, Opts2)};
- false -> {inet, Opts2}
- end,
- {Port, IPT, Proto} =
- case add_proto(PortIP, Opts) of
- {P, Prot} ->
- T = get_ip_tuple(IPOpt, IPVOpt),
- {P, T, Prot};
- {P, T, Prot} when is_integer(P) and is_tuple(T) ->
- {P, T, Prot};
- {P, S, Prot} when is_integer(P) and is_binary(S) ->
- {ok, T} = inet_parse:address(binary_to_list(S)),
- {P, T, Prot}
- end,
- IPV = case tuple_size(IPT) of
- 4 -> inet;
- 8 -> inet6
- end,
- {Port, IPT, IPV, Proto, OptsClean}.
-
-prepare_opts(IPT, IPV, OptsClean) ->
- %% The first inet|inet6 and the last {ip, _} work,
- %% so overriding those in Opts
- Opts = [IPV | OptsClean] ++ [{ip, IPT}],
- SockOpts = lists:filter(fun({ip, _}) -> true;
- (inet6) -> true;
- (inet) -> true;
- ({backlog, _}) -> true;
- (_) -> false
- end, Opts),
- {Opts, SockOpts}.
-
-add_proto(Port, Opts) when is_integer(Port) ->
- {Port, get_proto(Opts)};
-add_proto({Port, Proto}, _Opts) when is_atom(Proto) ->
- {Port, normalize_proto(Proto)};
-add_proto({Port, Addr}, Opts) ->
- {Port, Addr, get_proto(Opts)};
-add_proto({Port, Addr, Proto}, _Opts) ->
- {Port, Addr, normalize_proto(Proto)}.
-
-strip_ip_option(Opts) ->
- {IPL, OptsNoIP} = lists:partition(
- fun({ip, _}) -> true;
- (_) -> false
- end,
- Opts),
- case IPL of
- %% Only the first ip option is considered
- [{ip, T1} | _] ->
- {T1, OptsNoIP};
- [] ->
- {no_ip_option, OptsNoIP}
- end.
-
-get_ip_tuple(no_ip_option, inet) ->
- {0, 0, 0, 0};
-get_ip_tuple(no_ip_option, inet6) ->
- {0, 0, 0, 0, 0, 0, 0, 0};
-get_ip_tuple(IPOpt, _IPVOpt) ->
- IPOpt.
+-spec split_opts(listen_opts()) -> {listen_opts(), [gen_tcp:option()]}.
+split_opts(Opts) ->
+ lists:foldl(
+ fun(Opt, {ModOpts, SockOpts} = Acc) ->
+ case Opt of
+ {ip, _} -> {ModOpts, [Opt|SockOpts]};
+ {backlog, _} -> {ModOpts, [Opt|SockOpts]};
+ {inet, true} -> {ModOpts, [inet|SockOpts]};
+ {inet6, true} -> {ModOpts, [int6|SockOpts]};
+ {inet, false} -> Acc;
+ {inet6, false} -> Acc;
+ _ -> {[Opt|ModOpts], SockOpts}
+ end
+ end, {[], []}, Opts).
+-spec accept(inet:socket(), module(), listen_opts(), atom()) -> no_return().
accept(ListenSocket, Module, Opts, Sup) ->
Interval = proplists:get_value(accept_interval, Opts, 0),
accept(ListenSocket, Module, Opts, Sup, Interval).
+-spec accept(inet:socket(), module(), listen_opts(), atom(), non_neg_integer()) -> no_return().
accept(ListenSocket, Module, Opts, Sup, Interval) ->
NewInterval = check_rate_limit(Interval),
case gen_tcp:accept(ListenSocket) of
@@ -273,6 +222,7 @@ accept(ListenSocket, Module, Opts, Sup, Interval) ->
accept(ListenSocket, Module, Opts, Sup, NewInterval)
end.
+-spec udp_recv(inet:socket(), module(), listen_opts()) -> no_return().
udp_recv(Socket, Module, Opts) ->
case gen_udp:recv(Socket, 0) of
{ok, {Addr, Port, Packet}} ->
@@ -291,6 +241,8 @@ udp_recv(Socket, Module, Opts) ->
throw({error, Reason})
end.
+-spec start_connection(module(), inet:socket(), listen_opts(), atom()) ->
+ {ok, pid()} | {error, any()} | ignore.
start_connection(Module, Socket, Opts, Sup) ->
Res = case Sup of
undefined -> Module:start({gen_tcp, Socket}, Opts);
@@ -310,9 +262,14 @@ start_connection(Module, Socket, Opts, Sup) ->
Err
end.
-%% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
-start_listener(Port, Module, Opts) ->
- case start_listener2(Port, Module, Opts) of
+-spec start_listener(endpoint(), module(), listen_opts()) ->
+ {ok, pid()} | {error, any()}.
+start_listener(EndPoint, Module, Opts) ->
+ %% It is only required to start the supervisor in some cases.
+ %% But it doesn't hurt to attempt to start it for any listener.
+ %% So, it's normal (and harmless) that in most cases this
+ %% call returns: {error, {already_started, pid()}}
+ case start_listener_sup(EndPoint, Module, Opts) of
{ok, _Pid} = R -> R;
{error, {{'EXIT', {undef, [{M, _F, _A}|_]}}, _} = Error} ->
?ERROR_MSG("Error starting the ejabberd listener: ~p.~n"
@@ -325,13 +282,6 @@ start_listener(Port, Module, Opts) ->
{error, Error}
end.
-%% @spec (Port, Module, Opts) -> {ok, Pid} | {error, Error}
-start_listener2(Port, Module, Opts) ->
- %% It is only required to start the supervisor in some cases.
- %% But it doesn't hurt to attempt to start it for any listener.
- %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
- start_listener_sup(Port, Module, Opts).
-
-spec start_module_sup(module(), [proplists:property()]) -> atom().
start_module_sup(Module, Opts) ->
case proplists:get_value(supervisor, Opts, true) of
@@ -348,15 +298,18 @@ start_module_sup(Module, Opts) ->
undefined
end.
-start_listener_sup(Port, Module, Opts) ->
- ChildSpec = {Port,
- {?MODULE, start, [Port, Module, Opts]},
+-spec start_listener_sup(endpoint(), module(), listen_opts()) ->
+ {ok, pid()} | {error, any()}.
+start_listener_sup(EndPoint, Module, Opts) ->
+ ChildSpec = {EndPoint,
+ {?MODULE, start, [EndPoint, Module, Opts]},
transient,
brutal_kill,
worker,
[?MODULE]},
supervisor:start_child(?MODULE, ChildSpec).
+-spec stop_listeners() -> ok.
stop_listeners() ->
Ports = ejabberd_config:get_option(listen, []),
lists:foreach(
@@ -365,96 +318,69 @@ stop_listeners() ->
end,
Ports).
-stop_listener({_, _, Transport} = PortIP, Module) ->
- case supervisor:terminate_child(?MODULE, PortIP) of
+-spec stop_listener(endpoint(), module()) -> ok | {error, any()}.
+stop_listener({_, _, Transport} = EndPoint, Module) ->
+ case supervisor:terminate_child(?MODULE, EndPoint) of
ok ->
?INFO_MSG("Stop accepting ~s connections at ~s for ~p",
[case Transport of udp -> "UDP"; tcp -> "TCP" end,
- format_portip(PortIP), Module]),
- ets:delete(?MODULE, PortIP),
- supervisor:delete_child(?MODULE, PortIP);
+ format_endpoint(EndPoint), Module]),
+ ets:delete(?MODULE, EndPoint),
+ supervisor:delete_child(?MODULE, EndPoint);
Err ->
Err
end.
-add_listener(PortIP, Module, Opts) ->
- {Port, IPT, _, Proto, _} = parse_listener_portip(PortIP, Opts),
- PortIP1 = {Port, IPT, Proto},
- case start_listener(PortIP1, Module, Opts) of
+-spec add_listener(endpoint(), module(), listen_opts()) -> ok | {error, any()}.
+add_listener(EndPoint, Module, Opts) ->
+ case start_listener(EndPoint, Module, Opts) of
{ok, _Pid} ->
ok;
{error, {already_started, _Pid}} ->
- {error, {already_started, PortIP}};
+ {error, {already_started, EndPoint}};
{error, Error} ->
{error, Error}
end.
-delete_listener(PortIP, Module) ->
- delete_listener(PortIP, Module, []).
-
-%% @spec (PortIP, Module, Opts) -> ok
-%% where
-%% PortIP = {Port, IPT | IPS}
-%% Port = integer()
-%% IPT = tuple()
-%% IPS = string()
-%% Module = atom()
-%% Opts = [term()]
-delete_listener(PortIP, Module, Opts) ->
- {Port, IPT, _, Proto, _} = parse_listener_portip(PortIP, Opts),
- PortIP1 = {Port, IPT, Proto},
- stop_listener(PortIP1, Module).
+-spec delete_listener(endpoint(), module()) -> ok | {error, any()}.
+delete_listener(EndPoint, Module) ->
+ stop_listener(EndPoint, Module).
+-spec config_reloaded() -> ok.
config_reloaded() ->
New = ejabberd_config:get_option(listen, []),
+ ?INFO_MSG("New = ~p", [New]),
Old = ets:tab2list(?MODULE),
lists:foreach(
- fun({PortIP, Module, _Opts}) ->
- case lists:keyfind(PortIP, 1, New) of
+ fun({EndPoint, Module, _Opts}) ->
+ case lists:keyfind(EndPoint, 1, New) of
false ->
- stop_listener(PortIP, Module);
+ stop_listener(EndPoint, Module);
_ ->
ok
end
end, Old),
lists:foreach(
- fun({PortIP, Module, Opts}) ->
- case lists:keyfind(PortIP, 1, Old) of
+ fun({EndPoint, Module, Opts}) ->
+ case lists:keyfind(EndPoint, 1, Old) of
{_, Module, Opts} ->
ok;
{_, OldModule, _} ->
- stop_listener(PortIP, OldModule),
- ets:insert(?MODULE, {PortIP, Module, Opts}),
- start_listener(PortIP, Module, Opts);
+ stop_listener(EndPoint, OldModule),
+ ets:insert(?MODULE, {EndPoint, Module, Opts}),
+ start_listener(EndPoint, Module, Opts);
false ->
- ets:insert(?MODULE, {PortIP, Module, Opts}),
- start_listener(PortIP, Module, Opts)
+ ets:insert(?MODULE, {EndPoint, Module, Opts}),
+ start_listener(EndPoint, Module, Opts)
end
end, New).
-%%%
-%%% Check options
-%%%
-get_proto(Opts) ->
- case proplists:get_value(proto, Opts) of
- undefined ->
- tcp;
- Proto ->
- normalize_proto(Proto)
- end.
-
-normalize_proto(tcp) -> tcp;
-normalize_proto(udp) -> udp;
-normalize_proto(UnknownProto) ->
- ?WARNING_MSG("There is a problem in the configuration: "
- "~p is an unknown IP protocol. Using tcp as fallback",
- [UnknownProto]),
- tcp.
-
-report_socket_error(Reason, PortIP, Module) ->
+-spec report_socket_error(inet:posix(), endpoint(), module()) -> ok.
+report_socket_error(Reason, EndPoint, Module) ->
?ERROR_MSG("Failed to open socket at ~s for ~s: ~s",
- [format_portip(PortIP), Module, format_error(Reason)]).
+ [format_endpoint(EndPoint), Module, format_error(Reason)]).
+-spec format_error(inet:posix()) -> string().
format_error(Reason) ->
case inet:format_error(Reason) of
"unknown POSIX error" ->
@@ -463,13 +389,15 @@ format_error(Reason) ->
ReasonStr
end.
-format_portip({Port, IP, _Transport}) ->
+-spec format_endpoint(endpoint()) -> string().
+format_endpoint({Port, IP, _Transport}) ->
IPStr = case tuple_size(IP) of
4 -> inet:ntoa(IP);
8 -> "[" ++ inet:ntoa(IP) ++ "]"
end,
IPStr ++ ":" ++ integer_to_list(Port).
+-spec check_rate_limit(non_neg_integer()) -> non_neg_integer().
check_rate_limit(Interval) ->
NewInterval = receive
{rate_limit, AcceptInterval} ->
@@ -495,10 +423,19 @@ check_rate_limit(Interval) ->
end,
NewInterval.
--define(IS_CHAR(C), (is_integer(C) and (C >= 0) and (C =< 255))).
--define(IS_UINT(U), (is_integer(U) and (U >= 0) and (U =< 65535))).
--define(IS_PORT(P), (is_integer(P) and (P > 0) and (P =< 65535))).
--define(IS_TRANSPORT(T), ((T == tcp) or (T == udp))).
+-spec add_certfiles([listener()]) -> ok | {error, any()}.
+add_certfiles([{_, _, Opts}|Listeners]) ->
+ case lists:keyfind(certfile, 1, Opts) of
+ {_, Path} ->
+ case ejabberd_pkix:add_certfile(Path) of
+ ok -> add_certfiles(Listeners);
+ {error, _} = Err -> Err
+ end;
+ false ->
+ add_certfiles(Listeners)
+ end;
+add_certfiles([]) ->
+ ok.
transform_option({{Port, IP, Transport}, Mod, Opts}) ->
IPStr = if is_tuple(IP) ->
@@ -529,7 +466,7 @@ transform_option({{Port, IP, Transport}, Mod, Opts}) ->
end,
IPOpt ++ TransportOpt ++ [{port, Port}, {module, Mod} | Opts2];
transform_option({{Port, Transport}, Mod, Opts})
- when ?IS_TRANSPORT(Transport) ->
+ when Transport == tcp orelse Transport == udp ->
transform_option({{Port, all_zero_ip(Opts), Transport}, Mod, Opts});
transform_option({{Port, IP}, Mod, Opts}) ->
transform_option({{Port, IP, tcp}, Mod, Opts});
@@ -546,113 +483,266 @@ transform_options({listen, LOpts}, Opts) ->
transform_options(Opt, Opts) ->
[Opt|Opts].
-known_listen_options(Module) ->
- try Module:listen_options() of
- Opts -> [element(1, Opt) || Opt <- Opts]
- catch _:undef ->
- Module:listen_opt_type('')
+-spec validate_cfg(list()) -> [listener()].
+validate_cfg(Listeners) ->
+ Listeners1 = lists:map(fun validate_opts/1, Listeners),
+ Listeners2 = lists:keysort(1, Listeners1),
+ check_overlapping_listeners(Listeners2).
+
+-spec validate_module(module()) -> ok.
+validate_module(Mod) ->
+ case code:ensure_loaded(Mod) of
+ {module, Mod} ->
+ lists:foreach(
+ fun({Fun, Arity}) ->
+ case erlang:function_exported(Mod, Fun, Arity) of
+ true -> ok;
+ false ->
+ ?ERROR_MSG("Failed to load listening module ~s, "
+ "because it doesn't export ~s/~B callback. "
+ "The module is either not a listening module "
+ "or it is a third-party module which "
+ "requires update",
+ [Mod, Fun, Arity]),
+ erlang:error(badarg)
+ end
+ end, [{start, 2}, {start_link, 2},
+ {accept, 1}, {listen_options, 0}]);
+ _ ->
+ ?ERROR_MSG("Failed to load unknown listening module ~s: "
+ "make sure there is no typo and ~s.beam "
+ "exists inside either ~s or ~s directory",
+ [Mod, Mod,
+ filename:dirname(code:which(?MODULE)),
+ ext_mod:modules_dir()]),
+ erlang:error(badarg)
end.
--spec validate_module_options(module(), [{atom(), any()}]) -> [{atom(), any()}].
-validate_module_options(Module, Opts) ->
- try known_listen_options(Module) of
- _ ->
- maybe_start_zlib(Opts),
- lists:filtermap(
- fun({Opt, Val}) ->
- case validate_module_option(Module, Opt, Val) of
- {ok, NewVal} -> {true, {Opt, NewVal}};
- error -> false
- end
- end, Opts)
- catch _:undef ->
- ?WARNING_MSG("module '~s' doesn't export listen_opt_type/1",
- [Module]),
- Opts
+-spec validate_opts(listen_opts()) -> listener().
+validate_opts(Opts) ->
+ case lists:keyfind(module, 1, Opts) of
+ {_, Mod} ->
+ validate_module(Mod),
+ Opts1 = validate_opts(Mod, Opts),
+ {Opts2, Opts3} = lists:partition(
+ fun({port, _}) -> true;
+ ({transport, _}) -> true;
+ ({module, _}) -> true;
+ (_) -> false
+ end, Opts1),
+ Port = proplists:get_value(port, Opts2),
+ Transport = proplists:get_value(transport, Opts2, tcp),
+ IP = proplists:get_value(ip, Opts3, all_zero_ip(Opts3)),
+ {{Port, IP, Transport}, Mod, Opts3};
+ false ->
+ ?ERROR_MSG("Missing required listening option: module", []),
+ erlang:error(badarg)
end.
--spec validate_module_option(module(), atom(), any()) -> {ok, any()} | error.
-validate_module_option(Module, Opt, Val) ->
- case Module:listen_opt_type(Opt) of
- VFun when is_function(VFun) ->
- try VFun(Val) of
- NewVal -> {ok, NewVal}
- catch {invalid_syntax, Error} ->
- ?ERROR_MSG("ignoring listen option '~s' with "
- "invalid value: ~p: ~s",
- [Opt, Val, Error]),
- error;
- _:_ ->
- ?ERROR_MSG("ignoring listen option '~s' with "
- "invalid value: ~p",
- [Opt, Val]),
- error
- end;
+-spec validate_opts(module(), listen_opts()) -> listen_opts().
+validate_opts(Mod, Opts) ->
+ Defaults = listen_options() ++ Mod:listen_options(),
+ {Opts1, Defaults1} =
+ lists:mapfoldl(
+ fun({Opt, Val} = OptVal, Defs) ->
+ case proplists:is_defined(Opt, Defaults) of
+ true ->
+ NewOptVal = case lists:member(OptVal, Defaults) of
+ true -> [];
+ false -> [validate_module_opt(Mod, Opt, Val)]
+ end,
+ {NewOptVal, proplists:delete(Opt, Defs)};
+ false ->
+ ?ERROR_MSG("Unknown listening option '~s' of "
+ "module ~s; available options are: ~s",
+ [Opt, Mod,
+ misc:join_atoms(
+ proplists:get_keys(Defaults),
+ <<", ">>)]),
+ erlang:error(badarg)
+ end
+ end, Defaults, Opts),
+ case lists:filter(fun is_atom/1, Defaults1) of
[] ->
- ?ERROR_MSG("unknown listen option '~s' for '~s' will be likely "
- "ignored because the listening module doesn't have "
- "any options", [Opt, Module]),
- {ok, Val};
- KnownOpts when is_list(KnownOpts) ->
- ?ERROR_MSG("unknown listen option '~s' for '~s' will be likely "
- "ignored, available options are: ~s",
- [Opt, Module, misc:join_atoms(KnownOpts, <<", ">>)]),
- {ok, Val}
+ lists:flatten(Opts1);
+ MissingRequiredOpts ->
+ ?ERROR_MSG("Missing required listening option(s): ~s",
+ [misc:join_atoms(MissingRequiredOpts, <<", ">>)]),
+ erlang:error(badarg)
end.
--type transport() :: udp | tcp.
--type port_ip_transport() :: inet:port_number() |
- {inet:port_number(), transport()} |
- {inet:port_number(), inet:ip_address()} |
- {inet:port_number(), inet:ip_address(),
- transport()}.
--spec validate_cfg(list()) -> [{port_ip_transport(), module(), list()}].
-
-validate_cfg(L) ->
- lists:map(
- fun(LOpts) ->
- lists:foldl(
- fun({port, Port}, {{_, IP, T}, Mod, Opts}) ->
- true = ?IS_PORT(Port),
- {{Port, IP, T}, Mod, Opts};
- ({ip, IP}, {{Port, _, T}, Mod, Opts}) ->
- {{Port, prepare_ip(IP), T}, Mod, Opts};
- ({transport, T}, {{Port, IP, _}, Mod, Opts}) ->
- true = ?IS_TRANSPORT(T),
- {{Port, IP, T}, Mod, Opts};
- ({module, Mod}, {Port, _, Opts}) ->
- {Port, Mod, Opts};
- (Opt, {Port, Mod, Opts}) ->
- {Port, Mod, [Opt|Opts]}
- end, {{5222, all_zero_ip(LOpts), tcp}, ejabberd_c2s, []}, LOpts)
- end, L).
-
-prepare_ip({A, B, C, D} = IP)
- when ?IS_CHAR(A) and ?IS_CHAR(B) and ?IS_CHAR(C) and ?IS_CHAR(D) ->
- IP;
-prepare_ip({A, B, C, D, E, F, G, H} = IP)
- when ?IS_UINT(A) and ?IS_UINT(B) and ?IS_UINT(C) and ?IS_UINT(D)
- and ?IS_UINT(E) and ?IS_UINT(F) and ?IS_UINT(G) and ?IS_UINT(H) ->
- IP;
-prepare_ip(IP) when is_list(IP) ->
- {ok, Addr} = inet_parse:address(IP),
- Addr;
-prepare_ip(IP) when is_binary(IP) ->
- prepare_ip(binary_to_list(IP)).
+-spec validate_module_opt(module(), atom(), any()) -> {atom(), any()}.
+validate_module_opt(Module, Opt, Val) ->
+ VFun = try Module:listen_opt_type(Opt)
+ catch _:_ -> listen_opt_type(Opt)
+ end,
+ try {Opt, VFun(Val)}
+ catch _:_ ->
+ ?ERROR_MSG("Invalid value of listening option ~s: ~s",
+ [Opt, misc:format_val({yaml, Val})]),
+ erlang:error(badarg)
+ end.
+-spec all_zero_ip(listen_opts()) -> inet:ip_address().
all_zero_ip(Opts) ->
case proplists:get_bool(inet6, Opts) of
true -> {0,0,0,0,0,0,0,0};
false -> {0,0,0,0}
end.
-maybe_start_zlib(Opts) ->
- case proplists:get_bool(zlib, Opts) of
- true ->
- ejabberd:start_app(ezlib);
- false ->
- ok
- end.
+-spec check_overlapping_listeners([listener()]) -> [listener()].
+check_overlapping_listeners(Listeners) ->
+ lists:foldl(
+ fun({{Port, IP, Transport} = Key, _, _}, Acc) ->
+ case lists:member(Key, Acc) of
+ true ->
+ ?ERROR_MSG("Overlapping listeners found at ~s",
+ [format_endpoint(Key)]),
+ erlang:error(badarg);
+ false ->
+ ZeroIP = case size(IP) of
+ 8 -> {0,0,0,0,0,0,0,0};
+ 4 -> {0,0,0,0}
+ end,
+ Key1 = {Port, ZeroIP, Transport},
+ case lists:member(Key1, Acc) of
+ true ->
+ ?ERROR_MSG(
+ "Overlapping listeners found at ~s and ~s",
+ [format_endpoint(Key), format_endpoint(Key1)]),
+ erlang:error(badarg);
+ false ->
+ [Key|Acc]
+ end
+ end
+ end, [], Listeners),
+ Listeners.
+
+listen_opt_type(port) ->
+ fun(I) when is_integer(I), I>0, I<65536 -> I end;
+listen_opt_type(module) ->
+ fun(A) when is_atom(A) -> A end;
+listen_opt_type(ip) ->
+ fun(S) ->
+ {ok, Addr} = inet_parse:address(binary_to_list(S)),
+ Addr
+ end;
+listen_opt_type(transport) ->
+ fun(tcp) -> tcp;
+ (udp) -> udp
+ end;
+listen_opt_type(accept_interval) ->
+ fun(I) when is_integer(I), I>=0 -> I end;
+listen_opt_type(backlog) ->
+ fun(I) when is_integer(I), I>=0 -> I end;
+listen_opt_type(inet) ->
+ fun(B) when is_boolean(B) -> B end;
+listen_opt_type(inet6) ->
+ fun(B) when is_boolean(B) -> B end;
+listen_opt_type(supervisor) ->
+ fun(B) when is_boolean(B) -> B end;
+listen_opt_type(certfile) ->
+ fun(S) ->
+ ok = ejabberd_pkix:add_certfile(S),
+ iolist_to_binary(S)
+ end;
+listen_opt_type(ciphers) -> fun iolist_to_binary/1;
+listen_opt_type(dhfile) -> fun misc:try_read_file/1;
+listen_opt_type(cafile) -> fun misc:try_read_file/1;
+listen_opt_type(protocol_options) ->
+ fun (Options) -> str:join(Options, <<"|">>) end;
+listen_opt_type(tls_compression) ->
+ fun(B) when is_boolean(B) -> B end;
+listen_opt_type(tls) ->
+ fun(B) when is_boolean(B) -> B end;
+listen_opt_type(max_stanza_size) ->
+ fun(I) when is_integer(I), I>0 -> I;
+ (unlimited) -> infinity;
+ (infinity) -> infinity
+ end;
+listen_opt_type(max_fsm_queue) ->
+ fun(I) when is_integer(I), I>0 -> I end;
+listen_opt_type(shaper) ->
+ fun acl:shaper_rules_validator/1;
+listen_opt_type(access) ->
+ fun acl:access_rules_validator/1.
+
+listen_options() ->
+ [module, port,
+ {transport, tcp},
+ {ip, <<"0.0.0.0">>},
+ {inet, true},
+ {inet6, false},
+ {accept_interval, 0},
+ {backlog, 5},
+ {supervisor, true}].
opt_type(listen) -> fun validate_cfg/1;
opt_type(_) -> [listen].
+
+%%%----------------------------------------------------------------------
+%%% Some legacy code used by ejabberd_web_admin only
+%%%----------------------------------------------------------------------
+parse_listener_portip(PortIP, Opts) ->
+ {IPOpt, Opts2} = strip_ip_option(Opts),
+ {IPVOpt, OptsClean} = case proplists:get_bool(inet6, Opts2) of
+ true -> {inet6, proplists:delete(inet6, Opts2)};
+ false -> {inet, Opts2}
+ end,
+ {Port, IPT, Proto} =
+ case add_proto(PortIP, Opts) of
+ {P, Prot} ->
+ T = get_ip_tuple(IPOpt, IPVOpt),
+ {P, T, Prot};
+ {P, T, Prot} when is_integer(P) and is_tuple(T) ->
+ {P, T, Prot};
+ {P, S, Prot} when is_integer(P) and is_binary(S) ->
+ {ok, T} = inet_parse:address(binary_to_list(S)),
+ {P, T, Prot}
+ end,
+ IPV = case tuple_size(IPT) of
+ 4 -> inet;
+ 8 -> inet6
+ end,
+ {Port, IPT, IPV, Proto, OptsClean}.
+
+add_proto(Port, Opts) when is_integer(Port) ->
+ {Port, get_proto(Opts)};
+add_proto({Port, Proto}, _Opts) when is_atom(Proto) ->
+ {Port, normalize_proto(Proto)};
+add_proto({Port, Addr}, Opts) ->
+ {Port, Addr, get_proto(Opts)};
+add_proto({Port, Addr, Proto}, _Opts) ->
+ {Port, Addr, normalize_proto(Proto)}.
+
+strip_ip_option(Opts) ->
+ {IPL, OptsNoIP} = lists:partition(
+ fun({ip, _}) -> true;
+ (_) -> false
+ end,
+ Opts),
+ case IPL of
+ %% Only the first ip option is considered
+ [{ip, T1} | _] ->
+ {T1, OptsNoIP};
+ [] ->
+ {no_ip_option, OptsNoIP}
+ end.
+
+get_ip_tuple(no_ip_option, inet) ->
+ {0, 0, 0, 0};
+get_ip_tuple(no_ip_option, inet6) ->
+ {0, 0, 0, 0, 0, 0, 0, 0};
+get_ip_tuple(IPOpt, _IPVOpt) ->
+ IPOpt.
+
+get_proto(Opts) ->
+ case proplists:get_value(proto, Opts) of
+ undefined ->
+ tcp;
+ Proto ->
+ normalize_proto(Proto)
+ end.
+
+normalize_proto(udp) -> udp;
+normalize_proto(_) -> tcp.