diff options
author | Evgeny Khramtsov <ekhramtsov@process-one.net> | 2018-09-17 11:21:02 +0300 |
---|---|---|
committer | Evgeny Khramtsov <ekhramtsov@process-one.net> | 2018-09-17 11:21:02 +0300 |
commit | de385591d01deec5a498feef33cd4eb3f8a12b77 (patch) | |
tree | ec79961d0c75e53016224e4aa073825c14ee20dc /src/ejabberd_bosh.erl | |
parent | Reintroduce change removed by mistake in 'Improve match macro' (diff) |
Refactor ejabberd listener API
Diffstat (limited to 'src/ejabberd_bosh.erl')
-rw-r--r-- | src/ejabberd_bosh.erl | 109 |
1 files changed, 38 insertions, 71 deletions
diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl index 1a650803e..e39a67132 100644 --- a/src/ejabberd_bosh.erl +++ b/src/ejabberd_bosh.erl @@ -23,20 +23,18 @@ %%% %%%------------------------------------------------------------------- -module(ejabberd_bosh). - +-behaviour(xmpp_socket). +-behaviour(p1_fsm). -protocol({xep, 124, '1.11'}). -protocol({xep, 206, '1.4'}). --behaviour(p1_fsm). - %% API -export([start/2, start/3, start_link/3]). -export([send_xml/2, setopts/2, controlling_process/2, - migrate/3, become_controller/2, - reset_stream/1, change_shaper/2, monitor/1, close/1, + reset_stream/1, change_shaper/2, close/1, sockname/1, peername/1, process_request/3, send/2, - change_controller/2]). + get_transport/1, get_owner/1]). %% gen_fsm callbacks -export([init/1, wait_for_session/2, wait_for_session/3, @@ -167,22 +165,12 @@ setopts({http_bind, FsmRef, _IP}, Opts) -> controlling_process(_Socket, _Pid) -> ok. -become_controller(FsmRef, C2SPid) -> - p1_fsm:send_all_state_event(FsmRef, - {become_controller, C2SPid}). - -change_controller({http_bind, FsmRef, _IP}, C2SPid) -> - become_controller(FsmRef, C2SPid). - reset_stream({http_bind, _FsmRef, _IP} = Socket) -> Socket. change_shaper({http_bind, FsmRef, _IP}, Shaper) -> p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}). -monitor({http_bind, FsmRef, _IP}) -> - erlang:monitor(process, FsmRef). - close({http_bind, FsmRef, _IP}) -> catch p1_fsm:sync_send_all_state_event(FsmRef, close). @@ -191,10 +179,11 @@ sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}. peername({http_bind, _FsmRef, IP}) -> {ok, IP}. -migrate(FsmRef, Node, After) when node(FsmRef) == node() -> - catch erlang:send_after(After, FsmRef, {migrate, Node}); -migrate(_FsmRef, _Node, _After) -> - ok. +get_transport(_Socket) -> + http_bind. + +get_owner({http_bind, FsmRef, _IP}) -> + FsmRef. process_request(Data, IP, Type) -> Opts1 = ejabberd_c2s_config:get_c2s_limits(), @@ -295,30 +284,26 @@ init([#body{attrs = Attrs}, IP, SID]) -> buf_new(XMPPDomain)), Opts2} end, - xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket, - [{receiver, self()}|Opts]), - Inactivity = gen_mod:get_module_opt(XMPPDomain, - mod_bosh, max_inactivity), - MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat), - ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), - State = #state{host = XMPPDomain, sid = SID, ip = IP, - xmpp_ver = XMPPVer, el_ibuf = InBuf, - max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), - inactivity_timeout = Inactivity, - shaped_receivers = ShapedReceivers, - shaper_state = ShaperState}, - NewState = restart_inactivity_timer(State), - mod_bosh:open_session(SID, self()), - {ok, wait_for_session, NewState}; -init([StateName, State]) -> - mod_bosh:open_session(State#state.sid, self()), - case State#state.c2s_pid of - C2SPid when is_pid(C2SPid) -> - NewSocket = make_socket(self(), State#state.ip), - C2SPid ! {change_socket, NewSocket}, - NewState = restart_inactivity_timer(State), - {ok, StateName, NewState}; - _ -> {stop, normal} + case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of + {ok, C2SPid} -> + ejabberd_c2s:accept(C2SPid), + Inactivity = gen_mod:get_module_opt(XMPPDomain, + mod_bosh, max_inactivity), + MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat), + ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN), + State = #state{host = XMPPDomain, sid = SID, ip = IP, + xmpp_ver = XMPPVer, el_ibuf = InBuf, + max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain), + inactivity_timeout = Inactivity, + shaped_receivers = ShapedReceivers, + shaper_state = ShaperState}, + NewState = restart_inactivity_timer(State), + mod_bosh:open_session(SID, self()), + {ok, wait_for_session, NewState}; + {error, Reason} -> + {stop, Reason}; + ignore -> + ignore end. wait_for_session(_Event, State) -> @@ -525,7 +510,7 @@ active1(#body{attrs = Attrs} = Req, From, State) -> end end. -handle_event({become_controller, C2SPid}, StateName, +handle_event({activate, C2SPid}, StateName, State) -> State1 = route_els(State#state{c2s_pid = C2SPid}), {next_state, StateName, State1}; @@ -598,24 +583,11 @@ handle_info({timeout, TRef, shaper_timeout}, StateName, {stop, normal, State}; _ -> {next_state, StateName, State} end; -handle_info({migrate, Node}, StateName, State) -> - if Node /= node() -> - NewState = bounce_receivers(State, migrated), - {migrate, NewState, - {Node, ?MODULE, start, [StateName, NewState]}, 0}; - true -> {next_state, StateName, State} - end; handle_info(_Info, StateName, State) -> ?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p", [_Info, StateName]), {next_state, StateName, State}. -terminate({migrated, ClonePid}, _StateName, State) -> - ?INFO_MSG("Migrating session \"~s\" (c2s_pid = " - "~p) to ~p on node ~p", - [State#state.sid, State#state.c2s_pid, ClonePid, - node(ClonePid)]), - mod_bosh:close_session(State#state.sid); terminate(_Reason, _StateName, State) -> mod_bosh:close_session(State#state.sid), case State#state.c2s_pid of @@ -718,7 +690,7 @@ do_reply(State, From, Body, RID) -> Responses2 = gb_trees:insert(RID, Body, Responses1), State#state{responses = Responses2}. -bounce_receivers(State, Reason) -> +bounce_receivers(State, _Reason) -> Receivers = gb_trees:to_list(State#state.receivers), ShapedReceivers = lists:map(fun ({_, From, #body{attrs = Attrs} = Body}) -> @@ -726,18 +698,13 @@ bounce_receivers(State, Reason) -> {RID, {From, Body}} end, p1_queue:to_list(State#state.shaped_receivers)), - lists:foldl(fun ({RID, {From, Body}}, AccState) -> - NewBody = if Reason == closed -> - #body{http_reason = - <<"Session closed">>, - attrs = - [{type, <<"terminate">>}, - {condition, - <<"other-request">>}]}; - Reason == migrated -> - Body#body{http_reason = - <<"Session migrated">>} - end, + lists:foldl(fun ({RID, {From, _Body}}, AccState) -> + NewBody = #body{http_reason = + <<"Session closed">>, + attrs = + [{type, <<"terminate">>}, + {condition, + <<"other-request">>}]}, do_reply(AccState, From, NewBody, RID) end, State, Receivers ++ ShapedReceivers). |