aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_http_ws.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ejabberd_http_ws.erl')
-rw-r--r--src/ejabberd_http_ws.erl189
1 files changed, 94 insertions, 95 deletions
diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl
index 24554a8cc..768a284ce 100644
--- a/src/ejabberd_http_ws.erl
+++ b/src/ejabberd_http_ws.erl
@@ -5,7 +5,7 @@
%%% Created : 09-10-2010 by Eric Cestari <ecestari@process-one.net>
%%%
%%%
-%%% ejabberd, Copyright (C) 2002-2016 ProcessOne
+%%% ejabberd, Copyright (C) 2002-2019 ProcessOne
%%%
%%% This program is free software; you can redistribute it and/or
%%% modify it under the terms of the GNU General Public License as
@@ -23,39 +23,33 @@
%%%
%%%----------------------------------------------------------------------
-module(ejabberd_http_ws).
-
--behaviour(ejabberd_config).
-
-author('ecestari@process-one.net').
-
--behaviour(gen_fsm).
+-behaviour(xmpp_socket).
+-behaviour(p1_fsm).
-export([start/1, start_link/1, init/1, handle_event/3,
handle_sync_event/4, code_change/4, handle_info/3,
terminate/3, send_xml/2, setopts/2, sockname/1,
- peername/1, controlling_process/2, become_controller/2,
- close/1, socket_handoff/6, opt_type/1]).
+ peername/1, controlling_process/2, get_owner/1,
+ reset_stream/1, close/1, change_shaper/2,
+ socket_handoff/3, get_transport/1]).
--include("ejabberd.hrl").
-include("logger.hrl").
--include("jlib.hrl").
+-include("xmpp.hrl").
-include("ejabberd_http.hrl").
--define(PING_INTERVAL, 60).
--define(WEBSOCKET_TIMEOUT, 300).
-
-record(state,
{socket :: ws_socket(),
- ping_interval = ?PING_INTERVAL :: non_neg_integer(),
+ ping_interval :: non_neg_integer(),
ping_timer = make_ref() :: reference(),
- pong_expected :: boolean(),
- timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(),
+ pong_expected = false :: boolean(),
+ timeout :: non_neg_integer(),
timer = make_ref() :: reference(),
input = [] :: list(),
- waiting_input = false :: false | pid(),
- last_receiver :: pid(),
+ active = false :: boolean(),
+ c2s_pid :: pid(),
ws :: {#ws{}, pid()},
rfc_compilant = undefined :: boolean() | undefined}).
@@ -75,19 +69,25 @@
-export_type([ws_socket/0]).
start(WS) ->
- gen_fsm:start(?MODULE, [WS], ?FSMOPTS).
+ p1_fsm:start(?MODULE, [WS], ?FSMOPTS).
start_link(WS) ->
- gen_fsm:start_link(?MODULE, [WS], ?FSMOPTS).
+ p1_fsm:start_link(?MODULE, [WS], ?FSMOPTS).
send_xml({http_ws, FsmRef, _IP}, Packet) ->
- gen_fsm:sync_send_all_state_event(FsmRef,
- {send_xml, Packet}).
+ case catch p1_fsm:sync_send_all_state_event(FsmRef,
+ {send_xml, Packet},
+ 15000)
+ of
+ {'EXIT', {timeout, _}} -> {error, timeout};
+ {'EXIT', _} -> {error, einval};
+ Res -> Res
+ end.
setopts({http_ws, FsmRef, _IP}, Opts) ->
case lists:member({active, once}, Opts) of
true ->
- gen_fsm:send_all_state_event(FsmRef,
+ p1_fsm:send_all_state_event(FsmRef,
{activate, self()});
_ -> ok
end.
@@ -98,64 +98,73 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}.
controlling_process(_Socket, _Pid) -> ok.
-become_controller(FsmRef, C2SPid) ->
- gen_fsm:send_all_state_event(FsmRef,
- {become_controller, C2SPid}).
-
close({http_ws, FsmRef, _IP}) ->
- catch gen_fsm:sync_send_all_state_event(FsmRef, close).
+ catch p1_fsm:sync_send_all_state_event(FsmRef, close).
+
+reset_stream({http_ws, _FsmRef, _IP} = Socket) ->
+ Socket.
+
+change_shaper({http_ws, FsmRef, _IP}, Shaper) ->
+ p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}).
+
+get_transport(_Socket) ->
+ websocket.
-socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) ->
- ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod,
- Buf, Opts, ?MODULE, fun get_human_html_xmlel/0).
+get_owner({http_ws, FsmRef, _IP}) ->
+ FsmRef.
+
+socket_handoff(LocalPath, Request, Opts) ->
+ ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0).
%%% Internal
init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) ->
SOpts = lists:filtermap(fun({stream_management, _}) -> true;
({max_ack_queue, _}) -> true;
+ ({ack_timeout, _}) -> true;
({resume_timeout, _}) -> true;
({max_resume_timeout, _}) -> true;
({resend_on_timeout, _}) -> true;
+ ({access, _}) -> true;
(_) -> false
end, HOpts),
- Opts = [{xml_socket, true} | ejabberd_c2s_config:get_c2s_limits() ++ SOpts],
- PingInterval = ejabberd_config:get_option(
- {websocket_ping_interval, ?MYNAME},
- fun(I) when is_integer(I), I>=0 -> I end,
- ?PING_INTERVAL) * 1000,
- WSTimeout = ejabberd_config:get_option(
- {websocket_timeout, ?MYNAME},
- fun(I) when is_integer(I), I>0 -> I end,
- ?WEBSOCKET_TIMEOUT) * 1000,
+ Opts = ejabberd_c2s_config:get_c2s_limits() ++ SOpts,
+ PingInterval = ejabberd_option:websocket_ping_interval(),
+ WSTimeout = ejabberd_option:websocket_timeout(),
Socket = {http_ws, self(), IP},
?DEBUG("Client connected through websocket ~p",
[Socket]),
- ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket,
- Opts),
- Timer = erlang:start_timer(WSTimeout, self(), []),
- {ok, loop,
- #state{socket = Socket, timeout = WSTimeout,
- timer = Timer, ws = WS,
- ping_interval = PingInterval}}.
-
-handle_event({activate, From}, StateName, StateData) ->
- case StateData#state.input of
- [] ->
- {next_state, StateName,
- StateData#state{waiting_input = From}};
- Input ->
- Receiver = From,
- lists:foreach(fun(I) when is_binary(I)->
- Receiver ! {tcp, StateData#state.socket, I};
- (I2) ->
- Receiver ! {tcp, StateData#state.socket, [I2]}
- end, Input),
- {next_state, StateName,
- StateData#state{input = [], waiting_input = false,
- last_receiver = Receiver}}
+ case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()}|Opts]) of
+ {ok, C2SPid} ->
+ ejabberd_c2s:accept(C2SPid),
+ Timer = erlang:start_timer(WSTimeout, self(), []),
+ {ok, loop,
+ #state{socket = Socket, timeout = WSTimeout,
+ timer = Timer, ws = WS, c2s_pid = C2SPid,
+ ping_interval = PingInterval}};
+ {error, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
end.
+handle_event({activate, From}, StateName, State) ->
+ State1 = case State#state.input of
+ [] -> State#state{active = true};
+ Input ->
+ lists:foreach(
+ fun(I) when is_binary(I)->
+ From ! {tcp, State#state.socket, I};
+ (I2) ->
+ From ! {tcp, State#state.socket, [I2]}
+ end, Input),
+ State#state{active = false, input = []}
+ end,
+ {next_state, StateName, State1#state{c2s_pid = From}};
+handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) ->
+ WsPid ! {new_shaper, Shaper},
+ {next_state, StateName, StateData}.
+
handle_sync_event({send_xml, Packet}, _From, StateName,
#state{ws = {_, WsPid}, rfc_compilant = R} = StateData) ->
Packet2 = case {case R of undefined -> true; V -> V end, Packet} of
@@ -187,15 +196,15 @@ handle_sync_event({send_xml, Packet}, _From, StateName,
case Packet2 of
{xmlstreamstart, Name, Attrs3} ->
B = fxml:element_to_binary(#xmlel{name = Name, attrs = Attrs3}),
- WsPid ! {send, <<(binary:part(B, 0, byte_size(B)-2))/binary, ">">>};
+ route_text(WsPid, <<(binary:part(B, 0, byte_size(B)-2))/binary, ">">>);
{xmlstreamend, Name} ->
- WsPid ! {send, <<"</", Name/binary, ">">>};
+ route_text(WsPid, <<"</", Name/binary, ">">>);
{xmlstreamelement, El} ->
- WsPid ! {send, fxml:element_to_binary(El)};
+ route_text(WsPid, fxml:element_to_binary(El));
{xmlstreamraw, Bin} ->
- WsPid ! {send, Bin};
+ route_text(WsPid, Bin);
{xmlstreamcdata, Bin2} ->
- WsPid ! {send, Bin2};
+ route_text(WsPid, Bin2);
skip ->
ok
end,
@@ -210,7 +219,7 @@ handle_sync_event(close, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant
when StateName /= stream_end_sent ->
Close = #xmlel{name = <<"close">>,
attrs = [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-framing">>}]},
- WsPid ! {send, fxml:element_to_binary(Close)},
+ route_text(WsPid, fxml:element_to_binary(Close)),
{stop, normal, StateData};
handle_sync_event(close, _From, _StateName, StateData) ->
{stop, normal, StateData}.
@@ -219,14 +228,13 @@ handle_info(closed, _StateName, StateData) ->
{stop, normal, StateData};
handle_info({received, Packet}, StateName, StateDataI) ->
{StateData, Parsed} = parse(StateDataI, Packet),
- SD = case StateData#state.waiting_input of
+ SD = case StateData#state.active of
false ->
Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end,
StateData#state{input = Input};
- Receiver ->
- Receiver ! {tcp, StateData#state.socket, Parsed},
- setup_timers(StateData#state{waiting_input = false,
- last_receiver = Receiver})
+ true ->
+ StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed},
+ setup_timers(StateData#state{active = false})
end,
{next_state, StateName, SD};
handle_info(PingPong, StateName, StateData) when PingPong == ping orelse
@@ -236,18 +244,20 @@ handle_info(PingPong, StateName, StateData) when PingPong == ping orelse
StateData2#state{pong_expected = false}};
handle_info({timeout, Timer, _}, _StateName,
#state{timer = Timer} = StateData) ->
+ ?DEBUG("Closing websocket connection from hitting inactivity timeout", []),
{stop, normal, StateData};
handle_info({timeout, Timer, _}, StateName,
#state{ping_timer = Timer, ws = {_, WsPid}} = StateData) ->
case StateData#state.pong_expected of
false ->
- cancel_timer(StateData#state.ping_timer),
+ misc:cancel_timer(StateData#state.ping_timer),
PingTimer = erlang:start_timer(StateData#state.ping_interval,
self(), []),
WsPid ! {ping, <<>>},
{next_state, StateName,
StateData#state{ping_timer = PingTimer, pong_expected = true}};
true ->
+ ?DEBUG("Closing websocket connection from missing pongs", []),
{stop, normal, StateData}
end;
handle_info(_, StateName, StateData) ->
@@ -257,19 +267,13 @@ code_change(_OldVsn, StateName, StateData, _Extra) ->
{ok, StateName, StateData}.
terminate(_Reason, _StateName, StateData) ->
- case StateData#state.waiting_input of
- false -> ok;
- Receiver ->
- ?DEBUG("C2S Pid : ~p", [Receiver]),
- Receiver ! {tcp_closed, StateData#state.socket}
- end,
- ok.
+ StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}.
setup_timers(StateData) ->
- cancel_timer(StateData#state.timer),
+ misc:cancel_timer(StateData#state.timer),
Timer = erlang:start_timer(StateData#state.timeout,
self(), []),
- cancel_timer(StateData#state.ping_timer),
+ misc:cancel_timer(StateData#state.ping_timer),
PingTimer = case StateData#state.ping_interval of
0 -> StateData#state.ping_timer;
V -> erlang:start_timer(V, self(), [])
@@ -277,12 +281,8 @@ setup_timers(StateData) ->
StateData#state{timer = Timer, ping_timer = PingTimer,
pong_expected = false}.
-cancel_timer(Timer) ->
- erlang:cancel_timer(Timer),
- receive {timeout, Timer, _} -> ok after 0 -> ok end.
-
get_human_html_xmlel() ->
- Heading = <<"ejabberd ", (jlib:atom_to_binary(?MODULE))/binary>>,
+ Heading = <<"ejabberd ", (misc:atom_to_binary(?MODULE))/binary>>,
#xmlel{name = <<"html">>,
attrs =
[{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}],
@@ -353,6 +353,7 @@ parsed_items(List) ->
when element(1, El) == xmlel;
element(1, El) == xmlstreamstart;
element(1, El) == xmlstreamelement;
+ element(1, El) == xmlstreamcdata;
element(1, El) == xmlstreamend ->
parsed_items([El | List]);
{'$gen_event', {xmlstreamerror, _}} ->
@@ -361,9 +362,7 @@ parsed_items(List) ->
lists:reverse(List)
end.
-opt_type(websocket_ping_interval) ->
- fun (I) when is_integer(I), I >= 0 -> I end;
-opt_type(websocket_timeout) ->
- fun (I) when is_integer(I), I > 0 -> I end;
-opt_type(_) ->
- [websocket_ping_interval, websocket_timeout].
+-spec route_text(pid(), binary()) -> ok.
+route_text(Pid, Data) ->
+ Pid ! {text, Data},
+ ok.