summaryrefslogtreecommitdiff
path: root/src/ejabberd_c2s.erl
diff options
context:
space:
mode:
authorEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-11-12 13:27:15 +0300
committerEvgeniy Khramtsov <ekhramtsov@process-one.net>2016-11-12 13:27:15 +0300
commit78a44e01762e00102f5e3e3f0b49690cc7866c31 (patch)
treeb8ac7773f510ee3c1da4802bce2badc71c34c0b2 /src/ejabberd_c2s.erl
parentAdd more tests for offline storage (diff)
parentSupport several groups separated by ; in add_rosteritem command (diff)
Merge branch 'master' into xml-ng
Conflicts: src/adhoc.erl src/cyrsasl_oauth.erl src/ejabberd_c2s.erl src/ejabberd_config.erl src/ejabberd_service.erl src/gen_mod.erl src/mod_admin_extra.erl src/mod_announce.erl src/mod_carboncopy.erl src/mod_client_state.erl src/mod_configure.erl src/mod_echo.erl src/mod_mam.erl src/mod_muc.erl src/mod_muc_room.erl src/mod_offline.erl src/mod_pubsub.erl src/mod_stats.erl src/node_flat_sql.erl src/randoms.erl
Diffstat (limited to 'src/ejabberd_c2s.erl')
-rw-r--r--src/ejabberd_c2s.erl201
1 files changed, 164 insertions, 37 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl
index 98631054..7ef708d3 100644
--- a/src/ejabberd_c2s.erl
+++ b/src/ejabberd_c2s.erl
@@ -32,6 +32,7 @@
-protocol({xep, 78, '2.5'}).
-protocol({xep, 138, '2.0'}).
-protocol({xep, 198, '1.3'}).
+-protocol({xep, 356, '7.1'}).
-update_info({update, 0}).
@@ -48,10 +49,16 @@
send_element/2,
socket_type/0,
get_presence/1,
+ get_last_presence/1,
get_aux_field/2,
set_aux_field/3,
del_aux_field/2,
get_subscription/2,
+ get_queued_stanzas/1,
+ get_csi_state/1,
+ set_csi_state/2,
+ get_resume_timeout/1,
+ set_resume_timeout/2,
send_filtered/5,
broadcast/4,
get_subscribed/1,
@@ -112,9 +119,12 @@
mgmt_pending_since,
mgmt_timeout,
mgmt_max_timeout,
+ mgmt_ack_timeout,
+ mgmt_ack_timer,
mgmt_resend,
mgmt_stanzas_in = 0,
mgmt_stanzas_out = 0,
+ mgmt_stanzas_req = 0,
ask_offline = true,
lang = <<"">>}).
@@ -182,6 +192,9 @@ socket_type() -> xml_stream.
get_presence(FsmRef) ->
(?GEN_FSM):sync_send_all_state_event(FsmRef,
{get_presence}, 1000).
+get_last_presence(FsmRef) ->
+ (?GEN_FSM):sync_send_all_state_event(FsmRef,
+ {get_last_presence}, 1000).
-spec get_aux_field(any(), state()) -> {ok, any()} | error.
get_aux_field(Key, #state{aux_fields = Opts}) ->
@@ -218,6 +231,27 @@ get_subscription(LFrom, StateData) ->
true -> none
end.
+get_queued_stanzas(#state{mgmt_queue = Queue} = StateData) ->
+ lists:map(fun({_N, Time, El}) ->
+ add_resent_delay_info(StateData, El, Time)
+ end, queue:to_list(Queue)).
+
+get_csi_state(#state{csi_state = CsiState}) ->
+ CsiState.
+
+set_csi_state(#state{} = StateData, CsiState) ->
+ StateData#state{csi_state = CsiState};
+set_csi_state(FsmRef, CsiState) ->
+ FsmRef ! {set_csi_state, CsiState}.
+
+get_resume_timeout(#state{mgmt_timeout = Timeout}) ->
+ Timeout.
+
+set_resume_timeout(#state{} = StateData, Timeout) ->
+ StateData#state{mgmt_timeout = Timeout};
+set_resume_timeout(FsmRef, Timeout) ->
+ FsmRef ! {set_resume_timeout, Timeout}.
+
-spec send_filtered(pid(), binary(), jid(), jid(), stanza()) -> any().
send_filtered(FsmRef, Feature, From, To, Packet) ->
FsmRef ! {send_filtered, Feature, From, To, Packet}.
@@ -282,13 +316,18 @@ init([{SockMod, Socket}, Opts]) ->
_ -> 1000
end,
ResumeTimeout = case proplists:get_value(resume_timeout, Opts) of
- Timeout when is_integer(Timeout), Timeout >= 0 -> Timeout;
+ RTimeo when is_integer(RTimeo), RTimeo >= 0 -> RTimeo;
_ -> 300
end,
MaxResumeTimeout = case proplists:get_value(max_resume_timeout, Opts) of
Max when is_integer(Max), Max >= ResumeTimeout -> Max;
_ -> ResumeTimeout
end,
+ AckTimeout = case proplists:get_value(ack_timeout, Opts) of
+ ATimeo when is_integer(ATimeo), ATimeo > 0 -> ATimeo * 1000;
+ infinity -> undefined;
+ _ -> 60000
+ end,
ResendOnTimeout = case proplists:get_value(resend_on_timeout, Opts) of
Resend when is_boolean(Resend) -> Resend;
if_offline -> if_offline;
@@ -312,6 +351,7 @@ init([{SockMod, Socket}, Opts]) ->
mgmt_max_queue = MaxAckQueue,
mgmt_timeout = ResumeTimeout,
mgmt_max_timeout = MaxResumeTimeout,
+ mgmt_ack_timeout = AckTimeout,
mgmt_resend = ResendOnTimeout},
{ok, wait_for_stream, StateData, ?C2S_OPEN_TIMEOUT}.
@@ -1147,6 +1187,15 @@ handle_sync_event({get_presence}, _From, StateName,
Resource = StateData#state.resource,
Reply = {User, Resource, Show, Status},
fsm_reply(Reply, StateName, StateData);
+handle_sync_event({get_last_presence}, _From, StateName,
+ StateData) ->
+ User = StateData#state.user,
+ Server = StateData#state.server,
+ PresLast = StateData#state.pres_last,
+ Resource = StateData#state.resource,
+ Reply = {User, Server, Resource, PresLast},
+ fsm_reply(Reply, StateName, StateData);
+
handle_sync_event(get_subscribed, _From, StateName,
StateData) ->
Subscribed = (?SETS):to_list(StateData#state.pres_f),
@@ -1159,7 +1208,7 @@ handle_sync_event({resume_session, Time}, _From, _StateName,
StateData#state.user,
StateData#state.server,
StateData#state.resource),
- {stop, normal, {ok, StateData}, StateData#state{mgmt_state = resumed}};
+ {stop, normal, {resume, StateData}, StateData#state{mgmt_state = resumed}};
handle_sync_event({resume_session, _Time}, _From, StateName,
StateData) ->
{reply, {error, <<"Previous session not found">>}, StateName, StateData};
@@ -1347,8 +1396,13 @@ handle_info({route, From, To, Packet}, StateName, StateData) when ?is_stanza(Pac
groupchat -> ok;
headline -> ok;
_ ->
- ejabberd_router:route_error(
- To, From, Packet, xmpp:err_service_unavailable())
+ case xmpp:has_subtag(Packet, #muc_user{}) of
+ true ->
+ ok;
+ false ->
+ ejabberd_router:route_error(
+ To, From, Packet, xmpp:err_service_unavailable())
+ end
end,
{false, StateData}
end
@@ -1444,8 +1498,24 @@ handle_info({broadcast, Type, From, Packet}, StateName, StateData) ->
From, jid:make(USR), Packet)
end, lists:usort(Recipients)),
fsm_next_state(StateName, StateData);
+handle_info({set_csi_state, CsiState}, StateName, StateData) ->
+ fsm_next_state(StateName, StateData#state{csi_state = CsiState});
+handle_info({set_resume_timeout, Timeout}, StateName, StateData) ->
+ fsm_next_state(StateName, StateData#state{mgmt_timeout = Timeout});
handle_info(dont_ask_offline, StateName, StateData) ->
fsm_next_state(StateName, StateData#state{ask_offline = false});
+handle_info(close, StateName, StateData) ->
+ ?DEBUG("Timeout waiting for stream management acknowledgement of ~s",
+ [jid:to_string(StateData#state.jid)]),
+ close(self()),
+ fsm_next_state(StateName, StateData#state{mgmt_ack_timer = undefined});
+handle_info({_Ref, {resume, OldStateData}}, StateName, StateData) ->
+ %% This happens if the resume_session/1 request timed out; the new session
+ %% now receives the late response.
+ ?DEBUG("Received old session state for ~s after failed resumption",
+ [jid:to_string(OldStateData#state.jid)]),
+ handle_unacked_stanzas(OldStateData#state{mgmt_resend = false}),
+ fsm_next_state(StateName, StateData);
handle_info(Info, StateName, StateData) ->
?ERROR_MSG("Unexpected info: ~p", [Info]),
fsm_next_state(StateName, StateData).
@@ -1562,6 +1632,7 @@ send_text(StateData, Text) ->
send_element(StateData, El) when StateData#state.mgmt_state == pending ->
?DEBUG("Cannot send element while waiting for resumption: ~p", [El]);
send_element(StateData, #xmlel{} = El) when StateData#state.xml_socket ->
+ ?DEBUG("Send XML on stream = ~p", [fxml:element_to_binary(El)]),
(StateData#state.sockmod):send_xml(StateData#state.socket,
{xmlstreamelement, El});
send_element(StateData, #xmlel{} = El) ->
@@ -1585,8 +1656,8 @@ send_stanza(StateData, Stanza) when StateData#state.csi_state == inactive ->
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == pending ->
mgmt_queue_add(StateData, Stanza);
send_stanza(StateData, Stanza) when StateData#state.mgmt_state == active ->
- NewStateData = send_stanza_and_ack_req(StateData, Stanza),
- mgmt_queue_add(NewStateData, Stanza);
+ NewStateData = mgmt_queue_add(StateData, Stanza),
+ mgmt_send_stanza(NewStateData, Stanza);
send_stanza(StateData, Stanza) ->
send_element(StateData, Stanza),
StateData.
@@ -2101,13 +2172,25 @@ fsm_next_state(session_established, StateData) ->
?C2S_HIBERNATE_TIMEOUT};
fsm_next_state(wait_for_resume, #state{mgmt_timeout = 0} = StateData) ->
{stop, normal, StateData};
-fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined} =
- StateData) ->
+fsm_next_state(wait_for_resume, #state{mgmt_pending_since = undefined,
+ sid = SID, jid = JID, ip = IP,
+ conn = Conn, auth_module = AuthModule,
+ server = Host} = StateData) ->
+ case StateData of
+ #state{mgmt_ack_timer = undefined} ->
+ ok;
+ #state{mgmt_ack_timer = Timer} ->
+ erlang:cancel_timer(Timer)
+ end,
?INFO_MSG("Waiting for resumption of stream for ~s",
- [jid:to_string(StateData#state.jid)]),
+ [jid:to_string(JID)]),
+ Info = [{ip, IP}, {conn, Conn}, {auth_module, AuthModule}],
+ NewStateData = ejabberd_hooks:run_fold(c2s_session_pending, Host, StateData,
+ [SID, JID, Info]),
{next_state, wait_for_resume,
- StateData#state{mgmt_state = pending, mgmt_pending_since = os:timestamp()},
- StateData#state.mgmt_timeout};
+ NewStateData#state{mgmt_state = pending,
+ mgmt_pending_since = os:timestamp()},
+ NewStateData#state.mgmt_timeout};
fsm_next_state(wait_for_resume, StateData) ->
Diff = timer:now_diff(os:timestamp(), StateData#state.mgmt_pending_since),
Timeout = max(StateData#state.mgmt_timeout - Diff div 1000, 1),
@@ -2338,15 +2421,16 @@ handle_r(StateData) ->
-spec handle_a(state(), sm_a()) -> state().
handle_a(StateData, #sm_a{h = H}) ->
- check_h_attribute(StateData, H).
+ NewStateData = check_h_attribute(StateData, H),
+ maybe_renew_ack_request(NewStateData).
-spec handle_resume(state(), sm_resume()) -> {ok, state()} | error.
handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
R = case stream_mgmt_enabled(StateData) of
true ->
case inherit_session_state(StateData, PrevID) of
- {ok, InheritedState} ->
- {ok, InheritedState, H};
+ {ok, InheritedState, Info} ->
+ {ok, InheritedState, Info, H};
{error, Err, InH} ->
{error, #sm_failed{reason = 'item-not-found',
h = InH, xmlns = Xmlns}, Err};
@@ -2360,7 +2444,7 @@ handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
<<"XEP-0198 disabled">>}
end,
case R of
- {ok, ResumedState, NumHandled} ->
+ {ok, ResumedState, ResumedInfo, NumHandled} ->
NewState = check_h_attribute(ResumedState, NumHandled),
AttrXmlns = NewState#state.mgmt_xmlns,
AttrId = make_resume_id(NewState),
@@ -2374,11 +2458,16 @@ handle_resume(StateData, #sm_resume{h = H, previd = PrevID, xmlns = Xmlns}) ->
end,
handle_unacked_stanzas(NewState, SendFun),
send_element(NewState, #sm_r{xmlns = AttrXmlns}),
- FlushedState = csi_flush_queue(NewState),
- NewStateData = FlushedState#state{csi_state = active},
+ NewState1 = csi_flush_queue(NewState),
+ NewState2 = ejabberd_hooks:run_fold(c2s_session_resumed,
+ StateData#state.server,
+ NewState1,
+ [NewState1#state.sid,
+ NewState1#state.jid,
+ ResumedInfo]),
?INFO_MSG("Resumed session for ~s",
- [jid:to_string(NewStateData#state.jid)]),
- {ok, NewStateData};
+ [jid:to_string(NewState2#state.jid)]),
+ {ok, NewState2};
{error, El, Msg} ->
send_element(StateData, El),
?INFO_MSG("Cannot resume session for ~s@~s: ~s",
@@ -2413,15 +2502,45 @@ update_num_stanzas_in(#state{mgmt_state = MgmtState} = StateData, El)
update_num_stanzas_in(StateData, _El) ->
StateData.
--spec send_stanza_and_ack_req(state(), stanza()) -> state().
-send_stanza_and_ack_req(StateData, Stanza) ->
- AckReq = #sm_r{xmlns = StateData#state.mgmt_xmlns},
- case send_element(StateData, Stanza) == ok andalso
- send_element(StateData, AckReq) == ok of
+mgmt_send_stanza(StateData, Stanza) ->
+ case send_element(StateData, Stanza) of
+ ok ->
+ maybe_request_ack(StateData);
+ _ ->
+ StateData#state{mgmt_state = pending}
+ end.
+
+maybe_request_ack(#state{mgmt_ack_timer = undefined} = StateData) ->
+ request_ack(StateData);
+maybe_request_ack(StateData) ->
+ StateData.
+
+request_ack(#state{mgmt_xmlns = Xmlns,
+ mgmt_ack_timeout = AckTimeout} = StateData) ->
+ AckReq = #sm_r{xmlns = Xmlns},
+ case {send_element(StateData, AckReq), AckTimeout} of
+ {ok, undefined} ->
+ ok;
+ {ok, Timeout} ->
+ Timer = erlang:send_after(Timeout, self(), close),
+ StateData#state{mgmt_ack_timer = Timer,
+ mgmt_stanzas_req = StateData#state.mgmt_stanzas_out};
+ _ ->
+ StateData#state{mgmt_state = pending}
+ end.
+
+maybe_renew_ack_request(#state{mgmt_ack_timer = undefined} = StateData) ->
+ StateData;
+maybe_renew_ack_request(#state{mgmt_ack_timer = Timer,
+ mgmt_queue = Queue,
+ mgmt_stanzas_out = NumStanzasOut,
+ mgmt_stanzas_req = NumStanzasReq} = StateData) ->
+ erlang:cancel_timer(Timer),
+ case NumStanzasReq < NumStanzasOut andalso not queue:is_empty(Queue) of
true ->
- StateData;
+ request_ack(StateData#state{mgmt_ack_timer = undefined});
false ->
- StateData#state{mgmt_state = pending}
+ StateData#state{mgmt_ack_timer = undefined}
end.
-spec mgmt_queue_add(state(), xmpp_element()) -> state().
@@ -2473,7 +2592,12 @@ handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData, F)
fun({_, Time, Pkt}) ->
From = xmpp:get_from(Pkt),
To = xmpp:get_to(Pkt),
- F(From, To, Pkt, Time)
+ case {From, To} of
+ {#jid{}, #jid{}} ->
+ F(From, To, Pkt, Time);
+ {_, _} ->
+ ?DEBUG("Dropping stanza due to invalid JID(s)", [])
+ end
end, queue:to_list(Queue))
end;
handle_unacked_stanzas(_StateData, _F) ->
@@ -2540,7 +2664,8 @@ handle_unacked_stanzas(#state{mgmt_state = MgmtState} = StateData)
[StateData, From,
StateData#state.jid, El]) of
true ->
- ok;
+ ?DEBUG("Dropping archived message stanza from ~p",
+ [jid:to_string(xmpp:get_from(El))]);
false ->
ReRoute(From, To, El, Time)
end
@@ -2580,7 +2705,7 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
OldPID ->
OldSID = {Time, OldPID},
case catch resume_session(OldSID) of
- {ok, OldStateData} ->
+ {resume, OldStateData} ->
NewSID = {Time, self()}, % Old time, new PID
Priority = case OldStateData#state.pres_last of
undefined ->
@@ -2604,13 +2729,13 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
pres_timestamp = OldStateData#state.pres_timestamp,
privacy_list = OldStateData#state.privacy_list,
aux_fields = OldStateData#state.aux_fields,
- csi_state = OldStateData#state.csi_state,
mgmt_xmlns = OldStateData#state.mgmt_xmlns,
mgmt_queue = OldStateData#state.mgmt_queue,
mgmt_timeout = OldStateData#state.mgmt_timeout,
mgmt_stanzas_in = OldStateData#state.mgmt_stanzas_in,
mgmt_stanzas_out = OldStateData#state.mgmt_stanzas_out,
- mgmt_state = active}};
+ mgmt_state = active,
+ csi_state = active}, Info};
{error, Msg} ->
{error, Msg};
_ ->
@@ -2623,7 +2748,7 @@ inherit_session_state(#state{user = U, server = S} = StateData, ResumeID) ->
-spec resume_session({integer(), pid()}) -> any().
resume_session({Time, PID}) ->
- (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 5000).
+ (?GEN_FSM):sync_send_all_state_event(PID, {resume_session, Time}, 15000).
-spec make_resume_id(state()) -> binary().
make_resume_id(StateData) ->
@@ -2640,11 +2765,11 @@ add_resent_delay_info(#state{server = From}, El, Time) ->
%%% XEP-0352
%%%----------------------------------------------------------------------
-spec csi_filter_stanza(state(), stanza()) -> state().
-csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData,
- Stanza) ->
+csi_filter_stanza(#state{csi_state = CsiState, jid = JID, server = Server} =
+ StateData, Stanza) ->
{StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_filter_stanza, Server,
{StateData, [Stanza]},
- [Server, Stanza]),
+ [Server, JID, Stanza]),
StateData2 = lists:foldl(fun(CurStanza, AccState) ->
send_stanza(AccState, CurStanza)
end, StateData1#state{csi_state = active},
@@ -2652,9 +2777,11 @@ csi_filter_stanza(#state{csi_state = CsiState, server = Server} = StateData,
StateData2#state{csi_state = CsiState}.
-spec csi_flush_queue(state()) -> state().
-csi_flush_queue(#state{csi_state = CsiState, server = Server} = StateData) ->
+csi_flush_queue(#state{csi_state = CsiState, jid = JID, server = Server} =
+ StateData) ->
{StateData1, Stanzas} = ejabberd_hooks:run_fold(csi_flush_queue, Server,
- {StateData, []}, [Server]),
+ {StateData, []},
+ [Server, JID]),
StateData2 = lists:foldl(fun(CurStanza, AccState) ->
send_stanza(AccState, CurStanza)
end, StateData1#state{csi_state = active},