aboutsummaryrefslogtreecommitdiff
path: root/src/ejabberd_c2s.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-12-30 00:00:36 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-12-30 00:00:36 +0300
commite7fe4dc474ed180a4200b2bdefc2ff58b12340c0 (patch)
tree83a2ca201a8fae1f1ba49a2f2fb541ad64e02f91 /src/ejabberd_c2s.erl
parentAdd xmpp_stream_out behaviour and rewrite s2s/SM code (diff)
More refactoring on session management
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl209
1 files changed, 127 insertions, 82 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 07d04fbc4..f22960c50 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -37,17 +37,18 @@
compress_methods/1, bind/2, get_password_fun/1,
check_password_fun/1, check_password_digest_fun/1,
unauthenticated_stream_features/1, authenticated_stream_features/1,
- handle_stream_start/2, handle_stream_end/2, handle_stream_close/2,
+ handle_stream_start/2, handle_stream_end/2,
handle_unauthenticated_packet/2, handle_authenticated_packet/2,
handle_auth_success/4, handle_auth_failure/4, handle_send/3,
handle_recv/3, handle_cdata/2, handle_unbinded_packet/2]).
%% Hooks
--export([handle_unexpected_info/2, handle_unexpected_cast/2,
- reject_unauthenticated_packet/2, process_closed/2]).
+-export([handle_unexpected_cast/2,
+ reject_unauthenticated_packet/2, process_closed/2,
+ process_terminated/2, process_info/2]).
%% API
-export([get_presence/1, get_subscription/2, get_subscribed/1,
open_session/1, call/3, send/2, close/1, close/2, stop/1, establish/1,
- copy_state/2, add_hooks/0]).
+ reply/2, copy_state/2, set_timeout/2, add_hooks/1]).
-include("ejabberd.hrl").
-include("xmpp.hrl").
@@ -76,6 +77,9 @@ socket_type() ->
call(Ref, Msg, Timeout) ->
xmpp_stream_in:call(Ref, Msg, Timeout).
+reply(Ref, Reply) ->
+ xmpp_stream_in:reply(Ref, Reply).
+
-spec get_presence(pid()) -> presence().
get_presence(Ref) ->
call(Ref, get_presence, 1000).
@@ -112,37 +116,39 @@ stop(Ref) ->
send(Pid, Pkt) when is_pid(Pid) ->
xmpp_stream_in:send(Pid, Pkt);
send(#{lserver := LServer} = State, Pkt) ->
- case ejabberd_hooks:run_fold(c2s_filter_send, LServer, Pkt, [State]) of
- drop -> State;
- Pkt1 -> xmpp_stream_in:send(State, Pkt1)
+ case ejabberd_hooks:run_fold(c2s_filter_send, LServer, {Pkt, State}, []) of
+ {drop, State1} -> State1;
+ {Pkt1, State1} -> xmpp_stream_in:send(State1, Pkt1)
end.
+-spec set_timeout(state(), timeout()) -> state().
+set_timeout(State, Timeout) ->
+ xmpp_stream_in:set_timeout(State, Timeout).
+
-spec establish(state()) -> state().
establish(State) ->
xmpp_stream_in:establish(State).
--spec add_hooks() -> ok.
-add_hooks() ->
- lists:foreach(
- fun(Host) ->
- ejabberd_hooks:add(c2s_closed, Host, ?MODULE, process_closed, 100),
- ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE,
- reject_unauthenticated_packet, 100),
- ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE,
- handle_unexpected_info, 100),
- ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE,
- handle_unexpected_cast, 100)
-
- end, ?MYHOSTS).
+-spec add_hooks(binary()) -> ok.
+add_hooks(Host) ->
+ ejabberd_hooks:add(c2s_closed, Host, ?MODULE, process_closed, 100),
+ ejabberd_hooks:add(c2s_terminated, Host, ?MODULE,
+ process_terminated, 100),
+ ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE,
+ reject_unauthenticated_packet, 100),
+ ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE,
+ process_info, 100),
+ ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE,
+ handle_unexpected_cast, 100).
%% Copies content of one c2s state to another.
%% This is needed for session migration from one pid to another.
-spec copy_state(state(), state()) -> state().
copy_state(#{owner := Owner} = NewState,
- #{jid := JID, resource := Resource, sid := {Time, _},
- auth_module := AuthModule, lserver := LServer,
- pres_t := PresT, pres_a := PresA,
- pres_f := PresF} = OldState) ->
+ #{jid := JID, resource := Resource, sid := {Time, _},
+ auth_module := AuthModule, lserver := LServer,
+ pres_t := PresT, pres_a := PresA,
+ pres_f := PresF} = OldState) ->
State1 = case OldState of
#{pres_last := Pres, pres_timestamp := PresTS} ->
NewState#{pres_last => Pres, pres_timestamp => PresTS};
@@ -158,10 +164,46 @@ copy_state(#{owner := Owner} = NewState,
pres_f => PresF},
ejabberd_hooks:run_fold(c2s_copy_state, LServer, State2, [OldState]).
+-spec open_session(state()) -> {ok, state()} | state().
+open_session(#{user := U, server := S, resource := R,
+ sid := SID, ip := IP, auth_module := AuthModule} = State) ->
+ JID = jid:make(U, S, R),
+ change_shaper(State),
+ Conn = get_conn_type(State),
+ State1 = State#{conn => Conn, resource => R, jid => JID},
+ Prio = try maps:get(pres_last, State) of
+ Pres -> get_priority_from_presence(Pres)
+ catch _:{badkey, _} ->
+ undefined
+ end,
+ Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
+ ejabberd_sm:open_session(SID, U, S, R, Prio, Info),
+ xmpp_stream_in:establish(State1).
+
%%%===================================================================
%%% Hooks
%%%===================================================================
-handle_unexpected_info(State, Info) ->
+process_info(#{lserver := LServer} = State,
+ {route, From, To, Packet0}) ->
+ Packet = xmpp:set_from_to(Packet0, From, To),
+ {Pass, State1} = case Packet of
+ #presence{} ->
+ process_presence_in(State, Packet);
+ #message{} ->
+ process_message_in(State, Packet);
+ #iq{} ->
+ process_iq_in(State, Packet)
+ end,
+ if Pass ->
+ Packet1 = ejabberd_hooks:run_fold(
+ user_receive_packet, LServer, Packet, [State1]),
+ ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
+ send(State1, Packet1);
+ true ->
+ ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
+ State1
+ end;
+process_info(State, Info) ->
?WARNING_MSG("got unexpected info: ~p", [Info]),
State.
@@ -173,8 +215,22 @@ reject_unauthenticated_packet(State, Pkt) ->
Err = xmpp:err_not_authorized(),
xmpp_stream_in:send_error(State, Pkt, Err).
-process_closed(State, _Reason) ->
- stop(State).
+process_closed(State, Reason) ->
+ stop(State#{stop_reason => Reason}).
+
+process_terminated(#{socket := Socket, jid := JID} = State,
+ Reason) ->
+ Status = format_reason(State, Reason),
+ ?INFO_MSG("(~s) Closing c2s connection for ~s: ~s",
+ [ejabberd_socket:pp(Socket), jid:to_string(JID), Status]),
+ Pres = #presence{type = unavailable,
+ status = xmpp:mk_text(Status),
+ from = JID, to = jid:remove_resource(JID)},
+ State1 = broadcast_presence_unavailable(State, Pres),
+ bounce_message_queue(),
+ State1;
+process_terminated(State, _Reason) ->
+ State.
%%%===================================================================
%%% xmpp_stream_in callbacks
@@ -248,25 +304,9 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang,
end
end.
--spec open_session(state()) -> {ok, state()} | state().
-open_session(#{user := U, server := S, resource := R,
- sid := SID, ip := IP, auth_module := AuthModule} = State) ->
- JID = jid:make(U, S, R),
- change_shaper(State),
- Conn = get_conn_type(State),
- State1 = State#{conn => Conn, resource => R, jid => JID},
- Prio = try maps:get(pres_last, State) of
- Pres -> get_priority_from_presence(Pres)
- catch _:{badkey, _} ->
- undefined
- end,
- Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
- ejabberd_sm:open_session(SID, U, S, R, Prio, Info),
- State1.
-
handle_stream_start(StreamStart,
#{lserver := LServer, ip := IP, lang := Lang} = State) ->
- case lists:member(LServer, ?MYHOSTS) of
+ case ejabberd_router:is_my_host(LServer) of
false ->
send(State, xmpp:serr_host_unknown());
true ->
@@ -284,10 +324,8 @@ handle_stream_start(StreamStart,
end.
handle_stream_end(Reason, #{lserver := LServer} = State) ->
- ejabberd_hooks:run_fold(c2s_closed, LServer, State, [Reason]).
-
-handle_stream_close(_Reason, #{lserver := LServer} = State) ->
- ejabberd_hooks:run_fold(c2s_closed, LServer, State, [normal]).
+ State1 = State#{stop_reason => Reason},
+ ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]).
handle_auth_success(User, Mech, AuthModule,
#{socket := Socket, ip := IP, lserver := LServer} = State) ->
@@ -296,8 +334,7 @@ handle_auth_success(User, Mech, AuthModule,
ejabberd_auth:backend_type(AuthModule),
ejabberd_config:may_hide_data(jlib:ip_to_list(IP))]),
State1 = State#{auth_module => AuthModule},
- ejabberd_hooks:run_fold(c2s_auth_result, LServer,
- State1, [true, User]).
+ ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]).
handle_auth_failure(User, Mech, Reason,
#{socket := Socket, ip := IP, lserver := LServer} = State) ->
@@ -307,16 +344,13 @@ handle_auth_failure(User, Mech, Reason,
true -> ""
end,
ejabberd_config:may_hide_data(jlib:ip_to_list(IP)), Reason]),
- ejabberd_hooks:run_fold(c2s_auth_result, LServer,
- State, [false, User]).
+ ejabberd_hooks:run_fold(c2s_auth_result, LServer, State, [false, User]).
handle_unbinded_packet(Pkt, #{lserver := LServer} = State) ->
- ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer,
- State, [Pkt]).
+ ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer, State, [Pkt]).
handle_unauthenticated_packet(Pkt, #{lserver := LServer} = State) ->
- ejabberd_hooks:run_fold(c2s_unauthenticated_packet,
- LServer, State, [Pkt]).
+ ejabberd_hooks:run_fold(c2s_unauthenticated_packet, LServer, State, [Pkt]).
handle_authenticated_packet(Pkt, #{lserver := LServer} = State) when not ?is_stanza(Pkt) ->
ejabberd_hooks:run_fold(c2s_authenticated_packet,
@@ -366,20 +400,22 @@ init([State, Opts]) ->
zlib => Zlib,
lang => ?MYLANG,
server => ?MYNAME,
+ lserver => ?MYNAME,
access => Access,
shaper => Shaper},
ejabberd_hooks:run_fold(c2s_init, {ok, State1}, [Opts]).
-handle_call(get_presence, _From, #{jid := JID} = State) ->
+handle_call(get_presence, From, #{jid := JID} = State) ->
Pres = try maps:get(pres_last, State)
catch _:{badkey, _} ->
BareJID = jid:remove_resource(JID),
#presence{from = JID, to = BareJID, type = unavailable}
end,
- {reply, Pres, State};
-handle_call(get_subscribed, _From, #{pres_f := PresF} = State) ->
- Subscribed = ?SETS:to_list(PresF),
- {reply, Subscribed, State};
+ reply(From, Pres),
+ State;
+handle_call(get_subscribed, From, #{pres_f := PresF} = State) ->
+ reply(From, ?SETS:to_list(PresF)),
+ State;
handle_call(Request, From, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(
c2s_handle_call, LServer, State, [Request, From]).
@@ -387,30 +423,22 @@ handle_call(Request, From, #{lserver := LServer} = State) ->
handle_cast(Msg, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(c2s_handle_cast, LServer, State, [Msg]).
-handle_info({route, From, To, Packet0}, #{lserver := LServer} = State) ->
- Packet = xmpp:set_from_to(Packet0, From, To),
- {Pass, NewState} = case Packet of
- #presence{} ->
- process_presence_in(State, Packet);
- #message{} ->
- process_message_in(State, Packet);
- #iq{} ->
- process_iq_in(State, Packet)
- end,
- if Pass ->
- Packet1 = ejabberd_hooks:run_fold(
- user_receive_packet, LServer, Packet, [NewState]),
- ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- send(NewState, Packet1);
- true ->
- ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]),
- NewState
- end;
handle_info(Info, #{lserver := LServer} = State) ->
ejabberd_hooks:run_fold(c2s_handle_info, LServer, State, [Info]).
-terminate(_Reason, _State) ->
- ok.
+terminate(Reason, #{sid := SID, jid := _,
+ user := U, server := S, resource := R,
+ lserver := LServer} = State) ->
+ Status = format_reason(State, Reason),
+ case maps:is_key(pres_last, State) of
+ true ->
+ ejabberd_sm:close_session_unset_presence(SID, U, S, R, Status);
+ false ->
+ ejabberd_sm:close_session(SID, U, S, R)
+ end,
+ ejabberd_hooks:run_fold(c2s_terminated, LServer, State, [Reason]);
+terminate(Reason, #{lserver := LServer} = State) ->
+ ejabberd_hooks:run_fold(c2s_terminated, LServer, State, [Reason]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -684,6 +712,15 @@ resource_conflict_action(U, S, R) ->
{accept_resource, Rnew}
end.
+-spec bounce_message_queue() -> ok.
+bounce_message_queue() ->
+ receive {route, From, To, Pkt} ->
+ ejabberd_router:route(From, To, Pkt),
+ bounce_message_queue()
+ after 0 ->
+ ok
+ end.
+
-spec new_uniq_id() -> binary().
new_uniq_id() ->
iolist_to_binary(
@@ -735,6 +772,14 @@ do_some_magic(#{pres_a := PresA, pres_f := PresF} = State, From) ->
end
end.
+-spec format_reason(state(), term()) -> binary().
+format_reason(#{stop_reason := Reason}, _) ->
+ xmpp_stream_in:format_error(Reason);
+format_reason(_, Reason) when Reason /= normal ->
+ <<"internal server error">>;
+format_reason(_, _) ->
+ <<"">>.
+
transform_listen_option(Opt, Opts) ->
[Opt|Opts].