diff options
author | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2016-11-12 13:27:15 +0300 |
---|---|---|
committer | Evgeniy Khramtsov <ekhramtsov@process-one.net> | 2016-11-12 13:27:15 +0300 |
commit | 78a44e01762e00102f5e3e3f0b49690cc7866c31 (patch) | |
tree | b8ac7773f510ee3c1da4802bce2badc71c34c0b2 /src/ejabberd_c2s.erl | |
parent | Add more tests for offline storage (diff) | |
parent | Support several groups separated by ; in add_rosteritem command (diff) |
Merge branch 'master' into xml-ng
Conflicts:
src/adhoc.erl
src/cyrsasl_oauth.erl
src/ejabberd_c2s.erl
src/ejabberd_config.erl
src/ejabberd_service.erl
src/gen_mod.erl
src/mod_admin_extra.erl
src/mod_announce.erl
src/mod_carboncopy.erl
src/mod_client_state.erl
src/mod_configure.erl
src/mod_echo.erl
src/mod_mam.erl
src/mod_muc.erl
src/mod_muc_room.erl
src/mod_offline.erl
src/mod_pubsub.erl
src/mod_stats.erl
src/node_flat_sql.erl
src/randoms.erl
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r-- | src/ejabberd_c2s.erl | 201 |
1 files changed, 164 insertions, 37 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 98631054..7ef708d3 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -32,6 +32,7 @@ -protocol({xep, 78, '2.5'}). -protocol({xep, 138, '2.0'}). -protocol({xep, 198, '1.3'}). +-protocol({xep, 356, '7.1'}). -update_info({update, 0}). @@ -48,10 +49,16 @@ send_element/2, socket_type/0, get_presence/1, + get_last_presence/1, get_aux_field/2, set_aux_field/3, del_aux_field/2, get_subscription/2, + get_queued_stanzas/1, + get_csi_state/1, + set_csi_state/2, + get_resume_timeout/1, + set_resume_timeout/2, send_filtered/5, broadcast/4, get_subscribed/1, @@ -112,9 +119,12 @@ mgmt_pending_since, mgmt_timeout, mgmt_max_timeout, + mgmt_ack_timeout, + mgmt_ack_timer, mgmt_resend, mgmt_stanzas_in = 0, mgmt_stanzas_out = 0, + mgmt_stanzas_req = 0, ask_offline = true, lang = <<"">>}). @@ -182,6 +192,9 @@ socket_type() -> xml_stream. get_presence(FsmRef) -> (?GEN_FSM):sync_send_all_state_event(FsmRef, {get_presence}, 1000). +get_last_presence(FsmRef) -> + (?GEN_FSM):sync_send_all_state_event(FsmRef, + {get_last_presence}, 1000). -spec get_aux_field(any(), state()) -> {ok, any()} | error. get_aux_field(Key, #state{aux_fields = Opts}) -> @@ -218,6 +231,27 @@ get_subscription(LFrom, StateData) -> true -> none end. +get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) -> + lists:map(fun({_N, Time, El}) -> + add_resent_delay_info(StateData, El, Time) + end, queue:to_list(Queue)). + +get_csi_state(#state{csi_state = CsiState}) -> + CsiState. + +set_csi_state(#state{} = StateData, CsiState) -> + StateData#state{csi_state = CsiState}; +set_csi_state(FsmRef, CsiState) -> + FsmRef ! {set_csi_state, CsiState}. + +get_resume_timeout(#state{mgmt_timeout = Timeout}) -> + Timeout. + +set_resume_timeout(#state{} = StateData, Timeout) -> + StateData#state{mgmt_timeout = Timeout}; +set_resume_timeout(FsmRef, Timeout) -> + FsmRef ! {set_resume_timeout, Timeout}. + -spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any(). send_filtered(FsmRef, Feature, From, To, Packet) -> FsmRef ! {send_filtered, Feature, From, To, Packet}. @@ -282,13 +316,18 @@ init([{SockMod, Socket}, Opts]) -> _ -> 1000 end, ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of - Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout; + RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo; _ -> 300 end, MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of Max when is_integer(Max), Max >= ResumeTimeout -> Max; _ -> ResumeTimeout end, + AckTimeout = case proplists:get_value(ack_timeout, Opts) of + ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000; + infinity -> undefined; + _ -> 60000 + end, ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of Resend when is_boolean(Resend) -> Resend; if_offline -> if_offline; @@ -312,6 +351,7 @@ init([{SockMod, Socket}, Opts]) -> mgmt_max_queue = MaxAckQueue, mgmt_timeout = ResumeTimeout, mgmt_max_timeout = MaxResumeTimeout, + mgmt_ack_timeout = AckTimeout, mgmt_resend = ResendOnTimeout}, {ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}. @@ -1147,6 +1187,15 @@ handle_sync_event({get_presence}, _From, StateName, Resource = StateData#state.resource, Reply = {User, Resource, Show, Status}, fsm_reply(Reply, StateName, StateData); +handle_sync_event({get_last_presence}, _From, StateName, + StateData) -> + User = StateData#state.user, + Server = StateData#state.server, + PresLast = StateData#state.pres_last, + Resource = StateData#state.resource, + Reply = {User, Server, Resource, PresLast}, + fsm_reply(Reply, StateName, StateData); + handle_sync_event(get_subscribed, _From, StateName, StateData) -> Subscribed = (?SETS):to_list(StateData#state.pres_f), @@ -1159,7 +1208,7 @@ handle_sync_event({resume_session, Time}, _From, _StateName, StateData#state.user, StateData#state.server, StateData#state.resource), - {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}}; + {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}}; handle_sync_event({resume_session, _Time}, _From, StateName, StateData) -> {reply, {error, <<"Previous session not found">>}, StateName, StateData}; @@ -1347,8 +1396,13 @@ handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Pac groupchat -> ok; headline -> ok; _ -> - ejabberd_router:route_error( - To, From, Packet, xmpp:err_service_unavailable()) + case xmpp:has_subtag(Packet, #muc_user{}) of + true -> + ok; + false -> + ejabberd_router:route_error( + To, From, Packet, xmpp:err_service_unavailable()) + end end, {false, StateData} end @@ -1444,8 +1498,24 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) -> From, jid:make(USR), Packet) end, lists:usort(Recipients)), fsm_next_state(StateName, StateData); +handle_info({set_csi_state, CsiState}, StateName, StateData) -> + fsm_next_state(StateName, StateData#state{csi_state = CsiState}); +handle_info({set_resume_timeout, Timeout}, StateName, StateData) -> + fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout}); handle_info(dont_ask_offline, StateName, StateData) -> fsm_next_state(StateName, StateData#state{ask_offline = false}); +handle_info(close, StateName, StateData) -> + ?DEBUG("Timeout waiting for stream management acknowledgement of ~s", + [jid:to_string(StateData#state.jid)]), + close(self()), + fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined}); +handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) -> + %% This happens if the resume_session/1 request timed out; the new session + %% now receives the late response. + ?DEBUG("Received old session state for ~s after failed resumption", + [jid:to_string(OldStateData#state.jid)]), + handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}), + fsm_next_state(StateName, StateData); handle_info(Info, StateName, StateData) -> ?ERROR_MSG("Unexpected info: ~p", [Info]), fsm_next_state(StateName, StateData). @@ -1562,6 +1632,7 @@ send_text(StateData, Text) -> send_element(StateData, El) when StateData#state.mgmt_state == pending -> ?DEBUG("Cannot send element while waiting for resumption: ~p", [El]); send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket -> + ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]), (StateData#state.sockmod):send_xml(StateData#state.socket, {xmlstreamelement, El}); send_element(StateData, #xmlel{} = El) -> @@ -1585,8 +1656,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive -> send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending -> mgmt_queue_add(StateData, Stanza); send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active -> - NewStateData = send_stanza_and_ack_req(StateData, Stanza), - mgmt_queue_add(NewStateData, Stanza); + NewStateData = mgmt_queue_add(StateData, Stanza), + mgmt_send_stanza(NewStateData, Stanza); send_stanza(StateData, Stanza) -> send_element(StateData, Stanza), StateData. @@ -2101,13 +2172,25 @@ fsm_next_state(session_established, StateData) -> ?C2S_HIBERNATE_TIMEOUT}; fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) -> {stop, normal, StateData}; -fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} = - StateData) -> +fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined, + sid = SID, jid = JID, ip = IP, + conn = Conn, auth_module = AuthModule, + server = Host} = StateData) -> + case StateData of + #state{mgmt_ack_timer = undefined} -> + ok; + #state{mgmt_ack_timer = Timer} -> + erlang:cancel_timer(Timer) + end, ?INFO_MSG("Waiting for resumption of stream for ~s", - [jid:to_string(StateData#state.jid)]), + [jid:to_string(JID)]), + Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}], + NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData, + [SID, JID, Info]), {next_state, wait_for_resume, - StateData#state{mgmt_state = pending, mgmt_pending_since = os:timestamp()}, - StateData#state.mgmt_timeout}; + NewStateData#state{mgmt_state = pending, + mgmt_pending_since = os:timestamp()}, + NewStateData#state.mgmt_timeout}; fsm_next_state(wait_for_resume, StateData) -> Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since), Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1), @@ -2338,15 +2421,16 @@ handle_r(StateData) -> -spec handle_a(state(), sm_a()) -> state(). handle_a(StateData, #sm_a{h = H}) -> - check_h_attribute(StateData, H). + NewStateData = check_h_attribute(StateData, H), + maybe_renew_ack_request(NewStateData). -spec handle_resume(state(), sm_resume()) -> {ok, state()} | error. handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> R = case stream_mgmt_enabled(StateData) of true -> case inherit_session_state(StateData, PrevID) of - {ok, InheritedState} -> - {ok, InheritedState, H}; + {ok, InheritedState, Info} -> + {ok, InheritedState, Info, H}; {error, Err, InH} -> {error, #sm_failed{reason = 'item-not-found', h = InH, xmlns = Xmlns}, Err}; @@ -2360,7 +2444,7 @@ handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> <<"XEP-0198 disabled">>} end, case R of - {ok, ResumedState, NumHandled} -> + {ok, ResumedState, ResumedInfo, NumHandled} -> NewState = check_h_attribute(ResumedState, NumHandled), AttrXmlns = NewState#state.mgmt_xmlns, AttrId = make_resume_id(NewState), @@ -2374,11 +2458,16 @@ handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) -> end, handle_unacked_stanzas(NewState, SendFun), send_element(NewState, #sm_r{xmlns = AttrXmlns}), - FlushedState = csi_flush_queue(NewState), - NewStateData = FlushedState#state{csi_state = active}, + NewState1 = csi_flush_queue(NewState), + NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed, + StateData#state.server, + NewState1, + [NewState1#state.sid, + NewState1#state.jid, + ResumedInfo]), ?INFO_MSG("Resumed session for ~s", - [jid:to_string(NewStateData#state.jid)]), - {ok, NewStateData}; + [jid:to_string(NewState2#state.jid)]), + {ok, NewState2}; {error, El, Msg} -> send_element(StateData, El), ?INFO_MSG("Cannot resume session for ~s@~s: ~s", @@ -2413,15 +2502,45 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El) update_num_stanzas_in(StateData, _El) -> StateData. --spec send_stanza_and_ack_req(state(), stanza()) -> state(). -send_stanza_and_ack_req(StateData, Stanza) -> - AckReq = #sm_r{xmlns = StateData#state.mgmt_xmlns}, - case send_element(StateData, Stanza) == ok andalso - send_element(StateData, AckReq) == ok of +mgmt_send_stanza(StateData, Stanza) -> + case send_element(StateData, Stanza) of + ok -> + maybe_request_ack(StateData); + _ -> + StateData#state{mgmt_state = pending} + end. + +maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) -> + request_ack(StateData); +maybe_request_ack(StateData) -> + StateData. + +request_ack(#state{mgmt_xmlns = Xmlns, + mgmt_ack_timeout = AckTimeout} = StateData) -> + AckReq = #sm_r{xmlns = Xmlns}, + case {send_element(StateData, AckReq), AckTimeout} of + {ok, undefined} -> + ok; + {ok, Timeout} -> + Timer = erlang:send_after(Timeout, self(), close), + StateData#state{mgmt_ack_timer = Timer, + mgmt_stanzas_req = StateData#state.mgmt_stanzas_out}; + _ -> + StateData#state{mgmt_state = pending} + end. + +maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) -> + StateData; +maybe_renew_ack_request(#state{mgmt_ack_timer = Timer, + mgmt_queue = Queue, + mgmt_stanzas_out = NumStanzasOut, + mgmt_stanzas_req = NumStanzasReq} = StateData) -> + erlang:cancel_timer(Timer), + case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of true -> - StateData; + request_ack(StateData#state{mgmt_ack_timer = undefined}); false -> - StateData#state{mgmt_state = pending} + StateData#state{mgmt_ack_timer = undefined} end. -spec mgmt_queue_add(state(), xmpp_element()) -> state(). @@ -2473,7 +2592,12 @@ handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F) fun({_, Time, Pkt}) -> From = xmpp:get_from(Pkt), To = xmpp:get_to(Pkt), - F(From, To, Pkt, Time) + case {From, To} of + {#jid{}, #jid{}} -> + F(From, To, Pkt, Time); + {_, _} -> + ?DEBUG("Dropping stanza due to invalid JID(s)", []) + end end, queue:to_list(Queue)) end; handle_unacked_stanzas(_StateData, _F) -> @@ -2540,7 +2664,8 @@ handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData) [StateData, From, StateData#state.jid, El]) of true -> - ok; + ?DEBUG("Dropping archived message stanza from ~p", + [jid:to_string(xmpp:get_from(El))]); false -> ReRoute(From, To, El, Time) end @@ -2580,7 +2705,7 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> OldPID -> OldSID = {Time, OldPID}, case catch resume_session(OldSID) of - {ok, OldStateData} -> + {resume, OldStateData} -> NewSID = {Time, self()}, % Old time, new PID Priority = case OldStateData#state.pres_last of undefined -> @@ -2604,13 +2729,13 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> pres_timestamp = OldStateData#state.pres_timestamp, privacy_list = OldStateData#state.privacy_list, aux_fields = OldStateData#state.aux_fields, - csi_state = OldStateData#state.csi_state, mgmt_xmlns = OldStateData#state.mgmt_xmlns, mgmt_queue = OldStateData#state.mgmt_queue, mgmt_timeout = OldStateData#state.mgmt_timeout, mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in, mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out, - mgmt_state = active}}; + mgmt_state = active, + csi_state = active}, Info}; {error, Msg} -> {error, Msg}; _ -> @@ -2623,7 +2748,7 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) -> -spec resume_session({integer(), pid()}) -> any(). resume_session({Time, PID}) -> - (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 5000). + (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000). -spec make_resume_id(state()) -> binary(). make_resume_id(StateData) -> @@ -2640,11 +2765,11 @@ add_resent_delay_info(#state{server = From}, El, Time) -> %%% XEP-0352 %%%---------------------------------------------------------------------- -spec csi_filter_stanza(state(), stanza()) -> state(). -csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData, - Stanza) -> +csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} = + StateData, Stanza) -> {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server, {StateData, [Stanza]}, - [Server, Stanza]), + [Server, JID, Stanza]), StateData2 = lists:foldl(fun(CurStanza, AccState) -> send_stanza(AccState, CurStanza) end, StateData1#state{csi_state = active}, @@ -2652,9 +2777,11 @@ csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData, StateData2#state{csi_state = CsiState}. -spec csi_flush_queue(state()) -> state(). -csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) -> +csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} = + StateData) -> {StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server, - {StateData, []}, [Server]), + {StateData, []}, + [Server, JID]), StateData2 = lists:foldl(fun(CurStanza, AccState) -> send_stanza(AccState, CurStanza) end, StateData1#state{csi_state = active}, |