aboutsummaryrefslogtreecommitdiff
path: root/src/mod_mqtt.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_mqtt.erl')
-rw-r--r--src/mod_mqtt.erl111
1 files changed, 53 insertions, 58 deletions
diff --git a/src/mod_mqtt.erl b/src/mod_mqtt.erl
index 566804f36..196b6efbe 100644
--- a/src/mod_mqtt.erl
+++ b/src/mod_mqtt.erl
@@ -19,6 +19,7 @@
-behaviour(p1_server).
-behaviour(gen_mod).
-behaviour(ejabberd_listener).
+-dialyzer({no_improper_lists, join_filter/1}).
%% gen_mod API
-export([start/2, stop/1, reload/3, depends/2, mod_options/1, mod_opt_type/1]).
@@ -135,7 +136,7 @@ publish({_, S, _} = USR, Pkt, ExpiryTime) ->
ok | {error, db_failure | subscribe_forbidden}.
subscribe({_, S, _} = USR, TopicFilter, SubOpts, ID) ->
Mod = gen_mod:ram_db_mod(S, ?MODULE),
- Limit = gen_mod:get_module_opt(S, ?MODULE, max_topic_depth),
+ Limit = mod_mqtt_opt:max_topic_depth(S),
case check_topic_depth(TopicFilter, Limit) of
allow ->
case check_subscribe_access(TopicFilter, USR) of
@@ -157,15 +158,15 @@ unsubscribe({U, S, R}, Topic) ->
[{publish(), seconds()}].
select_retained({_, S, _} = USR, TopicFilter, QoS, SubID) ->
Mod = gen_mod:db_mod(S, ?MODULE),
- Limit = gen_mod:get_module_opt(S, ?MODULE, match_retained_limit),
+ Limit = mod_mqtt_opt:match_retained_limit(S),
select_retained(Mod, USR, TopicFilter, QoS, SubID, Limit).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Host, Opts]) ->
- Mod = gen_mod:db_mod(Host, Opts, ?MODULE),
- RMod = gen_mod:ram_db_mod(Host, Opts, ?MODULE),
+ Mod = gen_mod:db_mod(Opts, ?MODULE),
+ RMod = gen_mod:ram_db_mod(Opts, ?MODULE),
try
ok = Mod:init(Host, Opts),
ok = RMod:init(),
@@ -194,6 +195,9 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Options
%%%===================================================================
+-spec mod_options(binary()) -> [{access_publish, [{[binary()], acl:acl()}]} |
+ {access_subscribe, [{[binary()], acl:acl()}]} |
+ {atom(), any()}].
mod_options(Host) ->
[{match_retained_limit, 1000},
{max_topic_depth, 8},
@@ -204,55 +208,45 @@ mod_options(Host) ->
{access_publish, []},
{db_type, ejabberd_config:default_db(Host, ?MODULE)},
{ram_db_type, ejabberd_config:default_ram_db(Host, ?MODULE)},
- {queue_type, ejabberd_config:default_queue_type(Host)},
- {use_cache, ejabberd_config:use_cache(Host)},
- {cache_size, ejabberd_config:cache_size(Host)},
- {cache_missed, ejabberd_config:cache_missed(Host)},
- {cache_life_time, ejabberd_config:cache_life_time(Host)}].
+ {queue_type, ejabberd_option:queue_type(Host)},
+ {use_cache, ejabberd_option:use_cache(Host)},
+ {cache_size, ejabberd_option:cache_size(Host)},
+ {cache_missed, ejabberd_option:cache_missed(Host)},
+ {cache_life_time, ejabberd_option:cache_life_time(Host)}].
mod_opt_type(max_queue) ->
- fun(I) when is_integer(I), I > 0 -> I;
- (infinity) -> unlimited;
- (unlimited) -> unlimited
- end;
+ econf:pos_int(unlimited);
mod_opt_type(session_expiry) ->
- fun(I) when is_integer(I), I>= 0 -> I end;
+ econf:non_neg_int();
mod_opt_type(match_retained_limit) ->
- fun(I) when is_integer(I), I>0 -> I;
- (unlimited) -> infinity;
- (infinity) -> infinity
- end;
+ econf:pos_int(infinity);
mod_opt_type(max_topic_depth) ->
- fun(I) when is_integer(I), I>0 -> I;
- (unlimited) -> infinity;
- (infinity) -> infinity
- end;
+ econf:pos_int(infinity);
mod_opt_type(max_topic_aliases) ->
- fun(I) when is_integer(I), I>=0, I<65536 -> I end;
+ econf:int(0, 65535);
mod_opt_type(access_subscribe) ->
- fun validate_topic_access/1;
+ topic_access_validator();
mod_opt_type(access_publish) ->
- fun validate_topic_access/1;
+ topic_access_validator();
+mod_opt_type(queue_type) ->
+ econf:well_known(queue_type, ?MODULE);
mod_opt_type(db_type) ->
- fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
+ econf:well_known(db_type, ?MODULE);
mod_opt_type(ram_db_type) ->
- fun(T) -> ejabberd_config:v_db(?MODULE, T) end;
-mod_opt_type(queue_type) ->
- fun(ram) -> ram; (file) -> file end;
-mod_opt_type(O) when O == cache_life_time; O == cache_size ->
- fun(I) when is_integer(I), I > 0 -> I;
- (infinity) -> infinity
- end;
-mod_opt_type(O) when O == use_cache; O == cache_missed ->
- fun (B) when is_boolean(B) -> B end.
+ econf:well_known(ram_db_type, ?MODULE);
+mod_opt_type(use_cache) ->
+ econf:well_known(use_cache, ?MODULE);
+mod_opt_type(cache_size) ->
+ econf:well_known(cache_size, ?MODULE);
+mod_opt_type(cache_missed) ->
+ econf:well_known(cache_missed, ?MODULE);
+mod_opt_type(cache_life_time) ->
+ econf:well_known(cache_life_time, ?MODULE).
listen_opt_type(tls_verify) ->
- fun(B) when is_boolean(B) -> B end;
+ econf:bool();
listen_opt_type(max_payload_size) ->
- fun(I) when is_integer(I), I>0 -> I;
- (unlimited) -> infinity;
- (infinity) -> infinity
- end.
+ econf:pos_int(infinity).
listen_options() ->
[{max_fsm_queue, 5000},
@@ -436,30 +430,31 @@ split_path(Path) ->
%%%===================================================================
%%% Validators
%%%===================================================================
-validate_topic_access(FilterRules) ->
- lists:map(
- fun({TopicFilter, Access}) ->
- Rule = acl:access_rules_validator(Access),
- try
- mqtt_codec:topic_filter(TopicFilter),
- {split_path(TopicFilter), Rule}
- catch _:_ ->
- ?ERROR_MSG("Invalid topic filter: ~s", [TopicFilter]),
- erlang:error(badarg)
- end
- end, lists:reverse(lists:keysort(1, FilterRules))).
+-spec topic_access_validator() -> econf:validator().
+topic_access_validator() ->
+ econf:and_then(
+ econf:map(
+ fun(TF) ->
+ try split_path(mqtt_codec:topic_filter(TF))
+ catch _:{mqtt_codec, _} = Reason ->
+ econf:fail(Reason)
+ end
+ end,
+ econf:acl(),
+ [{return, orddict}]),
+ fun lists:reverse/1).
%%%===================================================================
%%% ACL checks
%%%===================================================================
check_subscribe_access(Topic, {_, S, _} = USR) ->
- Rules = gen_mod:get_module_opt(S, mod_mqtt, access_subscribe),
+ Rules = mod_mqtt_opt:access_subscribe(S),
check_access(Topic, USR, Rules).
check_publish_access(<<$$, _/binary>>, _) ->
deny;
check_publish_access(Topic, {_, S, _} = USR) ->
- Rules = gen_mod:get_module_opt(S, mod_mqtt, access_publish),
+ Rules = mod_mqtt_opt:access_publish(S),
check_access(Topic, USR, Rules).
check_access(_, _, []) ->
@@ -544,9 +539,9 @@ init_payload_cache(Mod, Host, Opts) ->
-spec cache_opts(gen_mod:opts()) -> [proplists:property()].
cache_opts(Opts) ->
- MaxSize = gen_mod:get_opt(cache_size, Opts),
- CacheMissed = gen_mod:get_opt(cache_missed, Opts),
- LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of
+ MaxSize = mod_mqtt_opt:cache_size(Opts),
+ CacheMissed = mod_mqtt_opt:cache_missed(Opts),
+ LifeTime = case mod_mqtt_opt:cache_life_time(Opts) of
infinity -> infinity;
I -> timer:seconds(I)
end,
@@ -556,7 +551,7 @@ cache_opts(Opts) ->
use_cache(Mod, Host) ->
case erlang:function_exported(Mod, use_cache, 1) of
true -> Mod:use_cache(Host);
- false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache)
+ false -> mod_mqtt_opt:use_cache(Host)
end.
-spec cache_nodes(module(), binary()) -> [node()].