aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_bosh.erl
diff options
context:
space:
mode:
authorEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-17 11:21:02 +0300
committerEvgeny Khramtsov <ekhramtsov@process-one.net>2018-09-17 11:21:02 +0300
commitde385591d01deec5a498feef33cd4eb3f8a12b77 (patch)
treeec79961d0c75e53016224e4aa073825c14ee20dc /src/ejabberd_bosh.erl
parentReintroduce change removed by mistake in 'Improve match macro' (diff)
Refactor ejabberd listener API
Diffstat (limited to 'src/ejabberd_bosh.erl')
-rw-r--r--src/ejabberd_bosh.erl109
1 files changed, 38 insertions, 71 deletions
diff --git a/src/ejabberd_bosh.erl b/src/ejabberd_bosh.erl
index 1a650803e..e39a67132 100644
--- a/src/ejabberd_bosh.erl
+++ b/src/ejabberd_bosh.erl
@@ -23,20 +23,18 @@
%%%
%%%-------------------------------------------------------------------
-module(ejabberd_bosh).
-
+-behaviour(xmpp_socket).
+-behaviour(p1_fsm).
-protocol({xep, 124, '1.11'}).
-protocol({xep, 206, '1.4'}).
--behaviour(p1_fsm).
-
%% API
-export([start/2, start/3, start_link/3]).
-export([send_xml/2, setopts/2, controlling_process/2,
- migrate/3, become_controller/2,
- reset_stream/1, change_shaper/2, monitor/1, close/1,
+ reset_stream/1, change_shaper/2, close/1,
sockname/1, peername/1, process_request/3, send/2,
- change_controller/2]).
+ get_transport/1, get_owner/1]).
%% gen_fsm callbacks
-export([init/1, wait_for_session/2, wait_for_session/3,
@@ -167,22 +165,12 @@ setopts({http_bind, FsmRef, _IP}, Opts) ->
controlling_process(_Socket, _Pid) -> ok.
-become_controller(FsmRef, C2SPid) ->
- p1_fsm:send_all_state_event(FsmRef,
- {become_controller, C2SPid}).
-
-change_controller({http_bind, FsmRef, _IP}, C2SPid) ->
- become_controller(FsmRef, C2SPid).
-
reset_stream({http_bind, _FsmRef, _IP} = Socket) ->
Socket.
change_shaper({http_bind, FsmRef, _IP}, Shaper) ->
p1_fsm:send_all_state_event(FsmRef, {change_shaper, Shaper}).
-monitor({http_bind, FsmRef, _IP}) ->
- erlang:monitor(process, FsmRef).
-
close({http_bind, FsmRef, _IP}) ->
catch p1_fsm:sync_send_all_state_event(FsmRef,
close).
@@ -191,10 +179,11 @@ sockname(_Socket) -> {ok, {{0, 0, 0, 0}, 0}}.
peername({http_bind, _FsmRef, IP}) -> {ok, IP}.
-migrate(FsmRef, Node, After) when node(FsmRef) == node() ->
- catch erlang:send_after(After, FsmRef, {migrate, Node});
-migrate(_FsmRef, _Node, _After) ->
- ok.
+get_transport(_Socket) ->
+ http_bind.
+
+get_owner({http_bind, FsmRef, _IP}) ->
+ FsmRef.
process_request(Data, IP, Type) ->
Opts1 = ejabberd_c2s_config:get_c2s_limits(),
@@ -295,30 +284,26 @@ init([#body{attrs = Attrs}, IP, SID]) ->
buf_new(XMPPDomain)),
Opts2}
end,
- xmpp_socket:start(ejabberd_c2s, ?MODULE, Socket,
- [{receiver, self()}|Opts]),
- Inactivity = gen_mod:get_module_opt(XMPPDomain,
- mod_bosh, max_inactivity),
- MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat),
- ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
- State = #state{host = XMPPDomain, sid = SID, ip = IP,
- xmpp_ver = XMPPVer, el_ibuf = InBuf,
- max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
- inactivity_timeout = Inactivity,
- shaped_receivers = ShapedReceivers,
- shaper_state = ShaperState},
- NewState = restart_inactivity_timer(State),
- mod_bosh:open_session(SID, self()),
- {ok, wait_for_session, NewState};
-init([StateName, State]) ->
- mod_bosh:open_session(State#state.sid, self()),
- case State#state.c2s_pid of
- C2SPid when is_pid(C2SPid) ->
- NewSocket = make_socket(self(), State#state.ip),
- C2SPid ! {change_socket, NewSocket},
- NewState = restart_inactivity_timer(State),
- {ok, StateName, NewState};
- _ -> {stop, normal}
+ case ejabberd_c2s:start({?MODULE, Socket}, [{receiver, self()}|Opts]) of
+ {ok, C2SPid} ->
+ ejabberd_c2s:accept(C2SPid),
+ Inactivity = gen_mod:get_module_opt(XMPPDomain,
+ mod_bosh, max_inactivity),
+ MaxConcat = gen_mod:get_module_opt(XMPPDomain, mod_bosh, max_concat),
+ ShapedReceivers = buf_new(XMPPDomain, ?MAX_SHAPED_REQUESTS_QUEUE_LEN),
+ State = #state{host = XMPPDomain, sid = SID, ip = IP,
+ xmpp_ver = XMPPVer, el_ibuf = InBuf,
+ max_concat = MaxConcat, el_obuf = buf_new(XMPPDomain),
+ inactivity_timeout = Inactivity,
+ shaped_receivers = ShapedReceivers,
+ shaper_state = ShaperState},
+ NewState = restart_inactivity_timer(State),
+ mod_bosh:open_session(SID, self()),
+ {ok, wait_for_session, NewState};
+ {error, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
end.
wait_for_session(_Event, State) ->
@@ -525,7 +510,7 @@ active1(#body{attrs = Attrs} = Req, From, State) ->
end
end.
-handle_event({become_controller, C2SPid}, StateName,
+handle_event({activate, C2SPid}, StateName,
State) ->
State1 = route_els(State#state{c2s_pid = C2SPid}),
{next_state, StateName, State1};
@@ -598,24 +583,11 @@ handle_info({timeout, TRef, shaper_timeout}, StateName,
{stop, normal, State};
_ -> {next_state, StateName, State}
end;
-handle_info({migrate, Node}, StateName, State) ->
- if Node /= node() ->
- NewState = bounce_receivers(State, migrated),
- {migrate, NewState,
- {Node, ?MODULE, start, [StateName, NewState]}, 0};
- true -> {next_state, StateName, State}
- end;
handle_info(_Info, StateName, State) ->
?ERROR_MSG("unexpected info:~n** Msg: ~p~n** StateName: ~p",
[_Info, StateName]),
{next_state, StateName, State}.
-terminate({migrated, ClonePid}, _StateName, State) ->
- ?INFO_MSG("Migrating session \"~s\" (c2s_pid = "
- "~p) to ~p on node ~p",
- [State#state.sid, State#state.c2s_pid, ClonePid,
- node(ClonePid)]),
- mod_bosh:close_session(State#state.sid);
terminate(_Reason, _StateName, State) ->
mod_bosh:close_session(State#state.sid),
case State#state.c2s_pid of
@@ -718,7 +690,7 @@ do_reply(State, From, Body, RID) ->
Responses2 = gb_trees:insert(RID, Body, Responses1),
State#state{responses = Responses2}.
-bounce_receivers(State, Reason) ->
+bounce_receivers(State, _Reason) ->
Receivers = gb_trees:to_list(State#state.receivers),
ShapedReceivers = lists:map(fun ({_, From,
#body{attrs = Attrs} = Body}) ->
@@ -726,18 +698,13 @@ bounce_receivers(State, Reason) ->
{RID, {From, Body}}
end,
p1_queue:to_list(State#state.shaped_receivers)),
- lists:foldl(fun ({RID, {From, Body}}, AccState) ->
- NewBody = if Reason == closed ->
- #body{http_reason =
- <<"Session closed">>,
- attrs =
- [{type, <<"terminate">>},
- {condition,
- <<"other-request">>}]};
- Reason == migrated ->
- Body#body{http_reason =
- <<"Session migrated">>}
- end,
+ lists:foldl(fun ({RID, {From, _Body}}, AccState) ->
+ NewBody = #body{http_reason =
+ <<"Session closed">>,
+ attrs =
+ [{type, <<"terminate">>},
+ {condition,
+ <<"other-request">>}]},
do_reply(AccState, From, NewBody, RID)
end,
State, Receivers ++ ShapedReceivers).