aboutsummaryrefslogtreecommitdiff
path: root/src/mod_mqtt_mnesia.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_mqtt_mnesia.erl')
-rw-r--r--src/mod_mqtt_mnesia.erl132
1 files changed, 132 insertions, 0 deletions
diff --git a/src/mod_mqtt_mnesia.erl b/src/mod_mqtt_mnesia.erl
new file mode 100644
index 000000000..3439c9304
--- /dev/null
+++ b/src/mod_mqtt_mnesia.erl
@@ -0,0 +1,132 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2002-2019 ProcessOne, SARL. All Rights Reserved.
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+%%%-------------------------------------------------------------------
+-module(mod_mqtt_mnesia).
+-behaviour(mod_mqtt).
+
+%% API
+-export([init/2, publish/6, delete_published/2, lookup_published/2]).
+-export([list_topics/1, use_cache/1]).
+%% Unsupported backend API
+-export([init/0]).
+-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
+-export([open_session/1, close_session/1, lookup_session/1]).
+
+-include("logger.hrl").
+
+-record(mqtt_pub, {topic_server :: {binary(), binary()},
+ user :: binary(),
+ resource :: binary(),
+ qos :: 0..2,
+ payload :: binary(),
+ expiry :: non_neg_integer(),
+ payload_format = binary :: binary | utf8,
+ response_topic = <<>> :: binary(),
+ correlation_data = <<>> :: binary(),
+ content_type = <<>> :: binary(),
+ user_properties = [] :: [{binary(), binary()}]}).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init(_Host, _Opts) ->
+ case ejabberd_mnesia:create(
+ ?MODULE, mqtt_pub,
+ [{disc_only_copies, [node()]},
+ {attributes, record_info(fields, mqtt_pub)}]) of
+ {atomic, _} ->
+ ok;
+ Err ->
+ {error, Err}
+ end.
+
+use_cache(Host) ->
+ case mnesia:table_info(mqtt_pub, storage_type) of
+ disc_only_copies ->
+ gen_mod:get_module_opt(Host, mod_mqtt, use_cache);
+ _ ->
+ false
+ end.
+
+publish({U, LServer, R}, Topic, Payload, QoS, Props, ExpiryTime) ->
+ PayloadFormat = maps:get(payload_format_indicator, Props, binary),
+ ResponseTopic = maps:get(response_topic, Props, <<"">>),
+ CorrelationData = maps:get(correlation_data, Props, <<"">>),
+ ContentType = maps:get(content_type, Props, <<"">>),
+ UserProps = maps:get(user_property, Props, []),
+ mnesia:dirty_write(#mqtt_pub{topic_server = {Topic, LServer},
+ user = U,
+ resource = R,
+ qos = QoS,
+ payload = Payload,
+ expiry = ExpiryTime,
+ payload_format = PayloadFormat,
+ response_topic = ResponseTopic,
+ correlation_data = CorrelationData,
+ content_type = ContentType,
+ user_properties = UserProps}).
+
+delete_published({_, S, _}, Topic) ->
+ mnesia:dirty_delete(mqtt_pub, {Topic, S}).
+
+lookup_published({_, S, _}, Topic) ->
+ case mnesia:dirty_read(mqtt_pub, {Topic, S}) of
+ [#mqtt_pub{qos = QoS,
+ payload = Payload,
+ expiry = ExpiryTime,
+ payload_format = PayloadFormat,
+ response_topic = ResponseTopic,
+ correlation_data = CorrelationData,
+ content_type = ContentType,
+ user_properties = UserProps}] ->
+ Props = #{payload_format => PayloadFormat,
+ response_topic => ResponseTopic,
+ correlation_data => CorrelationData,
+ content_type => ContentType,
+ user_property => UserProps},
+ {ok, {Payload, QoS, Props, ExpiryTime}};
+ [] ->
+ {error, notfound}
+ end.
+
+list_topics(S) ->
+ {ok, [Topic || {Topic, S1} <- mnesia:dirty_all_keys(mqtt_pub), S1 == S]}.
+
+init() ->
+ erlang:nif_error(unsupported_db).
+
+open_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+close_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+lookup_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+subscribe(_, _, _, _) ->
+ erlang:nif_error(unsupported_db).
+
+unsubscribe(_, _) ->
+ erlang:nif_error(unsupported_db).
+
+find_subscriber(_, _) ->
+ erlang:nif_error(unsupported_db).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================