From e7fe4dc474ed180a4200b2bdefc2ff58b12340c0 Mon Sep 17 00:00:00 2001 From: Evgeniy Khramtsov Date: Fri, 30 Dec 2016 00:00:36 +0300 Subject: More refactoring on session management --- src/ejabberd_c2s.erl | 209 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 127 insertions(+), 82 deletions(-) (limited to 'src/ejabberd_c2s.erl') diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 07d04fbc4..f22960c50 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -37,17 +37,18 @@ compress_methods/1, bind/2, get_password_fun/1, check_password_fun/1, check_password_digest_fun/1, unauthenticated_stream_features/1, authenticated_stream_features/1, - handle_stream_start/2, handle_stream_end/2, handle_stream_close/2, + handle_stream_start/2, handle_stream_end/2, handle_unauthenticated_packet/2, handle_authenticated_packet/2, handle_auth_success/4, handle_auth_failure/4, handle_send/3, handle_recv/3, handle_cdata/2, handle_unbinded_packet/2]). %% Hooks --export([handle_unexpected_info/2, handle_unexpected_cast/2, - reject_unauthenticated_packet/2, process_closed/2]). +-export([handle_unexpected_cast/2, + reject_unauthenticated_packet/2, process_closed/2, + process_terminated/2, process_info/2]). %% API -export([get_presence/1, get_subscription/2, get_subscribed/1, open_session/1, call/3, send/2, close/1, close/2, stop/1, establish/1, - copy_state/2, add_hooks/0]). + reply/2, copy_state/2, set_timeout/2, add_hooks/1]). -include("ejabberd.hrl"). -include("xmpp.hrl"). @@ -76,6 +77,9 @@ socket_type() -> call(Ref, Msg, Timeout) -> xmpp_stream_in:call(Ref, Msg, Timeout). +reply(Ref, Reply) -> + xmpp_stream_in:reply(Ref, Reply). + -spec get_presence(pid()) -> presence(). get_presence(Ref) -> call(Ref, get_presence, 1000). @@ -112,37 +116,39 @@ stop(Ref) -> send(Pid, Pkt) when is_pid(Pid) -> xmpp_stream_in:send(Pid, Pkt); send(#{lserver := LServer} = State, Pkt) -> - case ejabberd_hooks:run_fold(c2s_filter_send, LServer, Pkt, [State]) of - drop -> State; - Pkt1 -> xmpp_stream_in:send(State, Pkt1) + case ejabberd_hooks:run_fold(c2s_filter_send, LServer, {Pkt, State}, []) of + {drop, State1} -> State1; + {Pkt1, State1} -> xmpp_stream_in:send(State1, Pkt1) end. +-spec set_timeout(state(), timeout()) -> state(). +set_timeout(State, Timeout) -> + xmpp_stream_in:set_timeout(State, Timeout). + -spec establish(state()) -> state(). establish(State) -> xmpp_stream_in:establish(State). --spec add_hooks() -> ok. -add_hooks() -> - lists:foreach( - fun(Host) -> - ejabberd_hooks:add(c2s_closed, Host, ?MODULE, process_closed, 100), - ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE, - reject_unauthenticated_packet, 100), - ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, - handle_unexpected_info, 100), - ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE, - handle_unexpected_cast, 100) - - end, ?MYHOSTS). +-spec add_hooks(binary()) -> ok. +add_hooks(Host) -> + ejabberd_hooks:add(c2s_closed, Host, ?MODULE, process_closed, 100), + ejabberd_hooks:add(c2s_terminated, Host, ?MODULE, + process_terminated, 100), + ejabberd_hooks:add(c2s_unauthenticated_packet, Host, ?MODULE, + reject_unauthenticated_packet, 100), + ejabberd_hooks:add(c2s_handle_info, Host, ?MODULE, + process_info, 100), + ejabberd_hooks:add(c2s_handle_cast, Host, ?MODULE, + handle_unexpected_cast, 100). %% Copies content of one c2s state to another. %% This is needed for session migration from one pid to another. -spec copy_state(state(), state()) -> state(). copy_state(#{owner := Owner} = NewState, - #{jid := JID, resource := Resource, sid := {Time, _}, - auth_module := AuthModule, lserver := LServer, - pres_t := PresT, pres_a := PresA, - pres_f := PresF} = OldState) -> + #{jid := JID, resource := Resource, sid := {Time, _}, + auth_module := AuthModule, lserver := LServer, + pres_t := PresT, pres_a := PresA, + pres_f := PresF} = OldState) -> State1 = case OldState of #{pres_last := Pres, pres_timestamp := PresTS} -> NewState#{pres_last => Pres, pres_timestamp => PresTS}; @@ -158,10 +164,46 @@ copy_state(#{owner := Owner} = NewState, pres_f => PresF}, ejabberd_hooks:run_fold(c2s_copy_state, LServer, State2, [OldState]). +-spec open_session(state()) -> {ok, state()} | state(). +open_session(#{user := U, server := S, resource := R, + sid := SID, ip := IP, auth_module := AuthModule} = State) -> + JID = jid:make(U, S, R), + change_shaper(State), + Conn = get_conn_type(State), + State1 = State#{conn => Conn, resource => R, jid => JID}, + Prio = try maps:get(pres_last, State) of + Pres -> get_priority_from_presence(Pres) + catch _:{badkey, _} -> + undefined + end, + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], + ejabberd_sm:open_session(SID, U, S, R, Prio, Info), + xmpp_stream_in:establish(State1). + %%%=================================================================== %%% Hooks %%%=================================================================== -handle_unexpected_info(State, Info) -> +process_info(#{lserver := LServer} = State, + {route, From, To, Packet0}) -> + Packet = xmpp:set_from_to(Packet0, From, To), + {Pass, State1} = case Packet of + #presence{} -> + process_presence_in(State, Packet); + #message{} -> + process_message_in(State, Packet); + #iq{} -> + process_iq_in(State, Packet) + end, + if Pass -> + Packet1 = ejabberd_hooks:run_fold( + user_receive_packet, LServer, Packet, [State1]), + ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), + send(State1, Packet1); + true -> + ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), + State1 + end; +process_info(State, Info) -> ?WARNING_MSG("got unexpected info: ~p", [Info]), State. @@ -173,8 +215,22 @@ reject_unauthenticated_packet(State, Pkt) -> Err = xmpp:err_not_authorized(), xmpp_stream_in:send_error(State, Pkt, Err). -process_closed(State, _Reason) -> - stop(State). +process_closed(State, Reason) -> + stop(State#{stop_reason => Reason}). + +process_terminated(#{socket := Socket, jid := JID} = State, + Reason) -> + Status = format_reason(State, Reason), + ?INFO_MSG("(~s) Closing c2s connection for ~s: ~s", + [ejabberd_socket:pp(Socket), jid:to_string(JID), Status]), + Pres = #presence{type = unavailable, + status = xmpp:mk_text(Status), + from = JID, to = jid:remove_resource(JID)}, + State1 = broadcast_presence_unavailable(State, Pres), + bounce_message_queue(), + State1; +process_terminated(State, _Reason) -> + State. %%%=================================================================== %%% xmpp_stream_in callbacks @@ -248,25 +304,9 @@ bind(R, #{user := U, server := S, access := Access, lang := Lang, end end. --spec open_session(state()) -> {ok, state()} | state(). -open_session(#{user := U, server := S, resource := R, - sid := SID, ip := IP, auth_module := AuthModule} = State) -> - JID = jid:make(U, S, R), - change_shaper(State), - Conn = get_conn_type(State), - State1 = State#{conn => Conn, resource => R, jid => JID}, - Prio = try maps:get(pres_last, State) of - Pres -> get_priority_from_presence(Pres) - catch _:{badkey, _} -> - undefined - end, - Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], - ejabberd_sm:open_session(SID, U, S, R, Prio, Info), - State1. - handle_stream_start(StreamStart, #{lserver := LServer, ip := IP, lang := Lang} = State) -> - case lists:member(LServer, ?MYHOSTS) of + case ejabberd_router:is_my_host(LServer) of false -> send(State, xmpp:serr_host_unknown()); true -> @@ -284,10 +324,8 @@ handle_stream_start(StreamStart, end. handle_stream_end(Reason, #{lserver := LServer} = State) -> - ejabberd_hooks:run_fold(c2s_closed, LServer, State, [Reason]). - -handle_stream_close(_Reason, #{lserver := LServer} = State) -> - ejabberd_hooks:run_fold(c2s_closed, LServer, State, [normal]). + State1 = State#{stop_reason => Reason}, + ejabberd_hooks:run_fold(c2s_closed, LServer, State1, [Reason]). handle_auth_success(User, Mech, AuthModule, #{socket := Socket, ip := IP, lserver := LServer} = State) -> @@ -296,8 +334,7 @@ handle_auth_success(User, Mech, AuthModule, ejabberd_auth:backend_type(AuthModule), ejabberd_config:may_hide_data(jlib:ip_to_list(IP))]), State1 = State#{auth_module => AuthModule}, - ejabberd_hooks:run_fold(c2s_auth_result, LServer, - State1, [true, User]). + ejabberd_hooks:run_fold(c2s_auth_result, LServer, State1, [true, User]). handle_auth_failure(User, Mech, Reason, #{socket := Socket, ip := IP, lserver := LServer} = State) -> @@ -307,16 +344,13 @@ handle_auth_failure(User, Mech, Reason, true -> "" end, ejabberd_config:may_hide_data(jlib:ip_to_list(IP)), Reason]), - ejabberd_hooks:run_fold(c2s_auth_result, LServer, - State, [false, User]). + ejabberd_hooks:run_fold(c2s_auth_result, LServer, State, [false, User]). handle_unbinded_packet(Pkt, #{lserver := LServer} = State) -> - ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer, - State, [Pkt]). + ejabberd_hooks:run_fold(c2s_unbinded_packet, LServer, State, [Pkt]). handle_unauthenticated_packet(Pkt, #{lserver := LServer} = State) -> - ejabberd_hooks:run_fold(c2s_unauthenticated_packet, - LServer, State, [Pkt]). + ejabberd_hooks:run_fold(c2s_unauthenticated_packet, LServer, State, [Pkt]). handle_authenticated_packet(Pkt, #{lserver := LServer} = State) when not ?is_stanza(Pkt) -> ejabberd_hooks:run_fold(c2s_authenticated_packet, @@ -366,20 +400,22 @@ init([State, Opts]) -> zlib => Zlib, lang => ?MYLANG, server => ?MYNAME, + lserver => ?MYNAME, access => Access, shaper => Shaper}, ejabberd_hooks:run_fold(c2s_init, {ok, State1}, [Opts]). -handle_call(get_presence, _From, #{jid := JID} = State) -> +handle_call(get_presence, From, #{jid := JID} = State) -> Pres = try maps:get(pres_last, State) catch _:{badkey, _} -> BareJID = jid:remove_resource(JID), #presence{from = JID, to = BareJID, type = unavailable} end, - {reply, Pres, State}; -handle_call(get_subscribed, _From, #{pres_f := PresF} = State) -> - Subscribed = ?SETS:to_list(PresF), - {reply, Subscribed, State}; + reply(From, Pres), + State; +handle_call(get_subscribed, From, #{pres_f := PresF} = State) -> + reply(From, ?SETS:to_list(PresF)), + State; handle_call(Request, From, #{lserver := LServer} = State) -> ejabberd_hooks:run_fold( c2s_handle_call, LServer, State, [Request, From]). @@ -387,30 +423,22 @@ handle_call(Request, From, #{lserver := LServer} = State) -> handle_cast(Msg, #{lserver := LServer} = State) -> ejabberd_hooks:run_fold(c2s_handle_cast, LServer, State, [Msg]). -handle_info({route, From, To, Packet0}, #{lserver := LServer} = State) -> - Packet = xmpp:set_from_to(Packet0, From, To), - {Pass, NewState} = case Packet of - #presence{} -> - process_presence_in(State, Packet); - #message{} -> - process_message_in(State, Packet); - #iq{} -> - process_iq_in(State, Packet) - end, - if Pass -> - Packet1 = ejabberd_hooks:run_fold( - user_receive_packet, LServer, Packet, [NewState]), - ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - send(NewState, Packet1); - true -> - ejabberd_hooks:run(c2s_loop_debug, [{route, From, To, Packet}]), - NewState - end; handle_info(Info, #{lserver := LServer} = State) -> ejabberd_hooks:run_fold(c2s_handle_info, LServer, State, [Info]). -terminate(_Reason, _State) -> - ok. +terminate(Reason, #{sid := SID, jid := _, + user := U, server := S, resource := R, + lserver := LServer} = State) -> + Status = format_reason(State, Reason), + case maps:is_key(pres_last, State) of + true -> + ejabberd_sm:close_session_unset_presence(SID, U, S, R, Status); + false -> + ejabberd_sm:close_session(SID, U, S, R) + end, + ejabberd_hooks:run_fold(c2s_terminated, LServer, State, [Reason]); +terminate(Reason, #{lserver := LServer} = State) -> + ejabberd_hooks:run_fold(c2s_terminated, LServer, State, [Reason]). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -684,6 +712,15 @@ resource_conflict_action(U, S, R) -> {accept_resource, Rnew} end. +-spec bounce_message_queue() -> ok. +bounce_message_queue() -> + receive {route, From, To, Pkt} -> + ejabberd_router:route(From, To, Pkt), + bounce_message_queue() + after 0 -> + ok + end. + -spec new_uniq_id() -> binary(). new_uniq_id() -> iolist_to_binary( @@ -735,6 +772,14 @@ do_some_magic(#{pres_a := PresA, pres_f := PresF} = State, From) -> end end. +-spec format_reason(state(), term()) -> binary(). +format_reason(#{stop_reason := Reason}, _) -> + xmpp_stream_in:format_error(Reason); +format_reason(_, Reason) when Reason /= normal -> + <<"internal server error">>; +format_reason(_, _) -> + <<"">>. + transform_listen_option(Opt, Opts) -> [Opt|Opts]. -- cgit v1.2.3