aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2019-04-23 19:18:22 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2019-04-23 19:18:22 +0300
commitedba1aebb51a7bb4f8a5f4f069fba3ed9540d729 (patch)
treeb6467b7582c38afd1edef8411ac99b478609e360 /src
parentFix handling of list arguments on pgsql (diff)
Add WebSockets support to mod_mqtt
Example configuration: listen: ... - port: 5280 module: ejabberd_http request_handlers: "/mqtt": mod_mqtt modules: ... mod_mqtt: {}
Diffstat (limited to 'src')
-rw-r--r--src/mod_mqtt.erl5
-rw-r--r--src/mod_mqtt_session.erl4
-rw-r--r--src/mod_mqtt_ws.erl171
3 files changed, 178 insertions, 2 deletions
diff --git a/src/mod_mqtt.erl b/src/mod_mqtt.erl
index 9fd1e57ac..566804f36 100644
--- a/src/mod_mqtt.erl
+++ b/src/mod_mqtt.erl
@@ -27,6 +27,8 @@
terminate/2, code_change/3]).
%% ejabberd_listener API
-export([start/3, start_link/3, listen_opt_type/1, listen_options/0, accept/1]).
+%% ejabberd_http API
+-export([socket_handoff/3]).
%% Legacy ejabberd_listener API
-export([become_controller/2, socket_type/0]).
%% API
@@ -98,6 +100,9 @@ become_controller(Pid, _) ->
accept(Pid) ->
mod_mqtt_session:accept(Pid).
+socket_handoff(LocalPath, Request, Opts) ->
+ mod_mqtt_ws:socket_handoff(LocalPath, Request, Opts).
+
open_session({U, S, R}) ->
Mod = gen_mod:ram_db_mod(S, ?MODULE),
Mod:open_session({U, S, R}).
diff --git a/src/mod_mqtt_session.erl b/src/mod_mqtt_session.erl
index d988b513e..bbcf9258a 100644
--- a/src/mod_mqtt_session.erl
+++ b/src/mod_mqtt_session.erl
@@ -64,8 +64,8 @@
session_expiry_non_zero | unknown_topic_alias.
-type state() :: #state{}.
--type sockmod() :: gen_tcp | fast_tls.
--type socket() :: {sockmod(), inet:socket() | fast_tls:tls_socket()}.
+-type sockmod() :: gen_tcp | fast_tls | mod_mqtt_ws.
+-type socket() :: {sockmod(), inet:socket() | fast_tls:tls_socket() | mod_mqtt_ws:socket()}.
-type peername() :: {inet:ip_address(), inet:port_number()}.
-type seconds() :: non_neg_integer().
-type milli_seconds() :: non_neg_integer().
diff --git a/src/mod_mqtt_ws.erl b/src/mod_mqtt_ws.erl
new file mode 100644
index 000000000..872553445
--- /dev/null
+++ b/src/mod_mqtt_ws.erl
@@ -0,0 +1,171 @@
+%%%-------------------------------------------------------------------
+%%% @author Evgeny Khramtsov <ekhramtsov@process-one.net>
+%%% @copyright (C) 2002-2019 ProcessOne, SARL. All Rights Reserved.
+%%%
+%%% Licensed under the Apache License, Version 2.0 (the "License");
+%%% you may not use this file except in compliance with the License.
+%%% You may obtain a copy of the License at
+%%%
+%%% http://www.apache.org/licenses/LICENSE-2.0
+%%%
+%%% Unless required by applicable law or agreed to in writing, software
+%%% distributed under the License is distributed on an "AS IS" BASIS,
+%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%%% See the License for the specific language governing permissions and
+%%% limitations under the License.
+%%%
+%%%-------------------------------------------------------------------
+-module(mod_mqtt_ws).
+-ifndef(GEN_SERVER).
+-define(GEN_SERVER, gen_server).
+-endif.
+-behaviour(?GEN_SERVER).
+
+%% API
+-export([socket_handoff/3]).
+-export([start/1, start_link/1]).
+-export([peername/1, setopts/2, send/2, close/1]).
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3, format_status/2]).
+
+-include("xmpp.hrl").
+-include("ejabberd_http.hrl").
+-include("logger.hrl").
+
+-define(SEND_TIMEOUT, timer:seconds(15)).
+
+-record(state, {socket :: socket(),
+ ws_pid :: pid(),
+ mqtt_session :: undefined | pid()}).
+
+-type peername() :: {inet:ip_address(), inet:port_number()}.
+-type socket() :: {http_ws, pid(), peername()}.
+-export_type([socket/0]).
+
+%%%===================================================================
+%%% API
+%%%===================================================================
+socket_handoff(LocalPath, Request, Opts) ->
+ ejabberd_websocket:socket_handoff(
+ LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0).
+
+start({#ws{http_opts = Opts}, _} = WS) ->
+ ?GEN_SERVER:start(?MODULE, [WS], ejabberd_config:fsm_limit_opts(Opts)).
+
+start_link({#ws{http_opts = Opts}, _} = WS) ->
+ ?GEN_SERVER:start_link(?MODULE, [WS], ejabberd_config:fsm_limit_opts(Opts)).
+
+-spec peername(socket()) -> {ok, peername()}.
+peername({http_ws, _, IP}) ->
+ {ok, IP}.
+
+-spec setopts(socket(), list()) -> ok.
+setopts(_WSock, _Opts) ->
+ ok.
+
+-spec send(socket(), iodata()) -> ok | {error, timeout | einval}.
+send({http_ws, Pid, _}, Data) ->
+ try ?GEN_SERVER:call(Pid, {send, Data}, ?SEND_TIMEOUT)
+ catch exit:{timeout, {?GEN_SERVER, _, _}} ->
+ {error, timeout};
+ exit:{_, {?GEN_SERVER, _, _}} ->
+ {error, einval}
+ end.
+
+-spec close(socket()) -> ok.
+close({http_ws, Pid, _}) ->
+ ?GEN_SERVER:cast(Pid, close).
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+init([{#ws{ip = IP, http_opts = ListenOpts}, WsPid}]) ->
+ Socket = {http_ws, self(), IP},
+ case mod_mqtt_session:start(?MODULE, Socket, ListenOpts) of
+ {ok, Pid} ->
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, WsPid),
+ mod_mqtt_session:accept(Pid),
+ State = #state{socket = Socket,
+ ws_pid = WsPid,
+ mqtt_session = Pid},
+ {ok, State};
+ {error, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
+ end.
+
+handle_call({send, Data}, _From, #state{ws_pid = WsPid} = State) ->
+ WsPid ! {send, Data},
+ {reply, ok, State};
+handle_call(Request, From, State) ->
+ ?WARNING_MSG("Got unexpected call from ~p: ~p", [From, Request]),
+ {noreply, State}.
+
+handle_cast(close, State) ->
+ {stop, normal, State#state{mqtt_session = undefined}};
+handle_cast(Request, State) ->
+ ?WARNING_MSG("Got unexpected cast: ~p", [Request]),
+ {noreply, State}.
+
+handle_info(closed, State) ->
+ {stop, normal, State};
+handle_info({received, Data}, State) ->
+ State#state.mqtt_session ! {tcp, State#state.socket, Data},
+ {noreply, State};
+handle_info({'DOWN', _, process, Pid, _}, State)
+ when Pid == State#state.mqtt_session orelse Pid == State#state.ws_pid ->
+ {stop, normal, State};
+handle_info(Info, State) ->
+ ?WARNING_MSG("Got unexpected info: ~p", [Info]),
+ {noreply, State}.
+
+terminate(_Reason, State) ->
+ if State#state.mqtt_session /= undefined ->
+ State#state.mqtt_session ! {tcp_closed, State#state.socket};
+ true ->
+ ok
+ end.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+format_status(_Opt, Status) ->
+ Status.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+-spec get_human_html_xmlel() -> xmlel().
+get_human_html_xmlel() ->
+ Heading = <<"ejabberd mod_mqtt">>,
+ #xmlel{name = <<"html">>,
+ attrs =
+ [{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}],
+ children =
+ [#xmlel{name = <<"head">>, attrs = [],
+ children =
+ [#xmlel{name = <<"title">>, attrs = [],
+ children = [{xmlcdata, Heading}]}]},
+ #xmlel{name = <<"body">>, attrs = [],
+ children =
+ [#xmlel{name = <<"h1">>, attrs = [],
+ children = [{xmlcdata, Heading}]},
+ #xmlel{name = <<"p">>, attrs = [],
+ children =
+ [{xmlcdata, <<"An implementation of ">>},
+ #xmlel{name = <<"a">>,
+ attrs =
+ [{<<"href">>,
+ <<"http://tools.ietf.org/html/rfc6455">>}],
+ children =
+ [{xmlcdata,
+ <<"WebSocket protocol">>}]}]},
+ #xmlel{name = <<"p">>, attrs = [],
+ children =
+ [{xmlcdata,
+ <<"This web page is only informative. To "
+ "use WebSocket connection you need an MQTT "
+ "client that supports it.">>}]}]}]}.