aboutsummaryrefslogtreecommitdiff
path: root/src/mod_mqtt_sql.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/mod_mqtt_sql.erl')
-rw-r--r--src/mod_mqtt_sql.erl151
1 files changed, 151 insertions, 0 deletions
diff --git a/src/mod_mqtt_sql.erl b/src/mod_mqtt_sql.erl
new file mode 100644
index 000000000..a11f8e04c
--- /dev/null
+++ b/src/mod_mqtt_sql.erl
@@ -0,0 +1,151 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2002-2019 ProcessOne, SARL. All Rights Reserved.
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+%%%-------------------------------------------------------------------
+-module(mod_mqtt_sql).
+-behaviour(mod_mqtt).
+-compile([{parse_transform, ejabberd_sql_pt}]).
+
+%% API
+-export([init/2, publish/6, delete_published/2, lookup_published/2]).
+-export([list_topics/1]).
+%% Unsupported backend API
+-export([init/0]).
+-export([subscribe/4, unsubscribe/2, find_subscriber/2]).
+-export([open_session/1, close_session/1, lookup_session/1]).
+
+-include("logger.hrl").
+-include("ejabberd_sql_pt.hrl").
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+init() ->
+ ?ERROR_MSG("Backend 'sql' is only supported for db_type", []),
+ {error, db_failure}.
+
+init(_Host, _Opts) ->
+ ok.
+
+publish({U, LServer, R}, Topic, Payload, QoS, Props, ExpiryTime) ->
+ PayloadFormat = encode_pfi(maps:get(payload_format_indicator, Props, binary)),
+ ResponseTopic = maps:get(response_topic, Props, <<"">>),
+ CorrelationData = maps:get(correlation_data, Props, <<"">>),
+ ContentType = maps:get(content_type, Props, <<"">>),
+ UserProps = encode_props(maps:get(user_property, Props, [])),
+ case ?SQL_UPSERT(LServer, "mqtt_pub",
+ ["!topic=%(Topic)s",
+ "!server_host=%(LServer)s",
+ "username=%(U)s",
+ "resource=%(R)s",
+ "payload=%(Payload)s",
+ "qos=%(QoS)d",
+ "payload_format=%(PayloadFormat)d",
+ "response_topic=%(ResponseTopic)s",
+ "correlation_data=%(CorrelationData)s",
+ "content_type=%(ContentType)s",
+ "user_properties=%(UserProps)s",
+ "expiry=%(ExpiryTime)d"]) of
+ ok -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+delete_published({_, LServer, _}, Topic) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("delete from mqtt_pub where "
+ "topic=%(Topic)s and %(LServer)H")) of
+ {updated, _} -> ok;
+ _Err -> {error, db_failure}
+ end.
+
+lookup_published({_, LServer, _}, Topic) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(payload)s, @(qos)d, @(payload_format)d, "
+ "@(content_type)s, @(response_topic)s, "
+ "@(correlation_data)s, @(user_properties)s, @(expiry)d "
+ "from mqtt_pub where topic=%(Topic)s and %(LServer)H")) of
+ {selected, [{Payload, QoS, PayloadFormat, ContentType,
+ ResponseTopic, CorrelationData, EncProps, Expiry}]} ->
+ try decode_props(EncProps) of
+ UserProps ->
+ try decode_pfi(PayloadFormat) of
+ PFI ->
+ Props = #{payload_format_indicator => PFI,
+ content_type => ContentType,
+ response_topic => ResponseTopic,
+ correlation_data => CorrelationData,
+ user_property => UserProps},
+ {ok, {Payload, QoS, Props, Expiry}}
+ catch _:badarg ->
+ ?ERROR_MSG("Malformed value of 'payload_format' column "
+ "for topic '~s'", [Topic]),
+ {error, db_failure}
+ end
+ catch _:badarg ->
+ ?ERROR_MSG("Malformed value of 'user_properties' column "
+ "for topic '~s'", [Topic]),
+ {error, db_failure}
+ end;
+ {selected, []} ->
+ {error, notfound};
+ _ ->
+ {error, db_failure}
+ end.
+
+list_topics(LServer) ->
+ case ejabberd_sql:sql_query(
+ LServer,
+ ?SQL("select @(topic)s from mqtt_pub where %(LServer)H")) of
+ {selected, Res} ->
+ {ok, [Topic || {Topic} <- Res]};
+ _ ->
+ {error, db_failure}
+ end.
+
+open_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+close_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+lookup_session(_) ->
+ erlang:nif_error(unsupported_db).
+
+subscribe(_, _, _, _) ->
+ erlang:nif_error(unsupported_db).
+
+unsubscribe(_, _) ->
+ erlang:nif_error(unsupported_db).
+
+find_subscriber(_, _) ->
+ erlang:nif_error(unsupported_db).
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+encode_pfi(binary) -> 0;
+encode_pfi(utf8) -> 1.
+
+decode_pfi(0) -> binary;
+decode_pfi(1) -> utf8.
+
+encode_props([]) -> <<"">>;
+encode_props(L) -> term_to_binary(L).
+
+decode_props(<<"">>) -> [];
+decode_props(Bin) -> binary_to_term(Bin).