diff options
Diffstat (limited to 'src/mod_mqtt.erl')
-rw-r--r-- | src/mod_mqtt.erl | 111 |
1 files changed, 53 insertions, 58 deletions
diff --git a/src/mod_mqtt.erl b/src/mod_mqtt.erl index 566804f36..196b6efbe 100644 --- a/src/mod_mqtt.erl +++ b/src/mod_mqtt.erl @@ -19,6 +19,7 @@ -behaviour(p1_server). -behaviour(gen_mod). -behaviour(ejabberd_listener). +-dialyzer({no_improper_lists, join_filter/1}). %% gen_mod API -export([start/2, stop/1, reload/3, depends/2, mod_options/1, mod_opt_type/1]). @@ -135,7 +136,7 @@ publish({_, S, _} = USR, Pkt, ExpiryTime) -> ok | {error, db_failure | subscribe_forbidden}. subscribe({_, S, _} = USR, TopicFilter, SubOpts, ID) -> Mod = gen_mod:ram_db_mod(S, ?MODULE), - Limit = gen_mod:get_module_opt(S, ?MODULE, max_topic_depth), + Limit = mod_mqtt_opt:max_topic_depth(S), case check_topic_depth(TopicFilter, Limit) of allow -> case check_subscribe_access(TopicFilter, USR) of @@ -157,15 +158,15 @@ unsubscribe({U, S, R}, Topic) -> [{publish(), seconds()}]. select_retained({_, S, _} = USR, TopicFilter, QoS, SubID) -> Mod = gen_mod:db_mod(S, ?MODULE), - Limit = gen_mod:get_module_opt(S, ?MODULE, match_retained_limit), + Limit = mod_mqtt_opt:match_retained_limit(S), select_retained(Mod, USR, TopicFilter, QoS, SubID, Limit). %%%=================================================================== %%% gen_server callbacks %%%=================================================================== init([Host, Opts]) -> - Mod = gen_mod:db_mod(Host, Opts, ?MODULE), - RMod = gen_mod:ram_db_mod(Host, Opts, ?MODULE), + Mod = gen_mod:db_mod(Opts, ?MODULE), + RMod = gen_mod:ram_db_mod(Opts, ?MODULE), try ok = Mod:init(Host, Opts), ok = RMod:init(), @@ -194,6 +195,9 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Options %%%=================================================================== +-spec mod_options(binary()) -> [{access_publish, [{[binary()], acl:acl()}]} | + {access_subscribe, [{[binary()], acl:acl()}]} | + {atom(), any()}]. mod_options(Host) -> [{match_retained_limit, 1000}, {max_topic_depth, 8}, @@ -204,55 +208,45 @@ mod_options(Host) -> {access_publish, []}, {db_type, ejabberd_config:default_db(Host, ?MODULE)}, {ram_db_type, ejabberd_config:default_ram_db(Host, ?MODULE)}, - {queue_type, ejabberd_config:default_queue_type(Host)}, - {use_cache, ejabberd_config:use_cache(Host)}, - {cache_size, ejabberd_config:cache_size(Host)}, - {cache_missed, ejabberd_config:cache_missed(Host)}, - {cache_life_time, ejabberd_config:cache_life_time(Host)}]. + {queue_type, ejabberd_option:queue_type(Host)}, + {use_cache, ejabberd_option:use_cache(Host)}, + {cache_size, ejabberd_option:cache_size(Host)}, + {cache_missed, ejabberd_option:cache_missed(Host)}, + {cache_life_time, ejabberd_option:cache_life_time(Host)}]. mod_opt_type(max_queue) -> - fun(I) when is_integer(I), I > 0 -> I; - (infinity) -> unlimited; - (unlimited) -> unlimited - end; + econf:pos_int(unlimited); mod_opt_type(session_expiry) -> - fun(I) when is_integer(I), I>= 0 -> I end; + econf:non_neg_int(); mod_opt_type(match_retained_limit) -> - fun(I) when is_integer(I), I>0 -> I; - (unlimited) -> infinity; - (infinity) -> infinity - end; + econf:pos_int(infinity); mod_opt_type(max_topic_depth) -> - fun(I) when is_integer(I), I>0 -> I; - (unlimited) -> infinity; - (infinity) -> infinity - end; + econf:pos_int(infinity); mod_opt_type(max_topic_aliases) -> - fun(I) when is_integer(I), I>=0, I<65536 -> I end; + econf:int(0, 65535); mod_opt_type(access_subscribe) -> - fun validate_topic_access/1; + topic_access_validator(); mod_opt_type(access_publish) -> - fun validate_topic_access/1; + topic_access_validator(); +mod_opt_type(queue_type) -> + econf:well_known(queue_type, ?MODULE); mod_opt_type(db_type) -> - fun(T) -> ejabberd_config:v_db(?MODULE, T) end; + econf:well_known(db_type, ?MODULE); mod_opt_type(ram_db_type) -> - fun(T) -> ejabberd_config:v_db(?MODULE, T) end; -mod_opt_type(queue_type) -> - fun(ram) -> ram; (file) -> file end; -mod_opt_type(O) when O == cache_life_time; O == cache_size -> - fun(I) when is_integer(I), I > 0 -> I; - (infinity) -> infinity - end; -mod_opt_type(O) when O == use_cache; O == cache_missed -> - fun (B) when is_boolean(B) -> B end. + econf:well_known(ram_db_type, ?MODULE); +mod_opt_type(use_cache) -> + econf:well_known(use_cache, ?MODULE); +mod_opt_type(cache_size) -> + econf:well_known(cache_size, ?MODULE); +mod_opt_type(cache_missed) -> + econf:well_known(cache_missed, ?MODULE); +mod_opt_type(cache_life_time) -> + econf:well_known(cache_life_time, ?MODULE). listen_opt_type(tls_verify) -> - fun(B) when is_boolean(B) -> B end; + econf:bool(); listen_opt_type(max_payload_size) -> - fun(I) when is_integer(I), I>0 -> I; - (unlimited) -> infinity; - (infinity) -> infinity - end. + econf:pos_int(infinity). listen_options() -> [{max_fsm_queue, 5000}, @@ -436,30 +430,31 @@ split_path(Path) -> %%%=================================================================== %%% Validators %%%=================================================================== -validate_topic_access(FilterRules) -> - lists:map( - fun({TopicFilter, Access}) -> - Rule = acl:access_rules_validator(Access), - try - mqtt_codec:topic_filter(TopicFilter), - {split_path(TopicFilter), Rule} - catch _:_ -> - ?ERROR_MSG("Invalid topic filter: ~s", [TopicFilter]), - erlang:error(badarg) - end - end, lists:reverse(lists:keysort(1, FilterRules))). +-spec topic_access_validator() -> econf:validator(). +topic_access_validator() -> + econf:and_then( + econf:map( + fun(TF) -> + try split_path(mqtt_codec:topic_filter(TF)) + catch _:{mqtt_codec, _} = Reason -> + econf:fail(Reason) + end + end, + econf:acl(), + [{return, orddict}]), + fun lists:reverse/1). %%%=================================================================== %%% ACL checks %%%=================================================================== check_subscribe_access(Topic, {_, S, _} = USR) -> - Rules = gen_mod:get_module_opt(S, mod_mqtt, access_subscribe), + Rules = mod_mqtt_opt:access_subscribe(S), check_access(Topic, USR, Rules). check_publish_access(<<$$, _/binary>>, _) -> deny; check_publish_access(Topic, {_, S, _} = USR) -> - Rules = gen_mod:get_module_opt(S, mod_mqtt, access_publish), + Rules = mod_mqtt_opt:access_publish(S), check_access(Topic, USR, Rules). check_access(_, _, []) -> @@ -544,9 +539,9 @@ init_payload_cache(Mod, Host, Opts) -> -spec cache_opts(gen_mod:opts()) -> [proplists:property()]. cache_opts(Opts) -> - MaxSize = gen_mod:get_opt(cache_size, Opts), - CacheMissed = gen_mod:get_opt(cache_missed, Opts), - LifeTime = case gen_mod:get_opt(cache_life_time, Opts) of + MaxSize = mod_mqtt_opt:cache_size(Opts), + CacheMissed = mod_mqtt_opt:cache_missed(Opts), + LifeTime = case mod_mqtt_opt:cache_life_time(Opts) of infinity -> infinity; I -> timer:seconds(I) end, @@ -556,7 +551,7 @@ cache_opts(Opts) -> use_cache(Mod, Host) -> case erlang:function_exported(Mod, use_cache, 1) of true -> Mod:use_cache(Host); - false -> gen_mod:get_module_opt(Host, ?MODULE, use_cache) + false -> mod_mqtt_opt:use_cache(Host) end. -spec cache_nodes(module(), binary()) -> [node()]. |