diff options
Diffstat (limited to 'src/ejabberd_http_ws.erl')
-rw-r--r-- | src/ejabberd_http_ws.erl | 189 |
1 files changed, 94 insertions, 95 deletions
diff --git a/src/ejabberd_http_ws.erl b/src/ejabberd_http_ws.erl index 24554a8cc..768a284ce 100644 --- a/src/ejabberd_http_ws.erl +++ b/src/ejabberd_http_ws.erl @@ -5,7 +5,7 @@ %%% Created : 09-10-2010 by Eric Cestari <ecestari@process-one.net> %%% %%% -%%% ejabberd, Copyright (C) 2002-2016 ProcessOne +%%% ejabberd, Copyright (C) 2002-2019 ProcessOne %%% %%% This program is free software; you can redistribute it and/or %%% modify it under the terms of the GNU General Public License as @@ -23,39 +23,33 @@ %%% %%%---------------------------------------------------------------------- -module(ejabberd_http_ws). - --behaviour(ejabberd_config). - -author('ecestari@process-one.net'). - --behaviour(gen_fsm). +-behaviour(xmpp_socket). +-behaviour(p1_fsm). -export([start/1, start_link/1, init/1, handle_event/3, handle_sync_event/4, code_change/4, handle_info/3, terminate/3, send_xml/2, setopts/2, sockname/1, - peername/1, controlling_process/2, become_controller/2, - close/1, socket_handoff/6, opt_type/1]). + peername/1, controlling_process/2, get_owner/1, + reset_stream/1, close/1, change_shaper/2, + socket_handoff/3, get_transport/1]). --include("ejabberd.hrl"). -include("logger.hrl"). --include("jlib.hrl"). +-include("xmpp.hrl"). -include("ejabberd_http.hrl"). --define(PING_INTERVAL, 60). --define(WEBSOCKET_TIMEOUT, 300). - -record(state, {socket :: ws_socket(), - ping_interval = ?PING_INTERVAL :: non_neg_integer(), + ping_interval :: non_neg_integer(), ping_timer = make_ref() :: reference(), - pong_expected :: boolean(), - timeout = ?WEBSOCKET_TIMEOUT :: non_neg_integer(), + pong_expected = false :: boolean(), + timeout :: non_neg_integer(), timer = make_ref() :: reference(), input = [] :: list(), - waiting_input = false :: false | pid(), - last_receiver :: pid(), + active = false :: boolean(), + c2s_pid :: pid(), ws :: {#ws{}, pid()}, rfc_compilant = undefined :: boolean() | undefined}). @@ -75,19 +69,25 @@ -export_type([ws_socket/0]). start(WS) -> - gen_fsm:start(?MODULE, [WS], ?FSMOPTS). + p1_fsm:start(?MODULE, [WS], ?FSMOPTS). start_link(WS) -> - gen_fsm:start_link(?MODULE, [WS], ?FSMOPTS). + p1_fsm:start_link(?MODULE, [WS], ?FSMOPTS). send_xml({http_ws, FsmRef, _IP}, Packet) -> - gen_fsm:sync_send_all_state_event(FsmRef, - {send_xml, Packet}). + case catch p1_fsm:sync_send_all_state_event(FsmRef, + {send_xml, Packet}, + 15000) + of + {'EXIT', {timeout, _}} -> {error, timeout}; + {'EXIT', _} -> {error, einval}; + Res -> Res + end. setopts({http_ws, FsmRef, _IP}, Opts) -> case lists:member({active, once}, Opts) of true -> - gen_fsm:send_all_state_event(FsmRef, + p1_fsm:send_all_state_event(FsmRef, {activate, self()}); _ -> ok end. @@ -98,64 +98,73 @@ peername({http_ws, _FsmRef, IP}) -> {ok, IP}. controlling_process(_Socket, _Pid) -> ok. -become_controller(FsmRef, C2SPid) -> - gen_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). - close({http_ws, FsmRef, _IP}) -> - catch gen_fsm:sync_send_all_state_event(FsmRef, close). + catch p1_fsm:sync_send_all_state_event(FsmRef, close). + +reset_stream({http_ws, _FsmRef, _IP} = Socket) -> + Socket. + +change_shaper({http_ws, FsmRef, _IP}, Shaper) -> + p1_fsm:send_all_state_event(FsmRef, {new_shaper, Shaper}). + +get_transport(_Socket) -> + websocket. -socket_handoff(LocalPath, Request, Socket, SockMod, Buf, Opts) -> - ejabberd_websocket:socket_handoff(LocalPath, Request, Socket, SockMod, - Buf, Opts, ?MODULE, fun get_human_html_xmlel/0). +get_owner({http_ws, FsmRef, _IP}) -> + FsmRef. + +socket_handoff(LocalPath, Request, Opts) -> + ejabberd_websocket:socket_handoff(LocalPath, Request, Opts, ?MODULE, fun get_human_html_xmlel/0). %%% Internal init([{#ws{ip = IP, http_opts = HOpts}, _} = WS]) -> SOpts = lists:filtermap(fun({stream_management, _}) -> true; ({max_ack_queue, _}) -> true; + ({ack_timeout, _}) -> true; ({resume_timeout, _}) -> true; ({max_resume_timeout, _}) -> true; ({resend_on_timeout, _}) -> true; + ({access, _}) -> true; (_) -> false end, HOpts), - Opts = [{xml_socket, true} | ejabberd_c2s_config:get_c2s_limits() ++ SOpts], - PingInterval = ejabberd_config:get_option( - {websocket_ping_interval, ?MYNAME}, - fun(I) when is_integer(I), I>=0 -> I end, - ?PING_INTERVAL) * 1000, - WSTimeout = ejabberd_config:get_option( - {websocket_timeout, ?MYNAME}, - fun(I) when is_integer(I), I>0 -> I end, - ?WEBSOCKET_TIMEOUT) * 1000, + Opts = ejabberd_c2s_config:get_c2s_limits() ++ SOpts, + PingInterval = ejabberd_option:websocket_ping_interval(), + WSTimeout = ejabberd_option:websocket_timeout(), Socket = {http_ws, self(), IP}, ?DEBUG("Client connected through websocket ~p", [Socket]), - ejabberd_socket:start(ejabberd_c2s, ?MODULE, Socket, - Opts), - Timer = erlang:start_timer(WSTimeout, self(), []), - {ok, loop, - #state{socket = Socket, timeout = WSTimeout, - timer = Timer, ws = WS, - ping_interval = PingInterval}}. - -handle_event({activate, From}, StateName, StateData) -> - case StateData#state.input of - [] -> - {next_state, StateName, - StateData#state{waiting_input = From}}; - Input -> - Receiver = From, - lists:foreach(fun(I) when is_binary(I)-> - Receiver ! {tcp, StateData#state.socket, I}; - (I2) -> - Receiver ! {tcp, StateData#state.socket, [I2]} - end, Input), - {next_state, StateName, - StateData#state{input = [], waiting_input = false, - last_receiver = Receiver}} + case ejabberd_c2s:start(?MODULE, Socket, [{receiver, self()}|Opts]) of + {ok, C2SPid} -> + ejabberd_c2s:accept(C2SPid), + Timer = erlang:start_timer(WSTimeout, self(), []), + {ok, loop, + #state{socket = Socket, timeout = WSTimeout, + timer = Timer, ws = WS, c2s_pid = C2SPid, + ping_interval = PingInterval}}; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore end. +handle_event({activate, From}, StateName, State) -> + State1 = case State#state.input of + [] -> State#state{active = true}; + Input -> + lists:foreach( + fun(I) when is_binary(I)-> + From ! {tcp, State#state.socket, I}; + (I2) -> + From ! {tcp, State#state.socket, [I2]} + end, Input), + State#state{active = false, input = []} + end, + {next_state, StateName, State1#state{c2s_pid = From}}; +handle_event({new_shaper, Shaper}, StateName, #state{ws = {_, WsPid}} = StateData) -> + WsPid ! {new_shaper, Shaper}, + {next_state, StateName, StateData}. + handle_sync_event({send_xml, Packet}, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant = R} = StateData) -> Packet2 = case {case R of undefined -> true; V -> V end, Packet} of @@ -187,15 +196,15 @@ handle_sync_event({send_xml, Packet}, _From, StateName, case Packet2 of {xmlstreamstart, Name, Attrs3} -> B = fxml:element_to_binary(#xmlel{name = Name, attrs = Attrs3}), - WsPid ! {send, <<(binary:part(B, 0, byte_size(B)-2))/binary, ">">>}; + route_text(WsPid, <<(binary:part(B, 0, byte_size(B)-2))/binary, ">">>); {xmlstreamend, Name} -> - WsPid ! {send, <<"</", Name/binary, ">">>}; + route_text(WsPid, <<"</", Name/binary, ">">>); {xmlstreamelement, El} -> - WsPid ! {send, fxml:element_to_binary(El)}; + route_text(WsPid, fxml:element_to_binary(El)); {xmlstreamraw, Bin} -> - WsPid ! {send, Bin}; + route_text(WsPid, Bin); {xmlstreamcdata, Bin2} -> - WsPid ! {send, Bin2}; + route_text(WsPid, Bin2); skip -> ok end, @@ -210,7 +219,7 @@ handle_sync_event(close, _From, StateName, #state{ws = {_, WsPid}, rfc_compilant when StateName /= stream_end_sent -> Close = #xmlel{name = <<"close">>, attrs = [{<<"xmlns">>, <<"urn:ietf:params:xml:ns:xmpp-framing">>}]}, - WsPid ! {send, fxml:element_to_binary(Close)}, + route_text(WsPid, fxml:element_to_binary(Close)), {stop, normal, StateData}; handle_sync_event(close, _From, _StateName, StateData) -> {stop, normal, StateData}. @@ -219,14 +228,13 @@ handle_info(closed, _StateName, StateData) -> {stop, normal, StateData}; handle_info({received, Packet}, StateName, StateDataI) -> {StateData, Parsed} = parse(StateDataI, Packet), - SD = case StateData#state.waiting_input of + SD = case StateData#state.active of false -> Input = StateData#state.input ++ if is_binary(Parsed) -> [Parsed]; true -> Parsed end, StateData#state{input = Input}; - Receiver -> - Receiver ! {tcp, StateData#state.socket, Parsed}, - setup_timers(StateData#state{waiting_input = false, - last_receiver = Receiver}) + true -> + StateData#state.c2s_pid ! {tcp, StateData#state.socket, Parsed}, + setup_timers(StateData#state{active = false}) end, {next_state, StateName, SD}; handle_info(PingPong, StateName, StateData) when PingPong == ping orelse @@ -236,18 +244,20 @@ handle_info(PingPong, StateName, StateData) when PingPong == ping orelse StateData2#state{pong_expected = false}}; handle_info({timeout, Timer, _}, _StateName, #state{timer = Timer} = StateData) -> + ?DEBUG("Closing websocket connection from hitting inactivity timeout", []), {stop, normal, StateData}; handle_info({timeout, Timer, _}, StateName, #state{ping_timer = Timer, ws = {_, WsPid}} = StateData) -> case StateData#state.pong_expected of false -> - cancel_timer(StateData#state.ping_timer), + misc:cancel_timer(StateData#state.ping_timer), PingTimer = erlang:start_timer(StateData#state.ping_interval, self(), []), WsPid ! {ping, <<>>}, {next_state, StateName, StateData#state{ping_timer = PingTimer, pong_expected = true}}; true -> + ?DEBUG("Closing websocket connection from missing pongs", []), {stop, normal, StateData} end; handle_info(_, StateName, StateData) -> @@ -257,19 +267,13 @@ code_change(_OldVsn, StateName, StateData, _Extra) -> {ok, StateName, StateData}. terminate(_Reason, _StateName, StateData) -> - case StateData#state.waiting_input of - false -> ok; - Receiver -> - ?DEBUG("C2S Pid : ~p", [Receiver]), - Receiver ! {tcp_closed, StateData#state.socket} - end, - ok. + StateData#state.c2s_pid ! {tcp_closed, StateData#state.socket}. setup_timers(StateData) -> - cancel_timer(StateData#state.timer), + misc:cancel_timer(StateData#state.timer), Timer = erlang:start_timer(StateData#state.timeout, self(), []), - cancel_timer(StateData#state.ping_timer), + misc:cancel_timer(StateData#state.ping_timer), PingTimer = case StateData#state.ping_interval of 0 -> StateData#state.ping_timer; V -> erlang:start_timer(V, self(), []) @@ -277,12 +281,8 @@ setup_timers(StateData) -> StateData#state{timer = Timer, ping_timer = PingTimer, pong_expected = false}. -cancel_timer(Timer) -> - erlang:cancel_timer(Timer), - receive {timeout, Timer, _} -> ok after 0 -> ok end. - get_human_html_xmlel() -> - Heading = <<"ejabberd ", (jlib:atom_to_binary(?MODULE))/binary>>, + Heading = <<"ejabberd ", (misc:atom_to_binary(?MODULE))/binary>>, #xmlel{name = <<"html">>, attrs = [{<<"xmlns">>, <<"http://www.w3.org/1999/xhtml">>}], @@ -353,6 +353,7 @@ parsed_items(List) -> when element(1, El) == xmlel; element(1, El) == xmlstreamstart; element(1, El) == xmlstreamelement; + element(1, El) == xmlstreamcdata; element(1, El) == xmlstreamend -> parsed_items([El | List]); {'$gen_event', {xmlstreamerror, _}} -> @@ -361,9 +362,7 @@ parsed_items(List) -> lists:reverse(List) end. -opt_type(websocket_ping_interval) -> - fun (I) when is_integer(I), I >= 0 -> I end; -opt_type(websocket_timeout) -> - fun (I) when is_integer(I), I > 0 -> I end; -opt_type(_) -> - [websocket_ping_interval, websocket_timeout]. +-spec route_text(pid(), binary()) -> ok. +route_text(Pid, Data) -> + Pid ! {text, Data}, + ok. |