aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_http_ws.erl
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-17 11:21:02 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-17 11:21:02 +0300
commitde385591d01deec5a498feef33cd4eb3f8a12b77 (patch)
treeec79961d0c75e53016224e4aa073825c14ee20dc /src/ejabberd_http_ws.erl
parentReintroduce change removed by mistake in 'Improve match macro' (diff)
Refactor ejabberd listener API
Diffstat (limited to 'src/ejabberd_http_ws.erl')
-rw-r--r--src/ejabberd_http_ws.erl94
1 files changed, 44 insertions, 50 deletions
diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl
index a9d98b882..d10dbd108 100644
--- a/src/ejabberd_http_ws.erl
+++ b/src/ejabberd_http_ws.erl
@@ -23,19 +23,17 @@
%%%
%%%----------------------------------------------------------------------
-module(ejabberd_http_ws).
-
--behaviour(ejabberd_config).
-
-author('ecestari@process-one.net').
-
+-behaviour(ejabberd_config).
+-behaviour(xmpp_socket).
-behaviour(p1_fsm).
-export([start/1, start_link/1, init/1, handle_event/3,
handle_sync_event/4, code_change/4, handle_info/3,
terminate/3, send_xml/2, setopts/2, sockname/1,
- peername/1, controlling_process/2, become_controller/2,
- monitor/1, reset_stream/1, close/1, change_shaper/2,
- socket_handoff/3, opt_type/1]).
+ peername/1, controlling_process/2, get_owner/1,
+ reset_stream/1, close/1, change_shaper/2,
+ socket_handoff/3, get_transport/1, opt_type/1]).
-include("logger.hrl").
@@ -54,8 +52,8 @@
timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(),
timer = make_ref() :: reference(),
input = [] :: list(),
- waiting_input = false :: false | pid(),
- last_receiver = self() :: pid(),
+ active = false :: boolean(),
+ c2s_pid :: pid(),
ws :: {#ws{}, pid()},
rfc_compilant = undefined :: boolean() | undefined}).
@@ -104,15 +102,9 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
controlling_process(_Socket, _Pid) -> ok.
-become_controller(FsmRef, C2SPid) ->
- p1_fsm:send_all_state_event(FsmRef, {activate, C2SPid}).
-
close({http_ws, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef, close).
-monitor({http_ws, FsmRef, _IP}) ->
- erlang:monitor(process, FsmRef).
-
reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
Socket.
@@ -120,6 +112,12 @@ change_shaper({http_ws, _FsmRef, _IP}, _Shaper) ->
%% TODO???
ok.
+get_transport(_Socket) ->
+ websocket.
+
+get_owner({http_ws, FsmRef, _IP}) ->
+ FsmRef.
+
socket_handoff(LocalPath, Request, Opts) ->
ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0).
@@ -145,31 +143,34 @@ init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
Socket = {http_ws, self(), IP},
?DEBUG("Client connected through websocket ~p",
[Socket]),
- xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
- [{receiver, self()}|Opts]),
- Timer = erlang:start_timer(WSTimeout, self(), []),
- {ok, loop,
- #state{socket = Socket, timeout = WSTimeout,
- timer = Timer, ws = WS,
- ping_interval = PingInterval}}.
-
-handle_event({activate, From}, StateName, StateData) ->
- case StateData#state.input of
- [] ->
- {next_state, StateName,
- StateData#state{waiting_input = From}};
- Input ->
- Receiver = From,
- lists:foreach(fun(I) when is_binary(I)->
- Receiver ! {tcp, StateData#state.socket, I};
- (I2) ->
- Receiver ! {tcp, StateData#state.socket, [I2]}
- end, Input),
- {next_state, StateName,
- StateData#state{input = [], waiting_input = false,
- last_receiver = Receiver}}
+ case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
+ {ok, C2SPid} ->
+ ejabberd_c2s:accept(C2SPid),
+ Timer = erlang:start_timer(WSTimeout, self(), []),
+ {ok, loop,
+ #state{socket = Socket, timeout = WSTimeout,
+ timer = Timer, ws = WS, c2s_pid = C2SPid,
+ ping_interval = PingInterval}};
+ {error, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
end.
+handle_event({activate, From}, StateName, State) ->
+ State1 = case State#state.input of
+ [] -> State#state{active = true};
+ Input ->
+ lists:foreach(
+ fun(I) when is_binary(I)->
+ From ! {tcp, State#state.socket, I};
+ (I2) ->
+ From ! {tcp, State#state.socket, [I2]}
+ end, Input),
+ State#state{active = false, input = []}
+ end,
+ {next_state, StateName, State1#state{c2s_pid = From}}.
+
handle_sync_event({send_xml, Packet}, _From, StateName,
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
Packet2 = case {case R of undefined -> true; V -> V end, Packet} of
@@ -233,14 +234,13 @@ handle_info(closed, _StateName, StateData) ->
{stop, normal, StateData};
handle_info({received, Packet}, StateName, StateDataI) ->
{StateData, Parsed} = parse(StateDataI, Packet),
- SD = case StateData#state.waiting_input of
+ SD = case StateData#state.active of
false ->
Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end,
StateData#state{input = Input};
- Receiver ->
- Receiver ! {tcp, StateData#state.socket, Parsed},
- setup_timers(StateData#state{waiting_input = false,
- last_receiver = Receiver})
+ true ->
+ StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed},
+ setup_timers(StateData#state{active = false})
end,
{next_state, StateName, SD};
handle_info(PingPong, StateName, StateData) when PingPong == ping orelse
@@ -273,13 +273,7 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
terminate(_Reason, _StateName, StateData) ->
- case StateData#state.waiting_input of
- false -> ok;
- Receiver ->
- ?DEBUG("C2S Pid : ~p", [Receiver]),
- Receiver ! {tcp_closed, StateData#state.socket}
- end,
- ok.
+ StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}.
setup_timers(StateData) ->
misc:cancel_timer(StateData#state.timer),